Skip to content
Open
171 changes: 164 additions & 7 deletions pyvisa-py/gpib.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from pyvisa import constants, logger, attributes

from .sessions import Session, UnknownAttribute
from .sessions import Session, UnknownAttribute, EventData

try:
import gpib
Expand All @@ -25,6 +25,7 @@
def _patch_Gpib():
if not hasattr(Gpib, "close"):
_old_del = Gpib.__del__

def _inner(self):
_old_del(self)
self._own = False
Expand Down Expand Up @@ -57,6 +58,8 @@ def _find_listeners():
# TODO: Check board indices other than 0.
BOARD = 0
# TODO: Check secondary addresses.


@Session.register(constants.InterfaceType.gpib, 'INSTR')
class GPIBSession(Session):
"""A GPIB Session that uses linux-gpib to do the low level communication.
Expand All @@ -82,10 +85,20 @@ def after_parsing(self):
timeout = 13
send_eoi = 1
eos_mode = 0
self.interface = Gpib(name=minor, pad=pad, sad=sad, timeout=timeout, send_eoi=send_eoi, eos_mode=eos_mode)
self.controller = Gpib(name=minor) # this is the bus controller device
self.interface = Gpib(name=minor, pad=pad, sad=sad,
timeout=timeout, send_eoi=send_eoi, eos_mode=eos_mode)
self.controller = Gpib(name=minor) # this is the bus controller device
# force timeout setting to interface
self.set_attribute(constants.VI_ATTR_TMO_VALUE, attributes.AttributesByID[constants.VI_ATTR_TMO_VALUE].default)
self.set_attribute(constants.VI_ATTR_TMO_VALUE,
attributes.AttributesByID[constants.VI_ATTR_TMO_VALUE].default)

# prepare set of allowed events
self.valid_event_types = {constants.VI_EVENT_IO_COMPLETION,
constants.VI_EVENT_SERVICE_REQ}

self.enabled_queue_events = set()

self.event_queue = []

def _get_timeout(self, attribute):
if self.interface:
Expand Down Expand Up @@ -150,9 +163,9 @@ def read(self, count):
"""

# 0x2000 = 8192 = END
checker = lambda current: self.interface.ibsta() & 8192
def checker(current): return self.interface.ibsta() & 8192

reader = lambda: self.interface.read(count)
def reader(): return self.interface.read(count)

return self._read(reader, count, checker, False, None, False, gpib.GpibError)

Expand All @@ -171,7 +184,7 @@ def write(self, data):

try:
self.interface.write(data)
count = self.interface.ibcnt() # number of bytes transmitted
count = self.interface.ibcnt() # number of bytes transmitted

return count, StatusCode.success

Expand Down Expand Up @@ -378,3 +391,147 @@ def read_stb(self):
return self.interface.serial_poll(), StatusCode.success
except gpib.GpibError:
return 0, StatusCode.error_system_error

def disable_event(self, event_type, mechanism):
"""Disables notification of the specified event type(s) via the specified mechanism(s).

Corresponds to viDisableEvent function of the VISA library.

:param event_type: Logical event identifier.
:param mechanism: Specifies event handling mechanisms to be disabled.
(Constants.VI_QUEUE, .VI_HNDLR, .VI_SUSPEND_HNDLR, .VI_ALL_MECH)
:return: return value of the library call.
:rtype: :class:`pyvisa.constants.StatusCode`
"""

if event_type not in self.valid_event_types:
return StatusCode.error_invalid_event

if mechanism in (constants.VI_QUEUE, constants.VI_ALL_MECH):
if event_type not in self.enabled_queue_events:
return StatusCode.success_event_already_disabled

self.enabled_queue_events.remove(event_type)
return StatusCode.success

return StatusCode.error_invalid_mechanism

def discard_events(self, event_type, mechanism):
"""Discards event occurrences for specified event types and mechanisms in a session.

Corresponds to viDiscardEvents function of the VISA library.

:param event_type: Logical event identifier.
:param mechanism: Specifies event handling mechanisms to be discarded.
(Constants.VI_QUEUE, .VI_SUSPEND_HNDLR, .VI_ALL_MECH)
:return: return value of the library call.
:rtype: :class:`pyvisa.constants.StatusCode`
"""
if event_type not in self.valid_event_types:
return StatusCode.error_invalid_event

