feat(analytic): implement WebSocket keepalive mechanism

This commit is contained in:
0xJacky
2026-04-18 11:02:52 +00:00
parent b1b1596a4d
commit 7ed73f621a
6 changed files with 278 additions and 8 deletions
+51 -2
View File
@@ -54,6 +54,17 @@ var (
retryMutex sync.Mutex
)
// Keepalive timing knobs for the WebSocket connection to a remote node.
//
//   - nodeWSPongWait is the read deadline window: a read on the connection may
//     block for at most this long unless a pong arrives and re-arms it.
//   - nodeWSPingPeriod is how often a ping is sent. It is derived as 90% of
//     nodeWSPongWait because it has to stay below the read deadline, giving
//     the peer time to answer before the deadline fires.
//   - nodeWSWriteWait is the deadline applied to each ping control write.
//
// These are variables rather than constants so tests can temporarily shorten
// them. Note that nodeWSPingPeriod is computed once at initialization; a test
// that changes nodeWSPongWait must also set nodeWSPingPeriod explicitly.
var (
	nodeWSPongWait   = 60 * time.Second
	nodeWSPingPeriod = nodeWSPongWait * 9 / 10
	nodeWSWriteWait  = 10 * time.Second
)
func getRetryState(nodeID uint64) *NodeRetryState {
retryMutex.Lock()
defer retryMutex.Unlock()
@@ -381,6 +392,11 @@ func RetrieveNodesStatus(ctx context.Context) {
continue
}
if err := nodeAnalyticRecord(n, ctx); err != nil {
// Context cancellation means the manager is shutting
// down — don't pollute retry state with phantom failures.
if ctx.Err() != nil {
return
}
if helper.IsUnexpectedWebsocketError(err) {
logger.Error(err)
}
@@ -487,6 +503,14 @@ func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
updateNodeStatus(nodeModel.ID, false, "websocket_connection_closed")
}()
// Arm read deadline and refresh it on every pong. Without this, a silently
// half-dead TCP connection (NAT drop, peer hang) would block ReadJSON below
// indefinitely, freezing this node's retry loop until the process restarts.
_ = c.SetReadDeadline(time.Now().Add(nodeWSPongWait))
c.SetPongHandler(func(string) error {
return c.SetReadDeadline(time.Now().Add(nodeWSPongWait))
})
go func() {
select {
case <-scopeCtx.Done():
@@ -496,6 +520,26 @@ func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
}
}()
// Periodic pings keep the connection warm and solicit the pongs that refresh
// the read deadline armed above; when the peer stops responding, that deadline
// expires and unblocks the read loop.
go func() {
ticker := time.NewTicker(nodeWSPingPeriod)
defer ticker.Stop()
for {
select {
case <-scopeCtx.Done():
return
case <-ctx.Done():
return
case <-ticker.C:
if err := c.WriteControl(websocket.PingMessage, nil, time.Now().Add(nodeWSWriteWait)); err != nil {
_ = c.Close()
return
}
}
}
}()
for {
select {
case <-scopeCtx.Done():
@@ -508,11 +552,16 @@ func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
var rawMsg json.RawMessage
err = c.ReadJSON(&rawMsg)
if err != nil {
// Surface every read failure (close frame, read deadline expiry, TCP
// reset) as a retryable error. Returning nil here used to trigger
// markConnectionSuccess on the caller, hiding dead connections and
// flipping node status back to online on the next snapshot.
if helper.IsUnexpectedWebsocketError(err) {
updateNodeStatus(nodeModel.ID, false, "websocket_error")
return err
} else {
updateNodeStatus(nodeModel.ID, false, "websocket_connection_closed")
}
return nil
return err
}
nodeMapMu.Lock()
+102
View File
@@ -0,0 +1,102 @@
package analytic
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
"github.com/0xJacky/Nginx-UI/model"
"github.com/gorilla/websocket"
)
// TestNodeAnalyticRecordHalfDeadConnection is a regression test for a frozen
// node status: a remote node completes the WebSocket upgrade and then goes
// silent (hung peer, silently dropped TCP path). Without a read deadline the
// read loop in nodeAnalyticRecord would block forever on such a connection.
//
// The test stands up a server that upgrades the connection but never sends a
// message and never answers pings, then asserts that nodeAnalyticRecord
// returns a non-nil error within a bounded time.
func TestNodeAnalyticRecordHalfDeadConnection(t *testing.T) {
	// Swap in short keepalive timings so the deadline expires in well under a
	// second; the production values are restored when the test ends.
	savedPong, savedPing, savedWrite := nodeWSPongWait, nodeWSPingPeriod, nodeWSWriteWait
	t.Cleanup(func() {
		nodeWSPongWait = savedPong
		nodeWSPingPeriod = savedPing
		nodeWSWriteWait = savedWrite
	})
	nodeWSPongWait = 300 * time.Millisecond
	nodeWSPingPeriod = 100 * time.Millisecond
	nodeWSWriteWait = 100 * time.Millisecond

	upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }}

	// Plain HTTP endpoint returning minimal node info (presumably what the
	// node initialization probe requests — confirm against InitNode).
	serveNodeInfo := func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(NodeInfo{Version: "test"})
	}

	// WebSocket endpoint that plays the half-dead peer: it upgrades, then
	// stays completely silent.
	serveSilentSocket := func(w http.ResponseWriter, r *http.Request) {
		conn, upgradeErr := upgrader.Upgrade(w, r, nil)
		if upgradeErr != nil {
			return
		}
		defer conn.Close()

		// Replace the default ping handler (which replies with a pong) by a
		// no-op so the client never gets its read deadline refreshed.
		conn.SetPingHandler(func(string) error { return nil })

		// Keep reading so control frames are processed; exit once the client
		// tears the connection down.
		for {
			_, _, readErr := conn.ReadMessage()
			if readErr != nil {
				return
			}
		}
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/api/node", serveNodeInfo)
	mux.HandleFunc("/api/analytic/intro", serveSilentSocket)

	srv := httptest.NewServer(mux)
	t.Cleanup(srv.Close)

	// The node carries the plain http:// test URL; the code under test is
	// expected to derive the WebSocket URL from it (presumably via
	// GetWebSocketURL — not visible from this test).
	target := &model.Node{
		Model: model.Model{ID: 42},
		Name:  "half-dead",
		URL:   srv.URL,
		Token: "test-token",
	}

	// Guarantee the shared NodeMap is allocated before the code under test
	// touches it, and drop this node's entry again when the test finishes.
	func() {
		nodeMapMu.Lock()
		defer nodeMapMu.Unlock()
		if NodeMap == nil {
			NodeMap = make(TNodeMap)
		}
	}()
	t.Cleanup(func() {
		nodeMapMu.Lock()
		defer nodeMapMu.Unlock()
		delete(NodeMap, target.ID)
	})

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Buffered so the goroutine can always deliver its result and exit, even
	// if the test has already given up waiting.
	result := make(chan error, 1)
	go func() {
		result <- nodeAnalyticRecord(target, ctx)
	}()

	var err error
	select {
	case err = <-result:
	case <-time.After(2 * time.Second):
		t.Fatalf("nodeAnalyticRecord did not return within 2s — read deadline / ping-pong not enforced")
	}

	if err == nil {
		t.Fatalf("expected nodeAnalyticRecord to fail on read deadline, got nil")
	}

	// Any non-nil error is acceptable; an unexpected wording is only logged
	// to help diagnose changes in how the failure surfaces.
	msg := err.Error()
	if !(strings.Contains(msg, "timeout") || strings.Contains(msg, "closed")) {
		t.Logf("returned err = %v (non-nil, acceptable)", err)
	}
}