boxmoe_header_banner_img

Hello! 欢迎来到悠悠畅享网!

文章导读

高效管理Pyads通知回调与大规模数据处理


avatar
站长 2025年8月11日 7

高效管理Pyads通知回调与大规模数据处理

本文旨在探讨在Pyads应用中,如何高效处理来自PLC的ADS通知回调数据,特别是在处理大量、高频数据时,避免使用全局变量,并采用Pythonic的类封装方法管理状态。同时,文章还将深入讲解如何优化字节数据到Python类型的转换,以提升大规模数据处理的性能。

Pyads通知机制与数据处理挑战

pyads库提供了一种通过ads通知(notification)机制从倍福plc高效获取数据的方式。与轮询不同,通知允许plc在数据发生变化时主动向python应用程序发送更新,这对于需要快速响应(如200ms周期)和处理大量数据(如每周期1000个值,累积形成100000个值的数据序列)的场景至关重要。

然而,在处理通知回调时,开发者常面临一个挑战:如何将回调函数内部接收到的数据安全、高效地传递给主程序或其他处理逻辑,同时避免使用不推荐的全局变量。特别是当需要将多次回调的数据累积成一个大型数组时,这个问题变得尤为突出。Pyads的回调函数是独立的,直接在函数内部处理所有逻辑或依赖全局变量会使代码难以维护和扩展。

解决方案一:基于类的封装管理回调与状态

解决上述问题的核心在于利用Python的面向对象特性,将Pyads连接、通知设置以及数据处理逻辑封装到一个类中。这种方法不仅能够避免全局变量的使用,还能更好地管理应用程序的状态和数据流。

1. 类结构设计

通过将Pyads连接作为类的一个成员变量,并在类的初始化方法中设置通知,可以将回调函数定义为类的成员方法。这样,回调函数就可以直接访问和修改类的其他成员变量,从而实现数据的累积和状态的更新。

