Source code for pyads.pyads_ex

"""Contains cross platform ADS extension functions.

:author: David Browne <davidabrowne@gmail.com>
:license: MIT, see license file or https://opensource.org/licenses/MIT
:created on: 2018-06-11 18:15:53

"""
from typing import Union, Callable, Any, Tuple, Type, Optional, List, Dict
import ctypes
import os
import platform
import socket
import struct
import sys
from contextlib import closing
from functools import wraps

from .utils import platform_is_linux, platform_is_windows, platform_is_freebsd, find_wstring_null_terminator
from .structs import (
    AmsAddr,
    SAmsAddr,
    AdsVersion,
    SAdsVersion,
    SAdsNotificationAttrib,
    SAdsNotificationHeader,
    SAmsNetId,
    SAdsSymbolEntry,
    NotificationAttrib,
    SAdsSumRequest,
)
from .constants import (
    PLCTYPE_STRING,
    PLCTYPE_WSTRING,
    STRING_BUFFER,
    ADSIGRP_SYM_HNDBYNAME,
    PLCTYPE_UDINT,
    ADST_STRING,
    ADST_WSTRING,
    ADSIGRP_SYM_INFOBYNAMEEX,
    ADSIGRP_SYM_VALBYHND,
    ADSIGRP_SYM_RELEASEHND,
    PORT_SYSTEMSERVICE,
    PORT_REMOTE_UDP,
    ADSIGRP_SUMUP_READ,
    ADSIGRP_SUMUP_WRITE,
    DATATYPE_MAP,
    ads_type_to_ctype,
)
from .errorcodes import ERROR_CODES


NOTEFUNC: Optional[Callable] = None

# _adslib can be WinDLL or CDLL depending on OS
_adsDLL: Union["ctypes.WinDLL", "ctypes.CDLL"]

# load dynamic ADS library
if platform_is_windows():  # pragma: no cover, skip Windows test
    dlldir_handle = None
    if sys.version_info >= (3, 8) and "TWINCAT3DIR" in os.environ:
        # Starting with version 3.8, CPython does not consider the PATH environment
        # variable any more when resolving DLL paths. The following works with the default
        # installation of the Beckhoff TwinCAT ADS DLL.
        dll_path = os.environ["TWINCAT3DIR"] + "\\..\\AdsApi\\TcAdsDll"
        if platform.architecture()[0] == "64bit":
            dll_path += "\\x64"
        dlldir_handle = os.add_dll_directory(dll_path)
    try:
        _adsDLL = ctypes.WinDLL("TcAdsDll.dll")  # type: ignore
    finally:
        if dlldir_handle:
            # Do not clobber the load path for other modules
            dlldir_handle.close()
    NOTEFUNC = ctypes.WINFUNCTYPE(  # type: ignore
        ctypes.c_void_p,
        ctypes.POINTER(SAmsAddr),
        ctypes.POINTER(SAdsNotificationHeader),
        ctypes.c_ulong,
    )

elif platform_is_linux():
    # try to load local adslib.so in favor to global one
    local_adslib = os.path.join(os.path.dirname(__file__), "adslib.so")
    if os.path.isfile(local_adslib):
        adslib = local_adslib
    else:
        adslib = "adslib.so"

    _adsDLL = ctypes.CDLL(adslib)

    NOTEFUNC = ctypes.CFUNCTYPE(
        None,
        ctypes.POINTER(SAmsAddr),
        ctypes.POINTER(SAdsNotificationHeader),
        ctypes.c_ulong,
    )
    
elif platform_is_freebsd():
    # try to load local libTcAdsDll.so in favor to global one
    local_adslib = os.path.join(os.path.dirname(__file__), "libTcAdsDll.so")
    if os.path.isfile(local_adslib):
        adslib = local_adslib
    else:
        adslib = "libTcAdsDll.so"

    _adsDLL = ctypes.CDLL(adslib)

    NOTEFUNC = ctypes.CFUNCTYPE(
        None,
        ctypes.POINTER(SAmsAddr),
        ctypes.POINTER(SAdsNotificationHeader),
        ctypes.c_ulong,
    )
    
else:  # pragma: no cover, can not test unsupported platform
    raise RuntimeError("Unsupported platform {0}.".format(sys.platform))

callback_store: Dict[Tuple[AmsAddr, int], Callable[[SAmsAddr, SAdsNotificationHeader, int], None]] = dict()


