diff --git a/src/boost/boost.go b/src/boost/boost.go index edfb9b74..619a214b 100644 --- a/src/boost/boost.go +++ b/src/boost/boost.go @@ -347,6 +347,9 @@ func init() { if err == nil { Contracts = c } + + // Start the background save queue worker + startSaveQueueWorker() } func changeContractState(contract *Contract, newstate int) { diff --git a/src/boost/boost_datastore.go b/src/boost/boost_datastore.go index b9a08cbd..2ec454f1 100644 --- a/src/boost/boost_datastore.go +++ b/src/boost/boost_datastore.go @@ -8,6 +8,7 @@ import ( "fmt" "log" "strings" + "sync" "time" "github.com/peterbourgon/diskv/v3" @@ -21,6 +22,11 @@ var ctx = context.Background() var ddl string var queries *Queries +// Save queue infrastructure +var saveQueue = make(chan string, 100) // Buffer up to 100 save requests +var saveQueueMutex sync.Mutex +var pendingSaves = make(map[string]bool) // Track contracts pending save + func sqliteInit() { db, _ := sql.Open("sqlite", "ttbb-data/ContractData.sqlite?_busy_timeout=5000") @@ -31,6 +37,23 @@ func sqliteInit() { queries = New(db) } +// startSaveQueueWorker initializes the background worker that processes save requests +func startSaveQueueWorker() { + go func() { + for contractHash := range saveQueue { + // Process the actual save + processSingleContractSave(contractHash) + + // Mark as no longer pending after processing completes + // This ensures that if the contract is modified during processing, + // a new save request will be queued rather than being skipped + saveQueueMutex.Lock() + delete(pendingSaves, contractHash) + saveQueueMutex.Unlock() + } + }() +} + // SaveAllData will save all contract data to disk func SaveAllData() { log.Print("Saving contract data") @@ -69,32 +92,35 @@ func InverseTransform(pathKey *diskv.PathKey) (key string) { func saveData(contractHash string) { if contractHash != "" { - contract := FindContractByHash(contractHash) - if contract == nil { - return + // Queue individual contract save with deduplication + saveQueueMutex.Lock() + // If already pending, no need to add another request + if !pendingSaves[contractHash] { + pendingSaves[contractHash] = true + saveQueue <- contractHash } - - /* - if contract.State == ContractStateSignup { - if time.Since(contract.LastSaveTime) < 30*time.Second && len(contract.Boosters) < contract.CoopSize { - // Only save signup contracts every 30 seconds during signup - return - } - } else { - if time.Since(contract.LastSaveTime) < 15*time.Second { - // Only save non-signup contracts every 15 seconds - return - } - } - */ - contract.LastSaveTime = time.Now() - saveSqliteData(contract) + saveQueueMutex.Unlock() return } + // Save all contracts - queue each one individually for _, c := range Contracts { - saveSqliteData(c) - c.LastSaveTime = time.Now() + contractHash := c.ContractHash + saveQueueMutex.Lock() + if !pendingSaves[contractHash] { + select { + case saveQueue <- contractHash: + // Successfully queued + pendingSaves[contractHash] = true + default: + // Queue is full, skip this one (it will be retried in the next save cycle) + log.Printf("Save queue full, skipping contract: %s", contractHash) + delete(pendingSaves, contractHash) // Remove from pending since we couldn't queue it + saveQueueMutex.Unlock() + continue + } + } + saveQueueMutex.Unlock() } // Legacy disk store backup @@ -102,6 +128,20 @@ func saveData(contractHash string) { //_ = dataStore.Write("EggsBackup", b) } +// processSingleContractSave handles the actual database write for a single contract +func processSingleContractSave(contractHash string) { + contract := FindContractByHash(contractHash) + if contract == nil { + return + } + + contract.mutex.Lock() + contract.LastSaveTime = time.Now() + contract.mutex.Unlock() + saveSqliteData(contract) + contract.mutex.Unlock() +} + /* func saveEndData(c *Contract) error { //diskmutex.Lock()