Skip to content

Commit 9f5b36c

Browse files
committed
Update DataServer.cs
1 parent c7b36fb commit 9f5b36c

File tree

1 file changed

+29
-113
lines changed

1 file changed

+29
-113
lines changed

source/TS.NET.Engine/Waveform Buffer Readers/DataServer.cs

Lines changed: 29 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,12 @@ internal class DataServer : IThread
1212

1313
private readonly ICaptureBufferReader captureBuffer;
1414

15-
private Socket? listener;
16-
private WaveformSession? currentSession;
17-
private readonly object sessionLock = new();
18-
1915
private readonly IPAddress address;
2016
private readonly int port;
2117

2218
private CancellationTokenSource? cancelTokenSource;
2319
private Task? taskListener;
20+
private Task? taskClient;
2421

2522
public DataServer(
2623
ILogger logger,
@@ -38,56 +35,40 @@ public DataServer(
3835
public void Start(SemaphoreSlim startSemaphore)
3936
{
4037
cancelTokenSource = new CancellationTokenSource();
41-
listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
42-
listener.Bind(new IPEndPoint(address, port));
43-
listener.Listen(backlog: 1);
44-
taskListener = Task.Factory.StartNew(() => ListenLoop(logger, listener, cancelTokenSource.Token), TaskCreationOptions.LongRunning);
45-
38+
taskListener = Task.Factory.StartNew(() => ListenerLoop(logger, cancelTokenSource.Token), TaskCreationOptions.LongRunning);
4639
startSemaphore.Release();
4740
}
4841

4942
public void Stop()
5043
{
5144
cancelTokenSource?.Cancel();
5245
taskListener?.Wait();
53-
try
54-
{
55-
listener?.Close();
56-
}
57-
catch { }
58-
59-
lock (sessionLock)
60-
currentSession?.Stop();
61-
62-
lock (sessionLock)
63-
{
64-
currentSession?.Join();
65-
currentSession = null;
66-
}
46+
taskClient?.Wait();
47+
taskClient = null;
6748
}
6849

69-
private void ListenLoop(ILogger logger, Socket listener, CancellationToken cancelToken)
50+
private void ListenerLoop(ILogger logger, CancellationToken cancelToken)
7051
{
52+
var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
53+
listener.Bind(new IPEndPoint(address, port));
54+
listener.Listen(backlog: 1);
55+
7156
while (!cancelToken.IsCancellationRequested)
7257
{
7358
try
7459
{
7560
var client = listener!.Accept();
7661
logger.LogDebug($"Client accepted: {client.RemoteEndPoint}");
77-
lock (sessionLock)
78-
{
79-
if (currentSession is { IsRunning: true })
62+
63+
if (taskClient != null)
8064
{
8165
logger.LogDebug("A session is already active; rejecting new connection");
8266
try { client.Shutdown(SocketShutdown.Both); } catch { }
8367
try { client.Close(); } catch { }
8468
continue;
8569
}
8670

87-
var session = new WaveformSession(client, logger, captureBuffer, cancelTokenSource.Token, OnSessionClosed);
88-
currentSession = session;
89-
session.Start();
90-
}
71+
taskClient = Task.Factory.StartNew(() => ClientLoop(logger, client, cancelTokenSource.Token), TaskCreationOptions.LongRunning);
9172
}
9273
catch (ObjectDisposedException)
9374
{
@@ -106,66 +87,21 @@ private void ListenLoop(ILogger logger, Socket listener, CancellationToken cance
10687
Thread.Sleep(50);
10788
}
10889
}
109-
}
11090

111-
private void OnSessionClosed(WaveformSession session)
112-
{
113-
lock (sessionLock)
114-
{
115-
if (ReferenceEquals(currentSession, session))
116-
currentSession = null;
117-
}
91+
listener.Close();
11892
}
119-
}
12093

121-
internal class WaveformSession
122-
{
123-
private readonly ILogger logger;
124-
private readonly ICaptureBufferReader captureBuffer;
125-
private readonly CancellationToken cancellationToken;
126-
private readonly Socket socket;
127-
private readonly Thread thread;
128-
private readonly Action<WaveformSession> onClose;
129-
private uint sequenceNumber = 0;
130-
131-
public bool IsRunning { get; private set; }
132-
133-
public WaveformSession(Socket socket, ILogger logger, ICaptureBufferReader captureBuffer, CancellationToken cancellationToken, Action<WaveformSession> onClose)
134-
{
135-
this.socket = socket;
136-
this.logger = logger;
137-
this.captureBuffer = captureBuffer;
138-
this.cancellationToken = cancellationToken;
139-
this.onClose = onClose;
140-
thread = new Thread(Run) { IsBackground = true, Name = $"WaveformSession-{socket.GetHashCode()}" };
141-
}
142-
143-
public void Start()
144-
{
145-
IsRunning = true;
146-
logger.LogDebug($"Waveform session started ({socket.RemoteEndPoint})");
147-
thread.Start();
148-
}
149-
150-
public void Stop()
151-
{
152-
try { socket.Shutdown(SocketShutdown.Both); } catch { }
153-
try { socket.Close(); } catch { }
154-
}
155-
156-
public void Join() => thread.Join();
157-
158-
private void Run()
94+
private void ClientLoop(ILogger logger, Socket client, CancellationToken cancelToken)
15995
{
16096
Span<byte> cmdBuf = stackalloc byte[1];
16197
try
16298
{
163-
while (!cancellationToken.IsCancellationRequested)
99+
while (!cancelToken.IsCancellationRequested)
164100
{
165101
int read = 0;
166102
try
167103
{
168-
read = socket.Receive(cmdBuf);
104+
read = client.Receive(cmdBuf);
169105
}
170106
catch (SocketException se)
171107
{
@@ -179,11 +115,11 @@ private void Run()
179115
switch (cmd)
180116
{
181117
case (byte)'K':
182-
SendScopehalOld();
118+
SendScopehalOld(client, cancelTokenSource.Token);
183119
break;
184120
case (byte)'S':
185121
var sw = Stopwatch.StartNew();
186-
SendScopehal();
122+
SendScopehal(client, cancelTokenSource.Token);
187123
sw.Stop();
188124
logger.LogDebug($"SendScopehal() - {sw.Elapsed.TotalMicroseconds} us");
189125
break;
@@ -200,36 +136,16 @@ private void Run()
200136
}
201137
finally
202138
{
203-
IsRunning = false;
204-
onClose(this);
205-
Stop();
206-
logger.LogDebug("Waveform session ended");
207-
}
208-
}
209139

210-
private void SendAll(ReadOnlySpan<byte> data)
211-
{
212-
while (!data.IsEmpty)
213-
{
214-
int sent = 0;
215-
try
216-
{
217-
sent = socket.Send(data);
218-
}
219-
catch (SocketException se)
220-
{
221-
logger.LogDebug($"Send error {se.SocketErrorCode}");
222-
throw;
223-
}
224-
data = data[sent..];
225140
}
226141
}
227142

228-
private void SendScopehalOld()
143+
private uint sequenceNumber = 0;
144+
private void SendScopehalOld(Socket socket, CancellationToken cancelToken)
229145
{
230146
while (true)
231147
{
232-
cancellationToken.ThrowIfCancellationRequested();
148+
cancelToken.ThrowIfCancellationRequested();
233149
bool noCapturesAvailable = false;
234150
lock (captureBuffer.ReadLock)
235151
{
@@ -274,17 +190,17 @@ private void SendScopehalOld()
274190
}
275191
unsafe
276192
{
277-
SendAll(new ReadOnlySpan<byte>(&header, sizeof(WaveformHeaderOld)));
193+
socket.Send(new ReadOnlySpan<byte>(&header, sizeof(WaveformHeaderOld)));
278194
for (byte captureBufferIndex = 0; captureBufferIndex < captureBuffer.ChannelCount; captureBufferIndex++)
279195
{
280196
int channelIndex = captureMetadata.HardwareConfig.GetChannelIndexByCaptureBufferIndex(captureBufferIndex);
281197
ThunderscopeChannelFrontend thunderscopeChannel = captureMetadata.HardwareConfig.Frontend[channelIndex];
282198
chHeader.channelIndex = (byte)channelIndex;
283199
chHeader.scale = (float)(thunderscopeChannel.ActualVoltFullScale / 256.0);
284200
chHeader.offset = (float)thunderscopeChannel.ActualVoltOffset;
285-
SendAll(new ReadOnlySpan<byte>(&chHeader, sizeof(ChannelHeaderOld)));
201+
socket.Send(new ReadOnlySpan<byte>(&chHeader, sizeof(ChannelHeaderOld)));
286202
var channelBuffer = MemoryMarshal.Cast<sbyte, byte>(captureBuffer.GetChannelReadBuffer<sbyte>(captureBufferIndex));
287-
SendAll(channelBuffer);
203+
socket.Send(channelBuffer);
288204
}
289205
}
290206
sequenceNumber++;
@@ -299,12 +215,12 @@ private void SendScopehalOld()
299215
}
300216
}
301217

302-
private void SendScopehal()
218+
private void SendScopehal(Socket socket, CancellationToken cancelToken)
303219
{
304220
bool noCapturesAvailable = false;
305221
while (true)
306222
{
307-
cancellationToken.ThrowIfCancellationRequested();
223+
cancelToken.ThrowIfCancellationRequested();
308224
lock (captureBuffer.ReadLock)
309225
{
310226
if (captureBuffer.TryStartRead(out var captureMetadata))
@@ -328,7 +244,7 @@ private void SendScopehal()
328244
};
329245
unsafe
330246
{
331-
SendAll(new ReadOnlySpan<byte>(&header, sizeof(WaveformHeader)));
247+
socket.Send(new ReadOnlySpan<byte>(&header, sizeof(WaveformHeader)));
332248
for (byte captureBufferIndex = 0; captureBufferIndex < captureBuffer.ChannelCount; captureBufferIndex++)
333249
{
334250
int channelIndex = captureMetadata.HardwareConfig.GetChannelIndexByCaptureBufferIndex(captureBufferIndex);
@@ -344,7 +260,7 @@ private void SendScopehal()
344260
break;
345261
}
346262
chHeader.offset = (float)thunderscopeChannel.ActualVoltOffset;
347-
SendAll(new ReadOnlySpan<byte>(&chHeader, sizeof(ChannelHeader)));
263+
socket.Send(new ReadOnlySpan<byte>(&chHeader, sizeof(ChannelHeader)));
348264
ReadOnlySpan<byte> channelBuffer = [];
349265
switch (captureMetadata.ProcessingConfig.ChannelDataType)
350266
{
@@ -357,7 +273,7 @@ private void SendScopehal()
357273
channelBuffer = MemoryMarshal.Cast<short, byte>(channelDataI16);
358274
break;
359275
}
360-
SendAll(channelBuffer);
276+
socket.Send(channelBuffer);
361277
}
362278
}
363279
sequenceNumber++;

0 commit comments

Comments
 (0)