From 483fa075f776bcd4e29e87a015c9a139f3eda1d7 Mon Sep 17 00:00:00 2001 From: Andras Bacsai <5845193+andrasbacsai@users.noreply.github.com> Date: Wed, 22 Apr 2026 21:10:44 +0200 Subject: [PATCH] refactor(broker): replace Redis with HTTP-over-UDS transport Drop Redis as a broker dependency. Broker now exposes an HTTP listener on a Unix domain socket at /run/coolify/broker.sock instead of reading from Redis streams. - Remove RedisInstallCommand and redis.go entirely - Remove ActionInstallRedis from plan and apply phases - Drop redisURL param from BrokerServiceUnit; add BrokerUnixSocketPath constant; systemd unit gains RuntimeDirectory=coolify (creates socket dir) - e2e smoke tests switch from redis-cli XADD/LPOP to curl --unix-socket against /v1/build/dispatch, /v1/build/result/:id, /v1/build/:id/cancel --- internal/services/broker.go | 21 +++++++++++++++----- internal/services/broker_test.go | 8 ++++++-- internal/services/redis.go | 12 ----------- internal/wireguard/apply.go | 21 +++++++------------- internal/wireguard/plan.go | 6 ------ internal/wireguard/state.go | 4 ++-- scripts/e2e-mesh.sh | 34 +++++++++++++++++++++----------- 7 files changed, 53 insertions(+), 53 deletions(-) delete mode 100644 internal/services/redis.go diff --git a/internal/services/broker.go b/internal/services/broker.go index 984150f..6b46b2c 100644 --- a/internal/services/broker.go +++ b/internal/services/broker.go @@ -16,20 +16,31 @@ const BrokerJWTPrivPath = "/etc/coolify/jwt.priv" // HostJWTPath is the on-host path where coold reads its bearer JWT. const HostJWTPath = "/etc/coolify/host-jwt" +// BrokerUnixSocketPath is the on-host path of the broker's HTTP-over-UDS +// listener. The central-plane caller (Laravel) connects here. Access +// control is filesystem perms — see BrokerServiceUnit. +const BrokerUnixSocketPath = "/run/coolify/broker.sock" + // BrokerServiceUnit returns the systemd unit text for broker. // // grpcBind is "ip:port" for the single gRPC listener (e.g. "100.64.0.1:6443"). // It binds on the central host's wg0 mgmt IP so the listener is unreachable // outside the mesh. -func BrokerServiceUnit(grpcBind, redisURL, jwtPubPath string) string { +// +// RuntimeDirectory=coolify creates /run/coolify owned by the broker user +// at unit start, which is where the UDS gets bound. Laravel group access +// is configured at deploy time via BROKER_UNIX_SOCKET_GROUP once the +// PHP-FPM group is finalized; until then the socket stays 0600. +func BrokerServiceUnit(grpcBind, jwtPubPath string) string { return fmt.Sprintf(`[Unit] Description=Coolify broker -After=network-online.target wg-quick@wg0.service redis-server.service redis.service -Wants=redis-server.service redis.service +After=network-online.target wg-quick@wg0.service [Service] +RuntimeDirectory=coolify +RuntimeDirectoryMode=0750 Environment=BROKER_GRPC_BIND=%s -Environment=BROKER_REDIS_URL=%s +Environment=BROKER_UNIX_SOCKET_PATH=%s Environment=BROKER_JWT_PUBLIC_KEY_PATH=%s ExecStart=/usr/local/bin/broker Restart=on-failure @@ -37,7 +48,7 @@ RestartSec=2s [Install] WantedBy=multi-user.target -`, grpcBind, redisURL, jwtPubPath) +`, grpcBind, BrokerUnixSocketPath, jwtPubPath) } // BrokerInstallCommand returns a shell snippet that downloads and installs diff --git a/internal/services/broker_test.go b/internal/services/broker_test.go index 3f921cb..382dff7 100644 --- a/internal/services/broker_test.go +++ b/internal/services/broker_test.go @@ -30,7 +30,7 @@ func TestBrokerInstallCommand_VersionTagEmbedded(t *testing.T) { } func TestBrokerServiceUnit_ExecStartPath(t *testing.T) { - unit := BrokerServiceUnit("100.64.0.1:6443", "redis://127.0.0.1:6379", BrokerJWTPubPath) + unit := BrokerServiceUnit("100.64.0.1:6443", BrokerJWTPubPath) if !strings.Contains(unit, "ExecStart=/usr/local/bin/broker") { t.Error("BrokerServiceUnit ExecStart does not point to /usr/local/bin/broker") @@ -41,9 +41,13 @@ func TestBrokerServiceUnit_ExecStartPath(t *testing.T) { if strings.Contains(unit, "BUILDER_GRPC_BIND") { t.Error("BrokerServiceUnit still emits BROKER_BUILDER_GRPC_BIND; builder port was removed") } + if strings.Contains(unit, "BROKER_REDIS_URL") || strings.Contains(unit, "redis") { + t.Error("BrokerServiceUnit still references Redis; UDS migration should have dropped it") + } for _, want := range []string{ "BROKER_GRPC_BIND=100.64.0.1:6443", - "BROKER_REDIS_URL=redis://127.0.0.1:6379", + "BROKER_UNIX_SOCKET_PATH=" + BrokerUnixSocketPath, + "RuntimeDirectory=coolify", BrokerJWTPubPath, } { if !strings.Contains(unit, want) { diff --git a/internal/services/redis.go b/internal/services/redis.go deleted file mode 100644 index 962fc59..0000000 --- a/internal/services/redis.go +++ /dev/null @@ -1,12 +0,0 @@ -package services - -// RedisInstallCommand returns a shell snippet that installs Redis via apt and -// enables the service. Works on both Debian (redis-server unit) and Ubuntu -// (same, but service name may be redis or redis-server). -func RedisInstallCommand() string { - return `DEBIAN_FRONTEND=noninteractive apt-get update -qq 2>/dev/null && ` + - `DEBIAN_FRONTEND=noninteractive apt-get install -y ` + - `-o Dpkg::Options::="--force-confold" ` + - `redis-server 2>&1 && ` + - `(systemctl enable --now redis-server 2>&1 || systemctl enable --now redis 2>&1)` -} diff --git a/internal/wireguard/apply.go b/internal/wireguard/apply.go index fddb934..7dbeff4 100644 --- a/internal/wireguard/apply.go +++ b/internal/wireguard/apply.go @@ -186,7 +186,7 @@ func ApplyMesh( } } - // Phase 4: central-only — install Redis + broker, generate JWT keypair. + // Phase 4: central-only — install broker, generate JWT keypair. if desired.CentralHost != "" && err == nil { p4 := ssh.ForEachServer(ctx, []string{desired.CentralHost}, 1, func(ctx context.Context, host string) ([]ActionResult, error) { @@ -520,8 +520,8 @@ chmod %[4]o %[1]s.tmp mv %[1]s.tmp %[1]s`, remotePath, body, tag, mode) } -// phase4Central installs Redis, broker, generates the JWT keypair, and -// enables the broker systemd service on the central host. +// phase4Central installs broker, generates the JWT keypair, and enables +// the broker systemd service on the central host. func phase4Central( ctx context.Context, runner ssh.Runner, @@ -532,31 +532,24 @@ func phase4Central( ) ([]ActionResult, error) { var out []ActionResult - // 1. Install Redis. - if err := runStep(ctx, runner, host, user, port, &out, - ActionInstallRedis, "", services.RedisInstallCommand(), - fmt.Sprintf("install redis on %s", host)); err != nil { - return out, err - } - - // 2. Install broker binary. + // 1. Install broker binary. if err := runStep(ctx, runner, host, user, port, &out, ActionInstallBroker, "", services.BrokerInstallCommand(desired.BrokerVersion), fmt.Sprintf("install broker on %s", host)); err != nil { return out, err } - // 3. Generate JWT keypair (idempotent). + // 2. Generate JWT keypair (idempotent). if err := runStep(ctx, runner, host, user, port, &out, ActionGenerateJWTKeypair, "", services.EnsureJWTKeypairCommand(), fmt.Sprintf("generate JWT keypair on %s", host)); err != nil { return out, err } - // 4. Write broker unit + enable service. + // 3. Write broker unit + enable service. mgmtIP := mgmtAssignments[host] grpcBind := fmt.Sprintf("%s:%d", mgmtIP, services.BrokerGRPCPort) - brokerUnit := services.BrokerServiceUnit(grpcBind, "redis://127.0.0.1:6379", services.BrokerJWTPubPath) + brokerUnit := services.BrokerServiceUnit(grpcBind, services.BrokerJWTPubPath) serviceCmd := heredocWrite("/etc/systemd/system/broker.service", brokerUnit, "COOLIFY_BROKER_UNIT_EOF", 0o644) + ` && systemctl daemon-reload` + diff --git a/internal/wireguard/plan.go b/internal/wireguard/plan.go index 3de463c..fdaabef 100644 --- a/internal/wireguard/plan.go +++ b/internal/wireguard/plan.go @@ -35,7 +35,6 @@ const ( ActionWriteCorrosionSchema ActionType = "write-corrosion-schema" ActionInstallCorrosionService ActionType = "install-corrosion-service" ActionInstallCooldService ActionType = "install-coold-service" - ActionInstallRedis ActionType = "install-redis" ActionInstallBroker ActionType = "install-broker" ActionGenerateJWTKeypair ActionType = "generate-jwt-keypair" ActionInstallBrokerService ActionType = "install-broker-service" @@ -397,11 +396,6 @@ func BuildPlan(desired *DesiredMesh, current MeshState) (*Plan, error) { // --- Broker + JWT stack (central-only) --- if desired.CentralHost != "" { plan.Actions = append(plan.Actions, - PlannedAction{ - Host: desired.CentralHost, - Type: ActionInstallRedis, - Detail: "redis-server via apt (loopback only)", - }, PlannedAction{ Host: desired.CentralHost, Type: ActionInstallBroker, diff --git a/internal/wireguard/state.go b/internal/wireguard/state.go index 441d5f5..d947459 100644 --- a/internal/wireguard/state.go +++ b/internal/wireguard/state.go @@ -277,8 +277,8 @@ type DesiredMesh struct { // CorrosionAPIPort is the corrosion HTTP API port bound to 127.0.0.1 (default 8080). CorrosionAPIPort int - // CentralHost is the SSH address of the central VM that runs broker - // and Redis. Empty string disables phases 4+5 (broker setup). + // CentralHost is the SSH address of the central VM that runs broker. + // Empty string disables phases 4+5 (broker setup). // Must be an element of Hosts. CentralHost string diff --git a/scripts/e2e-mesh.sh b/scripts/e2e-mesh.sh index b3b7820..9a13fd7 100755 --- a/scripts/e2e-mesh.sh +++ b/scripts/e2e-mesh.sh @@ -203,20 +203,28 @@ cli init apply \ # not pass --central, so builder capability may be disabled — gate on a marker # file or just skip when /etc/coolify/jwt.priv is absent. if ssh_exec "$SERVER_A" "test -f /etc/coolify/jwt.priv" >/dev/null 2>&1; then - say "9/10 builder smoke test — push build:cmd, expect localhost image on central" + say "9/10 builder smoke test — POST /v1/build/dispatch, expect localhost image on central" + + # Broker UDS; central runs broker as root so the default 0600 socket is + # reachable for ssh-exec'd curl without group setup. + BROKER_SOCK="/run/coolify/broker.sock" + UDS_CURL="curl -sS --unix-socket $BROKER_SOCK" REQ_ID="e2e-$(date +%s)" BUILD_PAYLOAD="{\"request_id\":\"$REQ_ID\",\"command\":{\"type\":\"static_build\",\"repo_url\":\"https://github.com/coollabsio/static-test-site\",\"git_ref\":\"main\",\"target_image\":\"localhost/e2e-$REQ_ID\"}}" - ssh_exec "$SERVER_A" "redis-cli XADD build:cmd '*' payload '$BUILD_PAYLOAD'" >/dev/null + ACK=$(ssh_exec "$SERVER_A" "$UDS_CURL -w '\\n%{http_code}' -X POST -H 'Content-Type: application/json' --data '$BUILD_PAYLOAD' http://localhost/v1/build/dispatch") + echo "$ACK" | tail -n1 | grep -q '^202$' || fail "dispatch did not return 202: $ACK" DEADLINE=$(($(date +%s)+180)) RESP="" while :; do - RESP=$(ssh_exec "$SERVER_A" "redis-cli LPOP build:resp:$REQ_ID" 2>/dev/null || true) - [[ -n "$RESP" && "$RESP" != "(nil)" ]] && break + OUT=$(ssh_exec "$SERVER_A" "$UDS_CURL -w '\\n%{http_code}' 'http://localhost/v1/build/result/$REQ_ID?timeout_ms=25000'") + CODE=$(echo "$OUT" | tail -n1) + RESP=$(echo "$OUT" | sed '$d') + [[ "$CODE" == "200" ]] && break + [[ "$CODE" != "408" && "$CODE" != "404" ]] && fail "build result unexpected $CODE: $RESP" [[ $(date +%s) -ge $DEADLINE ]] && fail "builder smoke timed out after 180s" - sleep 3 done echo "$RESP" | grep -q '"status":"ok"' || fail "builder smoke returned error: $RESP" @@ -230,13 +238,13 @@ if ssh_exec "$SERVER_A" "test -f /etc/coolify/jwt.priv" >/dev/null 2>&1; then printf ' OK: build succeeded; image on %s ✓\n' "$IMG_HOST" # ─── 10. cancel test ──────────────────────────────────────────────────────── - say "10/10 cancel test — dispatch then cancel; expect scope killed and cancel response" + say "10/10 cancel test — dispatch then POST /v1/build/:id/cancel; expect scope killed and cancel response" CAN_ID="e2e-cancel-$(date +%s)" CAN_BUILD="{\"request_id\":\"$CAN_ID\",\"command\":{\"type\":\"static_build\",\"repo_url\":\"https://github.com/torvalds/linux\",\"git_ref\":\"master\",\"target_image\":\"localhost/$CAN_ID\"}}" - CAN_MSG="{\"request_id\":\"$CAN_ID\",\"command\":{\"type\":\"cancel\"}}" - ssh_exec "$SERVER_A" "redis-cli XADD build:cmd '*' payload '$CAN_BUILD'" >/dev/null + ACK=$(ssh_exec "$SERVER_A" "$UDS_CURL -w '\\n%{http_code}' -X POST -H 'Content-Type: application/json' --data '$CAN_BUILD' http://localhost/v1/build/dispatch") + echo "$ACK" | tail -n1 | grep -q '^202$' || fail "cancel-test dispatch did not return 202: $ACK" SCOPE_HOST="" for _ in 1 2 3 4 5 6 7 8 9 10; do @@ -250,15 +258,17 @@ if ssh_exec "$SERVER_A" "test -f /etc/coolify/jwt.priv" >/dev/null 2>&1; then [[ -n "$SCOPE_HOST" ]] || fail "scope coolify-build-$CAN_ID.service never appeared" printf ' scope running on %s ✓\n' "$SCOPE_HOST" - ssh_exec "$SERVER_A" "redis-cli XADD build:cmd '*' payload '$CAN_MSG'" >/dev/null + ssh_exec "$SERVER_A" "$UDS_CURL -X POST http://localhost/v1/build/$CAN_ID/cancel" >/dev/null DEADLINE=$(($(date +%s)+30)) RESP="" while :; do - RESP=$(ssh_exec "$SERVER_A" "redis-cli LPOP build:resp:$CAN_ID" 2>/dev/null || true) - [[ -n "$RESP" && "$RESP" != "(nil)" ]] && break + OUT=$(ssh_exec "$SERVER_A" "$UDS_CURL -w '\\n%{http_code}' 'http://localhost/v1/build/result/$CAN_ID?timeout_ms=10000'") + CODE=$(echo "$OUT" | tail -n1) + RESP=$(echo "$OUT" | sed '$d') + [[ "$CODE" == "200" ]] && break + [[ "$CODE" != "408" && "$CODE" != "404" ]] && fail "cancel result unexpected $CODE: $RESP" [[ $(date +%s) -ge $DEADLINE ]] && fail "cancel response timed out" - sleep 2 done echo "$RESP" | grep -q '"stage":"cancel"' || fail "expected stage=cancel in response, got: $RESP"