From 563eb129e5d5dff83e3addd0330f9b1212c0416f Mon Sep 17 00:00:00 2001 From: Simon Merrett <26767525+SimonMerrett@users.noreply.github.com> Date: Sat, 13 Dec 2025 19:41:31 +0000 Subject: [PATCH 1/3] initial introduction of event handling elements for TCPIP::INSTR resources --- pyvisa_py/highlevel.py | 94 +++++++++++++++++ pyvisa_py/tcpip.py | 235 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 329 insertions(+) diff --git a/pyvisa_py/highlevel.py b/pyvisa_py/highlevel.py index 207687b6..7a37e034 100644 --- a/pyvisa_py/highlevel.py +++ b/pyvisa_py/highlevel.py @@ -29,6 +29,7 @@ from .common import LOGGER from .sessions import OpenError, Session +import threading class PyVisaLibrary(highlevel.VisaLibraryBase): """A pure Python backend for PyVISA. @@ -124,6 +125,9 @@ def _init(self) -> None: """Custom initialization code.""" # Map session handle to session object. self.sessions = {} + # Dictionary to map sessions to their event handlers + # Structure: { session_id: { event_type: [ (handler, user_handle), ... ] } } + self.handlers = {} def _register(self, obj: Session) -> VISASession: """Creates a random but unique session handle for a session object. @@ -758,6 +762,32 @@ def unlock(self, session: VISASession) -> StatusCode: return self.handle_return_value(session, StatusCode.error_invalid_object) return self.handle_return_value(session, sess.unlock()) + def enable_event(self, session, event_type, mechanism, context=None): + """Enables notification for an event.""" + + # 1. Retrieve the actual session object (e.g., USBSession, TCPIPSession) + # In pyvisa-py, sessions are usually stored in self.sessions + obj = self.sessions[session] + + # 2. Start a listener thread for this specific event/session + if event_type == constants.VI_EVENT_SERVICE_REQ: + # This is a hypothetical method you must add to the specific session classes + # (e.g., in pyvisa_py/usb.py or pyvisa_py/tcpip.py) + if hasattr(obj, 'start_srq_listener'): + obj.start_srq_listener(callback=self._dispatch_event) + else: + return constants.VI_ERROR_NSUP_OPER + return constants.VI_SUCCESS + + def _dispatch_event(self, session_id, event_type): + """Internal method called by the listener thread when hardware triggers.""" + if session_id in self.handlers and event_type in self.handlers[session_id]: + for handler, user_handle in self.handlers[session_id][event_type]: + # Call the user's registered function + # Note: VISA handlers require specific arguments: (session, event_type, event_context, user_handle) + # You may need to create a dummy event_context object here + handler(session_id, event_type, None, user_handle) + def disable_event( self, session: VISASession, @@ -811,3 +841,67 @@ def discard_events( """ return StatusCode.error_nonimplemented_operation + + def install_handler(self, session, event_type, handler, user_handle=None): + """Stores the handler for a specific session and event type.""" + try: + # Ensure the session entry exists + if session not in self.handlers: + self.handlers[session] = {} + + # Ensure the event_type entry exists + if event_type not in self.handlers[session]: + self.handlers[session][event_type] = [] + + # Store the handler and the user_handle + self.handlers[session][event_type].append((handler, user_handle)) + + return constants.VI_SUCCESS #TODO: use existing status codes? + except Exception as e: + return constants.VI_ERROR_SYSTEM_ERROR #TODO: use existing status codes? + + def uninstall_handler(self, session, event_type, handler, user_handle=None): + """ + Removes the callback handler for the specified session and event type. + """ + try: + # 1. Check if the session and event type exist in the registry + if session not in self.handlers or event_type not in self.handlers[session]: + # Standard VISA behavior is to return success if the handler + # effectively isn't there, or a warning. + return constants.VI_SUCCESS + + # 2. Filter out the specific handler to be removed + # We keep only the entries that DO NOT match the (handler, user_handle) pair. + # This handles the case where multiple different handlers are registered for the same event. + original_list = self.handlers[session][event_type] + + # Check if the user passed a wildcard (Generic VISA behavior) + # If handler is generic, we might want to clear all, but usually + # users uninstall specific functions. + new_list = [ + (h, uh) for (h, uh) in original_list + if not (h == handler and uh == user_handle) + ] + + # If the list length didn't change, the handler wasn't found. + if len(new_list) == len(original_list): + return constants.VI_WARN_UNKNOWN_STATUS + + # 3. Update the registry + self.handlers[session][event_type] = new_list + + # 4. Cleanup (Optional optimization) + # If no handlers remain for this event type, we can delete the key. + if not new_list: + del self.handlers[session][event_type] + + # IMPROVEMENT: If you want to automatically stop the TCP listener thread + # to save resources when the last handler is removed, you would + # call disable_event here. + # self.disable_event(session, event_type, mechanism) + + return constants.VI_SUCCESS + + except Exception: + return constants.VI_ERROR_SYSTEM_ERROR \ No newline at end of file diff --git a/pyvisa_py/tcpip.py b/pyvisa_py/tcpip.py index fdd708bd..88e1de7e 100644 --- a/pyvisa_py/tcpip.py +++ b/pyvisa_py/tcpip.py @@ -22,6 +22,21 @@ from .protocols import hislip, rpc, vxi11 from .sessions import OpenError, Session, UnknownAttribute, VISARMSession +import struct +import threading + +# Constants for Parsing +DEVICE_INTR_SRQ_PROC = 1 +RPC_MSG_CALL = 0 + +# VXI-11 Constants +DEVICE_CORE_PROG = 0x0607B0 +DEVICE_CORE_VERS = 1 +CREATE_INTR_CHAN_PROC = 12 +DEVICE_INTR_PROG = 0x0607AF # The ID the device uses to call us back TODO: does this need to be dynamically assigned? +DEVICE_INTR_VERS = 1 +IPPROTO_TCP = 6 + # Let psutil be optional dependency try: import psutil # type: ignore @@ -60,6 +75,65 @@ 29: StatusCode.error_window_already_mapped, # channel_already_established } +def pack_uint(n): + """Pack a 32-bit unsigned integer (Big Endian).""" + return struct.pack('>I', n) + +def pack_string(s): + """Pack a variable length string (XDR format: Length + Data + Padding).""" + if isinstance(s, str): + s = s.encode('ascii') + length = len(s) + padding = (4 - (length % 4)) % 4 + return struct.pack('>I', length) + s + (b'\x00' * padding) + +def build_create_intr_chan_packet(host_ip, host_port, xid=None): + """ + Constructs the VXI-11 'create_intr_chan' RPC packet. + + Args: + host_ip (str): The IP address of YOUR computer (where the device should connect). + host_port (int): The port YOUR computer is listening on. + """ + if xid is None: + xid = random.randint(1, 0xFFFFFFFF) + + # --- 1. RPC Header --- + # Field: [XID] [MsgType:Call=0] [RPCVers:2] [Prog:Core] [Vers:1] [Proc:CreateIntr] + header = ( + pack_uint(xid) + # Transaction ID + pack_uint(0) + # Message Type (0 = Call) + pack_uint(2) + # RPC Version (2) + pack_uint(DEVICE_CORE_PROG) + # Program (Device Core) + pack_uint(DEVICE_CORE_VERS) + # Program Version + pack_uint(CREATE_INTR_CHAN_PROC) # Procedure (12 = create_intr_chan) + ) + + # --- 2. Authentication (None) --- + # Field: [Cred Flavor:0] [Cred Len:0] [Verf Flavor:0] [Verf Len:0] + auth = ( + pack_uint(0) + pack_uint(0) + # Credentials (Auth_None) + pack_uint(0) + pack_uint(0) # Verifier (Auth_None) + ) + + # --- 3. Arguments --- + # The arguments defined in VXI-11 spec for create_intr_chan: + # (host_addr, host_port, prog_num, prog_vers, prog_prot) + args = ( + pack_string(host_ip) + # arg1: Host Address (String) + pack_uint(host_port) + # arg2: Host Port (Uint) + pack_uint(DEVICE_INTR_PROG) + # arg3: Program Number (0x0607AF) + pack_uint(DEVICE_INTR_VERS) + # arg4: Program Version (1) + pack_uint(IPPROTO_TCP) # arg5: Protocol (6 = TCP) + ) + + # --- 4. Final Framing (Record Marking) --- + # ONC RPC over TCP requires a 4-byte fragment header. + # The top bit (0x80000000) indicates this is the "Last Fragment". + payload = header + auth + args + fragment_header = 0x80000000 | len(payload) + + return struct.pack('>I', fragment_header) + payload @Session.register(constants.InterfaceType.tcpip, "INSTR") class TCPIPInstrSession(Session): @@ -99,6 +173,167 @@ def get_low_level_info(cls) -> str: f"\n - hislip: {hislip}" ) + def start_tcp_listener(self, callback): + """ + 1. Opens a server socket on a random port. + 2. Tells the device where to connect (RPC 'create_intr_chan'). + 3. Starts a thread to listen for the incoming connection. + """ + + # --- 1. Create a server socket on the host --- + # AF_INET = IPv4, SOCK_STREAM = TCP + self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_sock.bind(('', 0)) # Bind to any available port + self.server_sock.listen(1) + + # Get the port we were assigned + host_ip, host_port = self.server_sock.getsockname() + + # --- 2. Send 'create_intr_chan' RPC to the device --- + # Note: This is the tricky part. You need to construct the VXI-11 RPC packet. + # If pyvisa-py has an internal RPC client, use it. + # Otherwise, you are sending raw bytes. + # This is a conceptual example of the RPC call: + error_code = self._send_create_intr_chan_rpc(host_ip, host_port) + + if error_code != 0: + raise Exception("Device refused to create interrupt channel") + + # --- 3. Start the Listener Thread --- + self.listening = True + t = threading.Thread(target=self._tcp_listen_loop, args=(callback,)) + t.daemon = True + t.start() + + def _send_create_intr_chan_rpc(self, host_ip, host_port): + """Sends the raw RPC packet to the device.""" + + # 1. Build the packet + packet = build_create_intr_chan_packet(host_ip, host_port) + + # 2. Send it over the EXISTING connection to the device + # self.interface is usually the socket object in pyvisa-py TCPIPSession + # Note: You might need to check how pyvisa-py stores the socket. + # It is often in self.visalib.sessions[self.session].interface + + # Assuming 'self.interface' is the active socket to the instrument: + self.interface.sendall(packet) + + # 3. Read the RPC Reply (Important!) + # We need to ensure the device accepted it. + # Read the 4-byte fragment header first + header_data = self.interface.recv(4) + frag_len = struct.unpack('>I', header_data)[0] & 0x7FFFFFFF + + # Read the rest of the reply + reply_data = self.interface.recv(frag_len) + + # 4. Parse Reply (Simplified) + # Skip XID(4), MsgType(4), ReplyState(4), Verifier(8), AcceptStatus(4) + # We just want to check if AcceptStatus is 0 (SUCCESS) + # This is at offset 24 (4+4+4+8) + accept_status = struct.unpack('>I', reply_data[24:28])[0] + + if accept_status != 0: + return -1 # RPC Error + + # The actual return value of create_intr_chan is an "Error" object (integer) + # It is usually the last 4 bytes of the packet + error_code = struct.unpack('>I', reply_data[-4:])[0] + + return error_code + + def _tcp_listen_loop(self, callback): + """Waits for the instrument to connect back to us.""" + while self.listening: + try: + conn, addr = self.server_sock.accept() + with conn: + # Validate, Reply, and Get Status Byte + stb = self._validate_and_handle_srq(conn) + + if stb is not None: + # Trigger the HighLevel callback + # We pass the Session ID, Event Type, and the STB + # Note: PyVISA handlers usually expect (session, event_type, context, user_handle) + # We can pass the STB inside a context object or wrap it if needed. + callback(self.session.session_id, constants.VI_EVENT_SERVICE_REQ, status_byte=stb) + + except socket.error: + if not self.listening: + break + + def _validate_and_handle_srq(self, conn): + """ + Reads the incoming RPC packet, validates it is an SRQ, + extracts the Status Byte, and sends a Success Reply. + + Returns: + status_byte (int) if valid SRQ, None otherwise. + """ + # 1. Read the Fragment Header (4 bytes) + header_data = conn.recv(4) + if not header_data: + return None + + # The last 31 bits are the length + frag_len = struct.unpack('>I', header_data)[0] & 0x7FFFFFFF + + # 2. Read the RPC Packet + data = conn.recv(frag_len) + + if len(data) < 24: + return None # Too short to be a valid RPC Call + + # 3. Unpack Key Fields + # Struct: [XID:4] [MsgType:4] [RPCVer:4] [Prog:4] [ProgVer:4] [Proc:4] + xid, msg_type, rpc_vers, prog, prog_vers, proc = struct.unpack('>IIIIII', data[:24]) + + # 4. Validate Context + if (msg_type == RPC_MSG_CALL and + prog == DEVICE_INTR_PROG and + proc == DEVICE_INTR_SRQ_PROC): + + # This IS a valid SRQ! + + # 5. Extract the Status Byte (STB) + # The arguments start after the Creds and Verifier. + # We need to skip Creds(8 bytes) + Verifier(8 bytes) = 16 bytes. + # Standard RPC header is 24 bytes. Total offset = 40 bytes. + # The argument is the Status Byte (passed as an int). + try: + stb = struct.unpack('>I', data[40:44])[0] + except: + stb = 0 # Default if parsing fails + + # 6. SEND THE REPLY (Mandatory!) + # We must tell the instrument we received the message. + self._send_srq_reply(conn, xid) + + return stb + + return None + + def _send_srq_reply(self, conn, xid): + """Sends a successful RPC reply back to the instrument.""" + + # Structure: [XID] [MsgType:Reply=1] [ReplyState:Accepted=0] + # [Verf:None] [AcceptStatus:Success=0] + + reply_payload = ( + struct.pack('>I', xid) + # Match the XID from the Request + struct.pack('>I', 1) + # MsgType: Reply + struct.pack('>I', 0) + # ReplyState: Accepted + struct.pack('>I', 0) + # Verifier Flavor (None) + struct.pack('>I', 0) + # Verifier Length (0) + struct.pack('>I', 0) # AcceptStatus: Success + ) + + # Add Fragment Header (Last Fragment bit set) + frag_header = 0x80000000 | len(reply_payload) + full_packet = struct.pack('>I', frag_header) + reply_payload + + conn.sendall(full_packet) class TCPIPInstrHiSLIP(Session): """A TCPIP Session built on socket standard library using HiSLIP protocol.""" From 15e1b975eb5b435caa41dea7ce94102640b2c406 Mon Sep 17 00:00:00 2001 From: Simon Merrett <26767525+SimonMerrett@users.noreply.github.com> Date: Sat, 13 Dec 2025 19:42:07 +0000 Subject: [PATCH 2/3] Add very simple test for event handling --- pyvisa_py/testsuite/test_event_handling.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 pyvisa_py/testsuite/test_event_handling.py diff --git a/pyvisa_py/testsuite/test_event_handling.py b/pyvisa_py/testsuite/test_event_handling.py new file mode 100644 index 00000000..ad8a8a4a --- /dev/null +++ b/pyvisa_py/testsuite/test_event_handling.py @@ -0,0 +1,22 @@ +import pyvisa + +def my_handler(session, event_type, context, user_handle): + print("Interrupt Received! SRQ Triggered.") + # You might need to query the device here to clear the bit + # inst.query("*ESR?") + +rm = pyvisa.ResourceManager('@py') # Forces pyvisa-py backend +inst = rm.open_resource('TCPIP::192.168.1.157::INSTR') + +# 1. Register the python function +inst.install_handler(pyvisa.constants.VI_EVENT_SERVICE_REQ, my_handler) + +# 2. Tell the hardware to start monitoring (Starts your thread) +inst.enable_event(pyvisa.constants.VI_EVENT_SERVICE_REQ, pyvisa.constants.VI_HNDLR) + +# 3. Enable SRQ on the device side (Standard SCPI) +inst.write("*SRE 16") # Enable Message Available bit (example) +inst.write("*CLS") + +print("Waiting for interrupt...") +input("Press Enter to exit") # Keep script alive to listen \ No newline at end of file From 99d6458f577997414bdb28f9126e36f39fbe9db1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 13 Dec 2025 19:55:47 +0000 Subject: [PATCH 3/3] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- pyvisa_py/highlevel.py | 38 ++--- pyvisa_py/tcpip.py | 164 +++++++++++---------- pyvisa_py/testsuite/test_event_handling.py | 12 +- 3 files changed, 116 insertions(+), 98 deletions(-) diff --git a/pyvisa_py/highlevel.py b/pyvisa_py/highlevel.py index 7a37e034..e8cd8182 100644 --- a/pyvisa_py/highlevel.py +++ b/pyvisa_py/highlevel.py @@ -31,6 +31,7 @@ import threading + class PyVisaLibrary(highlevel.VisaLibraryBase): """A pure Python backend for PyVISA. @@ -764,16 +765,16 @@ def unlock(self, session: VISASession) -> StatusCode: def enable_event(self, session, event_type, mechanism, context=None): """Enables notification for an event.""" - + # 1. Retrieve the actual session object (e.g., USBSession, TCPIPSession) # In pyvisa-py, sessions are usually stored in self.sessions obj = self.sessions[session] - + # 2. Start a listener thread for this specific event/session if event_type == constants.VI_EVENT_SERVICE_REQ: # This is a hypothetical method you must add to the specific session classes # (e.g., in pyvisa_py/usb.py or pyvisa_py/tcpip.py) - if hasattr(obj, 'start_srq_listener'): + if hasattr(obj, "start_srq_listener"): obj.start_srq_listener(callback=self._dispatch_event) else: return constants.VI_ERROR_NSUP_OPER @@ -841,25 +842,25 @@ def discard_events( """ return StatusCode.error_nonimplemented_operation - + def install_handler(self, session, event_type, handler, user_handle=None): """Stores the handler for a specific session and event type.""" try: # Ensure the session entry exists if session not in self.handlers: self.handlers[session] = {} - + # Ensure the event_type entry exists if event_type not in self.handlers[session]: self.handlers[session][event_type] = [] - + # Store the handler and the user_handle self.handlers[session][event_type].append((handler, user_handle)) - - return constants.VI_SUCCESS #TODO: use existing status codes? + + return constants.VI_SUCCESS # TODO: use existing status codes? except Exception as e: - return constants.VI_ERROR_SYSTEM_ERROR #TODO: use existing status codes? - + return constants.VI_ERROR_SYSTEM_ERROR # TODO: use existing status codes? + def uninstall_handler(self, session, event_type, handler, user_handle=None): """ Removes the callback handler for the specified session and event type. @@ -867,7 +868,7 @@ def uninstall_handler(self, session, event_type, handler, user_handle=None): try: # 1. Check if the session and event type exist in the registry if session not in self.handlers or event_type not in self.handlers[session]: - # Standard VISA behavior is to return success if the handler + # Standard VISA behavior is to return success if the handler # effectively isn't there, or a warning. return constants.VI_SUCCESS @@ -875,15 +876,16 @@ def uninstall_handler(self, session, event_type, handler, user_handle=None): # We keep only the entries that DO NOT match the (handler, user_handle) pair. # This handles the case where multiple different handlers are registered for the same event. original_list = self.handlers[session][event_type] - + # Check if the user passed a wildcard (Generic VISA behavior) # If handler is generic, we might want to clear all, but usually # users uninstall specific functions. new_list = [ - (h, uh) for (h, uh) in original_list + (h, uh) + for (h, uh) in original_list if not (h == handler and uh == user_handle) ] - + # If the list length didn't change, the handler wasn't found. if len(new_list) == len(original_list): return constants.VI_WARN_UNKNOWN_STATUS @@ -895,13 +897,13 @@ def uninstall_handler(self, session, event_type, handler, user_handle=None): # If no handlers remain for this event type, we can delete the key. if not new_list: del self.handlers[session][event_type] - - # IMPROVEMENT: If you want to automatically stop the TCP listener thread - # to save resources when the last handler is removed, you would + + # IMPROVEMENT: If you want to automatically stop the TCP listener thread + # to save resources when the last handler is removed, you would # call disable_event here. # self.disable_event(session, event_type, mechanism) return constants.VI_SUCCESS except Exception: - return constants.VI_ERROR_SYSTEM_ERROR \ No newline at end of file + return constants.VI_ERROR_SYSTEM_ERROR diff --git a/pyvisa_py/tcpip.py b/pyvisa_py/tcpip.py index 88e1de7e..880259d6 100644 --- a/pyvisa_py/tcpip.py +++ b/pyvisa_py/tcpip.py @@ -33,7 +33,7 @@ DEVICE_CORE_PROG = 0x0607B0 DEVICE_CORE_VERS = 1 CREATE_INTR_CHAN_PROC = 12 -DEVICE_INTR_PROG = 0x0607AF # The ID the device uses to call us back TODO: does this need to be dynamically assigned? +DEVICE_INTR_PROG = 0x0607AF # The ID the device uses to call us back TODO: does this need to be dynamically assigned? DEVICE_INTR_VERS = 1 IPPROTO_TCP = 6 @@ -75,22 +75,25 @@ 29: StatusCode.error_window_already_mapped, # channel_already_established } + def pack_uint(n): """Pack a 32-bit unsigned integer (Big Endian).""" - return struct.pack('>I', n) + return struct.pack(">I", n) + def pack_string(s): """Pack a variable length string (XDR format: Length + Data + Padding).""" if isinstance(s, str): - s = s.encode('ascii') + s = s.encode("ascii") length = len(s) padding = (4 - (length % 4)) % 4 - return struct.pack('>I', length) + s + (b'\x00' * padding) + return struct.pack(">I", length) + s + (b"\x00" * padding) + def build_create_intr_chan_packet(host_ip, host_port, xid=None): """ Constructs the VXI-11 'create_intr_chan' RPC packet. - + Args: host_ip (str): The IP address of YOUR computer (where the device should connect). host_port (int): The port YOUR computer is listening on. @@ -101,30 +104,32 @@ def build_create_intr_chan_packet(host_ip, host_port, xid=None): # --- 1. RPC Header --- # Field: [XID] [MsgType:Call=0] [RPCVers:2] [Prog:Core] [Vers:1] [Proc:CreateIntr] header = ( - pack_uint(xid) + # Transaction ID - pack_uint(0) + # Message Type (0 = Call) - pack_uint(2) + # RPC Version (2) - pack_uint(DEVICE_CORE_PROG) + # Program (Device Core) - pack_uint(DEVICE_CORE_VERS) + # Program Version - pack_uint(CREATE_INTR_CHAN_PROC) # Procedure (12 = create_intr_chan) + pack_uint(xid) # Transaction ID + + pack_uint(0) # Message Type (0 = Call) + + pack_uint(2) # RPC Version (2) + + pack_uint(DEVICE_CORE_PROG) # Program (Device Core) + + pack_uint(DEVICE_CORE_VERS) # Program Version + + pack_uint(CREATE_INTR_CHAN_PROC) # Procedure (12 = create_intr_chan) ) # --- 2. Authentication (None) --- # Field: [Cred Flavor:0] [Cred Len:0] [Verf Flavor:0] [Verf Len:0] auth = ( - pack_uint(0) + pack_uint(0) + # Credentials (Auth_None) - pack_uint(0) + pack_uint(0) # Verifier (Auth_None) + pack_uint(0) + + pack_uint(0) # Credentials (Auth_None) + + pack_uint(0) + + pack_uint(0) # Verifier (Auth_None) ) # --- 3. Arguments --- # The arguments defined in VXI-11 spec for create_intr_chan: # (host_addr, host_port, prog_num, prog_vers, prog_prot) args = ( - pack_string(host_ip) + # arg1: Host Address (String) - pack_uint(host_port) + # arg2: Host Port (Uint) - pack_uint(DEVICE_INTR_PROG) + # arg3: Program Number (0x0607AF) - pack_uint(DEVICE_INTR_VERS) + # arg4: Program Version (1) - pack_uint(IPPROTO_TCP) # arg5: Protocol (6 = TCP) + pack_string(host_ip) # arg1: Host Address (String) + + pack_uint(host_port) # arg2: Host Port (Uint) + + pack_uint(DEVICE_INTR_PROG) # arg3: Program Number (0x0607AF) + + pack_uint(DEVICE_INTR_VERS) # arg4: Program Version (1) + + pack_uint(IPPROTO_TCP) # arg5: Protocol (6 = TCP) ) # --- 4. Final Framing (Record Marking) --- @@ -132,8 +137,9 @@ def build_create_intr_chan_packet(host_ip, host_port, xid=None): # The top bit (0x80000000) indicates this is the "Last Fragment". payload = header + auth + args fragment_header = 0x80000000 | len(payload) - - return struct.pack('>I', fragment_header) + payload + + return struct.pack(">I", fragment_header) + payload + @Session.register(constants.InterfaceType.tcpip, "INSTR") class TCPIPInstrSession(Session): @@ -179,23 +185,23 @@ def start_tcp_listener(self, callback): 2. Tells the device where to connect (RPC 'create_intr_chan'). 3. Starts a thread to listen for the incoming connection. """ - + # --- 1. Create a server socket on the host --- # AF_INET = IPv4, SOCK_STREAM = TCP self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.server_sock.bind(('', 0)) # Bind to any available port + self.server_sock.bind(("", 0)) # Bind to any available port self.server_sock.listen(1) - + # Get the port we were assigned host_ip, host_port = self.server_sock.getsockname() - + # --- 2. Send 'create_intr_chan' RPC to the device --- # Note: This is the tricky part. You need to construct the VXI-11 RPC packet. - # If pyvisa-py has an internal RPC client, use it. - # Otherwise, you are sending raw bytes. + # If pyvisa-py has an internal RPC client, use it. + # Otherwise, you are sending raw bytes. # This is a conceptual example of the RPC call: error_code = self._send_create_intr_chan_rpc(host_ip, host_port) - + if error_code != 0: raise Exception("Device refused to create interrupt channel") @@ -207,40 +213,40 @@ def start_tcp_listener(self, callback): def _send_create_intr_chan_rpc(self, host_ip, host_port): """Sends the raw RPC packet to the device.""" - + # 1. Build the packet packet = build_create_intr_chan_packet(host_ip, host_port) - + # 2. Send it over the EXISTING connection to the device # self.interface is usually the socket object in pyvisa-py TCPIPSession - # Note: You might need to check how pyvisa-py stores the socket. + # Note: You might need to check how pyvisa-py stores the socket. # It is often in self.visalib.sessions[self.session].interface - + # Assuming 'self.interface' is the active socket to the instrument: self.interface.sendall(packet) - + # 3. Read the RPC Reply (Important!) # We need to ensure the device accepted it. # Read the 4-byte fragment header first header_data = self.interface.recv(4) - frag_len = struct.unpack('>I', header_data)[0] & 0x7FFFFFFF - + frag_len = struct.unpack(">I", header_data)[0] & 0x7FFFFFFF + # Read the rest of the reply reply_data = self.interface.recv(frag_len) - + # 4. Parse Reply (Simplified) # Skip XID(4), MsgType(4), ReplyState(4), Verifier(8), AcceptStatus(4) # We just want to check if AcceptStatus is 0 (SUCCESS) # This is at offset 24 (4+4+4+8) - accept_status = struct.unpack('>I', reply_data[24:28])[0] - + accept_status = struct.unpack(">I", reply_data[24:28])[0] + if accept_status != 0: - return -1 # RPC Error - + return -1 # RPC Error + # The actual return value of create_intr_chan is an "Error" object (integer) # It is usually the last 4 bytes of the packet - error_code = struct.unpack('>I', reply_data[-4:])[0] - + error_code = struct.unpack(">I", reply_data[-4:])[0] + return error_code def _tcp_listen_loop(self, callback): @@ -251,23 +257,27 @@ def _tcp_listen_loop(self, callback): with conn: # Validate, Reply, and Get Status Byte stb = self._validate_and_handle_srq(conn) - + if stb is not None: # Trigger the HighLevel callback # We pass the Session ID, Event Type, and the STB # Note: PyVISA handlers usually expect (session, event_type, context, user_handle) # We can pass the STB inside a context object or wrap it if needed. - callback(self.session.session_id, constants.VI_EVENT_SERVICE_REQ, status_byte=stb) - + callback( + self.session.session_id, + constants.VI_EVENT_SERVICE_REQ, + status_byte=stb, + ) + except socket.error: if not self.listening: break def _validate_and_handle_srq(self, conn): """ - Reads the incoming RPC packet, validates it is an SRQ, + Reads the incoming RPC packet, validates it is an SRQ, extracts the Status Byte, and sends a Success Reply. - + Returns: status_byte (int) if valid SRQ, None otherwise. """ @@ -275,66 +285,70 @@ def _validate_and_handle_srq(self, conn): header_data = conn.recv(4) if not header_data: return None - + # The last 31 bits are the length - frag_len = struct.unpack('>I', header_data)[0] & 0x7FFFFFFF - + frag_len = struct.unpack(">I", header_data)[0] & 0x7FFFFFFF + # 2. Read the RPC Packet data = conn.recv(frag_len) - - if len(data) < 24: - return None # Too short to be a valid RPC Call + + if len(data) < 24: + return None # Too short to be a valid RPC Call # 3. Unpack Key Fields # Struct: [XID:4] [MsgType:4] [RPCVer:4] [Prog:4] [ProgVer:4] [Proc:4] - xid, msg_type, rpc_vers, prog, prog_vers, proc = struct.unpack('>IIIIII', data[:24]) + xid, msg_type, rpc_vers, prog, prog_vers, proc = struct.unpack( + ">IIIIII", data[:24] + ) # 4. Validate Context - if (msg_type == RPC_MSG_CALL and - prog == DEVICE_INTR_PROG and - proc == DEVICE_INTR_SRQ_PROC): - + if ( + msg_type == RPC_MSG_CALL + and prog == DEVICE_INTR_PROG + and proc == DEVICE_INTR_SRQ_PROC + ): # This IS a valid SRQ! - + # 5. Extract the Status Byte (STB) # The arguments start after the Creds and Verifier. # We need to skip Creds(8 bytes) + Verifier(8 bytes) = 16 bytes. # Standard RPC header is 24 bytes. Total offset = 40 bytes. # The argument is the Status Byte (passed as an int). try: - stb = struct.unpack('>I', data[40:44])[0] + stb = struct.unpack(">I", data[40:44])[0] except: - stb = 0 # Default if parsing fails - + stb = 0 # Default if parsing fails + # 6. SEND THE REPLY (Mandatory!) # We must tell the instrument we received the message. self._send_srq_reply(conn, xid) - + return stb - + return None def _send_srq_reply(self, conn, xid): """Sends a successful RPC reply back to the instrument.""" - - # Structure: [XID] [MsgType:Reply=1] [ReplyState:Accepted=0] + + # Structure: [XID] [MsgType:Reply=1] [ReplyState:Accepted=0] # [Verf:None] [AcceptStatus:Success=0] - + reply_payload = ( - struct.pack('>I', xid) + # Match the XID from the Request - struct.pack('>I', 1) + # MsgType: Reply - struct.pack('>I', 0) + # ReplyState: Accepted - struct.pack('>I', 0) + # Verifier Flavor (None) - struct.pack('>I', 0) + # Verifier Length (0) - struct.pack('>I', 0) # AcceptStatus: Success + struct.pack(">I", xid) # Match the XID from the Request + + struct.pack(">I", 1) # MsgType: Reply + + struct.pack(">I", 0) # ReplyState: Accepted + + struct.pack(">I", 0) # Verifier Flavor (None) + + struct.pack(">I", 0) # Verifier Length (0) + + struct.pack(">I", 0) # AcceptStatus: Success ) - + # Add Fragment Header (Last Fragment bit set) frag_header = 0x80000000 | len(reply_payload) - full_packet = struct.pack('>I', frag_header) + reply_payload - + full_packet = struct.pack(">I", frag_header) + reply_payload + conn.sendall(full_packet) + class TCPIPInstrHiSLIP(Session): """A TCPIP Session built on socket standard library using HiSLIP protocol.""" diff --git a/pyvisa_py/testsuite/test_event_handling.py b/pyvisa_py/testsuite/test_event_handling.py index ad8a8a4a..be51fa0b 100644 --- a/pyvisa_py/testsuite/test_event_handling.py +++ b/pyvisa_py/testsuite/test_event_handling.py @@ -1,12 +1,14 @@ import pyvisa + def my_handler(session, event_type, context, user_handle): print("Interrupt Received! SRQ Triggered.") # You might need to query the device here to clear the bit - # inst.query("*ESR?") + # inst.query("*ESR?") + -rm = pyvisa.ResourceManager('@py') # Forces pyvisa-py backend -inst = rm.open_resource('TCPIP::192.168.1.157::INSTR') +rm = pyvisa.ResourceManager("@py") # Forces pyvisa-py backend +inst = rm.open_resource("TCPIP::192.168.1.157::INSTR") # 1. Register the python function inst.install_handler(pyvisa.constants.VI_EVENT_SERVICE_REQ, my_handler) @@ -15,8 +17,8 @@ def my_handler(session, event_type, context, user_handle): inst.enable_event(pyvisa.constants.VI_EVENT_SERVICE_REQ, pyvisa.constants.VI_HNDLR) # 3. Enable SRQ on the device side (Standard SCPI) -inst.write("*SRE 16") # Enable Message Available bit (example) +inst.write("*SRE 16") # Enable Message Available bit (example) inst.write("*CLS") print("Waiting for interrupt...") -input("Press Enter to exit") # Keep script alive to listen \ No newline at end of file +input("Press Enter to exit") # Keep script alive to listen