import copy import logging from datetime import datetime, timedelta import random from typing import List, Union from uuid import uuid1 import config from channel import Channel, ChannelUnit from config import ChannelColors from callbacks import CallbackDispatcher, CallbackType class DetektorData: def __init__(self): self._file_path: Union[str,None] = None self._start_datetime: Union[datetime, None] = None self.interval_ms: Union[int, None] = 1000 # this is just auxiliary thing for debugging the reverting feature self.last_changed = self._start_datetime self.channels: List[Channel] = [] self._x_labels = [] self.data_tainted = False # any internal change of channel data will switch the tainted flag on CallbackDispatcher().register( CallbackType.DATA_TAINTED, self.set_tainted ) CallbackDispatcher().register( CallbackType.UPDATE_X, self._generate_x_labels ) CallbackDispatcher().register( CallbackType.UPDATE_X, self.set_tainted ) @property def start_datetime(self) -> datetime: return self._start_datetime @start_datetime.setter def start_datetime(self, value: datetime): if self._start_datetime != value: # the new value is different previous_start_datetime = self._start_datetime self._start_datetime = value #self._generate_x_labels() CallbackDispatcher().call(CallbackType.UPDATE_X) if previous_start_datetime != None: # we previously had some value, so no need to taint the data - this is probably called during import CallbackDispatcher().call(CallbackType.DATA_TAINTED) @property def file_path(self) -> str: return self._file_path @file_path.setter def file_path(self, value: str): self._file_path = value CallbackDispatcher().call(CallbackType.FILE_NAME_CHANGED) def set_tainted(self): self.data_tainted = True def add_channel(self, c: Channel): self.channels.append(c) if len(self.channels) == 1: self._generate_x_labels() CallbackDispatcher().call(CallbackType.ADD_CHANNEL, c.id) def remove_channel(self, c: Channel): CallbackDispatcher().call(CallbackType.REMOVE_CHANNEL, c.id) self.channels.remove(c) def x_labels(self) -> List[str]: if len(self._x_labels) == 0: self._generate_x_labels() return self._x_labels def _generate_x_labels(self): """ Pregenerates the list of labels. The DetektorAxis will pick labels from this list. """ # Initialize the list self._x_labels = [] # Start from the initial time current_time = self._start_datetime if self.data_count() > 0: for i in range(self.data_count()+1): # Format and add label self._x_labels.append(current_time.strftime("%H:%M:%S")) # Increment time current_time += timedelta(milliseconds=self.interval_ms) def data_count(self): """ Number of data. All channels should have the same amount of data timed from the same start """ first_channel = next(iter(self.channels), None) if first_channel is None: return 0 else: return len(first_channel.data) def min_y(self, active: bool = True): return min( (min(ch.data, default=float('inf')) for ch in self.channels if not active or (active and ch.active)), default=None ) def max_y(self, active: bool = True): return max( (max(ch.data, default=float('-inf')) for ch in self.channels if not active or (active and ch.active)), default=None ) def flush(self): """ Removes all channels """ for ch in list(self.channels): # Use a copy to avoid modification issues self.remove_channel(ch) def cut(self, start: int, end: int): """ For cutting and deleting """ logging.debug(f'Cutting range {start} - {end}') for c in self.channels: c.cut(start, end) # update the channel, but don't update the X labels or zoom/pan limits yet CallbackDispatcher().call(CallbackType.UPDATE_CHANNEL, c.id, False) # if the start is 0, we have to change the start_datetime accordingly if start == 0: new_start_datetime = self.start_datetime + timedelta(milliseconds=self.interval_ms * (end-start)) logging.debug(f'Since we\'re cutting from the start, changing the start from {self.start_datetime} to {new_start_datetime}') self.start_datetime = new_start_datetime # update the X labels and zoom/pan limits at once self._generate_x_labels() CallbackDispatcher().call(CallbackType.UPDATE_X) CallbackDispatcher().call(CallbackType.DATA_TAINTED) def copy(self, start, end): """ For copying a chunk of data """ logging.debug(f'Copying range {start} - {end}') for c in self.channels: c.copy(start, end) def paste(self, at: int): """ For pasting at end, or any other position """ for c in self.channels: c.paste(at) # update the channel, but don't update the X labels or zoom/pan limits yet CallbackDispatcher().call(CallbackType.UPDATE_CHANNEL, c.id, False) # update the X labels and zoom/pan limits at once self._generate_x_labels() CallbackDispatcher().call(CallbackType.UPDATE_X) CallbackDispatcher().call(CallbackType.DATA_TAINTED) def get_channel_by_uuid(self, uuid: uuid1) -> Union[Channel, None]: ret = None for c in self.channels: if c.id == uuid: ret = c break return ret class DetektorContainer: _data: List[DetektorData] = [] _instance = None def __new__(cls, *args, **kwargs): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def get(self) -> Union[DetektorData, None]: if len(self._data) > 0: return self._data[-1] else: logging.debug('No dataset to return') return None def set(self, data: DetektorData): logging.debug('New dataset') data.last_changed = 'patient zero' self._data.append(data) def revert(self): # revert the data, but only if we have changed them if self.has_history(): self._data.pop() CallbackDispatcher().call(CallbackType.DATA_PARSED) logging.debug(f'Dataset reverted to {self.get().last_changed}') else: logging.debug('No dataset to revert to') def duplicate(self, force_update: bool = True): self.get().last_changed = datetime.now() self._data.append( copy.deepcopy( self.get() ) ) self.get().last_changed = 'latest' if force_update: CallbackDispatcher().call(CallbackType.DATA_PARSED) logging.debug('Dataset duplicated') def has_history(self) -> bool: """ Used for enabling / disabling CTRL-Z """ return len(self._data) > 1 def flush(self): logging.debug('Flushing all datasets') self._data = [] def generate_data(channel_count: int = 5, data_count: int = 100, random_values: bool = True): """ Simple data generator for development purposes """ if channel_count > len(config.DUMMY_CHANNEL_NAMES): raise Exception('Too many channels to generate') d = DetektorData() d.start_datetime = datetime.now() d.file_path = 'TESTOVACÍ DATA' cn = 1 for c in config.DUMMY_CHANNEL_NAMES: nc = Channel() nc.name = c nc.number = cn nc.color = ChannelColors[cn-1] nc.unit = random.choice(list(ChannelUnit)) offset = 100 * random.random() if random_values: nc.data = [round(random.gauss(25, 5), 2) + offset for _ in range(data_count)] else: nc.data = [i+offset for i in range(data_count)] d.add_channel(nc) if cn == channel_count: break cn += 1 return d