fix: reduce log indexing write amplification

This commit is contained in:
0xJacky
2026-06-04 15:20:28 +08:00
parent fa81149e01
commit 071582ec46
5 changed files with 148 additions and 2 deletions
+44 -2
View File
@@ -2,10 +2,10 @@
import type { CustomRenderArgs, StdTableColumn } from '@uozi-admin/curd'
import type { NginxLogData } from '@/api/nginx_log'
import type { TabOption } from '@/components/TabFilter'
import { CheckCircleOutlined, ExclamationCircleOutlined, SyncOutlined } from '@ant-design/icons-vue'
import { CheckCircleOutlined, ExclamationCircleOutlined, StopOutlined, SyncOutlined } from '@ant-design/icons-vue'
import { StdCurd } from '@uozi-admin/curd'
import { useRouteQuery } from '@vueuse/router'
import { Badge, Tag, Tooltip } from 'ant-design-vue'
import { Badge, Modal, Tag, Tooltip } from 'ant-design-vue'
import dayjs from 'dayjs'
import nginxLog from '@/api/nginx_log'
import { DevDebugPanel } from '@/components/DevDebugPanel'
@@ -24,6 +24,7 @@ const indexManagementRef = ref()
const indexingSettingsModalVisible = ref(false)
const advancedIndexingEnabled = ref(false)
const enableIndexingLoading = ref(false)
const disableIndexingLoading = ref(false)
// WebSocket event bus and global store
const websocketEventBus = useWebSocketEventBusStore()
@@ -411,6 +412,32 @@ async function enableAdvancedIndexing() {
}
}
function disableAdvancedIndexing() {
Modal.confirm({
title: $gettext('Disable Advanced Indexing'),
content: $gettext('Advanced log indexing will stop and structured log analytics will be unavailable. Existing index files will not be deleted. Continue?'),
okText: $gettext('Disable'),
okType: 'danger',
cancelText: $gettext('Cancel'),
async onOk() {
disableIndexingLoading.value = true
try {
await nginxLog.disableAdvancedIndexing()
advancedIndexingEnabled.value = false
message.success($gettext('Advanced indexing disabled successfully'))
refreshTable()
}
catch (error) {
console.error('Failed to disable advanced indexing:', error)
message.error($gettext('Failed to disable advanced indexing'))
}
finally {
disableIndexingLoading.value = false
}
},
})
}
function cancelIndexingSettings() {
indexingSettingsModalVisible.value = false
}
@@ -423,6 +450,7 @@ const debugData = computed(() => ({
nginxLogStatus: nginxLogStatus.value,
isGlobalIndexing: isGlobalIndexing.value,
enableIndexingLoading: enableIndexingLoading.value,
disableIndexingLoading: disableIndexingLoading.value,
indexingSettingsModalVisible: indexingSettingsModalVisible.value,
columns: columns.value.map(col => ({ title: col.title, dataIndex: col.dataIndex })),
tabOptions,
@@ -483,6 +511,20 @@ const debugData = computed(() => ({
:indexing="isGlobalIndexing || processingStatus.nginx_log_indexing"
@refresh="refreshTable"
/>
<AButton
v-if="activeLogType === 'access' && advancedIndexingEnabled"
type="link"
size="small"
danger
:loading="disableIndexingLoading"
:disabled="processingStatus.nginx_log_indexing"
@click="disableAdvancedIndexing"
>
<template #icon>
<StopOutlined />
</template>
{{ $gettext('Disable Advanced Indexing') }}
</AButton>
</div>
</template>
<template #beforeActions="{ record }">
+26
View File
@@ -12,6 +12,7 @@ import (
"github.com/blevesearch/bleve/v2"
"github.com/blevesearch/bleve/v2/mapping"
"github.com/blevesearch/bleve/v2/search/query"
indexapi "github.com/blevesearch/bleve_index_api"
"github.com/gabriel-vasile/mimetype"
"github.com/uozi-tech/cosy/logger"
)
@@ -277,6 +278,11 @@ func (si *SearchIndexer) IndexDocument(doc SearchDocument) (err error) {
contentSize := int64(len(doc.Content))
existingDoc, err := si.index.Document(doc.ID)
isNewDocument := err != nil || existingDoc == nil
if !isNewDocument {
if existingContent, ok := documentStringField(existingDoc, "content"); ok && existingContent == doc.Content {
return nil
}
}
// For new documents, check memory limits
if isNewDocument {
@@ -300,6 +306,26 @@ func (si *SearchIndexer) IndexDocument(doc SearchDocument) (err error) {
return nil
}
func documentStringField(doc indexapi.Document, name string) (string, bool) {
if doc == nil {
return "", false
}
var value string
var found bool
doc.VisitFields(func(field indexapi.Field) {
if found {
return
}
if field.Name() == name {
value = string(field.Value())
found = true
}
})
return value, found
}
// Search performs a search query
func (si *SearchIndexer) Search(ctx context.Context, queryStr string, limit int) ([]SearchResult, error) {
return si.searchWithType(ctx, queryStr, "", limit)
+51
View File
@@ -1,7 +1,9 @@
package cache
import (
"context"
"testing"
"time"
)
// TestIsNumericQuery tests the isNumericQuery function
@@ -138,4 +140,53 @@ func TestSearchStrategyDifference(t *testing.T) {
}
}
func TestHandleConfigScanSkipsUnchangedContent(t *testing.T) {
indexer := &SearchIndexer{
indexPath: t.TempDir(),
maxMemoryUsage: 100 * 1024 * 1024,
}
ctx := context.Background()
if err := indexer.Initialize(ctx); err != nil {
t.Fatalf("Initialize() error = %v", err)
}
t.Cleanup(func() {
if err := indexer.Close(); err != nil {
t.Fatalf("Close() error = %v", err)
}
})
configPath := "/etc/nginx/sites-enabled/example.conf"
content := []byte("server { listen 80; server_name example.com; }")
if err := indexer.handleConfigScan(configPath, content); err != nil {
t.Fatalf("handleConfigScan() first call error = %v", err)
}
results, err := indexer.Search(ctx, "example.com", 10)
if err != nil {
t.Fatalf("Search() after first index error = %v", err)
}
if len(results) != 1 {
t.Fatalf("Search() after first index returned %d results, want 1", len(results))
}
firstUpdatedAt := results[0].Document.UpdatedAt
if firstUpdatedAt.IsZero() {
t.Fatal("first UpdatedAt is zero")
}
time.Sleep(1100 * time.Millisecond)
if err := indexer.handleConfigScan(configPath, content); err != nil {
t.Fatalf("handleConfigScan() second call error = %v", err)
}
results, err = indexer.Search(ctx, "example.com", 10)
if err != nil {
t.Fatalf("Search() after second index error = %v", err)
}
if len(results) != 1 {
t.Fatalf("Search() after second index returned %d results, want 1", len(results))
}
if !results[0].Document.UpdatedAt.Equal(firstUpdatedAt) {
t.Fatalf("UpdatedAt changed for unchanged content: got %s, want %s",
results[0].Document.UpdatedAt, firstUpdatedAt)
}
}
+9
View File
@@ -48,6 +48,11 @@ func setupIncrementalIndexingJob(s gocron.Scheduler) (gocron.Job, error) {
// performIncrementalIndexing performs the actual incremental indexing check
func performIncrementalIndexing() {
if !shouldRunIncrementalIndexing() {
logger.Debug("Advanced log indexing is disabled; skipping incremental log indexing scan")
return
}
logger.Debug("Starting incremental log indexing scan")
// Get log file manager
@@ -122,6 +127,10 @@ func performIncrementalIndexing() {
}
}
func shouldRunIncrementalIndexing() bool {
return settings.NginxLogSettings.IndexingEnabled
}
// needsIncrementalIndexing checks if a log file needs incremental indexing
func needsIncrementalIndexing(log *nginx_log.NginxLogWithIndex, persistence logIndexProvider) bool {
// Skip if already indexing or queued
@@ -9,6 +9,7 @@ import (
"github.com/0xJacky/Nginx-UI/internal/nginx_log"
"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
"github.com/0xJacky/Nginx-UI/model"
"github.com/0xJacky/Nginx-UI/settings"
)
// Test that grouped (aggregated) log metadata with oversized LastSize values
@@ -137,3 +138,20 @@ func TestNeedsIncrementalIndexingDetectsGrowth(t *testing.T) {
t.Fatalf("expected incremental indexing when file grew")
}
}
func TestShouldRunIncrementalIndexingRequiresEnabledSetting(t *testing.T) {
originalEnabled := settings.NginxLogSettings.IndexingEnabled
t.Cleanup(func() {
settings.NginxLogSettings.IndexingEnabled = originalEnabled
})
settings.NginxLogSettings.IndexingEnabled = false
if shouldRunIncrementalIndexing() {
t.Fatalf("expected incremental indexing cron task to be disabled")
}
settings.NginxLogSettings.IndexingEnabled = true
if !shouldRunIncrementalIndexing() {
t.Fatalf("expected incremental indexing cron task to be enabled")
}
}