Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,39 @@ import (
"time"
"validator/config"
"validator/pkgs/clients"
"validator/pkgs/helpers"
"validator/pkgs/ipfs"
"validator/pkgs/prost"
"validator/pkgs/redis"
"validator/pkgs/utils"
)

func main() {
var wg sync.WaitGroup
// Initiate logger
utils.InitLogger()

helpers.InitLogger()
// Load the config object
config.LoadConfig()
clients.InitializeReportingClient(config.SettingsObj.SlackReportingUrl, 60*time.Second)
helpers.ConfigureClient()
helpers.ConfigureContractInstance()
helpers.RedisClient = helpers.NewRedisClient()
helpers.ConnectIPFSNode()
helpers.PopulateStateVars()

// Initialize reporting service
clients.InitializeReportingClient(config.SettingsObj.SlackReportingUrl, 5*time.Second)

// Initialize tx relayer service
clients.InitializeTxClient(config.SettingsObj.TxRelayerUrl, time.Duration(config.SettingsObj.HttpTimeout)*time.Second)

// Setup redis
redis.RedisClient = redis.NewRedisClient()

// Connect to IPFS node
ipfs.ConnectIPFSNode()

// Set up the RPC client, contract, and ABI instance
prost.ConfigureClient()
prost.ConfigureContractInstance()
prost.ConfigureABI()

var wg sync.WaitGroup

wg.Add(1)
go helpers.StartFetchingBlocks()
go prost.StartBatchAttestation() // Start batch attestation process
wg.Wait()
}
136 changes: 44 additions & 92 deletions config/settings.go
Original file line number Diff line number Diff line change
@@ -1,114 +1,72 @@
package config

import (
"crypto/ecdsa"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
log "github.com/sirupsen/logrus"
"encoding/json"
"log"
"os"
"strconv"

"github.com/ethereum/go-ethereum/common"
)

var SettingsObj *Settings

type Settings struct {
ClientUrl string
ContractAddress string
DataMarketAddress common.Address
RedisHost string
RedisPort string
IPFSUrl string
SignerAccountAddress common.Address
PrivateKey *ecdsa.PrivateKey
ChainID int
BlockTime int
BatchSubmissionLimit int
SlackReportingUrl string
RedisDB int
ClientUrl string
ContractAddress string
RedisHost string
RedisPort string
RedisDB string
IPFSUrl string
SlackReportingUrl string
TxRelayerUrl string
TxRelayerAuthWriteToken string
DataMarketAddresses []string
DataMarketContractAddresses []common.Address
BlockTime int
HttpTimeout int
}

