From 0a541f0c81249449ecdd1009deb7bcae4dff1d18 Mon Sep 17 00:00:00 2001 From: Kunjal Tandel Date: Wed, 22 Oct 2025 12:25:01 +0530 Subject: [PATCH 1/3] feat: added retry logic for super proof with failed btc tx --- da/super_proof.go | 111 ++++++++++++++++++++++++++++++- main.go | 23 +++++++ models/aggregated_proof.go | 71 ++++++++++++++++++++ tools/db_reader.go | 9 ++- tools/run_failed_super_proofs.go | 12 ++++ 5 files changed, 221 insertions(+), 5 deletions(-) create mode 100644 tools/run_failed_super_proofs.go diff --git a/da/super_proof.go b/da/super_proof.go index bf32b39..3a5eebb 100644 --- a/da/super_proof.go +++ b/da/super_proof.go @@ -32,10 +32,10 @@ func SuperProofCronJob(cfg *config.Config) { InitOPReturnRPC(cfg.BtcEndpoint, cfg.Auth, cfg.WalletPassphrase) log.Println("Starting Super Proof Cron Job") - log.Println("Super proof will run 6 times daily at 12:00 AM, 4:00 AM, 8:00 AM, 12:00 PM, 4:00 PM, and 8:00 PM UTC") + log.Println("Super proof will run 4 times daily at 12:00 AM, 6:00 AM, 12:00 PM, and 6:00 PM UTC") - // Define the scheduled hours (0, 4, 8, 12, 16, 20 in 24-hour format) - scheduledHours := []int{0, 4, 8, 12, 16, 20} + // Define the scheduled hours (0, 6, 12, 18 in 24-hour format) + scheduledHours := []int{0, 6, 12, 18} for { now := time.Now().UTC() @@ -56,6 +56,51 @@ func SuperProofCronJob(cfg *config.Config) { } } +func NonBTCTxSuperProofCronJob(cfg *config.Config, immediate bool) { + + err := models.InitDB(cfg.PostgresConnectionURI) + if err != nil { + log.Fatalf("Error initializing DB Connection: %v", err) + } + defer func() { + if err := models.CloseDB(); err != nil { + log.Printf("Error closing database: %v", err) + } + }() + + InitOPReturnRPC(cfg.BtcEndpoint, cfg.Auth, cfg.WalletPassphrase) + + log.Println("Starting Non BTC TX Super Proof Cron Job") + log.Println("Super proof will run at 1:00 AM, 7:00 AM, 1:00 PM, and 7:00 PM UTC") + + // Define the scheduled hours (1 in 24-hour format) + scheduledHours := []int{1, 7, 13, 19} + + if immediate { + log.Println("Running non BTC TX super proof immediately") + processNonBTCTxSuperProof(cfg) + return + } + + for { + now := time.Now().UTC() + + // Find the next scheduled time + nextScheduledTime := findNextScheduledTime(now, scheduledHours) + + // Calculate duration until next scheduled time + duration := nextScheduledTime.Sub(now) + log.Printf("Next super proof scheduled for: %s (in %v)", nextScheduledTime.Format("2006-01-02 15:04:05 UTC"), duration) + + // Wait until next scheduled time + time.Sleep(duration) + + // Run the super proof process + log.Printf("Running super proof at scheduled time: %s", time.Now().UTC().Format("2006-01-02 15:04:05 UTC")) + processNonBTCTxSuperProof(cfg) + } +} + // findNextScheduledTime calculates the next scheduled time based on current time and scheduled hours func findNextScheduledTime(now time.Time, scheduledHours []int) time.Time { currentHour := now.Hour() @@ -172,3 +217,63 @@ func processSuperProof(cfg *config.Config) { log.Println("No transaction data available for super proof, skipping database storage") } } + +func processNonBTCTxSuperProof(cfg *config.Config) { + log.Println("Processing non BTC TX super proof...") + + // Get the last processed timestamp + superProofWithoutBTCTxHash, err := models.GetSuperProofsWithoutBTCTxHash() + if err != nil { + log.Printf("Error getting last super proof timestamp: %v", err) + return + } + + if len(superProofWithoutBTCTxHash) == 0 { + log.Println("No super proofs without BTC TX hash to process") + return + } + + log.Printf("Found %d super proofs without BTC TX hash to process", len(superProofWithoutBTCTxHash)) + + // Initialize data reader for BTC processing + dataReader := NewBlockSubscriber() + defer func() { + if err := dataReader.Close(); err != nil { + log.Printf("Error closing BlockSubscriber: %v", err) + } + }() + + fnBtc := func(msg [][]byte) ([]byte, error) { + hash, err := ProcessBTCMsg(msg[1], cfg.ProtocolId) + return hash, err + } + + superProof := superProofWithoutBTCTxHash[0] + + log.Printf("Processing super proof without BTC TX hash: %s", superProof.ID) + + hash, err := dataReader.ProcessOutTuple(fnBtc, [][]byte{nil, superProof.AggregateProof}) + if err != nil { + log.Printf("Error writing super proof to BTC: %v", err) + return + } + + btcTxHash := strings.ReplaceAll(string(hash[:]), "\n", "") + log.Printf("Super proof BTC transaction hash: %s", btcTxHash) + + // Get transaction details including block number + _, btcBlockNumber := GetTransactionInfo(btcTxHash) + if btcBlockNumber != nil { + log.Printf("Super proof BTC transaction confirmed in block: %d", *btcBlockNumber) + } else { + log.Printf("Super proof BTC transaction block information not available yet") + } + + err = models.UpdateSuperProofWithBTCTxHash(superProof.ID, &btcTxHash, btcBlockNumber) + if err != nil { + log.Printf("Error updating super proof with BTC TX hash: %v", err) + return + } + + log.Printf("Updated super proof with BTC TX hash: %s", superProof.ID) +} diff --git a/main.go b/main.go index 1cc92d2..892fdd8 100644 --- a/main.go +++ b/main.go @@ -36,6 +36,7 @@ func main() { // Create separate error channels for each service hashBlockDone := make(chan error, 1) superProofDone := make(chan error, 1) + failedSuperProofDone := make(chan error, 1) log.Println("Starting Bitcoin DA services...") utils.LogSystemError("main", "Services starting", nil, map[string]interface{}{ @@ -70,6 +71,20 @@ func main() { superProofDone <- nil }() + // Start NonBTCTxSuperProofCronJob service + go func() { + defer func() { + if r := recover(); r != nil { + utils.RecoverFromPanic("NonBTCTxSuperProofCronJob") + failedSuperProofDone <- fmt.Errorf("NonBTCTxSuperProofCronJob panic: %v", r) + } + }() + + log.Println("Starting NonBTCTxSuperProofCronJob...") + da.NonBTCTxSuperProofCronJob(&cfg, false) + failedSuperProofDone <- nil + }() + // Wait for either shutdown signal or service completion select { case sig := <-sigChan: @@ -89,6 +104,7 @@ func main() { // Wait for both services to complete <-hashBlockDone <-superProofDone + <-failedSuperProofDone servicesShutdown <- true }() @@ -114,5 +130,12 @@ func main() { log.Fatalf("SuperProofCronJob failed: %v", err) } log.Println("SuperProofCronJob completed normally") + + case err := <-failedSuperProofDone: + if err != nil { + utils.LogCriticalError("main", "NonBTCTxSuperProofCronJob failed", err, nil) + log.Fatalf("NonBTCTxSuperProofCronJob failed: %v", err) + } + log.Println("NonBTCTxSuperProofCronJob completed normally") } } diff --git a/models/aggregated_proof.go b/models/aggregated_proof.go index cc3c1a0..c4f38d9 100644 --- a/models/aggregated_proof.go +++ b/models/aggregated_proof.go @@ -173,3 +173,74 @@ func GetLastSuperProofTimestamp() (time.Time, error) { log.Printf("Last super proof timestamp: %v", lastTimestamp) return lastTimestamp, nil } + +func GetSuperProofsWithoutBTCTxHash() ([]AggregatedProof, error) { + var proofs []AggregatedProof + + err := RetryDBOperation(func() error { + db, err := GetDB() + if err != nil { + return fmt.Errorf("failed to get database connection: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Query for the most recent super proof (where btc_tx_hash is not null) + err = db.NewSelect(). + Model(&proofs). + Where("btc_tx_hash = ''"). + Order("id ASC"). + Limit(1). + Scan(ctx) + + if err != nil { + // If no super proof exists yet, return a default timestamp (24 hours ago) + if err == sql.ErrNoRows { + return nil // This will be handled in the calling code + } + return fmt.Errorf("failed to fetch last super proof timestamp: %w", err) + } + + return nil + }) + + if err != nil { + return nil, err // Default to 24 hours ago + } + + log.Printf("Found %d Super Proofs Without BTC TX Hash", len(proofs)) + return proofs, nil +} + +func UpdateSuperProofWithBTCTxHash(id string, btc_tx_hash *string, btc_block_number *int64) error { + err := RetryDBOperation(func() error { + db, err := GetDB() + if err != nil { + return fmt.Errorf("failed to get database connection: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + _, err = db.NewUpdate(). + Model(&AggregatedProof{}). + Where("id = ?", id). + Set("btc_tx_hash = ?", btc_tx_hash). + Set("btc_block_number = ?", btc_block_number). + Exec(ctx) + + if err != nil { + return fmt.Errorf("failed to update super proof with BTC TX hash: %w", err) + } + + return nil + }) + + if err != nil { + return fmt.Errorf("failed to update super proof with BTC TX hash after retries: %w", err) + } + + log.Printf("Updated super proof with BTC TX hash: %s", id) + return nil +} diff --git a/tools/db_reader.go b/tools/db_reader.go index 2994d8b..287e810 100644 --- a/tools/db_reader.go +++ b/tools/db_reader.go @@ -22,8 +22,9 @@ func main() { DB. NewSelect(). Model(&proofs). - Order("id ASC"). - Limit(100). + Where("btc_tx_hash IS NOT NULL"). + Order("id DESC"). + Limit(1). Offset(0). Scan(ctx) @@ -32,4 +33,8 @@ func main() { } fmt.Printf("Response received %s\n", &proofs) + fmt.Printf("BTC TX: %s\n", string(*proofs[0].BTCTxHash)) + fmt.Printf("Edgen Chain TX: %s\n", string(proofs[0].TransactionHash)) + fmt.Printf("Aggregated Proof: %s\n", string(proofs[0].AggregateProof)) + fmt.Printf("Aggregated Proof Timestamp: %s\n", proofs[0].Timestamp.Format("2006-01-02 15:04:05")) } diff --git a/tools/run_failed_super_proofs.go b/tools/run_failed_super_proofs.go new file mode 100644 index 0000000..6d93002 --- /dev/null +++ b/tools/run_failed_super_proofs.go @@ -0,0 +1,12 @@ +package main + +import ( + "github.com/Layer-Edge/bitcoin-da/config" + "github.com/Layer-Edge/bitcoin-da/da" +) + +var cfg = config.GetConfig() + +func main() { + da.NonBTCTxSuperProofCronJob(&cfg, true) +} From b54047e32ea71b89e801c08b25a26bbac8deca1e Mon Sep 17 00:00:00 2001 From: Krishna kumar S Date: Sat, 25 Oct 2025 19:33:06 +0530 Subject: [PATCH 2/3] btc fee changes --- da/op_return_rpc.go | 196 +++++++++++++++++++++++++++++++------ da/super_proof.go | 15 +++ models/aggregated_proof.go | 31 ++++++ 3 files changed, 210 insertions(+), 32 deletions(-) diff --git a/da/op_return_rpc.go b/da/op_return_rpc.go index 269fe0f..cb130ed 100644 --- a/da/op_return_rpc.go +++ b/da/op_return_rpc.go @@ -26,7 +26,11 @@ var ( backoffFactor = 2.0 requestTimeout = 30 * time.Second - // Circuit breaker configuration + // Circuit breaker for RPC calls + rpcMutex sync.RWMutex + failureCount int + lastFailTime time.Time + circuitOpen bool circuitTimeout = 60 * time.Second maxFailures = 5 ) @@ -41,10 +45,9 @@ type response struct { } type utxo struct { - Txid string `json:"txid"` - Vout int `json:"vout"` - Amount float64 `json:"amount"` - Address string `json:"address,omitempty"` + Txid string `json:"txid"` + Vout int `json:"vout"` + Amount float64 `json:"amount"` } type signedtx struct { @@ -271,24 +274,23 @@ func GetRawAddress() string { } func CalculateRequired(numInputs int, dataSize int) float64 { - return float64(53+numInputs*68+dataSize) * float64(0.00000002) + return float64(53+numInputs*68+dataSize) * float64(0.00000001) } -func FilterUTXOs(unspent string, length int) ([]map[string]interface{}, float64, string) { +func FilterUTXOs(unspent string, length int) ([]map[string]interface{}, float64) { inputs := []map[string]interface{}{} if unspent == "" { - return inputs, 0.0, "" + return inputs, 0.0 } var t []json.RawMessage err := json.Unmarshal([]byte(unspent), &t) if err != nil { log.Printf("Failed to unmarshal response: %v", err) - return inputs, 0.0, "" + return inputs, 0.0 } totalAmt := 0.0 numInputs := 0 required := 0.0 - var changeAddress string log.Printf("Found %d UTXOs to process", len(t)) @@ -297,7 +299,7 @@ func FilterUTXOs(unspent string, length int) ([]map[string]interface{}, float64, err := json.Unmarshal(t[numInputs], &u) if err != nil { log.Printf("Failed to unmarshal response: %v", err) - return inputs, 0.0, "" + return inputs, 0.0 } else { log.Printf("UTXO : %+v", u) } @@ -312,11 +314,6 @@ func FilterUTXOs(unspent string, length int) ([]map[string]interface{}, float64, totalAmt += float64(u.Amount) required = CalculateRequired(numInputs+1, length) - // Use the address from the first UTXO as change address - if numInputs == 0 && u.Address != "" { - changeAddress = u.Address - } - log.Printf("Current total: %f BTC, required: %f BTC", totalAmt, required) if totalAmt >= required { @@ -324,12 +321,12 @@ func FilterUTXOs(unspent string, length int) ([]map[string]interface{}, float64, } numInputs++ if numInputs >= 10 { - return []map[string]interface{}{}, 0.0, "" + return []map[string]interface{}{}, 0.0 } } change := ((totalAmt - required) * 100000000) / float64(100000000) - log.Printf("Inputs: %v, Change: %f, Change Address: %s", inputs, change, changeAddress) - return inputs, float64(change), changeAddress + log.Printf("Inputs: %v, Change: %f", inputs, change) + return inputs, float64(change) } func CreateRawTransaction(inputs []map[string]interface{}, address string, change float64, data string) string { @@ -343,10 +340,7 @@ func CreateRawTransaction(inputs []map[string]interface{}, address string, chang return "" } - // Convert BTC to satoshis (1 BTC = 100,000,000 satoshis) - changeSatoshis := int64(change * 100000000) - - log.Printf("Creating raw transaction with %d inputs, change address %s, change amount %f BTC (%d satoshis)", len(inputs), address, change, changeSatoshis) + log.Printf("Creating raw transaction with %d inputs, change address %s, change amount %f", len(inputs), address, change) payload := map[string]interface{}{ "jsonrpc": "1.0", @@ -576,35 +570,37 @@ func CreateOPReturnTransaction(data string) string { } // Step 2: Filter UTXOs - inputs, change, changeAddress := FilterUTXOs(unspent, len(data)) + inputs, change := FilterUTXOs(unspent, len(data)) if len(inputs) == 0 { log.Printf("No suitable UTXOs found for transaction") return "" } - if changeAddress == "" { - log.Printf("No change address found in UTXOs") + // Step 3: Get raw address + rawaddr := GetRawAddress() + if rawaddr == "" { + log.Printf("Failed to get raw address") return "" } - // Step 3: Create raw transaction using change address from UTXOs - rawtscn := CreateRawTransaction(inputs, changeAddress, change, data) + // Step 4: Create raw transaction + rawtscn := CreateRawTransaction(inputs, rawaddr, change, data) if rawtscn == "" { log.Printf("Failed to create raw transaction") return "" } - // Step 4: Decode for verification (optional) + // Step 5: Decode for verification (optional) DecodeRawTransaction(rawtscn) - // Step 5: Sign transaction + // Step 6: Sign transaction signtscn := SignRawTransaction(rawtscn) if signtscn == "" { log.Printf("Failed to sign transaction") return "" } - // Step 6: Parse signed transaction + // Step 7: Parse signed transaction var sgn signedtx err := json.Unmarshal([]byte(signtscn), &sgn) if err != nil { @@ -612,7 +608,7 @@ func CreateOPReturnTransaction(data string) string { return "" } - // Step 7: Send signed transaction + // Step 8: Send signed transaction sendtscn := SendSignedTransaction(sgn.Hex) if sendtscn == "" { log.Printf("Failed to send signed transaction") @@ -623,6 +619,142 @@ func CreateOPReturnTransaction(data string) string { return sendtscn } +// MempoolSpaceTxResponse represents the response from mempool.space transaction API +type MempoolSpaceTxResponse struct { + Txid string `json:"txid"` + Fee int `json:"fee"` + Status struct { + Confirmed bool `json:"confirmed"` + BlockTime int64 `json:"block_time"` + } `json:"status"` +} + +// MempoolSpacePriceResponse represents the response from mempool.space historical price API +type MempoolSpacePriceResponse struct { + Prices []struct { + Time int64 `json:"time"` + USD int64 `json:"USD"` + } `json:"prices"` +} + +// FetchTransactionFromMempoolSpace fetches transaction details from mempool.space API +func FetchTransactionFromMempoolSpace(txHash string) (*MempoolSpaceTxResponse, error) { + url := fmt.Sprintf("https://mempool.space/api/tx/%s", txHash) + + log.Printf("Fetching transaction details from mempool.space: %s", url) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to make request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("API returned status code: %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + var txResponse MempoolSpaceTxResponse + err = json.Unmarshal(body, &txResponse) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal response: %w", err) + } + + log.Printf("Fetched transaction fee: %d satoshis, block time: %d", txResponse.Fee, txResponse.Status.BlockTime) + return &txResponse, nil +} + +// FetchHistoricalBTCPrice fetches historical BTC price in USD from mempool.space API +func FetchHistoricalBTCPrice(timestamp int64) (float64, error) { + url := fmt.Sprintf("https://mempool.space/api/v1/historical-price?currency=USD×tamp=%d", timestamp) + + log.Printf("Fetching BTC price from mempool.space: %s", url) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return 0, fmt.Errorf("failed to create request: %w", err) + } + + client := &http.Client{Timeout: 30 * time.Second} + resp, err := client.Do(req) + if err != nil { + return 0, fmt.Errorf("failed to make request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return 0, fmt.Errorf("API returned status code: %d", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, fmt.Errorf("failed to read response body: %w", err) + } + + var priceResponse MempoolSpacePriceResponse + err = json.Unmarshal(body, &priceResponse) + if err != nil { + return 0, fmt.Errorf("failed to unmarshal response: %w", err) + } + + if len(priceResponse.Prices) == 0 { + return 0, fmt.Errorf("no price data available") + } + + // Convert price from integer (representing price * 100) to float + price := float64(priceResponse.Prices[0].USD) / 100.0 + + log.Printf("Fetched BTC price: $%.2f USD", price) + return price, nil +} + +// CalculateTransactionFeeUSD calculates the transaction fee in USD +func CalculateTransactionFeeUSD(txHash string, blockTime int64) (float64, error) { + // Fetch transaction details + txResponse, err := FetchTransactionFromMempoolSpace(txHash) + if err != nil { + return 0, fmt.Errorf("failed to fetch transaction: %w", err) + } + + // Determine timestamp to use for price lookup + var timestamp int64 + if txResponse.Status.BlockTime != 0 { + // Use block time from transaction if available + timestamp = txResponse.Status.BlockTime + } else if blockTime != 0 { + // Use provided block time as fallback + timestamp = blockTime + } else { + // Use current time as last resort + timestamp = time.Now().Unix() + } + + // Fetch historical BTC price + price, err := FetchHistoricalBTCPrice(timestamp) + if err != nil { + return 0, fmt.Errorf("failed to fetch BTC price: %w", err) + } + + // Convert fee from satoshis to BTC (1 BTC = 100,000,000 satoshis) + feeBTC := float64(txResponse.Fee) / 100000000.0 + + // Calculate fee in USD + feeUSD := feeBTC * price + + log.Printf("Transaction fee: %d satoshis (%.8f BTC) * $%.2f = $%.2f USD", txResponse.Fee, feeBTC, price, feeUSD) + return feeUSD, nil +} + func InitOPReturnRPC(endpoint string, auth string, passphrase string) { BTCEndpoint = endpoint Auth = auth diff --git a/da/super_proof.go b/da/super_proof.go index 3a5eebb..2d74e61 100644 --- a/da/super_proof.go +++ b/da/super_proof.go @@ -276,4 +276,19 @@ func processNonBTCTxSuperProof(cfg *config.Config) { } log.Printf("Updated super proof with BTC TX hash: %s", superProof.ID) + + // Calculate and store transaction fee in USD + transactionFeeUSD, err := CalculateTransactionFeeUSD(btcTxHash, 0) + if err != nil { + log.Printf("Error calculating transaction fee: %v", err) + // Continue execution even if fee calculation fails + } else { + err = models.UpdateSuperProofWithTransactionFee(superProof.ID, transactionFeeUSD) + if err != nil { + log.Printf("Error updating super proof with transaction fee: %v", err) + // Continue execution even if fee update fails + } else { + log.Printf("Successfully updated super proof with transaction fee: $%.2f USD", transactionFeeUSD) + } + } } diff --git a/models/aggregated_proof.go b/models/aggregated_proof.go index c4f38d9..b8b6e4e 100644 --- a/models/aggregated_proof.go +++ b/models/aggregated_proof.go @@ -244,3 +244,34 @@ func UpdateSuperProofWithBTCTxHash(id string, btc_tx_hash *string, btc_block_num log.Printf("Updated super proof with BTC TX hash: %s", id) return nil } + +func UpdateSuperProofWithTransactionFee(id string, transactionFee float64) error { + err := RetryDBOperation(func() error { + db, err := GetDB() + if err != nil { + return fmt.Errorf("failed to get database connection: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + _, err = db.NewUpdate(). + Model(&AggregatedProof{}). + Where("id = ?", id). + Set("transaction_fee = ?", transactionFee). + Exec(ctx) + + if err != nil { + return fmt.Errorf("failed to update super proof with transaction fee: %w", err) + } + + return nil + }) + + if err != nil { + return fmt.Errorf("failed to update super proof with transaction fee after retries: %w", err) + } + + log.Printf("Updated super proof with transaction fee: %s ($%.2f)", id, transactionFee) + return nil +} From 371151e63e6fcda7da82ac3442e64d1f64121095 Mon Sep 17 00:00:00 2001 From: Krishna kumar S Date: Wed, 29 Oct 2025 10:29:26 +0530 Subject: [PATCH 3/3] removed few logs --- da/writer.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/da/writer.go b/da/writer.go index eb2aa50..016beea 100644 --- a/da/writer.go +++ b/da/writer.go @@ -103,7 +103,10 @@ func HashBlockSubscriber(cfg *config.Config) { return } - log.Println("Aggregated Data: ", aggr.data) + if !strings.HasPrefix(merkle_root, "0x") { + merkle_root = "0x" + merkle_root + } + log.Println("Aggregated Proof: ", merkle_root) aggr.data = ""