import pyads import ctypes import numpy as np # 提前引入,用于后续数据优化  class PlcDataManager:     def __init__(self, ams_net_id, plc_ip, ams_port):         """         初始化PLC连接和数据管理器。         """         self.plc = pyads.Connection(ams_net_id, plc_ip, ams_port)         self.plc.open()         self.data_buffer = []  # 用于累积接收到的数据         self.notification_handles = {} # 存储通知句柄,便于后续释放         print(f"PLC connection established: {plc_ip}")          # 示例:初始化心跳计数器         self.heartbeat_count = 0         self.plc_status_message = "PLC is running"          self.setup_notifications()      def setup_notifications(self):         """         设置ADS通知。         """         # 示例1:PLC心跳通知         heartbeat_var_name = 'global.heartbeat' # 假设PLC中有一个心跳变量         heartbeat_attr = pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_BOOL))         handle_heartbeat = self.plc.add_device_notification(             heartbeat_var_name,             heartbeat_attr,             self.on_plc_heartbeat # 回调方法         )         self.notification_handles[heartbeat_var_name] = handle_heartbeat         print(f"Notification set for {heartbeat_var_name}, handle: {handle_heartbeat}")          # 示例2:PLC状态变化通知 (ADSSTATE_RUN, ADSSTATE_CONFIG等)         # ADSIGRP_DEVICE_DATA (0xF100), ADSIOFFS_DEVDATA_ADSSTATE (0x0000) 是特殊的地址用于获取PLC状态         plc_status_addr = (int("0xF100", 16), int("0x0000", 16))         status_attr = pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_INT))         handle_status = self.plc.add_device_notification(             plc_status_addr,             status_attr,             self.on_plc_status_change # 回调方法         )         self.notification_handles["PLC_Status"] = handle_status         print(f"Notification set for PLC Status, handle: {handle_status}")          # 示例3:自定义结构体数据通知 (原始问题中的场景)         # 假设PLC中有一个名为 'global.sample_structure' 的变量         structure_def = (             ('nVar', pyads.PLCTYPE_DINT, 1000),             ('nVar2', pyads.PLCTYPE_DINT, 1000),             ('nVar3', pyads.PLCTYPE_DINT, 1000),             ('nVar4', pyads.PLCTYPE_DINT, 1000),             ('nVar5', pyads.PLCTYPE_DINT, 1000)         )         size_of_struct = pyads.size_of_structure(structure_def)         data_attr = pyads.NotificationAttrib(size_of_struct)         handle_data = self.plc.add_device_notification(             'global.sample_structure',             data_attr,             self.on_data_received # 回调方法         )         self.notification_handles['global.sample_structure'] = handle_data         print(f"Notification set for global.sample_structure, handle: {handle_data}")         self.structure_definition = structure_def # 保存结构体定义供回调使用      def on_plc_heartbeat(self, handle, name, timestamp, value):         """         PLC心跳通知回调。         """         # value 默认是 ctypes.c_ubyte * size 的字节数组,对于简单类型可以直接判断         # 或者使用 pyads.dict_from_bytes(value, [(name, pyads.PLCTYPE_BOOL)])         # 这里假设value直接是布尔值对应的字节         is_active = bool(value[0]) # 假设PLCBOOL是单字节         if is_active:             self.heartbeat_count += 1             # print(f"Heartbeat received: {self.heartbeat_count}")      def on_plc_status_change(self, notification, _):         """         PLC状态变化通知回调。         """         # 使用parse_notification解析通知数据         *_, value = self.plc.parse_notification(notification, pyads.PLCTYPE_INT)         if value != pyads.ADSSTATE_RUN:             self.plc_status_message = f"PLC exited run mode (State: {value})"             print(self.plc_status_message)         else:             self.plc_status_message = "PLC is running"             # print(self.plc_status_message)      def on_data_received(self, handle, name, timestamp, value):         """         接收结构体数据通知回调。         """         # 将字节数据转换为字典         values_dict = pyads.dict_from_bytes(value, self.structure_definition)         # print(f"Received data: {values_dict['nVar'][0]}...") # 打印第一个元素示例          # 将数据添加到缓冲区         # 假设我们只关心 'nVar' 信号的数据,并将其展平         self.data_buffer.extend(values_dict['nVar'])          # 检查是否达到100000个值,并进行处理         if len(self.data_buffer) >= 100000:             print(f"Collected {len(self.data_buffer)} values. Processing...")             # 在这里可以对 data_buffer 进行进一步处理,例如保存到文件、进行分析等             processed_array = np.array(self.data_buffer[:100000]) # 取前100000个             print(f"First 10 values of processed array: {processed_array[:10]}")             self.data_buffer = self.data_buffer[100000:] # 清除已处理的数据,保留剩余部分      def close(self):         """         关闭PLC连接并移除所有通知。         """         for var_name, handle in self.notification_handles.items():             try:                 self.plc.del_device_notification(handle)                 print(f"Removed notification for {var_name}, handle: {handle}")             except Exception as e:                 print(f"Error removing notification for {var_name}: {e}")         self.plc.close()         print("PLC connection closed.")  # 示例使用 if __name__ == "__main__":     # 请替换为你的PLC实际连接参数     AMS_NET_ID = '192.168.1.100.1.1' # PLC的AMS Net ID     PLC_IP = '192.168.1.100'        # PLC的IP地址     AMS_PORT = 851                 # 通常为851      data_manager = PlcDataManager(AMS_NET_ID, PLC_IP, AMS_PORT)      try:         # 在主循环中等待数据或执行其他任务         # Pyads通知通常在后台线程中运行,因此主线程可以保持活跃         while True:             # 可以在这里执行其他非阻塞任务             # 例如:检查数据缓冲区大小,或者每隔一段时间打印心跳计数             # print(f"Current heartbeat: {data_manager.heartbeat_count}, PLC Status: {data_manager.plc_status_message}")             # print(f"Data buffer size: {len(data_manager.data_buffer)}")             # time.sleep(1) # 模拟主程序忙碌             pass # 实际应用中会在这里有更复杂的逻辑      except KeyboardInterrupt:         print("nExiting program.")     finally:         data_manager.close()

2. 关键点:

  • 封装性 PlcDataManager 类封装了PLC连接、通知设置、数据缓冲区和所有相关的回调方法。
  • 状态管理: self.data_buffer、self.heartbeat_count 等成员变量用于存储和更新应用程序的状态,回调方法直接操作这些变量。
  • 资源管理: 在 __init__ 中打开连接并设置通知,在 close 方法中关闭连接并移除所有通知,确保资源正确释放。
  • 回调签名: Pyads通知回调的签名通常为 (handle, name, timestamp, value)。对于特殊的系统通知(如PLC状态),parse_notification 方法可以更方便地提取值。

解决方案二:优化大规模数据转换性能

当处理的PLC数据量巨大且类型一致时,pyads.dict_from_bytes 这样的逐变量转换方法可能会成为性能瓶颈。Pyads允许用户获取原始的 ctypes 字节数据,然后自行进行更高效的转换,例如利用 numpy 库。

1. return_ctypes=True 的应用

