diff --git a/opc/__init__.py b/opc/__init__.py index e306a87..d1d0e51 100644 --- a/opc/__init__.py +++ b/opc/__init__.py @@ -14,9 +14,9 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -__version__ = "1.6.0" +__version__ = "1.6.3" -__all__ = ['OPCN2', 'OPCN1'] +__all__ = ['OPCN2', 'OPCN1','OPCR1'] class _OPC(object): """Generic class for any Alphasense OPC. Provides the common methods and calculations for each OPC. This class is designed to be the base class, and should not be used alone unless during development. @@ -1101,3 +1101,629 @@ def read_histogram(self): data['Bin 15'] return data + + + +class OPCR1(_OPC): + """Create an instance of the Alphasene OPC-R1. Currently supported by firmware + versions 2.10. opc.OPCR1 inherits from the opc.OPC parent class. + + :param spi_connection: The spidev instance for the SPI connection. + + :type spi_connection: spidev.SpiDev + + :rtype: opc.OPCR1 + + :raises: opc.exceptions.FirmwareVersionError + + :Example: + + >>> alpha = opc.OPCR1(spi) + >>> alpha + Alphasense OPC-R1v2.10 + + """ + def __init__(self, spi_connection, **kwargs): + super(OPCR1, self).__init__(spi_connection, model='R1', firmware=(2, 10), **kwargs) + + firmware_min = 2. # Minimum firmware version supported + firmware_max = 2. # Maximum firmware version supported + self.spi_opc_ready = 0xF3 + self.spi_opc_busy = 0x31 + #if self.firmware['major'] < firmware_min or self.firmware['major'] > firmware_max: + # logger.error("Firmware version is invalid for this device.") + # raise FirmwareVersionError("Your firmware is not yet supported. Only version 2 is currently supported.") + + def get_ready_response(self, spi_command): + """ + Verify that the OPC is ready for communication + Flushes the SPI connection if 0x31 is returned after 20 attempts + or reset the OPC otherwise. + + :param spi_command: The command to be sent to the sensor + + :rtype: NULL + """ + + message = self._attempt_get_ready_response(spi_command) + logger.debug("Last byte received: 0x%02x", message) + + if message != self.spi_opc_ready: + + logger.warning("OPCR1 busy") + #self.cnxn.flush() + sleep(2) + logger.warning("Waiting 2s (for OPC comms timeout)") + message = self._attempt_get_ready_response(spi_command) + if message != self.spi_opc_ready: + logger.error("OPCR1 not responding - quitting") + self.cnxn.close() + quit(1) + + return True + + + def _attempt_get_ready_response(self, spi_command): + logger.debug("Attempt to read up to 20 times") + message = self.cnxn.xfer([spi_command])[0] + sleep(1e-3) + + attempt = 0 + while attempt < 20 & message != self.spi_opc_ready: + attempt += 1 + if message != self.spi_opc_ready: + message = self.cnxn.xfer([spi_command])[0] + if message != self.spi_opc_ready: + sleep(1e-3) # Wait 1ms before retrying + logger.debug("Number of attempts: %s",attempt) + return message + + def on(self): + """Turn ON the OPC (fan and laser) + + :rtype: boolean + + :Example: + + >>> alpha.on() + True + """ + self.get_ready_response(0x03) # send the command byte + sleep(10e-3) # sleep for 10 ms + byte_2 = self.cnxn.xfer([0x03])[0] # send the following byte powers the laser and fan + sleep(10) # sleep while it is properly powered on + + return True if byte_2 == 0x03 else False + + def off(self): + """Turn OFF the OPC (fan and laser) + + :rtype: boolean + + :Example: + + >>> alpha.off() + True + """ + self.get_ready_response(0x03) # send the command byte + sleep(10e-3) # sleep for 10 ms + byte_2 = self.cnxn.xfer([0x00])[0] # send the following byte to turn off laser and fan + sleep(0.1) + + return True if byte_2 == 0x03 else False + + def set_laser_power(self, power): + """Set the laser power only. + + :param power: Laser power as a value between 0-255. + + :type power: int + + :rtype: boolean + + :Example: + + >>> alpha.set_laser_power(230) + True + """ + + # Check to make sure the value is a single byte + if power > 255: + raise ValueError("Laser Power should be a single byte (0-255).") + + # Send the command byte + self.get_ready_response(0x04) + + # Wait 10 ms + sleep(10e-3) + + byte = self.cnxn.xfer([power])[0] + + sleep(0.1) + + return True if byte == 0x04 else False + + + def set_bin_weighting_index(self, bin_weighting_index): + """ + :param bin_weighting_index: Bin weigthing index is an integer between 0 and 10. + + :type bin_weighting_index: int + + :rtype: boolean + + :Example: + + >>> alpha.set_bin_weighting_index(2) + True + """ + + if bin_weighting_index < 0 or bin_weighting_index > 10: + raise ValueError("Bin weigthing index is an integer between 0 and 10.") + if not bin_weighting_index.is_integer(): + raise ValueError("Bin weigthing index is an integer between 0 and 10.") + + # Send the command byte + self.get_ready_response(0x05) + + # Wait 10 ms + sleep(10e-3) + + byte = self.cnxn.xfer([bin_weighting_index])[0] + + sleep(0.1) + + return True if byte == 0x05 else False + + def read_info_string(self): + """Reads the information string for the OPC + + :rtype: string + + :Example: + + >>> alpha.read_info_string() + 'OPC-N2 FirmwareVer=OPC-018.2....................BD' + """ + infostring = [] + + # Send the command byte + self.get_ready_response(0x3F) + + # Wait 10 ms + sleep(10e-3) + + # Read the info string by sending 60 empty bytes + for i in range(60): + resp = self.cnxn.xfer([0x00])[0] + infostring.append(chr(resp)) + + sleep(0.1) + + return ''.join(infostring) + + def sn(self): + """Read the Serial Number string. This method is only available on OPC-N2 + firmware versions 18+. + + :rtype: string + + :Example: + + >>> alpha.sn() + 'OPC-N2 123456789' + """ + string = [] + + # Send the command byte + self.get_ready_response(0x10) + + # Wait 10 ms + sleep(10e-3) + + # Read the info string by sending 60 empty bytes + for i in range(60): + resp = self.cnxn.xfer([0x00])[0] + string.append(chr(resp)) + + sleep(0.1) + + return ''.join(string) + + + def write_sn(self): + """Write the Serial Number string. + + **NOTE: This method is currently a placeholder and is not implemented.** + + :param sn: string containing the serial number to write + + :type sn: string + """ + + return + + def read_firmware(self): + """Read the firmware version of the OPC-N2. Firmware v18+ only. + + :rtype: dict + + :Example: + + >>> alpha.read_firmware() + { + 'major': 18, + 'minor': 2, + 'version': 18.2 + } + """ + # Send the command byte + self.get_ready_response(0x12) + + # Wait 10 ms + sleep(10e-3) + + self.firmware['major'] = self.cnxn.xfer([0x12])[0] + self.firmware['minor'] = self.cnxn.xfer([0x12])[0] + + # Build the firmware version + self.firmware['version'] = float('{}.{}'.format(self.firmware['major'], + self.firmware['minor'])) + + sleep(0.1) + + return self.firmware + + def read_config(self): + """ + Read the configuration variables and return as a dictionary. + + **NOTE: This method has not been tested on the OPCR1 + + :rtype: dictionary + + :Example: + + + """ + config = [] + data = {} + + # Send the command byte + self.get_ready_response(0x3C) + + # Wait 10 ms + sleep(10e-3) + + # Read the config variables by sending 193 empty bytes + for i in range(193): + resp = self.cnxn.xfer([0x3C])[0] + config.append(resp) + # Add the bin bounds to the dictionary of data [bytes 0-33] + for i in range(0, 17): + data["Bin Boundary {0}".format(i)] = self._16bit_unsigned(config[2*i], config[2*i + 1]) + + # Add the Bin Boundaries diameter (BDD) [bytes 34-101] + for i in range(0, 17): + data["BBD {0}".format(i)] = self._calculate_float(config[4*i + 34:4*i + 38]) + + # Add the Bin Weighting (BW) [bytes 102-165] + for i in range(0, 16): + data["BW {0}".format(i)] = self._calculate_float(config[4*i + 102:4*i + 106]) + + + # Add the Gain Scaling Coefficient (GSC) and sample flow rate (SFR) + data["GSC"] = self._calculate_float(config[166:170]) + data["SFR"] = self._calculate_float(config[170:174]) + + # Add laser dac (LDAC) and Fan dac (FanDAC) + data["TOF_SFR"] = config[174] + + data["M_A"] = self._calculate_float(config[175:179]) + data["M_B"] = self._calculate_float(config[179:183]) + data["M_C"] = self._calculate_float(config[183:187]) + + data["PVP"] = config[187] + data["PowerStatus"] = config[188] + + data["Max TOF"] = self._16bit_unsigned(config[189], config[190]) + + data["LaserDAC"] = config[191] + + data["BinWeightingIndex"] = config[192] + sleep(0.1) + + return data + + def write_config_variables(self, config_vars): + """ Write configuration variables to non-volatile memory. + + **NOTE: This method is currently a placeholder and is not implemented.** + + :param config_vars: dictionary containing the configuration variables + + :type config_vars: dictionary + """ + logger.warning("This method has not yet been implemented yet.") + + return + + def histogram(self, number_concentration=True): + """Read and reset the histogram. As of v1.3.0, histogram + values are reported in particle number concentration (#/cc) by default. + + :param number_concentration: If true, histogram bins are reported in number concentration vs. raw values. + + :type number_concentration: boolean + + :rtype: dictionary + + :Example: + + >>> alpha.histogram() + { + 'Temperature': None, + 'Pressure': None, + 'Bin 0': 0, + 'Bin 1': 0, + 'Bin 2': 0, + ... + 'Bin 15': 0, + 'SFR': 3.700, + 'Bin1MToF': 0, + 'Bin3MToF': 0, + 'Bin5MToF': 0, + 'Bin7MToF': 0, + 'PM1': 0.0, + 'PM2.5': 0.0, + 'PM10': 0.0, + 'Sampling Period': 2.345, + 'Checksum': 0 + } + """ + resp = [] + data = {} + + # Send the command byte + self.get_ready_response(0x30) + + # Wait 10 ms + sleep(10e-3) + + # read the histogram + for i in range(64): + r = self.cnxn.xfer([0x00])[0] + resp.append(r) + sleep(10e-6) # sleep for 10us + + # convert to real things and store in dictionary! + data['Bin 0'] = self._16bit_unsigned(resp[0], resp[1]) + data['Bin 1'] = self._16bit_unsigned(resp[2], resp[3]) + data['Bin 2'] = self._16bit_unsigned(resp[4], resp[5]) + data['Bin 3'] = self._16bit_unsigned(resp[6], resp[7]) + data['Bin 4'] = self._16bit_unsigned(resp[8], resp[9]) + data['Bin 5'] = self._16bit_unsigned(resp[10], resp[11]) + data['Bin 6'] = self._16bit_unsigned(resp[12], resp[13]) + data['Bin 7'] = self._16bit_unsigned(resp[14], resp[15]) + data['Bin 8'] = self._16bit_unsigned(resp[16], resp[17]) + data['Bin 9'] = self._16bit_unsigned(resp[18], resp[19]) + data['Bin 10'] = self._16bit_unsigned(resp[20], resp[21]) + data['Bin 11'] = self._16bit_unsigned(resp[22], resp[23]) + data['Bin 12'] = self._16bit_unsigned(resp[24], resp[25]) + data['Bin 13'] = self._16bit_unsigned(resp[26], resp[27]) + data['Bin 14'] = self._16bit_unsigned(resp[28], resp[29]) + data['Bin 15'] = self._16bit_unsigned(resp[30], resp[31]) + data['Bin1 MToF'] = self._calculate_mtof(resp[32]) + data['Bin3 MToF'] = self._calculate_mtof(resp[33]) + data['Bin5 MToF'] = self._calculate_mtof(resp[34]) + data['Bin7 MToF'] = self._calculate_mtof(resp[35]) + + # Bins associated with firmware versions 14 and 15(?) + data['SFR'] = self._calculate_float(resp[36:40]) + data['Temperature'] = self._conv_st_to_temperature(self._16bit_unsigned(resp[40], resp[41])) + data['Humidity'] = self._conv_srh_to_relative_humidity(self._16bit_unsigned(resp[42], resp[43])) + data['Sampling Period'] = self._calculate_float(resp[44:48]) + data['Reject count glitch'] = resp[48] + data['Reject count long'] = resp[49] + data['PM1'] = self._calculate_float(resp[50:54]) + data['PM2.5'] = self._calculate_float(resp[54:58]) + data['PM10'] = self._calculate_float(resp[58:62]) + data['Checksum'] = self._16bit_unsigned(resp[62], resp[63]) + + # Check that checksum and the least significant bits of the sum of histogram bins + # are equivilant + if self._modbus_calc_crc(resp, 62) != data['Checksum']: + logger.warning("Data transfer was incomplete") + return None + + # If histogram is true, convert histogram values to number concentration + if number_concentration is True: + _conv_ = data['SFR'] * data['Sampling Period'] # Divider in units of ml (cc) + + data['Bin 0'] = data['Bin 0'] / _conv_ + data['Bin 1'] = data['Bin 1'] / _conv_ + data['Bin 2'] = data['Bin 2'] / _conv_ + data['Bin 3'] = data['Bin 3'] / _conv_ + data['Bin 4'] = data['Bin 4'] / _conv_ + data['Bin 5'] = data['Bin 5'] / _conv_ + data['Bin 6'] = data['Bin 6'] / _conv_ + data['Bin 7'] = data['Bin 7'] / _conv_ + data['Bin 8'] = data['Bin 8'] / _conv_ + data['Bin 9'] = data['Bin 9'] / _conv_ + data['Bin 10'] = data['Bin 10'] / _conv_ + data['Bin 11'] = data['Bin 11'] / _conv_ + data['Bin 12'] = data['Bin 12'] / _conv_ + data['Bin 13'] = data['Bin 13'] / _conv_ + data['Bin 14'] = data['Bin 14'] / _conv_ + data['Bin 15'] = data['Bin 15'] / _conv_ + + sleep(0.1) + + return data + + def pm(self): + """Read the PM data and reset the histogram + + :rtype: dictionary + + :Example: + + >>> alpha.pm() + { + 'PM1': 0.12, + 'PM2.5': 0.24, + 'PM10': 1.42 + } + """ + + resp = [] + data = {} + + # Send the command byte + self.get_ready_response(0x32) + + # Wait 10 ms + sleep(10e-3) + + # read the histogram + for i in range(14): + r = self.cnxn.xfer([0x00])[0] + resp.append(r) + + # convert to real things and store in dictionary! + data['PM1'] = self._calculate_float(resp[0:4]) + data['PM2.5'] = self._calculate_float(resp[4:8]) + data['PM10'] = self._calculate_float(resp[8:12]) + + data['Checksum'] = self._16bit_unsigned(resp[12], resp[13]) + + # Check that checksum and the least significant bits of the sum of histogram bins + # are equivilant + if self._modbus_calc_crc(resp, 12) != data['Checksum']: + logger.warning("Data transfer was incomplete") + return None + + sleep(0.1) + + return data + + def _conv_st_to_temperature(self, st): + #Convert SHT31 ST output to Temperature (C) + + return -45 + 175*st/65535 + + + + + def _conv_srh_to_relative_humidity(self, srh): + #Convert SHT31 SRH output to Relative Humidity (%): + return 100*srh/65535 + + + def _modbus_calc_crc(self, data, nbr_of_bytes): + + + polynomial_modbus = 0xA001 #Generator polynomial for MODBUS crc + init_crc_val_modbus = 0xFFFF #Initial CRC value + + crc = init_crc_val_modbus + byte_ctr = 0 + while byte_ctr < nbr_of_bytes: + crc ^= data[byte_ctr] + byte_ctr += 1 + for i in range(0,8): + if crc & 1: + crc >>= 1 + crc ^= polynomial_modbus + else: + crc >>= 1 + return crc + + + def save_config_variables(self): + """Save the configuration variables in non-volatile memory. This method + should be used in conjuction with *write_config_variables*. + + :rtype: boolean + + :Example: + + >>> alpha.save_config_variables() + True + """ + + byte_list = [0x3F, 0x3C, 0x3F, 0x3C, 0x43] + success = [0x43, 0x3F, 0x3C, 0x3F, 0x3C] + resp = [] + + # Send the command byte + self.get_ready_response(0x43) + + # Wait 10 ms + sleep(10e-3) + + # Send the rest of the config bytes + for each in byte_list: + res = self.cnxn.xfer([each])[0] + resp.append(res) + + sleep(0.1) + + return True if resp == success else False + + def _enter_bootloader_mode(self): + """Enter bootloader mode. Must be issued prior to writing + configuration variables to non-volatile memory. + + :rtype: boolean + + :Example: + + >>> alpha._enter_bootloader_mode() + True + """ + + return True if self.cnxn.xfer(0x41)[0] == 0xF3 else False + + def check_status(self): + """Check the status of the OPCR1. + + :rtype: boolean + + :Example: + + >>> alpha.check_status() + True + """ + + # Send the command byte + self.get_ready_response(0xCF) + + # Wait 10 ms + sleep(10e-3) + + res = self.cnxn.xfer(0xCF)[0] + + if res == self.spi_opc_ready: + return True + + return False + + def reset(self): + """Reset the OPCR1. + + :rtype: boolean + + :Example: + + >>> alpha.reset() + True + """ + + # Send the command byte + self.get_ready_response(0x06) + + # Wait 10 ms + sleep(10e-3) + + return True diff --git a/tests/opcr1_test.py b/tests/opcr1_test.py new file mode 100755 index 0000000..0eb11b2 --- /dev/null +++ b/tests/opcr1_test.py @@ -0,0 +1,40 @@ +import unittest +from time import sleep +from opc import OPCR1 +from opc.exceptions import SpiConnectionError, FirmwareVersionError +from usbiss.spi import SPI + +interval = 1 + +spi = SPI("/dev/ttyACM0",mode=1, max_speed_hz=500000) + +alpha = OPCR1(spi) + + + +alpha.on() +sleep(10) + +print("Read info string") +print(alpha.read_info_string()) + +print("Read serial number") +print(alpha.sn()) + +print("Read firmware") +print(alpha.read_firmware()) + + +print("Read config") +print(alpha.read_config()) + + +print("Read Histogram") +print(alpha.histogram()) + +sleep(10) +print("Read PM") +print(alpha.pm()) + + +alpha.off()