func LoadConfig() {
var err error

missingEnvVars := []string{}

requiredEnvVars := []string{
"PROST_RPC_URL",
"PROTOCOL_STATE_CONTRACT",
"REDIS_HOST",
"REDIS_DB",
"REDIS_PORT",
"IPFS_URL",
"DATA_MARKET_CONTRACT",
"SIGNER_ACCOUNT_PRIVATE_KEY",
"BATCH_SUBMISSION_LIMIT",
"PROST_CHAIN_ID",
"BLOCK_TIME",
"SLACK_REPORTING_URL",
}
dataMarketAddresses := getEnv("DATA_MARKET_ADDRESSES", "[]")
dataMarketAddressesList := []string{}

for envVar := range requiredEnvVars {
if getEnv(requiredEnvVars[envVar], "") == "" {
missingEnvVars = append(missingEnvVars, requiredEnvVars[envVar])
}
err := json.Unmarshal([]byte(dataMarketAddresses), &dataMarketAddressesList)
if err != nil {
log.Fatalf("Failed to parse DATA_MARKET_ADDRESSES environment variable: %v", err)
}

if len(missingEnvVars) > 0 {
log.Fatalf("Missing required environment variables: %v", missingEnvVars)
if len(dataMarketAddressesList) == 0 {
log.Fatalf("DATA_MARKET_ADDRESSES environment variable has an empty array")
}

config := Settings{
ClientUrl: getEnv("PROST_RPC_URL", ""),
ContractAddress: getEnv("PROTOCOL_STATE_CONTRACT", ""),
RedisHost: getEnv("REDIS_HOST", ""),
RedisPort: getEnv("REDIS_PORT", ""),
IPFSUrl: getEnv("IPFS_URL", ""),
SlackReportingUrl: getEnv("SLACK_REPORTING_URL", ""),
}

config.ChainID, err = strconv.Atoi(getEnv("PROST_CHAIN_ID", ""))
if err != nil {
log.Fatalf("PROST_CHAIN_ID is not a valid integer")
}

config.BlockTime, err = strconv.Atoi(getEnv("BLOCK_TIME", ""))
if err != nil {
log.Fatalf("BLOCK_TIME is not a valid integer")
ClientUrl: getEnv("PROST_RPC_URL", ""),
ContractAddress: getEnv("PROTOCOL_STATE_CONTRACT", ""),
RedisHost: getEnv("REDIS_HOST", ""),
RedisPort: getEnv("REDIS_PORT", ""),
RedisDB: getEnv("REDIS_DB", ""),
IPFSUrl: getEnv("IPFS_URL", ""),
SlackReportingUrl: getEnv("SLACK_REPORTING_URL", ""),
TxRelayerUrl: getEnv("TX_RELAYER_URL", ""),
TxRelayerAuthWriteToken: getEnv("TX_RELAYER_AUTH_WRITE_TOKEN", ""),
DataMarketAddresses: dataMarketAddressesList,
}

config.BatchSubmissionLimit, err = strconv.Atoi(getEnv("BATCH_SUBMISSION_LIMIT", ""))
if err != nil {
log.Fatalf("BATCH_SUBMISSION_LIMIT is not a valid integer")
for _, addr := range config.DataMarketAddresses {
config.DataMarketContractAddresses = append(config.DataMarketContractAddresses, common.HexToAddress(addr))
}

config.PrivateKey, err = crypto.HexToECDSA(getEnv("SIGNER_ACCOUNT_PRIVATE_KEY", ""))
if err != nil {
log.Fatalf("SIGNER_ACCOUNT_PRIVATE_KEY is not a valid private key")
blockTime, blockTimeParseErr := strconv.Atoi(getEnv("BLOCK_TIME", ""))
if blockTimeParseErr != nil {
log.Fatalf("Failed to parse BLOCK_TIME environment variable: %v", blockTimeParseErr)
}
config.BlockTime = blockTime

config.RedisDB, err = strconv.Atoi(getEnv("REDIS_DB", ""))
if err != nil {
log.Fatalf("REDIS_DB is not a valid integer")
httpTimeout, timeoutParseErr := strconv.Atoi(getEnv("HTTP_TIMEOUT", ""))
if timeoutParseErr != nil {
log.Fatalf("Failed to parse HTTP_TIMEOUT environment variable: %v", timeoutParseErr)
}

// get signer address from private key
config.SignerAccountAddress = crypto.PubkeyToAddress(config.PrivateKey.PublicKey)

config.DataMarketAddress = common.HexToAddress(getEnv("DATA_MARKET_CONTRACT", ""))

log.Infoln("Configuration loaded successfully")
log.Infoln("Client URL: ", config.ClientUrl)
log.Infoln("Contract Address: ", config.ContractAddress)
log.Infoln("Redis Host: ", config.RedisHost)
log.Infoln("Redis Port: ", config.RedisPort)
log.Infoln("Redis DB: ", config.RedisDB)
log.Infoln("IPFS URL: ", config.IPFSUrl)
log.Infoln("Chain ID: ", config.ChainID)
log.Infoln("Block Time: ", config.BlockTime)
log.Infoln("Batch Submission Limit: ", config.BatchSubmissionLimit)
log.Infoln("Signer Account Address: ", config.SignerAccountAddress.Hex())
log.Infoln("Data Market Address: ", config.DataMarketAddress.Hex())
log.Infoln("Slack Reporting URL: ", config.SlackReportingUrl)
config.HttpTimeout = httpTimeout

SettingsObj = &config
}
Expand All @@ -120,9 +78,3 @@ func getEnv(key, defaultValue string) string {
}
return value
}

func checkOptionalEnvVar(value, key string) {
if value == "" {
log.Warnf("Optional environment variable %s is not set", key)
}
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
module validator

go 1.22
go 1.21

require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.2.0
github.com/ethereum/go-ethereum v1.13.12
github.com/go-redis/redis/v8 v8.11.5
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPx
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4=
github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk=
Expand Down
6 changes: 3 additions & 3 deletions pkgs/clients/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
"crypto/tls"
"encoding/json"
"fmt"
log "github.com/sirupsen/logrus"
"net/http"
"time"

log "github.com/sirupsen/logrus"
)

var reportingClient *ReportingService
Expand Down Expand Up @@ -37,7 +38,6 @@ func (s ValidatorAlert) String() string {

// sendPostRequest sends a POST request to the specified URL
func SendFailureNotification(processName, errorMsg, timestamp, severity string) {

issue := ValidatorAlert{
processName,
errorMsg,
Expand All @@ -46,11 +46,11 @@ func SendFailureNotification(processName, errorMsg, timestamp, severity string)
}

jsonData, err := json.Marshal(issue)
log.Debugln("Sending notification: ", string(jsonData))
if err != nil {
log.Errorln("Unable to marshal notification: ", issue)
return
}

req, err := http.NewRequest("POST", reportingClient.url, bytes.NewBuffer(jsonData))
if err != nil {
log.Errorln("Error creating request: ", err)
Expand Down
70 changes: 70 additions & 0 deletions pkgs/clients/txClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package clients

import (
"bytes"
"crypto/tls"
"encoding/json"
"fmt"
"math/big"
"net/http"
"time"
"validator/config"
)

type TxRelayerClient struct {
url string
client *http.Client
}

type BatchAttestationRequest struct {
DataMarketAddress string `json:"dataMarketAddress"`
BatchCID string `json:"batchCID"`
EpochID *big.Int `json:"epochID"`
RootHash string `json:"rootHash"`
AuthToken string `json:"authToken"`
}

var txRelayerClient *TxRelayerClient

// InitializeTxClient initializes the TxRelayerClient with the provided URL and timeout
func InitializeTxClient(url string, timeout time.Duration) {
txRelayerClient = &TxRelayerClient{
url: url,
client: &http.Client{
Timeout: timeout,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
},
}
}

// SubmitBatchAttestationRequest submits details for batch attestattion to the transaction relayer service
func SubmitBatchAttestationRequest(dataMarketAddress, batchCID, rootHash string, epochID *big.Int) error {
request := BatchAttestationRequest{
DataMarketAddress: dataMarketAddress,
BatchCID: batchCID,
EpochID: epochID,
RootHash: rootHash,
AuthToken: config.SettingsObj.TxRelayerAuthWriteToken,
}

jsonData, err := json.Marshal(request)
if err != nil {
return fmt.Errorf("unable to marshal batch attestation request: %w", err)
}

url := fmt.Sprintf("%s/submitBatchAttestation", txRelayerClient.url)

resp, err := txRelayerClient.client.Post(url, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("unable to send batch attestation request: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to send batch attestation request, status code: %d", resp.StatusCode)
}

return nil
}
26 changes: 26 additions & 0 deletions pkgs/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package pkgs

import "time"

// Process Name Constants
// process : identifier
const (
ProcessEvents = "Validator: ProcessEvents"
BuildMerkleTree = "Validator: BuildMerkleTree"
ContractSubmission = "Validator: ContractSubmission"
StartFetchingBlocks = "Validator: StartFetchingBlocks"
FetchBatchSubmission = "Validator: FetchBatchSubmission"
StoreBatchSubmission = "Validator: StoreBatchSubmission"
SendBatchAttestationToRelayer = "Validator: SendBatchAttestationToRelayer"
)

// General Key Constants
const (
ValidatorKey = "ValidatorKey"
ValidatorSetKey = "ValidatorSetKey"
)

// General Constants
const (
Day = 24 * time.Hour
)
Loading
Loading