Skip to content

Commit 8412f0d

Browse files
committed
refactor: Replace httpclient with streamhttp for improved HTTP handling across the codebase
1 parent 6c7125c commit 8412f0d

File tree

8 files changed

+45
-60
lines changed

8 files changed

+45
-60
lines changed

camunda/camunda.go

Lines changed: 5 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111

1212
"github.com/nativebpm/connectors/camunda/internal/tasks"
1313
"github.com/nativebpm/connectors/camunda/internal/worker"
14-
"github.com/nativebpm/connectors/httpclient"
14+
"github.com/nativebpm/connectors/streamhttp"
1515
)
1616

1717
type ExternalTask = worker.ExternalTask
@@ -21,15 +21,16 @@ type TopicRequest = worker.TopicRequest
2121

2222
// Client represents a Camunda external task client
2323
type Client struct {
24-
httpClient *httpclient.HTTPClient
24+
httpClient *streamhttp.Client
2525
workerID string
2626
logger *slog.Logger
2727
}
2828

2929
// NewClient creates a new Camunda external task client
30-
func NewClient(hostURL, workerID string, logger *slog.Logger) (*Client, error) {
30+
func NewClient(hostURL, workerID string, logger *slog.Logger,
31+
middlewares ...func(http.RoundTripper) http.RoundTripper) (*Client, error) {
3132
baseURL := hostURL + "/engine-rest"
32-
httpClient, err := httpclient.NewClient(http.Client{Timeout: 30 * time.Second}, baseURL)
33+
httpClient, err := streamhttp.NewClient(http.Client{Timeout: 30 * time.Second}, baseURL, middlewares...)
3334
if err != nil {
3435
return nil, fmt.Errorf("failed to create HTTP client: %w", err)
3536
}
@@ -41,22 +42,6 @@ func NewClient(hostURL, workerID string, logger *slog.Logger) (*Client, error) {
4142
}, nil
4243
}
4344

44-
// Use adds middleware to the HTTP client
45-
func (c *Client) Use(middleware httpclient.Middleware) *Client {
46-
c.httpClient.Use(middleware)
47-
return c
48-
}
49-
50-
// WithLogger adds logging middleware to the HTTP client
51-
func (c *Client) WithLogger(logger *slog.Logger) *Client {
52-
if logger == nil {
53-
logger = slog.Default()
54-
}
55-
c.httpClient.WithLogger(logger)
56-
c.logger = logger
57-
return c
58-
}
59-
6045
// TaskCompletion provides a fluent API for completing external tasks
6146
type TaskCompletion = tasks.TaskCompletion
6247

camunda/camunda_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"testing"
1010
"time"
1111

12-
"github.com/nativebpm/connectors/httpclient"
12+
"github.com/nativebpm/connectors/streamhttp"
1313
)
1414

