Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion app/LabTool.pro
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
SOURCES += \
capture/streamworker.cpp \
capture/uicapturestreamer.cpp \
main.cpp \
generator/i2cgenerator.cpp \
device/devicemanager.cpp \
Expand Down Expand Up @@ -75,6 +77,8 @@ SOURCES += \
device/reconfigurelistener.cpp

HEADERS += \
capture/streamworker.h \
capture/uicapturestreamer.h \
generator/i2cgenerator.h \
libusbx/include/libusbx-1.0/libusb.h \
device/devicemanager.h \
Expand Down Expand Up @@ -164,7 +168,7 @@ else:unix:!symbian: LIBS += -L$$PWD/libusbx/Linux/ -lusb-1.0 -ludev
INCLUDEPATH += $$PWD/libusbx/MS32/dll
DEPENDPATH += $$PWD/libusbx/MS32/dll

QT += widgets
QT += widgets network

mac {
ICON = resources/oscilloscope.icns
Expand Down
67 changes: 67 additions & 0 deletions app/capture/captureapp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ CaptureApp::CaptureApp(QWidget* uiContext, QObject *parent) :
mUiContext = uiContext;
mCaptureActive = false;


mContinuous = false;
// Deallocation: uiContext is set as parent
mArea = new UiCaptureArea(mSignalManager, uiContext);
Expand All @@ -75,6 +76,10 @@ CaptureApp::CaptureApp(QWidget* uiContext, QObject *parent) :
}
}


mStreamingActive = false;
captureStreamer = new UiCaptureStreamer(DeviceManager::instance().activeDevice()->captureDevice(), mUiContext);

}

/*!
Expand Down Expand Up @@ -275,6 +280,14 @@ void CaptureApp::saveProject(QSettings &project)
*/
void CaptureApp::handleDeviceChanged(Device* activeDevice)
{
// recreate captureStreamer (even if not running)
delete captureStreamer;
captureStreamer = new UiCaptureStreamer(activeDevice->captureDevice(), mUiContext);
// update UI to follow up
// this will make it end up in the "stopped" UI state
mStreamingActive = true;
streamData();

setupRates(activeDevice->captureDevice());
mSignalManager->reloadSignalsFromDevice();
mArea->updateAnalogGroup();
Expand Down Expand Up @@ -454,6 +467,18 @@ void CaptureApp::createMenu()
connect(action, SIGNAL(triggered()), this, SLOT(exportData()));
mMenu->addAction(action);


//
// Set Up Streaming via Network
//

mStreamAction = new QAction(tr("Stream Data to Socket"), this);
mStreamAction->setData("Stream Data to Socket");
mStreamAction->setToolTip("Open a socket and send the currently captured data there");
connect(mStreamAction, SIGNAL(triggered()), this, SLOT(streamData()));
mMenu->addAction(mStreamAction);


}

/*!
Expand Down Expand Up @@ -747,6 +772,48 @@ void CaptureApp::exportData()

}

/*!
Called when the user selects to stream data to socket, adapted from exportData
*/
void CaptureApp::streamData()
{
if(mStreamingActive) {
// currently streaming, so stop now
emit captureStreamer->stopWorker();
mStreamingActive = false;
mStreamAction->setText(tr("Stream Data to Socket"));
mStreamAction->setData("Stream Data to Socket");

} else {
// currently not streaming, so (try to) start now

// checks before streaming
CaptureDevice* device = DeviceManager::instance().activeDevice()
->captureDevice();
if (device == NULL) {
return;
}
// check if there is data to stream
if(device->digitalSignals().empty() && device->analogSignals().empty()) {
QMessageBox::warning(mUiContext,
"No signal found",
"Please add at least one signal!");
return;
}


if(captureStreamer->exec() != QDialog::Accepted) {
// not accepted, abort
return;
}

mStreamingActive = true;
mStreamAction->setText(tr("Stop Streaming"));
mStreamAction->setData("Stop Streaming");
}

}

/*!
Called when the sample rate has changed.
*/
Expand Down
6 changes: 6 additions & 0 deletions app/capture/captureapp.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <QSettings>

#include "uicapturearea.h"
#include "uicapturestreamer.h"
#include "device/device.h"

class CaptureApp : public QObject
Expand Down Expand Up @@ -65,11 +66,15 @@ public slots:
QAction* mTbStartAction;
QAction* mTbContinuousAction;
QAction* mTbStopAction;
QAction* mStreamAction;

QComboBox* mRateBox;

bool mCaptureActive;

bool mStreamingActive;
UiCaptureStreamer* captureStreamer;

void createToolBar();
void createMenu();
void changeCaptureActions(bool captureActive);
Expand All @@ -87,6 +92,7 @@ private slots:
void calibrationSettings();
void selectSignalsToAdd();
void exportData();
void streamData();
void sampleRateChanged(int rateIndex);


Expand Down
164 changes: 164 additions & 0 deletions app/capture/streamworker.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#include "streamworker.h"

StreamWorker::StreamWorker(CaptureDevice* device) : QObject(nullptr) {
this->device = device;
// server = new QTcpServer();
// connect(server, &QTcpServer::newConnection, this, &StreamWorker::handleNewConnection);
}

StreamWorker::~StreamWorker()
{
stop();
delete server;
}

