@@ -27,7 +27,7 @@ use ccore::{
2727} ;
2828use cmerkle:: snapshot:: ChunkDecompressor ;
2929use cmerkle:: snapshot:: Restore as SnapshotRestore ;
30- use cmerkle:: TrieFactory ;
30+ use cmerkle:: { skewed_merkle_root , TrieFactory } ;
3131use cnetwork:: { Api , EventSender , NetworkExtension , NodeId } ;
3232use cstate:: FindActionHandler ;
3333use ctimer:: TimerToken ;
@@ -64,7 +64,7 @@ pub struct TokenInfo {
6464enum State {
6565 SnapshotHeader ( BlockHash , u64 ) ,
6666 SnapshotBody {
67- block : BlockHash ,
67+ header : EncodedHeader ,
6868 prev_root : H256 ,
6969 } ,
7070 SnapshotChunk {
@@ -150,7 +150,7 @@ impl Extension {
150150 let parent =
151151 client. block_header ( & parent_hash. into ( ) ) . expect ( "Parent header of the snapshot header must exist" ) ;
152152 return State :: SnapshotBody {
153- block : hash ,
153+ header ,
154154 prev_root : parent. transactions_root ( ) ,
155155 }
156156 }
@@ -414,8 +414,30 @@ impl NetworkExtension<Event> for Extension {
414414 }
415415 }
416416 State :: SnapshotBody {
417+ ref header,
417418 ..
418- } => unimplemented ! ( ) ,
419+ } => {
420+ for id in & peer_ids {
421+ if let Some ( requests) = self . requests . get_mut ( id) {
422+ ctrace ! ( SYNC , "Send snapshot body request to {}" , id) ;
423+ let request = RequestMessage :: Bodies ( vec ! [ header. hash( ) ] ) ;
424+ let request_id = self . last_request ;
425+ self . last_request += 1 ;
426+ requests. push ( ( request_id, request. clone ( ) ) ) ;
427+ self . api
428+ . send ( id, Arc :: new ( Message :: Request ( request_id, request) . rlp_bytes ( ) . into_vec ( ) ) ) ;
429+
430+ let token = & self . tokens [ id] ;
431+ let token_info = self . tokens_info . get_mut ( token) . unwrap ( ) ;
432+
433+ let _ = self . api . clear_timer ( * token) ;
434+ self . api
435+ . set_timer_once ( * token, Duration :: from_millis ( SYNC_EXPIRE_REQUEST_INTERVAL ) )
436+ . expect ( "Timer set succeeds" ) ;
437+ token_info. request_id = Some ( request_id) ;
438+ }
439+ }
440+ }
419441 State :: SnapshotChunk {
420442 block,
421443 ref mut restore,
@@ -811,20 +833,11 @@ impl Extension {
811833 match self . state {
812834 State :: SnapshotHeader ( hash, _) => match headers {
813835 [ parent, header] if header. hash ( ) == hash => {
814- match self . client . import_bootstrap_header ( & header) {
815- Ok ( _) | Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => {
816- self . state = State :: SnapshotBody {
817- block : hash,
818- prev_root : * parent. transactions_root ( ) ,
819- } ;
820- cdebug ! ( SYNC , "Transitioning state to {:?}" , self . state) ;
821- }
822- Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => { }
823- // FIXME: handle import errors
824- Err ( err) => {
825- cwarn ! ( SYNC , "Cannot import header({}): {:?}" , header. hash( ) , err) ;
826- }
827- }
836+ self . state = State :: SnapshotBody {
837+ header : EncodedHeader :: new ( header. rlp_bytes ( ) . to_vec ( ) ) ,
838+ prev_root : * parent. transactions_root ( ) ,
839+ } ;
840+ cdebug ! ( SYNC , "Transitioning state to {:?}" , self . state) ;
828841 }
829842 _ => cdebug ! (
830843 SYNC ,
@@ -883,42 +896,75 @@ impl Extension {
883896
884897 fn on_body_response ( & mut self , hashes : Vec < BlockHash > , bodies : Vec < Vec < UnverifiedTransaction > > ) {
885898 ctrace ! ( SYNC , "Received body response with lenth({}) {:?}" , hashes. len( ) , hashes) ;
886- {
887- self . body_downloader . import_bodies ( hashes , bodies ) ;
888- let completed = self . body_downloader . drain ( ) ;
889- for ( hash , transactions ) in completed {
890- let header = self
891- . client
892- . block_header ( & BlockId :: Hash ( hash ) )
893- . expect ( "Downloaded body's header must exist" )
894- . decode ( ) ;
895- let block = Block {
896- header,
897- transactions,
898- } ;
899- cdebug ! ( SYNC , "Body download completed for #{}({})" , block . header . number ( ) , hash ) ;
900- match self . client . import_block ( block . rlp_bytes ( & Seal :: With ) ) {
901- Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => {
902- cwarn ! ( SYNC , "Downloaded already existing block({})" , hash)
903- }
904- Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => {
905- cwarn ! ( SYNC , "Downloaded already queued in the verification queue({}) " , hash )
906- }
907- Err ( err ) => {
899+
900+ match self . state {
901+ State :: SnapshotBody {
902+ ref header ,
903+ prev_root ,
904+ } => {
905+ let body = bodies . first ( ) . expect ( "Body response in SnapshotBody state has only one body" ) ;
906+ let new_root = skewed_merkle_root ( prev_root , body. iter ( ) . map ( Encodable :: rlp_bytes ) ) ;
907+ if header . transactions_root ( ) == new_root {
908+ let block = Block {
909+ header : header . decode ( ) ,
910+ transactions : body . clone ( ) ,
911+ } ;
912+ match self . client . import_bootstrap_block ( & block ) {
913+ Ok ( _ ) | Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => {
914+ self . state = State :: SnapshotChunk {
915+ block : header . hash ( ) ,
916+ restore : SnapshotRestore :: new ( header . state_root ( ) ) ,
917+ } ;
918+ cdebug ! ( SYNC , "Transitioning state to {:?} " , self . state ) ;
919+ }
920+ Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => { }
908921 // FIXME: handle import errors
909- cwarn ! ( SYNC , "Cannot import block({}): {:?}" , hash, err) ;
910- break
922+ Err ( err) => {
923+ cwarn ! ( SYNC , "Cannot import header({}): {:?}" , header. hash( ) , err) ;
924+ }
911925 }
912- _ => { }
913926 }
914927 }
915- }
928+ State :: Full => {
929+ {
930+ self . body_downloader . import_bodies ( hashes, bodies) ;
931+ let completed = self . body_downloader . drain ( ) ;
932+ for ( hash, transactions) in completed {
933+ let header = self
934+ . client
935+ . block_header ( & BlockId :: Hash ( hash) )
936+ . expect ( "Downloaded body's header must exist" )
937+ . decode ( ) ;
938+ let block = Block {
939+ header,
940+ transactions,
941+ } ;
942+ cdebug ! ( SYNC , "Body download completed for #{}({})" , block. header. number( ) , hash) ;
943+ match self . client . import_block ( block. rlp_bytes ( & Seal :: With ) ) {
944+ Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => {
945+ cwarn ! ( SYNC , "Downloaded already existing block({})" , hash)
946+ }
947+ Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => {
948+ cwarn ! ( SYNC , "Downloaded already queued in the verification queue({})" , hash)
949+ }
950+ Err ( err) => {
951+ // FIXME: handle import errors
952+ cwarn ! ( SYNC , "Cannot import block({}): {:?}" , hash, err) ;
953+ break
954+ }
955+ _ => { }
956+ }
957+ }
958+ }
916959
917- let mut peer_ids: Vec < _ > = self . header_downloaders . keys ( ) . cloned ( ) . collect ( ) ;
918- peer_ids. shuffle ( & mut thread_rng ( ) ) ;
960+ let mut peer_ids: Vec < _ > = self . header_downloaders . keys ( ) . cloned ( ) . collect ( ) ;
961+ peer_ids. shuffle ( & mut thread_rng ( ) ) ;
919962
920- for id in peer_ids {
921- self . send_body_request ( & id) ;
963+ for id in peer_ids {
964+ self . send_body_request ( & id) ;
965+ }
966+ }
967+ _ => { }
922968 }
923969 }
924970
0 commit comments