refactor: replace mutex with RWMutex for NodeMap access and implement snapshot functionality #1444

This commit is contained in:
0xJacky
2025-11-24 14:10:23 +00:00
parent aee2352eb5
commit 98e83f13b5
4 changed files with 205 additions and 24 deletions
+3 -2
View File
@@ -107,8 +107,9 @@ func GetNodesAnalytic(c *gin.Context) {
defer ws.Close()
for {
// Send NodeMap data to client
err = ws.WriteJSON(analytic.NodeMap)
// Send snapshot of NodeMap data to client to avoid concurrent access
nodeSnapshot := analytic.SnapshotNodeMap()
err = ws.WriteJSON(nodeSnapshot)
if err != nil {
if helper.IsUnexpectedWebsocketError(err) {
logger.Error(err)
+57 -6
View File
@@ -43,7 +43,7 @@ type Node struct {
NodeInfo
}
var mutex sync.Mutex
var nodeMapMu sync.RWMutex
type TNodeMap map[uint64]*Node
@@ -53,6 +53,46 @@ func init() {
NodeMap = make(TNodeMap)
}
// cloneNode returns a copy of n that does not share its node metadata or its
// upstream status entries with the original. A nil input yields nil.
//
// The struct itself is copied by value; the Node pointer and every non-nil
// entry of UpstreamStatusMap are then re-pointed at fresh copies. Any other
// reference-typed fields are copied as-is by the struct assignment.
func cloneNode(n *Node) *Node {
	if n == nil {
		return nil
	}

	dup := new(Node)
	*dup = *n

	// Detach the node metadata so later writes to the source are not visible.
	if meta := n.Node; meta != nil {
		metaCopy := *meta
		dup.Node = &metaCopy
	}

	// Rebuild the upstream map with copied values; nil entries stay nil and a
	// nil map is left nil.
	if src := n.UpstreamStatusMap; src != nil {
		dst := make(map[string]*upstream.Status, len(src))
		for name, st := range src {
			var stCopy *upstream.Status
			if st != nil {
				stCopy = new(upstream.Status)
				*stCopy = *st
			}
			dst[name] = stCopy
		}
		dup.UpstreamStatusMap = dst
	}

	return dup
}
// SnapshotNodeMap returns a new TNodeMap holding a cloneNode copy of every
// entry currently in NodeMap. The read lock on nodeMapMu is held for the whole
// copy, and the returned map is always non-nil.
func SnapshotNodeMap() TNodeMap {
	nodeMapMu.RLock()
	defer nodeMapMu.RUnlock()

	out := make(TNodeMap, len(NodeMap))
	for nodeID := range NodeMap {
		out[nodeID] = cloneNode(NodeMap[nodeID])
	}
	return out
}
func GetNode(node *model.Node) (n *Node) {
if node == nil {
// this should never happen
@@ -64,12 +104,23 @@ func GetNode(node *model.Node) (n *Node) {
Node: node,
}
}
n, ok := NodeMap[node.ID]
if !ok {
n = &Node{}
nodeMapMu.RLock()
cached, ok := NodeMap[node.ID]
nodeMapMu.RUnlock()
if !ok || cached == nil {
return &Node{
Node: node,
}
}
n.Node = node
return n
cloned := cloneNode(cached)
if cloned == nil {
return &Node{
Node: node,
}
}
cloned.Node = node
return cloned
}
func InitNode(node *model.Node) (n *Node, err error) {
+16 -16
View File
@@ -69,8 +69,8 @@ func getRetryState(nodeID uint64) *NodeRetryState {
// updateNodeStatus directly updates node status without condition checks
func updateNodeStatus(nodeID uint64, status bool, reason string) {
mutex.Lock()
defer mutex.Unlock()
nodeMapMu.Lock()
defer nodeMapMu.Unlock()
now := time.Now()
if NodeMap[nodeID] == nil {
@@ -131,8 +131,8 @@ func markConnectionSuccess(nodeID uint64) {
}
func logCurrentNodeStatus(prefix string) {
mutex.Lock()
defer mutex.Unlock()
nodeMapMu.Lock()
defer nodeMapMu.Unlock()
if NodeMap != nil {
logger.Debugf("%s: NodeMap contains %d nodes", prefix, len(NodeMap))
}
@@ -219,13 +219,13 @@ func cleanupDisabledNodes(enabledEnvIDs []uint64) {
}
retryMutex.Unlock()
mutex.Lock()
nodeMapMu.Lock()
for envID := range NodeMap {
if !enabledMap[envID] {
delete(NodeMap, envID)
}
}
mutex.Unlock()
nodeMapMu.Unlock()
}
// getEnabledNodes retrieves enabled nodes from cache or database
@@ -287,11 +287,11 @@ func RetrieveNodesStatus(ctx context.Context) {
logger.Info("RetrieveNodesStatus start")
defer logger.Info("RetrieveNodesStatus exited")
mutex.Lock()
nodeMapMu.Lock()
if NodeMap == nil {
NodeMap = make(TNodeMap)
}
mutex.Unlock()
nodeMapMu.Unlock()
envCheckTicker := time.NewTicker(30 * time.Second)
defer envCheckTicker.Stop()
@@ -396,8 +396,8 @@ func RetrieveNodesStatus(ctx context.Context) {
}
func checkNodeTimeouts(timeout time.Duration) {
mutex.Lock()
defer mutex.Unlock()
nodeMapMu.Lock()
defer nodeMapMu.Unlock()
now := time.Now()
for _, node := range NodeMap {
if node != nil && node.Status && now.Sub(node.ResponseAt) > timeout {
@@ -445,7 +445,7 @@ func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
node, err := InitNode(nodeModel)
if err != nil {
mutex.Lock()
nodeMapMu.Lock()
if NodeMap[nodeModel.ID] == nil {
NodeMap[nodeModel.ID] = &Node{
Node: nodeModel,
@@ -455,13 +455,13 @@ func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
NodeMap[nodeModel.ID].Status = false
NodeMap[nodeModel.ID].ResponseAt = time.Now()
}
mutex.Unlock()
nodeMapMu.Unlock()
return err
}
mutex.Lock()
nodeMapMu.Lock()
NodeMap[nodeModel.ID] = node
mutex.Unlock()
nodeMapMu.Unlock()
u, err := nodeModel.GetWebSocketURL("/api/analytic/intro")
if err != nil {
@@ -515,7 +515,7 @@ func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
return nil
}
mutex.Lock()
nodeMapMu.Lock()
if NodeMap[nodeModel.ID] == nil {
NodeMap[nodeModel.ID] = &Node{
Node: nodeModel,
@@ -535,6 +535,6 @@ func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
NodeMap[nodeModel.ID].Status = true
NodeMap[nodeModel.ID].ResponseAt = time.Now()
}
mutex.Unlock()
nodeMapMu.Unlock()
}
}
+129
View File
@@ -0,0 +1,129 @@
package analytic
import (
"testing"
"github.com/0xJacky/Nginx-UI/internal/upstream"
"github.com/0xJacky/Nginx-UI/model"
)
func TestSnapshotNodeMapIsolation(t *testing.T) {
nodeMapMu.Lock()
original := NodeMap
NodeMap = make(TNodeMap)
NodeMap[1] = &Node{
Node: &model.Node{
Model: model.Model{ID: 1},
Name: "node-1",
URL: "https://example.com",
},
NodeStat: NodeStat{
Status: true,
UpstreamStatusMap: map[string]*upstream.Status{
"default": {
Online: true,
Latency: 5,
},
},
},
NodeInfo: NodeInfo{
Version: "1.0.0",
},
}
nodeMapMu.Unlock()
t.Cleanup(func() {
nodeMapMu.Lock()
NodeMap = original
nodeMapMu.Unlock()
})
snapshot := SnapshotNodeMap()
nodeMapMu.Lock()
NodeMap[1].Status = false
NodeMap[1].UpstreamStatusMap["default"].Online = false
NodeMap[1].Node.Name = "mutated"
nodeMapMu.Unlock()
cloned := snapshot[1]
if cloned == nil {
t.Fatalf("expected snapshot entry for node 1")
}
if !cloned.Status {
t.Fatalf("expected snapshot status to remain true, got false")
}
upstreamStatus, ok := cloned.UpstreamStatusMap["default"]
if !ok || upstreamStatus == nil {
t.Fatalf("expected upstream status in snapshot")
}
if !upstreamStatus.Online {
t.Fatalf("expected upstream online in snapshot")
}
if cloned.Node == nil {
t.Fatalf("expected cloned node metadata")
}
if cloned.Node.Name != "node-1" {
t.Fatalf("expected cloned node name to remain 'node-1', got %s", cloned.Node.Name)
}
}
// TestGetNodeReturnsClonedData checks that the value returned by GetNode is
// detached from the cached NodeMap entry:
//   - the result carries the node metadata passed in by the caller,
//   - mutating the cached entry afterwards does not change the result,
//   - mutating the result does not change the cached entry.
func TestGetNodeReturnsClonedData(t *testing.T) {
	originalDBNode := &model.Node{
		Model: model.Model{ID: 2},
		Name:  "db-node",
		URL:   "https://cluster.local",
		Token: "secret",
	}

	// Replace the package-level map with a fixture for the duration of the test.
	nodeMapMu.Lock()
	original := NodeMap
	NodeMap = make(TNodeMap)
	NodeMap[2] = &Node{
		Node: &model.Node{
			Model: model.Model{ID: 2},
			Name:  "cached-node",
		},
		NodeStat: NodeStat{
			Status: true,
		},
	}
	nodeMapMu.Unlock()

	t.Cleanup(func() {
		nodeMapMu.Lock()
		NodeMap = original
		nodeMapMu.Unlock()
	})

	result := GetNode(originalDBNode)
	if result == nil {
		t.Fatalf("expected GetNode result")
	}
	if result.Node == nil {
		t.Fatalf("expected result node metadata")
	}
	if result.Node.Name != "db-node" {
		t.Fatalf("expected node name from DB copy, got %s", result.Node.Name)
	}

	// Changing the cached entry must not leak into the returned value.
	nodeMapMu.Lock()
	NodeMap[2].Node.Name = "mutated-cache"
	nodeMapMu.Unlock()

	if result.Node.Name != "db-node" {
		t.Fatalf("expected result node name to remain 'db-node', got %s", result.Node.Name)
	}

	// Changing the returned value must not leak into the cached entry.
	result.Node.Name = "updated-result"

	// Copy the cached name under the read lock and release the lock before
	// asserting. t.Fatalf stops the test immediately, and the cleanup
	// registered above then needs the write lock; failing while the read lock
	// is still held would make the test hang instead of reporting the failure.
	nodeMapMu.RLock()
	cachedName := NodeMap[2].Node.Name
	nodeMapMu.RUnlock()

	if cachedName == "updated-result" {
		t.Fatalf("expected NodeMap to remain isolated from result mutation")
	}
}