Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions beacon/blsync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
"github.com/ethereum/go-ethereum/beacon/params"
"github.com/ethereum/go-ethereum/beacon/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/ethdb/memorydb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/restapi"
"github.com/ethereum/go-ethereum/rpc"
)

Expand All @@ -38,6 +40,8 @@ type Client struct {
scheduler *request.Scheduler
blockSync *beaconBlockSync
engineRPC *rpc.Client
apiServer *api.BeaconApiServer
recentBlocks *lru.Cache[common.Hash, []byte]

chainHeadSub event.Subscription
engineClient *engineClient
Expand All @@ -55,12 +59,13 @@ func NewClient(config params.ClientConfig) *Client {
log.Error("Failed to save beacon checkpoint", "file", config.CheckpointFile, "checkpoint", checkpoint, "error", err)
}
})
checkpointStore = light.NewCheckpointStore(db, committeeChain)
)
headSync := sync.NewHeadSync(headTracker, committeeChain)

// set up scheduler and sync modules
scheduler := request.NewScheduler()
checkpointInit := sync.NewCheckpointInit(committeeChain, config.Checkpoint)
checkpointInit := sync.NewCheckpointInit(committeeChain, checkpointStore, config.Checkpoint)
forwardSync := sync.NewForwardUpdateSync(committeeChain)
beaconBlockSync := newBeaconBlockSync(headTracker)
scheduler.RegisterTarget(headTracker)
Expand All @@ -69,29 +74,37 @@ func NewClient(config params.ClientConfig) *Client {
scheduler.RegisterModule(forwardSync, "forwardSync")
scheduler.RegisterModule(headSync, "headSync")
scheduler.RegisterModule(beaconBlockSync, "beaconBlockSync")
recentBlocks := lru.NewCache[common.Hash, []byte](4)
apiServer := api.NewBeaconApiServer(checkpointStore, committeeChain, headTracker, recentBlocks)

return &Client{
scheduler: scheduler,
urls: config.Apis,
customHeader: config.CustomHeader,
config: &config,
blockSync: beaconBlockSync,
apiServer: apiServer,
recentBlocks: recentBlocks,
}
}

func (c *Client) SetEngineRPC(engine *rpc.Client) {
c.engineRPC = engine
}

func (c *Client) RestAPI(server *restapi.Server) restapi.API {
return c.apiServer.RestAPI(server)
}

func (c *Client) Start() error {
headCh := make(chan types.ChainHeadEvent, 16)
c.chainHeadSub = c.blockSync.SubscribeChainHead(headCh)
c.engineClient = startEngineClient(c.config, c.engineRPC, headCh)

c.scheduler.Start()
for _, url := range c.urls {
beaconApi := api.NewBeaconLightApi(url, c.customHeader)
c.scheduler.RegisterServer(request.NewServer(api.NewApiServer(beaconApi), &mclock.System{}))
beaconApi := api.NewBeaconApiClient(url, c.customHeader, c.recentBlocks)
c.scheduler.RegisterServer(request.NewServer(api.NewSyncServer(beaconApi), &mclock.System{}))
}
return nil
}
Expand Down
186 changes: 20 additions & 166 deletions beacon/light/api/light_api.go → beacon/light/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,98 +30,39 @@ import (

"github.com/donovanhide/eventsource"
"github.com/ethereum/go-ethereum/beacon/merkle"
"github.com/ethereum/go-ethereum/beacon/params"
"github.com/ethereum/go-ethereum/beacon/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/common/lru"
"github.com/ethereum/go-ethereum/log"
)

var (
ErrNotFound = errors.New("404 Not Found")
ErrInternal = errors.New("500 Internal Server Error")
)

type CommitteeUpdate struct {
Update types.LightClientUpdate
NextSyncCommittee types.SerializedSyncCommittee
}

// See data structure definition here:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientupdate
type committeeUpdateJson struct {
Version string `json:"version"`
Data committeeUpdateData `json:"data"`
}

