Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion plugin_info.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ license = 'MIT'

[plugin-install]
#packages required for your plugin:
packages-required = ['pymodaq>=4.0.0', 'picamera2']
packages-required = ['pymodaq>=4.0.0', 'daqhats', 'picamera2']


[features] # defines the plugin features contained into this plugin
instruments = true # true if plugin contains instrument classes (else false, notice the lowercase for toml files)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,329 @@
import numpy as np
from pymodaq.utils.daq_utils import ThreadCommand
from pymodaq.utils.data import DataFromPlugins, Axis, DataToExport
from pymodaq.control_modules.viewer_utility_classes import DAQ_Viewer_base, comon_parameters, main
from pymodaq.utils.parameter import Parameter
from daqhats import mcc128, OptionFlags, AnalogInputMode, TriggerModes, AnalogInputRange
from pymodaq.utils.parameter.utils import iter_children


class DAQ_1DViewer_daqhats(DAQ_Viewer_base):
""" Instrument plugin class for a 1D viewer.

This object inherits all functionalities to communicate with PyMoDAQ’s DAQ_Viewer module through inheritance via
DAQ_Viewer_base. It makes a bridge between the DAQ_Viewer module and the Python wrapper of a particular instrument.

Attributes:
-----------
controller: object
The particular object that allow the communication with the hardware, in general a python wrapper around the
hardware library.

"""
params = comon_parameters + [
{'title': 'Trigger Active', 'name': 'trigger_active', 'type': 'bool', 'value': False},
{'title': 'Trigger Mode', 'name': 'trigger_mode', 'type': 'list', 'value': 'RISING_EDGE',
'limits': ['RISING_EDGE', 'FALLING_EDGE', 'ACTIVE_HIGH',
'ACTIVE_LOW']},
{'title': 'External_clock', 'name': 'extclock_mode', 'type': 'bool', 'value': False},
{'title': 'External sampling rate(Hz)', 'name': 'extclock_rate', 'type': 'int', 'value': 0, 'visible': False},
{'title': 'Number of samples:', 'name': 'num_sample', 'type': 'int', 'value': 1000, 'min': 0},
{'title': 'Sampling rate(Hz) :', 'name': 'sampling_rate', 'type': 'int', 'value': 10000, 'min': 0, 'max': 100000},
{'title': 'Range(V)', 'name': 'range', 'type': 'list', 'value': 10, 'limits': [10, 5, 2, 1]},
{'title': 'Mode', 'name': 'mode', 'type': 'list', 'value': 'SINGLE_END',
'limits': ['SINGLE_ENDED', 'DIFFERENTIAL']},
{'title': "Channel", 'name': 'channel_on', 'type': 'group', 'children': [
{'title': 'CH0H', 'name': 'CH0H', 'type': 'bool', 'value': False},
{'title': 'CH1H', 'name': 'CH1H', 'type': 'bool', 'value': False},
{'title': 'CH2H', 'name': 'CH2H', 'type': 'bool', 'value': False},
{'title': 'CH3H', 'name': 'CH3H', 'type': 'bool', 'value': False},
{'title': 'CH0L', 'name': 'CH0L', 'type': 'bool', 'value': False},
{'title': 'CH1L', 'name': 'CH1L', 'type': 'bool', 'value': False},
{'title': 'CH2L', 'name': 'CH2L', 'type': 'bool', 'value': False},
{'title': 'CH3L', 'name': 'CH3L', 'type': 'bool', 'value': False}
]}

]

def __init__(self, parent=None, params_state=None):
super().__init__(parent, params_state)
self.num_channels = None

def ini_attributes(self):
self.controller: mcc128 = None
self.option = 0
self.mode = 0
self.trigger_mode = 0
self.range = 0
self.channel_mask = 0
self.num_channels = 0
self.data_signal = []
self.list_channel_names = []

def commit_settings(self, param: Parameter):
"""Apply the consequences of a change of value in the detector settings
Parameters
----------
param: Parameter
A given parameter (within detector_settings) whose value has been changed by the user
"""

