diff --git a/cmd/config.go b/cmd/config.go index 77ce94606..220ef7dbb 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -79,6 +79,7 @@ type Config struct { LogDBSummarySnapshots bool DatadogProfiler bool TimeEvents bool + LogToStdErr bool // State Syncer StateChangeDir string @@ -192,6 +193,7 @@ func LoadConfig() *Config { config.LogDBSummarySnapshots = viper.GetBool("log-db-summary-snapshots") config.DatadogProfiler = viper.GetBool("datadog-profiler") config.TimeEvents = viper.GetBool("time-events") + config.LogToStdErr = true // State Syncer config.StateChangeDir = viper.GetString("state-change-dir") diff --git a/cmd/node.go b/cmd/node.go index bed017139..45518d297 100644 --- a/cmd/node.go +++ b/cmd/node.go @@ -66,7 +66,7 @@ func (node *Node) Start(exitChannels ...*chan struct{}) { flag.Set("log_dir", node.Config.LogDirectory) flag.Set("v", fmt.Sprintf("%d", node.Config.GlogV)) flag.Set("vmodule", node.Config.GlogVmodule) - flag.Set("alsologtostderr", "true") + flag.Set("alsologtostderr", fmt.Sprintf("%t", node.Config.LogToStdErr)) flag.Parse() glog.CopyStandardLogTo("INFO") node.runningMutex.Lock() diff --git a/integration_testing/tools.go b/integration_testing/tools.go index 94617bd18..613a93947 100644 --- a/integration_testing/tools.go +++ b/integration_testing/tools.go @@ -82,6 +82,7 @@ func _generateConfig(t *testing.T, config *cmd.Config, port uint32, dataDir stri config.ConnectIPs = []string{} config.PrivateMode = true config.GlogV = 0 + config.LogToStdErr = true config.GlogVmodule = "*bitcoin_manager*=0,*balance*=0,*view*=0,*frontend*=0,*peer*=0,*addr*=0,*network*=0,*utils*=0,*connection*=0,*main*=0,*server*=0,*mempool*=0,*miner*=0,*blockchain*=0" config.MaxInboundPeers = maxPeers config.TargetOutboundPeers = maxPeers diff --git a/lib/db_utils.go b/lib/db_utils.go index 5657e8404..8263881d9 100644 --- a/lib/db_utils.go +++ b/lib/db_utils.go @@ -1107,12 +1107,23 @@ func DBSetWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, value []byte, eve var ancestralValue []byte var getError error + isCoreState := isCoreStateKey(key) + // If snapshot was provided, we will need to load the current value of the record // so that we can later write it in the ancestral record. We first lookup cache. - if isState { - // We check if we've already read this key and stored it in the cache. - // Otherwise, we fetch the current value of this record from the DB. - ancestralValue, getError = DBGetWithTxn(txn, snap, key) + if isState || (isCoreState && eventManager != nil && eventManager.isMempoolManager) { + + // When we are syncing state from the mempool, we need to read the last committed view txn. + // This is because we will be querying the badger DB, and during the flush loop, every entry that is + // updated will first be deleted. In order to counteract this, we reference a badger transaction that was + // initiated before the flush loop started. + if eventManager != nil && eventManager.isMempoolManager && eventManager.lastCommittedViewTxn != nil { + ancestralValue, getError = DBGetWithTxn(eventManager.lastCommittedViewTxn, nil, key) + } else { + // We check if we've already read this key and stored it in the cache. + // Otherwise, we fetch the current value of this record from the DB. + ancestralValue, getError = DBGetWithTxn(txn, snap, key) + } // If there is some error with the DB read, other than non-existent key, we return. if getError != nil && getError != badger.ErrKeyNotFound { @@ -1203,26 +1214,39 @@ func DBDeleteWithTxn(txn *badger.Txn, snap *Snapshot, key []byte, eventManager * var getError error isState := snap != nil && snap.isState(key) + isCoreState := isCoreStateKey(key) + // If snapshot was provided, we will need to load the current value of the record // so that we can later write it in the ancestral record. We first lookup cache. - if isState { - // We check if we've already read this key and stored it in the cache. - // Otherwise, we fetch the current value of this record from the DB. - ancestralValue, getError = DBGetWithTxn(txn, snap, key) - // If the key doesn't exist then there is no point in deleting this entry. - if getError == badger.ErrKeyNotFound { - return nil + if isState || (isCoreState && eventManager != nil && eventManager.isMempoolManager) { + // When we are syncing state from the mempool, we need to read the last committed view txn. + // This is because we will be querying the badger DB, and during the flush loop, every entry that is + // updated will first be deleted. In order to counteract this, we reference a badger transaction that was + // initiated before the flush loop started. + if eventManager != nil && eventManager.isMempoolManager && eventManager.lastCommittedViewTxn != nil { + ancestralValue, getError = DBGetWithTxn(eventManager.lastCommittedViewTxn, snap, key) + } else { + // We check if we've already read this key and stored it in the cache. + // Otherwise, we fetch the current value of this record from the DB. + ancestralValue, getError = DBGetWithTxn(txn, snap, key) + // If the key doesn't exist then there is no point in deleting this entry. + if getError == badger.ErrKeyNotFound { + return nil + } } // If there is some error with the DB read, other than non-existent key, we return. - if getError != nil { + if getError != nil && getError != badger.ErrKeyNotFound { return errors.Wrapf(getError, "DBDeleteWithTxn: problem checking for DB record "+ "with key: %v", key) } } err := txn.Delete(key) - if err != nil { + if err != nil && err == badger.ErrKeyNotFound && eventManager != nil && eventManager.isMempoolManager { + // If the key doesn't exist then there is no point in deleting this entry. + return nil + } else if err != nil { return errors.Wrapf(err, "DBDeleteWithTxn: Problem deleting record "+ "from DB with key: %v", key) } diff --git a/lib/event_manager.go b/lib/event_manager.go index 4584b0101..486f7390e 100644 --- a/lib/event_manager.go +++ b/lib/event_manager.go @@ -1,6 +1,9 @@ package lib -import "github.com/google/uuid" +import ( + "github.com/dgraph-io/badger/v3" + "github.com/google/uuid" +) type TransactionEventFunc func(event *TransactionEvent) type StateSyncerOperationEventFunc func(event *StateSyncerOperationEvent) @@ -59,7 +62,10 @@ type EventManager struct { blockCommittedHandlers []BlockEventFunc blockAcceptedHandlers []BlockEventFunc snapshotCompletedHandlers []SnapshotCompletedEventFunc - isMempoolManager bool + // A transaction used by the state syncer mempool routine to reference the state of the badger db + // prior to flushing mempool transactions. This represents the last committed view of the db. + lastCommittedViewTxn *badger.Txn + isMempoolManager bool } func NewEventManager() *EventManager { diff --git a/lib/server.go b/lib/server.go index d54f56d1e..bdd1fc098 100644 --- a/lib/server.go +++ b/lib/server.go @@ -165,7 +165,7 @@ type Server struct { // It can be used to find computational bottlenecks. timer *Timer - stateChangeSyncer *StateChangeSyncer + StateChangeSyncer *StateChangeSyncer // DbMutex protects the badger database from concurrent access when it's being closed & re-opened. // This is necessary because the database is closed & re-opened when the node finishes hypersyncing in order // to change the database options from Default options to Performance options. @@ -484,7 +484,7 @@ func NewServer( } if stateChangeSyncer != nil { - srv.stateChangeSyncer = stateChangeSyncer + srv.StateChangeSyncer = stateChangeSyncer } // The same timesource is used in the chain data structure and in the connection @@ -556,8 +556,8 @@ func NewServer( _connectIps, _targetOutboundPeers, _maxInboundPeers, _limitOneInboundConnectionPerIP, _peerConnectionRefreshIntervalMillis, _minFeeRateNanosPerKB, nodeServices) - if srv.stateChangeSyncer != nil { - srv.stateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height) + if srv.StateChangeSyncer != nil { + srv.StateChangeSyncer.BlockHeight = uint64(_chain.headerTip().Height) } // Create a mempool to store transactions until they're ready to be mined into @@ -3292,8 +3292,8 @@ func (srv *Server) Start() { } // Initialize state syncer mempool job, if needed. - if srv.stateChangeSyncer != nil { - srv.stateChangeSyncer.StartMempoolSyncRoutine(srv) + if srv.StateChangeSyncer != nil { + srv.StateChangeSyncer.StartMempoolSyncRoutine(srv) } // Start the network manager's internal event loop to open and close connections to peers. diff --git a/lib/state_change_syncer.go b/lib/state_change_syncer.go index 7498690b7..080c31a14 100644 --- a/lib/state_change_syncer.go +++ b/lib/state_change_syncer.go @@ -174,8 +174,11 @@ func (stateChangeEntry *StateChangeEntry) RawDecodeWithoutMetadata(blockHeight u ancestralRecord := stateChangeEntry.EncoderType.New() if exist, err := DecodeFromBytes(ancestralRecord, rr); exist && err == nil { stateChangeEntry.AncestralRecord = ancestralRecord + stateChangeEntry.AncestralRecordBytes = EncodeToBytes(blockHeight, ancestralRecord) } else if err != nil { return errors.Wrapf(err, "StateChangeEntry.RawDecodeWithoutMetadata: error decoding ancestral record") + } else { + stateChangeEntry.AncestralRecordBytes = EncodeToBytes(blockHeight, nil) } // Decode the flush UUID. @@ -415,7 +418,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S if event.IsMempoolTxn { // Set the flushId to the mempool flush ID. - //flushId = stateChangeSyncer.BlockSyncFlushI + //flushId = StateChangeSyncer.BlockSyncFlushI // If the event flush ID is nil, then we need to use the global mempool flush ID. if flushId == uuid.Nil { @@ -445,6 +448,7 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S } encoderType = encoder.GetEncoderType() + } else { // If the value associated with the key is not an encoder, then we decode the encoder entirely from the key bytes. // Examples of this are FollowEntry, LikeEntry, DeSoBalanceEntry, etc. @@ -455,8 +459,18 @@ func (stateChangeSyncer *StateChangeSyncer) _handleStateSyncerOperation(event *S encoderType = keyEncoder.GetEncoderType() stateChangeEntry.Encoder = keyEncoder stateChangeEntry.EncoderBytes = nil - } + if stateChangeEntry.AncestralRecordBytes != nil && len(stateChangeEntry.AncestralRecordBytes) > 0 { + // Decode the ancestral record. + ancestralRecord, err := DecodeStateKey(stateChangeEntry.KeyBytes, stateChangeEntry.AncestralRecordBytes) + if err != nil { + glog.Fatalf("Server._handleStateSyncerOperation: Error decoding ancestral record: %v", err) + } + stateChangeEntry.AncestralRecord = ancestralRecord + stateChangeEntry.AncestralRecordBytes = nil + } + } + // Set the encoder type. stateChangeEntry.EncoderType = encoderType @@ -813,6 +827,12 @@ func (stateChangeSyncer *StateChangeSyncer) SyncMempoolToStateSyncer(server *Ser // more than once in the mempool transactions. txn := server.blockchain.db.NewTransaction(true) defer txn.Discard() + + // Create a read-only view of the badger DB prior to the mempool flush. This view will be used to get the ancestral + // records of entries that are being modified in the mempool. + mempoolEventManager.lastCommittedViewTxn = server.blockchain.db.NewTransaction(false) + defer mempoolEventManager.lastCommittedViewTxn.Discard() + glog.V(2).Infof("Time since mempool sync start: %v", time.Since(startTime)) startTime = time.Now() err = mempoolUtxoView.FlushToDbWithTxn(txn, uint64(server.blockchain.bestChain[len(server.blockchain.bestChain)-1].Height)) @@ -1033,6 +1053,7 @@ func (stateChangeSyncer *StateChangeSyncer) StartMempoolSyncRoutine(server *Serv // Sleep for a short while to avoid a tight loop. time.Sleep(100 * time.Millisecond) var err error + // If the mempool is not empty, sync the mempool to the state syncer. mempoolClosed, err = stateChangeSyncer.SyncMempoolToStateSyncer(server) if err != nil {