2929use std:: cmp:: min;
3030use std:: fs:: { rename, File , OpenOptions } ;
3131use std:: io;
32- use std:: io:: prelude:: * ;
33- use std:: io:: SeekFrom ;
32+ use std:: io:: { Read , Seek , SeekFrom , Write } ;
3433use std:: mem:: ManuallyDrop ;
3534use std:: path:: Path ;
3635
@@ -798,7 +797,7 @@ impl QueueFile {
798797 self . file_len ( ) - self . used_bytes ( )
799798 }
800799
801- fn sync_header ( & mut self ) -> io :: Result < ( ) > {
800+ fn sync_header ( & mut self ) -> Result < ( ) > {
802801 self . write_header ( self . file_len ( ) , self . size ( ) , self . first . pos , self . last . pos )
803802 }
804803
@@ -808,27 +807,43 @@ impl QueueFile {
808807 /// atomic in the underlying file system.
809808 fn write_header (
810809 & mut self , file_len : u64 , elem_cnt : usize , first_pos : u64 , last_pos : u64 ,
811- ) -> io :: Result < ( ) > {
810+ ) -> Result < ( ) > {
812811 let mut header = [ 0 ; 32 ] ;
813812 let mut header_buf = & mut header[ ..] ;
814813
815814 // Never allow write values that will render file unreadable by Java library.
816815 if self . versioned {
817- assert ! ( i64 :: try_from( file_len) . is_ok( ) ) ;
818- assert ! ( i32 :: try_from( elem_cnt) . is_ok( ) ) ;
819- assert ! ( i64 :: try_from( first_pos) . is_ok( ) ) ;
820- assert ! ( i64 :: try_from( last_pos) . is_ok( ) ) ;
816+ ensure ! ( i64 :: try_from( file_len) . is_ok( ) , CorruptedFileSnafu {
817+ msg: "file length in header will exceed i64::MAX"
818+ } ) ;
819+ ensure ! ( i32 :: try_from( elem_cnt) . is_ok( ) , CorruptedFileSnafu {
820+ msg: "element count in header will exceed i32::MAX"
821+ } ) ;
822+ ensure ! ( i64 :: try_from( first_pos) . is_ok( ) , CorruptedFileSnafu {
823+ msg: "first element position in header will exceed i64::MAX"
824+ } ) ;
825+ ensure ! ( i64 :: try_from( last_pos) . is_ok( ) , CorruptedFileSnafu {
826+ msg: "last element position in header will exceed i64::MAX"
827+ } ) ;
821828
822829 header_buf. put_u32 ( Self :: VERSIONED_HEADER ) ;
823830 header_buf. put_u64 ( file_len) ;
824831 header_buf. put_i32 ( elem_cnt as i32 ) ;
825832 header_buf. put_u64 ( first_pos) ;
826833 header_buf. put_u64 ( last_pos) ;
827834 } else {
828- assert ! ( i32 :: try_from( file_len) . is_ok( ) ) ;
829- assert ! ( i32 :: try_from( elem_cnt) . is_ok( ) ) ;
830- assert ! ( i32 :: try_from( first_pos) . is_ok( ) ) ;
831- assert ! ( i32 :: try_from( last_pos) . is_ok( ) ) ;
835+ ensure ! ( i32 :: try_from( file_len) . is_ok( ) , CorruptedFileSnafu {
836+ msg: "file length in header will exceed i32::MAX"
837+ } ) ;
838+ ensure ! ( i32 :: try_from( elem_cnt) . is_ok( ) , CorruptedFileSnafu {
839+ msg: "element count in header will exceed i32::MAX"
840+ } ) ;
841+ ensure ! ( i32 :: try_from( first_pos) . is_ok( ) , CorruptedFileSnafu {
842+ msg: "first element position in header will exceed i32::MAX"
843+ } ) ;
844+ ensure ! ( i32 :: try_from( last_pos) . is_ok( ) , CorruptedFileSnafu {
845+ msg: "last element position in header will exceed i32::MAX"
846+ } ) ;
832847
833848 header_buf. put_i32 ( file_len as i32 ) ;
834849 header_buf. put_i32 ( elem_cnt as i32 ) ;
@@ -859,7 +874,7 @@ impl QueueFile {
859874
860875 /// Writes `n` bytes from buffer to position in file. Automatically wraps write if position is
861876 /// past the end of the file or if buffer overlaps it.
862- fn ring_write_buf ( & mut self , pos : u64 ) -> io :: Result < ( ) > {
877+ fn ring_write_buf ( & mut self , pos : u64 ) -> Result < ( ) > {
863878 let pos = self . wrap_pos ( pos) ;
864879
865880 if pos + self . write_buf . len ( ) as u64 <= self . file_len ( ) {
@@ -875,7 +890,7 @@ impl QueueFile {
875890 }
876891 }
877892
878- fn ring_erase ( & mut self , pos : u64 , n : usize ) -> io :: Result < ( ) > {
893+ fn ring_erase ( & mut self , pos : u64 , n : usize ) -> Result < ( ) > {
879894 let mut pos = pos;
880895 let mut len = n;
881896
@@ -913,7 +928,7 @@ impl QueueFile {
913928 }
914929
915930 /// If necessary, expands the file to accommodate an additional element of the given length.
916- fn expand_if_necessary ( & mut self , data_len : u64 ) -> io :: Result < ( ) > {
931+ fn expand_if_necessary ( & mut self , data_len : u64 ) -> Result < ( ) > {
917932 let mut rem_bytes = self . remaining_bytes ( ) ;
918933
919934 if rem_bytes >= data_len {
@@ -1051,14 +1066,10 @@ impl QueueFileInner {
10511066 Ok ( ( ) )
10521067 }
10531068
1054- fn write ( & mut self , buf : & [ u8 ] ) -> io :: Result < ( ) > {
1069+ fn write ( & mut self , buf : & [ u8 ] ) -> Result < ( ) > {
10551070 self . real_seek ( ) ?;
10561071
1057- if let Err ( err) = self . file . write_all ( buf) {
1058- self . last_seek = None ;
1059-
1060- return Err ( err) ;
1061- }
1072+ self . file . write_all ( buf) ?;
10621073
10631074 if let Some ( seek) = & mut self . last_seek {
10641075 * seek += buf. len ( ) as u64 ;
@@ -1123,7 +1134,7 @@ impl QueueFileInner {
11231134
11241135 fn transfer_inner (
11251136 & mut self , buf : & mut [ u8 ] , mut read_pos : u64 , mut write_pos : u64 , count : u64 ,
1126- ) -> io :: Result < ( ) > {
1137+ ) -> Result < ( ) > {
11271138 debug_assert ! ( read_pos < self . file_len) ;
11281139 debug_assert ! ( write_pos <= self . file_len) ;
11291140 debug_assert ! ( count < self . file_len) ;
@@ -1153,7 +1164,7 @@ impl QueueFileInner {
11531164 }
11541165
11551166 /// Transfer `count` bytes starting from `read_pos` to `write_pos`.
1156- fn transfer ( & mut self , read_pos : u64 , write_pos : u64 , count : u64 ) -> io :: Result < ( ) > {
1167+ fn transfer ( & mut self , read_pos : u64 , write_pos : u64 , count : u64 ) -> Result < ( ) > {
11571168 let mut buf = self . transfer_buf . take ( ) . unwrap ( ) ;
11581169 let res = self . transfer_inner ( & mut buf, read_pos, write_pos, count) ;
11591170 self . transfer_buf = Some ( buf) ;
0 commit comments