feat(utils/opatio): added python module for interfacing with opat files
A Python module (opatio) has been written to make the creation and reading of OPAT files straightforward
This commit is contained in:
16
utils/opatio/pyproject.toml
Normal file
16
utils/opatio/pyproject.toml
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
[build-system]
|
||||||
|
requires = ["setuptools", "wheel"]
|
||||||
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
|
[project]
|
||||||
|
name = "opatio"
|
||||||
|
version = "0.1.0a"
|
||||||
|
description = "A python module for handling OPAT files"
|
||||||
|
readme = "readme.md"
|
||||||
|
authors = [{name = "Emily M. Boudreaux", email = "emily.boudreaux@dartmouth.edu"}]
|
||||||
|
requires-python = ">=3.8"
|
||||||
|
dependencies = ["numpy >= 1.21.1"]
|
||||||
|
|
||||||
|
[tool.setuptools]
|
||||||
|
packages = ["opatio", "opatio.opat"]
|
||||||
|
package-dir = {"" = "src"}
|
||||||
46
utils/opatio/readme.md
Normal file
46
utils/opatio/readme.md
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
# opatIO python module
|
||||||
|
This module defines a set of tools to build, write, and read OPAT files.
|
||||||
|
The OPAT file format is a custom format designed to efficiently store
|
||||||
|
opacity information for a variety of compositions.
|
||||||
|
|
||||||
|
## Installation
|
||||||
|
You can install this module with pip
|
||||||
|
```bash
|
||||||
|
git clone <repo>
|
||||||
|
cd 4DSSE/utils/opatio
|
||||||
|
pip install .
|
||||||
|
```
|
||||||
|
|
||||||
|
## General Usage
|
||||||
|
The general way that this module is meant to be used is to first build a schema for the opacity table and then save that to disk. The module will handle all the byte alignment and lookup table construction for you.
|
||||||
|
|
||||||
|
A simple example might look like the following
|
||||||
|
|
||||||
|
```python
|
||||||
|
from opatio import OpatIO
|
||||||
|
|
||||||
|
opacityFile = OpatIO()
|
||||||
|
opacityFile.set_comment("This is a sample opacity file")
|
||||||
|
opacityFile.set_source("OPLIB")
|
||||||
|
|
||||||
|
# some code to get a logR, logT, and logKappa table
|
||||||
|
# where logKappa is of size (n,m) if logR is size n and
|
||||||
|
# logT is size m
|
||||||
|
|
||||||
|
opacityFile.add_table(X, Z, logR, logT, logKappa)
|
||||||
|
opacityFile.save("opacity.opat")
|
||||||
|
```
|
||||||
|
|
||||||
|
You can also read opat files which have been generated with the loadOpat function
|
||||||
|
|
||||||
|
```python
|
||||||
|
from opatio import loadOpat
|
||||||
|
|
||||||
|
opacityFile = loadOpat("opacity.opat")
|
||||||
|
|
||||||
|
print(opacityFile.header)
|
||||||
|
print(opacityFile.tables[0])
|
||||||
|
```
|
||||||
|
|
||||||
|
## Problems
|
||||||
|
If you have problems feel free to either submit an issue to the root github repo (tagged as utils/opatio) or email Emily Boudreaux at emily.boudreaux@dartmouth.edu
|
||||||
1
utils/opatio/src/opatio/__init__.py
Normal file
1
utils/opatio/src/opatio/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
from .opat.opat import OpatIO, loadOpat
|
||||||
0
utils/opatio/src/opatio/opat/__init__.py
Normal file
0
utils/opatio/src/opatio/opat/__init__.py
Normal file
277
utils/opatio/src/opatio/opat/opat.py
Normal file
277
utils/opatio/src/opatio/opat/opat.py
Normal file
@@ -0,0 +1,277 @@
|
|||||||
|
import struct
|
||||||
|
import numpy as np
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
from typing import Iterable, List, Tuple
|
||||||
|
from collections.abc import Iterable as collectionIterable
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Header:
|
||||||
|
magic: str
|
||||||
|
version: int
|
||||||
|
numTables: int
|
||||||
|
headerSize: int
|
||||||
|
indexOffset: int
|
||||||
|
creationDate: str
|
||||||
|
sourceInfo: str
|
||||||
|
comment: str
|
||||||
|
reserved: bytes
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TableIndex:
|
||||||
|
X: float
|
||||||
|
Z: float
|
||||||
|
byteStart: int
|
||||||
|
byteEnd: int
|
||||||
|
sha256: bytes
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class OPATTable:
|
||||||
|
N_R: int
|
||||||
|
N_T: int
|
||||||
|
logR: Iterable[float]
|
||||||
|
logT: Iterable[float]
|
||||||
|
logKappa: Iterable[Iterable[float]]
|
||||||
|
|
||||||
|
defaultHeader = Header(
|
||||||
|
magic="OPAT",
|
||||||
|
version=1,
|
||||||
|
numTables=0,
|
||||||
|
headerSize=256,
|
||||||
|
indexOffset=0,
|
||||||
|
creationDate=datetime.now().strftime("%b %d, %Y"),
|
||||||
|
sourceInfo="no source provided by user",
|
||||||
|
comment="default header",
|
||||||
|
reserved=b"\x00" * 26
|
||||||
|
)
|
||||||
|
|
||||||
|
class OpatIO:
|
||||||
|
def __init__(self):
|
||||||
|
self.header: Header = defaultHeader
|
||||||
|
self.tables: List[Tuple[Tuple[float, float], OPATTable]] = []
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def validate_char_array_size(s: str, nmax: int) -> bool:
|
||||||
|
if len(s) > nmax:
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def validate_logKappa(logKappa):
|
||||||
|
if isinstance(logKappa, np.ndarray):
|
||||||
|
if logKappa.ndim == 2:
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
raise ValueError("logKappa must be a non-empty 2D array")
|
||||||
|
|
||||||
|
if isintance(logKappa, collectionIterable) and all(isinstance(row, collectionIterable) for row in logKappa):
|
||||||
|
try:
|
||||||
|
first_row = next(iter(logKappa))
|
||||||
|
if all(isinstance(x, (int, float)) for x in first_row):
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
raise ValueError("logKappa must be fully numeric")
|
||||||
|
except StopIteration:
|
||||||
|
raise ValueError("logKappa must be a non-empty 2D iterable")
|
||||||
|
else:
|
||||||
|
raise TypeError("logKappa must be a non-empty 2D array or iterable")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def validate_1D(arr, name: str):
|
||||||
|
if isinstance(arr, np.ndarray):
|
||||||
|
if arr.ndim == 1:
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
raise ValueError(f"{name} must be a 1D numpy array")
|
||||||
|
if isinstance(arr, collectionIterable) and not isinstance(arr, (str, bytes)):
|
||||||
|
if all(isinstance(x, (int, float)) for x in arr):
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
raise ValueError(f"{name} must be fully numeric")
|
||||||
|
else:
|
||||||
|
raise TypeError(f"{name} must be a non-empty 2D array or iterable")
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def compute_checksum(data: bytes) -> bytes:
|
||||||
|
return hashlib.sha256(data).digest()
|
||||||
|
|
||||||
|
def set_version(self, version: int) -> int:
|
||||||
|
self.header.version = version
|
||||||
|
return self.header.version
|
||||||
|
|
||||||
|
def set_source(self, source: str) -> str:
|
||||||
|
if not self.validate_char_array_size(source, 64):
|
||||||
|
raise TypeError(f"sourceInfo string ({source}) is too long ({len(source)}). Max length is 64")
|
||||||
|
self.header.sourceInfo = source
|
||||||
|
return self.header.sourceInfo
|
||||||
|
|
||||||
|
def set_comment(self, comment: str) -> str:
|
||||||
|
if not self.validate_char_array_size(comment, 128):
|
||||||
|
raise TypeError(f"comment string ({comment}) is too long ({len(comment)}). Max length is 128")
|
||||||
|
self.header.comment = comment
|
||||||
|
return self.header.comment
|
||||||
|
|
||||||
|
def add_table(self, X: float, Z: float, logR: Iterable[float], logT: Iterable[float], logKappa: Iterable[Iterable[float]]):
|
||||||
|
self.validate_logKappa(logKappa)
|
||||||
|
self.validate_1D(logR, "logR")
|
||||||
|
self.validate_1D(logT, "logT")
|
||||||
|
|
||||||
|
logR = np.array(logR)
|
||||||
|
logT = np.array(logT)
|
||||||
|
logKappa = np.array(logKappa)
|
||||||
|
|
||||||
|
if logKappa.shape != (logR.shape[0], logT.shape[0]):
|
||||||
|
raise ValueError(f"logKappa must be of shape ({len(logR)} x {len(logT)})! Currently logKappa has shape {logKappa.shape}")
|
||||||
|
|
||||||
|
table = OPATTable(
|
||||||
|
N_R = logR.shape[0],
|
||||||
|
N_T = logT.shape[0],
|
||||||
|
logR = logR,
|
||||||
|
logT = logT,
|
||||||
|
logKappa = logKappa
|
||||||
|
)
|
||||||
|
|
||||||
|
self.tables.append(((X, Z), table))
|
||||||
|
self.header.numTables += 1
|
||||||
|
|
||||||
|
|
||||||
|
def _header_bytes(self) -> bytes:
|
||||||
|
headerBytes = struct.pack(
|
||||||
|
"<4s H I I Q 16s 64s 128s 26s",
|
||||||
|
self.header.magic.encode('utf-8'),
|
||||||
|
self.header.version,
|
||||||
|
self.header.numTables,
|
||||||
|
self.header.headerSize,
|
||||||
|
self.header.indexOffset,
|
||||||
|
self.header.creationDate.encode('utf-8'),
|
||||||
|
self.header.sourceInfo.encode('utf-8'),
|
||||||
|
self.header.comment.encode('utf-8'),
|
||||||
|
self.header.reserved
|
||||||
|
)
|
||||||
|
return headerBytes
|
||||||
|
|
||||||
|
def _table_bytes(self, table: OPATTable) -> Tuple[bytes, bytes]:
|
||||||
|
logR = table.logR.flatten()
|
||||||
|
logT = table.logT.flatten()
|
||||||
|
logKappa = table.logKappa.flatten()
|
||||||
|
tableBytes = struct.pack(
|
||||||
|
f"<II{table.N_R}d{table.N_T}d{table.N_R*table.N_T}d",
|
||||||
|
table.N_R,
|
||||||
|
table.N_T,
|
||||||
|
*logR,
|
||||||
|
*logT,
|
||||||
|
*logKappa
|
||||||
|
)
|
||||||
|
checksum = self.compute_checksum(tableBytes)
|
||||||
|
return (checksum, tableBytes)
|
||||||
|
|
||||||
|
def _tableIndex_bytes(self, tableIndex: TableIndex) -> bytes:
|
||||||
|
tableIndexBytes = struct.pack(
|
||||||
|
'<ddQQ',
|
||||||
|
tableIndex.X,
|
||||||
|
tableIndex.Z,
|
||||||
|
tableIndex.byteStart,
|
||||||
|
tableIndex.byteEnd
|
||||||
|
)
|
||||||
|
tableIndexBytes += tableIndex.sha256
|
||||||
|
|
||||||
|
if len(tableIndexBytes) != 64:
|
||||||
|
raise RuntimeError(f"Each table index entry must have 64 bytes. Due to an unknown error the table index entry for (X,Z)=({tableIndex.X},{tableIndex.Z}) header has {len(tableIndexBytes)} bytes")
|
||||||
|
|
||||||
|
return tableIndexBytes
|
||||||
|
|
||||||
|
def save(self, filename: str) -> str:
|
||||||
|
tempHeaderBytes = self._header_bytes()
|
||||||
|
|
||||||
|
if len(tempHeaderBytes) != 256:
|
||||||
|
raise RuntimeError(f"Header must have 256 bytes. Due to an unknown error the header has {len(tempHeaderBytes)} bytes")
|
||||||
|
|
||||||
|
currentStartByte: int = 256
|
||||||
|
tableIndicesBytes: List[bytes] = []
|
||||||
|
tablesBytes: List[bytes] = []
|
||||||
|
for (X, Z), table in self.tables:
|
||||||
|
checksum, tableBytes = self._table_bytes(table)
|
||||||
|
tableIndex = TableIndex(
|
||||||
|
X = X,
|
||||||
|
Z = Z,
|
||||||
|
byteStart = currentStartByte,
|
||||||
|
byteEnd = currentStartByte + len(tableBytes),
|
||||||
|
sha256 = checksum
|
||||||
|
)
|
||||||
|
tableIndexBytes = self._tableIndex_bytes(tableIndex)
|
||||||
|
tablesBytes.append(tableBytes)
|
||||||
|
tableIndicesBytes.append(tableIndexBytes)
|
||||||
|
|
||||||
|
currentStartByte += len(tableBytes)
|
||||||
|
self.header.indexOffset = currentStartByte
|
||||||
|
headerBytes = self._header_bytes()
|
||||||
|
|
||||||
|
with open(filename, 'wb') as f:
|
||||||
|
f.write(headerBytes)
|
||||||
|
for tableBytes in tablesBytes:
|
||||||
|
f.write(tableBytes)
|
||||||
|
for tableIndexBytes in tableIndicesBytes:
|
||||||
|
f.write(tableIndexBytes)
|
||||||
|
|
||||||
|
if os.path.exists(filename):
|
||||||
|
return filename
|
||||||
|
|
||||||
|
|
||||||
|
def loadOpat(filename: str) -> OpatIO:
    """Read an OPAT file from disk and return a populated OpatIO object.

    Args:
        filename: path to a file previously produced by ``OpatIO.save``.

    Returns:
        An OpatIO whose header and tables mirror the file's contents.

    Raises:
        ValueError: if the magic number is wrong or a table's bytes do not
            match the SHA-256 digest stored in the index.
        struct.error: on a truncated or malformed file.
        OSError: if the file cannot be read.
    """
    opat = OpatIO()
    with open(filename, 'rb') as f:
        headerBytes: bytes = f.read(256)
        unpackedHeader = struct.unpack("<4s H I I Q 16s 64s 128s 26s", headerBytes)
        loadedHeader = Header(
            # Bug fix: struct.pack NUL-pads the fixed-width string fields;
            # strip that padding so the strings round-trip unchanged.
            magic=unpackedHeader[0].decode().rstrip("\x00"),
            version=unpackedHeader[1],
            numTables=unpackedHeader[2],
            headerSize=unpackedHeader[3],
            indexOffset=unpackedHeader[4],
            creationDate=unpackedHeader[5].decode().rstrip("\x00"),
            sourceInfo=unpackedHeader[6].decode().rstrip("\x00"),
            comment=unpackedHeader[7].decode().rstrip("\x00"),
            reserved=unpackedHeader[8]
        )
        if loadedHeader.magic != "OPAT":
            raise ValueError(f"{filename} is not an OPAT file (magic={loadedHeader.magic!r})")
        opat.header = loadedHeader

        # Each 64-byte index entry is 32 bytes of packed (X, Z, byteStart,
        # byteEnd) followed by a raw 32-byte SHA-256 digest; the index runs
        # from indexOffset to end of file.
        f.seek(opat.header.indexOffset)
        tableIndices: List[TableIndex] = []
        while tableIndexEntryBytes := f.read(32):
            unpackedTableIndexEntry = struct.unpack("<ddQQ", tableIndexEntryBytes)
            checksum = f.read(32)
            tableIndices.append(TableIndex(
                X=unpackedTableIndexEntry[0],
                Z=unpackedTableIndexEntry[1],
                byteStart=unpackedTableIndexEntry[2],
                byteEnd=unpackedTableIndexEntry[3],
                sha256=checksum
            ))

        for tableIndex in tableIndices:
            f.seek(tableIndex.byteStart)
            tableBytes = f.read(tableIndex.byteEnd - tableIndex.byteStart)

            # New: the stored digest was previously read but never checked;
            # verify table integrity before unpacking.
            if hashlib.sha256(tableBytes).digest() != tableIndex.sha256:
                raise ValueError(f"SHA-256 mismatch for table (X,Z)=({tableIndex.X},{tableIndex.Z}) in {filename}")

            nr_nt_fmt = "<II"
            nr_nt_size = struct.calcsize(nr_nt_fmt)
            N_R, N_T = struct.unpack(nr_nt_fmt, tableBytes[:nr_nt_size])

            unpackedData = struct.unpack(f"<{N_R}d{N_T}d{N_R * N_T}d", tableBytes[nr_nt_size:])

            logR = np.array(unpackedData[:N_R], dtype=np.float64)
            logT = np.array(unpackedData[N_R: N_R + N_T], dtype=np.float64)
            logKappa = np.array(unpackedData[N_R + N_T:], dtype=np.float64).reshape((N_R, N_T))

            opat.add_table(tableIndex.X, tableIndex.Z, logR, logT, logKappa)

    # Bug fix: each add_table call incremented numTables on top of the count
    # already read from the header, doubling it; restore the true count.
    opat.header.numTables = len(tableIndices)
    return opat
|
||||||
|
|
||||||
Reference in New Issue
Block a user