type committeeUpdateData struct {
Header jsonBeaconHeader `json:"attested_header"`
NextSyncCommittee types.SerializedSyncCommittee `json:"next_sync_committee"`
NextSyncCommitteeBranch merkle.Values `json:"next_sync_committee_branch"`
FinalizedHeader *jsonBeaconHeader `json:"finalized_header,omitempty"`
FinalityBranch merkle.Values `json:"finality_branch,omitempty"`
SyncAggregate types.SyncAggregate `json:"sync_aggregate"`
SignatureSlot common.Decimal `json:"signature_slot"`
}

type jsonBeaconHeader struct {
Beacon types.Header `json:"beacon"`
}

type jsonHeaderWithExecProof struct {
Beacon types.Header `json:"beacon"`
Execution json.RawMessage `json:"execution"`
ExecutionBranch merkle.Values `json:"execution_branch"`
}

// UnmarshalJSON unmarshals from JSON.
func (u *CommitteeUpdate) UnmarshalJSON(input []byte) error {
var dec committeeUpdateJson
if err := json.Unmarshal(input, &dec); err != nil {
return err
}
u.NextSyncCommittee = dec.Data.NextSyncCommittee
u.Update = types.LightClientUpdate{
Version: dec.Version,
AttestedHeader: types.SignedHeader{
Header: dec.Data.Header.Beacon,
Signature: dec.Data.SyncAggregate,
SignatureSlot: uint64(dec.Data.SignatureSlot),
},
NextSyncCommitteeRoot: u.NextSyncCommittee.Root(),
NextSyncCommitteeBranch: dec.Data.NextSyncCommitteeBranch,
FinalityBranch: dec.Data.FinalityBranch,
}
if dec.Data.FinalizedHeader != nil {
u.Update.FinalizedHeader = &dec.Data.FinalizedHeader.Beacon
}
return nil
}

// fetcher is an interface useful for debug-harnessing the http api.
type fetcher interface {
Do(req *http.Request) (*http.Response, error)
}

