-use alloc::vec::Vec;
-use core::{fmt, marker::PhantomData};
+use alloc::{vec::Vec, collections::VecDeque};
+use core::{fmt, marker::PhantomData, ops::ControlFlow};
 use necsim_core_bond::NonNegativeF64;
 
 use necsim_core::{
@@ -12,10 +12,21 @@ use necsim_partitioning_core::LocalPartition;
 
 use super::WaterLevelReporterProxy;
 
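+// A contiguous range `slow_events[start..start + len]` that is treated as one
+// merge-sort run; pending runs are sorted in place before they are merged with
+// their neighbours.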
+#[derive(Clone, Copy)]
+struct Run {
+    start: usize,
+    len: usize,
+}
+
 #[allow(clippy::module_name_repetitions)]
 pub struct LiveWaterLevelReporterProxy<'l, 'p, R: Reporter, P: LocalPartition<'p, R>> {
     water_level: NonNegativeF64,
     slow_events: Vec<PackedEvent>,
+    tmp_events: Vec<PackedEvent>,
+    run: Run,
+    runs: Vec<Run>,
+    overflow: VecDeque<Run>,
+    sort_batch_size: usize,
     fast_events: Vec<PackedEvent>,
 
     local_partition: &'l mut P,
@@ -36,6 +47,8 @@ impl<'l, 'p, R: Reporter, P: LocalPartition<'p, R>> fmt::Debug
 
         fmt.debug_struct(stringify!(LiveWaterLevelReporterProxy))
             .field("water_level", &self.water_level)
+            .field("runs", &self.runs.len())
+            .field("overflow", &self.overflow.len())
             .field("slow_events", &EventBufferLen(self.slow_events.len()))
             .field("fast_events", &EventBufferLen(self.fast_events.len()))
             .finish()
@@ -46,35 +59,248 @@ impl<'l, 'p, R: Reporter, P: LocalPartition<'p, R>> Reporter
     for LiveWaterLevelReporterProxy<'l, 'p, R, P>
 {
     impl_report!(speciation(&mut self, speciation: MaybeUsed<R::ReportSpeciation>) {
-        if speciation.event_time < self.water_level {
-            self.slow_events.push(speciation.clone().into());
+        let speciation: PackedEvent = speciation.clone().into();
+
+        if speciation.event_time() < self.water_level {
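+            // Close the current run once it exceeds `sort_batch_size` events; the
+            // commented-out alternative would instead start a new run whenever an
+            // incoming event breaks the ascending order of `slow_events`.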
+            let new_run = self.run.len > self.sort_batch_size; // self.slow_events.last().map_or(true, |prev| prev > &speciation);
+
+            if new_run {
+                let old_run = core::mem::replace(&mut self.run, Run {
+                    start: self.slow_events.len(),
+                    len: 1,
+                });
+                self.overflow.push_back(old_run);
+            } else {
+                self.run.len += 1;
+            }
+
+            self.slow_events.push(speciation);
         } else {
-            self.fast_events.push(speciation.clone().into());
+            self.fast_events.push(speciation);
         }
     });
 
     impl_report!(dispersal(&mut self, dispersal: MaybeUsed<R::ReportDispersal>) {
-        if dispersal.event_time < self.water_level {
-            self.slow_events.push(dispersal.clone().into());
+        let dispersal: PackedEvent = dispersal.clone().into();
+
+        if dispersal.event_time() < self.water_level {
+            let new_run = self.run.len > self.sort_batch_size; // self.slow_events.last().map_or(true, |prev| prev > &dispersal);
+
+            if new_run {
+                let old_run = core::mem::replace(&mut self.run, Run {
+                    start: self.slow_events.len(),
+                    len: 1,
+                });
+                self.overflow.push_back(old_run);
+            } else {
+                self.run.len += 1;
+            }
+
+            self.slow_events.push(dispersal);
         } else {
-            self.fast_events.push(dispersal.clone().into());
+            self.fast_events.push(dispersal);
         }
     });
 
     impl_report!(progress(&mut self, _progress: Ignored) {});
 }
 
+impl<'l, 'p, R: Reporter, P: LocalPartition<'p, R>> LiveWaterLevelReporterProxy<'l, 'p, R, P> {
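+    // Decides, following the run-length invariants of the standard library's
+    // stable merge sort, whether adjacent runs must be merged; returns the index
+    // of the left run of the pair to merge, or `None` if no merge is needed yet.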
+    #[inline]
+    fn collapse(&self, force_merge: bool) -> Option<usize> {
+        let n = self.runs.len();
+        if n >= 2
+            && (force_merge
+                || self.runs[n - 2].len <= self.runs[n - 1].len
+                || (n >= 3 && self.runs[n - 3].len <= self.runs[n - 2].len + self.runs[n - 1].len)
+                || (n >= 4 && self.runs[n - 4].len <= self.runs[n - 3].len + self.runs[n - 2].len))
+        {
+            if n >= 3 && self.runs[n - 3].len < self.runs[n - 1].len { Some(n - 3) } else { Some(n - 2) }
+        } else {
+            None
+        }
+    }
+
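+    // Merges the two adjacent sorted halves `v[..mid]` and `v[mid..]` in place,
+    // using `buf` as scratch space for the shorter half; this mirrors the merge
+    // step of the standard library's stable slice sort.
+    //
+    // Safety: `buf` must be valid for writes of at least `mid.min(v.len() - mid)`
+    // elements, and `mid` must not exceed `v.len()`.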
+    unsafe fn merge<T, F>(v: &mut [T], mid: usize, buf: *mut T, is_less: &mut F)
+    where
+        F: FnMut(&T, &T) -> bool,
+    {
+        unsafe fn get_and_increment<T>(ptr: &mut *mut T) -> *mut T {
+            let old = *ptr;
+            *ptr = unsafe { ptr.add(1) };
+            old
+        }
+
+        unsafe fn decrement_and_get<T>(ptr: &mut *mut T) -> *mut T {
+            *ptr = unsafe { ptr.sub(1) };
+            *ptr
+        }
+
+        // When dropped, copies the range `start..end` into `dest..`.
+        struct MergeHole<T> {
+            start: *mut T,
+            end: *mut T,
+            dest: *mut T,
+        }
+
+        impl<T> Drop for MergeHole<T> {
+            fn drop(&mut self) {
+                // `T` is not a zero-sized type, and these are pointers into a slice's elements.
+                unsafe {
+                    let len = self.end.sub_ptr(self.start);
+                    core::ptr::copy_nonoverlapping(self.start, self.dest, len);
+                }
+            }
+        }
+
+        let len = v.len();
+        let v = v.as_mut_ptr();
+        let (v_mid, v_end) = unsafe { (v.add(mid), v.add(len)) };
+
+        // The merge process first copies the shorter run into `buf`. Then it traces the newly copied
+        // run and the longer run forwards (or backwards), comparing their next unconsumed elements and
+        // copying the lesser (or greater) one into `v`.
+        //
+        // As soon as the shorter run is fully consumed, the process is done. If the longer run gets
+        // consumed first, then we must copy whatever is left of the shorter run into the remaining
+        // hole in `v`.
+        //
+        // Intermediate state of the process is always tracked by `hole`, which serves two purposes:
+        // 1. Protects integrity of `v` from panics in `is_less`.
+        // 2. Fills the remaining hole in `v` if the longer run gets consumed first.
+        //
+        // Panic safety:
+        //
+        // If `is_less` panics at any point during the process, `hole` will get dropped and fill the
+        // hole in `v` with the unconsumed range in `buf`, thus ensuring that `v` still holds every
+        // object it initially held exactly once.
+        let mut hole;
+
+        if mid <= len - mid {
+            // The left run is shorter.
+            unsafe {
+                core::ptr::copy_nonoverlapping(v, buf, mid);
+                hole = MergeHole { start: buf, end: buf.add(mid), dest: v };
+            }
+
+            // Initially, these pointers point to the beginnings of their arrays.
+            let left = &mut hole.start;
+            let mut right = v_mid;
+            let out = &mut hole.dest;
+
+            while *left < hole.end && right < v_end {
+                // Consume the lesser side.
+                // If equal, prefer the left run to maintain stability.
+                unsafe {
+                    let to_copy = if is_less(&*right, &**left) {
+                        get_and_increment(&mut right)
+                    } else {
+                        get_and_increment(left)
+                    };
+                    core::ptr::copy_nonoverlapping(to_copy, get_and_increment(out), 1);
+                }
+            }
+        } else {
+            // The right run is shorter.
+            unsafe {
+                core::ptr::copy_nonoverlapping(v_mid, buf, len - mid);
+                hole = MergeHole { start: buf, end: buf.add(len - mid), dest: v_mid };
+            }
+
+            // Initially, these pointers point past the ends of their arrays.
+            let left = &mut hole.dest;
+            let right = &mut hole.end;
+            let mut out = v_end;
+
+            while v < *left && buf < *right {
+                // Consume the greater side.
+                // If equal, prefer the right run to maintain stability.
+                unsafe {
+                    let to_copy = if is_less(&*right.sub(1), &*left.sub(1)) {
+                        decrement_and_get(left)
+                    } else {
+                        decrement_and_get(right)
+                    };
+                    core::ptr::copy_nonoverlapping(to_copy, decrement_and_get(&mut out), 1);
+                }
+            }
+        }
+        // Finally, `hole` gets dropped. If the shorter run was not fully consumed, whatever remains of
+        // it will now be copied into the hole in `v`.
+    }
+
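+    // Runs one increment of the merge sort over `slow_events`: pending runs are
+    // sorted and registered until `collapse` requests a merge, after which one
+    // pair of adjacent runs is merged. Returns `ControlFlow::Break(())` once no
+    // pending runs remain and no further merge is required.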
+    fn sort_slow_events_step(&mut self, force_merge: bool) -> ControlFlow<()> {
+        let r = loop {
+            if let Some(r) = self.collapse(force_merge && self.overflow.is_empty()) {
+                break r;
+            }
+
+            let next_run = match self.overflow.pop_front() {
+                Some(next_run) => next_run,
+                None if self.run.len > 0 => core::mem::replace(&mut self.run, Run { start: self.slow_events.len(), len: 0 }),
+                None => return ControlFlow::Break(()),
+            };
+
+            // let Some(mut next_run) = self.overflow.pop_front() else {
+            //     return ControlFlow::Break(());
+            // };
+
+            // if next_run.len < self.sort_batch_size {
+            //     while next_run.len < self.sort_batch_size {
+            //         let Some(extra_run) = self.overflow.pop_front() else {
+            //             break;
+            //         };
+            //         next_run.len += extra_run.len;
+            //     }
+            //     self.slow_events[next_run.start..next_run.start + next_run.len].sort_unstable();
+            // }
+
+            self.slow_events[next_run.start..next_run.start + next_run.len].sort_unstable();
+
+            self.runs.push(next_run);
+        };
+
+        let left = self.runs[r];
+        let right = self.runs[r + 1];
+
+        let min_len = left.len.min(right.len);
+
+        if min_len > self.tmp_events.capacity() {
+            self.tmp_events.reserve(min_len - self.tmp_events.capacity());
+        }
+
+        unsafe {
+            Self::merge(
+                &mut self.slow_events[left.start..right.start + right.len],
+                left.len,
+                self.tmp_events.as_mut_ptr(),
+                &mut core::cmp::PartialOrd::lt,
+            );
+        }
+
+        self.runs[r] = Run { start: left.start, len: left.len + right.len };
+        self.runs.remove(r + 1);
+
+        ControlFlow::Continue(())
+    }
+}
+
 #[contract_trait]
 impl<'l, 'p, R: Reporter, P: LocalPartition<'p, R>> WaterLevelReporterProxy<'l, 'p, R, P>
     for LiveWaterLevelReporterProxy<'l, 'p, R, P>
 {
-    fn new(capacity: usize, local_partition: &'l mut P) -> Self {
+    fn new(capacity: usize, local_partition: &'l mut P, sort_batch_size: usize) -> Self {
         info!("Events will be reported using the live water-level algorithm ...");
 
         Self {
             water_level: NonNegativeF64::zero(),
             slow_events: Vec::with_capacity(capacity),
+            tmp_events: Vec::with_capacity(capacity),
+            run: Run { start: 0, len: 0 },
+            runs: Vec::new(),
+            overflow: VecDeque::new(),
             fast_events: Vec::with_capacity(capacity),
+            sort_batch_size,
 
             local_partition,
             _marker: PhantomData::<(&'p (), R)>,
@@ -85,9 +311,22 @@ impl<'l, 'p, R: Reporter, P: LocalPartition<'p, R>> WaterLevelReporterProxy<'l,
         self.water_level
     }
 
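+    // Exposes a single `sort_slow_events_step` iteration so that the sorting of
+    // slow events can be amortised across the simulation instead of being paid
+    // in full when the water level is advanced.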
+    fn partial_sort_step(&mut self) -> ControlFlow<()> {
+        self.sort_slow_events_step(false)
+    }
+
     fn advance_water_level(&mut self, water_level: NonNegativeF64) {
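+        // Force-merge all remaining runs, logging the proxy state every 100 merge
+        // steps as a coarse progress indicator.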
+        let mut i = 0;
+
         // Report all events below the water level in sorted order
-        self.slow_events.sort();
+        while let ControlFlow::Continue(()) = self.sort_slow_events_step(true) {
+            if (i % 100) == 0 {
+                info!("{:?}", self);
+            }
+            i += 1;
+        }
+
+        debug_assert!(self.slow_events.is_sorted());
 
         for event in self.slow_events.drain(..) {
             match event.into() {
@@ -104,14 +343,28 @@ impl<'l, 'p, R: Reporter, P: LocalPartition<'p, R>> WaterLevelReporterProxy<'l,
             }
         }
 
+        self.runs.clear();
+        self.run = Run { start: 0, len: 0 };
+
         // Update the water level
         self.water_level = water_level;
 
         // Move fast events below the new water level into slow events
-        self.slow_events.extend(
-            self.fast_events
-                .drain_filter(|event| event.event_time() < water_level),
-        );
+        for event in self.fast_events.drain_filter(|event| event.event_time() < water_level) {
+            let new_run = self.run.len > self.sort_batch_size; // self.slow_events.last().map_or(true, |prev| prev > &event);
+
+            if new_run {
+                let old_run = core::mem::replace(&mut self.run, Run {
+                    start: self.slow_events.len(),
+                    len: 1,
+                });
+                self.overflow.push_back(old_run);
+            } else {
+                self.run.len += 1;
+            }
+
+            self.slow_events.push(event);
+        }
     }
 
     fn local_partition(&mut self) -> &mut P {
@@ -124,7 +377,9 @@ impl<'l, 'p, R: Reporter, P: LocalPartition<'p, R>> Drop
 {
     fn drop(&mut self) {
         // Report all events below the water level in sorted order
-        self.slow_events.sort();
+        while let ControlFlow::Continue(()) = self.sort_slow_events_step(true) {}
+
+        debug_assert!(self.slow_events.is_sorted());
 
         for event in self.slow_events.drain(..) {
             match event.into() {
@@ -141,6 +396,9 @@ impl<'l, 'p, R: Reporter, P: LocalPartition<'p, R>> Drop
             }
         }
 
+        self.runs.clear();
+        self.run = Run { start: 0, len: 0 };
+
         // Report all events above the water level in sorted order
         self.fast_events.sort();
 