diff --git a/internal/impl/pure/processor_template.go b/internal/impl/pure/processor_template.go new file mode 100644 index 000000000..4c324344f --- /dev/null +++ b/internal/impl/pure/processor_template.go @@ -0,0 +1,197 @@ +// Copyright 2025 Redpanda Data, Inc. + +package pure + +import ( + "bytes" + "context" + "errors" + "text/template" + + "github.com/redpanda-data/benthos/v4/internal/bundle" + "github.com/redpanda-data/benthos/v4/internal/component/interop" + "github.com/redpanda-data/benthos/v4/public/bloblang" + "github.com/redpanda-data/benthos/v4/public/service" +) + +func tmplProcConfig() *service.ConfigSpec { + return service.NewConfigSpec(). + Beta(). + Categories("Utility"). + Summary("Executes a Go text/template template on the message content."). + Description(`This processor allows you to apply Go text/template templates to the structured content of messages. The template can access the message data as a structured object. Optionally, a Bloblang mapping can be applied first to transform the data before templating. + +For more information on the template syntax, see https://pkg.go.dev/text/template#hdr-Actions`). + Example( + "Execute template", + `This example uses a xref:components:inputs/generate.adoc[`+"`generate`"+` input] to make payload for the template.`, + ` +input: + generate: + count: 1 + mapping: root.foo = "bar" + +pipeline: + processors: + - template: + code: "{{ .foo }}" +`). + Example( + "Execute template with mapping", + `This example uses a xref:components:inputs/generate.adoc[`+"`generate`"+` input] to make payload for the template.`, + ` +input: + generate: + count: 1 + mapping: root.foo = "bar" + +pipeline: + processors: + - template: + code: "{{ .value }}" + mapping: "root.value = this.foo" +`). + Example( + "Execute template from file", + `This example loads a template from a file and applies it to the message.`, + ` +input: + generate: + count: 1 + mapping: root.foo = "bar" + +pipeline: + processors: + - template: + code: | + {{ template "greeting" . }} + files: ["./templates/greeting.tmpl"] +`). + Example( + "Execute template on array", + `This example uses a xref:components:inputs/generate.adoc[`+"`generate`"+` input] to make payload for the template.`, + ` +input: + generate: + count: 1 + mapping: root = [1, 2, 3] + +pipeline: + processors: + - template: + missing_key: error + code: "{{ range . }}{{ . }}\n{{ end }}" +`). + Fields( + service.NewStringField("code"). + Description("The template code to execute. This should be a valid Go text/template string."). + Example("{{.name}}"). + Optional(), + service.NewStringListField("files"). + Description("A list of file paths containing template definitions. Templates from these files will be parsed and available for execution. Glob patterns are supported, including super globs (double star)."). + Optional(), + service.NewBloblangField("mapping"). + Description("An optional xref:guides:bloblang/about.adoc[Bloblang] mapping to apply to the message before executing the template. This allows you to transform the data structure before templating."). + Optional(), + service.NewStringEnumField("missing_key", "default", "invalid", "zero", "error"). + Description("Control the behavior during execution if a map is indexed with a key that is not present in the map."). + Default("default"). + Optional(), + ) +} + +func init() { + service.MustRegisterProcessor( + "template", + tmplProcConfig(), + func(conf *service.ParsedConfig, res *service.Resources) (service.Processor, error) { + mgr := interop.UnwrapManagement(res) + return templateFromParsed(conf, mgr) + }, + ) +} + +type tmplProc struct { + tmpl *template.Template + exec *bloblang.Executor +} + +func templateFromParsed(conf *service.ParsedConfig, mgr bundle.NewManagement) (*tmplProc, error) { + code, err := conf.FieldString("code") + if err != nil { + return nil, err + } + + files, err := conf.FieldStringList("files") + if err != nil { + return nil, err + } + + option, err := conf.FieldString("missing_key") + if err != nil { + return nil, err + } + + option = "missingkey=" + option + + if code == "" && len(files) == 0 { + return nil, errors.New("at least one of 'code' or 'files' fields must be specified") + } + + t := &tmplProc{tmpl: template.New("root").Option(option)} + if len(files) > 0 { + for _, f := range files { + if t.tmpl, err = t.tmpl.ParseGlob(f); err != nil { + return nil, err + } + } + } + + if code != "" { + if t.tmpl, err = t.tmpl.New("code").Parse(code); err != nil { + return nil, err + } + } + + if conf.Contains("mapping") { + if t.exec, err = conf.FieldBloblang("mapping"); err != nil { + return nil, err + } + } + + return t, nil +} + +func (t *tmplProc) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) { + var data any + var err error + if t.exec != nil { + mapRes, err := msg.BloblangQuery(t.exec) + if err != nil { + return nil, err + } + + data, err = mapRes.AsStructured() + if err != nil { + return nil, err + } + } else { + data, err = msg.AsStructured() + if err != nil { + return nil, err + } + } + + var buf bytes.Buffer + if err := t.tmpl.Execute(&buf, data); err != nil { + return nil, err + } + + msg.SetBytes(buf.Bytes()) + + return service.MessageBatch{msg}, nil +} + +func (t *tmplProc) Close(ctx context.Context) error { + return nil +} diff --git a/internal/impl/pure/processor_template_test.go b/internal/impl/pure/processor_template_test.go new file mode 100644 index 000000000..e9b0949ed --- /dev/null +++ b/internal/impl/pure/processor_template_test.go @@ -0,0 +1,85 @@ +package pure + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/benthos/v4/internal/component/testutil" + "github.com/redpanda-data/benthos/v4/internal/manager/mock" + "github.com/redpanda-data/benthos/v4/internal/message" +) + +func TestTemplate(t *testing.T) { + conf, err := testutil.ProcessorFromYAML(` +template: + code: "{{ .name }}" +`) + require.NoError(t, err) + + tmpl, err := mock.NewManager().NewProcessor(conf) + require.NoError(t, err) + + msgIn := message.QuickBatch([][]byte{[]byte(`{"name": "John Doe"}`)}) + msgsOut, err := tmpl.ProcessBatch(t.Context(), msgIn) + require.NoError(t, err) + require.Len(t, msgsOut, 1) + require.Len(t, msgsOut[0], 1) + assert.Equal(t, "John Doe", string(msgsOut[0][0].AsBytes())) + + type testCase struct { + name string + input []string + expected []string + } + + tests := []testCase{ + { + name: "template test 1", + input: []string{`{"name": "John Doe"}`}, + expected: []string{`John Doe`}, + }, + { + name: "template test 2", + input: []string{`{"wrong": "John Doe"}`}, + expected: []string{``}, + }, + } + for _, test := range tests { + test := test + t.Run(test.name, func(t *testing.T) { + msg := message.QuickBatch(nil) + for _, s := range test.input { + msg = append(msg, message.NewPart([]byte(s))) + } + msgs, res := tmpl.ProcessBatch(t.Context(), msg) + require.NoError(t, res) + + resStrs := []string{} + for _, b := range message.GetAllBytes(msgs[0]) { + resStrs = append(resStrs, string(b)) + } + assert.Equal(t, test.expected, resStrs) + }) + } +} + +func TestTemplateError(t *testing.T) { + conf, err := testutil.ProcessorFromYAML(` +template: + missing_key: 'error' + code: '{{ .name }}' +`) + require.NoError(t, err) + + tmpl, err := mock.NewManager().NewProcessor(conf) + require.NoError(t, err) + + msgIn := message.QuickBatch([][]byte{[]byte(`{"wrong": "John Doe"}`)}) + msgsOut, err := tmpl.ProcessBatch(t.Context(), msgIn) + require.NoError(t, err) + require.Len(t, msgsOut, 1) + require.Len(t, msgsOut[0], 1) + assert.Equal(t, string(msgIn[0].AsBytes()), string(msgsOut[0][0].AsBytes())) +}