Source code for pyjoulescope_driver.record

# Copyright 2023 Jetperch LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
Record streaming sample data to a JLS v2 file.
"""

import copy
import numpy as np
from pyjoulescope_driver import time64
import logging


_PYJLS_VERSION_MIN = (0, 9, 2)  # inclusive
_PYJLS_VERSION_MAX = (1, 0, 0)  # exclusive


try:
    from pyjls import Writer, SignalType, DataType, __version__
    _DTYPE_MAP = {
        'f32': DataType.F32,
        'u8': DataType.U8,
        'u4': DataType.U4,
        'u1': DataType.U1,
    }
except ImportError:
    Writer = None
    _DTYPE_MAP = {}


_SIGNALS = {
    'current': {
        'signal_type': 'f32',
        'units': 'A',
        'ctrl_topic': 's/i/ctrl',
        'data_topic': 's/i/!data',
    },
    'voltage': {
        'signal_type': 'f32',
        'units': 'V',
        'ctrl_topic': 's/v/ctrl',
        'data_topic': 's/v/!data',
    },
    'power': {
        'signal_type': 'f32',
        'units': 'W',
        'ctrl_topic': 's/p/ctrl',
        'data_topic': 's/p/!data',
    },
    'current_range': {
        'signal_type': 'u4',
        'units': '',
        'ctrl_topic': 's/i/range/ctrl',
        'data_topic': 's/i/range/!data',
    },
    'gpi[0]': {
        'signal_type': 'u1',
        'units': '',
        'ctrl_topic': 's/gpi/0/ctrl',
        'data_topic': 's/gpi/0/!data',
    },
    'gpi[1]': {
        'signal_type': 'u1',
        'units': '',
        'ctrl_topic': 's/gpi/1/ctrl',
        'data_topic': 's/gpi/1/!data',
    },
    'gpi[2]': {
        'signal_type': 'u1',
        'units': '',
        'ctrl_topic': 's/gpi/2/ctrl',
        'data_topic': 's/gpi/2/!data',
    },
    'gpi[3]': {
        'signal_type': 'u1',
        'units': '',
        'ctrl_topic': 's/gpi/3/ctrl',
        'data_topic': 's/gpi/3/!data',
    },
    'trigger_in': {
        'signal_type': 'u1',
        'units': '',
        'ctrl_topic': 's/gpi/7/ctrl',
        'data_topic': 's/gpi/7/!data',
    },
}


_SIGNAL_SHORT_MAP = [
    ('current', 'i'),
    ('voltage', 'v'),
    ('power', 'p'),
    ('current_range', 'r', 'current range'),
    ('gpi[0]', '0'),
    ('gpi[1]', '1'),
    ('gpi[2]', '2'),
    ('gpi[3]', '3'),
    ('trigger_in', 'T', 't'),
]


def _signal_name_map():
    m = {}
    for z in _SIGNAL_SHORT_MAP:
        signal_name = z[0]
        m[signal_name] = signal_name
        for n in z[1:]:
            m[n] = signal_name
    return m


[docs] class Record: """Record streaming sample data to a JLS v2 file. :param driver: The active driver instance. :param device_path: The device prefix path. :param signals: The list of signals to record. None=['current', 'voltage'] :param auto: Configure automatic operation. Provide the list of automatic operations to perform, which can be: * signal_enable * signal_disable None (default) is equivalent to ['signal_enable', 'signal_disable'] Call :meth:`open` to start recording and :meth:`close` to stop. """ def __init__(self, driver, device_path, signals=None, auto=None): if Writer is None: raise RuntimeError('pyjls package not found. Install using:\n' + ' pip3 install -U pyjls') pyjls_version = tuple([int(x) for x in __version__.split('.')]) if pyjls_version < _PYJLS_VERSION_MIN or pyjls_version >= _PYJLS_VERSION_MAX: raise ImportError(f'Unsupported pyjls version {__version__}\n' + f' Require {_PYJLS_VERSION_MIN} <= pyjls version < {_PYJLS_VERSION_MAX}\n' + ' pip3 install -U pyjls') self._utc_interval = time64.MINUTE self._log = logging.getLogger(__name__) self._wr = None self._data_map = {} self._driver = driver self._device_path = device_path self._on_data_fn = self._on_data # bind and save for unsubscribe if signals is None: signals = ['current', 'voltage'] elif isinstance(signals, str): signals = [s.strip() for s in signals.split(',')] m = _signal_name_map() signals = [m[s] for s in signals] if auto is None: auto = ['signal_enable', 'signal_disable'] if isinstance(auto, str): auto = [auto] self._auto = auto signal_id = 0 self._signals = {} for signal_name in signals: signal_id += 1 signal = copy.deepcopy(_SIGNALS[signal_name]) signal['name'] = signal_name signal['signal_id'] = signal_id signal['signal_type'] = _DTYPE_MAP[signal['signal_type']] signal['data_topic_abs'] = f"{self._device_path}/{signal['data_topic']}" signal['utc_next'] = None signal['utc'] = None self._signals[signal_name] = signal
[docs] def open(self, filename): """Start the recording. :param filename: The filename for the recording. Use time64.filename to produce a filename from timestamp. :return: self. """ if self._wr is not None: self.close() self._data_map.clear() device_path = self._device_path self._wr = Writer(filename) _, model, serial_number = device_path.split('/') model = model.upper() self._wr.source_def( source_id=1, name=f'{model}-{serial_number}', vendor='Jetperch', model=model, version='', serial_number=serial_number, ) for signal in self._signals.values(): data_topic = signal['data_topic_abs'] self._data_map[data_topic] = signal self._driver.subscribe(data_topic, ['pub'], self._on_data_fn) if 'signal_enable' in self._auto: for signal in self._signals.values(): ctrl_topic = signal['ctrl_topic'] self._publish(ctrl_topic, 1, timeout=0) return self
def _publish(self, topic, value, timeout=None): self._driver.publish(f'{self._device_path}/{topic}', value, timeout=timeout)
[docs] def close(self): """Close the recording and release all resources.""" try: for signal in self._signals.values(): self._driver.unsubscribe(signal['data_topic_abs'], self._on_data_fn) for signal in self._signals.values(): if signal['utc'] is not None: self._wr.utc(signal['signal_id'], *signal['utc']) if 'signal_disable' in self._auto: ctrl_topic = signal['ctrl_topic'] try: self._publish(ctrl_topic, 0, timeout=0.25) except TimeoutError: self._log.warning('Timed out in publish: %s <= 0', ctrl_topic) except Exception: self._log.exception('Exception in publish: %s <= 0', ctrl_topic) finally: self._wr.close() self._wr = None
def _on_data(self, topic, value): if self._wr is None: return signal = self._data_map[topic] decimate_factor = value['decimate_factor'] signal_id = signal['signal_id'] sample_id = value['sample_id'] sample_id = sample_id // decimate_factor if signal['utc_next'] is None: self._wr.signal_def( signal_id=signal['signal_id'], source_id=1, signal_type=SignalType.FSR, data_type=signal['signal_type'], sample_rate=value['sample_rate'] // decimate_factor, name=signal['name'], units=signal['units'], ) self._wr.utc(signal_id, sample_id, value['utc']) signal['utc_next'] = value['utc'] + self._utc_interval if value['utc'] >= signal['utc_next']: self._wr.utc(signal_id, sample_id, value['utc']) signal['utc_next'] += self._utc_interval signal['utc'] = None elif sample_id: signal['utc'] = (sample_id, value['utc']) x = value['data'] if len(x): x = np.ascontiguousarray(x) self._wr.fsr_f32(signal_id, sample_id, x)