在 add_device_notification 或 read_by_name 方法中,可以通过设置 return_ctypes=True 来获取原始的 ctypes 对象,而不是直接解析为Python字典或基本类型。

# 假设 structure_def 和 size_of_struct 已定义 # 修改 add_device_notification 设置,让回调接收原始ctypes数据 handle_data_optimized = self.plc.add_device_notification(     'global.sample_structure',     data_attr, # 仍然使用相同的NotificationAttrib     self.on_data_received_optimized,     return_ctypes=True # 关键在于这里 ) self.notification_handles['global.sample_structure_optimized'] = handle_data_optimized  # 新的回调方法,处理原始ctypes数据 def on_data_received_optimized(self, handle, name, timestamp, value_ctypes):     """     接收原始ctypes数据通知回调,并使用numpy优化转换。     value_ctypes 是一个 ctypes 数组对象 (e.g., ctypes.c_ubyte * size_of_struct)     """     # 假设我们知道数据结构是连续的PLCTYPE_DINT数组     # 并且我们只关心第一个变量 'nVar',它是一个1000个DINT的数组     # 获取原始字节数据     buffer = bytearray(value_ctypes)      # 计算 'nVar' 在结构体中的偏移和大小     # 这需要根据 structure_def 手动计算或预先确定     # 假设 nVar 是结构体的第一个元素,且每个DINT是4字节     num_elements = 1000     element_size = ctypes.sizeof(pyads.PLCTYPE_DINT) # 4 bytes for DINT     nvar_byte_length = num_elements * element_size      # 使用numpy从字节缓冲区直接创建数组     # dtype='<i4' 表示 little-endian 32位有符号整数 (DINT)     # frombuffer_copy 可以避免内存视图问题,确保数据被复制     # 这里只读取 nVar 部分的数据     try:         nvar_array = np.frombuffer(buffer[:nvar_byte_length], dtype='<i4')         self.data_buffer.extend(nvar_array.tolist()) # 转换为列表以便extend         # print(f"Optimized received data (first 5): {nvar_array[:5]}")          # 检查是否达到100000个值,并进行处理         if len(self.data_buffer) >= 100000:             print(f"Collected {len(self.data_buffer)} values (optimized). Processing...")             processed_array = np.array(self.data_buffer[:100000])             print(f"First 10 values of processed array (optimized): {processed_array[:10]}")             self.data_buffer = self.data_buffer[100000:]     except ValueError as e:         print(f"Error converting data with numpy: {e}")         # Fallback to slower method or log error         values_dict = pyads.dict_from_bytes(bytearray(value_ctypes), self.structure_definition)         self.data_buffer.extend(values_dict['nVar'])

2. numpy 进行数据转换

numpy.frombuffer() 函数能够从一个缓冲区(如 bytearray 或 memoryview)直接解析数据,并以指定的数据类型(dtype)和字节序(endianness)创建 numpy 数组。对于连续存储的同类型数据,这种方法比逐个元素解析快几个数量级。

注意事项:

  • 数据结构理解: 使用 numpy.frombuffer() 需要对PLC中变量的内存布局有清晰的理解,包括每个变量的类型、大小以及在结构体中的偏移量。
  • 字节序(Endianness): PLC通常使用小端字节序(Little-endian),因此在 dtype 中使用
  • 数据类型映射: 确保 numpy 的 dtype 与PLC中的数据类型正确匹配。

总结与最佳实践

通过将Pyads通知回调和数据处理逻辑封装到类中,可以有效地管理应用程序的状态,避免全局变量的混乱,并提高代码的可读性和可维护性。对于需要处理大量、高频PLC数据的场景,进一步利用 pyads 的 return_ctypes=True 选项结合 numpy 进行数据转换,能够显著提升数据处理性能。

其他最佳实践:

  • 错误处理: 在回调函数和主程序中都应包含健壮的错误处理机制,以应对PLC通信中断、数据解析失败等情况。
  • 线程安全: Pyads通知回调通常在单独的线程中执行。如果回调函数需要修改主线程访问的数据,或与其他线程共享资源,务必考虑线程安全(例如使用 threading.Lock)。
  • 通知句柄管理: 妥善存储和管理 add_device_notification 返回的句柄,确保在程序退出时能够正确地调用 del_device_notification 移除通知,释放PLC资源。
  • 日志记录: 详细的日志记录有助于调试和监控应用程序的运行状态,特别是在生产环境中。

遵循这些原则,可以构建出高效、稳定且易于维护的Pyads应用程序,有效应对复杂的工业自动化数据采集需求。



评论(已关闭)

评论已关闭