-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEchoStream.vb
More file actions
162 lines (133 loc) · 5.84 KB
/
EchoStream.vb
File metadata and controls
162 lines (133 loc) · 5.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
Imports System.IO
Imports System.Threading
Imports System.Collections.Concurrent
Imports System.Runtime
''' <summary>
''' EchoStream.NET<br />
''' Written By Brian Coverstone - December 2018<br />
''' <br />
''' An in-memory bi-directional stream that allows both writing and reading on the same instance.<br />
''' This is useful for situations where you need to pass a stream into a function that writes to it, but you then may want to in turn pass that stream into another function that reads from a stream.<br />
''' Normally streams cannot do this and are unidirectional.<br />
''' This is a very specialized tool that is only required in certain circumstances, such as the PowerBak utility, which needs to split a stream to simultaneously write to both the file system and SQL.<br />
''' <br />
''' Written chunks are queued in a bounded BlockingCollection; Write blocks (up to WriteTimeout) while the queue is full,
''' and Read blocks (up to ReadTimeout) while no data is available.
''' </summary>
Public Class EchoStream
    Inherits Stream

    Public Overrides ReadOnly Property CanTimeout As Boolean = True
    ''' <summary>Milliseconds Read waits for data when none is buffered. Defaults to Timeout.Infinite.</summary>
    Public Overrides Property ReadTimeout As Integer = Timeout.Infinite
    ''' <summary>Milliseconds Write waits for free queue space. Defaults to Timeout.Infinite.</summary>
    Public Overrides Property WriteTimeout As Integer = Timeout.Infinite
    Public Overrides ReadOnly Property CanRead As Boolean = True
    Public Overrides ReadOnly Property CanSeek As Boolean = False
    Public Overrides ReadOnly Property CanWrite As Boolean = True

    ''' <summary>
    ''' When False (the default), a Write that covers the caller's entire array queues that array by reference
    ''' instead of copying it, so the caller must not modify the array afterwards. Set to True to always copy.
    ''' </summary>
    Public Property CopyBufferOnWrite As Boolean = False

    'Serializes readers; guards m_buffer, m_offset and m_count
    Private ReadOnly _lock As New Object()

    'Default underlying mechanism for BlockingCollection is ConcurrentQueue<T>, which is what we want
    Private ReadOnly _Buffers As BlockingCollection(Of Byte())
    Private _maxQueueDepth As Integer = 10

    'The chunk currently being consumed by Read, the next unread index in it, and the unread byte count
    Private m_buffer As Byte() = Nothing
    Private m_offset As Integer = 0
    Private m_count As Integer = 0

    Private m_Closed As Boolean = False

    ''' <summary>
    ''' Marks the stream closed for writing. Chunks that are already queued can still be read;
    ''' once they are drained, Read returns 0.
    ''' </summary>
    Public Overrides Sub Close()
        Volatile.Write(m_Closed, True)
        'release any waiting writes and let a blocked reader observe end-of-stream
        _Buffers.CompleteAdding()
    End Sub

    ''' <summary>
    ''' True when at least one byte can be read without blocking: either unread bytes remain in the
    ''' chunk currently being consumed, or another chunk is queued.
    ''' </summary>
    Public ReadOnly Property DataAvailable As Boolean
        Get
            'Deliberately not taking _lock: a reader blocked inside Read holds it while waiting for data.
            Return Volatile.Read(m_count) > 0 OrElse _Buffers.Count > 0
        End Get
    End Property

    Private _Length As Long = 0L
    ''' <summary>Total number of bytes accepted by Write so far.</summary>
    Public Overrides ReadOnly Property Length As Long
        Get
            'Interlocked keeps the 64-bit read atomic; the writer updates it from another thread.
            Return Interlocked.Read(_Length)
        End Get
    End Property

    Private _Position As Long = 0L
    ''' <summary>Total number of bytes returned by Read so far. Setting the position is not supported.</summary>
    Public Overrides Property Position As Long
        Get
            Return Interlocked.Read(_Position)
        End Get
        Set(value As Long)
            Throw New NotImplementedException()
        End Set
    End Property

    ''' <summary>Creates a stream whose queue holds up to 10 pending chunks.</summary>
    Public Sub New()
        Me.New(10)
    End Sub

    ''' <summary>Creates a stream whose queue holds up to <paramref name="maxQueueDepth"/> pending chunks.</summary>
    ''' <param name="maxQueueDepth">Bounded capacity passed to the underlying BlockingCollection.</param>
    Public Sub New(ByVal maxQueueDepth As Integer)
        _maxQueueDepth = maxQueueDepth
        _Buffers = New BlockingCollection(Of Byte())(_maxQueueDepth)
    End Sub

    'We replace the xxxxAsync functions because the default base class shares state between ReadAsync and WriteAsync,
    'which causes a hang if both are called at once.
    'The three-argument base methods are not overridable and forward to the four-argument (CancellationToken) overloads,
    'so those are the ones overridden below; otherwise callers holding a plain Stream reference would still reach the base implementation.

    ''' <summary>Runs <see cref="Write"/> on the thread pool.</summary>
    Public Overloads Function WriteAsync(ByVal buffer As Byte(), ByVal offset As Integer, ByVal count As Integer) As Task
        Return WriteAsync(buffer, offset, count, CancellationToken.None)
    End Function

    ''' <summary>Runs <see cref="Write"/> on the thread pool. The token only prevents the work from starting; a blocked Write is not interrupted.</summary>
    Public Overloads Overrides Function WriteAsync(ByVal buffer As Byte(), ByVal offset As Integer, ByVal count As Integer, ByVal cancellationToken As CancellationToken) As Task
        Return Task.Run(Sub() Write(buffer, offset, count), cancellationToken)
    End Function

    ''' <summary>Runs <see cref="Read"/> on the thread pool.</summary>
    Public Overloads Function ReadAsync(ByVal buffer As Byte(), ByVal offset As Integer, ByVal count As Integer) As Task(Of Integer)
        Return ReadAsync(buffer, offset, count, CancellationToken.None)
    End Function

    ''' <summary>Runs <see cref="Read"/> on the thread pool. The token only prevents the work from starting; a blocked Read is not interrupted.</summary>
    Public Overloads Overrides Function ReadAsync(ByVal buffer As Byte(), ByVal offset As Integer, ByVal count As Integer, ByVal cancellationToken As CancellationToken) As Task(Of Integer)
        Return Task.Run(Function()
                            Return Read(buffer, offset, count)
                        End Function, cancellationToken)
    End Function

    ''' <summary>
    ''' Queues <paramref name="count"/> bytes of <paramref name="buffer"/>, starting at <paramref name="offset"/>, as one chunk.
    ''' The call is silently ignored when the stream is closed, when count is not positive, or when the range does not fit in the buffer.
    ''' </summary>
    ''' <exception cref="TimeoutException">The queue stayed full for longer than WriteTimeout.</exception>
    Public Overrides Sub Write(ByVal buffer As Byte(), ByVal offset As Integer, ByVal count As Integer)
        If Volatile.Read(m_Closed) OrElse buffer.Length - offset < count OrElse count <= 0 Then Return

        Dim newBuffer As Byte()
        If Not CopyBufferOnWrite AndAlso offset = 0 AndAlso count = buffer.Length Then
            'whole-array write: queue the caller's array itself (see CopyBufferOnWrite)
            newBuffer = buffer
        Else
            newBuffer = New Byte(count - 1) {}
            System.Buffer.BlockCopy(buffer, offset, newBuffer, 0, count)
        End If

        'NOTE(review): BlockingCollection.TryAdd throws InvalidOperationException once CompleteAdding has been called,
        'so a Write that races with Close may surface that exception - confirm callers expect it.
        If Not _Buffers.TryAdd(newBuffer, WriteTimeout) Then Throw New TimeoutException("EchoStream Write() Timeout")

        'atomic: Length may be read, and Write may be called, from other threads
        Interlocked.Add(_Length, CLng(count))
    End Sub

    ''' <summary>
    ''' Copies up to <paramref name="count"/> buffered bytes into <paramref name="buffer"/> at <paramref name="offset"/>.
    ''' Blocks (up to ReadTimeout) only when no data at all is available; once some data is available it returns
    ''' whatever can be copied without waiting further.
    ''' </summary>
    ''' <returns>
    ''' The number of bytes copied. 0 means the stream was closed and fully drained, or ReadTimeout elapsed with no data
    ''' (the two cases are not distinguished).
    ''' </returns>
    Public Overrides Function Read(ByVal buffer As Byte(), ByVal offset As Integer, ByVal count As Integer) As Integer
        If count = 0 Then Return 0

        SyncLock _lock
            If m_count = 0 Then
                'Nothing left over from a previous call: wait for the next chunk.
                'TryTake fails only on timeout or when the collection is completed AND empty, so a chunk queued
                'just before Close is never mistaken for end-of-stream.
                If Not _Buffers.TryTake(m_buffer, ReadTimeout) Then Return 0
                m_offset = 0
                m_count = m_buffer.Length
            End If

            Dim returnBytes As Integer = 0
            Do While count > 0
                If m_count = 0 Then
                    'current chunk exhausted: pull another only if one is already queued (never block here)
                    If Not _Buffers.TryTake(m_buffer, 0) Then Exit Do
                    m_offset = 0
                    m_count = m_buffer.Length
                End If

                Dim bytesToCopy = If((count < m_count), count, m_count)
                System.Buffer.BlockCopy(m_buffer, m_offset, buffer, offset, bytesToCopy)
                m_offset += bytesToCopy
                m_count -= bytesToCopy
                offset += bytesToCopy
                count -= bytesToCopy
                returnBytes += bytesToCopy
            Loop

            'atomic: Position may be read from another thread
            Interlocked.Add(_Position, CLng(returnBytes))
            Return returnBytes
        End SyncLock
    End Function

    ''' <summary>Reads a single byte via <see cref="Read"/>; returns -1 when Read returns no data.</summary>
    Public Overrides Function ReadByte() As Integer
        Dim returnValue = New Byte(0) {}
        Return If(Read(returnValue, 0, 1) <= 0, -1, returnValue(0))
    End Function

    ''' <summary>No-op: written data is queued immediately, so there is nothing to flush.</summary>
    Public Overrides Sub Flush()
    End Sub

    ''' <summary>Not supported; CanSeek is False.</summary>
    Public Overrides Function Seek(offset As Long, origin As SeekOrigin) As Long
        Throw New NotImplementedException()
    End Function

    ''' <summary>Not supported.</summary>
    Public Overrides Sub SetLength(value As Long)
        Throw New NotImplementedException()
    End Sub
End Class