Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions pyvisa_py/highlevel.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from .common import LOGGER
from .sessions import OpenError, Session

import threading

Comment on lines +32 to +33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not believe this will be needed at the library level.


class PyVisaLibrary(highlevel.VisaLibraryBase):
"""A pure Python backend for PyVISA.
Expand Down Expand Up @@ -124,6 +126,9 @@ def _init(self) -> None:
"""Custom initialization code."""
# Map session handle to session object.
self.sessions = {}
# Dictionary to map sessions to their event handlers
# Structure: { session_id: { event_type: [ (handler, user_handle), ... ] } }
self.handlers = {}
Comment on lines +129 to +131
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not need to live here and can simply exist on each session.


def _register(self, obj: Session) -> VISASession:
"""Creates a random but unique session handle for a session object.
Expand Down Expand Up @@ -758,6 +763,32 @@ def unlock(self, session: VISASession) -> StatusCode:
return self.handle_return_value(session, StatusCode.error_invalid_object)
return self.handle_return_value(session, sess.unlock())

def enable_event(self, session, event_type, mechanism, context=None):
"""Enables notification for an event."""

# 1. Retrieve the actual session object (e.g., USBSession, TCPIPSession)
# In pyvisa-py, sessions are usually stored in self.sessions
obj = self.sessions[session]

# 2. Start a listener thread for this specific event/session
if event_type == constants.VI_EVENT_SERVICE_REQ:
# This is a hypothetical method you must add to the specific session classes
# (e.g., in pyvisa_py/usb.py or pyvisa_py/tcpip.py)
if hasattr(obj, "start_srq_listener"):
obj.start_srq_listener(callback=self._dispatch_event)
else:
return constants.VI_ERROR_NSUP_OPER
return constants.VI_SUCCESS
Comment on lines +766 to +781
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than testing for specific methods on the session object. I would expect to have a method enable event on the session which is not implemented by default (i.e. return constants.VI_ERROR_NSUP_OPER, while logging a nicer message)


def _dispatch_event(self, session_id, event_type):
"""Internal method called by the listener thread when hardware triggers."""
if session_id in self.handlers and event_type in self.handlers[session_id]:
for handler, user_handle in self.handlers[session_id][event_type]:
# Call the user's registered function
# Note: VISA handlers require specific arguments: (session, event_type, event_context, user_handle)
# You may need to create a dummy event_context object here
handler(session_id, event_type, None, user_handle)

Comment on lines +783 to +791
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This housl be fully handled within the session

