Source code for step.participant

from dataclasses import dataclass

from .component import ComponentConfig, ComponentPipeline
from .epoch import EpochConfig, EpochPipeline
from .input import InputConfig, InputPipeline
from .preproc import PreprocConfig, PreprocPipeline


@dataclass
class ParticipantConfig:
    """The configuration for the participant pipeline.

    Bundles one configuration object per processing stage. Each field is
    handed to the constructor of the matching sub-pipeline when a
    ``ParticipantPipeline`` is created.

    NOTE(review): every field defaults to ``None`` although the annotations
    do not mention ``None`` -- confirm whether the sub-pipelines accept a
    missing configuration or whether all four fields are effectively required.
    """

    # Passed to ``InputPipeline`` by ``ParticipantPipeline.__init__``.
    input_config: InputConfig = None
    # Passed to ``PreprocPipeline`` by ``ParticipantPipeline.__init__``.
    preproc_config: PreprocConfig = None
    # Passed to ``EpochPipeline`` by ``ParticipantPipeline.__init__``.
    epoch_config: EpochConfig = None
    # Passed to ``ComponentPipeline`` by ``ParticipantPipeline.__init__``.
    component_config: ComponentConfig = None


class ParticipantPipeline:
    """Process the EEG data of a single participant from start to finish.

    The work is delegated to four sub-pipelines (input, preprocessing,
    epoching and component extraction) that are executed in that order by
    :meth:`run`. Each sub-pipeline is kept as an attribute so that its
    results stay accessible after the run.
    """

    def __init__(self, config: ParticipantConfig):
        """Create one sub-pipeline per section of ``config``."""
        self.input_pipeline = InputPipeline(config.input_config)
        self.preproc_pipeline = PreprocPipeline(config.preproc_config)
        self.epoch_pipeline = EpochPipeline(config.epoch_config)
        self.component_pipeline = ComponentPipeline(config.component_config)

    def run(self):
        """Execute all sub-pipelines in order.

        When the preprocessing configuration has ``bad_channels`` set to
        ``"auto"``, bad channels are detected on the epochs and, if any are
        found, preprocessing and epoching are repeated before the component
        pipeline runs.
        """
        self.input_pipeline.run()
        self._preprocess_and_epoch()

        # TODO: Maybe this could be done on the continuous raw data (i.e.,
        # fully within the preproc pipeline) rather than on the epochs.
        # Let's check once we added automatic break detection (#212).
        auto_detect = self.preproc_pipeline.config.bad_channels == "auto"
        if auto_detect:
            self._detect_bad_channels_and_rerun()

        epoching = self.epoch_pipeline
        self.component_pipeline.run(epoching.epochs, epoching.bad_ixs)

    def _preprocess_and_epoch(self):
        """Run preprocessing on the input data, then epoch the result."""
        source = self.input_pipeline
        self.preproc_pipeline.run(source.raw, source.besa)
        self.epoch_pipeline.run(self.preproc_pipeline.raw, source.log)

    def _detect_bad_channels_and_rerun(self):
        """Detect bad channels on the epochs and redo the affected stages.

        If at least one bad channel is reported, the detected channels
        replace ``bad_channels`` in the preprocessing configuration and
        preprocessing plus epoching are run again. Otherwise nothing
        changes.
        """
        detected = self.epoch_pipeline.detect_bad_channels()
        if len(detected) == 0:
            # Nothing to fix, so the existing results stay valid.
            return

        self.preproc_pipeline.config.bad_channels = detected
        self._preprocess_and_epoch()