Skip to content

Commit 2dc4ebd

Browse files
committed
Add utility methods to parallelize over blocks
1 parent 065f241 commit 2dc4ebd

File tree

3 files changed

+574
-0
lines changed

3 files changed

+574
-0
lines changed
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* #%L
3+
* ImgLib2: a general-purpose, multidimensional image processing library.
4+
* %%
5+
* Copyright (C) 2009 - 2016 Tobias Pietzsch, Stephan Preibisch, Stephan Saalfeld,
6+
* John Bogovic, Albert Cardona, Barry DeZonia, Christian Dietz, Jan Funke,
7+
* Aivar Grislis, Jonathan Hale, Grant Harris, Stefan Helfrich, Mark Hiner,
8+
* Martin Horn, Steffen Jaensch, Lee Kamentsky, Larry Lindsey, Melissa Linkert,
9+
* Mark Longair, Brian Northan, Nick Perry, Curtis Rueden, Johannes Schindelin,
10+
* Jean-Yves Tinevez and Michael Zinsmaier.
11+
* %%
12+
* Redistribution and use in source and binary forms, with or without
13+
* modification, are permitted provided that the following conditions are met:
14+
*
15+
* 1. Redistributions of source code must retain the above copyright notice,
16+
* this list of conditions and the following disclaimer.
17+
* 2. Redistributions in binary form must reproduce the above copyright notice,
18+
* this list of conditions and the following disclaimer in the documentation
19+
* and/or other materials provided with the distribution.
20+
*
21+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
22+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24+
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
25+
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
26+
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
27+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
28+
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
29+
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30+
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31+
* POSSIBILITY OF SUCH DAMAGE.
32+
* #L%
33+
*/
34+
35+
package net.imglib2.algorithm.util;
36+
37+
import java.util.ArrayList;
38+
import java.util.Arrays;
39+
import java.util.List;
40+
import java.util.function.Function;
41+
42+
import net.imglib2.Interval;
43+
44+
/**
45+
*
46+
* @author Philipp Hanslovsky
47+
*
48+
*/
49+
public class BlockOffsets
50+
{
51+
52+
/**
53+
*
54+
* Get all blocks of size {@code blockSize} contained within an interval
55+
* specified by {@code dimensions}.
56+
*
57+
* @param dimensions
58+
* @param blockSize
59+
* @return list of blocks defined by minimum
60+
*/
61+
public static List< long[] > collectAllOffsets( final long[] dimensions, final int[] blockSize )
62+
{
63+
return collectAllOffsets( dimensions, blockSize, block -> block );
64+
}
65+
66+
/**
67+
*
68+
* Get all blocks of size {@code blockSize} contained within an interval
69+
* specified by {@code dimensions}.
70+
*
71+
* @param dimensions
72+
* @param blockSize
73+
* @param func
74+
* Apply this function to each block, e.g. create a
75+
* {@link Interval} for each block.
76+
* @return list of blocks mapped by {@code funk}
77+
*/
78+
public static < T > List< T > collectAllOffsets( final long[] dimensions, final int[] blockSize, final Function< long[], T > func )
79+
{
80+
return collectAllOffsets( new long[ dimensions.length ], Arrays.stream( dimensions ).map( d -> d - 1 ).toArray(), blockSize, func );
81+
}
82+
83+
/**
84+
*
85+
* Get all blocks of size {@code blockSize} contained within an interval
86+
* specified by {@code min} and {@code max}.
87+
*
88+
* @param min
89+
* @param max
90+
* @param blockSize
91+
* @return list of blocks defined by minimum
92+
*/
93+
public static List< long[] > collectAllOffsets( final long[] min, final long[] max, final int[] blockSize )
94+
{
95+
return collectAllOffsets( min, max, blockSize, block -> block );
96+
}
97+
98+
/**
99+
*
100+
* Get all blocks of size {@code blockSize} contained within an interval
101+
* specified by {@code min} and {@code max}.
102+
*
103+
* @param min
104+
* @param max
105+
* @param blockSize
106+
* @param func
107+
* Apply this function to each block, e.g. create a
108+
* {@link Interval} for each block.
109+
* @return list of blocks mapped by {@code funk}
110+
*/
111+
public static < T > List< T > collectAllOffsets( final long[] min, final long[] max, final int[] blockSize, final Function< long[], T > func )
112+
{
113+
final List< T > blocks = new ArrayList<>();
114+
final int nDim = min.length;
115+
final long[] offset = min.clone();
116+
for ( int d = 0; d < nDim; )
117+
{
118+
final long[] target = offset.clone();
119+
blocks.add( func.apply( target ) );
120+
for ( d = 0; d < nDim; ++d )
121+
{
122+
offset[ d ] += blockSize[ d ];
123+
if ( offset[ d ] <= max[ d ] )
124+
break;
125+
else
126+
offset[ d ] = 0;
127+
}
128+
}
129+
return blocks;
130+
}
131+
132+
}
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
* #%L
3+
* ImgLib2: a general-purpose, multidimensional image processing library.
4+
* %%
5+
* Copyright (C) 2009 - 2016 Tobias Pietzsch, Stephan Preibisch, Stephan Saalfeld,
6+
* John Bogovic, Albert Cardona, Barry DeZonia, Christian Dietz, Jan Funke,
7+
* Aivar Grislis, Jonathan Hale, Grant Harris, Stefan Helfrich, Mark Hiner,
8+
* Martin Horn, Steffen Jaensch, Lee Kamentsky, Larry Lindsey, Melissa Linkert,
9+
* Mark Longair, Brian Northan, Nick Perry, Curtis Rueden, Johannes Schindelin,
10+
* Jean-Yves Tinevez and Michael Zinsmaier.
11+
* %%
12+
* Redistribution and use in source and binary forms, with or without
13+
* modification, are permitted provided that the following conditions are met:
14+
*
15+
* 1. Redistributions of source code must retain the above copyright notice,
16+
* this list of conditions and the following disclaimer.
17+
* 2. Redistributions in binary form must reproduce the above copyright notice,
18+
* this list of conditions and the following disclaimer in the documentation
19+
* and/or other materials provided with the distribution.
20+
*
21+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
22+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
23+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24+
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
25+
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
26+
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
27+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
28+
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
29+
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
30+
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
31+
* POSSIBILITY OF SUCH DAMAGE.
32+
* #L%
33+
*/
34+
35+
package net.imglib2.algorithm.util;
36+
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
import java.util.concurrent.ExecutionException;
40+
import java.util.concurrent.ExecutorService;
41+
import java.util.concurrent.Future;
42+
import java.util.function.Consumer;
43+
import java.util.function.Function;
44+
import java.util.stream.Collectors;
45+
46+
import net.imglib2.FinalInterval;
47+
import net.imglib2.Interval;
48+
import net.imglib2.util.Intervals;
49+
50+
/**
51+
*
52+
* @author Philipp Hanslovsky
53+
*
54+
* Utility methods to parallelize image processing tasks in blocks
55+
*
56+
*/
57+
public class ParallelizeOverBlocks
58+
{
59+
60+
/**
61+
*
62+
* Submit blocked tasks and wait for execution.
63+
*
64+
* @param func
65+
* {@link Function} to be applied to each block as specified by
66+
* first parameter {@link Interval}.
67+
* @param interval
68+
* @param blockSize
69+
* @param es
70+
* @param numTasks
71+
* @return {@link List} of results of computation. Note that for
72+
* computations that return void, this should be a list of
73+
* {@link Void}.
74+
* @throws InterruptedException
75+
* @throws ExecutionException
76+
*/
77+
public static < T > List< T > parallelizeAndWait(
78+
final Function< Interval, T > func,
79+
final Interval interval,
80+
final int[] blockSize,
81+
final ExecutorService es,
82+
final int numTasks ) throws InterruptedException, ExecutionException
83+
{
84+
return parallelizeAndWait( func, Intervals.minAsLongArray( interval ), Intervals.maxAsLongArray( interval ), blockSize, es, numTasks );
85+
}
86+
87+
/**
88+
*
89+
* Submit blocked tasks and wait for execution.
90+
*
91+
* @param func
92+
* {@link Function} to be applied to each block as specified by
93+
* first parameter {@link Interval}.
94+
* @param interval
95+
* @param blockSize
96+
* @param es
97+
* @param numTasks
98+
* @return List of futures of the submitted tasks. Each future contains a
99+
* list of results.
100+
* @throws InterruptedException
101+
* @throws ExecutionException
102+
*/
103+
public static < T > List< Future< List< T > > > parallelize(
104+
final Function< Interval, T > func,
105+
final Interval interval,
106+
final int[] blockSize,
107+
final ExecutorService es,
108+
final int numTasks )
109+
{
110+
return parallelize( func, Intervals.minAsLongArray( interval ), Intervals.maxAsLongArray( interval ), blockSize, es, numTasks );
111+
}
112+
113+
/**
114+
*
115+
* Submit blocked tasks and wait for execution.
116+
*
117+
* @param func
118+
* {@link Function} to be applied to each block as specified by
119+
* first parameter {@link Interval}.
120+
* @param min
121+
* @param max
122+
* @param blockSize
123+
* @param es
124+
* @param numTasks
125+
* @return {@link List} of results of computation. Note that for
126+
* computations that return void, this should be a list of
127+
* {@link Void}.
128+
* @throws InterruptedException
129+
* @throws ExecutionException
130+
*/
131+
public static < T > List< T > parallelizeAndWait(
132+
final Function< Interval, T > func,
133+
final long[] min,
134+
final long[] max,
135+
final int[] blockSize,
136+
final ExecutorService es,
137+
final int numTasks ) throws InterruptedException, ExecutionException
138+
{
139+
final List< Future< List< T > > > futures = parallelize( func, min, max, blockSize, es, numTasks );
140+
final List< T > results = new ArrayList<>();
141+
for ( final Future< List< T > > future : futures )
142+
results.addAll( future.get() );
143+
return results;
144+
}
145+
146+
/**
147+
*
148+
* Submit blocked tasks and wait for execution.
149+
*
150+
* @param func
151+
* {@link Function} to be applied to each block as specified by
152+
* first parameter {@link Interval}.
153+
* @param min
154+
* @param max
155+
* @param blockSize
156+
* @param es
157+
* @param numTasks
158+
* @return List of futures of the submitted tasks. Each future contains a
159+
* list of results.
160+
* @throws InterruptedException
161+
* @throws ExecutionException
162+
*/
163+
public static < T > List< Future< List< T > > > parallelize(
164+
final Function< Interval, T > func,
165+
final long[] min,
166+
final long[] max,
167+
final int[] blockSize,
168+
final ExecutorService es,
169+
final int numTasks )
170+
{
171+
final List< Interval > blocks = BlockOffsets.collectAllOffsets( min, max, blockSize, blockMin -> {
172+
final long[] blockMax = new long[ blockMin.length ];
173+
for ( int d = 0; d < blockMax.length; ++d )
174+
blockMax[ d ] = Math.min( blockMin[ d ] + blockSize[ d ] - 1, max[ d ] );
175+
return new FinalInterval( blockMin, blockMax );
176+
} );
177+
return parallelize( func, blocks, es, numTasks );
178+
}
179+
180+
/**
181+
*
182+
* Submit blocked tasks and wait for execution.
183+
*
184+
* @param func
185+
* {@link Function} to be applied to each block as specified by
186+
* first parameter {@link Interval}.
187+
* @param blocks
188+
* @param es
189+
* @param numTasks
190+
* @return {@link List} of results of computation. Note that for
191+
* computations that return void, this should be a list of
192+
* {@link Void}.
193+
* @throws InterruptedException
194+
* @throws ExecutionException
195+
*/
196+
public static < T > List< T > parallelizeAndWait(
197+
final Function< Interval, T > func,
198+
final List< Interval > blocks,
199+
final ExecutorService es,
200+
final int numTasks ) throws InterruptedException, ExecutionException
201+
{
202+
final List< Future< List< T > > > futures = parallelize( func, blocks, es, numTasks );
203+
final List< T > results = new ArrayList<>();
204+
for ( final Future< List< T > > future : futures )
205+
results.addAll( future.get() );
206+
return results;
207+
}
208+
209+
/**
210+
*
211+
* Submit blocked tasks and wait for execution.
212+
*
213+
* @param func
214+
* {@link Function} to be applied to each block as specified by
215+
* first parameter {@link Interval}.
216+
* @param blocks
217+
* @param es
218+
* @param numTasks
219+
* @return List of futures of the submitted tasks. Each future contains a
220+
* list of results.
221+
* @throws InterruptedException
222+
* @throws ExecutionException
223+
*/
224+
public static < T > List< Future< List< T > > > parallelize(
225+
final Function< Interval, T > func,
226+
final List< Interval > blocks,
227+
final ExecutorService es,
228+
final int numTasks )
229+
{
230+
final ArrayList< Future< List< T > > > futures = new ArrayList<>();
231+
final int taskSize = Math.max( blocks.size() / numTasks, 1 );
232+
for ( int i = 0; i < blocks.size(); ++i )
233+
{
234+
final int finalI = i;
235+
futures.add( es.submit( () -> blocks.subList( finalI, finalI + taskSize ).stream().map( func ).collect( Collectors.toList() ) ) );
236+
}
237+
238+
return futures;
239+
}
240+
241+
/**
242+
* Utility method to turn a {@link Consumer} into a {@link Function}.
243+
*
244+
* @param consumer
245+
* @return
246+
*/
247+
public static < T > Function< T, Void > ofConsumer( final Consumer< T > consumer )
248+
{
249+
return t -> {
250+
consumer.accept( t );
251+
return null;
252+
};
253+
}
254+
255+
}

0 commit comments

Comments
 (0)