refactor(broker): replace Redis with HTTP-over-UDS transport

Drop Redis as a broker dependency. Broker now exposes an HTTP listener
on a Unix domain socket at /run/coolify/broker.sock instead of reading
from Redis streams.

- Remove RedisInstallCommand and redis.go entirely
- Remove ActionInstallRedis from plan and apply phases
- Drop redisURL param from BrokerServiceUnit; add BrokerUnixSocketPath
  constant; systemd unit gains RuntimeDirectory=coolify (creates socket dir)
- e2e smoke tests switch from redis-cli XADD/LPOP to curl --unix-socket
  against /v1/build/dispatch, /v1/build/result/:id, /v1/build/:id/cancel
This commit is contained in:
Andras Bacsai
2026-04-22 21:10:44 +02:00
parent 92a45c6b0d
commit 483fa075f7
7 changed files with 53 additions and 53 deletions
+16 -5
View File
@@ -16,20 +16,31 @@ const BrokerJWTPrivPath = "/etc/coolify/jwt.priv"
// HostJWTPath is the on-host path where coold reads its bearer JWT.
const HostJWTPath = "/etc/coolify/host-jwt"
// BrokerUnixSocketPath is the on-host path of the broker's HTTP-over-UDS
// listener. The central-plane caller (Laravel) connects here. Access
// control is filesystem perms — see BrokerServiceUnit.
const BrokerUnixSocketPath = "/run/coolify/broker.sock"
// BrokerServiceUnit returns the systemd unit text for broker.
//
// grpcBind is "ip:port" for the single gRPC listener (e.g. "100.64.0.1:6443").
// It binds on the central host's wg0 mgmt IP so the listener is unreachable
// outside the mesh.
func BrokerServiceUnit(grpcBind, redisURL, jwtPubPath string) string {
//
// RuntimeDirectory=coolify makes systemd create /run/coolify, owned by the
// broker user, when the unit starts; the UDS is bound inside that directory.
// Laravel group access is configured at deploy time via
// BROKER_UNIX_SOCKET_GROUP once the PHP-FPM group is finalized; until then
// the socket stays 0600.
func BrokerServiceUnit(grpcBind, jwtPubPath string) string {
return fmt.Sprintf(`[Unit]
Description=Coolify broker
After=network-online.target wg-quick@wg0.service redis-server.service redis.service
Wants=redis-server.service redis.service
After=network-online.target wg-quick@wg0.service
[Service]
RuntimeDirectory=coolify
RuntimeDirectoryMode=0750
Environment=BROKER_GRPC_BIND=%s
Environment=BROKER_REDIS_URL=%s
Environment=BROKER_UNIX_SOCKET_PATH=%s
Environment=BROKER_JWT_PUBLIC_KEY_PATH=%s
ExecStart=/usr/local/bin/broker
Restart=on-failure
@@ -37,7 +48,7 @@ RestartSec=2s
[Install]
WantedBy=multi-user.target
`, grpcBind, redisURL, jwtPubPath)
`, grpcBind, BrokerUnixSocketPath, jwtPubPath)
}
// BrokerInstallCommand returns a shell snippet that downloads and installs
+6 -2
View File
@@ -30,7 +30,7 @@ func TestBrokerInstallCommand_VersionTagEmbedded(t *testing.T) {
}
func TestBrokerServiceUnit_ExecStartPath(t *testing.T) {
unit := BrokerServiceUnit("100.64.0.1:6443", "redis://127.0.0.1:6379", BrokerJWTPubPath)
unit := BrokerServiceUnit("100.64.0.1:6443", BrokerJWTPubPath)
if !strings.Contains(unit, "ExecStart=/usr/local/bin/broker") {
t.Error("BrokerServiceUnit ExecStart does not point to /usr/local/bin/broker")
@@ -41,9 +41,13 @@ func TestBrokerServiceUnit_ExecStartPath(t *testing.T) {
if strings.Contains(unit, "BUILDER_GRPC_BIND") {
t.Error("BrokerServiceUnit still emits BROKER_BUILDER_GRPC_BIND; builder port was removed")
}
if strings.Contains(unit, "BROKER_REDIS_URL") || strings.Contains(unit, "redis") {
t.Error("BrokerServiceUnit still references Redis; UDS migration should have dropped it")
}
for _, want := range []string{
"BROKER_GRPC_BIND=100.64.0.1:6443",
"BROKER_REDIS_URL=redis://127.0.0.1:6379",
"BROKER_UNIX_SOCKET_PATH=" + BrokerUnixSocketPath,
"RuntimeDirectory=coolify",
BrokerJWTPubPath,
} {
if !strings.Contains(unit, want) {
-12
View File
@@ -1,12 +0,0 @@
package services
// RedisInstallCommand builds the shell one-liner that installs redis-server
// through apt and then enables and starts its systemd unit.
//
// The command runs three stages chained with "&&": refresh the apt index,
// install the redis-server package non-interactively (keeping any existing
// config files via --force-confold), and enable the service. The unit may be
// named redis-server or redis depending on the Debian/Ubuntu release, so the
// last stage tries redis-server first and falls back to redis.
func RedisInstallCommand() string {
	const (
		// Prefix that stops apt from prompting during unattended runs.
		aptEnv = "DEBIAN_FRONTEND=noninteractive "

		refreshIndex = aptEnv + "apt-get update -qq 2>/dev/null"
		installPkg   = aptEnv + "apt-get install -y " +
			"-o Dpkg::Options::=\"--force-confold\" " +
			"redis-server 2>&1"
		enableUnit = "(systemctl enable --now redis-server 2>&1 || systemctl enable --now redis 2>&1)"
	)

	return refreshIndex + " && " + installPkg + " && " + enableUnit
}
+7 -14
View File
@@ -186,7 +186,7 @@ func ApplyMesh(
}
}
// Phase 4: central-only — install Redis + broker, generate JWT keypair.
// Phase 4: central-only — install broker, generate JWT keypair.
if desired.CentralHost != "" && err == nil {
p4 := ssh.ForEachServer(ctx, []string{desired.CentralHost}, 1,
func(ctx context.Context, host string) ([]ActionResult, error) {
@@ -520,8 +520,8 @@ chmod %[4]o %[1]s.tmp
mv %[1]s.tmp %[1]s`, remotePath, body, tag, mode)
}
// phase4Central installs Redis, broker, generates the JWT keypair, and
// enables the broker systemd service on the central host.
// phase4Central installs broker, generates the JWT keypair, and enables
// the broker systemd service on the central host.
func phase4Central(
ctx context.Context,
runner ssh.Runner,
@@ -532,31 +532,24 @@ func phase4Central(
) ([]ActionResult, error) {
var out []ActionResult
// 1. Install Redis.
if err := runStep(ctx, runner, host, user, port, &out,
ActionInstallRedis, "", services.RedisInstallCommand(),
fmt.Sprintf("install redis on %s", host)); err != nil {
return out, err
}
// 2. Install broker binary.
// 1. Install broker binary.
if err := runStep(ctx, runner, host, user, port, &out,
ActionInstallBroker, "", services.BrokerInstallCommand(desired.BrokerVersion),
fmt.Sprintf("install broker on %s", host)); err != nil {
return out, err
}
// 3. Generate JWT keypair (idempotent).
// 2. Generate JWT keypair (idempotent).
if err := runStep(ctx, runner, host, user, port, &out,
ActionGenerateJWTKeypair, "", services.EnsureJWTKeypairCommand(),
fmt.Sprintf("generate JWT keypair on %s", host)); err != nil {
return out, err
}
// 4. Write broker unit + enable service.
// 3. Write broker unit + enable service.
mgmtIP := mgmtAssignments[host]
grpcBind := fmt.Sprintf("%s:%d", mgmtIP, services.BrokerGRPCPort)
brokerUnit := services.BrokerServiceUnit(grpcBind, "redis://127.0.0.1:6379", services.BrokerJWTPubPath)
brokerUnit := services.BrokerServiceUnit(grpcBind, services.BrokerJWTPubPath)
serviceCmd := heredocWrite("/etc/systemd/system/broker.service",
brokerUnit, "COOLIFY_BROKER_UNIT_EOF", 0o644) +
` && systemctl daemon-reload` +
-6
View File
@@ -35,7 +35,6 @@ const (
ActionWriteCorrosionSchema ActionType = "write-corrosion-schema"
ActionInstallCorrosionService ActionType = "install-corrosion-service"
ActionInstallCooldService ActionType = "install-coold-service"
ActionInstallRedis ActionType = "install-redis"
ActionInstallBroker ActionType = "install-broker"
ActionGenerateJWTKeypair ActionType = "generate-jwt-keypair"
ActionInstallBrokerService ActionType = "install-broker-service"
@@ -397,11 +396,6 @@ func BuildPlan(desired *DesiredMesh, current MeshState) (*Plan, error) {
// --- Broker + JWT stack (central-only) ---
if desired.CentralHost != "" {
plan.Actions = append(plan.Actions,
PlannedAction{
Host: desired.CentralHost,
Type: ActionInstallRedis,
Detail: "redis-server via apt (loopback only)",
},
PlannedAction{
Host: desired.CentralHost,
Type: ActionInstallBroker,
+2 -2
View File
@@ -277,8 +277,8 @@ type DesiredMesh struct {
// CorrosionAPIPort is the corrosion HTTP API port bound to 127.0.0.1 (default 8080).
CorrosionAPIPort int
// CentralHost is the SSH address of the central VM that runs broker
// and Redis. Empty string disables phases 4+5 (broker setup).
// CentralHost is the SSH address of the central VM that runs broker.
// Empty string disables phases 4+5 (broker setup).
// Must be an element of Hosts.
CentralHost string
+22 -12
View File
@@ -203,20 +203,28 @@ cli init apply \
# not pass --central, so builder capability may be disabled — gate on a marker
# file or just skip when /etc/coolify/jwt.priv is absent.
if ssh_exec "$SERVER_A" "test -f /etc/coolify/jwt.priv" >/dev/null 2>&1; then
say "9/10 builder smoke test — push build:cmd, expect localhost image on central"
say "9/10 builder smoke test — POST /v1/build/dispatch, expect localhost image on central"
# Broker UDS. Central runs the broker as root, so the default 0600 socket is
# reachable by the curl we run through ssh_exec without any group setup.
BROKER_SOCK="/run/coolify/broker.sock"
UDS_CURL="curl -sS --unix-socket $BROKER_SOCK"
REQ_ID="e2e-$(date +%s)"
BUILD_PAYLOAD="{\"request_id\":\"$REQ_ID\",\"command\":{\"type\":\"static_build\",\"repo_url\":\"https://github.com/coollabsio/static-test-site\",\"git_ref\":\"main\",\"target_image\":\"localhost/e2e-$REQ_ID\"}}"
ssh_exec "$SERVER_A" "redis-cli XADD build:cmd '*' payload '$BUILD_PAYLOAD'" >/dev/null
ACK=$(ssh_exec "$SERVER_A" "$UDS_CURL -w '\\n%{http_code}' -X POST -H 'Content-Type: application/json' --data '$BUILD_PAYLOAD' http://localhost/v1/build/dispatch")
echo "$ACK" | tail -n1 | grep -q '^202$' || fail "dispatch did not return 202: $ACK"
DEADLINE=$(($(date +%s)+180))
RESP=""
while :; do
RESP=$(ssh_exec "$SERVER_A" "redis-cli LPOP build:resp:$REQ_ID" 2>/dev/null || true)
[[ -n "$RESP" && "$RESP" != "(nil)" ]] && break
OUT=$(ssh_exec "$SERVER_A" "$UDS_CURL -w '\\n%{http_code}' 'http://localhost/v1/build/result/$REQ_ID?timeout_ms=25000'")
CODE=$(echo "$OUT" | tail -n1)
RESP=$(echo "$OUT" | sed '$d')
[[ "$CODE" == "200" ]] && break
[[ "$CODE" != "408" && "$CODE" != "404" ]] && fail "build result unexpected $CODE: $RESP"
[[ $(date +%s) -ge $DEADLINE ]] && fail "builder smoke timed out after 180s"
sleep 3
done
echo "$RESP" | grep -q '"status":"ok"' || fail "builder smoke returned error: $RESP"
@@ -230,13 +238,13 @@ if ssh_exec "$SERVER_A" "test -f /etc/coolify/jwt.priv" >/dev/null 2>&1; then
printf ' OK: build succeeded; image on %s ✓\n' "$IMG_HOST"
# ─── 10. cancel test ────────────────────────────────────────────────────────
say "10/10 cancel test — dispatch then cancel; expect scope killed and cancel response"
say "10/10 cancel test — dispatch then POST /v1/build/:id/cancel; expect scope killed and cancel response"
CAN_ID="e2e-cancel-$(date +%s)"
CAN_BUILD="{\"request_id\":\"$CAN_ID\",\"command\":{\"type\":\"static_build\",\"repo_url\":\"https://github.com/torvalds/linux\",\"git_ref\":\"master\",\"target_image\":\"localhost/$CAN_ID\"}}"
CAN_MSG="{\"request_id\":\"$CAN_ID\",\"command\":{\"type\":\"cancel\"}}"
ssh_exec "$SERVER_A" "redis-cli XADD build:cmd '*' payload '$CAN_BUILD'" >/dev/null
ACK=$(ssh_exec "$SERVER_A" "$UDS_CURL -w '\\n%{http_code}' -X POST -H 'Content-Type: application/json' --data '$CAN_BUILD' http://localhost/v1/build/dispatch")
echo "$ACK" | tail -n1 | grep -q '^202$' || fail "cancel-test dispatch did not return 202: $ACK"
SCOPE_HOST=""
for _ in 1 2 3 4 5 6 7 8 9 10; do
@@ -250,15 +258,17 @@ if ssh_exec "$SERVER_A" "test -f /etc/coolify/jwt.priv" >/dev/null 2>&1; then
[[ -n "$SCOPE_HOST" ]] || fail "scope coolify-build-$CAN_ID.service never appeared"
printf ' scope running on %s ✓\n' "$SCOPE_HOST"
ssh_exec "$SERVER_A" "redis-cli XADD build:cmd '*' payload '$CAN_MSG'" >/dev/null
ssh_exec "$SERVER_A" "$UDS_CURL -X POST http://localhost/v1/build/$CAN_ID/cancel" >/dev/null
DEADLINE=$(($(date +%s)+30))
RESP=""
while :; do
RESP=$(ssh_exec "$SERVER_A" "redis-cli LPOP build:resp:$CAN_ID" 2>/dev/null || true)
[[ -n "$RESP" && "$RESP" != "(nil)" ]] && break
OUT=$(ssh_exec "$SERVER_A" "$UDS_CURL -w '\\n%{http_code}' 'http://localhost/v1/build/result/$CAN_ID?timeout_ms=10000'")
CODE=$(echo "$OUT" | tail -n1)
RESP=$(echo "$OUT" | sed '$d')
[[ "$CODE" == "200" ]] && break
[[ "$CODE" != "408" && "$CODE" != "404" ]] && fail "cancel result unexpected $CODE: $RESP"
[[ $(date +%s) -ge $DEADLINE ]] && fail "cancel response timed out"
sleep 2
done
echo "$RESP" | grep -q '"stage":"cancel"' || fail "expected stage=cancel in response, got: $RESP"