Source code for CaChannel.util

"""
This module provides functions similiar to those command line tools found in EPICS base,
e.g. :func:`caget`, :func:`caput`, :func:`camonitor`, :func:`cainfo`.

In those functions, :class:`CaChannel.CaChannel` objects are created implicitly and cached
in :data:`_channel_` dictionary.

>>> import time
>>> caput('catest', 1.23, wait=True)
>>> caget('catest')
1.23
>>> caput('cabo', 'Busy')
>>> caget('cabo')
1
>>> caget('cabo', as_string=True)
'Busy'
>>> caput('cawavec', 'this can be a long string')
>>> caget('cawavec', as_string=True)
'this '
>>> caput('cawave', range(4))
>>> caget('cawave', count=4)
[0.0, 1.0, 2.0, 3.0]
"""


import collections
import datetime
import itertools
import numbers
import sys
import threading

from .CaChannel import ca, CaChannel

#: channel object cache
_channels_ = {}


def _ints_to_string(integers):
    try:
        sequence_type = collections.abc.Sequence # Python 3.10+
    except AttributeError:
        sequence_type = collections.Sequence
    if isinstance(integers, sequence_type):
        stripped = itertools.takewhile(lambda x: x != 0, integers)
        if sys.hexversion < 0x03000000:
            value = ''.join([chr(c) for c in stripped])
        else:
            value = bytes(stripped).decode()
    elif isinstance(integers, numbers.Integral):
        if integers == 0:
            value = ''
        elif sys.hexversion < 0x03000000:
            value = chr(integers)
        else:
            value = bytes([integers]).decode()
    else:
        value = None

    return value


def _get_or_create_channel(name):
    """
    return the channel object associated with *name*. If nothing exists, create a new one.

    :param str name: pv name
    :return: channel object
    :rtype: :class:`CaChannel.CaChannel`
    """
    chan = _channels_.get(name)
    if chan is None:
        chan = CaChannel(name)
        chan.searchw()
        _channels_[name] = chan
    return chan


[docs]def caget(name, as_string=False, count=None): """ Return PV's current value. For enum or char type PV, the string form is returned if *as_string* is True. If the PV is of multi-element array, *count* can be used to limit the number of elements. :param str name: pv name :param bool as_string: retrieve enum and char type as string :param int count: number of element to request :return: pv value """ chan = _get_or_create_channel(name) req_type = ca.dbf_type_to_DBR(chan.field_type()) if as_string and req_type == ca.DBR_ENUM: req_type = ca.DBR_STRING value = chan.getw(req_type, count) if as_string and req_type == ca.DBR_CHAR: value = _ints_to_string(value) return value
[docs]def caput(name, value, wait=False, timeout=None): """ :param str name: pv name :param value: value to write :param bool wait: wait for completion :param float timeout: seconds to wait """ chan = _get_or_create_channel(name) def put_callback(_, user_args): user_args[0].set() if wait: event = threading.Event() chan.array_put_callback(value, None, None, put_callback, event) chan.flush_io() event.wait(timeout) else: chan.putw(value)
[docs]def camonitor(name, as_string=False, count=None, callback=None): """ set a *callback* to be invoked when pv value or alarm status change. :param str name: pv name :param bool as_string: retrieve enum and char type as string :param int count: number of element to request :param callback: callback function. If *None* is specified, the default callback is to print to the console. If *callback* is not a valid *callable*, any previous callback is removed. >>> camonitor('cacalc') >>> time.sleep(2) # doctest: +ELLIPSIS cacalc ... >>> def monitor_callback(epics_args, _): ... for k in sorted(epics_args): ... print(k, epics_args[k]) >>> camonitor('cacalc', callback=monitor_callback) >>> time.sleep(2) # doctest: +ELLIPSIS chid ... count 1 pv_nseconds ... pv_seconds ... pv_severity AlarmSeverity.No pv_status AlarmCondition.No pv_value ... status ECA.NORMAL type DBR.TIME_DOUBLE chid ... >>> camonitor('cacalc', callback=()) >>> time.sleep(2) """ chan = _get_or_create_channel(name) req_type = ca.dbf_type_to_DBR_TIME(chan.field_type()) if as_string and req_type == ca.DBR_TIME_ENUM: req_type = ca.DBR_TIME_STRING def monitor_callback(epics_args, _): stamp = epics_args['pv_seconds'] + ca.POSIX_TIME_AT_EPICS_EPOCH + epics_args['pv_nseconds'] * 1e-9 # time.strftime does not support microseconds strfmt = datetime.datetime.fromtimestamp(stamp).strftime('%Y-%m-%d %H:%M:%S.%f') value = epics_args['pv_value'] if as_string and req_type == ca.DBR_CHAR: value = _ints_to_string(value) print('%s %s %s' % (name, strfmt, value)) if callback is not None and not callable(callback): chan.clear_event() chan.flush_io() return if callback is None: callback = monitor_callback chan.add_masked_array_event(req_type, count, None, callback) chan.flush_io()
[docs]def cainfo(name): """ print pv information :param name: pv name >>> caput('cabo', 1) >>> cainfo('cabo') # doctest: +ELLIPSIS cabo State: Connected Host: ... Data type: DBF_ENUM Element count: 1 Access: RW Status: STATE Severity: MINOR Enumerates: ('Done', 'Busy') """ chan = _get_or_create_channel(name) r = chan.read_access() w = chan.write_access() if not r and not w: access = 'No access' else: access = '' if r: access += 'R' if w: access += 'W' message = \ """%s State: %s Host: %s Data type: %s Element count: %d Access: %s""" % ( name, ['Not connected', 'Connected'][chan.state() == ca.cs_conn], chan.host_name(), ca.dbf_text(chan.field_type()), chan.element_count(), access) if chan.state() == ca.cs_conn: ctrl = chan.getw(ca.dbf_type_to_DBR_CTRL(chan.field_type())) message += \ """ Status: %s Severity: %s""" % (ca.alarmStatusString(ctrl['pv_status']), ca.alarmSeverityString(ctrl['pv_severity'])) if chan.field_type() == ca.DBF_ENUM: message += \ """ Enumerates: %s""" % (ctrl['pv_statestrings'],) print(message)
if __name__ == '__main__': import time import doctest doctest.testmod()