3333 */
3434package net .imglib2 .algorithm .convolution ;
3535
36- import java .util .ArrayList ;
37- import java .util .List ;
38- import java .util .concurrent .Callable ;
39- import java .util .concurrent .ExecutionException ;
40- import java .util .concurrent .ExecutorService ;
41- import java .util .concurrent .Future ;
42- import java .util .function .Consumer ;
43- import java .util .function .Supplier ;
44-
4536import net .imglib2 .FinalInterval ;
4637import net .imglib2 .Interval ;
4738import net .imglib2 .Localizable ;
48- import net .imglib2 .Point ;
4939import net .imglib2 .RandomAccess ;
5040import net .imglib2 .RandomAccessible ;
5141import net .imglib2 .RandomAccessibleInterval ;
52- import net .imglib2 .util .IntervalIndexer ;
42+ import net .imglib2 .loops .LoopBuilder ;
43+ import net .imglib2 .parallel .Parallelization ;
44+ import net .imglib2 .parallel .TaskExecutor ;
45+ import net .imglib2 .parallel .TaskExecutors ;
46+ import net .imglib2 .util .Cast ;
5347import net .imglib2 .util .Intervals ;
48+ import net .imglib2 .util .Localizables ;
5449import net .imglib2 .view .Views ;
5550
51+ import java .util .concurrent .ExecutorService ;
52+
5653/**
5754 * This class can be used to implement a separable convolution. It applies a
5855 * {@link LineConvolverFactory} on the given images.
5956 *
6057 * @author Matthias Arzt
6158 */
62- public class LineConvolution < T > extends AbstractMultiThreadedConvolution < T >
59+ public class LineConvolution < T > implements Convolution < T >
6360{
6461 private final LineConvolverFactory < ? super T > factory ;
6562
6663 private final int direction ;
6764
65+ private ExecutorService executor ;
66+
6867 public LineConvolution ( final LineConvolverFactory < ? super T > factory , final int direction )
6968 {
7069 this .factory = factory ;
7170 this .direction = direction ;
7271 }
7372
73+ @ Deprecated
74+ @ Override
75+ public void setExecutor ( ExecutorService executor )
76+ {
77+ this .executor = executor ;
78+ }
79+
7480 @ Override
7581 public Interval requiredSourceInterval ( final Interval targetInterval )
7682 {
@@ -84,104 +90,38 @@ public Interval requiredSourceInterval( final Interval targetInterval )
8490 @ Override
8591 public T preferredSourceType ( final T targetType )
8692 {
87- return ( T ) factory .preferredSourceType ( targetType );
93+ return Cast . unchecked ( factory .preferredSourceType ( targetType ) );
8894 }
8995
9096 @ Override
91- protected void process ( final RandomAccessible < ? extends T > source , final RandomAccessibleInterval < ? extends T > target , final ExecutorService executorService , final int numThreads )
97+ public void process ( RandomAccessible < ? extends T > source , RandomAccessibleInterval < ? extends T > target )
9298 {
9399 final RandomAccessibleInterval < ? extends T > sourceInterval = Views .interval ( source , requiredSourceInterval ( target ) );
94100 final long [] sourceMin = Intervals .minAsLongArray ( sourceInterval );
95101 final long [] targetMin = Intervals .minAsLongArray ( target );
96102
97- final Supplier < Consumer < Localizable > > actionFactory = () -> {
98-
99- final RandomAccess < ? extends T > in = sourceInterval .randomAccess ();
100- final RandomAccess < ? extends T > out = target .randomAccess ();
101- final Runnable convolver = factory .getConvolver ( in , out , direction , target .dimension ( direction ) );
102-
103- return position -> {
104- in .setPosition ( sourceMin );
105- out .setPosition ( targetMin );
106- in .move ( position );
107- out .move ( position );
108- convolver .run ();
109- };
110- };
111-
112103 final long [] dim = Intervals .dimensionsAsLongArray ( target );
113104 dim [ direction ] = 1 ;
114105
115- final int numTasks = numThreads > 1 ? timesFourAvoidOverflow (numThreads ) : 1 ;
116- LineConvolution .forEachIntervalElementInParallel ( executorService , numTasks , new FinalInterval ( dim ), actionFactory );
117- }
106+ RandomAccessibleInterval < Localizable > positions = Localizables .randomAccessibleInterval ( new FinalInterval ( dim ) );
107+ TaskExecutor taskExecutor = executor == null ? Parallelization .getTaskExecutor () : TaskExecutors .forExecutorService ( executor );
108+ LoopBuilder .setImages ( positions ).multiThreaded (taskExecutor ).forEachChunk (
109+ chunk -> {
118110
119- private int timesFourAvoidOverflow ( int x )
120- {
121- return (int ) Math .min ((long ) x * 4 , Integer .MAX_VALUE );
122- }
111+ final RandomAccess < ? extends T > in = sourceInterval .randomAccess ();
112+ final RandomAccess < ? extends T > out = target .randomAccess ();
113+ final Runnable convolver = factory .getConvolver ( in , out , direction , target .dimension ( direction ) );
123114
124- /**
125- * {@link #forEachIntervalElementInParallel(ExecutorService, int, Interval, Supplier)}
126- * executes a given action for each position in a given interval. Therefor
127- * it starts the specified number of tasks. Each tasks calls the action
128- * factory once, to get an instance of the action that should be executed.
129- * The action is then called multiple times by the task.
130- *
131- * @param service
132- * {@link ExecutorService} used to create the tasks.
133- * @param numTasks
134- * number of tasks to use.
135- * @param interval
136- * interval to iterate over.
137- * @param actionFactory
138- * factory that returns the action to be executed.
139- */
140- // TODO: move to a better place
141- public static void forEachIntervalElementInParallel ( final ExecutorService service , final int numTasks , final Interval interval ,
142- final Supplier < Consumer < Localizable > > actionFactory )
143- {
144- final long [] min = Intervals .minAsLongArray ( interval );
145- final long [] dim = Intervals .dimensionsAsLongArray ( interval );
146- final long size = Intervals .numElements ( dim );
147- final int boundedNumTasks = (int ) Math .max ( 1 , Math .min (size , numTasks ));
148- final long taskSize = ( size - 1 ) / boundedNumTasks + 1 ; // taskSize = roundUp(size / boundedNumTasks);
149- final ArrayList < Callable < Void > > callables = new ArrayList <>();
115+ chunk .forEachPixel ( position -> {
116+ in .setPosition ( sourceMin );
117+ out .setPosition ( targetMin );
118+ in .move ( position );
119+ out .move ( position );
120+ convolver .run ();
121+ } );
150122
151- for ( int taskNum = 0 ; taskNum < boundedNumTasks ; ++taskNum )
152- {
153- final long myStartIndex = taskNum * taskSize ;
154- final long myEndIndex = Math .min ( size , myStartIndex + taskSize );
155- final Callable < Void > r = () -> {
156- final Consumer < Localizable > action = actionFactory .get ();
157- final long [] position = new long [ dim .length ];
158- final Localizable localizable = Point .wrap ( position );
159- for ( long index = myStartIndex ; index < myEndIndex ; ++index )
160- {
161- IntervalIndexer .indexToPositionWithOffset ( index , dim , min , position );
162- action .accept ( localizable );
123+ return null ;
163124 }
164- return null ;
165- };
166- callables .add ( r );
167- }
168- execute ( service , callables );
169- }
170-
171- private static void execute ( final ExecutorService service , final ArrayList < Callable < Void > > callables )
172- {
173- try
174- {
175- final List < Future < Void > > futures = service .invokeAll ( callables );
176- for ( final Future < Void > future : futures )
177- future .get ();
178- }
179- catch ( final InterruptedException | ExecutionException e )
180- {
181- final Throwable cause = e .getCause ();
182- if ( cause instanceof RuntimeException )
183- throw ( RuntimeException ) cause ;
184- throw new RuntimeException ( e );
185- }
125+ );
186126 }
187127}
0 commit comments