[docs]class ADSError(Exception): """Error class for errors related to ADS communication.""" def __init__( self, err_code: Optional[int] = None, text: Optional[str] = None ) -> None: if err_code is not None: self.err_code = err_code try: self.msg = "{} ({}). ".format(ERROR_CODES[self.err_code], self.err_code) except KeyError: self.msg = "Unknown Error ({0}). ".format(self.err_code) else: self.msg = "" if text is not None: self.msg += text def __str__(self): # type: () -> str """Return text representation of the object.""" return "ADSError: " + self.msg
[docs]def router_function(fn: Callable) -> Callable: """Raise a runtime error if on Win32 systems. Decorator. Decorator for functions that interact with the router for the Linux implementation of the ADS library. Unlike the Windows implementation which uses a separate router daemon, the Linux library manages AMS routing in-process. As such, routing must be configured programmatically via. the provided API. These endpoints are invalid on Win32 systems, so an exception will be raised. """ @wraps(fn) def wrapper(*args: Any, **kwargs: Any) -> Callable: if platform_is_windows(): # pragma: no cover, skip Windows test raise RuntimeError( "Router interface is not available on Win32 systems.\n" "Configure AMS routes using the TwinCAT router service." ) return fn(*args, **kwargs) return wrapper
[docs]@router_function def adsAddRoute(net_id: SAmsNetId, ip_address: str) -> None: """Establish a new route in the AMS Router. :param pyads.structs.SAmsNetId net_id: net id of routing endpoint :param str ip_address: ip address of the routing endpoint """ add_route = _adsDLL.AdsAddRoute add_route.restype = ctypes.c_long # Convert ip address to bytes (PY3) and get pointer. ip_address_p = ctypes.c_char_p(ip_address.encode("utf-8")) error_code = add_route(net_id, ip_address_p) if error_code: raise ADSError(error_code)
[docs]def send_raw_udp_message( ip_address: str, message: bytes, expected_return_length: int ) -> Tuple[bytes, Tuple[str, int]]: """Send a raw UDP message to the PLC and return the response. :param str ip_address: ip address of the PLC :param bytes message: the message to send to the PLC :param int expected_return_length: number of bytes to expect in response :rtype: Tuple[bytes, Tuple[str, int]] :return: A tuple containing the response and a tuple containing the IP address and port of the sending socket """ with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as sock: # UDP # Listen on any available port for the response from the PLC sock.bind(("", 0)) # Send our data to the PLC sock.sendto(message, (ip_address, PORT_REMOTE_UDP)) # Response should come in in less than .5 seconds, but wait longer to account for slow # communications sock.settimeout(5) # Allow TimeoutError to be raised so user can handle it how they please return sock.recvfrom(expected_return_length)
[docs]def type_is_string(plc_type: Type) -> bool: """Return true if the given class is a string type.""" # If single char if plc_type == PLCTYPE_STRING: return True # If char array if type(plc_type).__name__ == "PyCArrayType": if plc_type._type_ == PLCTYPE_STRING: return True return False
[docs]def type_is_wstring(plc_type: Type) -> bool: """Return True if the given class is a WSTRING type.""" # If single char if plc_type == PLCTYPE_WSTRING: return True return False
[docs]def get_value_from_ctype_data(read_data: Optional[Any], plc_type: Type) -> Any: """Convert ctypes data object to a regular value based on the PLCTYPE_* property. Typical usage is: .. code:: python obj = my_plc_type.from_buffer(my_buffer) value = get_value_from_ctype_data(obj, my_plc_type) :param read_data: ctypes._CData object :param plc_type: pyads.PLCTYPE_* constant (i.e. a ctypes-like type) """ if read_data is None: return None if type_is_string(plc_type): return read_data.value.decode("utf-8") if type_is_wstring(plc_type): for ix in range(1, len(read_data), 2): if (read_data[ix - 1], read_data[ix]) == (0, 0): null_idx = ix - 1 break else: raise ValueError("No null-terminator found in buffer") return bytearray(read_data[:null_idx]).decode("utf-16-le") if type(plc_type).__name__ == "PyCArrayType": return list(read_data) if hasattr(read_data, "value"): return read_data.value return read_data # Just return the object itself, don't throw an error
[docs]@router_function def adsAddRouteToPLC( sending_net_id: str, adding_host_name: str, ip_address: str, username: str, password: str, route_name: Optional[str] = None, added_net_id: Optional[str] = None, ) -> bool: """Embed a new route in the PLC. :param pyads.structs.SAmsNetId sending_net_id: sending net id :param str adding_host_name: host name (or IP) of the PC being added :param str ip_address: ip address of the PLC :param str username: username for PLC :param str password: password for PLC :param str route_name: PLC side name for route, defaults to adding_host_name or the current hostname of this PC :param pyads.structs.SAmsNetId added_net_id: net id that is being added to the PLC, defaults to sending_net_id :rtype: bool :return: True if the provided credentials are correct, False otherwise """ # ALL SENT STRINGS MUST BE NULL TERMINATED adding_host_name += "\0" added_net_id = added_net_id if added_net_id else sending_net_id route_name = route_name + "\0" if route_name else adding_host_name username = username + "\0" password = password + "\0" # The head of the UDP AMS packet containing host routing information data_header = struct.pack( ">12s", b"\x03\x66\x14\x71\x00\x00\x00\x00\x06\x00\x00\x00" ) data_header += struct.pack( ">6B", *map(int, sending_net_id.split(".")) ) # Sending net ID data_header += struct.pack("<H", PORT_SYSTEMSERVICE) # Internal communication port data_header += struct.pack(">2s", b"\x05\x00") # Write command data_header += struct.pack(">4s", b"\x00\x00\x0c\x00") # Block of unknown data_header += struct.pack("<H", len(route_name)) # Length of sender host name data_header += route_name.encode("utf-8") # Sender host name data_header += struct.pack(">2s", b"\x07\x00") # Block of unknown actual_data = struct.pack("<H", 6) # Byte length of AMS ID (always 6) actual_data += struct.pack( ">6B", *map(int, added_net_id.split(".")) ) # Net ID being added to the PLC actual_data += struct.pack( ">2s", b"\x0d\x00" ) # Block of unknown (maybe encryption?) actual_data += struct.pack("<H", len(username)) # Length of the user name field actual_data += username.encode("utf-8") # PLC Username actual_data += struct.pack(">2s", b"\x02\x00") # Block of unknown actual_data += struct.pack("<H", len(password)) # Length of password field actual_data += password.encode("utf-8") # PLC Password actual_data += struct.pack(">2s", b"\x05\x00") # Block of unknown actual_data += struct.pack("<H", len(adding_host_name)) # Length of route name actual_data += adding_host_name.encode( "utf-8" ) # Name of route being added to the PLC data, addr = send_raw_udp_message( ip_address, data_header + actual_data, 32 ) # PLC response is 32 bytes long rcvd_packet_header = data[ 0:12 ] # AMS Packet header, seems to define communication type # If the last byte in the header is 0x80, then this is a response to our request if struct.unpack(">B", rcvd_packet_header[-1:])[0] == 0x80: rcvd_PLC_AMS_ID = struct.unpack(">6B", data[12:18])[0] # AMS ID of PLC # Convert to a String AMS ID rcvd_AMS_port = struct.unpack( "<H", data[18:20] ) # Some sort of AMS port? Little endian rcvd_command_code = struct.unpack( "<2s", data[20:22] ) # Command code (should be read) Little endian rcvd_protocol_block = data[22:] # Unknown block of protocol rcvd_is_password_correct = rcvd_protocol_block[ 4:7 ] # 0x040000 when password was correct, 0x000407 when it was incorrect if rcvd_is_password_correct == b"\x04\x00\x00": return True elif rcvd_is_password_correct == b"\x00\x04\x07": return False # If we fell through the whole way to the bottom, then we got a weird response raise RuntimeError(f"Unexpected response from PLC: {data!r}")
[docs]def adsGetNetIdForPLC(ip_address: str) -> str: """Get AMS Net ID from IP address. :param str ip_address: ip address of the PLC :rtype: str :return: net id of the device at the provided ip address """ # The head of the UDP AMS packet containing host routing information data_header = struct.pack( ">12s", b"\x03\x66\x14\x71\x00\x00\x00\x00\x01\x00\x00\x00" ) data_header += struct.pack( ">6B", *[1, 1, 1, 1, 1, 1] ) # It doesn't matter what NetID you use here, so just send 1.1.1.1.1.1 data_header += struct.pack("<H", PORT_SYSTEMSERVICE) # Internal communication port data_header += struct.pack(">4s", b"\x00\x00\x00\x00") # Block of unknown data, addr = send_raw_udp_message( ip_address, data_header, 395 ) # PLC response is 395 bytes long rcvd_packet_header = data[ 0:12 ] # AMS Packet header, seems to define communication type # If the last byte in the header is 0x80, then this is a response to our request if struct.unpack(">B", rcvd_packet_header[-1:])[0] == 0x80: ams_id_tuple = struct.unpack(">6B", data[12:18]) # AMS ID of PLC return ".".join(map(str, ams_id_tuple)) # If we fell through the whole way to the bottom, then we got a weird response raise RuntimeError(f"Unexpected response from PLC: {data!r}")
[docs]@router_function def adsDelRoute(net_id: SAmsNetId) -> None: """Remove existing route from the AMS Router. :param pyads.structs.SAmsNetId net_id: net id associated with the routing entry which is to be removed from the router. """ delete_route = _adsDLL.AdsDelRoute delete_route(net_id)
[docs]def adsPortOpenEx() -> int: """Connect to the TwinCAT message router. :rtype: int :return: port number """ port_open_ex = _adsDLL.AdsPortOpenEx port_open_ex.restype = ctypes.c_long port = port_open_ex() if port == 0: raise RuntimeError("Failed to open port on AMS router.") return port
[docs]def adsPortCloseEx(port: int) -> None: """Close the connection to the TwinCAT message router.""" port_close_ex = _adsDLL.AdsPortCloseEx port_close_ex.restype = ctypes.c_long error_code = port_close_ex(port) if error_code: raise ADSError(error_code)
[docs]def adsGetLocalAddressEx(port: int) -> AmsAddr: """Return the local AMS-address and the port number. :rtype: pyads.structs.AmsAddr :return: AMS-address """ get_local_address_ex = _adsDLL.AdsGetLocalAddressEx ams_address_struct = SAmsAddr() error_code = get_local_address_ex(port, ctypes.pointer(ams_address_struct)) if error_code: raise ADSError(error_code) local_ams_address = AmsAddr() local_ams_address._ams_addr = ams_address_struct return local_ams_address
[docs]def adsSetLocalAddress(ams_netid: SAmsNetId) -> None: """Change the local NetId. :param pyads.structs.SAmsNetId ams_netid: new AmsNetID :rtype: None """ set_local_address = _adsDLL.AdsSetLocalAddress set_local_address(ams_netid)
[docs]def adsSyncReadStateReqEx(port: int, address: AmsAddr) -> Tuple[int, int]: """Read the current ADS-state and the machine-state. Read the current ADS-state and the machine-state from the ADS-server. :param port: AMS port :param pyads.structs.AmsAddr address: local or remote AmsAddr :rtype: (int, int) :return: ads_state, device_state """ sync_read_state_request = _adsDLL.AdsSyncReadStateReqEx # C pointer to ams address struct ams_address_pointer = ctypes.pointer(address.amsAddrStruct()) # Current ADS status and corresponding pointer ads_state = ctypes.c_uint16() ads_state_pointer = ctypes.pointer(ads_state) # Current device status and corresponding pointer device_state = ctypes.c_uint16() device_state_pointer = ctypes.pointer(device_state) error_code = sync_read_state_request( port, ams_address_pointer, ads_state_pointer, device_state_pointer ) if error_code: raise ADSError(error_code) return ads_state.value, device_state.value
[docs]def adsSyncReadDeviceInfoReqEx(port: int, address: AmsAddr) -> Tuple[str, AdsVersion]: """Read the name and the version number of the ADS-server. :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr address: local or remote AmsAddr :rtype: string, AdsVersion :return: device name, version """ sync_read_device_info_request = _adsDLL.AdsSyncReadDeviceInfoReqEx # Get pointer to the target AMS address ams_address_pointer = ctypes.pointer(address.amsAddrStruct()) # Create buffer to be filled with device name, get pointer to said buffer device_name_buffer = ctypes.create_string_buffer(20) device_name_pointer = ctypes.pointer(device_name_buffer) # Create ADS Version struct and get pointer. ads_version = SAdsVersion() ads_version_pointer = ctypes.pointer(ads_version) error_code = sync_read_device_info_request( port, ams_address_pointer, device_name_pointer, ads_version_pointer ) if error_code: raise ADSError(error_code) return device_name_buffer.value.decode(), AdsVersion(ads_version)
[docs]def adsSyncWriteControlReqEx( port: int, address: AmsAddr, ads_state: int, device_state: int, data: Any, plc_data_type: Type, ) -> None: """Change the ADS state and the machine-state of the ADS-server. :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr address: local or remote AmsAddr :param int ads_state: new ADS-state, according to ADSTATE constants :param int device_state: new machine-state :param data: additional data :param int plc_data_type: plc datatype, according to PLCTYPE constants """ sync_write_control_request = _adsDLL.AdsSyncWriteControlReqEx ams_address_pointer = ctypes.pointer(address.amsAddrStruct()) ads_state_c = ctypes.c_ulong(ads_state) device_state_c = ctypes.c_ulong(device_state) if type_is_string(plc_data_type): data = ctypes.c_char_p(data.encode("utf-8")) data_pointer = data data_length = len(data_pointer.value) + 1 # add 1 byte for null terminator else: data = plc_data_type(data) data_pointer = ctypes.pointer(data) data_length = ctypes.sizeof(data) error_code = sync_write_control_request( port, ams_address_pointer, ads_state_c, device_state_c, data_length, data_pointer, ) if error_code: raise ADSError(error_code)
[docs]def adsSyncWriteReqEx( port: int, address: AmsAddr, index_group: int, index_offset: int, value: Any, plc_data_type: Type, ) -> None: """Send data synchronous to an ADS-device. :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr address: local or remote AmsAddr :param int index_group: PLC storage area, according to the INDEXGROUP constants :param int index_offset: PLC storage address :param value: value to write to the storage address of the PLC :param int plc_data_type: type of the data given to the PLC, according to PLCTYPE constants """ sync_write_request = _adsDLL.AdsSyncWriteReqEx ams_address_pointer = ctypes.pointer(address.amsAddrStruct()) index_group_c = ctypes.c_ulong(index_group) index_offset_c = ctypes.c_ulong(index_offset) if type_is_string(plc_data_type): data = ctypes.c_char_p(value.encode("utf-8")) data_pointer = data # type: Union[ctypes.c_char_p, ctypes.c_wchar_p, ctypes.pointer] data_length = len(data_pointer.value) + 1 # type: ignore elif type_is_wstring(plc_data_type): value_bytes = [byte for byte in value.encode("utf-16-le")] data_length = len(value_bytes) # type: ignore data = (data_length * ctypes.c_uint8)(*value_bytes) data_pointer = ctypes.pointer(data) else: if type(plc_data_type).__name__ == "PyCArrayType": data = plc_data_type(*value) elif type(value) is plc_data_type: data = value else: data = plc_data_type(value) data_pointer = ctypes.pointer(data) data_length = ctypes.sizeof(data) error_code = sync_write_request( port, ams_address_pointer, index_group_c, index_offset_c, data_length, data_pointer, ) if error_code: raise ADSError(error_code)
[docs]def adsSyncReadWriteReqEx2( port: int, address: AmsAddr, index_group: int, index_offset: int, read_data_type: Optional[Type], value: Any, write_data_type: Optional[Type], return_ctypes: bool = False, check_length: bool = True, ) -> Any: """Read and write data synchronous from/to an ADS-device. :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr address: local or remote AmsAddr :param int index_group: PLC storage area, according to the INDEXGROUP constants :param int index_offset: PLC storage address :param Type read_data_type: type of the data given to the PLC to respond to, according to PLCTYPE constants, or None to not read anything :param value: value to write to the storage address of the PLC :param Type write_data_type: type of the data given to the PLC, according to PLCTYPE constants, or None to not write anything :param bool return_ctypes: return ctypes instead of python types if True (default: False) :param bool check_length: check whether the amount of bytes read matches the size of the read data type (default: True) :rtype: read_data_type :return: value: value read from PLC """ sync_read_write_request = _adsDLL.AdsSyncReadWriteReqEx2 ams_address_pointer = ctypes.pointer(address.amsAddrStruct()) index_group_c = ctypes.c_ulong(index_group) index_offset_c = ctypes.c_ulong(index_offset) read_data: Optional[Any] read_data_pointer: Optional[ctypes.pointer] response_size: int = 0 if index_group == ADSIGRP_SUMUP_READ: response_size = 4 * len(value) for _ in value: response_size += _.size read_data_buf = bytearray(response_size) read_data = (ctypes.c_ubyte * len(read_data_buf)).from_buffer(read_data_buf) read_data_pointer = ctypes.pointer(read_data) read_length = response_size elif index_group == ADSIGRP_SUMUP_WRITE: response_size = ( index_offset * 4 ) # expect 4 bytes back for every value written (error data) read_data_buf = bytearray(response_size) read_data = (ctypes.c_ubyte * len(read_data_buf)).from_buffer(read_data_buf) read_data_pointer = ctypes.pointer(read_data) read_length = response_size elif read_data_type is None: read_data = None read_data_pointer = None read_length = 0 else: if type_is_string(read_data_type): read_data = (STRING_BUFFER * PLCTYPE_STRING)() elif type_is_wstring(read_data_type): read_data = (STRING_BUFFER * ctypes.c_uint8)() else: read_data = read_data_type() read_data_pointer = ctypes.pointer(read_data) read_length = ctypes.sizeof(read_data) bytes_read = ctypes.c_ulong() bytes_read_pointer = ctypes.pointer(bytes_read) write_data_pointer: Optional[Union[ctypes.c_char_p, ctypes.c_wchar_p, ctypes.pointer]] if index_group == ADSIGRP_SUMUP_READ: write_data_pointer = ctypes.pointer(value) write_length = ctypes.sizeof(value) elif index_group == ADSIGRP_SUMUP_WRITE: write_data = (ctypes.c_byte * len(value)).from_buffer(value) write_data_pointer = ctypes.pointer(write_data) write_length = ctypes.sizeof(write_data) elif write_data_type is None: write_data_pointer = None write_length = 0 elif type_is_string(write_data_type): # Get pointer to string write_data_pointer = ctypes.c_char_p(value.encode("utf-8")) # Add an extra byte to the data length for the null terminator write_length = len(value) + 1 elif type_is_wstring(write_data_type): value_bytes = [byte for byte in value.encode("utf-16-le")] write_length = len(value_bytes) # type: ignore write_data = (write_length * ctypes.c_uint8)(*value_bytes) write_data_pointer = ctypes.pointer(write_data) else: if type(write_data_type).__name__ == "PyCArrayType": write_data = write_data_type(*value) elif type(value) is write_data_type: write_data = value else: write_data = write_data_type(value) write_data_pointer = ctypes.pointer(write_data) write_length = ctypes.sizeof(write_data) err_code = sync_read_write_request( port, ams_address_pointer, index_group_c, index_offset_c, ctypes.c_ulong(read_length), read_data_pointer, ctypes.c_ulong(write_length), write_data_pointer, bytes_read_pointer, ) if err_code: raise ADSError(err_code) if index_group == ADSIGRP_SUMUP_READ or index_group == ADSIGRP_SUMUP_WRITE: expected_length = response_size else: expected_length = ( read_data.entryLength if isinstance(read_data, SAdsSymbolEntry) else read_length ) # If we're reading a value of predetermined size (anything but a string or wstring), # validate that the correct number of bytes were read if ( check_length and not (type_is_string(read_data_type) or type_is_wstring(read_data_type)) and bytes_read.value != expected_length ): raise RuntimeError( "Insufficient data (expected {0} bytes, {1} were read).".format( expected_length, bytes_read.value ) ) if return_ctypes: return read_data return get_value_from_ctype_data(read_data, read_data_type)
[docs]def adsSyncReadReqEx2( port: int, address: AmsAddr, index_group: int, index_offset: int, data_type: Type, return_ctypes: bool = False, check_length: bool = True, ) -> Any: """Read data synchronous from an ADS-device. :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr address: local or remote AmsAddr :param int index_group: PLC storage area, according to the INDEXGROUP constants :param int index_offset: PLC storage address :param Type data_type: type of the data given to the PLC, according to PLCTYPE constants :param bool return_ctypes: return ctypes instead of python types if True (default: False) :param bool check_length: check whether the amount of bytes read matches the size of the read data type (default: True) :rtype: data_type :return: value: **value** """ sync_read_request = _adsDLL.AdsSyncReadReqEx2 ams_address_pointer = ctypes.pointer(address.amsAddrStruct()) index_group_c = ctypes.c_ulong(index_group) index_offset_c = ctypes.c_ulong(index_offset) if type_is_string(data_type): data = (STRING_BUFFER * PLCTYPE_STRING)() elif type_is_wstring(data_type): data = (STRING_BUFFER * ctypes.c_uint8)() else: data = data_type() data_pointer = ctypes.pointer(data) data_length = ctypes.c_ulong(ctypes.sizeof(data)) bytes_read = ctypes.c_ulong() bytes_read_pointer = ctypes.pointer(bytes_read) error_code = sync_read_request( port, ams_address_pointer, index_group_c, index_offset_c, data_length, data_pointer, bytes_read_pointer, ) if error_code: raise ADSError(error_code) # If we're reading a value of predetermined size (anything but a string or wstring), # validate that the correct number of bytes were read if ( check_length and not(type_is_string(data_type) or type_is_wstring(data_type)) and bytes_read.value != data_length.value ): raise RuntimeError( "Insufficient data (expected {0} bytes, {1} were read).".format( data_length.value, bytes_read.value ) ) if return_ctypes: return data return get_value_from_ctype_data(data, data_type)
[docs]def adsGetHandle(port: int, address: AmsAddr, data_name: str) -> int: """Get the handle of the PLC-variable. :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr address: local or remote AmsAddr :param string data_name: data name :rtype: int :return: handle: PLC-variable handle """ handle = adsSyncReadWriteReqEx2( port, address, ADSIGRP_SYM_HNDBYNAME, 0x0, PLCTYPE_UDINT, data_name, PLCTYPE_STRING, ) return handle
[docs]def adsGetSymbolInfo(port: int, address: AmsAddr, data_name: str) -> SAdsSymbolEntry: """Get the symbol information of the PLC-variable. :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr address: local or remote AmsAddr :param string data_name: data name :rtype: SAdsSymbolInfo :return: symbol_info: PLC Symbol info """ symbol_info = adsSyncReadWriteReqEx2( port, address, ADSIGRP_SYM_INFOBYNAMEEX, 0x0, SAdsSymbolEntry, data_name, PLCTYPE_STRING, ) return symbol_info
[docs]def adsSumReadBytes( port: int, address: AmsAddr, data_symbols: List[Tuple[int, int, int]], ) -> Any: """Perform a sum read for multiple variables, returning the bytes This version does not do any processing, and will simply return the concatenation of the bytes of the target symbols. :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr address: local or remote AmsAddr :param data_symbols: list of tuples like: (index_group, index_offset, size) """ num_requests = len(data_symbols) sum_req_array_type = SAdsSumRequest * num_requests sum_req_array = sum_req_array_type() for i, data_symbol in enumerate(data_symbols): idx_group, idx_offset, size = data_symbol sum_req_array[i].iGroup = idx_group sum_req_array[i].iOffset = idx_offset sum_req_array[i].size = size return adsSyncReadWriteReqEx2( port, address, ADSIGRP_SUMUP_READ, num_requests, None, sum_req_array, None, return_ctypes=False, check_length=False, )
[docs]def adsSumRead( port: int, address: AmsAddr, data_names: List[str], data_symbols: Dict[str, SAdsSymbolEntry], structured_data_names: List[str], ) -> Dict[str, Any]: """Perform a sum read to get the value of multiple variables :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr address: local or remote AmsAddr :param data_names: list of variables names to read :param Dict[str, SAdsSymbolEntry] data_symbols: dictionary of ADS Symbol Info :param structured_data_names: list of structured variable names :return: result: dict of variable names and values :rtype: dict[str, Any] """ result: Dict[str, Optional[Any]] = {i: None for i in data_names} num_requests = len(data_names) symbol_infos = [ (data_symbols[name].iGroup, data_symbols[name].iOffs, data_symbols[name].size) for name in data_names ] # When a read is split, `data_symbols` will be bigger than `data_names` # Therefore we avoid looping over `data_symbols` sum_response = adsSumReadBytes(port, address, symbol_infos) data_start = 4 * num_requests offset = data_start for i, data_name in enumerate(data_names): error = struct.unpack_from("<I", sum_response, offset=i * 4)[0] if error: result[data_name] = ERROR_CODES[error] else: if data_name in structured_data_names: value = sum_response[ offset: offset + data_symbols[data_name].size] elif data_symbols[data_name].dataType == ADST_STRING: # find null-terminator 1 Byte null_idx = sum_response[offset: offset + data_symbols[data_name].size].index(0) value = bytearray(sum_response[offset: offset + null_idx]).decode("utf-8") elif data_symbols[data_name].dataType == ADST_WSTRING: # find null-terminator 2 Bytes a = sum_response[offset: offset + data_symbols[data_name].size] null_idx = find_wstring_null_terminator(a) if null_idx is None: raise ValueError("No null-terminator found in buffer") value = bytearray(sum_response[offset: offset + null_idx]).decode("utf-16-le") else: value = struct.unpack_from( DATATYPE_MAP[ads_type_to_ctype[data_symbols[data_name].dataType]], sum_response, offset=offset, )[0] result[data_name] = value offset += data_symbols[data_name].size return result
[docs]def adsSumWriteBytes( port: int, address: AmsAddr, num_requests: int, buffer: bytes, ) -> List[str]: """Perform a sum write of concatenated bytes to multiple symbols. :return: List of errors """ sum_response = adsSyncReadWriteReqEx2( port, address, ADSIGRP_SUMUP_WRITE, num_requests, None, buffer, None, return_ctypes=False, check_length=False, ) errors = list(struct.iter_unpack("<I", sum_response)) error_descriptions = [ERROR_CODES[i[0]] for i in errors] return error_descriptions
[docs]def adsSumWrite( port: int, address: AmsAddr, data_names_and_values: Dict[str, Any], data_symbols: Dict[str, SAdsSymbolEntry], structured_data_names: List[str], ) -> Dict[str, str]: """Perform a sum write to write the value of multiple ADS variables :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr address: local or remote AmsAddr :param data_names_and_values: dict of variable names and values to be written :type data_names_and_values: dict[str, Any] :param data_symbols: list of dictionaries of ADS Symbol Info :type data_symbols: dict[str, ADSSymbolInfo] :param structured_data_names: list of structured variable names :return: result: dict of variable names and error codes :rtype: dict[str, ADSError] """ offset = 0 num_requests = len(data_names_and_values) total_request_size = num_requests * 3 * 4 # iGroup, iOffset & size for data_name in data_names_and_values.keys(): total_request_size += data_symbols[data_name].size buf = bytearray(total_request_size) for data_name in data_names_and_values.keys(): struct.pack_into("<I", buf, offset, data_symbols[data_name].iGroup) struct.pack_into("<I", buf, offset + 4, data_symbols[data_name].iOffs) struct.pack_into("<I", buf, offset + 8, data_symbols[data_name].size) offset += 12 for data_name, value in data_names_and_values.items(): if data_name in structured_data_names: buf[offset: offset + data_symbols[data_name].size] = value elif data_symbols[data_name].dataType == ADST_STRING: buf[offset: offset + len(value)] = value.encode("utf-8") elif data_symbols[data_name].dataType == ADST_WSTRING: buf[offset: offset + 2 * len(value)] = value.encode("utf-16-le") else: struct.pack_into( DATATYPE_MAP[ads_type_to_ctype[data_symbols[data_name].dataType]], buf, offset, value, ) offset += data_symbols[data_name].size error_descriptions = adsSumWriteBytes( port, address, num_requests, buf, ) return dict(zip(data_names_and_values.keys(), error_descriptions))
[docs]def adsReleaseHandle(port: int, address: AmsAddr, handle: int) -> None: """Release the handle of the PLC-variable. :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr address: local or remote AmsAddr :param int handle: handle of PLC-variable to be released """ adsSyncWriteReqEx(port, address, ADSIGRP_SYM_RELEASEHND, 0, handle, PLCTYPE_UDINT)
[docs]def adsSyncReadByNameEx( port: int, address: AmsAddr, data_name: str, data_type: Type, return_ctypes: bool = False, handle: int = None, check_length: bool = True, ) -> Any: """Read data synchronous from an ADS-device from data name. :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr address: local or remote AmsAddr :param string data_name: data name :param Type data_type: type of the data given to the PLC, according to PLCTYPE constants :param bool return_ctypes: return ctypes instead of python types if True (default: False) :param int handle: PLC-variable handle (default: None) :param bool check_length: check whether the amount of bytes read matches the size of the read data type (default: True) :rtype: data_type :return: value: **value** """ if handle is None: no_handle = True handle = adsGetHandle(port, address, data_name) else: no_handle = False # Read the value of a PLC-variable, via handle value = adsSyncReadReqEx2( port, address, ADSIGRP_SYM_VALBYHND, handle, data_type, return_ctypes, check_length, ) if no_handle is True: adsReleaseHandle(port, address, handle) return value
[docs]def adsSyncWriteByNameEx( port: int, address: AmsAddr, data_name: str, value: Any, data_type: Type, handle: int = None, ) -> None: """Send data synchronous to an ADS-device from data name. :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr address: local or remote AmsAddr :param string data_name: PLC storage name :param value: value to write to the storage address of the PLC :param Type data_type: type of the data given to the PLC, according to PLCTYPE constants :param int handle: PLC-variable handle (default: None) """ if handle is None: no_handle = True handle = adsGetHandle(port, address, data_name) else: no_handle = False # Write the value of a PLC-variable, via handle adsSyncWriteReqEx(port, address, ADSIGRP_SYM_VALBYHND, handle, value, data_type) if no_handle is True: adsReleaseHandle(port, address, handle)
[docs]def adsSyncAddDeviceNotificationReqEx( port: int, adr: AmsAddr, data: Union[str, Tuple[int, int]], pNoteAttrib: NotificationAttrib, callback: Callable, user_handle: Optional[int] = None, ) -> Tuple[int, int]: """Add a device notification. :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr adr: local or remote AmsAddr :param Union[str, Tuple[int, int]] data: PLC storage address by name or index group and offset :param pyads.structs.NotificationAttrib pNoteAttrib: notification attributes :param callback: Callback function to handle notification :param user_handle: User Handle :rtype: (int, int) :returns: notification handle, user handle """ global callback_store if NOTEFUNC is None: raise TypeError("Callback function type can't be None") adsSyncAddDeviceNotificationReqFct = _adsDLL.AdsSyncAddDeviceNotificationReqEx pAmsAddr = ctypes.pointer(adr.amsAddrStruct()) if isinstance(data, str): hnl = adsSyncReadWriteReqEx2( port, adr, ADSIGRP_SYM_HNDBYNAME, 0x0, PLCTYPE_UDINT, data, PLCTYPE_STRING ) nIndexGroup = ctypes.c_ulong(ADSIGRP_SYM_VALBYHND) nIndexOffset = ctypes.c_ulong(hnl) elif isinstance(data, tuple): nIndexGroup = ctypes.c_ulong(data[0]) nIndexOffset = ctypes.c_ulong(data[1]) hnl = None else: raise TypeError( "Parameter data has the wrong type %s. Allowed types are: str, Tuple[int, int]." % (type(data)) ) attrib = pNoteAttrib.notificationAttribStruct() pNotification = ctypes.c_ulong() nHUser = ctypes.c_ulong(0) if hnl is not None: nHUser = ctypes.c_ulong(hnl) if user_handle is not None: nHUser = ctypes.c_ulong(user_handle) adsSyncAddDeviceNotificationReqFct.argtypes = [ ctypes.c_ulong, ctypes.POINTER(SAmsAddr), ctypes.c_ulong, ctypes.c_ulong, ctypes.POINTER(SAdsNotificationAttrib), NOTEFUNC, # type: ignore ctypes.c_ulong, ctypes.POINTER(ctypes.c_ulong), ] adsSyncAddDeviceNotificationReqFct.restype = ctypes.c_long # noinspection PyUnusedLocal def wrapper(addr: SAmsAddr, notification: SAdsNotificationHeader, user: int) -> Callable[ [SAdsNotificationHeader, str], None]: return callback(notification, data) # noinspection PyTypeChecker c_callback = NOTEFUNC(wrapper) # type: ignore err_code = adsSyncAddDeviceNotificationReqFct( port, pAmsAddr, nIndexGroup, nIndexOffset, ctypes.byref(attrib), c_callback, nHUser, ctypes.byref(pNotification), ) if err_code: raise ADSError(err_code) callback_store[(adr, pNotification.value)] = c_callback return pNotification.value, hnl
[docs]def adsSyncDelDeviceNotificationReqEx( port: int, adr: AmsAddr, notification_handle: int, user_handle: int ) -> None: """Remove a device notification. :param int port: local AMS port as returned by adsPortOpenEx() :param pyads.structs.AmsAddr adr: local or remote AmsAddr :param int notification_handle: Notification Handle :param int user_handle: User Handle """ adsSyncDelDeviceNotificationReqFct = _adsDLL.AdsSyncDelDeviceNotificationReqEx pAmsAddr = ctypes.pointer(adr.amsAddrStruct()) nHNotification = ctypes.c_ulong(notification_handle) err_code = adsSyncDelDeviceNotificationReqFct(port, pAmsAddr, nHNotification) del callback_store[(adr, notification_handle)] if err_code: raise ADSError(err_code) if user_handle is not None: adsSyncWriteReqEx( port, adr, ADSIGRP_SYM_RELEASEHND, 0, user_handle, PLCTYPE_UDINT )
[docs]def adsSyncSetTimeoutEx(port: int, n_ms: int) -> None: """Set Timeout. :param int port: local AMS port as returned by adsPortOpenEx() :param int n_ms: timeout in ms """ adsSyncSetTimeoutFct = _adsDLL.AdsSyncSetTimeoutEx cms = ctypes.c_long(n_ms) err_code = adsSyncSetTimeoutFct(port, cms) if err_code: raise ADSError(err_code)