Source code for pyads.ads

"""Pythonic ADS functions.

:author: Stefan Lehmann <stlm@posteo.de>
:license: MIT, see license file or https://opensource.org/licenses/MIT
:created on: 2018-06-11 18:15:53

"""
from __future__ import annotations
import struct
import itertools
from collections import OrderedDict
from ctypes import (
    c_ubyte,
    sizeof,
)
from typing import Optional, Union, Tuple, Any, Type, Dict, List, Iterator

# noinspection PyUnresolvedReferences
from .constants import (
    ADSIGRP_SYM_UPLOAD,
    ADSIGRP_SYM_UPLOADINFO2,
    ADSIOFFS_DEVDATA_ADSSTATE,
    PLCTYPE_BOOL,
    PLCTYPE_BYTE,
    PLCTYPE_DATE,
    PLCTYPE_DINT,
    PLCTYPE_DT,
    PLCTYPE_DWORD,
    PLCTYPE_INT,
    PLCTYPE_LREAL,
    PLCTYPE_REAL,
    PLCTYPE_SINT,
    PLCTYPE_STRING,
    PLCTYPE_WSTRING,
    PLCTYPE_TIME,
    PLCTYPE_TOD,
    PLCTYPE_UDINT,
    PLCTYPE_UINT,
    PLCTYPE_USINT,
    PLCTYPE_WORD,
    PLC_DEFAULT_STRING_SIZE,
    DATATYPE_MAP,
    ADSIGRP_SUMUP_READ,
    ADSIGRP_SUMUP_WRITE,
    MAX_ADS_SUB_COMMANDS,
    ads_type_to_ctype,
    PLCSimpleDataType,
    PLCDataType,
)
from .pyads_ex import (
    adsAddRoute,
    adsAddRouteToPLC,
    adsDelRoute,
    adsPortOpenEx,
    adsPortCloseEx,
    adsGetLocalAddressEx,
    adsGetNetIdForPLC,
    adsSyncSetTimeoutEx,
    adsSetLocalAddress,
    ADSError,
)
from .structs import (
    AmsAddr,
    SAmsNetId,
)
from .utils import platform_is_linux, find_wstring_null_terminator

# custom types
StructureDef = Tuple[
    Union[Tuple[str, Type, int], Tuple[str, Type, int, Optional[int]]], ...
]

# global variables
linux: bool = platform_is_linux()
port: Optional[int] = None


def _parse_ams_netid(ams_netid: str) -> SAmsNetId:
    """Parse an AmsNetId from *str* to *SAmsNetId*.

    :param str ams_netid: NetId as a string
    :rtype: SAmsNetId
    :return: NetId as a struct

    """
    try:
        id_numbers = list(map(int, ams_netid.split(".")))
    except ValueError:
        raise ValueError("no valid netid")

    if len(id_numbers) != 6:
        raise ValueError("no valid netid")

    # Fill the netId struct with data
    ams_netid_st = SAmsNetId()
    ams_netid_st.b = (c_ubyte * 6)(*id_numbers)
    return ams_netid_st


