Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
ffd62dc
Add template_processor
artemklevtsov Oct 5, 2025
e0c76f8
Add glob support to the template processor
artemklevtsov Oct 5, 2025
0130ea9
fix: tmplProc.Close method
artemklevtsov Dec 8, 2025
7240afe
fix: template init
artemklevtsov Dec 8, 2025
37e7514
fix templateFromParsed error message
artemklevtsov Dec 8, 2025
f6f7d9b
fix: config examples
artemklevtsov Dec 8, 2025
bf033ed
chore: bump x/crypto package to 0.43.0 (#293)
josephwoodward Oct 10, 2025
6c3ba5a
chore: bump Go from 1.25.1 to 1.25.2 (#295)
josephwoodward Oct 13, 2025
76dfb20
netclient: introduce package to for custom TCP settings
alextreichler Oct 16, 2025
37e3a29
message: preallocate metadata (#300)
mmatczuk Oct 21, 2025
af6ccff
Extend field JSON schema (#301)
tomasz-sadura Oct 22, 2025
d625fe9
Allow deprecated templates (#302)
Jeffail Oct 23, 2025
c2c77f6
Network utils improvements (#297)
mmatczuk Oct 23, 2025
a2e715d
Update CL (#303)
Jeffail Oct 23, 2025
76bf934
bloblang: add "".repeat(N) method (#305)
rockwotj Oct 30, 2025
87fb518
netutil: fix the name of keep_alive idle value
alextreichler Oct 30, 2025
ad912c6
bloblang: add bytes function (#308)
rockwotj Oct 30, 2025
44aea24
netutil: enable decorating listener with SO_REUSEADDR and SO_REUSEPORT
vuldin Oct 31, 2025
cea8d61
Update CL (#311)
Jeffail Nov 3, 2025
c8fd7bc
netutil: add connect_timeout to DialerConfig
mmatczuk Nov 4, 2025
e6717ed
netutil: add connect_timeout to DialerConfig
mmatczuk Nov 4, 2025
a7dda98
netutil: make DialerConfigSpec() return advanced by default
mmatczuk Nov 4, 2025
80f3b69
netutil: add ListenerConfigSpec() and ListenerConfig constructor from…
mmatczuk Nov 4, 2025
bad4c4c
netutil: add support for windows
mmatczuk Nov 6, 2025
05de422
Update CL
mmatczuk Nov 7, 2025
88db177
io: add unixgram option to input socket server
alextreichler Nov 14, 2025
8f6ea44
build(deps): bump shutdown dependency (#320)
josephwoodward Nov 17, 2025
7f3d811
chore: bump changelog (#321)
josephwoodward Nov 21, 2025
eafc684
pure: fix input sequence hanging when input fails
eduardodbr Dec 5, 2025
568680a
io: add new listener opts to input socket_server
alextreichler Dec 5, 2025
66df7fc
Bump actions/setup-go from 5 to 6
dependabot[bot] Sep 8, 2025
2637d15
build(deps): bump golangci/golangci-lint-action from 8 to 9
dependabot[bot] Nov 10, 2025
ef116b5
build(deps): bump actions/checkout from 4 to 6
dependabot[bot] Dec 5, 2025
6f297eb
cli: add support for listing bloblang functions and methods with json…
mmatczuk Dec 2, 2025
bd675c0
cli/blobl: add input field
mmatczuk Dec 3, 2025
53660b2
cli/blobl: fix data race where program exits before printing output
mmatczuk Dec 3, 2025
edae09d
bloblang: function and method descriptions and examples overhaul
mmatczuk Dec 4, 2025
65b14e4
bloblang: create category general and register functions without cate…
mmatczuk Dec 4, 2025
8f599d3
bloblang/query: InCategory aggregate examples into the top-level Exam…
mmatczuk Dec 4, 2025
ccc4d19
Update CL
mmatczuk Dec 4, 2025
1ea1057
build(deps): bump the production-dependencies group across 1 director…
dependabot[bot] Dec 5, 2025
6fd6c25
service: fixe race condition in TestForceTimelyNacksBatchedNoAck
mmatczuk Dec 5, 2025
0c669e0
otel: update tracer name (#331)
rockwotj Dec 8, 2025
15225b0
otel: missed one instance of benthos (#332)
rockwotj Dec 8, 2025
c8eba2d
bloblang/query: add description to map_each
mmatczuk Dec 8, 2025
ab5e020
build(deps): bump the production-dependencies group with 2 updates
dependabot[bot] Dec 8, 2025
c6802ec
feat: add missing_key field
artemklevtsov Dec 8, 2025
0104c23
feeat: add template tests
artemklevtsov Dec 8, 2025
96537eb
fix" linter error
artemklevtsov Dec 8, 2025
ad34339
fix: format imports
artemklevtsov Dec 8, 2025
094f1e1
feaT: add array example
artemklevtsov Dec 9, 2025
28e4252
Merge branch 'main' into template-processor
artemklevtsov Dec 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 197 additions & 0 deletions internal/impl/pure/processor_template.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Copyright 2025 Redpanda Data, Inc.

package pure

import (
"bytes"
"context"
"errors"
"text/template"

"github.com/redpanda-data/benthos/v4/internal/bundle"
"github.com/redpanda-data/benthos/v4/internal/component/interop"
"github.com/redpanda-data/benthos/v4/public/bloblang"
"github.com/redpanda-data/benthos/v4/public/service"
)

func tmplProcConfig() *service.ConfigSpec {
return service.NewConfigSpec().
Beta().
Categories("Utility").
Summary("Executes a Go text/template template on the message content.").
Description(`This processor allows you to apply Go text/template templates to the structured content of messages. The template can access the message data as a structured object. Optionally, a Bloblang mapping can be applied first to transform the data before templating.

For more information on the template syntax, see https://pkg.go.dev/text/template#hdr-Actions`).
Example(
"Execute template",
`This example uses a xref:components:inputs/generate.adoc[`+"`generate`"+` input] to make payload for the template.`,
`
input:
generate:
count: 1
mapping: root.foo = "bar"

pipeline:
processors:
- template:
code: "{{ .foo }}"
`).
Example(
"Execute template with mapping",
`This example uses a xref:components:inputs/generate.adoc[`+"`generate`"+` input] to make payload for the template.`,
`
input:
generate:
count: 1
mapping: root.foo = "bar"

pipeline:
processors:
- template:
code: "{{ .value }}"
mapping: "root.value = this.foo"
`).
Example(
"Execute template from file",
`This example loads a template from a file and applies it to the message.`,
`
input:
generate:
count: 1
mapping: root.foo = "bar"

pipeline:
processors:
- template:
code: |
{{ template "greeting" . }}
files: ["./templates/greeting.tmpl"]
`).
Example(
"Execute template on array",
`This example uses a xref:components:inputs/generate.adoc[`+"`generate`"+` input] to make payload for the template.`,
`
input:
generate:
count: 1
mapping: root = [1, 2, 3]

pipeline:
processors:
- template:
missing_key: error
code: "{{ range . }}{{ . }}\n{{ end }}"
`).
Fields(
service.NewStringField("code").
Description("The template code to execute. This should be a valid Go text/template string.").
Example("{{.name}}").
Optional(),
service.NewStringListField("files").
Description("A list of file paths containing template definitions. Templates from these files will be parsed and available for execution. Glob patterns are supported, including super globs (double star).").
Optional(),
service.NewBloblangField("mapping").
Description("An optional xref:guides:bloblang/about.adoc[Bloblang] mapping to apply to the message before executing the template. This allows you to transform the data structure before templating.").
Optional(),
service.NewStringEnumField("missing_key", "default", "invalid", "zero", "error").
Description("Control the behavior during execution if a map is indexed with a key that is not present in the map.").
Default("default").
Optional(),
)
}

func init() {
service.MustRegisterProcessor(
"template",
tmplProcConfig(),
func(conf *service.ParsedConfig, res *service.Resources) (service.Processor, error) {
mgr := interop.UnwrapManagement(res)
return templateFromParsed(conf, mgr)
},
)
}

type tmplProc struct {
tmpl *template.Template
exec *bloblang.Executor
}

func templateFromParsed(conf *service.ParsedConfig, mgr bundle.NewManagement) (*tmplProc, error) {
code, err := conf.FieldString("code")
if err != nil {
return nil, err
}

files, err := conf.FieldStringList("files")
if err != nil {
return nil, err
}

option, err := conf.FieldString("missing_key")
if err != nil {
return nil, err
}

option = "missingkey=" + option

if code == "" && len(files) == 0 {
return nil, errors.New("at least one of 'code' or 'files' fields must be specified")
}

t := &tmplProc{tmpl: template.New("root").Option(option)}
if len(files) > 0 {
for _, f := range files {
if t.tmpl, err = t.tmpl.ParseGlob(f); err != nil {
return nil, err
}
}
}

if code != "" {
if t.tmpl, err = t.tmpl.New("code").Parse(code); err != nil {
return nil, err
}
}

if conf.Contains("mapping") {
if t.exec, err = conf.FieldBloblang("mapping"); err != nil {
return nil, err
}
}

return t, nil
}

func (t *tmplProc) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) {
var data any
var err error
if t.exec != nil {
mapRes, err := msg.BloblangQuery(t.exec)
if err != nil {
return nil, err
}

data, err = mapRes.AsStructured()
if err != nil {
return nil, err
}
} else {
data, err = msg.AsStructured()
if err != nil {
return nil, err
}
}

var buf bytes.Buffer
if err := t.tmpl.Execute(&buf, data); err != nil {
return nil, err
}

msg.SetBytes(buf.Bytes())

return service.MessageBatch{msg}, nil
}

func (t *tmplProc) Close(ctx context.Context) error {
return nil
}
85 changes: 85 additions & 0 deletions internal/impl/pure/processor_template_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package pure

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/redpanda-data/benthos/v4/internal/component/testutil"
"github.com/redpanda-data/benthos/v4/internal/manager/mock"
"github.com/redpanda-data/benthos/v4/internal/message"
)

func TestTemplate(t *testing.T) {
conf, err := testutil.ProcessorFromYAML(`
template:
code: "{{ .name }}"
`)
require.NoError(t, err)

tmpl, err := mock.NewManager().NewProcessor(conf)
require.NoError(t, err)

msgIn := message.QuickBatch([][]byte{[]byte(`{"name": "John Doe"}`)})
msgsOut, err := tmpl.ProcessBatch(t.Context(), msgIn)
require.NoError(t, err)
require.Len(t, msgsOut, 1)
require.Len(t, msgsOut[0], 1)
assert.Equal(t, "John Doe", string(msgsOut[0][0].AsBytes()))

type testCase struct {
name string
input []string
expected []string
}

tests := []testCase{
{
name: "template test 1",
input: []string{`{"name": "John Doe"}`},
expected: []string{`John Doe`},
},
{
name: "template test 2",
input: []string{`{"wrong": "John Doe"}`},
expected: []string{`<no value>`},
},
}
for _, test := range tests {
test := test
t.Run(test.name, func(t *testing.T) {
msg := message.QuickBatch(nil)
for _, s := range test.input {
msg = append(msg, message.NewPart([]byte(s)))
}
msgs, res := tmpl.ProcessBatch(t.Context(), msg)
require.NoError(t, res)

resStrs := []string{}
for _, b := range message.GetAllBytes(msgs[0]) {
resStrs = append(resStrs, string(b))
}
assert.Equal(t, test.expected, resStrs)
})
}
}

func TestTemplateError(t *testing.T) {
conf, err := testutil.ProcessorFromYAML(`
template:
missing_key: 'error'
code: '{{ .name }}'
`)
require.NoError(t, err)

tmpl, err := mock.NewManager().NewProcessor(conf)
require.NoError(t, err)

msgIn := message.QuickBatch([][]byte{[]byte(`{"wrong": "John Doe"}`)})
msgsOut, err := tmpl.ProcessBatch(t.Context(), msgIn)
require.NoError(t, err)
require.Len(t, msgsOut, 1)
require.Len(t, msgsOut[0], 1)
assert.Equal(t, string(msgIn[0].AsBytes()), string(msgsOut[0][0].AsBytes()))
}