@@ -494,14 +494,18 @@ impl QueueFile {
494494
495495 fn cache_last_offset ( & mut self ) {
496496 debug_assert ! ( self . elem_cnt != 0 ) ;
497+
497498 let i = self . elem_cnt - 1 ;
499+
498500 if let Some ( ( index, elem) ) = self . cached_offsets . last ( ) {
499501 if * index == i {
500502 debug_assert_eq ! ( elem. pos, self . last. pos) ;
501503 debug_assert_eq ! ( elem. len, self . last. len) ;
504+
502505 return ;
503506 }
504507 }
508+
505509 self . cached_offsets . push ( ( i, self . last ) ) ;
506510 }
507511
@@ -525,6 +529,7 @@ impl QueueFile {
525529 }
526530
527531 ensure ! ( self . elem_cnt + count < i32 :: max_value( ) as usize , TooManyElementsSnafu { } ) ;
532+
528533 self . expand_if_necessary ( total_len as u64 ) ?;
529534
530535 let was_empty = self . is_empty ( ) ;
@@ -536,11 +541,13 @@ impl QueueFile {
536541
537542 let mut first_added = None ;
538543 let mut last_added = None ;
544+
539545 self . write_buf . clear ( ) ;
540546
541547 for elem in elems {
542548 let elem = elem. as_ref ( ) ;
543549 let len = elem. len ( ) ;
550+
544551 ensure ! ( i32 :: try_from( len) . is_ok( ) , ElementTooBigSnafu { } ) ;
545552
546553 if first_added. is_none ( ) {
@@ -700,7 +707,9 @@ impl QueueFile {
700707 for _ in 0 ..left / Self :: BLOCK_LENGTH {
701708 self . inner . write ( & Self :: ZEROES ) ?;
702709 }
710+
703711 let tail = left % Self :: BLOCK_LENGTH ;
712+
704713 if tail != 0 {
705714 self . inner . write ( & Self :: ZEROES [ ..tail as usize ] ) ?;
706715 }
@@ -872,6 +881,7 @@ impl QueueFile {
872881
873882 self . write_buf . clear ( ) ;
874883 self . write_buf . extend ( Self :: ZEROES ) ;
884+
875885 while len > 0 {
876886 let chunk_len = min ( len, Self :: ZEROES . len ( ) ) ;
877887 self . write_buf . truncate ( chunk_len) ;
@@ -926,6 +936,7 @@ impl QueueFile {
926936 let end_of_last_elem =
927937 self . wrap_pos ( self . last . pos + Element :: HEADER_LENGTH as u64 + self . last . len as u64 ) ;
928938 self . inner . sync_set_len ( new_len) ?;
939+
929940 let mut count = 0u64 ;
930941
931942 // If the buffer is split, we need to make it contiguous
@@ -973,6 +984,7 @@ impl QueueFileInner {
973984
974985 let res = self . file . seek ( SeekFrom :: Start ( self . expected_seek ) ) ;
975986 self . last_seek = res. as_ref ( ) . ok ( ) . copied ( ) ;
987+
976988 res
977989 }
978990
@@ -990,6 +1002,7 @@ impl QueueFileInner {
9901002 . is_none ( )
9911003 } else {
9921004 self . read_buffer . resize ( size, 0 ) ;
1005+
9931006 true
9941007 } ;
9951008
@@ -1000,6 +1013,7 @@ impl QueueFileInner {
10001013
10011014 let mut read = 0 ;
10021015 let mut res = Ok ( ( ) ) ;
1016+
10031017 while !buf. is_empty ( ) {
10041018 match self . file . read ( & mut self . read_buffer [ read..] ) {
10051019 Ok ( 0 ) => break ,
@@ -1019,6 +1033,7 @@ impl QueueFileInner {
10191033 if let Err ( err) = res {
10201034 self . read_buffer_offset = None ;
10211035 self . last_seek = None ;
1036+
10221037 return Err ( err) ;
10231038 }
10241039
@@ -1041,6 +1056,7 @@ impl QueueFileInner {
10411056
10421057 if let Err ( err) = self . file . write_all ( buf) {
10431058 self . last_seek = None ;
1059+
10441060 return Err ( err) ;
10451061 }
10461062
@@ -1062,20 +1078,23 @@ impl QueueFileInner {
10621078 // need to copy whole write buffer
10631079 ( true , true ) => {
10641080 let start = ( self . expected_seek - read_buffer_offset) as usize ;
1081+
10651082 self . read_buffer [ start..start + buf. len ( ) ] . copy_from_slice ( buf) ;
10661083 }
10671084 // exp_seek .. rd_buf_offset .. exp_seek+buf.len .. rd_buf_end
10681085 // need to copy only a tail of write buffer
10691086 ( false , true ) => {
10701087 let need_to_skip = ( read_buffer_offset - self . expected_seek ) as usize ;
10711088 let need_to_copy = buf. len ( ) - need_to_skip;
1089+
10721090 self . read_buffer [ ..need_to_copy] . copy_from_slice ( & buf[ need_to_skip..] ) ;
10731091 }
10741092 // rd_buf_offset .. exp_seek .. rd_buf_end .. exp_seek+buf.len
10751093 // need to copy only a head of write buffer
10761094 ( true , false ) => {
10771095 let need_to_skip = ( self . expected_seek - read_buffer_offset) as usize ;
10781096 let need_to_copy = self . read_buffer . len ( ) - need_to_skip;
1097+
10791098 self . read_buffer [ need_to_skip..need_to_skip + need_to_copy]
10801099 . copy_from_slice ( & buf[ ..need_to_copy] ) ;
10811100 }
@@ -1201,6 +1220,7 @@ impl<'a> Iterator for Iter<'a> {
12011220 fn nth ( & mut self , n : usize ) -> Option < Self :: Item > {
12021221 if self . queue_file . elem_cnt - self . next_elem_index < n {
12031222 self . next_elem_index = self . queue_file . elem_cnt ;
1223+
12041224 return None ;
12051225 }
12061226
0 commit comments