@@ -274,7 +274,16 @@ def checksum_states(self, state_index=None):
274274 key .split ("." )[1 ],
275275 getattr (inputs_copy , key .split ("." )[1 ])[ind ],
276276 )
277+ # setting files_hash again in case it was cleaned by setting specific element
278+ # that might be important for outer splitter of input variable with big files
279+ # the file can be changed with every single index even if there are only two files
280+ inputs_copy .files_hash = self .inputs .files_hash
277281 input_hash = inputs_copy .hash
282+ # updating self.inputs.files_hash, so big files hashes
283+ # doesn't have to be recompute for the next element
284+ for key , val in inputs_copy .files_hash .items ():
285+ if val :
286+ self .inputs .files_hash [key ].update (val )
278287 if is_workflow (self ):
279288 con_hash = hash_function (self ._connections )
280289 hash_list = [input_hash , con_hash ]
@@ -392,26 +401,28 @@ def output_dir(self):
392401 return [self ._cache_dir / checksum for checksum in self .checksum_states ()]
393402 return self ._cache_dir / self .checksum
394403
395- def __call__ (self , submitter = None , plugin = None , rerun = False , ** kwargs ):
404+ def __call__ (
405+ self , submitter = None , plugin = None , plugin_kwargs = None , rerun = False , ** kwargs
406+ ):
396407 """Make tasks callable themselves."""
397408 from .submitter import Submitter
398409
399410 if submitter and plugin :
400411 raise Exception ("Specify submitter OR plugin, not both" )
401- plugin = plugin or self .plugin
402- if plugin :
403- submitter = Submitter (plugin = plugin )
404- elif self .state :
405- submitter = Submitter ()
412+ elif submitter :
413+ pass
414+ # if there is plugin provided or the task is a Workflow or has a state,
415+ # the submitter will be created using provided plugin, self.plugin or "cf"
416+ elif plugin or self .state or is_workflow (self ):
417+ plugin = plugin or self .plugin or "cf"
418+ if plugin_kwargs is None :
419+ plugin_kwargs = {}
420+ submitter = Submitter (plugin = plugin , ** plugin_kwargs )
406421
407422 if submitter :
408423 with submitter as sub :
409424 res = sub (self )
410- else :
411- if is_workflow (self ):
412- raise NotImplementedError (
413- "TODO: linear workflow execution - assign submitter or plugin for now"
414- )
425+ else : # tasks without state could be run without a submitter
415426 res = self ._run (rerun = rerun , ** kwargs )
416427 return res
417428
0 commit comments