def disable_event(
self,
session: VISASession,
Expand Down Expand Up @@ -811,3 +842,68 @@ def discard_events(

"""
return StatusCode.error_nonimplemented_operation

def install_handler(self, session, event_type, handler, user_handle=None):
"""Stores the handler for a specific session and event type."""
try:
# Ensure the session entry exists
if session not in self.handlers:
self.handlers[session] = {}

# Ensure the event_type entry exists
if event_type not in self.handlers[session]:
self.handlers[session][event_type] = []

# Store the handler and the user_handle
self.handlers[session][event_type].append((handler, user_handle))

return constants.VI_SUCCESS # TODO: use existing status codes?
except Exception as e:
return constants.VI_ERROR_SYSTEM_ERROR # TODO: use existing status codes?

def uninstall_handler(self, session, event_type, handler, user_handle=None):
"""
Removes the callback handler for the specified session and event type.
"""
try:
# 1. Check if the session and event type exist in the registry
if session not in self.handlers or event_type not in self.handlers[session]:
# Standard VISA behavior is to return success if the handler
# effectively isn't there, or a warning.
return constants.VI_SUCCESS

# 2. Filter out the specific handler to be removed
# We keep only the entries that DO NOT match the (handler, user_handle) pair.
# This handles the case where multiple different handlers are registered for the same event.
original_list = self.handlers[session][event_type]

# Check if the user passed a wildcard (Generic VISA behavior)
# If handler is generic, we might want to clear all, but usually
# users uninstall specific functions.
new_list = [
(h, uh)
for (h, uh) in original_list
if not (h == handler and uh == user_handle)
]

# If the list length didn't change, the handler wasn't found.
if len(new_list) == len(original_list):
return constants.VI_WARN_UNKNOWN_STATUS

# 3. Update the registry
self.handlers[session][event_type] = new_list

# 4. Cleanup (Optional optimization)
# If no handlers remain for this event type, we can delete the key.
if not new_list:
del self.handlers[session][event_type]

# IMPROVEMENT: If you want to automatically stop the TCP listener thread
# to save resources when the last handler is removed, you would
# call disable_event here.
# self.disable_event(session, event_type, mechanism)

return constants.VI_SUCCESS

except Exception:
return constants.VI_ERROR_SYSTEM_ERROR
Comment on lines +845 to +909
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be delegated to the session.

249 changes: 249 additions & 0 deletions pyvisa_py/tcpip.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,21 @@
from .protocols import hislip, rpc, vxi11
from .sessions import OpenError, Session, UnknownAttribute, VISARMSession

import struct
import threading

# Constants for Parsing
DEVICE_INTR_SRQ_PROC = 1
RPC_MSG_CALL = 0

# VXI-11 Constants
DEVICE_CORE_PROG = 0x0607B0
DEVICE_CORE_VERS = 1
CREATE_INTR_CHAN_PROC = 12
DEVICE_INTR_PROG = 0x0607AF # The ID the device uses to call us back TODO: does this need to be dynamically assigned?
DEVICE_INTR_VERS = 1
IPPROTO_TCP = 6

Comment on lines +25 to +39
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should come from the vxi11 module (or be defined there if those definition do not exist there).

# Let psutil be optional dependency
try:
import psutil # type: ignore
Expand Down Expand Up @@ -61,6 +76,71 @@
}


def pack_uint(n):
"""Pack a 32-bit unsigned integer (Big Endian)."""
return struct.pack(">I", n)


def pack_string(s):
"""Pack a variable length string (XDR format: Length + Data + Padding)."""
if isinstance(s, str):
s = s.encode("ascii")
length = len(s)
padding = (4 - (length % 4)) % 4
return struct.pack(">I", length) + s + (b"\x00" * padding)


def build_create_intr_chan_packet(host_ip, host_port, xid=None):
"""
Constructs the VXI-11 'create_intr_chan' RPC packet.

Args:
host_ip (str): The IP address of YOUR computer (where the device should connect).
host_port (int): The port YOUR computer is listening on.
"""
if xid is None:
xid = random.randint(1, 0xFFFFFFFF)

# --- 1. RPC Header ---
# Field: [XID] [MsgType:Call=0] [RPCVers:2] [Prog:Core] [Vers:1] [Proc:CreateIntr]
header = (
pack_uint(xid) # Transaction ID
+ pack_uint(0) # Message Type (0 = Call)
+ pack_uint(2) # RPC Version (2)
+ pack_uint(DEVICE_CORE_PROG) # Program (Device Core)
+ pack_uint(DEVICE_CORE_VERS) # Program Version
+ pack_uint(CREATE_INTR_CHAN_PROC) # Procedure (12 = create_intr_chan)
)

# --- 2. Authentication (None) ---
# Field: [Cred Flavor:0] [Cred Len:0] [Verf Flavor:0] [Verf Len:0]
auth = (
pack_uint(0)
+ pack_uint(0) # Credentials (Auth_None)
+ pack_uint(0)
+ pack_uint(0) # Verifier (Auth_None)
)

# --- 3. Arguments ---
# The arguments defined in VXI-11 spec for create_intr_chan:
# (host_addr, host_port, prog_num, prog_vers, prog_prot)
args = (
pack_string(host_ip) # arg1: Host Address (String)
+ pack_uint(host_port) # arg2: Host Port (Uint)
+ pack_uint(DEVICE_INTR_PROG) # arg3: Program Number (0x0607AF)
+ pack_uint(DEVICE_INTR_VERS) # arg4: Program Version (1)
+ pack_uint(IPPROTO_TCP) # arg5: Protocol (6 = TCP)
)

# --- 4. Final Framing (Record Marking) ---
# ONC RPC over TCP requires a 4-byte fragment header.
# The top bit (0x80000000) indicates this is the "Last Fragment".
payload = header + auth + args
fragment_header = 0x80000000 | len(payload)

return struct.pack(">I", fragment_header) + payload


Comment on lines +79 to +143
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything vxi11 specific should be in the vxi11 module.

@Session.register(constants.InterfaceType.tcpip, "INSTR")
class TCPIPInstrSession(Session):
"""A class to dispatch to VXI11 or HiSLIP, based on the protocol."""
Expand Down Expand Up @@ -99,6 +179,175 @@ def get_low_level_info(cls) -> str:
f"\n - hislip: {hislip}"
)

def start_tcp_listener(self, callback):
"""
1. Opens a server socket on a random port.
2. Tells the device where to connect (RPC 'create_intr_chan').
3. Starts a thread to listen for the incoming connection.
"""

# --- 1. Create a server socket on the host ---
# AF_INET = IPv4, SOCK_STREAM = TCP
self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_sock.bind(("", 0)) # Bind to any available port
self.server_sock.listen(1)

# Get the port we were assigned
host_ip, host_port = self.server_sock.getsockname()

# --- 2. Send 'create_intr_chan' RPC to the device ---
# Note: This is the tricky part. You need to construct the VXI-11 RPC packet.
# If pyvisa-py has an internal RPC client, use it.
# Otherwise, you are sending raw bytes.
# This is a conceptual example of the RPC call:
error_code = self._send_create_intr_chan_rpc(host_ip, host_port)

if error_code != 0:
raise Exception("Device refused to create interrupt channel")

# --- 3. Start the Listener Thread ---
self.listening = True
t = threading.Thread(target=self._tcp_listen_loop, args=(callback,))
t.daemon = True
t.start()

def _send_create_intr_chan_rpc(self, host_ip, host_port):
"""Sends the raw RPC packet to the device."""

# 1. Build the packet
packet = build_create_intr_chan_packet(host_ip, host_port)

# 2. Send it over the EXISTING connection to the device
# self.interface is usually the socket object in pyvisa-py TCPIPSession
# Note: You might need to check how pyvisa-py stores the socket.
# It is often in self.visalib.sessions[self.session].interface

# Assuming 'self.interface' is the active socket to the instrument:
self.interface.sendall(packet)

# 3. Read the RPC Reply (Important!)
# We need to ensure the device accepted it.
# Read the 4-byte fragment header first
header_data = self.interface.recv(4)
frag_len = struct.unpack(">I", header_data)[0] & 0x7FFFFFFF

# Read the rest of the reply
reply_data = self.interface.recv(frag_len)

# 4. Parse Reply (Simplified)
# Skip XID(4), MsgType(4), ReplyState(4), Verifier(8), AcceptStatus(4)
# We just want to check if AcceptStatus is 0 (SUCCESS)
# This is at offset 24 (4+4+4+8)
accept_status = struct.unpack(">I", reply_data[24:28])[0]

if accept_status != 0:
return -1 # RPC Error

# The actual return value of create_intr_chan is an "Error" object (integer)
# It is usually the last 4 bytes of the packet
error_code = struct.unpack(">I", reply_data[-4:])[0]

return error_code
Comment on lines +214 to +250
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can probably take advantage of the Packer existing in the vxi11 module.


def _tcp_listen_loop(self, callback):
"""Waits for the instrument to connect back to us."""
while self.listening:
try:
conn, addr = self.server_sock.accept()
with conn:
# Validate, Reply, and Get Status Byte
stb = self._validate_and_handle_srq(conn)

if stb is not None:
# Trigger the HighLevel callback
# We pass the Session ID, Event Type, and the STB
# Note: PyVISA handlers usually expect (session, event_type, context, user_handle)
# We can pass the STB inside a context object or wrap it if needed.
callback(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The callback should be stored on the session by the sessions install handler method.

self.session.session_id,
constants.VI_EVENT_SERVICE_REQ,
status_byte=stb,
)

except socket.error:
if not self.listening:
break

def _validate_and_handle_srq(self, conn):
"""
Reads the incoming RPC packet, validates it is an SRQ,
extracts the Status Byte, and sends a Success Reply.

Returns:
status_byte (int) if valid SRQ, None otherwise.
"""
# 1. Read the Fragment Header (4 bytes)
header_data = conn.recv(4)
if not header_data:
return None

# The last 31 bits are the length
frag_len = struct.unpack(">I", header_data)[0] & 0x7FFFFFFF

# 2. Read the RPC Packet
data = conn.recv(frag_len)

if len(data) < 24:
return None # Too short to be a valid RPC Call

# 3. Unpack Key Fields
# Struct: [XID:4] [MsgType:4] [RPCVer:4] [Prog:4] [ProgVer:4] [Proc:4]
xid, msg_type, rpc_vers, prog, prog_vers, proc = struct.unpack(
">IIIIII", data[:24]
)

# 4. Validate Context
if (
msg_type == RPC_MSG_CALL
and prog == DEVICE_INTR_PROG
and proc == DEVICE_INTR_SRQ_PROC
):
# This IS a valid SRQ!

# 5. Extract the Status Byte (STB)
# The arguments start after the Creds and Verifier.
# We need to skip Creds(8 bytes) + Verifier(8 bytes) = 16 bytes.
# Standard RPC header is 24 bytes. Total offset = 40 bytes.
# The argument is the Status Byte (passed as an int).
try:
stb = struct.unpack(">I", data[40:44])[0]
except:
stb = 0 # Default if parsing fails

# 6. SEND THE REPLY (Mandatory!)
# We must tell the instrument we received the message.
self._send_srq_reply(conn, xid)

return stb

return None

def _send_srq_reply(self, conn, xid):
"""Sends a successful RPC reply back to the instrument."""

# Structure: [XID] [MsgType:Reply=1] [ReplyState:Accepted=0]
# [Verf:None] [AcceptStatus:Success=0]

reply_payload = (
struct.pack(">I", xid) # Match the XID from the Request
+ struct.pack(">I", 1) # MsgType: Reply
+ struct.pack(">I", 0) # ReplyState: Accepted
+ struct.pack(">I", 0) # Verifier Flavor (None)
+ struct.pack(">I", 0) # Verifier Length (0)
+ struct.pack(">I", 0) # AcceptStatus: Success
)

# Add Fragment Header (Last Fragment bit set)
frag_header = 0x80000000 | len(reply_payload)
full_packet = struct.pack(">I", frag_header) + reply_payload

conn.sendall(full_packet)
Comment on lines +276 to +349
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same using vxi11 module to build this would make more sense.



class TCPIPInstrHiSLIP(Session):
"""A TCPIP Session built on socket standard library using HiSLIP protocol."""
Expand Down
Loading