// BeaconLightApi requests light client information from a beacon node REST API.
// BeaconApiClient requests light client information from a beacon node REST API.
// Note: all required API endpoints are currently only implemented by Lodestar.
type BeaconLightApi struct {
type BeaconApiClient struct {
url string
client fetcher
customHeaders map[string]string
recentBlocks *lru.Cache[common.Hash, []byte]
}

func NewBeaconLightApi(url string, customHeaders map[string]string) *BeaconLightApi {
return &BeaconLightApi{
func NewBeaconApiClient(url string, customHeaders map[string]string, recentBlocks *lru.Cache[common.Hash, []byte]) *BeaconApiClient {
return &BeaconApiClient{
url: url,
client: &http.Client{
Timeout: time.Second * 10,
},
customHeaders: customHeaders,
recentBlocks: recentBlocks,
}
}

func (api *BeaconLightApi) httpGet(path string, params url.Values) ([]byte, error) {
func (api *BeaconApiClient) httpGet(path string, params url.Values) ([]byte, error) {
uri, err := api.buildURL(path, params)
if err != nil {
return nil, err
Expand Down Expand Up @@ -155,7 +96,7 @@ func (api *BeaconLightApi) httpGet(path string, params url.Values) ([]byte, erro
// equals update.NextSyncCommitteeRoot).
// Note that the results are validated but the update signature should be verified
// by the caller as its validity depends on the update chain.
func (api *BeaconLightApi) GetBestUpdatesAndCommittees(firstPeriod, count uint64) ([]*types.LightClientUpdate, []*types.SerializedSyncCommittee, error) {
func (api *BeaconApiClient) GetBestUpdatesAndCommittees(firstPeriod, count uint64) ([]*types.LightClientUpdate, []*types.SerializedSyncCommittee, error) {
resp, err := api.httpGet("/eth/v1/beacon/light_client/updates", map[string][]string{
"start_period": {strconv.FormatUint(firstPeriod, 10)},
"count": {strconv.FormatUint(count, 10)},
Expand Down Expand Up @@ -195,121 +136,31 @@ func (api *BeaconLightApi) GetBestUpdatesAndCommittees(firstPeriod, count uint64
//
// See data structure definition here:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientoptimisticupdate
func (api *BeaconLightApi) GetOptimisticUpdate() (types.OptimisticUpdate, error) {
func (api *BeaconApiClient) GetOptimisticUpdate() (types.OptimisticUpdate, error) {
resp, err := api.httpGet("/eth/v1/beacon/light_client/optimistic_update", nil)
if err != nil {
return types.OptimisticUpdate{}, err
}
return decodeOptimisticUpdate(resp)
}

func decodeOptimisticUpdate(enc []byte) (types.OptimisticUpdate, error) {
var data struct {
Version string `json:"version"`
Data struct {
Attested jsonHeaderWithExecProof `json:"attested_header"`
Aggregate types.SyncAggregate `json:"sync_aggregate"`
SignatureSlot common.Decimal `json:"signature_slot"`
} `json:"data"`
}
if err := json.Unmarshal(enc, &data); err != nil {
return types.OptimisticUpdate{}, err
}
// Decode the execution payload headers.
attestedExecHeader, err := types.ExecutionHeaderFromJSON(data.Version, data.Data.Attested.Execution)
if err != nil {
return types.OptimisticUpdate{}, fmt.Errorf("invalid attested header: %v", err)
}
if data.Data.Attested.Beacon.StateRoot == (common.Hash{}) {
// workaround for different event encoding format in Lodestar
if err := json.Unmarshal(enc, &data.Data); err != nil {
return types.OptimisticUpdate{}, err
}
}

if len(data.Data.Aggregate.Signers) != params.SyncCommitteeBitmaskSize {
return types.OptimisticUpdate{}, errors.New("invalid sync_committee_bits length")
}
if len(data.Data.Aggregate.Signature) != params.BLSSignatureSize {
return types.OptimisticUpdate{}, errors.New("invalid sync_committee_signature length")
}
return types.OptimisticUpdate{
Attested: types.HeaderWithExecProof{
Header: data.Data.Attested.Beacon,
PayloadHeader: attestedExecHeader,
PayloadBranch: data.Data.Attested.ExecutionBranch,
},
Signature: data.Data.Aggregate,
SignatureSlot: uint64(data.Data.SignatureSlot),
}, nil
}

// GetFinalityUpdate fetches the latest available finality update.
//
// See data structure definition here:
// https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/light-client/sync-protocol.md#lightclientfinalityupdate
func (api *BeaconLightApi) GetFinalityUpdate() (types.FinalityUpdate, error) {
func (api *BeaconApiClient) GetFinalityUpdate() (types.FinalityUpdate, error) {
resp, err := api.httpGet("/eth/v1/beacon/light_client/finality_update", nil)
if err != nil {
return types.FinalityUpdate{}, err
}
return decodeFinalityUpdate(resp)
}

func decodeFinalityUpdate(enc []byte) (types.FinalityUpdate, error) {
var data struct {
Version string `json:"version"`
Data struct {
Attested jsonHeaderWithExecProof `json:"attested_header"`
Finalized jsonHeaderWithExecProof `json:"finalized_header"`
FinalityBranch merkle.Values `json:"finality_branch"`
Aggregate types.SyncAggregate `json:"sync_aggregate"`
SignatureSlot common.Decimal `json:"signature_slot"`
}
}
if err := json.Unmarshal(enc, &data); err != nil {
return types.FinalityUpdate{}, err
}
// Decode the execution payload headers.
attestedExecHeader, err := types.ExecutionHeaderFromJSON(data.Version, data.Data.Attested.Execution)
if err != nil {
return types.FinalityUpdate{}, fmt.Errorf("invalid attested header: %v", err)
}
finalizedExecHeader, err := types.ExecutionHeaderFromJSON(data.Version, data.Data.Finalized.Execution)
if err != nil {
return types.FinalityUpdate{}, fmt.Errorf("invalid finalized header: %v", err)
}
// Perform sanity checks.
if len(data.Data.Aggregate.Signers) != params.SyncCommitteeBitmaskSize {
return types.FinalityUpdate{}, errors.New("invalid sync_committee_bits length")
}
if len(data.Data.Aggregate.Signature) != params.BLSSignatureSize {
return types.FinalityUpdate{}, errors.New("invalid sync_committee_signature length")
}

return types.FinalityUpdate{
Version: data.Version,
Attested: types.HeaderWithExecProof{
Header: data.Data.Attested.Beacon,
PayloadHeader: attestedExecHeader,
PayloadBranch: data.Data.Attested.ExecutionBranch,
},
Finalized: types.HeaderWithExecProof{
Header: data.Data.Finalized.Beacon,
PayloadHeader: finalizedExecHeader,
PayloadBranch: data.Data.Finalized.ExecutionBranch,
},
FinalityBranch: data.Data.FinalityBranch,
Signature: data.Data.Aggregate,
SignatureSlot: uint64(data.Data.SignatureSlot),
}, nil
}

// GetHeader fetches and validates the beacon header with the given blockRoot.
// If blockRoot is null hash then the latest head header is fetched.
// The values of the canonical and finalized flags are also returned. Note that
// these flags are not validated.
func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, bool, bool, error) {
func (api *BeaconApiClient) GetHeader(blockRoot common.Hash) (types.Header, bool, bool, error) {
var blockId string
if blockRoot == (common.Hash{}) {
blockId = "head"
Expand Down Expand Up @@ -346,7 +197,7 @@ func (api *BeaconLightApi) GetHeader(blockRoot common.Hash) (types.Header, bool,
}

// GetCheckpointData fetches and validates bootstrap data belonging to the given checkpoint.
func (api *BeaconLightApi) GetCheckpointData(checkpointHash common.Hash) (*types.BootstrapData, error) {
func (api *BeaconApiClient) GetCheckpointData(checkpointHash common.Hash) (*types.BootstrapData, error) {
resp, err := api.httpGet(fmt.Sprintf("/eth/v1/beacon/light_client/bootstrap/0x%x", checkpointHash[:]), nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -390,7 +241,7 @@ func (api *BeaconLightApi) GetCheckpointData(checkpointHash common.Hash) (*types
return checkpoint, nil
}

func (api *BeaconLightApi) GetBeaconBlock(blockRoot common.Hash) (*types.BeaconBlock, error) {
func (api *BeaconApiClient) GetBeaconBlock(blockRoot common.Hash) (*types.BeaconBlock, error) {
resp, err := api.httpGet(fmt.Sprintf("/eth/v2/beacon/blocks/0x%x", blockRoot), nil)
if err != nil {
return nil, err
Expand All @@ -413,6 +264,9 @@ func (api *BeaconLightApi) GetBeaconBlock(blockRoot common.Hash) (*types.BeaconB
if computedRoot != blockRoot {
return nil, fmt.Errorf("Beacon block root hash mismatch (expected: %x, got: %x)", blockRoot, computedRoot)
}
if api.recentBlocks != nil {
api.recentBlocks.Add(blockRoot, resp)
}
return block, nil
}

Expand All @@ -438,7 +292,7 @@ type HeadEventListener struct {
// head updates and calls the specified callback functions when they are received.
// The callbacks are also called for the current head and optimistic head at startup.
// They are never called concurrently.
func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func() {
func (api *BeaconApiClient) StartHeadListener(listener HeadEventListener) func() {
var (
ctx, closeCtx = context.WithCancel(context.Background())
streamCh = make(chan *eventsource.Stream, 1)
Expand Down Expand Up @@ -550,7 +404,7 @@ func (api *BeaconLightApi) StartHeadListener(listener HeadEventListener) func()

// startEventStream establishes an event stream. This will keep retrying until the stream has been
// established. It can only return nil when the context is canceled.
func (api *BeaconLightApi) startEventStream(ctx context.Context, listener *HeadEventListener) *eventsource.Stream {
func (api *BeaconApiClient) startEventStream(ctx context.Context, listener *HeadEventListener) *eventsource.Stream {
for retry := true; retry; retry = ctxSleep(ctx, 5*time.Second) {
log.Trace("Sending event subscription request")
uri, err := api.buildURL("/eth/v1/events", map[string][]string{"topics": {"head", "light_client_finality_update", "light_client_optimistic_update"}})
Expand Down Expand Up @@ -588,7 +442,7 @@ func ctxSleep(ctx context.Context, timeout time.Duration) (ok bool) {
}
}

func (api *BeaconLightApi) buildURL(path string, params url.Values) (string, error) {
func (api *BeaconApiClient) buildURL(path string, params url.Values) (string, error) {
uri, err := url.Parse(api.url)
if err != nil {
return "", err
Expand Down
Loading
Loading