From 25854fe9edb2d91c6824de4a3edaf20cce14f153 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Thu, 1 Aug 2019 18:15:34 +0200 Subject: [PATCH 01/32] removed version spec for mne in requirements --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index b68b111..ad5cd84 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ numpy scipy matplotlib -mne>0.16.2 +mne ipython pandas setuptools From e4a19239b75045ce6538e84eedce9c5aa0507259 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Mon, 5 Aug 2019 18:49:13 +0200 Subject: [PATCH 02/32] in the process of splitting off preproc of one file --- eegprep/preproc.py | 4 +- eegprep/run_bids.py | 173 ++++++++++++++++++++++++++++++++++++++++++++ requirements.txt | 2 +- 3 files changed, 176 insertions(+), 3 deletions(-) create mode 100644 eegprep/run_bids.py diff --git a/eegprep/preproc.py b/eegprep/preproc.py index d68e3e7..ecadea8 100644 --- a/eegprep/preproc.py +++ b/eegprep/preproc.py @@ -16,7 +16,7 @@ from eegprep.defaults import defaults -def run_preproc(datadir='/data'): +def run_file(datadir='/data'): print('data directory: {}'.format(datadir)) conf_file_path = join(datadir, 'eegprep.conf') @@ -47,7 +47,7 @@ def run_preproc(datadir='/data'): subject_epochs = {} - rawtypes = {'.set': mne.io.read_raw_eeglab, '.bdf': mne.io.read_raw_edf} + rawtypes = {'.set': mne.io.read_raw_eeglab, '.bdf': mne.io.read_raw_bdf} for fname in sorted(glob.glob(join(subjectdir, 'eeg', '*'))): _, ext = splitext(fname) if ext not in rawtypes.keys(): diff --git a/eegprep/run_bids.py b/eegprep/run_bids.py new file mode 100644 index 0000000..15e4b21 --- /dev/null +++ b/eegprep/run_bids.py @@ -0,0 +1,173 @@ +from os.path import join, basename, splitext +import os, glob, random +import numpy +import scipy.io +import mne +import pandas +from autoreject import AutoReject +from eegprep.bids.naming import filename2tuple +from eegprep.guess import guess_montage +from eegprep.util import ( + resample_events_on_resampled_epochs, + plot_rejectlog, + save_rejectlog +) +from eegprep.configuration import Configuration +from eegprep.defaults import defaults + + +def run_bids(datadir='/data'): + + print('data directory: {}'.format(datadir)) + conf_file_path = join(datadir, 'eegprep.conf') + config = Configuration() + config.setDefaults(defaults) + if os.path.isfile(conf_file_path): + with open(conf_file_path) as fh: + conf_string = fh.read() + config.updateFromString(conf_string) + print('configuration:') + print(config) + + bidsdir = join(datadir, 'BIDS') + eegprepdir = join(bidsdir, 'derivatives', 'eegprep') + + + subjectdirs = sorted(glob.glob(join(bidsdir, 'sub-*'))) + for subjectdir in subjectdirs: + assert os.path.isdir(subjectdir) + + sub = basename(subjectdir)[4:] + + # prepare derivatives directory + derivdir = join(eegprepdir, 'sub-' + sub) + os.makedirs(derivdir, exist_ok=True) + reportsdir = join(eegprepdir, 'reports', 'sub-' + sub) + os.makedirs(reportsdir, exist_ok=True) + + + subject_epochs = {} + rawtypes = {'.set': mne.io.read_raw_eeglab, '.bdf': mne.io.read_raw_bdf} + for fname in sorted(glob.glob(join(subjectdir, 'eeg', '*'))): + _, ext = splitext(fname) + if ext not in rawtypes.keys(): + continue + sub, ses, task, run = filename2tuple(basename(fname)) + + print('\nProcessing raw file: ' + basename(fname)) + + # read data + raw = rawtypes[ext](fname, preload=True, verbose=False) + events = mne.find_events(raw) #raw, consecutive=False, min_duration=0.005) + + # Set channel types and select reference channels + channelFile = fname.replace('eeg' + ext, 'channels.tsv') + channels = pandas.read_csv(channelFile, index_col='name', sep='\t') + bids2mne = { + 'MISC': 'misc', + 'EEG': 'eeg', + 'VEOG': 'eog', + 'TRIG': 'stim', + 'REF': 'eeg', + } + channels['mne'] = channels.type.replace(bids2mne) + + # the below fails if the specified channels are not in the data + raw.set_channel_types(channels.mne.to_dict()) + + # set bad channels + raw.info['bads'] = channels[channels.status=='bad'].index.tolist() + + # pick channels to use for epoching + epoching_picks = mne.pick_types(raw.info, eeg=True, eog=False, stim=False, exclude='bads') + + + # Filtering + #raw.filter(l_freq=0.05, h_freq=40, fir_design='firwin') + + montage = mne.channels.read_montage(guess_montage(raw.ch_names)) + print(montage) + raw.set_montage(montage) + + # plot raw data + nchans = len(raw.ch_names) + pick_channels = numpy.arange(0, nchans, numpy.floor(nchans/20)).astype(int) + start = numpy.round(raw.times.max()/2) + fig = raw.plot(start=start, order=pick_channels) + fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_raw.png'.format(sub, ses, task, run) + fig.savefig(join(reportsdir, fname_plot)) + + # Set reference + refChannels = channels[channels.type=='REF'].index.tolist() + raw.set_eeg_reference(ref_channels=refChannels) + + ## epoching + epochs_params = dict( + events=events, + tmin=-0.1, + tmax=0.8, + reject=None, # dict(eeg=250e-6, eog=150e-6) + picks=epoching_picks, + detrend=0, + ) + file_epochs = mne.Epochs(raw, preload=True, **epochs_params) + file_epochs.drop_channels(refChannels) + + # autoreject (under development) + ar = AutoReject(n_jobs=4) + clean_epochs = ar.fit_transform(file_epochs) + + rejectlog = ar.get_reject_log(clean_epochs) + fname_log = 'sub-{}_ses-{}_task-{}_run-{}_reject-log.npz'.format(sub, ses, task, run) + save_rejectlog(join(reportsdir, fname_log), rejectlog) + fig = plot_rejectlog(rejectlog) + fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_bad-epochs.png'.format(sub, ses, task, run) + fig.savefig(join(reportsdir, fname_plot)) + + + # store for now + subject_epochs[(ses, task, run)] = clean_epochs + + # create evoked plots + conds = clean_epochs.event_id.keys() + selected_conds = random.sample(conds, min(len(conds), 6)) + picks = mne.pick_types(clean_epochs.info, eeg=True) + for cond in selected_conds: + evoked = clean_epochs[cond].average() + fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_evoked-{}.png'.format(sub, ses, task, run, cond) + fig = evoked.plot_joint(picks=picks) + fig.savefig(join(reportsdir, fname_plot)) + + + + sessSeg = 0 + sessions = sorted(list(set([k[sessSeg] for k in subject_epochs.keys()]))) + for session in sessions: + taskSeg = 1 + tasks = list(set([k[taskSeg] for k in subject_epochs.keys() if k[sessSeg]==session])) + for task in tasks: + print('\nGathering epochs for session {} task {}'.format(session, task)) + epochs_selection = [v for (k, v) in subject_epochs.items() if k[:2]==(session, task)] + + task_epochs = mne.epochs.concatenate_epochs(epochs_selection) + + # downsample if configured to do so + # important to do this after concatenation because + # downsampling may cause rejection for 'TOOSHORT' + if config['downsample'] < task_epochs.info['sfreq']: + task_epochs = task_epochs.copy().resample(config['downsample'], npad='auto') + + ext = config['out_file_format'] + fname = join(derivdir, 'sub-{}_ses-{}_task-{}_epo.{}'.format(sub, session, task, ext)) + variables = { + 'epochs': task_epochs.get_data(), + 'events': task_epochs.events, + 'timepoints': task_epochs.times + } + if ext == 'fif': + task_epochs.save(fname) + elif ext == 'mat': + scipy.io.savemat(fname, mdict=variables) + elif ext == 'npy': + numpy.savez(fname, **variables) + diff --git a/requirements.txt b/requirements.txt index 0b2ce68..e846d89 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ numpy scipy -scikit-learn>=0.18 +scikit-learn matplotlib mne autoreject From 459ea14d872d831d6f91a5afa7fb028f0022ac76 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Mon, 5 Aug 2019 19:19:32 +0200 Subject: [PATCH 03/32] working on basics --- eegprep/preproc.py | 173 ------------------------------------- eegprep/preproc_dataset.py | 95 ++++++++++++++++++++ eegprep/preproc_run.py | 107 +++++++++++++++++++++++ eegprep/run_bids.py | 173 ------------------------------------- scripts/eegprep | 4 +- 5 files changed, 204 insertions(+), 348 deletions(-) delete mode 100644 eegprep/preproc.py create mode 100644 eegprep/preproc_dataset.py create mode 100644 eegprep/preproc_run.py delete mode 100644 eegprep/run_bids.py diff --git a/eegprep/preproc.py b/eegprep/preproc.py deleted file mode 100644 index ecadea8..0000000 --- a/eegprep/preproc.py +++ /dev/null @@ -1,173 +0,0 @@ -from os.path import join, basename, splitext -import os, glob, random -import numpy -import scipy.io -import mne -import pandas -from autoreject import AutoReject -from eegprep.bids.naming import filename2tuple -from eegprep.guess import guess_montage -from eegprep.util import ( - resample_events_on_resampled_epochs, - plot_rejectlog, - save_rejectlog -) -from eegprep.configuration import Configuration -from eegprep.defaults import defaults - - -def run_file(datadir='/data'): - - print('data directory: {}'.format(datadir)) - conf_file_path = join(datadir, 'eegprep.conf') - config = Configuration() - config.setDefaults(defaults) - if os.path.isfile(conf_file_path): - with open(conf_file_path) as fh: - conf_string = fh.read() - config.updateFromString(conf_string) - print('configuration:') - print(config) - - bidsdir = join(datadir, 'BIDS') - eegprepdir = join(bidsdir, 'derivatives', 'eegprep') - - - subjectdirs = sorted(glob.glob(join(bidsdir, 'sub-*'))) - for subjectdir in subjectdirs: - assert os.path.isdir(subjectdir) - - sub = basename(subjectdir)[4:] - - # prepare derivatives directory - derivdir = join(eegprepdir, 'sub-' + sub) - os.makedirs(derivdir, exist_ok=True) - reportsdir = join(eegprepdir, 'reports', 'sub-' + sub) - os.makedirs(reportsdir, exist_ok=True) - - - subject_epochs = {} - rawtypes = {'.set': mne.io.read_raw_eeglab, '.bdf': mne.io.read_raw_bdf} - for fname in sorted(glob.glob(join(subjectdir, 'eeg', '*'))): - _, ext = splitext(fname) - if ext not in rawtypes.keys(): - continue - sub, ses, task, run = filename2tuple(basename(fname)) - - print('\nProcessing raw file: ' + basename(fname)) - - # read data - raw = rawtypes[ext](fname, preload=True, verbose=False) - events = mne.find_events(raw) #raw, consecutive=False, min_duration=0.005) - - # Set channel types and select reference channels - channelFile = fname.replace('eeg' + ext, 'channels.tsv') - channels = pandas.read_csv(channelFile, index_col='name', sep='\t') - bids2mne = { - 'MISC': 'misc', - 'EEG': 'eeg', - 'VEOG': 'eog', - 'TRIG': 'stim', - 'REF': 'eeg', - } - channels['mne'] = channels.type.replace(bids2mne) - - # the below fails if the specified channels are not in the data - raw.set_channel_types(channels.mne.to_dict()) - - # set bad channels - raw.info['bads'] = channels[channels.status=='bad'].index.tolist() - - # pick channels to use for epoching - epoching_picks = mne.pick_types(raw.info, eeg=True, eog=False, stim=False, exclude='bads') - - - # Filtering - #raw.filter(l_freq=0.05, h_freq=40, fir_design='firwin') - - montage = mne.channels.read_montage(guess_montage(raw.ch_names)) - print(montage) - raw.set_montage(montage) - - # plot raw data - nchans = len(raw.ch_names) - pick_channels = numpy.arange(0, nchans, numpy.floor(nchans/20)).astype(int) - start = numpy.round(raw.times.max()/2) - fig = raw.plot(start=start, order=pick_channels) - fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_raw.png'.format(sub, ses, task, run) - fig.savefig(join(reportsdir, fname_plot)) - - # Set reference - refChannels = channels[channels.type=='REF'].index.tolist() - raw.set_eeg_reference(ref_channels=refChannels) - - ## epoching - epochs_params = dict( - events=events, - tmin=-0.1, - tmax=0.8, - reject=None, # dict(eeg=250e-6, eog=150e-6) - picks=epoching_picks, - detrend=0, - ) - file_epochs = mne.Epochs(raw, preload=True, **epochs_params) - file_epochs.drop_channels(refChannels) - - # autoreject (under development) - ar = AutoReject(n_jobs=4) - clean_epochs = ar.fit_transform(file_epochs) - - rejectlog = ar.get_reject_log(clean_epochs) - fname_log = 'sub-{}_ses-{}_task-{}_run-{}_reject-log.npz'.format(sub, ses, task, run) - save_rejectlog(join(reportsdir, fname_log), rejectlog) - fig = plot_rejectlog(rejectlog) - fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_bad-epochs.png'.format(sub, ses, task, run) - fig.savefig(join(reportsdir, fname_plot)) - - - # store for now - subject_epochs[(ses, task, run)] = clean_epochs - - # create evoked plots - conds = clean_epochs.event_id.keys() - selected_conds = random.sample(conds, min(len(conds), 6)) - picks = mne.pick_types(clean_epochs.info, eeg=True) - for cond in selected_conds: - evoked = clean_epochs[cond].average() - fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_evoked-{}.png'.format(sub, ses, task, run, cond) - fig = evoked.plot_joint(picks=picks) - fig.savefig(join(reportsdir, fname_plot)) - - - - sessSeg = 0 - sessions = sorted(list(set([k[sessSeg] for k in subject_epochs.keys()]))) - for session in sessions: - taskSeg = 1 - tasks = list(set([k[taskSeg] for k in subject_epochs.keys() if k[sessSeg]==session])) - for task in tasks: - print('\nGathering epochs for session {} task {}'.format(session, task)) - epochs_selection = [v for (k, v) in subject_epochs.items() if k[:2]==(session, task)] - - task_epochs = mne.epochs.concatenate_epochs(epochs_selection) - - # downsample if configured to do so - # important to do this after concatenation because - # downsampling may cause rejection for 'TOOSHORT' - if config['downsample'] < task_epochs.info['sfreq']: - task_epochs = task_epochs.copy().resample(config['downsample'], npad='auto') - - ext = config['out_file_format'] - fname = join(derivdir, 'sub-{}_ses-{}_task-{}_epo.{}'.format(sub, session, task, ext)) - variables = { - 'epochs': task_epochs.get_data(), - 'events': task_epochs.events, - 'timepoints': task_epochs.times - } - if ext == 'fif': - task_epochs.save(fname) - elif ext == 'mat': - scipy.io.savemat(fname, mdict=variables) - elif ext == 'npy': - numpy.savez(fname, **variables) - diff --git a/eegprep/preproc_dataset.py b/eegprep/preproc_dataset.py new file mode 100644 index 0000000..f9821e2 --- /dev/null +++ b/eegprep/preproc_dataset.py @@ -0,0 +1,95 @@ +from os.path import join, basename, splitext +import os, glob, random +from eegprep.configuration import Configuration +from eegprep.defaults import defaults +from eegprep.preproc_run import preproc_run + + +def preproc_dataset(datadir='/data'): + + print('data directory: {}'.format(datadir)) + conf_file_path = join(datadir, 'eegprep.conf') + config = Configuration() + config.setDefaults(defaults) + if os.path.isfile(conf_file_path): + with open(conf_file_path) as fh: + conf_string = fh.read() + config.updateFromString(conf_string) + print('configuration:') + print(config) + + bidsdir = join(datadir, 'BIDS') + eegprepdir = join(bidsdir, 'derivatives', 'eegprep') + + + subjectdirs = sorted(glob.glob(join(bidsdir, 'sub-*'))) + for subjectdir in subjectdirs: + assert os.path.isdir(subjectdir) + + sub = basename(subjectdir)[4:] + + # prepare derivatives directory + derivdir = join(eegprepdir, 'sub-' + sub) + os.makedirs(derivdir, exist_ok=True) + reportsdir = join(eegprepdir, 'reports', 'sub-' + sub) + os.makedirs(reportsdir, exist_ok=True) + + + subject_epochs = {} + rawtypes = {'.set': mne.io.read_raw_eeglab, '.bdf': mne.io.read_raw_bdf} + for fname in sorted(glob.glob(join(subjectdir, 'eeg', '*'))): + + + preproc_run(fname, config) + + _, ext = splitext(fname) + if ext not in rawtypes.keys(): + continue + sub, ses, task, run = filename2tuple(basename(fname)) + + # store for now + subject_epochs[(ses, task, run)] = clean_epochs + + # # create evoked plots + # conds = clean_epochs.event_id.keys() + # selected_conds = random.sample(conds, min(len(conds), 6)) + # picks = mne.pick_types(clean_epochs.info, eeg=True) + # for cond in selected_conds: + # evoked = clean_epochs[cond].average() + # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_evoked-{}.png'.format(sub, ses, task, run, cond) + # fig = evoked.plot_joint(picks=picks) + # fig.savefig(join(reportsdir, fname_plot)) + + + + # sessSeg = 0 + # sessions = sorted(list(set([k[sessSeg] for k in subject_epochs.keys()]))) + # for session in sessions: + # taskSeg = 1 + # tasks = list(set([k[taskSeg] for k in subject_epochs.keys() if k[sessSeg]==session])) + # for task in tasks: + # print('\nGathering epochs for session {} task {}'.format(session, task)) + # epochs_selection = [v for (k, v) in subject_epochs.items() if k[:2]==(session, task)] + + # task_epochs = mne.epochs.concatenate_epochs(epochs_selection) + + # # downsample if configured to do so + # # important to do this after concatenation because + # # downsampling may cause rejection for 'TOOSHORT' + # if config['downsample'] < task_epochs.info['sfreq']: + # task_epochs = task_epochs.copy().resample(config['downsample'], npad='auto') + + # ext = config['out_file_format'] + # fname = join(derivdir, 'sub-{}_ses-{}_task-{}_epo.{}'.format(sub, session, task, ext)) + # variables = { + # 'epochs': task_epochs.get_data(), + # 'events': task_epochs.events, + # 'timepoints': task_epochs.times + # } + # if ext == 'fif': + # task_epochs.save(fname) + # elif ext == 'mat': + # scipy.io.savemat(fname, mdict=variables) + # elif ext == 'npy': + # numpy.savez(fname, **variables) + diff --git a/eegprep/preproc_run.py b/eegprep/preproc_run.py new file mode 100644 index 0000000..fca2747 --- /dev/null +++ b/eegprep/preproc_run.py @@ -0,0 +1,107 @@ +from os.path import join, basename, splitext +import os, glob, random +import numpy +import scipy.io +import mne +import pandas +#from autoreject import AutoReject +from eegprep.bids.naming import filename2tuple +from eegprep.guess import guess_montage +from eegprep.util import ( + resample_events_on_resampled_epochs, + plot_rejectlog, + save_rejectlog +) +from eegprep.configuration import Configuration +from eegprep.defaults import defaults + + +def preproc_run(fpath, config): + + #sub, ses, task, run = filename2tuple(basename(fname)) + + # read data + raw = mne.io.read_raw_bdf(fpath, preload=True, verbose=False) + + # Set channel types and select reference channels + channelFile = fpath.replace('eeg.bdf', 'channels.tsv') # maybe should be a string arg + channels = pandas.read_csv(channelFile, index_col='name', sep='\t') + bids2mne = { + 'MISC': 'misc', + 'EEG': 'eeg', + 'EOG': 'eog', + 'VEOG': 'eog', + 'TRIG': 'stim', + 'REF': 'eeg', + } + channels['mne'] = channels.type.replace(bids2mne) + + # the below fails if the specified channels are not in the data + raw.set_channel_types(channels.mne.to_dict()) + + # Set reference + refChannels = channels[channels.type=='REF'].index.tolist() + raw = raw.set_eeg_reference(ref_channels=refChannels) + + + # set bad channels + # raw.info['bads'] = channels[channels.status=='bad'].index.tolist() + + # pick channels to use for epoching + #epoching_picks = mne.pick_types(raw.info, eeg=True, eog=False, stim=False, exclude='bads') + + + # Filtering + raw = raw.filter(l_freq=0.05, h_freq=45, fir_design='firwin') + + montage = mne.channels.read_montage(guess_montage(raw.ch_names)) + print(montage) + raw.set_montage(montage) + + # plot raw data + # nchans = len(raw.ch_names) + # pick_channels = numpy.arange(0, nchans, numpy.floor(nchans/20)).astype(int) + # start = numpy.round(raw.times.max()/2) + # fig = raw.plot(start=start, order=pick_channels) + # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_raw.png'.format(sub, ses, task, run) + # fig.savefig(join(reportsdir, fname_plot)) + + + events = mne.find_events(raw) #raw, consecutive=False, min_duration=0.005) + ## epoching + # epochs_params = dict( + # events=events, + # tmin=-0.1, + # tmax=0.8, + # reject=None, # dict(eeg=250e-6, eog=150e-6) + # picks=epoching_picks, + # detrend=0, + # ) + # file_epochs = mne.Epochs(raw, preload=True, **epochs_params) + # file_epochs.drop_channels(refChannels) + + # # autoreject (under development) + # ar = AutoReject(n_jobs=4) + # clean_epochs = ar.fit_transform(file_epochs) + + # rejectlog = ar.get_reject_log(clean_epochs) + # fname_log = 'sub-{}_ses-{}_task-{}_run-{}_reject-log.npz'.format(sub, ses, task, run) + # save_rejectlog(join(reportsdir, fname_log), rejectlog) + # fig = plot_rejectlog(rejectlog) + # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_bad-epochs.png'.format(sub, ses, task, run) + # fig.savefig(join(reportsdir, fname_plot)) + + + # # store for now + # # subject_epochs[(ses, task, run)] = clean_epochs + + # # create evoked plots + # conds = clean_epochs.event_id.keys() + # selected_conds = random.sample(conds, min(len(conds), 6)) + # picks = mne.pick_types(clean_epochs.info, eeg=True) + # for cond in selected_conds: + # evoked = clean_epochs[cond].average() + # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_evoked-{}.png'.format(sub, ses, task, run, cond) + # fig = evoked.plot_joint(picks=picks) + # fig.savefig(join(reportsdir, fname_plot)) + diff --git a/eegprep/run_bids.py b/eegprep/run_bids.py deleted file mode 100644 index 15e4b21..0000000 --- a/eegprep/run_bids.py +++ /dev/null @@ -1,173 +0,0 @@ -from os.path import join, basename, splitext -import os, glob, random -import numpy -import scipy.io -import mne -import pandas -from autoreject import AutoReject -from eegprep.bids.naming import filename2tuple -from eegprep.guess import guess_montage -from eegprep.util import ( - resample_events_on_resampled_epochs, - plot_rejectlog, - save_rejectlog -) -from eegprep.configuration import Configuration -from eegprep.defaults import defaults - - -def run_bids(datadir='/data'): - - print('data directory: {}'.format(datadir)) - conf_file_path = join(datadir, 'eegprep.conf') - config = Configuration() - config.setDefaults(defaults) - if os.path.isfile(conf_file_path): - with open(conf_file_path) as fh: - conf_string = fh.read() - config.updateFromString(conf_string) - print('configuration:') - print(config) - - bidsdir = join(datadir, 'BIDS') - eegprepdir = join(bidsdir, 'derivatives', 'eegprep') - - - subjectdirs = sorted(glob.glob(join(bidsdir, 'sub-*'))) - for subjectdir in subjectdirs: - assert os.path.isdir(subjectdir) - - sub = basename(subjectdir)[4:] - - # prepare derivatives directory - derivdir = join(eegprepdir, 'sub-' + sub) - os.makedirs(derivdir, exist_ok=True) - reportsdir = join(eegprepdir, 'reports', 'sub-' + sub) - os.makedirs(reportsdir, exist_ok=True) - - - subject_epochs = {} - rawtypes = {'.set': mne.io.read_raw_eeglab, '.bdf': mne.io.read_raw_bdf} - for fname in sorted(glob.glob(join(subjectdir, 'eeg', '*'))): - _, ext = splitext(fname) - if ext not in rawtypes.keys(): - continue - sub, ses, task, run = filename2tuple(basename(fname)) - - print('\nProcessing raw file: ' + basename(fname)) - - # read data - raw = rawtypes[ext](fname, preload=True, verbose=False) - events = mne.find_events(raw) #raw, consecutive=False, min_duration=0.005) - - # Set channel types and select reference channels - channelFile = fname.replace('eeg' + ext, 'channels.tsv') - channels = pandas.read_csv(channelFile, index_col='name', sep='\t') - bids2mne = { - 'MISC': 'misc', - 'EEG': 'eeg', - 'VEOG': 'eog', - 'TRIG': 'stim', - 'REF': 'eeg', - } - channels['mne'] = channels.type.replace(bids2mne) - - # the below fails if the specified channels are not in the data - raw.set_channel_types(channels.mne.to_dict()) - - # set bad channels - raw.info['bads'] = channels[channels.status=='bad'].index.tolist() - - # pick channels to use for epoching - epoching_picks = mne.pick_types(raw.info, eeg=True, eog=False, stim=False, exclude='bads') - - - # Filtering - #raw.filter(l_freq=0.05, h_freq=40, fir_design='firwin') - - montage = mne.channels.read_montage(guess_montage(raw.ch_names)) - print(montage) - raw.set_montage(montage) - - # plot raw data - nchans = len(raw.ch_names) - pick_channels = numpy.arange(0, nchans, numpy.floor(nchans/20)).astype(int) - start = numpy.round(raw.times.max()/2) - fig = raw.plot(start=start, order=pick_channels) - fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_raw.png'.format(sub, ses, task, run) - fig.savefig(join(reportsdir, fname_plot)) - - # Set reference - refChannels = channels[channels.type=='REF'].index.tolist() - raw.set_eeg_reference(ref_channels=refChannels) - - ## epoching - epochs_params = dict( - events=events, - tmin=-0.1, - tmax=0.8, - reject=None, # dict(eeg=250e-6, eog=150e-6) - picks=epoching_picks, - detrend=0, - ) - file_epochs = mne.Epochs(raw, preload=True, **epochs_params) - file_epochs.drop_channels(refChannels) - - # autoreject (under development) - ar = AutoReject(n_jobs=4) - clean_epochs = ar.fit_transform(file_epochs) - - rejectlog = ar.get_reject_log(clean_epochs) - fname_log = 'sub-{}_ses-{}_task-{}_run-{}_reject-log.npz'.format(sub, ses, task, run) - save_rejectlog(join(reportsdir, fname_log), rejectlog) - fig = plot_rejectlog(rejectlog) - fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_bad-epochs.png'.format(sub, ses, task, run) - fig.savefig(join(reportsdir, fname_plot)) - - - # store for now - subject_epochs[(ses, task, run)] = clean_epochs - - # create evoked plots - conds = clean_epochs.event_id.keys() - selected_conds = random.sample(conds, min(len(conds), 6)) - picks = mne.pick_types(clean_epochs.info, eeg=True) - for cond in selected_conds: - evoked = clean_epochs[cond].average() - fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_evoked-{}.png'.format(sub, ses, task, run, cond) - fig = evoked.plot_joint(picks=picks) - fig.savefig(join(reportsdir, fname_plot)) - - - - sessSeg = 0 - sessions = sorted(list(set([k[sessSeg] for k in subject_epochs.keys()]))) - for session in sessions: - taskSeg = 1 - tasks = list(set([k[taskSeg] for k in subject_epochs.keys() if k[sessSeg]==session])) - for task in tasks: - print('\nGathering epochs for session {} task {}'.format(session, task)) - epochs_selection = [v for (k, v) in subject_epochs.items() if k[:2]==(session, task)] - - task_epochs = mne.epochs.concatenate_epochs(epochs_selection) - - # downsample if configured to do so - # important to do this after concatenation because - # downsampling may cause rejection for 'TOOSHORT' - if config['downsample'] < task_epochs.info['sfreq']: - task_epochs = task_epochs.copy().resample(config['downsample'], npad='auto') - - ext = config['out_file_format'] - fname = join(derivdir, 'sub-{}_ses-{}_task-{}_epo.{}'.format(sub, session, task, ext)) - variables = { - 'epochs': task_epochs.get_data(), - 'events': task_epochs.events, - 'timepoints': task_epochs.times - } - if ext == 'fif': - task_epochs.save(fname) - elif ext == 'mat': - scipy.io.savemat(fname, mdict=variables) - elif ext == 'npy': - numpy.savez(fname, **variables) - diff --git a/scripts/eegprep b/scripts/eegprep index dde2952..303c0be 100644 --- a/scripts/eegprep +++ b/scripts/eegprep @@ -1,5 +1,5 @@ #!/usr/bin/env python3 import sys -from eegprep.preproc import run_preproc +from eegprep.preproc_dataset import preproc_dataset datadir = sys.argv[1] if len(sys.argv) > 1 else '/data' -run_preproc(datadir) \ No newline at end of file +preproc_dataset(datadir) \ No newline at end of file From 9c4e329d1b2b433735de73172e4cb2815746b8b8 Mon Sep 17 00:00:00 2001 From: Jasper Date: Tue, 6 Aug 2019 15:58:27 +0100 Subject: [PATCH 04/32] some reorganizing --- eegprep/preproc_run.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/eegprep/preproc_run.py b/eegprep/preproc_run.py index fca2747..001142d 100644 --- a/eegprep/preproc_run.py +++ b/eegprep/preproc_run.py @@ -1,10 +1,5 @@ from os.path import join, basename, splitext -import os, glob, random -import numpy -import scipy.io -import mne -import pandas -#from autoreject import AutoReject +import os, glob, random, numpy, mne, pandas from eegprep.bids.naming import filename2tuple from eegprep.guess import guess_montage from eegprep.util import ( @@ -12,8 +7,6 @@ plot_rejectlog, save_rejectlog ) -from eegprep.configuration import Configuration -from eegprep.defaults import defaults def preproc_run(fpath, config): @@ -35,10 +28,9 @@ def preproc_run(fpath, config): 'REF': 'eeg', } channels['mne'] = channels.type.replace(bids2mne) - - # the below fails if the specified channels are not in the data raw.set_channel_types(channels.mne.to_dict()) + # Set reference refChannels = channels[channels.type=='REF'].index.tolist() raw = raw.set_eeg_reference(ref_channels=refChannels) From 495f060fdd957037e73c46bd035f45f5cb2900a5 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Tue, 6 Aug 2019 19:23:23 +0100 Subject: [PATCH 05/32] simplified eegprep --- eegprep/bids/__init__.py | 0 eegprep/bids/naming.py | 19 ------ eegprep/preproc_dataset.py | 121 +++++++++---------------------------- eegprep/preproc_run.py | 35 ++++++----- eegprep/preproc_subject.py | 43 +++++++++++++ 5 files changed, 90 insertions(+), 128 deletions(-) delete mode 100644 eegprep/bids/__init__.py delete mode 100644 eegprep/bids/naming.py create mode 100644 eegprep/preproc_subject.py diff --git a/eegprep/bids/__init__.py b/eegprep/bids/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/eegprep/bids/naming.py b/eegprep/bids/naming.py deleted file mode 100644 index b64e3a5..0000000 --- a/eegprep/bids/naming.py +++ /dev/null @@ -1,19 +0,0 @@ -BIDS_SEGMENTS = ['sub', 'ses', 'task', 'run'] - - -def filename2tuple(fname): - segments = fname.split('_') - vals = [None] * len(BIDS_SEGMENTS) - for seg in segments: - for l in [3, 4]: - if seg[:l] in BIDS_SEGMENTS: - vals[BIDS_SEGMENTS.index(seg[:l])] = seg[l+1:] - return tuple(vals) - - -def args2filename(**kwargs): - fname = '' - for seg in BIDS_SEGMENTS: - if seg in kwargs: - fname += seg + '-' + kwargs[seg] - return fname \ No newline at end of file diff --git a/eegprep/preproc_dataset.py b/eegprep/preproc_dataset.py index f9821e2..4781311 100644 --- a/eegprep/preproc_dataset.py +++ b/eegprep/preproc_dataset.py @@ -1,95 +1,30 @@ from os.path import join, basename, splitext import os, glob, random -from eegprep.configuration import Configuration -from eegprep.defaults import defaults -from eegprep.preproc_run import preproc_run - - -def preproc_dataset(datadir='/data'): - - print('data directory: {}'.format(datadir)) - conf_file_path = join(datadir, 'eegprep.conf') - config = Configuration() - config.setDefaults(defaults) - if os.path.isfile(conf_file_path): - with open(conf_file_path) as fh: - conf_string = fh.read() - config.updateFromString(conf_string) - print('configuration:') - print(config) - - bidsdir = join(datadir, 'BIDS') - eegprepdir = join(bidsdir, 'derivatives', 'eegprep') - - - subjectdirs = sorted(glob.glob(join(bidsdir, 'sub-*'))) - for subjectdir in subjectdirs: - assert os.path.isdir(subjectdir) - - sub = basename(subjectdir)[4:] - - # prepare derivatives directory - derivdir = join(eegprepdir, 'sub-' + sub) - os.makedirs(derivdir, exist_ok=True) - reportsdir = join(eegprepdir, 'reports', 'sub-' + sub) - os.makedirs(reportsdir, exist_ok=True) - - - subject_epochs = {} - rawtypes = {'.set': mne.io.read_raw_eeglab, '.bdf': mne.io.read_raw_bdf} - for fname in sorted(glob.glob(join(subjectdir, 'eeg', '*'))): - - - preproc_run(fname, config) - - _, ext = splitext(fname) - if ext not in rawtypes.keys(): - continue - sub, ses, task, run = filename2tuple(basename(fname)) - - # store for now - subject_epochs[(ses, task, run)] = clean_epochs - - # # create evoked plots - # conds = clean_epochs.event_id.keys() - # selected_conds = random.sample(conds, min(len(conds), 6)) - # picks = mne.pick_types(clean_epochs.info, eeg=True) - # for cond in selected_conds: - # evoked = clean_epochs[cond].average() - # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_evoked-{}.png'.format(sub, ses, task, run, cond) - # fig = evoked.plot_joint(picks=picks) - # fig.savefig(join(reportsdir, fname_plot)) - - - - # sessSeg = 0 - # sessions = sorted(list(set([k[sessSeg] for k in subject_epochs.keys()]))) - # for session in sessions: - # taskSeg = 1 - # tasks = list(set([k[taskSeg] for k in subject_epochs.keys() if k[sessSeg]==session])) - # for task in tasks: - # print('\nGathering epochs for session {} task {}'.format(session, task)) - # epochs_selection = [v for (k, v) in subject_epochs.items() if k[:2]==(session, task)] - - # task_epochs = mne.epochs.concatenate_epochs(epochs_selection) - - # # downsample if configured to do so - # # important to do this after concatenation because - # # downsampling may cause rejection for 'TOOSHORT' - # if config['downsample'] < task_epochs.info['sfreq']: - # task_epochs = task_epochs.copy().resample(config['downsample'], npad='auto') - - # ext = config['out_file_format'] - # fname = join(derivdir, 'sub-{}_ses-{}_task-{}_epo.{}'.format(sub, session, task, ext)) - # variables = { - # 'epochs': task_epochs.get_data(), - # 'events': task_epochs.events, - # 'timepoints': task_epochs.times - # } - # if ext == 'fif': - # task_epochs.save(fname) - # elif ext == 'mat': - # scipy.io.savemat(fname, mdict=variables) - # elif ext == 'npy': - # numpy.savez(fname, **variables) - +# from eegprep.configuration import Configuration +# from eegprep.defaults import defaults +from eegprep.preproc_subject import preproc_subject +from bids import BIDSLayout + + +def preproc_dataset(datadir): + + # print('data directory: {}'.format(datadir)) + # conf_file_path = join(datadir, 'eegprep.conf') + # config = Configuration() + # config.setDefaults(defaults) + # if os.path.isfile(conf_file_path): + # with open(conf_file_path) as fh: + # conf_string = fh.read() + # config.updateFromString(conf_string) + # print('configuration:') + # print(config) + + eegprepdir = join(datadir, 'derivatives', 'eegprep') + layout = BIDSLayout(datadir) + subjects = layout.get(return_type='id', target='subject') + for subject in subjects: + + subjectdir = join(eegprepdir, 'sub-' + subject) + os.makedirs(subjectdir, exist_ok=True) + out_fpath = join(subjectdir, 'sub-{}_epochs.npz') + preproc_subject(layout, subject, out_fpath) \ No newline at end of file diff --git a/eegprep/preproc_run.py b/eegprep/preproc_run.py index 001142d..bccca8c 100644 --- a/eegprep/preproc_run.py +++ b/eegprep/preproc_run.py @@ -1,6 +1,5 @@ -from os.path import join, basename, splitext +from os.path import join, basename import os, glob, random, numpy, mne, pandas -from eegprep.bids.naming import filename2tuple from eegprep.guess import guess_montage from eegprep.util import ( resample_events_on_resampled_epochs, @@ -9,8 +8,9 @@ ) -def preproc_run(fpath, config): +def preproc_run(fpath): + print(basename(fpath)) #sub, ses, task, run = filename2tuple(basename(fname)) # read data @@ -34,7 +34,8 @@ def preproc_run(fpath, config): # Set reference refChannels = channels[channels.type=='REF'].index.tolist() raw = raw.set_eeg_reference(ref_channels=refChannels) - + # can now drop reference electrodes + raw.set_channel_types({k: 'misc' for k in refChannels}) # set bad channels # raw.info['bads'] = channels[channels.status=='bad'].index.tolist() @@ -47,8 +48,8 @@ def preproc_run(fpath, config): raw = raw.filter(l_freq=0.05, h_freq=45, fir_design='firwin') montage = mne.channels.read_montage(guess_montage(raw.ch_names)) - print(montage) - raw.set_montage(montage) + # print(montage) + raw = raw.set_montage(montage, verbose=False) # plot raw data # nchans = len(raw.ch_names) @@ -59,17 +60,18 @@ def preproc_run(fpath, config): # fig.savefig(join(reportsdir, fname_plot)) - events = mne.find_events(raw) #raw, consecutive=False, min_duration=0.005) + events = mne.find_events(raw, verbose=False) #raw, consecutive=False, min_duration=0.005) ## epoching - # epochs_params = dict( - # events=events, - # tmin=-0.1, - # tmax=0.8, - # reject=None, # dict(eeg=250e-6, eog=150e-6) - # picks=epoching_picks, - # detrend=0, - # ) - # file_epochs = mne.Epochs(raw, preload=True, **epochs_params) + picks = mne.pick_types(raw.info, eeg=True) + epochs_params = dict( + events=events, + tmin=-0.2, + tmax=0.8, + picks=picks, + verbose=False + ) + epochs = mne.Epochs(raw, preload=True, **epochs_params) + epochs = epochs.resample(256., npad='auto') # downsample # file_epochs.drop_channels(refChannels) # # autoreject (under development) @@ -96,4 +98,5 @@ def preproc_run(fpath, config): # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_evoked-{}.png'.format(sub, ses, task, run, cond) # fig = evoked.plot_joint(picks=picks) # fig.savefig(join(reportsdir, fname_plot)) + return epochs diff --git a/eegprep/preproc_subject.py b/eegprep/preproc_subject.py new file mode 100644 index 0000000..621fcc7 --- /dev/null +++ b/eegprep/preproc_subject.py @@ -0,0 +1,43 @@ +from eegprep.preproc_run import preproc_run +import numpy + + +def preproc_subject(layout, subject, out_fpath): + + print('preprocessing subject {}'.format(subject)) + + eeg_runs = layout.get(subject=subject, suffix='eeg', extension='bdf') + epochs_all = [] + events_all = [] + for eeg_run in eeg_runs: + epochs_run = preproc_run(eeg_run.path) + epochs_all.append(epochs_run.get_data()) + events_all.append(epochs_run.events[:, 2]) + # else: + # return + + variables = { + 'epochs': numpy.concatenate(epochs_all, axis=0), + 'events': numpy.concatenate(events_all, axis=0), + } + # not using fif because of eegprep#10 + numpy.savez(out_fpath, **variables) + + # sessSeg = 0 + # sessions = sorted(list(set([k[sessSeg] for k in subject_epochs.keys()]))) + # for session in sessions: + # taskSeg = 1 + # tasks = list(set([k[taskSeg] for k in subject_epochs.keys() if k[sessSeg]==session])) + # for task in tasks: + # print('\nGathering epochs for session {} task {}'.format(session, task)) + # epochs_selection = [v for (k, v) in subject_epochs.items() if k[:2]==(session, task)] + + # task_epochs = mne.epochs.concatenate_epochs(epochs_selection) + + # # downsample if configured to do so + # # important to do this after concatenation because + # # downsampling may cause rejection for 'TOOSHORT' + # if config['downsample'] < task_epochs.info['sfreq']: + # task_epochs = task_epochs.copy().resample(config['downsample'], npad='auto') + + From 774ba5c3059939512342f5bb1c58ed7b4a1a522a Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Tue, 6 Aug 2019 19:25:37 +0100 Subject: [PATCH 06/32] fixed output filename --- eegprep/preproc_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eegprep/preproc_dataset.py b/eegprep/preproc_dataset.py index 4781311..a02f4f9 100644 --- a/eegprep/preproc_dataset.py +++ b/eegprep/preproc_dataset.py @@ -26,5 +26,5 @@ def preproc_dataset(datadir): subjectdir = join(eegprepdir, 'sub-' + subject) os.makedirs(subjectdir, exist_ok=True) - out_fpath = join(subjectdir, 'sub-{}_epochs.npz') + out_fpath = join(subjectdir, 'sub-{}_epochs.npz'.format(subject)) preproc_subject(layout, subject, out_fpath) \ No newline at end of file From 61b5ab9a29514b0ce9fe43fc2b69ab1d544b9b68 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Tue, 6 Aug 2019 19:48:51 +0100 Subject: [PATCH 07/32] changed output file naming --- eegprep/preproc_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eegprep/preproc_dataset.py b/eegprep/preproc_dataset.py index a02f4f9..7e45f46 100644 --- a/eegprep/preproc_dataset.py +++ b/eegprep/preproc_dataset.py @@ -26,5 +26,5 @@ def preproc_dataset(datadir): subjectdir = join(eegprepdir, 'sub-' + subject) os.makedirs(subjectdir, exist_ok=True) - out_fpath = join(subjectdir, 'sub-{}_epochs.npz'.format(subject)) + out_fpath = join(subjectdir, 'sub-{}_epo.npz'.format(subject)) preproc_subject(layout, subject, out_fpath) \ No newline at end of file From da492dbda55bd3f2671302e76323cd5a7a89de44 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Fri, 22 Nov 2019 19:57:45 +0000 Subject: [PATCH 08/32] introduced arg parser --- eegprep/args.py | 22 ++++++++++++++++++++++ eegprep/main.py | 6 ++++++ scripts/eegprep | 6 ++---- 3 files changed, 30 insertions(+), 4 deletions(-) create mode 100644 eegprep/args.py create mode 100644 eegprep/main.py diff --git a/eegprep/args.py b/eegprep/args.py new file mode 100644 index 0000000..84c43af --- /dev/null +++ b/eegprep/args.py @@ -0,0 +1,22 @@ +from argparse import ArgumentParser + + +def parse_arguments(args=None): + """Parse commandline parameters + + Args: + args (str, optional): String of arguments, for testing purposes only. + Defaults to None. + + Returns: + Namespace: Object with parsed arguments as properties + """ + parser = ArgumentParser() + parser.add_argument('data_directory', type=str, nargs='?', default='/data', + help='root data directory') + subject = parser.add_mutually_exclusive_group() + subject.add_argument('-s', '--subject-index', type=int, + help='index of subject to work on, when sorted alphabetically') + subject.add_argument('-l', '--subject-label', type=str, + help='label of subject to work on') + return parser.parse_args(args) diff --git a/eegprep/main.py b/eegprep/main.py new file mode 100644 index 0000000..5905ec7 --- /dev/null +++ b/eegprep/main.py @@ -0,0 +1,6 @@ +from eegprep.args import parse_arguments + +def run(): + print( + parse_arguments() + ) \ No newline at end of file diff --git a/scripts/eegprep b/scripts/eegprep index 303c0be..eadd85a 100644 --- a/scripts/eegprep +++ b/scripts/eegprep @@ -1,5 +1,3 @@ #!/usr/bin/env python3 -import sys -from eegprep.preproc_dataset import preproc_dataset -datadir = sys.argv[1] if len(sys.argv) > 1 else '/data' -preproc_dataset(datadir) \ No newline at end of file +from eegprep.main import run +run() \ No newline at end of file From 9f99c1acb03c35f8a6546a12a3465477c839edd2 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Fri, 22 Nov 2019 20:59:06 +0000 Subject: [PATCH 09/32] updated readme --- README.md | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index e24e492..e19ac51 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,38 @@ # EEGprep Standardized EEG preprocessing +[![https://www.singularity-hub.org/static/img/hosted-singularity--hub-%23e32929.svg](https://www.singularity-hub.org/static/img/hosted-singularity--hub-%23e32929.svg)](https://singularity-hub.org/collections/3833) + ## Singularity -Build the EEGprep singularity image: +Download the EEGprep singularity image: ``` -sudo singularity build eegprep.simg Singularity +singularity pull --name eegprep.simg shub://Charestlab/eegprep ``` Run EEGprep on your data: ``` singularity run -c -e --bind /your/data/dir/:/data eegprep.simg ``` -where /your/data/dir/ contains a *BIDS* folder. + +## Commandline + +You can run eegprep on the commandline. Start by running `eegprep -h` and you'll see: +``` +usage: eegprep [-h] [-s SUBJECT_INDEX] [-l SUBJECT_LABEL] [data_directory] + +positional arguments: + data_directory root data directory + +optional arguments: + -h, --help show this help message and exit + -s SUBJECT_INDEX, --subject-index SUBJECT_INDEX + index of subject to work on when sorted alphabetically + -l SUBJECT_LABEL, --subject-label SUBJECT_LABEL + label of subject to work on + +``` ## Configuration From 0a4861025f4fc61bea74a8c579a529a3eda09cd0 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Fri, 22 Nov 2019 21:01:47 +0000 Subject: [PATCH 10/32] fixed versions of mne and autoreject --- requirements.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index e846d89..ac3573e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,8 +2,8 @@ numpy scipy scikit-learn matplotlib -mne -autoreject +mne==1.17.4 +autoreject==0.2.1 ipython pandas setuptools From 07b851fb81c6d69195e28db7a1f50b0ba5cc96d1 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Fri, 22 Nov 2019 21:02:16 +0000 Subject: [PATCH 11/32] main.run() collects args --- eegprep/main.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/eegprep/main.py b/eegprep/main.py index 5905ec7..8779617 100644 --- a/eegprep/main.py +++ b/eegprep/main.py @@ -1,6 +1,5 @@ from eegprep.args import parse_arguments def run(): - print( - parse_arguments() - ) \ No newline at end of file + args = parse_arguments() + print(args) \ No newline at end of file From 58400a5b6875a483de71a94ac745b774e57c0650 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Fri, 22 Nov 2019 21:14:53 +0000 Subject: [PATCH 12/32] simplified Singularity recipe --- Singularity | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Singularity b/Singularity index c6aad3b..4a28a5d 100644 --- a/Singularity +++ b/Singularity @@ -11,10 +11,10 @@ From: python:3.7 dist/eegprep-0.1.tar.gz . %post - pip install numpy ipython - pip install --no-cache-dir -U https://api.github.com/repos/mne-tools/mne-python/zipball/master#egg=mne - pip install --no-cache-dir -U https://api.github.com/repos/autoreject/autoreject/zipball/master#egg=autoreject pip install eegprep-0.1.tar.gz %runscript exec eegprep + +%labels + Version 0.1 From 03e374604b213fcfc337fd2138f80915500f58f4 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Fri, 22 Nov 2019 23:51:03 +0000 Subject: [PATCH 13/32] notes on i/o approach --- eegprep/main.py | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/eegprep/main.py b/eegprep/main.py index 8779617..d5615c7 100644 --- a/eegprep/main.py +++ b/eegprep/main.py @@ -1,5 +1,32 @@ from eegprep.args import parse_arguments +from os.path import join, basename, splitext +import os, glob, random +from eegprep.preproc_subject import preproc_subject +from bids import BIDSLayout + def run(): args = parse_arguments() - print(args) \ No newline at end of file + + + + layout = BIDSLayout(args.data_directory) + subjects = layout.get(return_type='id', target='subject') + + + # IO + # contains input, output, mem_storage, report + # job = subject(io.for(subject=sub)) + # io.store(for_run=run, epochs) # io decides whether to keep in memory or to store temp or store long term + # pipeline.add(job) + # pipeline.run() + + + + preproc_subject(layout, subject) + + # output + eegprepdir = join(args.data_directory, 'derivatives', 'eegprep') + subjectdir = join(eegprepdir, 'sub-' + subject) + os.makedirs(subjectdir, exist_ok=True) + out_fpath = join(subjectdir, 'sub-{}_epo.npz'.format(subject)) \ No newline at end of file From b171235e28522eed0f427ea003f7fba4d098ce7d Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sat, 23 Nov 2019 15:31:12 +0000 Subject: [PATCH 14/32] building new pipeline-based approach --- eegprep/args.py | 2 ++ eegprep/inputoutput.py | 21 +++++++++++++++ eegprep/jobs/__init__.py | 0 eegprep/jobs/subject.py | 51 +++++++++++++++++++++++++++++++++++++ eegprep/main.py | 52 ++++++++++++++++++-------------------- eegprep/pipeline.py | 5 ++++ eegprep/preproc_subject.py | 43 ------------------------------- requirements.txt | 2 +- scripts/eegprep | 3 --- setup.py | 6 ++++- 10 files changed, 109 insertions(+), 76 deletions(-) create mode 100644 eegprep/inputoutput.py create mode 100644 eegprep/jobs/__init__.py create mode 100644 eegprep/jobs/subject.py create mode 100644 eegprep/pipeline.py delete mode 100644 eegprep/preproc_subject.py delete mode 100644 scripts/eegprep diff --git a/eegprep/args.py b/eegprep/args.py index 84c43af..ca6a3f3 100644 --- a/eegprep/args.py +++ b/eegprep/args.py @@ -14,6 +14,8 @@ def parse_arguments(args=None): parser = ArgumentParser() parser.add_argument('data_directory', type=str, nargs='?', default='/data', help='root data directory') + parser.add_argument('--dry-run', action='store_true', + help='rshow assembled pipeline but do not run analyses or store files') subject = parser.add_mutually_exclusive_group() subject.add_argument('-s', '--subject-index', type=int, help='index of subject to work on, when sorted alphabetically') diff --git a/eegprep/inputoutput.py b/eegprep/inputoutput.py new file mode 100644 index 0000000..3ebd2f6 --- /dev/null +++ b/eegprep/inputoutput.py @@ -0,0 +1,21 @@ +#from bids import BIDSLayout + +# TODO: # contains input, output, mem_storage, report + +# io.store(for_run=run, epochs) # io decides whether to keep in memory (with mem limit arg) or to store temp or store long term + +class InputOutput(object): + + def __init__(self, root_dir): + pass + + def get_subject_label_for_index(self, index): + return '' + +# subjects = layout.get(return_type='id', target='subject') + +# - # output +# - eegprepdir = join(args.data_directory, 'derivatives', 'eegprep') +# - subjectdir = join(eegprepdir, 'sub-' + subject) +# - os.makedirs(subjectdir, exist_ok=True) +# - out_fpath = join(subjectdir, 'sub-{}_epo.npz'.format(subject)) diff --git a/eegprep/jobs/__init__.py b/eegprep/jobs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/eegprep/jobs/subject.py b/eegprep/jobs/subject.py new file mode 100644 index 0000000..1527168 --- /dev/null +++ b/eegprep/jobs/subject.py @@ -0,0 +1,51 @@ +from eegprep.preproc_run import preproc_run +import numpy + +# TODO: # -> sub job: parent job can decide that input can be cleared + + +class SubjectJob(object): + + def add_to(self, pipeline): + pass + + def run(self): + + # layout, subject, out_fpath + print('preprocessing subject {}'.format(subject)) + + eeg_runs = layout.get(subject=subject, suffix='eeg', extension='bdf') + epochs_all = [] + events_all = [] + for eeg_run in eeg_runs: + epochs_run = preproc_run(eeg_run.path) + epochs_all.append(epochs_run.get_data()) + events_all.append(epochs_run.events[:, 2]) + # else: + # return + + variables = { + 'epochs': numpy.concatenate(epochs_all, axis=0), + 'events': numpy.concatenate(events_all, axis=0), + } + # not using fif because of eegprep#10 + numpy.savez(out_fpath, **variables) + + # sessSeg = 0 + # sessions = sorted(list(set([k[sessSeg] for k in subject_epochs.keys()]))) + # for session in sessions: + # taskSeg = 1 + # tasks = list(set([k[taskSeg] for k in subject_epochs.keys() if k[sessSeg]==session])) + # for task in tasks: + # print('\nGathering epochs for session {} task {}'.format(session, task)) + # epochs_selection = [v for (k, v) in subject_epochs.items() if k[:2]==(session, task)] + + # task_epochs = mne.epochs.concatenate_epochs(epochs_selection) + + # # downsample if configured to do so + # # important to do this after concatenation because + # # downsampling may cause rejection for 'TOOSHORT' + # if config['downsample'] < task_epochs.info['sfreq']: + # task_epochs = task_epochs.copy().resample(config['downsample'], npad='auto') + + diff --git a/eegprep/main.py b/eegprep/main.py index d5615c7..67388b4 100644 --- a/eegprep/main.py +++ b/eegprep/main.py @@ -1,32 +1,28 @@ from eegprep.args import parse_arguments -from os.path import join, basename, splitext -import os, glob, random -from eegprep.preproc_subject import preproc_subject -from bids import BIDSLayout - - -def run(): - args = parse_arguments() +from eegprep.pipeline import Pipeline +from eegprep.inputoutput import InputOutput +from eegprep.jobs.subject import SubjectJob +def run(args=None): + """Parses commandline arguments and runs EEGprep for the specified scope - layout = BIDSLayout(args.data_directory) - subjects = layout.get(return_type='id', target='subject') - - - # IO - # contains input, output, mem_storage, report - # job = subject(io.for(subject=sub)) - # io.store(for_run=run, epochs) # io decides whether to keep in memory or to store temp or store long term - # pipeline.add(job) - # pipeline.run() - - - - preproc_subject(layout, subject) - - # output - eegprepdir = join(args.data_directory, 'derivatives', 'eegprep') - subjectdir = join(eegprepdir, 'sub-' + subject) - os.makedirs(subjectdir, exist_ok=True) - out_fpath = join(subjectdir, 'sub-{}_epo.npz'.format(subject)) \ No newline at end of file + Args: + args (Namespace, optional): Object with eeg prep parameters + as attributes. Defaults to None. + """ + args = args or parse_arguments() + print(args) + io = InputOutput( + root_dir = args.data_directory + ) + pipeline = Pipeline( + dry = args.dry_run + ) + if args.subject_index: + args.subject_label = io.get_subject_label_for_index(0) + + if args.subject_label: + job = SubjectJob() + job.add_to(pipeline) + pipeline.run() diff --git a/eegprep/pipeline.py b/eegprep/pipeline.py new file mode 100644 index 0000000..d90d833 --- /dev/null +++ b/eegprep/pipeline.py @@ -0,0 +1,5 @@ + +class Pipeline(object): + + def run(self): + pass diff --git a/eegprep/preproc_subject.py b/eegprep/preproc_subject.py deleted file mode 100644 index 621fcc7..0000000 --- a/eegprep/preproc_subject.py +++ /dev/null @@ -1,43 +0,0 @@ -from eegprep.preproc_run import preproc_run -import numpy - - -def preproc_subject(layout, subject, out_fpath): - - print('preprocessing subject {}'.format(subject)) - - eeg_runs = layout.get(subject=subject, suffix='eeg', extension='bdf') - epochs_all = [] - events_all = [] - for eeg_run in eeg_runs: - epochs_run = preproc_run(eeg_run.path) - epochs_all.append(epochs_run.get_data()) - events_all.append(epochs_run.events[:, 2]) - # else: - # return - - variables = { - 'epochs': numpy.concatenate(epochs_all, axis=0), - 'events': numpy.concatenate(events_all, axis=0), - } - # not using fif because of eegprep#10 - numpy.savez(out_fpath, **variables) - - # sessSeg = 0 - # sessions = sorted(list(set([k[sessSeg] for k in subject_epochs.keys()]))) - # for session in sessions: - # taskSeg = 1 - # tasks = list(set([k[taskSeg] for k in subject_epochs.keys() if k[sessSeg]==session])) - # for task in tasks: - # print('\nGathering epochs for session {} task {}'.format(session, task)) - # epochs_selection = [v for (k, v) in subject_epochs.items() if k[:2]==(session, task)] - - # task_epochs = mne.epochs.concatenate_epochs(epochs_selection) - - # # downsample if configured to do so - # # important to do this after concatenation because - # # downsampling may cause rejection for 'TOOSHORT' - # if config['downsample'] < task_epochs.info['sfreq']: - # task_epochs = task_epochs.copy().resample(config['downsample'], npad='auto') - - diff --git a/requirements.txt b/requirements.txt index ac3573e..b97f841 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,7 +2,7 @@ numpy scipy scikit-learn matplotlib -mne==1.17.4 +mne==0.19.2 autoreject==0.2.1 ipython pandas diff --git a/scripts/eegprep b/scripts/eegprep deleted file mode 100644 index eadd85a..0000000 --- a/scripts/eegprep +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env python3 -from eegprep.main import run -run() \ No newline at end of file diff --git a/setup.py b/setup.py index ba16d02..5803aee 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,11 @@ include_package_data=True, zip_safe=False, install_requires=requires, - scripts=['scripts/eegprep'], + entry_points={ + 'console_scripts': [ + 'eegprep = eegprep.main:run', + ], + }, tests_require=requires, test_suite="tests" ) From 490b5a84cdc11602a2e6a461d56f19d63ae71b4b Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sat, 23 Nov 2019 15:48:40 +0000 Subject: [PATCH 15/32] can run whole dataset --- eegprep/inputoutput.py | 6 +++++- eegprep/main.py | 11 ++++++++--- eegprep/pipeline.py | 3 +++ 3 files changed, 16 insertions(+), 4 deletions(-) diff --git a/eegprep/inputoutput.py b/eegprep/inputoutput.py index 3ebd2f6..90afc0a 100644 --- a/eegprep/inputoutput.py +++ b/eegprep/inputoutput.py @@ -9,9 +9,13 @@ class InputOutput(object): def __init__(self, root_dir): pass - def get_subject_label_for_index(self, index): + def get_subject_labels(self): + # TODO: must be sorted return '' + def for_(self, subject=None): + pass + # subjects = layout.get(return_type='id', target='subject') # - # output diff --git a/eegprep/main.py b/eegprep/main.py index 67388b4..a56a192 100644 --- a/eegprep/main.py +++ b/eegprep/main.py @@ -19,10 +19,15 @@ def run(args=None): pipeline = Pipeline( dry = args.dry_run ) - if args.subject_index: - args.subject_label = io.get_subject_label_for_index(0) + + subjects = io.get_subject_labels() + if args.subject_index: + subjects = [subjects[args.subject_index]] if args.subject_label: - job = SubjectJob() + subjects = [args.subject_label] + + for subject_label in subjects: + job = SubjectJob(io.for_(subject=subject_label)) job.add_to(pipeline) pipeline.run() diff --git a/eegprep/pipeline.py b/eegprep/pipeline.py index d90d833..b6db681 100644 --- a/eegprep/pipeline.py +++ b/eegprep/pipeline.py @@ -1,5 +1,8 @@ class Pipeline(object): + def __init__(self, dry=False): + pass + def run(self): pass From 8513c5cf1604dd2446093bad0a65527ec9d38d79 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sat, 23 Nov 2019 16:39:03 +0000 Subject: [PATCH 16/32] worked out SubjectJob --- eegprep/configuration.py | 14 +++ eegprep/{inputoutput.py => input_output.py} | 9 ++ eegprep/jobs/base.py | 12 +++ eegprep/jobs/concat_epochs.py | 7 ++ eegprep/jobs/run.py | 107 ++++++++++++++++++++ eegprep/jobs/subject.py | 59 +++-------- eegprep/main.py | 2 +- eegprep/preproc_dataset.py | 30 ------ eegprep/preproc_run.py | 102 ------------------- 9 files changed, 163 insertions(+), 179 deletions(-) rename eegprep/{inputoutput.py => input_output.py} (64%) create mode 100644 eegprep/jobs/base.py create mode 100644 eegprep/jobs/concat_epochs.py create mode 100644 eegprep/jobs/run.py delete mode 100644 eegprep/preproc_dataset.py delete mode 100644 eegprep/preproc_run.py diff --git a/eegprep/configuration.py b/eegprep/configuration.py index 1dc13a7..a14a9f1 100644 --- a/eegprep/configuration.py +++ b/eegprep/configuration.py @@ -1,4 +1,18 @@ +"""[summary] +Previously used like: + + # print('data directory: {}'.format(datadir)) + # conf_file_path = join(datadir, 'eegprep.conf') + # config = Configuration() + # config.setDefaults(defaults) + # if os.path.isfile(conf_file_path): + # with open(conf_file_path) as fh: + # conf_string = fh.read() + # config.updateFromString(conf_string) + # print('configuration:') + # print(config) +""" class Configuration(object): diff --git a/eegprep/inputoutput.py b/eegprep/input_output.py similarity index 64% rename from eegprep/inputoutput.py rename to eegprep/input_output.py index 90afc0a..b511f2a 100644 --- a/eegprep/inputoutput.py +++ b/eegprep/input_output.py @@ -23,3 +23,12 @@ def for_(self, subject=None): # - subjectdir = join(eegprepdir, 'sub-' + subject) # - os.makedirs(subjectdir, exist_ok=True) # - out_fpath = join(subjectdir, 'sub-{}_epo.npz'.format(subject)) + # eegprepdir = join(datadir, 'derivatives', 'eegprep') + # layout = BIDSLayout(datadir) + # subjects = layout.get(return_type='id', target='subject') + # for subject in subjects: + + # subjectdir = join(eegprepdir, 'sub-' + subject) + # os.makedirs(subjectdir, exist_ok=True) + # out_fpath = join(subjectdir, 'sub-{}_epo.npz'.format(subject)) + # preproc_subject(layout, subject, out_fpath) diff --git a/eegprep/jobs/base.py b/eegprep/jobs/base.py new file mode 100644 index 0000000..c87ce20 --- /dev/null +++ b/eegprep/jobs/base.py @@ -0,0 +1,12 @@ + + +class BaseJob(object): + + def __init__(self, io): + self.io = io + + def add_to(self, pipeline): + raise NotImplementedError(self.__class__ + '.add_to()') + + def run(self): + raise NotImplementedError(self.__class__ + '.run()') diff --git a/eegprep/jobs/concat_epochs.py b/eegprep/jobs/concat_epochs.py new file mode 100644 index 0000000..9a107fa --- /dev/null +++ b/eegprep/jobs/concat_epochs.py @@ -0,0 +1,7 @@ +from eegprep.jobs.base import BaseJob + + +class ConcatEpochsJob(BaseJob): + + def run(self): + pass diff --git a/eegprep/jobs/run.py b/eegprep/jobs/run.py new file mode 100644 index 0000000..0cad8c0 --- /dev/null +++ b/eegprep/jobs/run.py @@ -0,0 +1,107 @@ +from eegprep.jobs.base import BaseJob +from os.path import join, basename +import os, glob, random, numpy, mne, pandas +from eegprep.guess import guess_montage +from eegprep.util import ( + resample_events_on_resampled_epochs, + plot_rejectlog, + save_rejectlog +) + + +class RunJob(BaseJob): + """Represents preprocessing of one raw data file. + """ + + def run(self): + + print(basename(fpath)) + #sub, ses, task, run = filename2tuple(basename(fname)) + + # read data + raw = mne.io.read_raw_bdf(fpath, preload=True, verbose=False) + + # Set channel types and select reference channels + channelFile = fpath.replace('eeg.bdf', 'channels.tsv') # maybe should be a string arg + channels = pandas.read_csv(channelFile, index_col='name', sep='\t') + bids2mne = { + 'MISC': 'misc', + 'EEG': 'eeg', + 'EOG': 'eog', + 'VEOG': 'eog', + 'TRIG': 'stim', + 'REF': 'eeg', + } + channels['mne'] = channels.type.replace(bids2mne) + raw.set_channel_types(channels.mne.to_dict()) + + + # Set reference + refChannels = channels[channels.type=='REF'].index.tolist() + raw = raw.set_eeg_reference(ref_channels=refChannels) + # can now drop reference electrodes + raw.set_channel_types({k: 'misc' for k in refChannels}) + + # set bad channels + # raw.info['bads'] = channels[channels.status=='bad'].index.tolist() + + # pick channels to use for epoching + #epoching_picks = mne.pick_types(raw.info, eeg=True, eog=False, stim=False, exclude='bads') + + + # Filtering + raw = raw.filter(l_freq=0.05, h_freq=45, fir_design='firwin') + + montage = mne.channels.read_montage(guess_montage(raw.ch_names)) + # print(montage) + raw = raw.set_montage(montage, verbose=False) + + # plot raw data + # nchans = len(raw.ch_names) + # pick_channels = numpy.arange(0, nchans, numpy.floor(nchans/20)).astype(int) + # start = numpy.round(raw.times.max()/2) + # fig = raw.plot(start=start, order=pick_channels) + # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_raw.png'.format(sub, ses, task, run) + # fig.savefig(join(reportsdir, fname_plot)) + + + events = mne.find_events(raw, verbose=False) #raw, consecutive=False, min_duration=0.005) + ## epoching + picks = mne.pick_types(raw.info, eeg=True) + epochs_params = dict( + events=events, + tmin=-0.2, + tmax=0.8, + picks=picks, + verbose=False + ) + epochs = mne.Epochs(raw, preload=True, **epochs_params) + epochs = epochs.resample(256., npad='auto') # downsample + # file_epochs.drop_channels(refChannels) + + # # autoreject (under development) + # ar = AutoReject(n_jobs=4) + # clean_epochs = ar.fit_transform(file_epochs) + + # rejectlog = ar.get_reject_log(clean_epochs) + # fname_log = 'sub-{}_ses-{}_task-{}_run-{}_reject-log.npz'.format(sub, ses, task, run) + # save_rejectlog(join(reportsdir, fname_log), rejectlog) + # fig = plot_rejectlog(rejectlog) + # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_bad-epochs.png'.format(sub, ses, task, run) + # fig.savefig(join(reportsdir, fname_plot)) + + + # # store for now + # # subject_epochs[(ses, task, run)] = clean_epochs + + # # create evoked plots + # conds = clean_epochs.event_id.keys() + # selected_conds = random.sample(conds, min(len(conds), 6)) + # picks = mne.pick_types(clean_epochs.info, eeg=True) + # for cond in selected_conds: + # evoked = clean_epochs[cond].average() + # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_evoked-{}.png'.format(sub, ses, task, run, cond) + # fig = evoked.plot_joint(picks=picks) + # fig.savefig(join(reportsdir, fname_plot)) + return epochs + diff --git a/eegprep/jobs/subject.py b/eegprep/jobs/subject.py index 1527168..a424c7d 100644 --- a/eegprep/jobs/subject.py +++ b/eegprep/jobs/subject.py @@ -1,51 +1,18 @@ -from eegprep.preproc_run import preproc_run -import numpy +from eegprep.jobs.base import BaseJob +from eegprep.jobs.run import RunJob +from eegprep.jobs.concat_epochs import ConcatEpochsJob -# TODO: # -> sub job: parent job can decide that input can be cleared +class SubjectJob(BaseJob): -class SubjectJob(object): + def __init__(self, io): + self.io = io def add_to(self, pipeline): - pass - - def run(self): - - # layout, subject, out_fpath - print('preprocessing subject {}'.format(subject)) - - eeg_runs = layout.get(subject=subject, suffix='eeg', extension='bdf') - epochs_all = [] - events_all = [] - for eeg_run in eeg_runs: - epochs_run = preproc_run(eeg_run.path) - epochs_all.append(epochs_run.get_data()) - events_all.append(epochs_run.events[:, 2]) - # else: - # return - - variables = { - 'epochs': numpy.concatenate(epochs_all, axis=0), - 'events': numpy.concatenate(events_all, axis=0), - } - # not using fif because of eegprep#10 - numpy.savez(out_fpath, **variables) - - # sessSeg = 0 - # sessions = sorted(list(set([k[sessSeg] for k in subject_epochs.keys()]))) - # for session in sessions: - # taskSeg = 1 - # tasks = list(set([k[taskSeg] for k in subject_epochs.keys() if k[sessSeg]==session])) - # for task in tasks: - # print('\nGathering epochs for session {} task {}'.format(session, task)) - # epochs_selection = [v for (k, v) in subject_epochs.items() if k[:2]==(session, task)] - - # task_epochs = mne.epochs.concatenate_epochs(epochs_selection) - - # # downsample if configured to do so - # # important to do this after concatenation because - # # downsampling may cause rejection for 'TOOSHORT' - # if config['downsample'] < task_epochs.info['sfreq']: - # task_epochs = task_epochs.copy().resample(config['downsample'], npad='auto') - - + runs = self.io.get_run_labels() + for run_label in runs: + job = RunJob(self.io.for_(run=run_label)) + job.add_to(pipeline) + if runs: + job = ConcatEpochsJob(self.io) + job.add_to(pipeline) diff --git a/eegprep/main.py b/eegprep/main.py index a56a192..024b922 100644 --- a/eegprep/main.py +++ b/eegprep/main.py @@ -1,6 +1,6 @@ from eegprep.args import parse_arguments from eegprep.pipeline import Pipeline -from eegprep.inputoutput import InputOutput +from eegprep.input_output import InputOutput from eegprep.jobs.subject import SubjectJob diff --git a/eegprep/preproc_dataset.py b/eegprep/preproc_dataset.py deleted file mode 100644 index 7e45f46..0000000 --- a/eegprep/preproc_dataset.py +++ /dev/null @@ -1,30 +0,0 @@ -from os.path import join, basename, splitext -import os, glob, random -# from eegprep.configuration import Configuration -# from eegprep.defaults import defaults -from eegprep.preproc_subject import preproc_subject -from bids import BIDSLayout - - -def preproc_dataset(datadir): - - # print('data directory: {}'.format(datadir)) - # conf_file_path = join(datadir, 'eegprep.conf') - # config = Configuration() - # config.setDefaults(defaults) - # if os.path.isfile(conf_file_path): - # with open(conf_file_path) as fh: - # conf_string = fh.read() - # config.updateFromString(conf_string) - # print('configuration:') - # print(config) - - eegprepdir = join(datadir, 'derivatives', 'eegprep') - layout = BIDSLayout(datadir) - subjects = layout.get(return_type='id', target='subject') - for subject in subjects: - - subjectdir = join(eegprepdir, 'sub-' + subject) - os.makedirs(subjectdir, exist_ok=True) - out_fpath = join(subjectdir, 'sub-{}_epo.npz'.format(subject)) - preproc_subject(layout, subject, out_fpath) \ No newline at end of file diff --git a/eegprep/preproc_run.py b/eegprep/preproc_run.py deleted file mode 100644 index bccca8c..0000000 --- a/eegprep/preproc_run.py +++ /dev/null @@ -1,102 +0,0 @@ -from os.path import join, basename -import os, glob, random, numpy, mne, pandas -from eegprep.guess import guess_montage -from eegprep.util import ( - resample_events_on_resampled_epochs, - plot_rejectlog, - save_rejectlog -) - - -def preproc_run(fpath): - - print(basename(fpath)) - #sub, ses, task, run = filename2tuple(basename(fname)) - - # read data - raw = mne.io.read_raw_bdf(fpath, preload=True, verbose=False) - - # Set channel types and select reference channels - channelFile = fpath.replace('eeg.bdf', 'channels.tsv') # maybe should be a string arg - channels = pandas.read_csv(channelFile, index_col='name', sep='\t') - bids2mne = { - 'MISC': 'misc', - 'EEG': 'eeg', - 'EOG': 'eog', - 'VEOG': 'eog', - 'TRIG': 'stim', - 'REF': 'eeg', - } - channels['mne'] = channels.type.replace(bids2mne) - raw.set_channel_types(channels.mne.to_dict()) - - - # Set reference - refChannels = channels[channels.type=='REF'].index.tolist() - raw = raw.set_eeg_reference(ref_channels=refChannels) - # can now drop reference electrodes - raw.set_channel_types({k: 'misc' for k in refChannels}) - - # set bad channels - # raw.info['bads'] = channels[channels.status=='bad'].index.tolist() - - # pick channels to use for epoching - #epoching_picks = mne.pick_types(raw.info, eeg=True, eog=False, stim=False, exclude='bads') - - - # Filtering - raw = raw.filter(l_freq=0.05, h_freq=45, fir_design='firwin') - - montage = mne.channels.read_montage(guess_montage(raw.ch_names)) - # print(montage) - raw = raw.set_montage(montage, verbose=False) - - # plot raw data - # nchans = len(raw.ch_names) - # pick_channels = numpy.arange(0, nchans, numpy.floor(nchans/20)).astype(int) - # start = numpy.round(raw.times.max()/2) - # fig = raw.plot(start=start, order=pick_channels) - # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_raw.png'.format(sub, ses, task, run) - # fig.savefig(join(reportsdir, fname_plot)) - - - events = mne.find_events(raw, verbose=False) #raw, consecutive=False, min_duration=0.005) - ## epoching - picks = mne.pick_types(raw.info, eeg=True) - epochs_params = dict( - events=events, - tmin=-0.2, - tmax=0.8, - picks=picks, - verbose=False - ) - epochs = mne.Epochs(raw, preload=True, **epochs_params) - epochs = epochs.resample(256., npad='auto') # downsample - # file_epochs.drop_channels(refChannels) - - # # autoreject (under development) - # ar = AutoReject(n_jobs=4) - # clean_epochs = ar.fit_transform(file_epochs) - - # rejectlog = ar.get_reject_log(clean_epochs) - # fname_log = 'sub-{}_ses-{}_task-{}_run-{}_reject-log.npz'.format(sub, ses, task, run) - # save_rejectlog(join(reportsdir, fname_log), rejectlog) - # fig = plot_rejectlog(rejectlog) - # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_bad-epochs.png'.format(sub, ses, task, run) - # fig.savefig(join(reportsdir, fname_plot)) - - - # # store for now - # # subject_epochs[(ses, task, run)] = clean_epochs - - # # create evoked plots - # conds = clean_epochs.event_id.keys() - # selected_conds = random.sample(conds, min(len(conds), 6)) - # picks = mne.pick_types(clean_epochs.info, eeg=True) - # for cond in selected_conds: - # evoked = clean_epochs[cond].average() - # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_evoked-{}.png'.format(sub, ses, task, run, cond) - # fig = evoked.plot_joint(picks=picks) - # fig.savefig(join(reportsdir, fname_plot)) - return epochs - From eeb2a8729dae3c09db7b9b3af15945e98065f807 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sat, 23 Nov 2019 17:33:25 +0000 Subject: [PATCH 17/32] introduced Log --- eegprep/input_output.py | 2 +- eegprep/jobs/base.py | 3 ++- eegprep/jobs/subject.py | 3 --- eegprep/log.py | 11 +++++++++++ eegprep/main.py | 17 ++++++++++++----- eegprep/pipeline.py | 2 +- tests/log_tests.py | 14 ++++++++++++++ tests/naming_tests.py | 17 ----------------- 8 files changed, 41 insertions(+), 28 deletions(-) create mode 100644 eegprep/log.py create mode 100644 tests/log_tests.py delete mode 100644 tests/naming_tests.py diff --git a/eegprep/input_output.py b/eegprep/input_output.py index b511f2a..8f68df1 100644 --- a/eegprep/input_output.py +++ b/eegprep/input_output.py @@ -6,7 +6,7 @@ class InputOutput(object): - def __init__(self, root_dir): + def __init__(self, root_dir, log): pass def get_subject_labels(self): diff --git a/eegprep/jobs/base.py b/eegprep/jobs/base.py index c87ce20..9afd19a 100644 --- a/eegprep/jobs/base.py +++ b/eegprep/jobs/base.py @@ -2,8 +2,9 @@ class BaseJob(object): - def __init__(self, io): + def __init__(self, io, log): self.io = io + self.log = log def add_to(self, pipeline): raise NotImplementedError(self.__class__ + '.add_to()') diff --git a/eegprep/jobs/subject.py b/eegprep/jobs/subject.py index a424c7d..a79cee5 100644 --- a/eegprep/jobs/subject.py +++ b/eegprep/jobs/subject.py @@ -5,9 +5,6 @@ class SubjectJob(BaseJob): - def __init__(self, io): - self.io = io - def add_to(self, pipeline): runs = self.io.get_run_labels() for run_label in runs: diff --git a/eegprep/log.py b/eegprep/log.py new file mode 100644 index 0000000..095b568 --- /dev/null +++ b/eegprep/log.py @@ -0,0 +1,11 @@ + + +class Log(object): + + def new_partial_log(self): + return self + + def received_arguments(self, args): + pass + + # TODO: job can flush log after done: log.flush(io) (io.write_text(log.xyz)) \ No newline at end of file diff --git a/eegprep/main.py b/eegprep/main.py index 024b922..f0fcab3 100644 --- a/eegprep/main.py +++ b/eegprep/main.py @@ -1,5 +1,6 @@ from eegprep.args import parse_arguments from eegprep.pipeline import Pipeline +from eegprep.log import Log from eegprep.input_output import InputOutput from eegprep.jobs.subject import SubjectJob @@ -11,13 +12,17 @@ def run(args=None): args (Namespace, optional): Object with eeg prep parameters as attributes. Defaults to None. """ + log = Log() args = args or parse_arguments() - print(args) + log.received_arguments(args) + io = InputOutput( - root_dir = args.data_directory + root_dir = args.data_directory, + log ) pipeline = Pipeline( - dry = args.dry_run + dry = args.dry_run, + log ) subjects = io.get_subject_labels() @@ -26,8 +31,10 @@ def run(args=None): if args.subject_label: subjects = [args.subject_label] - for subject_label in subjects: - job = SubjectJob(io.for_(subject=subject_label)) + job = SubjectJob( + io.for_(subject=subject_label), + log.new_partial_log() + ) job.add_to(pipeline) pipeline.run() diff --git a/eegprep/pipeline.py b/eegprep/pipeline.py index b6db681..ce659d4 100644 --- a/eegprep/pipeline.py +++ b/eegprep/pipeline.py @@ -1,7 +1,7 @@ class Pipeline(object): - def __init__(self, dry=False): + def __init__(self, dry, log): pass def run(self): diff --git a/tests/log_tests.py b/tests/log_tests.py new file mode 100644 index 0000000..ed24046 --- /dev/null +++ b/tests/log_tests.py @@ -0,0 +1,14 @@ +from unittest import TestCase +from unittest.mock import Mock + +class MockNamespace(object): + pass + + +class LogTests(TestCase): + + def test_received_arguments(self): + from eegprep.log import Log + args = MockNamespace() + log = Log() + log.received_arguments(args) diff --git a/tests/naming_tests.py b/tests/naming_tests.py deleted file mode 100644 index 6069df2..0000000 --- a/tests/naming_tests.py +++ /dev/null @@ -1,17 +0,0 @@ -from unittest import TestCase - - -class TestNaming(TestCase): - - def test_filename2tuple(self): - from eegprep.bids.naming import filename2tuple - fname = 'sub-03_ses-01_task-irsa_run-15_eeg.set' - sub, ses, task, run = filename2tuple(fname) - self.assertEqual(sub, '03') - self.assertEqual(ses, '01') - self.assertEqual(task, 'irsa') - self.assertEqual(run, '15') - - def test_args2filename(self): - from eegprep.bids.naming import args2filename - self.assertEqual(args2filename(sub='02'), 'sub-02') From 7218817676409101b9f863e91672ce7f47be169b Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sat, 23 Nov 2019 18:21:28 +0000 Subject: [PATCH 18/32] implemented log.received_arguments() --- eegprep/input_output.py | 2 +- eegprep/log.py | 8 +++++++- eegprep/main.py | 10 ++-------- eegprep/pipeline.py | 2 +- setup.py | 2 +- tests/log_tests.py | 9 +++++++++ 6 files changed, 21 insertions(+), 12 deletions(-) diff --git a/eegprep/input_output.py b/eegprep/input_output.py index 8f68df1..4f6ae14 100644 --- a/eegprep/input_output.py +++ b/eegprep/input_output.py @@ -6,7 +6,7 @@ class InputOutput(object): - def __init__(self, root_dir, log): + def __init__(self, log, root_dir): pass def get_subject_labels(self): diff --git a/eegprep/log.py b/eegprep/log.py index 095b568..1d98a24 100644 --- a/eegprep/log.py +++ b/eegprep/log.py @@ -2,10 +2,16 @@ class Log(object): + def write(self, message): + print(message) + def new_partial_log(self): return self def received_arguments(self, args): - pass + m = 'eegprep command arguments:\n\t' + m += '\n\t'.join([f'{k}: {v}' for k, v in vars(args).items()]) + self.write(m) + # TODO: job can flush log after done: log.flush(io) (io.write_text(log.xyz)) \ No newline at end of file diff --git a/eegprep/main.py b/eegprep/main.py index f0fcab3..0d7cf4f 100644 --- a/eegprep/main.py +++ b/eegprep/main.py @@ -16,14 +16,8 @@ def run(args=None): args = args or parse_arguments() log.received_arguments(args) - io = InputOutput( - root_dir = args.data_directory, - log - ) - pipeline = Pipeline( - dry = args.dry_run, - log - ) + io = InputOutput(log, args.data_directory) + pipeline = Pipeline(log, args.dry_run) subjects = io.get_subject_labels() if args.subject_index: diff --git a/eegprep/pipeline.py b/eegprep/pipeline.py index ce659d4..0f2b053 100644 --- a/eegprep/pipeline.py +++ b/eegprep/pipeline.py @@ -1,7 +1,7 @@ class Pipeline(object): - def __init__(self, dry, log): + def __init__(self, log, dry): pass def run(self): diff --git a/setup.py b/setup.py index 5803aee..b392826 100644 --- a/setup.py +++ b/setup.py @@ -20,7 +20,7 @@ author='', author_email='', keywords='analysis eeg BIDS', - packages=['eegprep', 'eegprep.bids'], + packages=['eegprep'], include_package_data=True, zip_safe=False, install_requires=requires, diff --git a/tests/log_tests.py b/tests/log_tests.py index ed24046..ca5afa7 100644 --- a/tests/log_tests.py +++ b/tests/log_tests.py @@ -1,6 +1,7 @@ from unittest import TestCase from unittest.mock import Mock + class MockNamespace(object): pass @@ -10,5 +11,13 @@ class LogTests(TestCase): def test_received_arguments(self): from eegprep.log import Log args = MockNamespace() + args.foo = 'abc' + args.bar = 1 log = Log() + log.write = Mock() log.received_arguments(args) + log.write.assert_called_with( + 'eegprep command arguments:\n' + '\tfoo: abc\n' + '\tbar: 1' + ) From 719c1ba5ec275ec88ef406dce5eee7a17d992b8d Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sat, 23 Nov 2019 20:18:37 +0000 Subject: [PATCH 19/32] created new jobs for one run --- eegprep/input_output.py | 25 +++++++---- eegprep/jobs/base.py | 4 +- eegprep/jobs/epoch.py | 19 +++++++++ eegprep/jobs/filter.py | 8 ++++ eegprep/jobs/read.py | 46 +++++++++++++++++++++ eegprep/jobs/run.py | 92 +++++------------------------------------ eegprep/jobs/subject.py | 4 +- eegprep/log.py | 7 ++++ eegprep/pipeline.py | 7 +++- tests/log_tests.py | 15 +++++-- 10 files changed, 129 insertions(+), 98 deletions(-) create mode 100644 eegprep/jobs/epoch.py create mode 100644 eegprep/jobs/filter.py create mode 100644 eegprep/jobs/read.py diff --git a/eegprep/input_output.py b/eegprep/input_output.py index 4f6ae14..6d84140 100644 --- a/eegprep/input_output.py +++ b/eegprep/input_output.py @@ -1,4 +1,4 @@ -#from bids import BIDSLayout +from bids import BIDSLayout # TODO: # contains input, output, mem_storage, report @@ -7,16 +7,27 @@ class InputOutput(object): def __init__(self, log, root_dir): - pass + self.log = log + self.root_dir = root_dir + self._layout = None + + @property + def layout(self): + if self._layout is None: + self.log.discovering_data() + self._layout = BIDSLayout(self.root_dir) + return self._layout def get_subject_labels(self): - # TODO: must be sorted - return '' + subjects = self.layout.get(return_type='id', target='subject') + self.log.found_subjects(subjects) + return subjects - def for_(self, subject=None): - pass + def get_run_labels(self): + return self.layout.get(return_type='id', target='run') -# subjects = layout.get(return_type='id', target='subject') + def for_(self, subject=None, run=None): + return self # - # output # - eegprepdir = join(args.data_directory, 'derivatives', 'eegprep') diff --git a/eegprep/jobs/base.py b/eegprep/jobs/base.py index 9afd19a..ea1e5fe 100644 --- a/eegprep/jobs/base.py +++ b/eegprep/jobs/base.py @@ -7,7 +7,7 @@ def __init__(self, io, log): self.log = log def add_to(self, pipeline): - raise NotImplementedError(self.__class__ + '.add_to()') + pipeline.add(self) def run(self): - raise NotImplementedError(self.__class__ + '.run()') + raise NotImplementedError(self.__class__.__name__ + '.run()') diff --git a/eegprep/jobs/epoch.py b/eegprep/jobs/epoch.py new file mode 100644 index 0000000..76370c6 --- /dev/null +++ b/eegprep/jobs/epoch.py @@ -0,0 +1,19 @@ +from eegprep.jobs.base import BaseJob + + +class EpochJob(BaseJob): + + def run(self): + events = mne.find_events(raw, verbose=False) #raw, consecutive=False, min_duration=0.005) + ## epoching + picks = mne.pick_types(raw.info, eeg=True) + epochs_params = dict( + events=events, + tmin=-0.2, + tmax=0.8, + picks=picks, + verbose=False + ) + epochs = mne.Epochs(raw, preload=True, **epochs_params) + #epochs = epochs.resample(256., npad='auto') # downsample + # file_epochs.drop_channels(refChannels) diff --git a/eegprep/jobs/filter.py b/eegprep/jobs/filter.py new file mode 100644 index 0000000..8b26067 --- /dev/null +++ b/eegprep/jobs/filter.py @@ -0,0 +1,8 @@ +from eegprep.jobs.base import BaseJob + + +class FilterJob(BaseJob): + + def run(self): + # Filtering + raw = raw.filter(l_freq=0.05, h_freq=45, fir_design='firwin') diff --git a/eegprep/jobs/read.py b/eegprep/jobs/read.py new file mode 100644 index 0000000..5743c67 --- /dev/null +++ b/eegprep/jobs/read.py @@ -0,0 +1,46 @@ +from eegprep.jobs.base import BaseJob +# from os.path import join, basename +# import os, glob, random, numpy, mne, pandas +# from eegprep.guess import guess_montage + + +class ReadJob(BaseJob): + + def run(self): + # read data + raw = mne.io.read_raw_bdf(fpath, preload=True, verbose=False) + + # Set channel types and select reference channels + channelFile = fpath.replace('eeg.bdf', 'channels.tsv') # maybe should be a string arg + channels = pandas.read_csv(channelFile, index_col='name', sep='\t') + bids2mne = { + 'MISC': 'misc', + 'EEG': 'eeg', + 'EOG': 'eog', + 'VEOG': 'eog', + 'TRIG': 'stim', + 'REF': 'eeg', + } + channels['mne'] = channels.type.replace(bids2mne) + raw.set_channel_types(channels.mne.to_dict()) + + + + + # can now drop reference electrodes + raw.set_channel_types({k: 'misc' for k in refChannels}) + + # set bad channels + # raw.info['bads'] = channels[channels.status=='bad'].index.tolist() + + # pick channels to use for epoching + #epoching_picks = mne.pick_types(raw.info, eeg=True, eog=False, stim=False, exclude='bads') + + montage = mne.channels.read_montage(guess_montage(raw.ch_names)) + # print(montage) + raw = raw.set_montage(montage, verbose=False) + + + # Set reference + refChannels = channels[channels.type=='REF'].index.tolist() + raw = raw.set_eeg_reference(ref_channels=refChannels) \ No newline at end of file diff --git a/eegprep/jobs/run.py b/eegprep/jobs/run.py index 0cad8c0..b574c9a 100644 --- a/eegprep/jobs/run.py +++ b/eegprep/jobs/run.py @@ -1,60 +1,20 @@ from eegprep.jobs.base import BaseJob -from os.path import join, basename -import os, glob, random, numpy, mne, pandas -from eegprep.guess import guess_montage -from eegprep.util import ( - resample_events_on_resampled_epochs, - plot_rejectlog, - save_rejectlog -) +from eegprep.jobs.read import ReadJob +from eegprep.jobs.filter import FilterJob +from eegprep.jobs.epoch import EpochJob class RunJob(BaseJob): """Represents preprocessing of one raw data file. """ - def run(self): - - print(basename(fpath)) - #sub, ses, task, run = filename2tuple(basename(fname)) - - # read data - raw = mne.io.read_raw_bdf(fpath, preload=True, verbose=False) - - # Set channel types and select reference channels - channelFile = fpath.replace('eeg.bdf', 'channels.tsv') # maybe should be a string arg - channels = pandas.read_csv(channelFile, index_col='name', sep='\t') - bids2mne = { - 'MISC': 'misc', - 'EEG': 'eeg', - 'EOG': 'eog', - 'VEOG': 'eog', - 'TRIG': 'stim', - 'REF': 'eeg', - } - channels['mne'] = channels.type.replace(bids2mne) - raw.set_channel_types(channels.mne.to_dict()) - - - # Set reference - refChannels = channels[channels.type=='REF'].index.tolist() - raw = raw.set_eeg_reference(ref_channels=refChannels) - # can now drop reference electrodes - raw.set_channel_types({k: 'misc' for k in refChannels}) - - # set bad channels - # raw.info['bads'] = channels[channels.status=='bad'].index.tolist() - - # pick channels to use for epoching - #epoching_picks = mne.pick_types(raw.info, eeg=True, eog=False, stim=False, exclude='bads') - - - # Filtering - raw = raw.filter(l_freq=0.05, h_freq=45, fir_design='firwin') - - montage = mne.channels.read_montage(guess_montage(raw.ch_names)) - # print(montage) - raw = raw.set_montage(montage, verbose=False) + def add_to(self, pipeline): + job = ReadJob(self.io, self.log) + job.add_to(pipeline) + job = FilterJob(self.io, self.log) + job.add_to(pipeline) + job = EpochJob(self.io, self.log) + job.add_to(pipeline) # plot raw data # nchans = len(raw.ch_names) @@ -64,36 +24,6 @@ def run(self): # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_raw.png'.format(sub, ses, task, run) # fig.savefig(join(reportsdir, fname_plot)) - - events = mne.find_events(raw, verbose=False) #raw, consecutive=False, min_duration=0.005) - ## epoching - picks = mne.pick_types(raw.info, eeg=True) - epochs_params = dict( - events=events, - tmin=-0.2, - tmax=0.8, - picks=picks, - verbose=False - ) - epochs = mne.Epochs(raw, preload=True, **epochs_params) - epochs = epochs.resample(256., npad='auto') # downsample - # file_epochs.drop_channels(refChannels) - - # # autoreject (under development) - # ar = AutoReject(n_jobs=4) - # clean_epochs = ar.fit_transform(file_epochs) - - # rejectlog = ar.get_reject_log(clean_epochs) - # fname_log = 'sub-{}_ses-{}_task-{}_run-{}_reject-log.npz'.format(sub, ses, task, run) - # save_rejectlog(join(reportsdir, fname_log), rejectlog) - # fig = plot_rejectlog(rejectlog) - # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_bad-epochs.png'.format(sub, ses, task, run) - # fig.savefig(join(reportsdir, fname_plot)) - - - # # store for now - # # subject_epochs[(ses, task, run)] = clean_epochs - # # create evoked plots # conds = clean_epochs.event_id.keys() # selected_conds = random.sample(conds, min(len(conds), 6)) @@ -103,5 +33,3 @@ def run(self): # fname_plot = 'sub-{}_ses-{}_task-{}_run-{}_evoked-{}.png'.format(sub, ses, task, run, cond) # fig = evoked.plot_joint(picks=picks) # fig.savefig(join(reportsdir, fname_plot)) - return epochs - diff --git a/eegprep/jobs/subject.py b/eegprep/jobs/subject.py index a79cee5..ce43dba 100644 --- a/eegprep/jobs/subject.py +++ b/eegprep/jobs/subject.py @@ -8,8 +8,8 @@ class SubjectJob(BaseJob): def add_to(self, pipeline): runs = self.io.get_run_labels() for run_label in runs: - job = RunJob(self.io.for_(run=run_label)) + job = RunJob(self.io.for_(run=run_label), self.log) job.add_to(pipeline) if runs: - job = ConcatEpochsJob(self.io) + job = ConcatEpochsJob(self.io, self.log) job.add_to(pipeline) diff --git a/eegprep/log.py b/eegprep/log.py index 1d98a24..a3d06c8 100644 --- a/eegprep/log.py +++ b/eegprep/log.py @@ -13,5 +13,12 @@ def received_arguments(self, args): m += '\n\t'.join([f'{k}: {v}' for k, v in vars(args).items()]) self.write(m) + def found_subjects(self, subjects): + listed = ', '.join(subjects) + self.write(f'found {len(subjects)} subjects: {listed}') + + def discovering_data(self): + self.write('discovering data..') + # TODO: job can flush log after done: log.flush(io) (io.write_text(log.xyz)) \ No newline at end of file diff --git a/eegprep/pipeline.py b/eegprep/pipeline.py index 0f2b053..c5c9c3e 100644 --- a/eegprep/pipeline.py +++ b/eegprep/pipeline.py @@ -2,7 +2,12 @@ class Pipeline(object): def __init__(self, log, dry): - pass + self.log = log + self.dry = dry + self.jobs = [] + + def add(self, job): + self.jobs.append(job) def run(self): pass diff --git a/tests/log_tests.py b/tests/log_tests.py index ca5afa7..de2cab4 100644 --- a/tests/log_tests.py +++ b/tests/log_tests.py @@ -2,14 +2,12 @@ from unittest.mock import Mock -class MockNamespace(object): - pass - - class LogTests(TestCase): def test_received_arguments(self): from eegprep.log import Log + class MockNamespace(object): + pass args = MockNamespace() args.foo = 'abc' args.bar = 1 @@ -21,3 +19,12 @@ def test_received_arguments(self): '\tfoo: abc\n' '\tbar: 1' ) + + def test_found_subjects(self): + from eegprep.log import Log + log = Log() + log.write = Mock() + log.found_subjects(['pilot1', '03', 'pilot2', '02']) + log.write.assert_called_with( + 'found 4 subjects: pilot1, 03, pilot2, 02' + ) From c6ce51c3f7de25941b9aca47c232c40c3c12211f Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sat, 23 Nov 2019 20:45:41 +0000 Subject: [PATCH 20/32] InputOutput can provide a sub-scope --- eegprep/input_output.py | 15 ++++++++++++--- eegprep/jobs/base.py | 9 +++++++++ eegprep/log.py | 12 +++++++++--- eegprep/pipeline.py | 4 +++- tests/log_tests.py | 4 ++-- 5 files changed, 35 insertions(+), 9 deletions(-) diff --git a/eegprep/input_output.py b/eegprep/input_output.py index 6d84140..6108b00 100644 --- a/eegprep/input_output.py +++ b/eegprep/input_output.py @@ -1,4 +1,5 @@ from bids import BIDSLayout +import copy # TODO: # contains input, output, mem_storage, report @@ -6,10 +7,11 @@ class InputOutput(object): - def __init__(self, log, root_dir): + def __init__(self, log, root_dir, scope=None, layout=None): self.log = log self.root_dir = root_dir - self._layout = None + self._layout = layout or None + self.scope = scope or dict() @property def layout(self): @@ -17,6 +19,9 @@ def layout(self): self.log.discovering_data() self._layout = BIDSLayout(self.root_dir) return self._layout + + def describe_scope(self): + return ' '.join([f'{k[:3]}={v}' for k, v in self.scope.items()]) def get_subject_labels(self): subjects = self.layout.get(return_type='id', target='subject') @@ -27,7 +32,11 @@ def get_run_labels(self): return self.layout.get(return_type='id', target='run') def for_(self, subject=None, run=None): - return self + new_scope = copy.copy(self.scope) + for spec, val in dict(subject=subject, run=run).items(): + if val is not None: + new_scope[spec] = val + return InputOutput(self.log, self.root_dir, new_scope, self.layout) # - # output # - eegprepdir = join(args.data_directory, 'derivatives', 'eegprep') diff --git a/eegprep/jobs/base.py b/eegprep/jobs/base.py index ea1e5fe..6f21494 100644 --- a/eegprep/jobs/base.py +++ b/eegprep/jobs/base.py @@ -6,6 +6,15 @@ def __init__(self, io, log): self.io = io self.log = log + def describe(self): + """Return a string that describes this job + + Returns: + str: one-line string describing this job and it's scope + """ + scope = self.io.describe_scope() + return scope + ' ' + self.__class__.__name__.replace('Job', '') + def add_to(self, pipeline): pipeline.add(self) diff --git a/eegprep/log.py b/eegprep/log.py index a3d06c8..3a6debb 100644 --- a/eegprep/log.py +++ b/eegprep/log.py @@ -9,16 +9,22 @@ def new_partial_log(self): return self def received_arguments(self, args): - m = 'eegprep command arguments:\n\t' + m = 'Command arguments:\n\t' m += '\n\t'.join([f'{k}: {v}' for k, v in vars(args).items()]) self.write(m) def found_subjects(self, subjects): listed = ', '.join(subjects) - self.write(f'found {len(subjects)} subjects: {listed}') + self.write(f'Found {len(subjects)} subjects: {listed}') + + def started_pipeline(self, jobs): + m = f'Starting pipeline with {len(jobs)} jobs:\n' + job_lines = [f'{j}: {job.describe()}' for j, job in enumerate(jobs)] + m += '\n'.join(job_lines) + self.write(m) def discovering_data(self): - self.write('discovering data..') + self.write('Discovering data..') # TODO: job can flush log after done: log.flush(io) (io.write_text(log.xyz)) \ No newline at end of file diff --git a/eegprep/pipeline.py b/eegprep/pipeline.py index c5c9c3e..e03b467 100644 --- a/eegprep/pipeline.py +++ b/eegprep/pipeline.py @@ -10,4 +10,6 @@ def add(self, job): self.jobs.append(job) def run(self): - pass + self.log.started_pipeline(self.jobs) + if self.dry: + return diff --git a/tests/log_tests.py b/tests/log_tests.py index de2cab4..a199e0d 100644 --- a/tests/log_tests.py +++ b/tests/log_tests.py @@ -15,7 +15,7 @@ class MockNamespace(object): log.write = Mock() log.received_arguments(args) log.write.assert_called_with( - 'eegprep command arguments:\n' + 'Command arguments:\n' '\tfoo: abc\n' '\tbar: 1' ) @@ -26,5 +26,5 @@ def test_found_subjects(self): log.write = Mock() log.found_subjects(['pilot1', '03', 'pilot2', '02']) log.write.assert_called_with( - 'found 4 subjects: pilot1, 03, pilot2, 02' + 'Found 4 subjects: pilot1, 03, pilot2, 02' ) From 6f62994fc5f87d126e32243fd19aec7ea19d06eb Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sun, 24 Nov 2019 00:31:58 +0000 Subject: [PATCH 21/32] cleaned up basic jobs --- eegprep/input_output.py | 60 ++++++++++++++++++++++++++++------------- eegprep/jobs/base.py | 4 +++ eegprep/jobs/epoch.py | 9 ++++--- eegprep/jobs/filter.py | 5 ++-- eegprep/jobs/read.py | 33 +++++++++-------------- eegprep/jobs/run.py | 2 +- eegprep/jobs/subject.py | 2 +- eegprep/log.py | 2 +- eegprep/main.py | 2 +- 9 files changed, 71 insertions(+), 48 deletions(-) diff --git a/eegprep/input_output.py b/eegprep/input_output.py index 6108b00..545da06 100644 --- a/eegprep/input_output.py +++ b/eegprep/input_output.py @@ -1,9 +1,6 @@ from bids import BIDSLayout import copy -# TODO: # contains input, output, mem_storage, report - -# io.store(for_run=run, epochs) # io decides whether to keep in memory (with mem limit arg) or to store temp or store long term class InputOutput(object): @@ -15,6 +12,14 @@ def __init__(self, log, root_dir, scope=None, layout=None): @property def layout(self): + """pyBIDS layout object, lazily loaded. + + If the layout has not been created yet, it will + be done here, which takes time (<1min) for larger datasets. + + Returns: + bids.BIDSLayout: The BIDS layout object + """ if self._layout is None: self.log.discovering_data() self._layout = BIDSLayout(self.root_dir) @@ -23,32 +28,51 @@ def layout(self): def describe_scope(self): return ' '.join([f'{k[:3]}={v}' for k, v in self.scope.items()]) + def for_(self, subject=None, run=None): + new_scope = copy.copy(self.scope) + for spec, val in dict(subject=subject, run=run).items(): + if val is not None: + new_scope[spec] = val + return InputOutput(self.log, self.root_dir, new_scope, self.layout) + def get_subject_labels(self): subjects = self.layout.get(return_type='id', target='subject') self.log.found_subjects(subjects) return subjects def get_run_labels(self): + # TODO: must restrict to scope return self.layout.get(return_type='id', target='run') - def for_(self, subject=None, run=None): - new_scope = copy.copy(self.scope) - for spec, val in dict(subject=subject, run=run).items(): - if val is not None: - new_scope[spec] = val - return InputOutput(self.log, self.root_dir, new_scope, self.layout) + def get_filepath(self, suffix): + pass + + def store_object(self, obj, name, job): + pass + def retrieve_object(self, name): + pass + +# TODO: # contains input, output, mem_storage, report +# TODO: job can choose when to expire objects +# +# (parent job? interface?) self.add_child() + job.cleanup() + io.store('obj', for_=self) CleanupJob? +# self.cleanup_after(child_job) +# self.store_later(child_job) +# super().add_to() or pipeline.add(self) +# add_children_to(self, pipeline) +# TODO: io.store(for_run=run, epochs) # io decides whether to keep in memory (with mem limit arg) or to store temp or store long term +# eeg_runs = layout.get(subject=subject, suffix='eeg', extension='bdf') # - # output # - eegprepdir = join(args.data_directory, 'derivatives', 'eegprep') # - subjectdir = join(eegprepdir, 'sub-' + subject) # - os.makedirs(subjectdir, exist_ok=True) # - out_fpath = join(subjectdir, 'sub-{}_epo.npz'.format(subject)) - # eegprepdir = join(datadir, 'derivatives', 'eegprep') - # layout = BIDSLayout(datadir) - # subjects = layout.get(return_type='id', target='subject') - # for subject in subjects: - - # subjectdir = join(eegprepdir, 'sub-' + subject) - # os.makedirs(subjectdir, exist_ok=True) - # out_fpath = join(subjectdir, 'sub-{}_epo.npz'.format(subject)) - # preproc_subject(layout, subject, out_fpath) +# eegprepdir = join(datadir, 'derivatives', 'eegprep') +# layout = BIDSLayout(datadir) +# subjects = layout.get(return_type='id', target='subject') +# for subject in subjects: +# subjectdir = join(eegprepdir, 'sub-' + subject) +# os.makedirs(subjectdir, exist_ok=True) +# out_fpath = join(subjectdir, 'sub-{}_epo.npz'.format(subject)) +# preproc_subject(layout, subject, out_fpath) diff --git a/eegprep/jobs/base.py b/eegprep/jobs/base.py index 6f21494..176d598 100644 --- a/eegprep/jobs/base.py +++ b/eegprep/jobs/base.py @@ -16,7 +16,11 @@ def describe(self): return scope + ' ' + self.__class__.__name__.replace('Job', '') def add_to(self, pipeline): + self.add_children_to(pipeline) pipeline.add(self) + def add_children_to(self, pipeline): + raise NotImplementedError(self.__class__.__name__ + '.run()') + def run(self): raise NotImplementedError(self.__class__.__name__ + '.run()') diff --git a/eegprep/jobs/epoch.py b/eegprep/jobs/epoch.py index 76370c6..737a942 100644 --- a/eegprep/jobs/epoch.py +++ b/eegprep/jobs/epoch.py @@ -1,11 +1,13 @@ from eegprep.jobs.base import BaseJob +import mne class EpochJob(BaseJob): def run(self): - events = mne.find_events(raw, verbose=False) #raw, consecutive=False, min_duration=0.005) - ## epoching + raw = self.io.retrieve_object('raw') + # additional options: consecutive=False, min_duration=0.005) + events = mne.find_events(raw, verbose=False) picks = mne.pick_types(raw.info, eeg=True) epochs_params = dict( events=events, @@ -15,5 +17,4 @@ def run(self): verbose=False ) epochs = mne.Epochs(raw, preload=True, **epochs_params) - #epochs = epochs.resample(256., npad='auto') # downsample - # file_epochs.drop_channels(refChannels) + self.io.store_object(epochs, name='epochs', job=self) diff --git a/eegprep/jobs/filter.py b/eegprep/jobs/filter.py index 8b26067..c3f57cb 100644 --- a/eegprep/jobs/filter.py +++ b/eegprep/jobs/filter.py @@ -4,5 +4,6 @@ class FilterJob(BaseJob): def run(self): - # Filtering - raw = raw.filter(l_freq=0.05, h_freq=45, fir_design='firwin') + raw = self.io.retrieve_object('raw') + raw.filter(l_freq=0.05, h_freq=45, fir_design='firwin') + self.io.store_object(raw, name='raw', job=self) diff --git a/eegprep/jobs/read.py b/eegprep/jobs/read.py index 5743c67..0d3aaf9 100644 --- a/eegprep/jobs/read.py +++ b/eegprep/jobs/read.py @@ -1,18 +1,18 @@ +import mne, pandas from eegprep.jobs.base import BaseJob -# from os.path import join, basename -# import os, glob, random, numpy, mne, pandas -# from eegprep.guess import guess_montage +from eegprep.guess import guess_montage class ReadJob(BaseJob): def run(self): - # read data - raw = mne.io.read_raw_bdf(fpath, preload=True, verbose=False) + + fpath_raw = self.io.get_filepath(suffix='eeg') + raw = mne.io.read_raw_bdf(fpath_raw, preload=True, verbose=False) # Set channel types and select reference channels - channelFile = fpath.replace('eeg.bdf', 'channels.tsv') # maybe should be a string arg - channels = pandas.read_csv(channelFile, index_col='name', sep='\t') + fpath_channels = self.io.get_filepath(suffix='channels') + channels = pandas.read_csv(fpath_channels, index_col='name', sep='\t') bids2mne = { 'MISC': 'misc', 'EEG': 'eeg', @@ -24,23 +24,16 @@ def run(self): channels['mne'] = channels.type.replace(bids2mne) raw.set_channel_types(channels.mne.to_dict()) - - - - # can now drop reference electrodes - raw.set_channel_types({k: 'misc' for k in refChannels}) - # set bad channels # raw.info['bads'] = channels[channels.status=='bad'].index.tolist() - # pick channels to use for epoching - #epoching_picks = mne.pick_types(raw.info, eeg=True, eog=False, stim=False, exclude='bads') - montage = mne.channels.read_montage(guess_montage(raw.ch_names)) - # print(montage) - raw = raw.set_montage(montage, verbose=False) - + raw.set_montage(montage, verbose=False) # Set reference refChannels = channels[channels.type=='REF'].index.tolist() - raw = raw.set_eeg_reference(ref_channels=refChannels) \ No newline at end of file + raw.set_eeg_reference(ref_channels=refChannels) + # can now drop reference electrodes + raw.set_channel_types({k: 'misc' for k in refChannels}) + + self.io.store_object(raw, name='raw', job=self) diff --git a/eegprep/jobs/run.py b/eegprep/jobs/run.py index b574c9a..719cc03 100644 --- a/eegprep/jobs/run.py +++ b/eegprep/jobs/run.py @@ -8,7 +8,7 @@ class RunJob(BaseJob): """Represents preprocessing of one raw data file. """ - def add_to(self, pipeline): + def add_children_to(self, pipeline): job = ReadJob(self.io, self.log) job.add_to(pipeline) job = FilterJob(self.io, self.log) diff --git a/eegprep/jobs/subject.py b/eegprep/jobs/subject.py index ce43dba..ef77fc7 100644 --- a/eegprep/jobs/subject.py +++ b/eegprep/jobs/subject.py @@ -5,7 +5,7 @@ class SubjectJob(BaseJob): - def add_to(self, pipeline): + def add_children_to(self, pipeline): runs = self.io.get_run_labels() for run_label in runs: job = RunJob(self.io.for_(run=run_label), self.log) diff --git a/eegprep/log.py b/eegprep/log.py index 3a6debb..f2541be 100644 --- a/eegprep/log.py +++ b/eegprep/log.py @@ -19,7 +19,7 @@ def found_subjects(self, subjects): def started_pipeline(self, jobs): m = f'Starting pipeline with {len(jobs)} jobs:\n' - job_lines = [f'{j}: {job.describe()}' for j, job in enumerate(jobs)] + job_lines = [f'{j+1}: {job.describe()}' for j, job in enumerate(jobs)] m += '\n'.join(job_lines) self.write(m) diff --git a/eegprep/main.py b/eegprep/main.py index 0d7cf4f..d3d61fb 100644 --- a/eegprep/main.py +++ b/eegprep/main.py @@ -21,7 +21,7 @@ def run(args=None): subjects = io.get_subject_labels() if args.subject_index: - subjects = [subjects[args.subject_index]] + subjects = [subjects[args.subject_index-1]] if args.subject_label: subjects = [args.subject_label] From 40edeae662b306f58ded7349ad5fdc5837f56687 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sun, 24 Nov 2019 00:36:44 +0000 Subject: [PATCH 22/32] implemented io.get_filepath() --- eegprep/input_output.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/eegprep/input_output.py b/eegprep/input_output.py index 545da06..0e5deb7 100644 --- a/eegprep/input_output.py +++ b/eegprep/input_output.py @@ -41,11 +41,16 @@ def get_subject_labels(self): return subjects def get_run_labels(self): - # TODO: must restrict to scope - return self.layout.get(return_type='id', target='run') + return self.layout.get(return_type='id', target='run', **self.scope) def get_filepath(self, suffix): - pass + fpaths = self.layout.get( + return_type='filename', + suffix=suffix, + **self.scope + ) + assert len(fpaths) == 1 + return fpaths[0] def store_object(self, obj, name, job): pass From 3603d80f153017b2ccce8289ea13799f1167c2f8 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sun, 24 Nov 2019 01:53:38 +0000 Subject: [PATCH 23/32] implemented a key/value memory storage --- eegprep/input_output.py | 23 +++++++++++++++++++---- eegprep/jobs/concat_epochs.py | 5 ++++- eegprep/main.py | 3 ++- eegprep/memory.py | 21 +++++++++++++++++++++ tests/memory_tests.py | 15 +++++++++++++++ 5 files changed, 61 insertions(+), 6 deletions(-) create mode 100644 eegprep/memory.py create mode 100644 tests/memory_tests.py diff --git a/eegprep/input_output.py b/eegprep/input_output.py index 0e5deb7..52eed3e 100644 --- a/eegprep/input_output.py +++ b/eegprep/input_output.py @@ -4,8 +4,9 @@ class InputOutput(object): - def __init__(self, log, root_dir, scope=None, layout=None): + def __init__(self, log, memory, root_dir, scope=None, layout=None): self.log = log + self.memory = memory self.root_dir = root_dir self._layout = layout or None self.scope = scope or dict() @@ -33,7 +34,13 @@ def for_(self, subject=None, run=None): for spec, val in dict(subject=subject, run=run).items(): if val is not None: new_scope[spec] = val - return InputOutput(self.log, self.root_dir, new_scope, self.layout) + return InputOutput( + self.log, + self.memory, + self.root_dir, + new_scope, + self.layout + ) def get_subject_labels(self): subjects = self.layout.get(return_type='id', target='subject') @@ -53,10 +60,18 @@ def get_filepath(self, suffix): return fpaths[0] def store_object(self, obj, name, job): - pass + # TODO: identify job by string + identifiers = dict(name=name, **self.scope) + self.memory.store(obj, **identifiers) + + def retrieve_objects(self, name): + identifiers = dict(name=name, **self.scope) + return self.memory.retrieve(**identifiers) def retrieve_object(self, name): - pass + objects = self.retrieve_objects(name) + assert len(objects) == 1 + return objects[0] # TODO: # contains input, output, mem_storage, report # TODO: job can choose when to expire objects diff --git a/eegprep/jobs/concat_epochs.py b/eegprep/jobs/concat_epochs.py index 9a107fa..f44b77a 100644 --- a/eegprep/jobs/concat_epochs.py +++ b/eegprep/jobs/concat_epochs.py @@ -1,7 +1,10 @@ from eegprep.jobs.base import BaseJob +import mne class ConcatEpochsJob(BaseJob): def run(self): - pass + epochs_per_run = self.io.retrieve_objects('epochs') + epochs = mne.epochs.concatenate_epochs(epochs_per_run) + self.io.store_object(epochs, name='epochs', job=self) diff --git a/eegprep/main.py b/eegprep/main.py index d3d61fb..cfab563 100644 --- a/eegprep/main.py +++ b/eegprep/main.py @@ -1,6 +1,7 @@ from eegprep.args import parse_arguments from eegprep.pipeline import Pipeline from eegprep.log import Log +from eegprep.memory import Memory from eegprep.input_output import InputOutput from eegprep.jobs.subject import SubjectJob @@ -16,7 +17,7 @@ def run(args=None): args = args or parse_arguments() log.received_arguments(args) - io = InputOutput(log, args.data_directory) + io = InputOutput(log, Memory(), args.data_directory) pipeline = Pipeline(log, args.dry_run) subjects = io.get_subject_labels() diff --git a/eegprep/memory.py b/eegprep/memory.py new file mode 100644 index 0000000..f828833 --- /dev/null +++ b/eegprep/memory.py @@ -0,0 +1,21 @@ +from copy import copy + +class Memory(object): + """Stores arbitrary objects by a set of key/value pairs. + """ + + def __init__(self): + self.objects = {} + + def store(self, obj, **filters): + self.objects[frozenset(filters.items())] = obj + + def retrieve(self, **filters): + selection = [] + for object_key in self.objects.keys(): + for name, val in object_key: + if (name in filters) and (filters.get(name) != val): + break + else: + selection.append(object_key) + return [self.objects[k] for k in selection] diff --git a/tests/memory_tests.py b/tests/memory_tests.py new file mode 100644 index 0000000..60d1526 --- /dev/null +++ b/tests/memory_tests.py @@ -0,0 +1,15 @@ +from unittest import TestCase +from unittest.mock import Mock + + +class MemoryTests(TestCase): + + def test_store_retrieve(self): + from eegprep.memory import Memory + obj1, obj2, obj3, obj4 = Mock(), Mock(), Mock(), Mock() + ram = Memory() + ram.store(obj1, foo='a') + ram.store(obj2, foo='a', bar=1) + ram.store(obj3, foo='a', bar=2, baz=0.5) + ram.store(obj4, foo='b', bar=2) + self.assertEqual(ram.retrieve(foo='a', bar=2), [obj1, obj3]) From 1aa73fc284af06092ca3fb0e1dbc4b5002ebd58e Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sun, 24 Nov 2019 02:44:00 +0000 Subject: [PATCH 24/32] fixed input/output issues --- eegprep/input_output.py | 36 ++++++++++++++++++++++++++++-------- eegprep/jobs/base.py | 7 +++++-- eegprep/jobs/subject.py | 14 +++++++++----- eegprep/log.py | 3 +++ eegprep/pipeline.py | 3 +++ 5 files changed, 48 insertions(+), 15 deletions(-) diff --git a/eegprep/input_output.py b/eegprep/input_output.py index 52eed3e..45c5e5b 100644 --- a/eegprep/input_output.py +++ b/eegprep/input_output.py @@ -29,9 +29,10 @@ def layout(self): def describe_scope(self): return ' '.join([f'{k[:3]}={v}' for k, v in self.scope.items()]) - def for_(self, subject=None, run=None): + def for_(self, subject=None, session=None, run=None): new_scope = copy.copy(self.scope) - for spec, val in dict(subject=subject, run=run).items(): + filters = dict(subject=subject, session=session, run=run) + for spec, val in filters.items(): if val is not None: new_scope[spec] = val return InputOutput( @@ -43,30 +44,49 @@ def for_(self, subject=None, run=None): ) def get_subject_labels(self): - subjects = self.layout.get(return_type='id', target='subject') + subjects = self.layout.get( + return_type='id', + target='subject', + datatype='eeg' + ) self.log.found_subjects(subjects) return subjects + def get_session_labels(self): + return self.layout.get( + return_type='id', + target='session', + datatype='eeg', + **self.scope + ) + def get_run_labels(self): - return self.layout.get(return_type='id', target='run', **self.scope) + return self.layout.get( + return_type='id', + target='run', + datatype='eeg', + **self.scope + ) def get_filepath(self, suffix): fpaths = self.layout.get( return_type='filename', suffix=suffix, + datatype='eeg', **self.scope ) + fpaths = [f for f in fpaths if '.json' not in f] assert len(fpaths) == 1 return fpaths[0] def store_object(self, obj, name, job): - # TODO: identify job by string - identifiers = dict(name=name, **self.scope) + job_id = job.get_id() + identifiers = dict(name=name, job=job_id, **self.scope) self.memory.store(obj, **identifiers) def retrieve_objects(self, name): - identifiers = dict(name=name, **self.scope) - return self.memory.retrieve(**identifiers) + filters = dict(name=name, **self.scope) + return self.memory.retrieve(**filters) def retrieve_object(self, name): objects = self.retrieve_objects(name) diff --git a/eegprep/jobs/base.py b/eegprep/jobs/base.py index 176d598..e603af5 100644 --- a/eegprep/jobs/base.py +++ b/eegprep/jobs/base.py @@ -6,6 +6,9 @@ def __init__(self, io, log): self.io = io self.log = log + def get_id(self): + return self.__class__.__name__.replace('Job', '') + def describe(self): """Return a string that describes this job @@ -13,14 +16,14 @@ def describe(self): str: one-line string describing this job and it's scope """ scope = self.io.describe_scope() - return scope + ' ' + self.__class__.__name__.replace('Job', '') + return scope + ' ' + self.get_id() def add_to(self, pipeline): self.add_children_to(pipeline) pipeline.add(self) def add_children_to(self, pipeline): - raise NotImplementedError(self.__class__.__name__ + '.run()') + pass def run(self): raise NotImplementedError(self.__class__.__name__ + '.run()') diff --git a/eegprep/jobs/subject.py b/eegprep/jobs/subject.py index ef77fc7..dc63386 100644 --- a/eegprep/jobs/subject.py +++ b/eegprep/jobs/subject.py @@ -6,10 +6,14 @@ class SubjectJob(BaseJob): def add_children_to(self, pipeline): - runs = self.io.get_run_labels() - for run_label in runs: - job = RunJob(self.io.for_(run=run_label), self.log) - job.add_to(pipeline) - if runs: + found_data = False + sessions = self.io.get_session_labels() + for session_label in sessions: + session_io = self.io.for_(session=session_label) + for run_label in session_io.get_run_labels(): + found_data = True + job = RunJob(session_io.for_(run=run_label), self.log) + job.add_to(pipeline) + if found_data: job = ConcatEpochsJob(self.io, self.log) job.add_to(pipeline) diff --git a/eegprep/log.py b/eegprep/log.py index f2541be..e5fd705 100644 --- a/eegprep/log.py +++ b/eegprep/log.py @@ -23,6 +23,9 @@ def started_pipeline(self, jobs): m += '\n'.join(job_lines) self.write(m) + def starting_job(self, job): + self.write(f'Starting job: ' + job.describe()) + def discovering_data(self): self.write('Discovering data..') diff --git a/eegprep/pipeline.py b/eegprep/pipeline.py index e03b467..ad2bbe8 100644 --- a/eegprep/pipeline.py +++ b/eegprep/pipeline.py @@ -13,3 +13,6 @@ def run(self): self.log.started_pipeline(self.jobs) if self.dry: return + for job in self.jobs: + self.log.starting_job(job) + job.run() From 0c9ae8d2d606e542a8c061a8a2586c3be0c10f88 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sun, 24 Nov 2019 16:33:03 +0000 Subject: [PATCH 25/32] io will overwrite existing object --- eegprep/input_output.py | 5 +++-- eegprep/log.py | 5 +++++ eegprep/main.py | 2 +- eegprep/memory.py | 19 ++++++++++++++++--- tests/memory_tests.py | 3 ++- 5 files changed, 27 insertions(+), 7 deletions(-) diff --git a/eegprep/input_output.py b/eegprep/input_output.py index 45c5e5b..8a5fe8c 100644 --- a/eegprep/input_output.py +++ b/eegprep/input_output.py @@ -80,8 +80,9 @@ def get_filepath(self, suffix): return fpaths[0] def store_object(self, obj, name, job): - job_id = job.get_id() - identifiers = dict(name=name, job=job_id, **self.scope) + # first delete existing copies (overwriting) + self.memory.delete(name=name, **self.scope) + identifiers = dict(name=name, job=job.get_id(), **self.scope) self.memory.store(obj, **identifiers) def retrieve_objects(self, name): diff --git a/eegprep/log.py b/eegprep/log.py index e5fd705..08130da 100644 --- a/eegprep/log.py +++ b/eegprep/log.py @@ -29,5 +29,10 @@ def starting_job(self, job): def discovering_data(self): self.write('Discovering data..') + def storing_object_in_memory(self, key, obj): + self.write(f'Storing {repr(obj)} in memory store as {key}') + + def removing_object_from_memory(self, key, obj): + self.write(f'Removing {repr(obj)} from memory known as {key}') # TODO: job can flush log after done: log.flush(io) (io.write_text(log.xyz)) \ No newline at end of file diff --git a/eegprep/main.py b/eegprep/main.py index cfab563..b13c3a8 100644 --- a/eegprep/main.py +++ b/eegprep/main.py @@ -17,7 +17,7 @@ def run(args=None): args = args or parse_arguments() log.received_arguments(args) - io = InputOutput(log, Memory(), args.data_directory) + io = InputOutput(log, Memory(log), args.data_directory) pipeline = Pipeline(log, args.dry_run) subjects = io.get_subject_labels() diff --git a/eegprep/memory.py b/eegprep/memory.py index f828833..d845e83 100644 --- a/eegprep/memory.py +++ b/eegprep/memory.py @@ -4,13 +4,26 @@ class Memory(object): """Stores arbitrary objects by a set of key/value pairs. """ - def __init__(self): + def __init__(self, log): self.objects = {} + self.log = log def store(self, obj, **filters): - self.objects[frozenset(filters.items())] = obj + key = frozenset(filters.items()) + self.log.storing_object_in_memory(key, obj) + self.objects[key] = obj def retrieve(self, **filters): + selection = self.find_matching_keys(**filters) + return [self.objects[k] for k in selection] + + def delete(self, **filters): + selection = self.find_matching_keys(**filters) + for k in selection: + self.log.removing_object_from_memory(k, self.objects[k]) + del self.objects[k] + + def find_matching_keys(self, **filters): selection = [] for object_key in self.objects.keys(): for name, val in object_key: @@ -18,4 +31,4 @@ def retrieve(self, **filters): break else: selection.append(object_key) - return [self.objects[k] for k in selection] + return selection diff --git a/tests/memory_tests.py b/tests/memory_tests.py index 60d1526..17be51b 100644 --- a/tests/memory_tests.py +++ b/tests/memory_tests.py @@ -7,7 +7,8 @@ class MemoryTests(TestCase): def test_store_retrieve(self): from eegprep.memory import Memory obj1, obj2, obj3, obj4 = Mock(), Mock(), Mock(), Mock() - ram = Memory() + log = Mock() + ram = Memory(log) ram.store(obj1, foo='a') ram.store(obj2, foo='a', bar=1) ram.store(obj3, foo='a', bar=2, baz=0.5) From 5295ef94e78299922e95d8f48b7077aac3a1784c Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sun, 24 Nov 2019 17:39:14 +0000 Subject: [PATCH 26/32] parent jobs can cleanup children in cleanup phase --- eegprep/input_output.py | 21 +++++---------------- eegprep/jobs/base.py | 10 +++++++++- eegprep/jobs/run.py | 3 +++ eegprep/jobs/subject.py | 1 + eegprep/log.py | 7 +++++-- eegprep/pipeline.py | 2 ++ 6 files changed, 25 insertions(+), 19 deletions(-) diff --git a/eegprep/input_output.py b/eegprep/input_output.py index 8a5fe8c..9dd5ba2 100644 --- a/eegprep/input_output.py +++ b/eegprep/input_output.py @@ -94,26 +94,15 @@ def retrieve_object(self, name): assert len(objects) == 1 return objects[0] -# TODO: # contains input, output, mem_storage, report -# TODO: job can choose when to expire objects -# -# (parent job? interface?) self.add_child() + job.cleanup() + io.store('obj', for_=self) CleanupJob? -# self.cleanup_after(child_job) -# self.store_later(child_job) -# super().add_to() or pipeline.add(self) -# add_children_to(self, pipeline) + def expire_output_of(self, job): + self.memory.delete(job=job.get_id(), **self.scope) + + # TODO: io.store(for_run=run, epochs) # io decides whether to keep in memory (with mem limit arg) or to store temp or store long term -# eeg_runs = layout.get(subject=subject, suffix='eeg', extension='bdf') + # - # output # - eegprepdir = join(args.data_directory, 'derivatives', 'eegprep') # - subjectdir = join(eegprepdir, 'sub-' + subject) # - os.makedirs(subjectdir, exist_ok=True) # - out_fpath = join(subjectdir, 'sub-{}_epo.npz'.format(subject)) # eegprepdir = join(datadir, 'derivatives', 'eegprep') -# layout = BIDSLayout(datadir) -# subjects = layout.get(return_type='id', target='subject') -# for subject in subjects: -# subjectdir = join(eegprepdir, 'sub-' + subject) -# os.makedirs(subjectdir, exist_ok=True) -# out_fpath = join(subjectdir, 'sub-{}_epo.npz'.format(subject)) -# preproc_subject(layout, subject, out_fpath) diff --git a/eegprep/jobs/base.py b/eegprep/jobs/base.py index e603af5..97a2eee 100644 --- a/eegprep/jobs/base.py +++ b/eegprep/jobs/base.py @@ -5,6 +5,7 @@ class BaseJob(object): def __init__(self, io, log): self.io = io self.log = log + self.jobs_to_expire = [] def get_id(self): return self.__class__.__name__.replace('Job', '') @@ -26,4 +27,11 @@ def add_children_to(self, pipeline): pass def run(self): - raise NotImplementedError(self.__class__.__name__ + '.run()') + pass + + def cleanup(self): + for job in self.jobs_to_expire: + self.io.expire_output_of(job) + + def expire_output_on_cleanup(self, job): + self.jobs_to_expire.append(job) diff --git a/eegprep/jobs/run.py b/eegprep/jobs/run.py index 719cc03..4372035 100644 --- a/eegprep/jobs/run.py +++ b/eegprep/jobs/run.py @@ -11,10 +11,13 @@ class RunJob(BaseJob): def add_children_to(self, pipeline): job = ReadJob(self.io, self.log) job.add_to(pipeline) + self.expire_output_on_cleanup(job) job = FilterJob(self.io, self.log) job.add_to(pipeline) + self.expire_output_on_cleanup(job) job = EpochJob(self.io, self.log) job.add_to(pipeline) + # io.store_output_of(job) # plot raw data # nchans = len(raw.ch_names) diff --git a/eegprep/jobs/subject.py b/eegprep/jobs/subject.py index dc63386..3c022fa 100644 --- a/eegprep/jobs/subject.py +++ b/eegprep/jobs/subject.py @@ -14,6 +14,7 @@ def add_children_to(self, pipeline): found_data = True job = RunJob(session_io.for_(run=run_label), self.log) job.add_to(pipeline) + self.expire_output_on_cleanup(job) if found_data: job = ConcatEpochsJob(self.io, self.log) job.add_to(pipeline) diff --git a/eegprep/log.py b/eegprep/log.py index 08130da..bec2300 100644 --- a/eegprep/log.py +++ b/eegprep/log.py @@ -26,13 +26,16 @@ def started_pipeline(self, jobs): def starting_job(self, job): self.write(f'Starting job: ' + job.describe()) + def cleaning_up_after_job(self, job): + self.write(f'Cleaning up after job: ' + job.describe()) + def discovering_data(self): self.write('Discovering data..') def storing_object_in_memory(self, key, obj): - self.write(f'Storing {repr(obj)} in memory store as {key}') + self.write(f'Storing object in memory store as {key}: {repr(obj)}') def removing_object_from_memory(self, key, obj): - self.write(f'Removing {repr(obj)} from memory known as {key}') + self.write(f'Removing object from memory known as {key}: {repr(obj)}') # TODO: job can flush log after done: log.flush(io) (io.write_text(log.xyz)) \ No newline at end of file diff --git a/eegprep/pipeline.py b/eegprep/pipeline.py index ad2bbe8..930c9d9 100644 --- a/eegprep/pipeline.py +++ b/eegprep/pipeline.py @@ -16,3 +16,5 @@ def run(self): for job in self.jobs: self.log.starting_job(job) job.run() + self.log.cleaning_up_after_job(job) + job.cleanup() From 9f0eff10c5e5a8eb4f3f1650446f162b681e1a77 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sun, 24 Nov 2019 18:28:29 +0000 Subject: [PATCH 27/32] writing epochs to file --- eegprep/input_output.py | 39 ++++++++++++++++++++++++++++--------- eegprep/jobs/base.py | 6 ++++++ eegprep/jobs/subject.py | 2 ++ eegprep/log.py | 3 +++ eegprep/memory.py | 5 ++++- tests/input_output_tests.py | 18 +++++++++++++++++ tests/memory_tests.py | 2 +- 7 files changed, 64 insertions(+), 11 deletions(-) create mode 100644 tests/input_output_tests.py diff --git a/eegprep/input_output.py b/eegprep/input_output.py index 9dd5ba2..1698d9a 100644 --- a/eegprep/input_output.py +++ b/eegprep/input_output.py @@ -1,5 +1,7 @@ from bids import BIDSLayout import copy +from os.path import join, dirname, isdir +from os import makedirs class InputOutput(object): @@ -97,12 +99,31 @@ def retrieve_object(self, name): def expire_output_of(self, job): self.memory.delete(job=job.get_id(), **self.scope) - -# TODO: io.store(for_run=run, epochs) # io decides whether to keep in memory (with mem limit arg) or to store temp or store long term - -# - # output -# - eegprepdir = join(args.data_directory, 'derivatives', 'eegprep') -# - subjectdir = join(eegprepdir, 'sub-' + subject) -# - os.makedirs(subjectdir, exist_ok=True) -# - out_fpath = join(subjectdir, 'sub-{}_epo.npz'.format(subject)) -# eegprepdir = join(datadir, 'derivatives', 'eegprep') + def write_output_of(self, job): + keys = self.memory.find_matching_keys(job=job.get_id(), **self.scope) + for key in keys: + self.write_object(key, self.memory.get(key)) + + def write_object(self, descriptors, obj): + fpath = self.build_fpath(suffix=descriptors.name, ext='fif') + self.ensure_dir(dirname(fpath)) + self.log.writing_object(obj, fpath) + obj.save(fpath) + + def ensure_dir(self, dirpath): + if not isdir(dirpath): + makedirs(dirpath) + + def build_fpath(self, suffix, ext): + outdir = join(self.root_dir, 'derivatives', 'eegprep') + for entity in ('subject', 'session'): + if entity in self.scope: + label = self.scope[entity] + outdir = join(outdir, f'{entity[:3]}-{label}') + fname = '' + for entity in ('subject', 'session'): + if entity in self.scope: + label = self.scope[entity] + fname += f'{entity[:3]}-{label}_' + fname += f'{suffix}.{ext}' + return join(outdir, fname) diff --git a/eegprep/jobs/base.py b/eegprep/jobs/base.py index 97a2eee..24c578d 100644 --- a/eegprep/jobs/base.py +++ b/eegprep/jobs/base.py @@ -6,6 +6,7 @@ def __init__(self, io, log): self.io = io self.log = log self.jobs_to_expire = [] + self.jobs_to_write = [] def get_id(self): return self.__class__.__name__.replace('Job', '') @@ -30,8 +31,13 @@ def run(self): pass def cleanup(self): + for job in self.jobs_to_write: + self.io.write_output_of(job) for job in self.jobs_to_expire: self.io.expire_output_of(job) def expire_output_on_cleanup(self, job): self.jobs_to_expire.append(job) + + def write_output_on_cleanup(self, job): + self.jobs_to_write.append(job) diff --git a/eegprep/jobs/subject.py b/eegprep/jobs/subject.py index 3c022fa..896b607 100644 --- a/eegprep/jobs/subject.py +++ b/eegprep/jobs/subject.py @@ -18,3 +18,5 @@ def add_children_to(self, pipeline): if found_data: job = ConcatEpochsJob(self.io, self.log) job.add_to(pipeline) + self.write_output_on_cleanup(job) + self.expire_output_on_cleanup(job) diff --git a/eegprep/log.py b/eegprep/log.py index bec2300..8d36750 100644 --- a/eegprep/log.py +++ b/eegprep/log.py @@ -38,4 +38,7 @@ def storing_object_in_memory(self, key, obj): def removing_object_from_memory(self, key, obj): self.write(f'Removing object from memory known as {key}: {repr(obj)}') + def writing_object(self, obj, fpath): + self.write(f'Writing object to disk at {fpath}') + # TODO: job can flush log after done: log.flush(io) (io.write_text(log.xyz)) \ No newline at end of file diff --git a/eegprep/memory.py b/eegprep/memory.py index d845e83..d271cb0 100644 --- a/eegprep/memory.py +++ b/eegprep/memory.py @@ -8,12 +8,15 @@ def __init__(self, log): self.objects = {} self.log = log + def get(self, key): + return self.objects[key] + def store(self, obj, **filters): key = frozenset(filters.items()) self.log.storing_object_in_memory(key, obj) self.objects[key] = obj - def retrieve(self, **filters): + def find(self, **filters): selection = self.find_matching_keys(**filters) return [self.objects[k] for k in selection] diff --git a/tests/input_output_tests.py b/tests/input_output_tests.py new file mode 100644 index 0000000..166b0e1 --- /dev/null +++ b/tests/input_output_tests.py @@ -0,0 +1,18 @@ +from unittest import TestCase +from unittest.mock import Mock + + +class InputOutputTests(TestCase): + + def test_build_fpath(self): + from eegprep.input_output import InputOutput + log, memory, layout = Mock(), Mock(), Mock() + scope = { + 'subject': '02', + 'session': '05' + } + io = InputOutput(log, memory, '/data', scope, layout) + self.assertEqual( + io.build_fpath(suffix='hello', ext='foo'), + '/data/derivatives/eegprep/sub-02/ses-05/sub-02_ses-05_hello.foo' + ) diff --git a/tests/memory_tests.py b/tests/memory_tests.py index 17be51b..abf9e47 100644 --- a/tests/memory_tests.py +++ b/tests/memory_tests.py @@ -13,4 +13,4 @@ def test_store_retrieve(self): ram.store(obj2, foo='a', bar=1) ram.store(obj3, foo='a', bar=2, baz=0.5) ram.store(obj4, foo='b', bar=2) - self.assertEqual(ram.retrieve(foo='a', bar=2), [obj1, obj3]) + self.assertEqual(ram.find(foo='a', bar=2), [obj1, obj3]) From 244c4fb23950d718f7167634e81843bc7789ceb1 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sun, 24 Nov 2019 19:45:34 +0000 Subject: [PATCH 28/32] fixed frozenset.name bug --- eegprep/input_output.py | 9 +++++---- eegprep/memory.py | 1 + 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/eegprep/input_output.py b/eegprep/input_output.py index 1698d9a..ac2297a 100644 --- a/eegprep/input_output.py +++ b/eegprep/input_output.py @@ -89,7 +89,7 @@ def store_object(self, obj, name, job): def retrieve_objects(self, name): filters = dict(name=name, **self.scope) - return self.memory.retrieve(**filters) + return self.memory.find(**filters) def retrieve_object(self, name): objects = self.retrieve_objects(name) @@ -102,10 +102,11 @@ def expire_output_of(self, job): def write_output_of(self, job): keys = self.memory.find_matching_keys(job=job.get_id(), **self.scope) for key in keys: - self.write_object(key, self.memory.get(key)) + name = dict(key)['name'] + self.write_object(name, self.memory.get(key)) - def write_object(self, descriptors, obj): - fpath = self.build_fpath(suffix=descriptors.name, ext='fif') + def write_object(self, name, obj): + fpath = self.build_fpath(suffix=name, ext='fif') self.ensure_dir(dirname(fpath)) self.log.writing_object(obj, fpath) obj.save(fpath) diff --git a/eegprep/memory.py b/eegprep/memory.py index d271cb0..890e0d4 100644 --- a/eegprep/memory.py +++ b/eegprep/memory.py @@ -1,5 +1,6 @@ from copy import copy + class Memory(object): """Stores arbitrary objects by a set of key/value pairs. """ From 403dc50d1720e388dc0e7af38b7c0b960448f0bb Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Sun, 24 Nov 2019 19:59:27 +0000 Subject: [PATCH 29/32] changed name of epochs object --- eegprep/jobs/concat_epochs.py | 4 ++-- eegprep/jobs/epoch.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/eegprep/jobs/concat_epochs.py b/eegprep/jobs/concat_epochs.py index f44b77a..502abe6 100644 --- a/eegprep/jobs/concat_epochs.py +++ b/eegprep/jobs/concat_epochs.py @@ -5,6 +5,6 @@ class ConcatEpochsJob(BaseJob): def run(self): - epochs_per_run = self.io.retrieve_objects('epochs') + epochs_per_run = self.io.retrieve_objects('epo') epochs = mne.epochs.concatenate_epochs(epochs_per_run) - self.io.store_object(epochs, name='epochs', job=self) + self.io.store_object(epochs, name='epo', job=self) diff --git a/eegprep/jobs/epoch.py b/eegprep/jobs/epoch.py index 737a942..8b564f2 100644 --- a/eegprep/jobs/epoch.py +++ b/eegprep/jobs/epoch.py @@ -12,9 +12,9 @@ def run(self): epochs_params = dict( events=events, tmin=-0.2, - tmax=0.8, + tmax=3.1, picks=picks, verbose=False ) epochs = mne.Epochs(raw, preload=True, **epochs_params) - self.io.store_object(epochs, name='epochs', job=self) + self.io.store_object(epochs, name='epo', job=self) From 4e823949688deb5af1451ba4a03eb3e42fff5ad4 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Wed, 8 Jan 2020 20:52:48 +0000 Subject: [PATCH 30/32] can read edf files --- eegprep/jobs/read.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/eegprep/jobs/read.py b/eegprep/jobs/read.py index 0d3aaf9..509e16b 100644 --- a/eegprep/jobs/read.py +++ b/eegprep/jobs/read.py @@ -8,7 +8,12 @@ class ReadJob(BaseJob): def run(self): fpath_raw = self.io.get_filepath(suffix='eeg') - raw = mne.io.read_raw_bdf(fpath_raw, preload=True, verbose=False) + ext = fpath_raw[-3:] + raw_funcs = { + 'bdf': mne.io.read_raw_bdf, + 'edf': mne.io.read_raw_edf + } + raw = raw_funcs[ext](fpath_raw, preload=True, verbose=False) # Set channel types and select reference channels fpath_channels = self.io.get_filepath(suffix='channels') From ebb3414fd512b3b37b31903fceaeb786f109f806 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Wed, 8 Jan 2020 21:13:03 +0000 Subject: [PATCH 31/32] fixed deprecated way to read montage --- eegprep/jobs/read.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/eegprep/jobs/read.py b/eegprep/jobs/read.py index 509e16b..07a3999 100644 --- a/eegprep/jobs/read.py +++ b/eegprep/jobs/read.py @@ -32,13 +32,15 @@ def run(self): # set bad channels # raw.info['bads'] = channels[channels.status=='bad'].index.tolist() - montage = mne.channels.read_montage(guess_montage(raw.ch_names)) - raw.set_montage(montage, verbose=False) - # Set reference refChannels = channels[channels.type=='REF'].index.tolist() - raw.set_eeg_reference(ref_channels=refChannels) + raw.set_eeg_reference(ref_channels=refChannels) # can now drop reference electrodes raw.set_channel_types({k: 'misc' for k in refChannels}) + # tell MNE about electrode locations + montageName = guess_montage(raw.ch_names) + montage = mne.channels.make_standard_montage(kind=montageName) + raw.set_montage(montage, verbose=False) + self.io.store_object(raw, name='raw', job=self) From 710f1aab34d258e00dc3e17f1fb530bf4c78d718 Mon Sep 17 00:00:00 2001 From: Jasper van den Bosch Date: Wed, 8 Jan 2020 21:32:06 +0000 Subject: [PATCH 32/32] improved log messages of memory store --- eegprep/log.py | 17 ++++++++++++----- eegprep/memory.py | 11 +++++++++-- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/eegprep/log.py b/eegprep/log.py index 8d36750..c702cef 100644 --- a/eegprep/log.py +++ b/eegprep/log.py @@ -1,3 +1,4 @@ +from mne.utils.misc import sizeof_fmt as hsize class Log(object): @@ -32,11 +33,17 @@ def cleaning_up_after_job(self, job): def discovering_data(self): self.write('Discovering data..') - def storing_object_in_memory(self, key, obj): - self.write(f'Storing object in memory store as {key}: {repr(obj)}') - - def removing_object_from_memory(self, key, obj): - self.write(f'Removing object from memory known as {key}: {repr(obj)}') + def storing_object_in_memory(self, key, obj, size, total_size): + self.write( + f'Storing object ({hsize(size)}) ' + f'in memory store ({hsize(total_size)})' + ) + + def removing_object_from_memory(self, key, obj, size, total_size): + self.write( + f'Removing object ({hsize(size)}) ' + f'from memory ({hsize(total_size)})' + ) def writing_object(self, obj, fpath): self.write(f'Writing object to disk at {fpath}') diff --git a/eegprep/memory.py b/eegprep/memory.py index 890e0d4..c5c6a41 100644 --- a/eegprep/memory.py +++ b/eegprep/memory.py @@ -8,13 +8,16 @@ class Memory(object): def __init__(self, log): self.objects = {} self.log = log + self.total_size = 0 def get(self, key): return self.objects[key] def store(self, obj, **filters): key = frozenset(filters.items()) - self.log.storing_object_in_memory(key, obj) + size = obj._size + self.total_size += size + self.log.storing_object_in_memory(key, obj, size, self.total_size) self.objects[key] = obj def find(self, **filters): @@ -24,7 +27,11 @@ def find(self, **filters): def delete(self, **filters): selection = self.find_matching_keys(**filters) for k in selection: - self.log.removing_object_from_memory(k, self.objects[k]) + obj = self.objects[k] + size = obj._size + self.total_size -= size + self.log.removing_object_from_memory( + k, obj, size, self.total_size) del self.objects[k] def find_matching_keys(self, **filters):