Source code for

"""Pythonic ADS functions.

:author: Stefan Lehmann <>
:license: MIT, see license file or
:created on: 2018-06-11 18:15:53

from __future__ import annotations
import struct
import itertools
from collections import OrderedDict
from ctypes import (
from typing import Optional, Union, Tuple, Any, Type, Dict, List, Iterator

# noinspection PyUnresolvedReferences
from .constants import (
from .pyads_ex import (
from .structs import (
from .utils import platform_is_linux

# custom types
StructureDef = Tuple[
    Union[Tuple[str, Type, int], Tuple[str, Type, int, Optional[int]]], ...

# global variables
linux: bool = platform_is_linux()
port: Optional[int] = None

def _parse_ams_netid(ams_netid: str) -> SAmsNetId:
    """Parse an AmsNetId from *str* to *SAmsNetId*.

    :param str ams_netid: NetId as a string
    :rtype: SAmsNetId
    :return: NetId as a struct

        id_numbers = list(map(int, ams_netid.split(".")))
    except ValueError:
        raise ValueError("no valid netid")

    if len(id_numbers) != 6:
        raise ValueError("no valid netid")

    # Fill the netId struct with data
    ams_netid_st = SAmsNetId()
    ams_netid_st.b = (c_ubyte * 6)(*id_numbers)
    return ams_netid_st