/*!
Start streaming (start listening)
*/
void StreamWorker::start(int port) {
if(server == nullptr) {
// the server does not like to be dragged across threads, so create it only when we need it (and already are in the right thread)
server = new QTcpServer();
connect(server, &QTcpServer::newConnection, this, &StreamWorker::handleNewConnection);
}
if(port < 1 || port > 65535) {
// bad port value
setState(StreamingState::ERROR);
return;
}

if(getState() != StreamingState::STOPPED) {
// we are not in the right state, stop and indicate an error
stop();
setState(StreamingState::ERROR);
return;
}

if(!server->listen(QHostAddress::Any, port)) {
// error with the port
setState(StreamingState::ERROR);
server->close();
return;
}

qInfo() << "StreamWorker: Starting";
setState(StreamingState::RUNNING);
}

/*!
Stop streaming
*/
void StreamWorker::stop() {
qInfo() << "StreamWorker: Stopping";
if(getState() == StreamingState::RUNNING) {
server->close();
foreach (auto socket, sockets) {
socket->deleteLater();
}
sockets.clear();
}
setState(StreamingState::STOPPED);
}

/*!
Sets the state, synchronizes with a mutex, and emits the appropriate signal
*/
void StreamWorker::setState(StreamingState newState) {
QMutexLocker stateChangingMutexLocker {&stateChangingMutex};
state = newState;
switch (state) {
case STOPPED:
emit stopped();
break;
case RUNNING:
emit running();
break;
case ERROR:
emit error();
break;
};
}

/*!
Handle a new connection by adding it to the list and remove it when disconnected
*/
void StreamWorker::handleNewConnection() {
QTcpSocket* socket = server->nextPendingConnection();
if(socket == nullptr) {
// I have no idea why this would happen, but it happens
return;
}
sockets.append(socket);
qInfo() << "StreamWorker: Got new connection: " << (long)socket;
connect(socket, &QTcpSocket::disconnected, [this, socket](){
this->sockets.removeOne(socket);
qDebug() << "StreamWorker: Removed a connection: " << (long)socket;
socket->deleteLater();
} );
}

/*!
Send the captured data to all active sockets
*/
void StreamWorker::handleCaptureFinished(bool successful, QString msg) {
// no need to further synchronize this, since all of this class is run via the event loop of its thread
if(getState() == StreamingState::RUNNING && successful) {
qDebug() << "StreamWorker: Got new data, sending to " << sockets.length() << " clients";
QJsonObject json;
this->writeToJson(json);
// write compact json, so one message -> one line
// this simplifies reading in clients with tcp streaming
// so the delimiter between messages will be '\n'
QByteArray jsonBytes = QJsonDocument(json).toJson(QJsonDocument::Compact);
foreach(QTcpSocket* socket, sockets) {
socket->write(jsonBytes);
socket->write("\n");
}
}
}

/*!
Writes the current state of the CaptureDevice to JSON
*/
void StreamWorker::writeToJson(QJsonObject &json) {
QJsonArray jsonSignals;

QList<DigitalSignal*> digitalSignals = device->digitalSignals();
QList<AnalogSignal*> analogSignals = device->analogSignals();

json.insert("sampleRate", device->usedSampleRate());

foreach(DigitalSignal* s, digitalSignals) {
QVector<int>* data = device->digitalData(s->id());
if (data == NULL) continue;

QJsonObject jsonSignal;
jsonSignal.insert("id", s->id());
jsonSignal.insert("name", s->name());
jsonSignal.insert("type", "digital");
QJsonArray jsonData;
foreach(int dataPoint, *data) {
jsonData.append(dataPoint);
}
jsonSignal.insert("data", jsonData);
jsonSignals.append(jsonSignal);
}
foreach(AnalogSignal* s, analogSignals) {
QVector<double>* data = device->analogData(s->id());
if (data == NULL) continue;

QJsonObject jsonSignal;
jsonSignal.insert("id", s->id());
jsonSignal.insert("name", s->name());
jsonSignal.insert("type", "analog");
QJsonArray jsonData;
foreach(double dataPoint, *data) {
jsonData.append(dataPoint);
}
jsonSignal.insert("data", jsonData);
jsonSignals.append(jsonSignal);
}

json.insert("signals", jsonSignals);

}
66 changes: 66 additions & 0 deletions app/capture/streamworker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#ifndef STREAMWORKER_H
#define STREAMWORKER_H

#include <QtNetwork/QTcpServer>
#include <QtNetwork/QTcpSocket>
#include <QJsonObject>
#include <QJsonDocument>
#include <QJsonArray>
#include <QMutex>
#include <QDebug>

#include "device/capturedevice.h"

/*!
\class StreamWorker
\brief Worker class/thread managed by \a UiCaptureStreamer

\ingroup Capture
*/
class StreamWorker : public QObject {
Q_OBJECT

public:
StreamWorker(CaptureDevice* device);
~StreamWorker();

enum StreamingState {
STOPPED, // not running, no current error
RUNNING, // successfully running
ERROR // not running because of an error
};

/// makes sure that this synchronized by a mutex
/// probably a bit paranoid, but doesn't hurt
StreamingState getState() { QMutexLocker stateChangingMutexLocker {&stateChangingMutex}; return state; };

private:
QMutex stateChangingMutex; // indicate that a (possible) state change is happening
QTcpServer *server = nullptr;
StreamingState state = StreamingState::STOPPED;
CaptureDevice* device = nullptr;

QList<QTcpSocket*> sockets;

void writeToJson(QJsonObject &json);

/// makes sure that this synchronized by a mutex and also emits the signal every time
void setState(StreamingState newState);

private slots:
void handleNewConnection();

public slots:
void start(int port);
void stop();
void handleCaptureFinished(bool successful, QString msg);

signals:
void running();
void stopped();
void error();
void deleted();

};

#endif // STREAMWORKER_H
Loading