diff --git a/utils/opatio/pyproject.toml b/utils/opatio/pyproject.toml new file mode 100644 index 0000000..cdf1eab --- /dev/null +++ b/utils/opatio/pyproject.toml @@ -0,0 +1,16 @@ +[build-system] +requires = ["setuptools", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "opatio" +version = "0.1.0a" +description = "A python module for handling OPAT files" +readme = "readme.md" +authors = [{name = "Emily M. Boudreaux", email = "emily.boudreaux@dartmouth.edu"}] +requires-python = ">=3.8" +dependencies = ["numpy >= 1.21.1"] + +[tool.setuptools] +packages = ["opatio", "opatio.opat"] +package-dir = {"" = "src"} \ No newline at end of file diff --git a/utils/opatio/readme.md b/utils/opatio/readme.md new file mode 100644 index 0000000..48ba4ce --- /dev/null +++ b/utils/opatio/readme.md @@ -0,0 +1,46 @@ +# opatIO python module +This module defines a set of tools to build, write, and read OPAT files. +The OPAT fileformat is a custom file format designed to efficiently store +opacity information for a variety of compositions. + +## Installation +You can install this module with pip +```bash +git clone +cd 4DSSE/utils/opat +pip install . +``` + +## General Usage +The general way that this module is mean to be used is to first build a schema for the opaticy table and then save that to disk. The module will handle all the byte aligment and lookup table construction for you. + +A simple example might look like the following + +```python +from opatio import OpatIO + +opacityFile = OpatIO() +opacityFile.set_comment("This is a sample opacity file") +opaticyFile.set_source("OPLIB") + +# some code to get a logR, logT, and logKappa table +# where logKappa is of size (n,m) if logR is size n and +# logT is size m + +opacityFile.add_table(X, Z, logR, logT, logKappa) +opacityFile.save("opacity.opat") +``` + +You can also read opat files which have been generated with the loadOpat function + +```python +from opatio import loadOpat + +opacityFile = loadOpat("opacity.opat") + +print(opacityFile.header) +print(opaticyFile.tables[0]) +``` + +## Problems +If you have problems feel free to either submit an issue to the root github repo (tagged as utils/opatio) or email Emily Boudreaux at emily.boudreaux@dartmouth.edu \ No newline at end of file diff --git a/utils/opatio/src/opatio/__init__.py b/utils/opatio/src/opatio/__init__.py new file mode 100644 index 0000000..87db302 --- /dev/null +++ b/utils/opatio/src/opatio/__init__.py @@ -0,0 +1 @@ +from .opat.opat import OpatIO, loadOpat \ No newline at end of file diff --git a/utils/opatio/src/opatio/opat/__init__.py b/utils/opatio/src/opatio/opat/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/opatio/src/opatio/opat/opat.py b/utils/opatio/src/opatio/opat/opat.py new file mode 100644 index 0000000..ef091f7 --- /dev/null +++ b/utils/opatio/src/opatio/opat/opat.py @@ -0,0 +1,277 @@ +import struct +import numpy as np +from datetime import datetime + +from dataclasses import dataclass + +from typing import Iterable, List, Tuple +from collections.abc import Iterable as collectionIterable + +import hashlib + +import os + +@dataclass +class Header: + magic: str + version: int + numTables: int + headerSize: int + indexOffset: int + creationDate: str + sourceInfo: str + comment: str + reserved: bytes + +@dataclass +class TableIndex: + X: float + Z: float + byteStart: int + byteEnd: int + sha256: bytes + +@dataclass +class OPATTable: + N_R: int + N_T: int + logR: Iterable[float] + logT: Iterable[float] + logKappa: Iterable[Iterable[float]] + +defaultHeader = Header( + magic="OPAT", + version=1, + numTables=0, + headerSize=256, + indexOffset=0, + creationDate=datetime.now().strftime("%b %d, %Y"), + sourceInfo="no source provided by user", + comment="default header", + reserved=b"\x00" * 26 +) + +class OpatIO: + def __init__(self): + self.header: Header = defaultHeader + self.tables: List[Tuple[Tuple[float, float], OPATTable]] = [] + + @staticmethod + def validate_char_array_size(s: str, nmax: int) -> bool: + if len(s) > nmax: + return False + return True + + @staticmethod + def validate_logKappa(logKappa): + if isinstance(logKappa, np.ndarray): + if logKappa.ndim == 2: + return + else: + raise ValueError("logKappa must be a non-empty 2D array") + + if isintance(logKappa, collectionIterable) and all(isinstance(row, collectionIterable) for row in logKappa): + try: + first_row = next(iter(logKappa)) + if all(isinstance(x, (int, float)) for x in first_row): + return + else: + raise ValueError("logKappa must be fully numeric") + except StopIteration: + raise ValueError("logKappa must be a non-empty 2D iterable") + else: + raise TypeError("logKappa must be a non-empty 2D array or iterable") + + @staticmethod + def validate_1D(arr, name: str): + if isinstance(arr, np.ndarray): + if arr.ndim == 1: + return + else: + raise ValueError(f"{name} must be a 1D numpy array") + if isinstance(arr, collectionIterable) and not isinstance(arr, (str, bytes)): + if all(isinstance(x, (int, float)) for x in arr): + return + else: + raise ValueError(f"{name} must be fully numeric") + else: + raise TypeError(f"{name} must be a non-empty 2D array or iterable") + + @staticmethod + def compute_checksum(data: bytes) -> bytes: + return hashlib.sha256(data).digest() + + def set_version(self, version: int) -> int: + self.header.version = version + return self.header.version + + def set_source(self, source: str) -> str: + if not self.validate_char_array_size(source, 64): + raise TypeError(f"sourceInfo string ({source}) is too long ({len(source)}). Max length is 64") + self.header.sourceInfo = source + return self.header.sourceInfo + + def set_comment(self, comment: str) -> str: + if not self.validate_char_array_size(comment, 128): + raise TypeError(f"comment string ({comment}) is too long ({len(comment)}). Max length is 128") + self.header.comment = comment + return self.header.comment + + def add_table(self, X: float, Z: float, logR: Iterable[float], logT: Iterable[float], logKappa: Iterable[Iterable[float]]): + self.validate_logKappa(logKappa) + self.validate_1D(logR, "logR") + self.validate_1D(logT, "logT") + + logR = np.array(logR) + logT = np.array(logT) + logKappa = np.array(logKappa) + + if logKappa.shape != (logR.shape[0], logT.shape[0]): + raise ValueError(f"logKappa must be of shape ({len(logR)} x {len(logT)})! Currently logKappa has shape {logKappa.shape}") + + table = OPATTable( + N_R = logR.shape[0], + N_T = logT.shape[0], + logR = logR, + logT = logT, + logKappa = logKappa + ) + + self.tables.append(((X, Z), table)) + self.header.numTables += 1 + + + def _header_bytes(self) -> bytes: + headerBytes = struct.pack( + "<4s H I I Q 16s 64s 128s 26s", + self.header.magic.encode('utf-8'), + self.header.version, + self.header.numTables, + self.header.headerSize, + self.header.indexOffset, + self.header.creationDate.encode('utf-8'), + self.header.sourceInfo.encode('utf-8'), + self.header.comment.encode('utf-8'), + self.header.reserved + ) + return headerBytes + + def _table_bytes(self, table: OPATTable) -> Tuple[bytes, bytes]: + logR = table.logR.flatten() + logT = table.logT.flatten() + logKappa = table.logKappa.flatten() + tableBytes = struct.pack( + f" bytes: + tableIndexBytes = struct.pack( + ' str: + tempHeaderBytes = self._header_bytes() + + if len(tempHeaderBytes) != 256: + raise RuntimeError(f"Header must have 256 bytes. Due to an unknown error the header has {len(tempHeaderBytes)} bytes") + + currentStartByte: int = 256 + tableIndicesBytes: List[bytes] = [] + tablesBytes: List[bytes] = [] + for (X, Z), table in self.tables: + checksum, tableBytes = self._table_bytes(table) + tableIndex = TableIndex( + X = X, + Z = Z, + byteStart = currentStartByte, + byteEnd = currentStartByte + len(tableBytes), + sha256 = checksum + ) + tableIndexBytes = self._tableIndex_bytes(tableIndex) + tablesBytes.append(tableBytes) + tableIndicesBytes.append(tableIndexBytes) + + currentStartByte += len(tableBytes) + self.header.indexOffset = currentStartByte + headerBytes = self._header_bytes() + + with open(filename, 'wb') as f: + f.write(headerBytes) + for tableBytes in tablesBytes: + f.write(tableBytes) + for tableIndexBytes in tableIndicesBytes: + f.write(tableIndexBytes) + + if os.path.exists(filename): + return filename + + +def loadOpat(filename: str) -> OpatIO: + opat = OpatIO() + with open(filename, 'rb') as f: + headerBytes: bytes = f.read(256) + unpackedHeader = struct.unpack("<4s H I I Q 16s 64s 128s 26s", headerBytes) + loadedHeader = Header( + magic = unpackedHeader[0].decode(), + version = unpackedHeader[1], + numTables = unpackedHeader[2], + headerSize = unpackedHeader[3], + indexOffset = unpackedHeader[4], + creationDate = unpackedHeader[5].decode(), + sourceInfo = unpackedHeader[6].decode(), + comment = unpackedHeader[7].decode(), + reserved = unpackedHeader[8] + ) + opat.header = loadedHeader + f.seek(opat.header.indexOffset) + tableIndices: List[TableIndex] = [] + while tableIndexEntryBytes := f.read(32): + unpackedTableIndexEntry = struct.unpack("