if param.name() == 'mode':
self.set_mode()
elif param.name() == 'range':
self.set_range()
elif param.name() == 'trigger_mode':
self.set_trigger()
elif param.name() in iter_children(self.settings.child('channel_on'), []):
self.active_channel()
elif param.name() == 'sampling_rate':
sample_rate_real = self.controller.a_in_scan_actual_rate(self.num_channels, param.value())
param.setValue(sample_rate_real)
elif param.name() == 'extclock_mode':
self.settings.child('extclock_rate').setOpts(visible = param.value())

def scan_data(self, totalsamples: int, scan_rate: int, name_channel: int):
"""This starts a hardware-paced analog input channel scan . Then this function reads and returns the data from the
buffer.

The aggregate sampling rate of all activated channels can't exceed 100kS/s
================= ================
Channels activate Scan rate (kS/s)
================= ================
1 100
2 50
3 33,33
4 25
5 20
6 16,67
7 14,29
8 12,50
============== ================

Parameters
-----------
totalsamples: int
The numbers of samples which we want to aquire
scan_rate: int
The sampling rate of each channel
name_channel: int
The bit mask of the channels activated

Returns
-------
voltage.data : list
The list of data scanned
"""

self.controller.a_in_scan_start(name_channel, totalsamples, scan_rate, self.option)
voltage = self.controller.a_in_scan_read_numpy(totalsamples, 0.5)
self.controller.a_in_scan_stop()
self.controller.a_in_scan_cleanup()
return voltage.data

def set_range(self):
""" This sets the analog input range to one of these values:
+/- 10V
+/- 5V
+/- 2V
+/- 1V
"""

if self.settings['range'] == 10:
self.range = AnalogInputRange['BIP_10V'].value
elif self.settings['range'] == 5:
self.range = AnalogInputRange['BIP_5V'].value
elif self.settings['range'] == 2:
self.range = AnalogInputRange['BIP_2V'].value
elif self.settings['range'] == 1:
self.range = AnalogInputRange['BIP_1V'].value
self.controller.a_in_range_write(self.range)

def set_mode(self):
""" This sets the analog input mode to one of two values
* Single ended
* Differential
"""

if self.settings['mode'] == 'SINGLE_ENDED':
self.mode = AnalogInputMode.SE.value
elif self.settings['mode'] == 'DIFFERENTIAL':
self.mode = AnalogInputMode.DIFF.value
self.controller.a_in_mode_write(self.mode)

def set_trigger(self):
"""Read and Set the external trigger mode input of the card

There are 4 type available TRIGGER mode:
*RISING_EDGE: Start the scan when the trigger signal transition from LOW to HIGH
*FALLING_EDGE: Start the scan when the trigger signal transition from HIGH to LOW
*ACTIVE_HIGH: Start the scan when the trigger signal is HIGH
*ACTIVE_LOW: Start the scan when the trigger signal is LOW

"""

mode_trigger = self.settings['trigger_mode']
if mode_trigger == 'RISING_EDGE':
self.trigger_mode = TriggerModes.RISING_EDGE
elif mode_trigger == 'FALLING_EDGE':
self.trigger_mode = TriggerModes.FALLING_EDGE
elif mode_trigger == 'ACTIVE_HIGH':
self.trigger_mode = TriggerModes.ACTIVE_HIGH
elif mode_trigger == 'ACTIVE_LOW':
self.trigger_mode = TriggerModes.ACTIVE_LOW
self.controller.trigger_mode(self.trigger_mode)


def active_channel(self):
"""This function reads the states of channels on the interface and sets the bit mask of the active channels,
the number of activated channels, and a list of the names of these channels.

"""