if mechanism in (constants.VI_QUEUE, constants.VI_ALL_MECH):
self.event_queue = [(t, a) for t, a in self.event_queue if not (
event_type == constants.VI_ALL_ENABLED_EVENTS or t == event_type)]
return StatusCode.success

return StatusCode.error_invalid_mechanism

def enable_event(self, event_type, mechanism, context=None):
"""Enable event occurrences for specified event types and mechanisms in a session.

Corresponds to viEnableEvent function of the VISA library.

:param event_type: Logical event identifier.
:param mechanism: Specifies event handling mechanisms to be enabled.
(Constants.VI_QUEUE, .VI_HNDLR, .VI_SUSPEND_HNDLR)
:param context:
:return: return value of the library call.
:rtype: :class:`pyvisa.constants.StatusCode`
"""

if event_type not in self.valid_event_types:
return StatusCode.error_invalid_event

if mechanism in (constants.VI_QUEUE, constants.VI_ALL_MECH):
# enable GPIB autopoll
try:
self.controller.config(7, 1)
except gpib.GpibError:
return StatusCode.error_invalid_setup

if event_type in self.enabled_queue_events:
return StatusCode.success_event_already_enabled
else:
self.enabled_queue_events.add(event_type)
return StatusCode.success

# mechanisms which are not implemented: constants.VI_SUSPEND_HNDLR, constants.VI_ALL_MECH
return StatusCode.error_invalid_mechanism

def wait_on_event(self, in_event_type, timeout):
"""Waits for an occurrence of the specified event for a given session.

Corresponds to viWaitOnEvent function of the VISA library.

:param in_event_type: Logical identifier of the event(s) to wait for.
:param timeout: Absolute time period in time units that the resource shall wait for a specified event to
occur before returning the time elapsed error. The time unit is in milliseconds.
:return: - Logical identifier of the event actually received
- An object specifying the unique occurrence of an event
- return value of the library call.
:rtype: - eventtype
- EventData
- :class:`pyvisa.constants.StatusCode`
"""

if in_event_type not in self.valid_event_types:
return StatusCode.error_invalid_event

if in_event_type not in self.enabled_queue_events:
return StatusCode.error_not_enabled

# if the event queue is empty, wait for more events
if not self.event_queue:
old_timeout = self.timeout
self.timeout = timeout

event_mask = 0

if in_event_type in (constants.VI_EVENT_IO_COMPLETION, constants.VI_ALL_ENABLED_EVENTS):
event_mask |= 0x100 # CMPL

if in_event_type in (constants.VI_EVENT_SERVICE_REQ, constants.VI_ALL_ENABLED_EVENTS):
event_mask |= gpib.RQS

if timeout != 0:
event_mask |= gpib.TIMO

self.interface.wait(event_mask)
sta = self.interface.ibsta()

self.timeout = old_timeout

if 0x100 & event_mask & sta:
# TODO: implement all event attributes
# VI_ATTR_EVENT_TYPE: VI_EVENT_IO_COMPLETION,
# VI_ATTR_STATUS: return code of the asynchronous IO operation that has completed,
# VI_ATTR_JOB_ID: job ID of the asynchronous operation that has completed,
# VI_ATTR_BUFFER: the address of the buffer that was used in the asynchronous operation,
# VI_ATTR_RET_COUNT/VI_ATTR_RET_COUNT_32/VI_ATTR_RET_COUNT_64: number of elements that were asynchronously transferred,
# VI_ATTR_OPER_NAME: name of the operation generating the event
self.event_queue.append(
(constants.VI_EVENT_IO_COMPLETION,
EventData({constants.VI_ATTR_EVENT_TYPE: constants.VI_EVENT_IO_COMPLETION})))

if gpib.RQS & event_mask & sta:
self.event_queue.append(
(constants.VI_EVENT_SERVICE_REQ,
EventData({constants.VI_ATTR_EVENT_TYPE: constants.VI_EVENT_SERVICE_REQ})))

try:
out_event_type, event_data = self.event_queue.pop()
return out_event_type, event_data, StatusCode.success
except IndexError:
return in_event_type, None, StatusCode.error_timeout

Loading