Files
detektor/src/parsers.py
2025-03-10 17:10:04 +01:00

148 lines
4.6 KiB
Python

import logging
from datetime import timedelta
import pandas as pd
from channel import Channel
from config import ChannelColors
from detektor_data import DetektorContainer
from dbfread import DBF
from channel import ChannelUnit
from detektor_data import DetektorData
def parse_dbf_to_detektor(dbf_file: str) -> bool:
    """Load a DBF recording into the shared ``DetektorContainer`` data object.

    The file is read with ``dbfread`` (UTF-8).  The ``DATUM`` and ``CAS``
    columns are combined into a timestamp index, then ``DATUM``, ``CAS`` and
    ``VYPADEK`` are dropped.  Every remaining column becomes one ``Channel``
    with unit ``ChannelUnit.PPM`` and a colour cycled from ``ChannelColors``.
    The rows are aligned onto a fixed 1000 ms grid spanning the first to the
    last timestamp; grid points without a record are filled with ``0``.

    NOTE(review): unlike ``parse_xls_to_detektor`` this function neither
    flushes/re-creates the container data nor sets ``channel.number`` --
    presumably the caller prepares the container; confirm this is intended.

    Args:
        dbf_file: Path of the DBF file to read.

    Returns:
        ``True`` when the file was parsed and every channel was added,
        ``False`` when anything failed (the error is logged with traceback).
    """
    interval_ms = 1000
    try:
        # Convert the DBF records to a DataFrame.
        df = pd.DataFrame(iter(DBF(dbf_file, encoding="utf-8")))
        if df.empty:
            # Fail with a clear message instead of a bare KeyError on "DATUM".
            raise ValueError(f"DBF file {dbf_file!r} contains no records")

        # Build the timestamp index from the date and time columns.
        df["TIMESTAMP"] = pd.to_datetime(
            df["DATUM"].astype(str) + " " + df["CAS"],
            format="%Y-%m-%d %H:%M:%S",
        )
        df = df.drop(columns=["DATUM", "CAS", "VYPADEK"])
        df = df.set_index("TIMESTAMP")

        # Re-index onto the complete time range so missing timestamps
        # appear as rows of zeros.
        start_time = df.index.min()
        end_time = df.index.max()
        full_time_range = pd.date_range(
            start=start_time, end=end_time, freq=f"{interval_ms}ms"
        )
        df = df.reindex(full_time_range, fill_value=0)

        # Fetch the shared data object once rather than on every access.
        data = DetektorContainer().get()
        data.file_path = dbf_file
        data.start_datetime = start_time
        data.interval_ms = interval_ms

        # One channel per remaining column, cycling through the palette.
        for i, column in enumerate(df.columns):
            channel = Channel()
            channel.name = column
            channel.unit = ChannelUnit.PPM
            channel.color = ChannelColors[i % len(ChannelColors)]
            channel.data = df[column].tolist()
            data.add_channel(channel)
    except Exception:
        # Parser boundary: report failure via the return value, but keep the
        # traceback in the log so the cause can be diagnosed.
        logging.exception("Failed to parse DBF file %s", dbf_file)
        return False
    return True
def parse_xls_to_detektor(xls_file: str) -> bool:
    """Load the first sheet of an Excel workbook into a fresh ``DetektorData``.

    The container is flushed and given a new ``DetektorData`` before it is
    filled.  Column 0 of the sheet is read as timestamps; every further
    column becomes one ``Channel`` named after its header and numbered from
    1.  Values are taken from parsed row index 1 onwards, i.e. the first
    parsed row is skipped (presumably a units/sub-header row -- verify
    against real input files).

    The sampling interval is the difference between the first two data
    timestamps, in milliseconds; with fewer than two data rows it defaults
    to 1000 ms.

    Args:
        xls_file: Path of the Excel workbook to read.

    Returns:
        ``True`` on success, ``False`` when anything failed (the error is
        logged with traceback).
    """
    try:
        # The context manager closes the workbook handle even when parsing
        # raises; the data is assumed to live in the first sheet.
        with pd.ExcelFile(xls_file) as workbook:
            df = workbook.parse(workbook.sheet_names[0])

        # (Re)initialise the shared detector data.
        DetektorContainer().flush()
        DetektorContainer().set(DetektorData())
        data = DetektorContainer().get()

        data.start_datetime = pd.to_datetime(df.iloc[1, 0], errors="coerce")
        logging.debug(f'Parsed start_datetime: {data.start_datetime}')

        # With at least two data rows the interval is derived from them.
        if len(df) >= 3:
            step = pd.to_datetime(df.iloc[2, 0]) - data.start_datetime
            # Round instead of truncating: float error would otherwise turn
            # e.g. 1.001 s into 1000 ms instead of 1001 ms.
            interval = int(round(step.total_seconds() * 1000))
            message = f'Parsed interval: {interval}'
        else:
            interval = 1000
            message = f'Interval set to {interval}'
        data.interval_ms = interval
        logging.debug(message)

        # Create one channel per column after the timestamp column.
        for idx, col in enumerate(df.columns[1:]):
            channel = Channel()
            channel.name = str(col)
            channel.number = idx + 1
            channel.data = list(df.iloc[1:, idx + 1])
            channel.color = ChannelColors[idx % len(ChannelColors)]
            data.add_channel(channel)
            logging.debug(f'Parsed channel {col}, data count of {len(channel.data)} records')
    except Exception:
        # Parser boundary: report failure via the return value, but keep the
        # traceback in the log so the cause can be diagnosed.
        logging.exception("Failed to parse Excel file %s", xls_file)
        return False
    return True
def export_to_xlsx(xlsx_file: str):
    """Write the currently loaded detector data to an XLSX workbook.

    Sheet ``Sheet1`` receives a ``Datum`` column holding one timestamp per
    sample (the start time advanced by the sampling interval for each
    sample), followed by one column per channel keyed by the channel name.

    Args:
        xlsx_file: Destination path of the workbook.

    Returns:
        Always ``True``; failures propagate as exceptions.
    """
    data = DetektorContainer().get()

    # Timestamp for every sample, starting at the recording start.
    first_stamp = data.start_datetime
    stamps = [
        first_stamp + position * timedelta(milliseconds=data.interval_ms)
        for position in range(data.data_count())
    ]

    # "Datum" first, then the channels in container order.
    table = {"Datum": stamps}
    table.update((channel.name, channel.data) for channel in data.channels)
    frame = pd.DataFrame(table)

    # NOTE: no explicit date number format is applied to the "Datum"
    # column; the writer's defaults are used.
    with pd.ExcelWriter(xlsx_file, engine="xlsxwriter") as writer:
        frame.to_excel(writer, sheet_name="Sheet1", index=False)
        logging.debug(f'Saving to file {xlsx_file}')
    return True