bit_mask = 0 # the bit mask of the active channels
number_channels = 0 # the number of activated channels
name_channels = [] # the list of the names of these channels
if self.settings['channel_on', 'CH0H']:
bit_mask = bit_mask + 1
number_channels += 1
name_channels.append('CH0H')
if self.settings['channel_on', 'CH1H']:
bit_mask = bit_mask + 2
number_channels += 1
name_channels.append('CH1H')
if self.settings['channel_on', 'CH2H']:
bit_mask = bit_mask + 4
number_channels += 1
name_channels.append('CH2H')
if self.settings['channel_on', 'CH3H']:
bit_mask = bit_mask + 8
number_channels += 1
name_channels.append('CH3H')
if self.settings['channel_on', 'CH0L']:
bit_mask = bit_mask + 16
number_channels += 1
name_channels.append('CH0L')
if self.settings['channel_on', 'CH1L']:
bit_mask = bit_mask + 32
number_channels += 1
name_channels.append('CH1L')
if self.settings['channel_on', 'CH2L']:
bit_mask = bit_mask + 64
number_channels += 1
name_channels.append('CH2L')
if self.settings['channel_on', 'CH3L']:
bit_mask = bit_mask + 128
number_channels += 1
name_channels.append('CH3L')
self.channel_mask, self.num_channels, self.label = bit_mask, number_channels, name_channels

def get_data(self, list_signal_non_arranged, nb_channel_activated):
""" Arrange the list of data received from activated channels

This methode arranges the list of data received from activated channels. It converts this list of data into a list
of numpy arrays. The order of each array corresponds to the order of each activated channel on the interface (from CH0H to CH3L)
Example: When you activate 2 channels CHOH and CH2L, this function will return a list of 2 arrays: the first one is
data of channel CH0H and the second one is data of channel CH2L

Parameters
----------
list_signal_non_arranged : list
List of data obtained by function scan_data()
nb_channel_activated : int
The quantity of activated channels

Returns
-------
list_signal_arranged : list of numpy array
list of data of each channel arranged
"""
list_signal_arranged = []
if len(list_signal_non_arranged) != 0:
for k in range(nb_channel_activated):
list_signal_arranged.append(np.array(list_signal_non_arranged[k]))
i = nb_channel_activated
while i <= len(list_signal_non_arranged) - nb_channel_activated:
for j in range(nb_channel_activated):
list_signal_arranged[j] = np.append(list_signal_arranged[j], list_signal_non_arranged[i + j])
i = i + nb_channel_activated
return list_signal_arranged

def ini_detector(self, controller=None):
"""Detector communication initialization

Parameters
----------
controller: (object)
custom object of a PyMoDAQ plugin (Slave case). None if only one actuator/detector by controller
(Master case)

Returns
-------
info: str
initialized: bool
False if initialization failed otherwise True
"""

self.ini_detector_init(old_controller=controller,
new_controller=mcc128(0))

info = ""
initialized = True
return info, initialized

def close(self):
pass
"""Terminate the communication protocol"""

def grab_data(self, Naverage=1, **kwargs):
"""Start a grab from the detector

Parameters
----------
Naverage: int
Number of hardware averaging (if hardware averaging is possible, self.hardware_averaging should be set to
True in class preamble and you should code this implementation)
kwargs: dict
others optionals arguments
"""

num_sample = self.settings['num_sample']
freq = self.settings['sampling_rate']

self.option = 0

# Set up mode Trigger
if self.settings['trigger_active'] == True:
self.option += OptionFlags['EXTTRIGGER'].value

# Set up mode External clock
if self.settings['extclock_mode'] == True:
self.option += OptionFlags['EXTCLOCK'].value
external_spl_rate = self.settings['extclock_rate']
if external_spl_rate == 0:
raise ValueError("Sampling rate cannot be zero")
else:
xaxis = Axis('time', 'seconds', np.arange(0, num_sample * 1 / external_spl_rate, 1 / external_spl_rate),
0) # Redefine the x-axis when the external clock is used
else:
xaxis = Axis('time', 'seconds', np.arange(0, num_sample * 1 / freq, 1 / freq), 0)

self.channel_mask, self.num_channels, self.list_channel_names = self.active_channel()

signal = self.scan_data(num_sample, freq, self.channel_mask)

if len(signal) != 0: # Condition to avoid empty data error
self.data_signal = self.get_data(signal, self.num_channels)

self.dte_signal.emit(DataToExport('mcc128_plugin', data=[DataFromPlugins(name='mcc128', data=self.data_signal,
dim='Data1D',
labels=self.list_channel_names,
axes=[xaxis])]))

def stop(self):
"""Stop the current grab hardware wise if necessary"""
return ''


if __name__ == '__main__':
main(__file__, init=False)