[docs]def open_port() -> int: """Connect to the TwinCAT message router. :rtype: int :return: port number """ global port port = port or adsPortOpenEx() return port
[docs]def close_port() -> None: """Close the connection to the TwinCAT message router.""" global port if port is not None: adsPortCloseEx(port) port = None
[docs]def get_local_address() -> Optional[AmsAddr]: """Return the local AMS-address and the port number. :rtype: AmsAddr """ if port is not None: return adsGetLocalAddressEx(port) return None
[docs]def set_local_address(ams_netid: Union[str, SAmsNetId]) -> None: """Set the local NetID (**Linux only**). :param str ams_netid: new AmsNetID :rtype: None **Usage:** >>> import pyads >>> pyads.open_port() >>> pyads.set_local_address('') """ if isinstance(ams_netid, str): ams_netid_st = _parse_ams_netid(ams_netid) else: ams_netid_st = ams_netid assert isinstance(ams_netid_st, SAmsNetId) if linux: return adsSetLocalAddress(ams_netid_st) else: raise ADSError( text="SetLocalAddress is not supported for Windows clients." ) # pragma: no cover
[docs]def add_route(adr: Optional[Union[str, AmsAddr]], ip_address: str) -> None: """Establish a new route in the AMS Router (linux Only). :param adr: AMS Address of routing endpoint as str or AmsAddr object. If None is provided, the net id of the PLC will be discovered. :param str ip_address: ip address of the routing endpoint """ if adr is None: adr = adsGetNetIdForPLC(ip_address) if isinstance(adr, str): adr = AmsAddr(adr) return adsAddRoute(adr.netIdStruct(), ip_address)
[docs]def add_route_to_plc( sending_net_id: str, adding_host_name: str, ip_address: str, username: str, password: str, route_name: str = None, added_net_id: str = None, ) -> bool: """Embed a new route in the PLC. :param str sending_net_id: sending net id :param str adding_host_name: host name (or IP) of the PC being added :param str ip_address: ip address of the PLC :param str username: username for PLC :param str password: password for PLC :param str route_name: PLC side name for route, defaults to adding_host_name or the current hostname of this PC :param pyads.structs.SAmsNetId added_net_id: net id that is being added to the PLC, defaults to sending_net_id :rtype: bool :return: True if route was added """ return adsAddRouteToPLC( sending_net_id, adding_host_name, ip_address, username, password, route_name=route_name, added_net_id=added_net_id, )
[docs]def delete_route(adr: AmsAddr) -> None: """Remove existing route from the AMS Router (Linux Only). :param pyads.structs.AmsAddr adr: AMS Address associated with the routing entry which is to be removed from the router. """ return adsDelRoute(adr.netIdStruct())
[docs]def set_timeout(ms: int) -> None: """Set timeout.""" if port is not None: return adsSyncSetTimeoutEx(port, ms)
[docs]def size_of_structure(structure_def: StructureDef) -> int: """Calculate the size of a structure in number of BYTEs. :param tuple structure_def: special tuple defining the structure and types contained within it according o PLCTYPE constants :return: data size required to read/write a structure of multiple types :rtype: int Expected input example for structure_def: .. code:: python structure_def = ( ('rVar', pyads.PLCTYPE_LREAL, 1), ('sVar', pyads.PLCTYPE_STRING, 2, 35), ('sVar1', pyads.PLCTYPE_STRING, 1), ('rVar1', pyads.PLCTYPE_REAL, 1), ('iVar', pyads.PLCTYPE_DINT, 1), ('iVar1', pyads.PLCTYPE_INT, 3), ) # i.e ('Variable Name', variable type, arr size (1 if not array), # length of string (if defined in PLC)) If array of structure multiply structure_def input by array size. """ num_of_bytes = 0 for item in structure_def: try: var, plc_datatype, size = item # type: ignore str_len = None except ValueError: var, plc_datatype, size, str_len = item # type: ignore if plc_datatype == PLCTYPE_STRING: if str_len is not None: num_of_bytes += (str_len + 1) * size else: num_of_bytes += (PLC_DEFAULT_STRING_SIZE + 1) * size elif plc_datatype not in DATATYPE_MAP: raise RuntimeError("Datatype not found") else: num_of_bytes += sizeof(plc_datatype) * size return num_of_bytes
[docs]def dict_from_bytes( byte_list: bytearray, structure_def: StructureDef, array_size: int = 1 ) -> Union[Dict[str, Any], List[Dict[str, Any]]]: """Return an ordered dict of PLC values from a list of BYTE values read from PLC. :param bytearray byte_list: list of byte values for an entire structure :param tuple structure_def: special tuple defining the structure and types contained within it according o PLCTYPE constants :param Optional[int] array_size: size of array if reading array of structure, defaults to 1 :return: ordered dictionary of values for each variable type in order of structure Expected input example for structure_def: .. code:: python structure_def = ( ('rVar', pyads.PLCTYPE_LREAL, 1), ('sVar', pyads.PLCTYPE_STRING, 2, 35), ('sVar1', pyads.PLCTYPE_STRING, 1), ('rVar1', pyads.PLCTYPE_REAL, 1), ('iVar', pyads.PLCTYPE_DINT, 1), ('iVar1', pyads.PLCTYPE_INT, 3), ) # i.e ('Variable Name', variable type, arr size (1 if not array), # length of string (if defined in PLC)) """ values_list: List[Dict[str, Any]] = [] index = 0 for structure in range(0, array_size): values: Dict[str, Any] = OrderedDict() for item in structure_def: try: var, plc_datatype, size = item # type: ignore str_len = None except ValueError: var, plc_datatype, size, str_len = item # type: ignore var_array = [] for i in range(size): if plc_datatype == PLCTYPE_STRING: if str_len is None: str_len = PLC_DEFAULT_STRING_SIZE var_array.append( bytearray(byte_list[index: (index + (str_len + 1))]) .partition(b"\0")[0] .decode("utf-8") ) index += str_len + 1 elif plc_datatype not in DATATYPE_MAP: raise RuntimeError("Datatype not found. Check structure definition") else: n_bytes = sizeof(plc_datatype) var_array.append( struct.unpack( DATATYPE_MAP[plc_datatype], bytearray(byte_list[index: (index + n_bytes)]), )[0] ) index += n_bytes if size == 1: # if not an array, don't want a list in the dict return values[var] = var_array[0] else: values[var] = var_array values_list.append(values) if array_size != 1: return values_list else: return values_list[0]
[docs]def bytes_from_dict( values: Union[Dict[str, Any], List[Dict[str, Any]]], structure_def: StructureDef, ) -> List[int]: """Returns a byte array of values which can be written to the PLC from an ordered dict. :param values: ordered dictionary of values for each variable type in order of structure_def :param tuple structure_def: special tuple defining the structure and types contained within it according o PLCTYPE constants :param Optional[int] array_size: size of array if writing array of structure, defaults to 1 :return: list of byte values for an entire structure :rtype: List[int] Expected input example for structure_def: .. code:: python structure_def = ( ('rVar', pyads.PLCTYPE_LREAL, 1), ('sVar', pyads.PLCTYPE_STRING, 2, 35), ('sVar2', pyads.PLCTYPE_STRING, 1), ('rVar1', pyads.PLCTYPE_REAL, 1), ('iVar', pyads.PLCTYPE_DINT, 1), ('iVar1', pyads.PLCTYPE_INT, 3) ) # i.e ('Variable Name', variable type, arr size (1 if not array), # length of string (if defined in PLC)) """ byte_list = [] if not isinstance(values, list): values = [values] for cur_dict in values: for item in structure_def: try: var, plc_datatype, size = item # type: ignore str_len = None except ValueError: var, plc_datatype, size, str_len = item # type: ignore var = cur_dict[var] for i in range(0, size): if plc_datatype == PLCTYPE_STRING: if str_len is None: str_len = PLC_DEFAULT_STRING_SIZE if size > 1: byte_list += list(var[i].encode("utf-8")) remaining_bytes = str_len + 1 - len(var[i]) else: byte_list += list(var.encode("utf-8")) remaining_bytes = str_len + 1 - len(var) for byte in range(remaining_bytes): byte_list.append(0) elif plc_datatype not in DATATYPE_MAP: raise RuntimeError("Datatype not found. Check structure definition") else: if size > 1: byte_list += list( struct.pack(DATATYPE_MAP[plc_datatype], var[i]) ) else: byte_list += list(struct.pack(DATATYPE_MAP[plc_datatype], var)) return byte_list
def _dict_slice_generator(dict_: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]: """Generator for slicing a dictionary into parts of size long.""" it = iter(dict_) for _ in range(0, len(dict_), size): yield {i: dict_[i] for i in itertools.islice(it, size)} def _list_slice_generator(list_: List[Any], size: int) -> Iterator[List[Any]]: """Generator for slicing a list into parts of size long.""" it = iter(list_) for _ in range(0, len(list_), size): yield [i for i in itertools.islice(it, size)]