1515
func TestNewClient(t *testing.T) {
@@ -205,7 +205,7 @@ func TestComplete(t *testing.T) {
205205
defer server.Close()
206206

207207
// Create client
208-
httpClient, _ := httpclient.NewClient(http.Client{}, server.URL)
208+
httpClient, _ := streamhttp.NewClient(http.Client{}, server.URL)
209209
client := &Client{
210210
httpClient: httpClient,
211211
workerID: "test-worker",
@@ -248,7 +248,7 @@ func TestHandleFailure(t *testing.T) {
248248
defer server.Close()
249249

250250
// Create client
251-
httpClient, _ := httpclient.NewClient(http.Client{}, server.URL)
251+
httpClient, _ := streamhttp.NewClient(http.Client{}, server.URL)
252252
client := &Client{
253253
httpClient: httpClient,
254254
workerID: "test-worker",
@@ -297,7 +297,7 @@ func TestExtendLock(t *testing.T) {
297297
defer server.Close()
298298

299299
// Create client
300-
httpClient, _ := httpclient.NewClient(http.Client{}, server.URL)
300+
httpClient, _ := streamhttp.NewClient(http.Client{}, server.URL)
301301
client := &Client{
302302
httpClient: httpClient,
303303
workerID: "test-worker",
@@ -336,7 +336,7 @@ func TestUnlock(t *testing.T) {
336336
defer server.Close()
337337

338338
// Create client
339-
httpClient, _ := httpclient.NewClient(http.Client{}, server.URL)
339+
httpClient, _ := streamhttp.NewClient(http.Client{}, server.URL)
340340
client := &Client{
341341
httpClient: httpClient,
342342
workerID: "test-worker",

camunda/examples/loan-granting/main.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/nativebpm/connectors/camunda"
1616
"github.com/nativebpm/connectors/camunda/examples/loan-granting/handlers"
1717
storepkg "github.com/nativebpm/connectors/camunda/examples/loan-granting/store"
18+
"github.com/nativebpm/connectors/streamhttp"
1819
)
1920

2021
func main() {
@@ -34,15 +35,13 @@ func main() {
3435
logger.Info("Submission throttle configured", "delay_ms", submissionDelayMs)
3536

3637
// Create a new Camunda client
37-
client, err := camunda.NewClient("http://localhost:8080", "loan-worker", logger)
38+
client, err := camunda.NewClient("http://localhost:8080", "loan-worker", logger,
39+
streamhttp.LoggingMiddleware(logger))
3840
if err != nil {
3941
logger.Error("Failed to create client", "error", err)
4042
return
4143
}
4244

43-
// Add logging middleware
44-
client.WithLogger(logger)
45-
4645
// Deploy the BPMN process
4746
if err := deployProcess(ctx, client, logger); err != nil {
4847
logger.Error("Failed to deploy process", "error", err)

camunda/go.mod

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ module github.com/nativebpm/connectors/camunda
22

33
go 1.21
44

5-
require github.com/nativebpm/connectors/httpclient v0.1.1
6-
7-
require github.com/google/uuid v1.4.0
5+
require (
6+
github.com/google/uuid v1.6.0
7+
github.com/nativebpm/connectors/streamhttp v1.0.1
8+
)

camunda/go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
2-
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
3-
github.com/nativebpm/connectors/httpclient v0.1.1 h1:5JxQ+LjKq2n/U1NerBG0kyH2CEp2/M3pTWK6/dFM2Wo=
4-
github.com/nativebpm/connectors/httpclient v0.1.1/go.mod h1:Two9T6JfOi12HHIZClriB6v/I8SPbvo91G2+TLYEMcA=
1+
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
2+
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
3+
github.com/nativebpm/connectors/streamhttp v1.0.1 h1:9E1jx9+yts3bCEoHcMYeyw/MgSrODkkro15SfC65rpU=
4+
github.com/nativebpm/connectors/streamhttp v1.0.1/go.mod h1:bUBu284aYzl/NEJykPaufubHYIUy0WMUPnVQn/AXHX4=

camunda/internal/tasks/tasks.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/nativebpm/connectors/camunda/internal/vars"
14-
"github.com/nativebpm/connectors/httpclient"
14+
"github.com/nativebpm/connectors/streamhttp"
1515
)
1616

1717
// rnd is a package-local random number generator used for jitter.
@@ -21,7 +21,7 @@ var rnd = rand.New(rand.NewSource(time.Now().UnixNano()))
2121

2222
// TaskCompletion provides a fluent API for completing external tasks
2323
type TaskCompletion struct {
24-
httpClient *httpclient.HTTPClient
24+
httpClient *streamhttp.Client
2525
workerID string
2626
ctx context.Context
2727
taskID string
@@ -31,7 +31,7 @@ type TaskCompletion struct {
3131
}
3232

3333
// NewTaskCompletion creates a new TaskCompletion builder
34-
func NewTaskCompletion(httpClient *httpclient.HTTPClient, workerID, taskID string, logger *slog.Logger) *TaskCompletion {
34+
func NewTaskCompletion(httpClient *streamhttp.Client, workerID, taskID string, logger *slog.Logger) *TaskCompletion {
3535
if logger == nil {
3636
logger = slog.Default()
3737
}
@@ -226,7 +226,7 @@ func (tc *TaskCompletion) Execute() error {
226226

227227
// TaskFailure preovides a fluent API for reporting task failures
228228
type TaskFailure struct {
229-
httpClient *httpclient.HTTPClient
229+
httpClient *streamhttp.Client
230230
workerID string
231231
ctx context.Context
232232
taskID string
@@ -237,7 +237,7 @@ type TaskFailure struct {
237237
}
238238

239239
// NewTaskFailure creates a new TaskFailure builder
240-
func NewTaskFailure(httpClient *httpclient.HTTPClient, workerID, taskID string) *TaskFailure {
240+
func NewTaskFailure(httpClient *streamhttp.Client, workerID, taskID string) *TaskFailure {
241241
return &TaskFailure{
242242
httpClient: httpClient,
243243
workerID: workerID,
@@ -317,15 +317,15 @@ func (tf *TaskFailure) Execute() error {
317317

318318
// LockExtension provides a fluent API for extending task locks
319319
type LockExtension struct {
320-
httpClient *httpclient.HTTPClient
320+
httpClient *streamhttp.Client
321321
workerID string
322322
ctx context.Context
323323
taskID string
324324
newDuration int
325325
}
326326

327327
// NewLockExtension creates a new LockExtension builder
328-
func NewLockExtension(httpClient *httpclient.HTTPClient, workerID, taskID string, newDuration int) *LockExtension {
328+
func NewLockExtension(httpClient *streamhttp.Client, workerID, taskID string, newDuration int) *LockExtension {
329329
return &LockExtension{
330330
httpClient: httpClient,
331331
workerID: workerID,
@@ -374,14 +374,14 @@ func (le *LockExtension) Execute() error {
374374

375375
// TaskUnlock provides a fluent API for unlocking tasks
376376
type TaskUnlock struct {
377-
httpClient *httpclient.HTTPClient
377+
httpClient *streamhttp.Client
378378
workerID string
379379
ctx context.Context
380380
taskID string
381381
}
382382

383383
// NewTaskUnlock creates a new TaskUnlock builder
384-
func NewTaskUnlock(httpClient *httpclient.HTTPClient, workerID, taskID string) *TaskUnlock {
384+
func NewTaskUnlock(httpClient *streamhttp.Client, workerID, taskID string) *TaskUnlock {
385385
return &TaskUnlock{
386386
httpClient: httpClient,
387387
workerID: workerID,

camunda/internal/worker/worker.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313

1414
"github.com/nativebpm/connectors/camunda/internal/tasks"
1515
"github.com/nativebpm/connectors/camunda/internal/vars"
16-
"github.com/nativebpm/connectors/httpclient"
16+
"github.com/nativebpm/connectors/streamhttp"
1717
)
1818

1919
// TopicRequest represents a topic request for fetching tasks
@@ -110,7 +110,7 @@ type FailFunc func(errorMessage, errorDetails string, retries, retryTimeout int)
110110

111111
// Worker manages external task polling and processing
112112
type Worker struct {
113-
httpClient *httpclient.HTTPClient
113+
httpClient *streamhttp.Client
114114
workerID string
115115
logger *slog.Logger
116116
handlers map[string]TaskHandler
@@ -124,7 +124,7 @@ type Worker struct {
124124
}
125125

126126
// New creates a new external task worker
127-
func New(httpClient *httpclient.HTTPClient, workerID string, logger *slog.Logger) *Worker {
127+
func New(httpClient *streamhttp.Client, workerID string, logger *slog.Logger) *Worker {
128128
if logger == nil {
129129
logger = slog.Default()
130130
}

camunda/internal/worker/worker_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/nativebpm/connectors/camunda/internal/vars"
14-
"github.com/nativebpm/connectors/httpclient"
14+
"github.com/nativebpm/connectors/streamhttp"
1515
)
1616

1717
// TestExternalTask_UnmarshalJSON tests parsing of Camunda timestamp formats
@@ -113,7 +113,7 @@ func (m *MockHandler) Handle(ctx context.Context, task ExternalTask, complete Co
113113
}
114114

115115
func TestNew(t *testing.T) {
116-
httpClient, _ := httpclient.NewClient(http.Client{}, "http://localhost:8080")
116+
httpClient, _ := streamhttp.NewClient(http.Client{}, "http://localhost:8080")
117117
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))
118118

119119
worker := New(httpClient, "test-worker", logger)
@@ -152,7 +152,7 @@ func TestNew(t *testing.T) {
152152
}
153153

154154
func TestNew_NilLogger(t *testing.T) {
155-
httpClient, _ := httpclient.NewClient(http.Client{}, "http://localhost:8080")
155+
httpClient, _ := streamhttp.NewClient(http.Client{}, "http://localhost:8080")
156156

157157
worker := New(httpClient, "test-worker", nil)
158158

@@ -166,7 +166,7 @@ func TestNew_NilLogger(t *testing.T) {
166166
}
167167

168168
func TestWorker_RegisterHandler(t *testing.T) {
169-
httpClient, _ := httpclient.NewClient(http.Client{}, "http://localhost:8080")
169+
httpClient, _ := streamhttp.NewClient(http.Client{}, "http://localhost:8080")
170170
logger := slog.Default()
171171
worker := New(httpClient, "test-worker", logger)
172172

@@ -207,7 +207,7 @@ func TestWorker_RegisterHandler(t *testing.T) {
207207
}
208208

209209
func TestWorker_RegisterHandler_Multiple(t *testing.T) {
210-
httpClient, _ := httpclient.NewClient(http.Client{}, "http://localhost:8080")
210+
httpClient, _ := streamhttp.NewClient(http.Client{}, "http://localhost:8080")
211211
worker := New(httpClient, "test-worker", nil)
212212

213213
handler1 := &MockHandler{}
@@ -234,7 +234,7 @@ func TestWorker_RegisterHandler_Multiple(t *testing.T) {
234234
}
235235

236236
func TestWorker_SetMaxTasks(t *testing.T) {
237-
httpClient, _ := httpclient.NewClient(http.Client{}, "http://localhost:8080")
237+
httpClient, _ := streamhttp.NewClient(http.Client{}, "http://localhost:8080")
238238
worker := New(httpClient, "test-worker", nil)
239239

240240
result := worker.SetMaxTasks(20)
@@ -250,7 +250,7 @@ func TestWorker_SetMaxTasks(t *testing.T) {
250250
}
251251

252252
func TestWorker_SetPollInterval(t *testing.T) {
253-
httpClient, _ := httpclient.NewClient(http.Client{}, "http://localhost:8080")
253+
httpClient, _ := streamhttp.NewClient(http.Client{}, "http://localhost:8080")
254254
worker := New(httpClient, "test-worker", nil)
255255

256256
interval := 10 * time.Second
@@ -267,7 +267,7 @@ func TestWorker_SetPollInterval(t *testing.T) {
267267
}
268268

269269
func TestWorker_FluentAPI(t *testing.T) {
270-
httpClient, _ := httpclient.NewClient(http.Client{}, "http://localhost:8080")
270+
httpClient, _ := streamhttp.NewClient(http.Client{}, "http://localhost:8080")
271271
handler := &MockHandler{}
272272

273273
// Test method chaining
@@ -306,7 +306,7 @@ func TestWorker_ProcessTask(t *testing.T) {
306306
}))
307307
defer server.Close()
308308

309-
httpClient, _ := httpclient.NewClient(http.Client{}, server.URL)
309+
httpClient, _ := streamhttp.NewClient(http.Client{}, server.URL)
310310
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))
311311
worker := New(httpClient, "test-worker", logger)
312312

@@ -344,7 +344,7 @@ func TestWorker_ProcessTask(t *testing.T) {
344344
}
345345

346346
func TestWorker_ProcessTask_NoHandler(t *testing.T) {
347-
httpClient, _ := httpclient.NewClient(http.Client{}, "http://localhost:8080")
347+
httpClient, _ := streamhttp.NewClient(http.Client{}, "http://localhost:8080")
348348
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelError}))
349349
worker := New(httpClient, "test-worker", logger)
350350

@@ -378,7 +378,7 @@ func TestCompleteFunc(t *testing.T) {
378378
}))
379379
defer server.Close()
380380

381-
httpClient, _ := httpclient.NewClient(http.Client{}, server.URL)
381+
httpClient, _ := streamhttp.NewClient(http.Client{}, server.URL)
382382
worker := New(httpClient, "test-worker", nil)
383383

384384
handler := &MockHandler{}
@@ -418,7 +418,7 @@ func TestFailFunc(t *testing.T) {
418418
}))
419419
defer server.Close()
420420

421-
httpClient, _ := httpclient.NewClient(http.Client{}, server.URL)
421+
httpClient, _ := streamhttp.NewClient(http.Client{}, server.URL)
422422
worker := New(httpClient, "test-worker", nil)
423423

424424
handler := &MockHandler{}

0 commit comments

Comments
 (0)