[docs]def open_port() -> int: """Connect to the TwinCAT message router. :rtype: int :return: port number """ global port port = port or adsPortOpenEx() return port
[docs]def close_port() -> None: """Close the connection to the TwinCAT message router.""" global port if port is not None: adsPortCloseEx(port) port = None
[docs]def get_local_address() -> Optional[AmsAddr]: """Return the local AMS-address and the port number. :rtype: AmsAddr """ if port is not None: return adsGetLocalAddressEx(port) return None
[docs]def set_local_address(ams_netid: Union[str, SAmsNetId]) -> None: """Set the local NetID (**Linux only**). :param str ams_netid: new AmsNetID :rtype: None **Usage:** >>> import pyads >>> pyads.open_port() >>> pyads.set_local_address('0.0.0.0.1.1') """ if isinstance(ams_netid, str): ams_netid_st = _parse_ams_netid(ams_netid) else: ams_netid_st = ams_netid assert isinstance(ams_netid_st, SAmsNetId) if linux: return adsSetLocalAddress(ams_netid_st) else: raise ADSError( text="SetLocalAddress is not supported for Windows clients." ) # pragma: no cover
[docs]def add_route(adr: Optional[Union[str, AmsAddr]], ip_address: str) -> None: """Establish a new route in the AMS Router (linux Only). :param adr: AMS Address of routing endpoint as str or AmsAddr object. If None is provided, the net id of the PLC will be discovered. :param str ip_address: ip address of the routing endpoint """ if adr is None: adr = adsGetNetIdForPLC(ip_address) if isinstance(adr, str): adr = AmsAddr(adr) return adsAddRoute(adr.netIdStruct(), ip_address)
[docs]def add_route_to_plc( sending_net_id: str, adding_host_name: str, ip_address: str, username: str, password: str, route_name: str = None, added_net_id: str = None, ) -> bool: """Embed a new route in the PLC. :param str sending_net_id: sending net id :param str adding_host_name: host name (or IP) of the PC being added :param str ip_address: ip address of the PLC :param str username: username for PLC :param str password: password for PLC :param str route_name: PLC side name for route, defaults to adding_host_name or the current hostname of this PC :param pyads.structs.SAmsNetId added_net_id: net id that is being added to the PLC, defaults to sending_net_id :rtype: bool :return: True if route was added """ return adsAddRouteToPLC( sending_net_id, adding_host_name, ip_address, username, password, route_name=route_name, added_net_id=added_net_id, )
[docs]def delete_route(adr: AmsAddr) -> None: """Remove existing route from the AMS Router (Linux Only). :param pyads.structs.AmsAddr adr: AMS Address associated with the routing entry which is to be removed from the router. """ return adsDelRoute(adr.netIdStruct())
[docs]def set_timeout(ms: int) -> None: """Set timeout.""" if port is not None: return adsSyncSetTimeoutEx(port, ms)
[docs]def size_of_structure(structure_def: StructureDef) -> int: """Calculate the size of a structure in number of BYTEs. :param tuple structure_def: special tuple defining the structure and types contained within it according o PLCTYPE constants :return: data size required to read/write a structure of multiple types :rtype: int Expected input example for structure_def: .. code:: python structure_def = ( ('rVar', pyads.PLCTYPE_LREAL, 1), ('sVar', pyads.PLCTYPE_STRING, 2, 35), ('sVar1', pyads.PLCTYPE_STRING, 1), ('rVar1', pyads.PLCTYPE_REAL, 1), ('iVar', pyads.PLCTYPE_DINT, 1), ('iVar1', pyads.PLCTYPE_INT, 3), ) # i.e ('Variable Name', variable type, arr size (1 if not array), # length of string (if defined in PLC)) If array of structure multiply structure_def input by array size. """ num_of_bytes = 0 for item in structure_def: try: var, plc_datatype, size = item # type: ignore str_len = None except ValueError: var, plc_datatype, size, str_len = item # type: ignore if plc_datatype == PLCTYPE_STRING: if str_len is not None: num_of_bytes += (str_len + 1) * size # STRING uses 1 byte per character + null-terminator else: num_of_bytes += (PLC_DEFAULT_STRING_SIZE + 1) * size elif plc_datatype == PLCTYPE_WSTRING: if str_len is not None: num_of_bytes += 2 * (str_len + 1) * size # WSTRING uses 2 bytes per character + null-terminator else: num_of_bytes += (PLC_DEFAULT_STRING_SIZE + 1) * 2 * size elif plc_datatype not in DATATYPE_MAP: raise RuntimeError("Datatype not found") else: num_of_bytes += sizeof(plc_datatype) * size return num_of_bytes
[docs]def dict_from_bytes( byte_list: bytearray, structure_def: StructureDef, array_size: int = 1 ) -> Union[Dict[str, Any], List[Dict[str, Any]]]: """Return an ordered dict of PLC values from a list of BYTE values read from PLC. :param bytearray byte_list: list of byte values for an entire structure :param tuple structure_def: special tuple defining the structure and types contained within it according o PLCTYPE constants :param Optional[int] array_size: size of array if reading array of structure, defaults to 1 :return: ordered dictionary of values for each variable type in order of structure Expected input example for structure_def: .. code:: python structure_def = ( ('rVar', pyads.PLCTYPE_LREAL, 1), ('sVar', pyads.PLCTYPE_STRING, 2, 35), ('sVar1', pyads.PLCTYPE_STRING, 1), ('rVar1', pyads.PLCTYPE_REAL, 1), ('iVar', pyads.PLCTYPE_DINT, 1), ('iVar1', pyads.PLCTYPE_INT, 3), ) # i.e ('Variable Name', variable type, arr size (1 if not array), # length of string (if defined in PLC)) """ values_list: List[Dict[str, Any]] = [] index = 0 for structure in range(0, array_size): values: Dict[str, Any] = OrderedDict() for item in structure_def: try: var, plc_datatype, size = item # type: ignore str_len = None except ValueError: # str_len is the numbers of characters without null-terminator var, plc_datatype, size, str_len = item # type: ignore var_array = [] for i in range(size): if plc_datatype == PLCTYPE_STRING: if str_len is None: str_len = PLC_DEFAULT_STRING_SIZE var_array.append( bytearray(byte_list[index: (index + (str_len + 1))]) .partition(b"\0")[0] .decode("utf-8") ) index += str_len + 1 elif plc_datatype == PLCTYPE_WSTRING: if str_len is None: # if no str_len is given use default size str_len = PLC_DEFAULT_STRING_SIZE n_bytes = 2 * (str_len + 1) # WSTRING uses 2 bytes per character + null-terminator a = bytearray(byte_list[index: (index + n_bytes)]) null_idx = find_wstring_null_terminator(a) var_array.append(a[:null_idx].decode("utf-16-le")) index += n_bytes elif plc_datatype not in DATATYPE_MAP: raise RuntimeError("Datatype not found. Check structure definition") else: n_bytes = sizeof(plc_datatype) var_array.append( struct.unpack( DATATYPE_MAP[plc_datatype], bytearray(byte_list[index: (index + n_bytes)]), )[0] ) index += n_bytes if size == 1: # if not an array, don't want a list in the dict return values[var] = var_array[0] else: values[var] = var_array values_list.append(values) if array_size != 1: return values_list else: return values_list[0]
[docs]def bytes_from_dict( values: Union[Dict[str, Any], List[Dict[str, Any]]], structure_def: StructureDef, ) -> List[int]: """Returns a byte array of values which can be written to the PLC from an ordered dict. :param values: ordered dictionary of values for each variable type in order of structure_def :param tuple structure_def: special tuple defining the structure and types contained within it according o PLCTYPE constants :param Optional[int] array_size: size of array if writing array of structure, defaults to 1 :return: list of byte values for an entire structure :rtype: List[int] Expected input example for structure_def: .. code:: python structure_def = ( ('rVar', pyads.PLCTYPE_LREAL, 1), ('sVar', pyads.PLCTYPE_STRING, 2, 35), ('sVar2', pyads.PLCTYPE_STRING, 1), ('rVar1', pyads.PLCTYPE_REAL, 1), ('iVar', pyads.PLCTYPE_DINT, 1), ('iVar1', pyads.PLCTYPE_INT, 3) ) # i.e ('Variable Name', variable type, arr size (1 if not array), # length of string (if defined in PLC)) """ byte_list = [] if not isinstance(values, list): values = [values] for cur_dict in values: for item in structure_def: try: var, plc_datatype, size = item # type: ignore str_len = None except ValueError: var, plc_datatype, size, str_len = item # type: ignore var = cur_dict[var] for i in range(0, size): if plc_datatype == PLCTYPE_STRING: if str_len is None: str_len = PLC_DEFAULT_STRING_SIZE if size > 1: byte_list += list(var[i].encode("utf-8")) remaining_bytes = str_len + 1 - len(var[i]) # 1 byte a character plus null-terminator else: byte_list += list(var.encode("utf-8")) remaining_bytes = str_len + 1 - len(var) # 1 byte a character plus null-terminator byte_list.extend(remaining_bytes * [0]) elif plc_datatype == PLCTYPE_WSTRING: if str_len is None: str_len = PLC_DEFAULT_STRING_SIZE if size > 1: encoded = list(var[i].encode("utf-16-le")) byte_list += encoded remaining_bytes = 2 * (str_len + 1) - len(encoded) # 2 bytes a character plus null-terminator else: encoded = list(var.encode("utf-16-le")) byte_list += encoded remaining_bytes = 2 * (str_len + 1) - len(encoded) # 2 bytes a character plus null-terminator byte_list.extend(remaining_bytes * [0]) elif plc_datatype not in DATATYPE_MAP: raise RuntimeError("Datatype not found. Check structure definition") else: if size > 1: byte_list += list( struct.pack(DATATYPE_MAP[plc_datatype], var[i]) ) else: byte_list += list(struct.pack(DATATYPE_MAP[plc_datatype], var)) return byte_list
def _dict_slice_generator(dict_: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]: """Generator for slicing a dictionary into parts of size long.""" it = iter(dict_) for _ in range(0, len(dict_), size): yield {i: dict_[i] for i in itertools.islice(it, size)} def _list_slice_generator(list_: List[Any], size: int) -> Iterator[List[Any]]: """Generator for slicing a list into parts of size long.""" it = iter(list_) for _ in range(0, len(list_), size): yield [i for i in itertools.islice(it, size)]