From e74b94150bd387dcaef3133adc3ecfc2e822c3e8 Mon Sep 17 00:00:00 2001 From: Shrinand Thakkar Date: Tue, 6 Jan 2026 23:41:44 +0530 Subject: [PATCH 1/4] Add PAUSE/RESUME TRIGGER support for TableTriggers --- .../java/com/linkedin/hoptimator/Trigger.java | 1 + .../jdbc/HoptimatorDdlExecutor.java | 46 ++++++++- .../hoptimator/k8s/K8sTableTriggerTable.java | 5 +- .../hoptimator/k8s/K8sTriggerDeployer.java | 25 +++++ .../k8s/models/V1alpha1TableTrigger.java | 2 +- .../k8s/models/V1alpha1TableTriggerList.java | 2 +- .../k8s/models/V1alpha1TableTriggerSpec.java | 33 ++++++- .../models/V1alpha1TableTriggerStatus.java | 2 +- .../src/main/resources/tabletriggers.crd.yaml | 7 ++ .../hoptimator/k8s/TestSqlScripts.java | 5 + .../src/test/resources/k8s-trigger-pause.id | 52 ++++++++++ .../trigger/TableTriggerReconciler.java | 91 ++++++++++++------ .../trigger/TestTableTriggerReconciler.java | 95 +++++++++++++++++++ 13 files changed, 329 insertions(+), 37 deletions(-) create mode 100644 hoptimator-k8s/src/test/resources/k8s-trigger-pause.id diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java index dc6591fe..1eabf09a 100644 --- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Trigger.java @@ -6,6 +6,7 @@ public class Trigger implements Deployable { + public static final String PAUSED_OPTION = "paused"; private final String name; private final UserJob job; private final List path; diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java index b6b0bbdf..5cc63f7c 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java @@ -32,6 +32,8 @@ import com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView; import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger; import com.linkedin.hoptimator.util.ArrayTable; +import com.linkedin.hoptimator.jdbc.ddl.SqlPauseTrigger; +import com.linkedin.hoptimator.jdbc.ddl.SqlResumeTrigger; import com.linkedin.hoptimator.util.DeploymentService; import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema; import com.linkedin.hoptimator.util.planner.HoptimatorJdbcTable; @@ -41,6 +43,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -61,6 +64,7 @@ import org.apache.calcite.schema.impl.ViewTable; import org.apache.calcite.server.DdlExecutor; import org.apache.calcite.server.ServerDdlExecutor; +import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; @@ -490,8 +494,48 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) { logger.info("CREATE TABLE {} completed", tableName); } - // N.B. originally copy-pasted from Apache Calcite + /** Executes a {@code PAUSE TRIGGER} command. */ + public void execute(SqlPauseTrigger pause, CalcitePrepare.Context context) { + updateTriggerPausedState(pause, pause.name, true); + } + + /** Executes a {@code RESUME TRIGGER} command. */ + public void execute(SqlResumeTrigger resume, CalcitePrepare.Context context) { + updateTriggerPausedState(resume, resume.name, false); + } + + private void updateTriggerPausedState(SqlNode sqlNode, SqlIdentifier triggerName, boolean paused) { + logger.info("Validating statement: {}", sqlNode); + try { + ValidationService.validateOrThrow(sqlNode); + } catch (SQLException e) { + throw new DdlException(sqlNode, e.getMessage(), e); + } + + if (triggerName.names.size() > 1) { + throw new DdlException(sqlNode, "Triggers cannot belong to a schema or database."); + } + String name = triggerName.names.get(0); + + Map options = new HashMap<>(); + options.put(Trigger.PAUSED_OPTION, String.valueOf(paused)); + Trigger trigger = new Trigger(name, null, new ArrayList<>(), null, options); + + Collection deployers = null; + try { + logger.info("Updating trigger {} with paused state: {}", name, paused); + deployers = DeploymentService.deployers(trigger, connection); + DeploymentService.update(deployers); + logger.info("Successfully updated trigger {} with paused state: {}", name, paused); + } catch (Exception e) { + if (deployers != null) { + DeploymentService.restore(deployers); + } + throw new DdlException(sqlNode, e.getMessage(), e); + } + } + // N.B. largely copy-pasted from Apache Calcite /** Executes {@code DROP FUNCTION}, {@code DROP TABLE}, {@code DROP MATERIALIZED VIEW}, {@code DROP TYPE}, * {@code DROP VIEW} commands. */ @Override diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableTriggerTable.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableTriggerTable.java index 5c07a4a6..cd2603e5 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableTriggerTable.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTableTriggerTable.java @@ -15,13 +15,15 @@ public static class Row { public String NAME; public String SCHEMA; public String TABLE; + public Boolean PAUSED; public String TIMESTAMP; public String WATERMARK; - public Row(String name, String schema, String table, String timestamp, String watermark) { + public Row(String name, String schema, String table, Boolean paused, String timestamp, String watermark) { this.NAME = name; this.SCHEMA = schema; this.TABLE = table; + this.PAUSED = paused; this.TIMESTAMP = timestamp; this.WATERMARK = watermark; } @@ -35,6 +37,7 @@ public K8sTableTriggerTable(K8sContext context) { @Override public Row toRow(V1alpha1TableTrigger obj) { return new Row(obj.getMetadata().getName(), obj.getSpec().getSchema(), obj.getSpec().getTable(), + obj.getSpec().getPaused(), Optional.ofNullable(obj.getStatus()) .flatMap(x -> Optional.ofNullable(x.getTimestamp())) .map(x -> x.toString()).orElse(null), diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java index 1bc29de0..8c5bfa36 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java @@ -20,15 +20,40 @@ class K8sTriggerDeployer extends K8sDeployer triggerApi; private final K8sApi jobTemplateApi; K8sTriggerDeployer(Trigger trigger, K8sContext context) { super(context, K8sApiEndpoints.TABLE_TRIGGERS); this.context = context; this.trigger = trigger; + this.triggerApi = new K8sApi<>(context, K8sApiEndpoints.TABLE_TRIGGERS); this.jobTemplateApi = new K8sApi<>(context, K8sApiEndpoints.JOB_TEMPLATES); } + @Override + public void update() throws SQLException { + if (trigger.options().containsKey(Trigger.PAUSED_OPTION)) { + String pauseValue = trigger.options().get(Trigger.PAUSED_OPTION); + String canonicalName = K8sUtils.canonicalizeName(trigger.name()); + V1alpha1TableTrigger existingTrigger = triggerApi.get(canonicalName); + + if (existingTrigger == null) { + throw new SQLException("Trigger " + trigger.name() + " not found."); + } + + V1alpha1TableTriggerSpec spec = existingTrigger.getSpec(); + if (spec == null) { + spec = new V1alpha1TableTriggerSpec(); + existingTrigger.spec(spec); + } + spec.setPaused("true".equals(pauseValue)); + triggerApi.update(existingTrigger); + return; + } + super.update(); + } + @Override protected V1alpha1TableTrigger toK8sObject() throws SQLException { String name = K8sUtils.canonicalizeName(trigger.name(), trigger.job().name()); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTrigger.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTrigger.java index 01a2078f..b145966d 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTrigger.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTrigger.java @@ -31,7 +31,7 @@ * Trigger for a specific table. */ @ApiModel(description = "Trigger for a specific table.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-11-20T18:46:28.037Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2026-01-06T15:57:43.625Z[Etc/UTC]") public class V1alpha1TableTrigger implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerList.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerList.java index f6a3acdf..310fc6da 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerList.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerList.java @@ -32,7 +32,7 @@ * TableTriggerList is a list of TableTrigger */ @ApiModel(description = "TableTriggerList is a list of TableTrigger") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-11-20T18:46:28.037Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2026-01-06T15:57:43.625Z[Etc/UTC]") public class V1alpha1TableTriggerList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerSpec.java index 68322928..e8737eaa 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerSpec.java @@ -31,12 +31,16 @@ * TableTrigger spec. */ @ApiModel(description = "TableTrigger spec.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-11-20T18:46:28.037Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2026-01-06T15:57:43.625Z[Etc/UTC]") public class V1alpha1TableTriggerSpec { public static final String SERIALIZED_NAME_JOB_PROPERTIES = "jobProperties"; @SerializedName(SERIALIZED_NAME_JOB_PROPERTIES) private Map jobProperties = null; + public static final String SERIALIZED_NAME_PAUSED = "paused"; + @SerializedName(SERIALIZED_NAME_PAUSED) + private Boolean paused; + public static final String SERIALIZED_NAME_SCHEDULE = "schedule"; @SerializedName(SERIALIZED_NAME_SCHEDULE) private String schedule; @@ -85,6 +89,29 @@ public void setJobProperties(Map jobProperties) { } + public V1alpha1TableTriggerSpec paused(Boolean paused) { + + this.paused = paused; + return this; + } + + /** + * Whether the trigger is paused. + * @return paused + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Whether the trigger is paused.") + + public Boolean getPaused() { + return paused; + } + + + public void setPaused(Boolean paused) { + this.paused = paused; + } + + public V1alpha1TableTriggerSpec schedule(String schedule) { this.schedule = schedule; @@ -185,6 +212,7 @@ public boolean equals(Object o) { } V1alpha1TableTriggerSpec v1alpha1TableTriggerSpec = (V1alpha1TableTriggerSpec) o; return Objects.equals(this.jobProperties, v1alpha1TableTriggerSpec.jobProperties) && + Objects.equals(this.paused, v1alpha1TableTriggerSpec.paused) && Objects.equals(this.schedule, v1alpha1TableTriggerSpec.schedule) && Objects.equals(this.schema, v1alpha1TableTriggerSpec.schema) && Objects.equals(this.table, v1alpha1TableTriggerSpec.table) && @@ -193,7 +221,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(jobProperties, schedule, schema, table, yaml); + return Objects.hash(jobProperties, paused, schedule, schema, table, yaml); } @@ -202,6 +230,7 @@ public String toString() { StringBuilder sb = new StringBuilder(); sb.append("class V1alpha1TableTriggerSpec {\n"); sb.append(" jobProperties: ").append(toIndentedString(jobProperties)).append("\n"); + sb.append(" paused: ").append(toIndentedString(paused)).append("\n"); sb.append(" schedule: ").append(toIndentedString(schedule)).append("\n"); sb.append(" schema: ").append(toIndentedString(schema)).append("\n"); sb.append(" table: ").append(toIndentedString(table)).append("\n"); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerStatus.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerStatus.java index e1e6ba5f..8f87b8dd 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerStatus.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1TableTriggerStatus.java @@ -29,7 +29,7 @@ * TableTrigger status. */ @ApiModel(description = "TableTrigger status.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-11-20T18:46:28.037Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2026-01-06T15:57:43.625Z[Etc/UTC]") public class V1alpha1TableTriggerStatus { public static final String SERIALIZED_NAME_TIMESTAMP = "timestamp"; @SerializedName(SERIALIZED_NAME_TIMESTAMP) diff --git a/hoptimator-k8s/src/main/resources/tabletriggers.crd.yaml b/hoptimator-k8s/src/main/resources/tabletriggers.crd.yaml index cdbffddc..0cc8cb44 100644 --- a/hoptimator-k8s/src/main/resources/tabletriggers.crd.yaml +++ b/hoptimator-k8s/src/main/resources/tabletriggers.crd.yaml @@ -47,6 +47,9 @@ spec: schedule: description: Cron schedule, e.g. "@hourly", which causes the trigger to fire on a schedule. type: string + paused: + description: Whether the trigger is paused. + type: boolean required: - schema - table @@ -65,6 +68,10 @@ spec: subresources: status: {} additionalPrinterColumns: + - name: PAUSED + type: boolean + description: Whether trigger is paused. + jsonPath: .spec.paused - name: SCHEMA type: string description: Schema name. diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java index 7949e51d..c88c66ec 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java @@ -33,4 +33,9 @@ public void k8sMetadataTables() throws Exception { public void k8sConditionalJobTemplate() throws Exception { run("k8s-metadata-beam.id", "hints=flink.app.type=BEAM"); } + + @Test + public void k8sTriggerPauseResume() throws Exception { + run("k8s-trigger-pause.id"); + } } diff --git a/hoptimator-k8s/src/test/resources/k8s-trigger-pause.id b/hoptimator-k8s/src/test/resources/k8s-trigger-pause.id new file mode 100644 index 00000000..e664cdaf --- /dev/null +++ b/hoptimator-k8s/src/test/resources/k8s-trigger-pause.id @@ -0,0 +1,52 @@ +!set outputformat mysql +!use k8s + +create trigger test on ads.ad_clicks as 'my-app' in 'my-mp' with (key 'value'); +(0 rows modified) + +!update + +select name, schema, "TABLE", paused from "k8s".table_triggers where name = 'test'; ++------+--------+-----------+--------+ +| NAME | SCHEMA | TABLE | PAUSED | ++------+--------+-----------+--------+ +| test | ADS | AD_CLICKS | | ++------+--------+-----------+--------+ +(1 row) + +!ok + +pause trigger test; +(0 rows modified) + +!update + +select name, schema, "TABLE", paused from "k8s".table_triggers where name = 'test'; ++------+--------+-----------+--------+ +| NAME | SCHEMA | TABLE | PAUSED | ++------+--------+-----------+--------+ +| test | ADS | AD_CLICKS | true | ++------+--------+-----------+--------+ +(1 row) + +!ok + +resume trigger test; +(0 rows modified) + +!update + +select name, schema, "TABLE", paused from "k8s".table_triggers where name = 'test'; ++------+--------+-----------+--------+ +| NAME | SCHEMA | TABLE | PAUSED | ++------+--------+-----------+--------+ +| test | ADS | AD_CLICKS | false | ++------+--------+-----------+--------+ +(1 row) + +!ok + +delete from "k8s".table_triggers where name = 'test'; +(1 row modified) + +!update \ No newline at end of file diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/trigger/TableTriggerReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/trigger/TableTriggerReconciler.java index 9ab38d13..ca4c75b1 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/trigger/TableTriggerReconciler.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/trigger/TableTriggerReconciler.java @@ -126,6 +126,22 @@ public Result reconcile(Request request) { return new Result(false); } + if (Boolean.TRUE.equals(object.getSpec().getPaused())) { + log.info("Trigger {} is paused. Skipping job creation.", name); + V1alpha1TableTriggerStatus status = object.getStatus(); + if (status != null) { + DynamicKubernetesObject expectedJob = yamlApi.objFromYaml(jobYaml(object)); + V1Job job = jobApi.getIfExists(expectedJob.getMetadata().getNamespace(), + expectedJob.getMetadata().getName()); + if (job != null) { + log.info("Trigger {} is paused but existing job {} is still running. Monitoring it.", + name, job.getMetadata().getName()); + return handleExistingJob(job, status, object); + } + } + return new Result(false); + } + V1alpha1TableTriggerStatus status = object.getStatus(); if (status == null && object.getSpec().getSchedule() == null) { log.info("Trigger {} has not been fired yet. Skipping.", name); @@ -134,7 +150,7 @@ public Result reconcile(Request request) { status = new V1alpha1TableTriggerStatus(); object.status(status); } - + if (status.getTimestamp() != null) { log.info("TableTrigger {} was last fired at {}.", name, status.getTimestamp()); } @@ -165,35 +181,8 @@ public Result reconcile(Request request) { log.info("Launching Job for TableTrigger {}. ", name); createJob(jobYaml, object); return new Result(true, pendingRetryDuration()); - } else if (job != null && job.getStatus() != null && job.getStatus().getConditions() != null) { - List conditions = job.getStatus().getConditions(); - boolean failed = conditions.stream() - .anyMatch(x -> "Failed".equals(x.getType()) && "True".equals(x.getStatus())); - boolean complete = conditions.stream() - .anyMatch(x -> "Complete".equals(x.getType()) && "True".equals(x.getStatus())); - if (failed) { - log.warn("Job {} has FAILED.", name); - jobApi.delete(job); - return new Result(true); // retry - } else if (complete) { - log.info("Job {} completed successfully.", name); - // We get the watermark from the job itself. We annotate the job when launching it. - if (job.getMetadata().getAnnotations() == null || job.getMetadata().getAnnotations() - .get(TRIGGER_TIMESTAMP_KEY) == null) { - log.error("Job {} has no timestamp annotation. Unable to advance the watermark.", name); - } else { - String watermark = job.getMetadata().getAnnotations().get(TRIGGER_TIMESTAMP_KEY); - status.setWatermark(OffsetDateTime.parse(watermark)); - tableTriggerApi.updateStatus(object, status); - log.info("Trigger {} watermark advanced to {}.", name, watermark); - } - jobApi.delete(job); - return new Result(true); // retry - } else { - maybeUpdateJobAnnotation(job, status.getTimestamp()); - log.info("Job for TableTrigger {} still running from a previous trigger event.", name); - return new Result(true, pendingRetryDuration()); // retry later - } + } else if (job != null) { + return handleExistingJob(job, status, object); } else if (job == null && scheduled != null) { log.info("TableTrigger {} sleeping until next scheduled execution.", name); return new Result(true, scheduled.timeToNextExecution(now).get()); @@ -269,5 +258,47 @@ void maybeUpdateJobAnnotation(V1Job job, OffsetDateTime timestamp) throws SQLExc } } } + + private Result handleExistingJob(V1Job job, V1alpha1TableTriggerStatus status, + V1alpha1TableTrigger trigger) throws SQLException { + String name = trigger.getMetadata().getName(); + + if (job.getStatus() != null && job.getStatus().getConditions() != null) { + List conditions = job.getStatus().getConditions(); + boolean failed = conditions.stream() + .anyMatch(x -> "Failed".equals(x.getType()) && "True".equals(x.getStatus())); + boolean complete = conditions.stream() + .anyMatch(x -> "Complete".equals(x.getType()) && "True".equals(x.getStatus())); + + if (failed) { + log.warn("Job {} has FAILED.", name); + jobApi.delete(job); + return new Result(true); // retry + } else if (complete) { + log.info("Job {} completed successfully.", name); + // We get the watermark from the job itself. We annotate the job when launching it. + if (job.getMetadata().getAnnotations() == null + || job.getMetadata().getAnnotations().get(TRIGGER_TIMESTAMP_KEY) == null) { + log.error("Job {} has no timestamp annotation. Unable to advance the watermark.", name); + } else { + String watermark = job.getMetadata().getAnnotations().get(TRIGGER_TIMESTAMP_KEY); + status.setWatermark(OffsetDateTime.parse(watermark)); + tableTriggerApi.updateStatus(trigger, status); + log.info("Trigger {} watermark advanced to {}.", name, watermark); + } + jobApi.delete(job); + return new Result(true); // retry + } else { + if (status.getTimestamp() != null) { + maybeUpdateJobAnnotation(job, status.getTimestamp()); + } + log.info("Job for TableTrigger {} still running.", name); + return new Result(true, pendingRetryDuration()); // retry later + } + } else { + log.info("Job for TableTrigger {} has no status yet.", name); + return new Result(true, pendingRetryDuration()); // retry later + } + } } diff --git a/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/trigger/TestTableTriggerReconciler.java b/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/trigger/TestTableTriggerReconciler.java index d9d0f025..22072fcd 100644 --- a/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/trigger/TestTableTriggerReconciler.java +++ b/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/trigger/TestTableTriggerReconciler.java @@ -150,4 +150,99 @@ void doesNotFireTriggerWhenNoSchedule() { Assertions.assertTrue(trigger.getStatus() == null || trigger.getStatus().getTimestamp() == null, "Trigger was fired when it shouldn't have been"); } + + @Test + void pausedTriggerDoesNotCreateNewJob() { + V1Job job = new V1Job().apiVersion("v1/batch").kind("Job") + .metadata(new V1ObjectMeta().name("paused-trigger-job").namespace("namespace")); + V1alpha1TableTrigger trigger = new V1alpha1TableTrigger() + .metadata(new V1ObjectMeta().name("paused-trigger")) + .spec(new V1alpha1TableTriggerSpec() + .yaml(Yaml.dump(job)) + .paused(true)) + .status(new V1alpha1TableTriggerStatus().timestamp(OffsetDateTime.now())); + triggers.add(trigger); + Result result = reconciler.reconcile(new Request("namespace", "paused-trigger")); + Assertions.assertFalse(result.isRequeue(), "Paused trigger should not requeue"); + Assertions.assertTrue(yamls.isEmpty(), "Paused trigger should not create job"); + } + + @Test + void pausedTriggerDoesNotFireOnSchedule() { + V1Job job = new V1Job().apiVersion("v1/batch").kind("Job") + .metadata(new V1ObjectMeta().name("paused-cron-trigger-job").namespace("namespace")); + V1alpha1TableTrigger trigger = new V1alpha1TableTrigger() + .metadata(new V1ObjectMeta().name("paused-cron-trigger")) + .spec(new V1alpha1TableTriggerSpec() + .yaml(Yaml.dump(job)) + .schedule("@hourly") + .paused(true)); + triggers.add(trigger); + Result result = reconciler.reconcile(new Request("namespace", "paused-cron-trigger")); + Assertions.assertFalse(result.isRequeue(), "Paused trigger should not requeue for schedule"); + Assertions.assertTrue(trigger.getStatus() == null || trigger.getStatus().getTimestamp() == null, + "Paused trigger should not be fired on schedule"); + Assertions.assertTrue(yamls.isEmpty(), "Paused trigger should not create job on schedule"); + } + + @Test + void pausedTriggerMonitorsExistingRunningJob() { + Map annotations = new HashMap<>(); + annotations.put(TableTriggerReconciler.TRIGGER_KEY, "paused-trigger-with-job"); + annotations.put(TableTriggerReconciler.TRIGGER_TIMESTAMP_KEY, OffsetDateTime.now().toString()); + V1Job job = new V1Job().apiVersion("v1/batch").kind("Job") + .metadata(new V1ObjectMeta().name("paused-trigger-running-job").annotations(annotations)) + .status(new V1JobStatus().addConditionsItem(new V1JobCondition().type("Running").status("True"))); + V1alpha1TableTrigger trigger = new V1alpha1TableTrigger() + .metadata(new V1ObjectMeta().name("paused-trigger-with-job")) + .spec(new V1alpha1TableTriggerSpec() + .yaml(Yaml.dump(job)) + .paused(true)) + .status(new V1alpha1TableTriggerStatus().timestamp(OffsetDateTime.now())); + triggers.add(trigger); + jobs.add(job); + Result result = reconciler.reconcile(new Request("namespace", "paused-trigger-with-job")); + Assertions.assertTrue(result.isRequeue(), "Paused trigger should requeue to monitor existing job"); + Assertions.assertFalse(jobs.isEmpty(), "Existing job should still be running"); + } + + @Test + void pausedTriggerDeletesCompletedJob() { + Map annotations = new HashMap<>(); + annotations.put(TableTriggerReconciler.TRIGGER_KEY, "paused-trigger-completed"); + annotations.put(TableTriggerReconciler.TRIGGER_TIMESTAMP_KEY, OffsetDateTime.now().toString()); + V1Job job = new V1Job().apiVersion("v1/batch").kind("Job") + .metadata(new V1ObjectMeta().name("paused-trigger-completed-job").annotations(annotations)) + .status(new V1JobStatus().addConditionsItem(new V1JobCondition().type("Complete").status("True"))); + V1alpha1TableTrigger trigger = new V1alpha1TableTrigger() + .metadata(new V1ObjectMeta().name("paused-trigger-completed")) + .spec(new V1alpha1TableTriggerSpec() + .yaml(Yaml.dump(job)) + .paused(true)) + .status(new V1alpha1TableTriggerStatus().timestamp(OffsetDateTime.now())); + triggers.add(trigger); + jobs.add(job); + Result result = reconciler.reconcile(new Request("namespace", "paused-trigger-completed")); + Assertions.assertTrue(result.isRequeue(), "Should requeue after handling completed job"); + Assertions.assertTrue(jobs.isEmpty(), "Completed job should be deleted even when trigger is paused"); + Assertions.assertNotNull(trigger.getStatus().getWatermark(), "Watermark should be updated for completed job"); + } + + @Test + void unpausedTriggerCreatesJobAfterBeingPaused() { + V1Job job = new V1Job().apiVersion("v1/batch").kind("Job") + .metadata(new V1ObjectMeta().name("unpaused-trigger-job").namespace("namespace")); + OffsetDateTime triggerTime = OffsetDateTime.now().minusHours(1); + V1alpha1TableTrigger trigger = new V1alpha1TableTrigger() + .metadata(new V1ObjectMeta().name("unpaused-trigger")) + .spec(new V1alpha1TableTriggerSpec() + .yaml(Yaml.dump(job)) + .paused(false)) + .status(new V1alpha1TableTriggerStatus() + .timestamp(triggerTime)); // Has timestamp but no watermark (was paused) + triggers.add(trigger); + Result result = reconciler.reconcile(new Request("namespace", "unpaused-trigger")); + Assertions.assertTrue(result.isRequeue(), "Unpaused trigger should requeue"); + Assertions.assertFalse(yamls.isEmpty(), "Unpaused trigger should create job for pending trigger event"); + } } From 6949e005f9f2b8dfe6d08a0255f356ba233a3123 Mon Sep 17 00:00:00 2001 From: Shrinand Thakkar Date: Wed, 7 Jan 2026 16:29:36 +0530 Subject: [PATCH 2/4] Add DROP TRIGGER Support --- .../jdbc/HoptimatorDdlExecutor.java | 37 +++++++++++++++++++ .../hoptimator/k8s/K8sTriggerDeployer.java | 10 +++++ .../src/test/resources/k8s-trigger-pause.id | 9 ++++- 3 files changed, 54 insertions(+), 2 deletions(-) diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java index 5cc63f7c..d42f1217 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java @@ -32,6 +32,7 @@ import com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView; import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger; import com.linkedin.hoptimator.util.ArrayTable; +import com.linkedin.hoptimator.jdbc.ddl.SqlDropTrigger; import com.linkedin.hoptimator.jdbc.ddl.SqlPauseTrigger; import com.linkedin.hoptimator.jdbc.ddl.SqlResumeTrigger; import com.linkedin.hoptimator.util.DeploymentService; @@ -504,6 +505,42 @@ public void execute(SqlResumeTrigger resume, CalcitePrepare.Context context) { updateTriggerPausedState(resume, resume.name, false); } + /** Executes a {@code DROP TRIGGER} command. */ + public void execute(SqlDropTrigger drop, CalcitePrepare.Context context) { + logger.info("Validating statement: {}", drop); + try { + ValidationService.validateOrThrow(drop); + } catch (SQLException e) { + throw new DdlException(drop, e.getMessage(), e); + } + + if (drop.name.names.size() > 1) { + throw new DdlException(drop, "Triggers cannot belong to a schema or database."); + } + String name = drop.name.names.get(0); + + Trigger trigger = new Trigger(name, null, new ArrayList<>(), null, new HashMap<>()); + + Collection deployers = null; + try { + logger.info("Deleting trigger {}", name); + deployers = DeploymentService.deployers(trigger, connection); + DeploymentService.delete(deployers); + logger.info("Deleted trigger {}", name); + logger.info("DROP TRIGGER {} completed", name); + } catch (Exception e) { + if (deployers != null) { + DeploymentService.restore(deployers); + } + // Handle IF EXISTS + if (drop.ifExists && e.getMessage() != null && e.getMessage().contains("Error getting TableTrigger")) { + logger.info("Trigger {} does not exist (IF EXISTS specified)", name); + return; + } + throw new DdlException(drop, e.getMessage(), e); + } + } + private void updateTriggerPausedState(SqlNode sqlNode, SqlIdentifier triggerName, boolean paused) { logger.info("Validating statement: {}", sqlNode); try { diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java index 8c5bfa36..eb37dbeb 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sTriggerDeployer.java @@ -54,6 +54,16 @@ public void update() throws SQLException { super.update(); } + @Override + public void delete() throws SQLException { + String canonicalName = K8sUtils.canonicalizeName(trigger.name()); + V1alpha1TableTrigger existingTrigger = triggerApi.get(canonicalName); + if (existingTrigger == null) { + throw new SQLException("Trigger " + trigger.name() + " not found."); + } + triggerApi.delete(existingTrigger); + } + @Override protected V1alpha1TableTrigger toK8sObject() throws SQLException { String name = K8sUtils.canonicalizeName(trigger.name(), trigger.job().name()); diff --git a/hoptimator-k8s/src/test/resources/k8s-trigger-pause.id b/hoptimator-k8s/src/test/resources/k8s-trigger-pause.id index e664cdaf..8443ba63 100644 --- a/hoptimator-k8s/src/test/resources/k8s-trigger-pause.id +++ b/hoptimator-k8s/src/test/resources/k8s-trigger-pause.id @@ -46,7 +46,12 @@ select name, schema, "TABLE", paused from "k8s".table_triggers where name = 'tes !ok -delete from "k8s".table_triggers where name = 'test'; -(1 row modified) +drop trigger test; +(0 rows modified) + +!update + +drop trigger if exists test; +(0 rows modified) !update \ No newline at end of file From a5deaa4e047a59080a47dc95254e3b7d4da708ac Mon Sep 17 00:00:00 2001 From: Shrinand Thakkar Date: Wed, 7 Jan 2026 16:45:02 +0530 Subject: [PATCH 3/4] Spotbug fix --- .../hoptimator/operator/trigger/TableTriggerReconciler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/trigger/TableTriggerReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/trigger/TableTriggerReconciler.java index ca4c75b1..4bd4ff11 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/trigger/TableTriggerReconciler.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/trigger/TableTriggerReconciler.java @@ -183,7 +183,7 @@ public Result reconcile(Request request) { return new Result(true, pendingRetryDuration()); } else if (job != null) { return handleExistingJob(job, status, object); - } else if (job == null && scheduled != null) { + } else if (scheduled != null) { log.info("TableTrigger {} sleeping until next scheduled execution.", name); return new Result(true, scheduled.timeToNextExecution(now).get()); } else { From 77ed56854fef579972c7ef168751968278ee50d7 Mon Sep 17 00:00:00 2001 From: Shrinand Thakkar Date: Wed, 7 Jan 2026 19:05:48 +0530 Subject: [PATCH 4/4] Fix test setup --- Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/Makefile b/Makefile index 923cfb67..6a32a072 100644 --- a/Makefile +++ b/Makefile @@ -42,6 +42,7 @@ deploy-demo: deploy kubectl apply -f ./deploy/samples/demodb.yaml kubectl apply -f ./deploy/samples/tabletriggers.yaml kubectl apply -f ./deploy/samples/crontrigger.yaml + kubectl apply -f ./deploy/samples/user-jobs.yaml undeploy-demo: undeploy kubectl delete -f ./deploy/samples/demodb.yaml || echo "skipping"