# Source code for pyads.symbol

"""
Define the Symbol class

Separate file because it depends on many other files, so we try to simplify
the circular dependencies.

:author: Roberto Roos
:license: MIT, see license file or https://opensource.org/licenses/MIT
:created on: 2020-11-16

"""
from __future__ import annotations

import re
from ctypes import sizeof
from typing import TYPE_CHECKING, Any, Optional, List, Tuple, Callable, Union, Type

from . import constants  # To access all constants, use package notation
from .constants import PLCDataType
from .pyads_ex import adsGetSymbolInfo
from .structs import NotificationAttrib

# ads.Connection relies on structs.AdsSymbol (but in type hints only), so use
# this 'if' to only include it when type hinting (False during execution)
if TYPE_CHECKING:
    from .ads import Connection, StructureDef  # pragma: no cover


class AdsSymbol:
    """Object that points to an ADS variable

    Contains index group, index offset, name, symbol type, comment of ADS
    symbol. Also remembers a reference to a Connection to be able to
    read/write directly.

    The virtual property `value` can be used to read from and write to the
    symbol.

    :ivar index_group: Index group of symbol
    :ivar index_offset: Index offset of symbol
    :ivar name: Name of symbol
    :ivar symbol_type: String representation of symbol type (PLC-style,
        e.g. "LREAL")
    :ivar plc_type: ctypes type of variable (from constants.PLCTYPE_*)
    :ivar comment: Comment of symbol
    :ivar value: Buffered value, i.e. the most recently read or written
        value for this symbol
    """

    # Regex for array - e.g. "ARRAY [1..10] OF DINT"
    # The range separator is escaped so only a literal ".." is accepted, and
    # an optional minus sign allows negative array bounds.
    _regex_array = re.compile(r"ARRAY \[(-?\d+)\.\.(-?\d+)\] OF (.*)")
    # Regex for matrix - e.g. "matrix_10_int32_T"
    _regex_matrix = re.compile(r"matrix_(\d+)_(.*)_T")
    # Regex for list - e.g. "DINT(10)"
    _regex_list = re.compile(r"(.*)\((\d+)\)")

    def __init__(
        self,
        plc: "Connection",
        name: Optional[str] = None,
        index_group: Optional[int] = None,
        index_offset: Optional[int] = None,
        symbol_type: Optional[Union[str, Type["PLCDataType"]]] = None,
        comment: Optional[str] = None,
        auto_update: bool = False,
        structure_def: Optional["StructureDef"] = None,
        array_size: Optional[int] = 1,
    ) -> None:
        """Create AdsSymbol instance.

        Specify either the variable name or the index_group **and**
        index_offset so the symbol can be located.
        If the name was specified but not all other attributes were,
        the other attributes will be looked up from the connection.

        `symbol_type` should be a type constant like `pyads.PLCTYPE_*`.
        Alternatively, it can be a string representation of a PLC type (e.g.
        'LREAL').

        :param plc: Connection instance
        :param name:
        :param index_group:
        :param index_offset:
        :param symbol_type: PLC variable type (e.g. `pyads.PLCTYPE_DINT`)
        :param comment:
        :param auto_update: Create notification to update buffer (same as
            `set_auto_update(True)`)
        :param Optional["StructureDef"] structure_def: special tuple defining
            the structure and types contained within it according to PLCTYPE
            constants, must match the structure defined in the PLC, PLC
            structure must be defined with {attribute 'pack_mode' := '1'}
        :param Optional[int] array_size: size of array if reading array of
            structure, defaults to 1
        :raises ValueError: if neither `name` nor the full set of
            `index_group`, `index_offset` and `symbol_type` was given

        Expected input example for structure_def:

        .. code:: python

            structure_def = (
                ('rVar', pyads.PLCTYPE_LREAL, 1),
                ('sVar', pyads.PLCTYPE_STRING, 2, 35),
                ('SVar1', pyads.PLCTYPE_STRING, 1),
                ('rVar1', pyads.PLCTYPE_REAL, 1),
                ('iVar', pyads.PLCTYPE_DINT, 1),
                ('iVar1', pyads.PLCTYPE_INT, 3),
            )

            # i.e ('Variable Name', variable type, arr size (1 if not array),
            # length of string (if defined in PLC))

        """
        self._plc = plc

        self._handles_list: List[Tuple[int, int]] = []  # Notification handles
        self._auto_update_handle: Optional[Tuple[int, int]] = None

        # Check if the required info is present:
        missing_info = (
            index_group is None or index_offset is None or symbol_type is None
        )
        if missing_info and name is None:
            raise ValueError(
                "Please specify either `name`, or `index_group`, "
                "`index_offset` and `symbol_type`"
            )

        self.name = name
        self.index_offset = index_offset
        self.index_group = index_group
        self.symbol_type = symbol_type
        self.comment = comment
        self._value: Any = None

        # structure information
        self.structure_def = structure_def
        self.array_size = array_size
        self._structure_size = 0
        if self.structure_def is not None:
            # Imported here rather than at module level (this module is kept
            # separate to simplify circular dependencies)
            from .ads import size_of_structure

            # Repeating the definition tuple `array_size` times yields the
            # byte size of the whole array of structures
            self._structure_size = size_of_structure(
                self.structure_def * self.array_size
            )

        if missing_info:
            self._create_symbol_from_info()  # Perform remote lookup

        # Now `self.symbol_type` should have a value, find the actual PLCTYPE
        # from it.
        # This is relevant for both lookup and full user definition.
        self.plc_type: Optional[Type["PLCDataType"]] = None
        if self.symbol_type is not None:
            if isinstance(self.symbol_type, str):  # Perform lookup if string
                self.plc_type = AdsSymbol.get_type_from_str(self.symbol_type)
            else:
                # Otherwise `symbol_type` is probably a pyads.PLCTYPE_*
                # constant
                self.plc_type = self.symbol_type

        # Goes through the property setter, which registers a device
        # notification when enabled
        self.auto_update = auto_update

    def _create_symbol_from_info(self) -> None:
        """Look up remaining info from the remote

        The name must already be present.
        Other values will already have a default value (mostly None).
        """
        info = adsGetSymbolInfo(self._plc._port, self._plc._adr, self.name)

        self.index_group = info.iGroup
        self.index_offset = info.iOffs
        if info.comment:
            self.comment = info.comment

        # info.dataType is an integer mapping to a type in
        # constants.ads_type_to_ctype.
        # However, this type ignores whether the variable is really an array!
        # So we are not going to be using this and instead rely on the
        # textual type
        self.symbol_type = info.symbol_type  # Save the type as string

    def _check_for_open_connection(self) -> None:
        """Assert the current object is ready to read from/write to.

        This checks only if the Connection is present and open.

        :raises ValueError: if the Connection is missing or not open
        """
        if self._plc is None or not self._plc.is_open:
            raise ValueError(
                "Cannot read or write data with missing or closed Connection"
            )

    def read(self) -> Any:
        """Read the current value of this symbol.

        The new read value is also saved in the buffer.

        :return: the value that was read
        :raises ValueError: if the Connection is missing or not open
        """
        self._check_for_open_connection()
        if self.is_structure:
            self._value = self._plc.read_structure_by_name(
                self.name,
                self.structure_def,
                structure_size=self._structure_size,
                array_size=self.array_size,
            )
        else:
            self._value = self._plc.read(
                self.index_group, self.index_offset, self.plc_type
            )

        return self._value

    def write(self, new_value: Optional[Any] = None) -> None:
        """Write a new value or the buffered value to the symbol.

        When a new value was written, the buffer is updated.

        :param new_value: Value to be written to symbol (if None,
            the buffered value is sent instead)
        :raises ValueError: if the Connection is missing or not open
        """
        self._check_for_open_connection()
        if new_value is None:
            new_value = self._value  # Send buffered value instead
        else:
            self._value = new_value  # Update buffer with new value

        if self.is_structure:
            self._plc.write_structure_by_name(
                self.name,
                new_value,
                self.structure_def,
                structure_size=self._structure_size,
                array_size=self.array_size,
            )
        else:
            self._plc.write(
                self.index_group, self.index_offset, new_value, self.plc_type
            )

    def __repr__(self) -> str:
        """Debug string"""
        t = type(self)
        return "<{}.{} object at {}, name: {}, type: {}>".format(
            t.__module__, t.__qualname__, hex(id(self)), self.name,
            self.symbol_type
        )

    def __del__(self) -> None:
        """Destructor"""
        # The attribute may be absent when the instance never ran `__init__`,
        # so look it up defensively instead of raising during destruction
        if getattr(self, "_handles_list", None):
            self.clear_device_notifications()

    def add_device_notification(
        self,
        callback: Callable[[Any, Any], None],
        attr: Optional["NotificationAttrib"] = None,
        user_handle: Optional[int] = None,
    ) -> Optional[Tuple[int, int]]:
        """Add on-change callback to symbol.

        See Connection.add_device_notification(...).

        When `attr` is omitted, the default will be used.

        The notification handles are returned but also stored locally. When
        this symbol is destructed any notifications will be freed up
        automatically.

        :param callback: function to call for each notification
        :param attr: notification attributes; when omitted, one is built
            with the byte size of `plc_type` as length
        :param user_handle: optional user handle passed on to the Connection
        :return: the notification handles as returned by the Connection
        """
        if attr is None:
            attr = NotificationAttrib(length=sizeof(self.plc_type))

        handles = self._plc.add_device_notification(
            (self.index_group, self.index_offset), attr, callback, user_handle
        )

        self._handles_list.append(handles)

        return handles

    def clear_device_notifications(self) -> None:
        """Remove all registered notifications"""
        if self._handles_list:
            for handles in self._handles_list:
                self._plc.del_device_notification(*handles)
            self._handles_list = []  # Clear the list
            # If auto-update was enabled, it won't work anymore
            self._auto_update_handle = None

    def del_device_notification(self, handles: Tuple[int, int]) -> None:
        """Remove a single device notification by handles

        Handles that were not registered through this symbol are ignored.

        :param handles: handles as returned by `add_device_notification`
        """
        if handles in self._handles_list:
            self._plc.del_device_notification(*handles)
            self._handles_list.remove(handles)
            # Keep the `auto_update` property truthful when the notification
            # backing it is removed directly
            if handles == self._auto_update_handle:
                self._auto_update_handle = None

    def _value_callback(self, notification: Any, data_name: Any) -> None:
        """Internal callback used by auto-update"""
        _handle, _datetime, value = self._plc.parse_notification(
            notification, self.plc_type
        )
        self._value = value

    @staticmethod
    def get_type_from_str(type_str: str) -> Optional[Type["PLCDataType"]]:
        """Get PLCTYPE_* from PLC name string

        If PLC name could not be mapped, return None. This is done on
        purpose to prevent a program from crashing when an unusable symbol
        is found. Instead, exceptions will be thrown when this unmapped
        symbol is read/written.

        :param type_str: PLC-style type name, e.g. "LREAL",
            "ARRAY [1..10] OF DINT" or "DINT(10)"
        :return: matching type, or None if the name could not be mapped
        """
        # If simple scalar
        plc_name = "PLCTYPE_" + type_str

        # if type is WSTRING just return the PLCTYPE constant
        if plc_name.startswith("PLCTYPE_WSTRING"):
            return constants.PLCTYPE_WSTRING

        if hasattr(constants, plc_name):
            # Map e.g. 'LREAL' to 'PLCTYPE_LREAL' directly based on the name
            return getattr(constants, plc_name)

        # If ARRAY
        reg_match = AdsSymbol._regex_array.match(type_str)
        if reg_match is not None:
            groups = reg_match.groups()
            # Bounds are inclusive, so the element count is upper - lower + 1
            size = int(groups[1]) + 1 - int(groups[0])
            scalar_type_str = groups[2]

            # Find scalar type
            scalar_type = AdsSymbol.get_type_from_str(scalar_type_str)

            if scalar_type:
                return scalar_type * size
            # Fall to method default instead

        # If array/matrix (an 1D array is also called a matrix)
        reg_match = AdsSymbol._regex_matrix.match(type_str)
        if reg_match is not None:
            groups = reg_match.groups()
            size = int(groups[0])
            scalar_type_str = groups[1]

            if scalar_type_str in constants.PLC_ARRAY_MAP:
                return constants.PLC_ARRAY_MAP[scalar_type_str](size)

        # If list
        reg_match = AdsSymbol._regex_list.match(type_str)
        if reg_match is not None:
            groups = reg_match.groups()
            scalar_type_str = groups[0]
            size = int(groups[1])

            scalar_type = AdsSymbol.get_type_from_str(scalar_type_str)

            if scalar_type:
                return scalar_type * size

        # We allow unmapped types at this point - Instead we will throw an
        # error when they are being addressed
        return None

    @property
    def auto_update(self) -> bool:
        """Return True if auto_update is enabled for this symbol."""
        return self._auto_update_handle is not None

    @auto_update.setter
    def auto_update(self, value: bool) -> None:
        """Enable or disable auto-update of the buffered value.

        This automatic update is done through a device notification. This
        can be efficient when a remote variable changes its value less
        often than your code runs.

        Clearing all device notifications will also disable auto-update.
        Automatic update is disabled by default.
        """
        if value and self._auto_update_handle is None:
            self._auto_update_handle = self.add_device_notification(
                self._value_callback
            )
        elif not value and self._auto_update_handle is not None:
            self.del_device_notification(self._auto_update_handle)
            self._auto_update_handle = None

    @property
    def value(self) -> Any:
        """Return the current value of the symbol."""
        return self._value

    @value.setter
    def value(self, val: Any) -> None:
        """Set the current value of the symbol.

        If auto_update is True then the write command will be called
        automatically.
        """
        self._value = val

        # write value to plc if auto_update is enabled
        if self.auto_update:
            self.write(val)

    @property
    def is_structure(self) -> bool:
        """Return True if the symbol object represents a structure.

        This is the case if a structure_def has been passed during
        initialization.
        """
        return self.structure_def is not None