Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ deploy-demo: deploy
kubectl apply -f ./deploy/samples/demodb.yaml
kubectl apply -f ./deploy/samples/tabletriggers.yaml
kubectl apply -f ./deploy/samples/crontrigger.yaml
kubectl apply -f ./deploy/samples/user-jobs.yaml

undeploy-demo: undeploy
kubectl delete -f ./deploy/samples/demodb.yaml || echo "skipping"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

public class Trigger implements Deployable {

public static final String PAUSED_OPTION = "paused";
private final String name;
private final UserJob job;
private final List<String> path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger;
import com.linkedin.hoptimator.util.ArrayTable;
import com.linkedin.hoptimator.jdbc.ddl.SqlDropTrigger;
import com.linkedin.hoptimator.jdbc.ddl.SqlPauseTrigger;
import com.linkedin.hoptimator.jdbc.ddl.SqlResumeTrigger;
import com.linkedin.hoptimator.util.DeploymentService;
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema;
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcTable;
Expand All @@ -41,6 +44,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand All @@ -61,6 +65,7 @@
import org.apache.calcite.schema.impl.ViewTable;
import org.apache.calcite.server.DdlExecutor;
import org.apache.calcite.server.ServerDdlExecutor;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
Expand Down Expand Up @@ -490,8 +495,84 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
logger.info("CREATE TABLE {} completed", tableName);
}

// N.B. originally copy-pasted from Apache Calcite
/** Executes a {@code PAUSE TRIGGER} command. */
public void execute(SqlPauseTrigger pause, CalcitePrepare.Context context) {
updateTriggerPausedState(pause, pause.name, true);
}

/** Executes a {@code RESUME TRIGGER} command. */
public void execute(SqlResumeTrigger resume, CalcitePrepare.Context context) {
updateTriggerPausedState(resume, resume.name, false);
}

/** Executes a {@code DROP TRIGGER} command. */
public void execute(SqlDropTrigger drop, CalcitePrepare.Context context) {
logger.info("Validating statement: {}", drop);
try {
ValidationService.validateOrThrow(drop);
} catch (SQLException e) {
throw new DdlException(drop, e.getMessage(), e);
}

if (drop.name.names.size() > 1) {
throw new DdlException(drop, "Triggers cannot belong to a schema or database.");
}
String name = drop.name.names.get(0);

Trigger trigger = new Trigger(name, null, new ArrayList<>(), null, new HashMap<>());

Collection<Deployer> deployers = null;
try {
logger.info("Deleting trigger {}", name);
deployers = DeploymentService.deployers(trigger, connection);
DeploymentService.delete(deployers);
logger.info("Deleted trigger {}", name);
logger.info("DROP TRIGGER {} completed", name);
} catch (Exception e) {
if (deployers != null) {
DeploymentService.restore(deployers);
}
// Handle IF EXISTS
if (drop.ifExists && e.getMessage() != null && e.getMessage().contains("Error getting TableTrigger")) {
logger.info("Trigger {} does not exist (IF EXISTS specified)", name);
return;
}
throw new DdlException(drop, e.getMessage(), e);
}
}

private void updateTriggerPausedState(SqlNode sqlNode, SqlIdentifier triggerName, boolean paused) {
logger.info("Validating statement: {}", sqlNode);
try {
ValidationService.validateOrThrow(sqlNode);
} catch (SQLException e) {
throw new DdlException(sqlNode, e.getMessage(), e);
}

if (triggerName.names.size() > 1) {
throw new DdlException(sqlNode, "Triggers cannot belong to a schema or database.");
}
String name = triggerName.names.get(0);

Map<String, String> options = new HashMap<>();
options.put(Trigger.PAUSED_OPTION, String.valueOf(paused));
Trigger trigger = new Trigger(name, null, new ArrayList<>(), null, options);

Collection<Deployer> deployers = null;
try {
logger.info("Updating trigger {} with paused state: {}", name, paused);
deployers = DeploymentService.deployers(trigger, connection);
DeploymentService.update(deployers);
logger.info("Successfully updated trigger {} with paused state: {}", name, paused);
} catch (Exception e) {
if (deployers != null) {
DeploymentService.restore(deployers);
}
throw new DdlException(sqlNode, e.getMessage(), e);
}
}

// N.B. largely copy-pasted from Apache Calcite
/** Executes {@code DROP FUNCTION}, {@code DROP TABLE}, {@code DROP MATERIALIZED VIEW}, {@code DROP TYPE},
* {@code DROP VIEW} commands. */
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ public static class Row {
public String NAME;
public String SCHEMA;
public String TABLE;
public Boolean PAUSED;
public String TIMESTAMP;
public String WATERMARK;

public Row(String name, String schema, String table, String timestamp, String watermark) {
public Row(String name, String schema, String table, Boolean paused, String timestamp, String watermark) {
this.NAME = name;
this.SCHEMA = schema;
this.TABLE = table;
this.PAUSED = paused;
this.TIMESTAMP = timestamp;
this.WATERMARK = watermark;
}
Expand All @@ -35,6 +37,7 @@ public K8sTableTriggerTable(K8sContext context) {
@Override
public Row toRow(V1alpha1TableTrigger obj) {
return new Row(obj.getMetadata().getName(), obj.getSpec().getSchema(), obj.getSpec().getTable(),
obj.getSpec().getPaused(),
Optional.ofNullable(obj.getStatus())
.flatMap(x -> Optional.ofNullable(x.getTimestamp()))
.map(x -> x.toString()).orElse(null),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,50 @@ class K8sTriggerDeployer extends K8sDeployer<V1alpha1TableTrigger, V1alpha1Table

private final K8sContext context;
private final Trigger trigger;
private final K8sApi<V1alpha1TableTrigger, V1alpha1TableTriggerList> triggerApi;
private final K8sApi<V1alpha1JobTemplate, V1alpha1JobTemplateList> jobTemplateApi;

K8sTriggerDeployer(Trigger trigger, K8sContext context) {
super(context, K8sApiEndpoints.TABLE_TRIGGERS);
this.context = context;
this.trigger = trigger;
this.triggerApi = new K8sApi<>(context, K8sApiEndpoints.TABLE_TRIGGERS);
this.jobTemplateApi = new K8sApi<>(context, K8sApiEndpoints.JOB_TEMPLATES);
}

@Override
public void update() throws SQLException {
if (trigger.options().containsKey(Trigger.PAUSED_OPTION)) {
String pauseValue = trigger.options().get(Trigger.PAUSED_OPTION);
String canonicalName = K8sUtils.canonicalizeName(trigger.name());
V1alpha1TableTrigger existingTrigger = triggerApi.get(canonicalName);

if (existingTrigger == null) {
throw new SQLException("Trigger " + trigger.name() + " not found.");
}

V1alpha1TableTriggerSpec spec = existingTrigger.getSpec();
if (spec == null) {
spec = new V1alpha1TableTriggerSpec();
existingTrigger.spec(spec);
}
spec.setPaused("true".equals(pauseValue));
triggerApi.update(existingTrigger);
return;
}
super.update();
}

@Override
public void delete() throws SQLException {
String canonicalName = K8sUtils.canonicalizeName(trigger.name());
V1alpha1TableTrigger existingTrigger = triggerApi.get(canonicalName);
if (existingTrigger == null) {
throw new SQLException("Trigger " + trigger.name() + " not found.");
}
triggerApi.delete(existingTrigger);
}

@Override
protected V1alpha1TableTrigger toK8sObject() throws SQLException {
String name = K8sUtils.canonicalizeName(trigger.name(), trigger.job().name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Trigger for a specific table.
*/
@ApiModel(description = "Trigger for a specific table.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-11-20T18:46:28.037Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2026-01-06T15:57:43.625Z[Etc/UTC]")
public class V1alpha1TableTrigger implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* TableTriggerList is a list of TableTrigger
*/
@ApiModel(description = "TableTriggerList is a list of TableTrigger")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-11-20T18:46:28.037Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2026-01-06T15:57:43.625Z[Etc/UTC]")
public class V1alpha1TableTriggerList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@
* TableTrigger spec.
*/
@ApiModel(description = "TableTrigger spec.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-11-20T18:46:28.037Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2026-01-06T15:57:43.625Z[Etc/UTC]")
public class V1alpha1TableTriggerSpec {
public static final String SERIALIZED_NAME_JOB_PROPERTIES = "jobProperties";
@SerializedName(SERIALIZED_NAME_JOB_PROPERTIES)
private Map<String, String> jobProperties = null;

public static final String SERIALIZED_NAME_PAUSED = "paused";
@SerializedName(SERIALIZED_NAME_PAUSED)
private Boolean paused;

public static final String SERIALIZED_NAME_SCHEDULE = "schedule";
@SerializedName(SERIALIZED_NAME_SCHEDULE)
private String schedule;
Expand Down Expand Up @@ -85,6 +89,29 @@ public void setJobProperties(Map<String, String> jobProperties) {
}


public V1alpha1TableTriggerSpec paused(Boolean paused) {

this.paused = paused;
return this;
}

/**
* Whether the trigger is paused.
* @return paused
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "Whether the trigger is paused.")

public Boolean getPaused() {
return paused;
}


public void setPaused(Boolean paused) {
this.paused = paused;
}


public V1alpha1TableTriggerSpec schedule(String schedule) {

this.schedule = schedule;
Expand Down Expand Up @@ -185,6 +212,7 @@ public boolean equals(Object o) {
}
V1alpha1TableTriggerSpec v1alpha1TableTriggerSpec = (V1alpha1TableTriggerSpec) o;
return Objects.equals(this.jobProperties, v1alpha1TableTriggerSpec.jobProperties) &&
Objects.equals(this.paused, v1alpha1TableTriggerSpec.paused) &&
Objects.equals(this.schedule, v1alpha1TableTriggerSpec.schedule) &&
Objects.equals(this.schema, v1alpha1TableTriggerSpec.schema) &&
Objects.equals(this.table, v1alpha1TableTriggerSpec.table) &&
Expand All @@ -193,7 +221,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(jobProperties, schedule, schema, table, yaml);
return Objects.hash(jobProperties, paused, schedule, schema, table, yaml);
}


Expand All @@ -202,6 +230,7 @@ public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class V1alpha1TableTriggerSpec {\n");
sb.append(" jobProperties: ").append(toIndentedString(jobProperties)).append("\n");
sb.append(" paused: ").append(toIndentedString(paused)).append("\n");
sb.append(" schedule: ").append(toIndentedString(schedule)).append("\n");
sb.append(" schema: ").append(toIndentedString(schema)).append("\n");
sb.append(" table: ").append(toIndentedString(table)).append("\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* TableTrigger status.
*/
@ApiModel(description = "TableTrigger status.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2025-11-20T18:46:28.037Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2026-01-06T15:57:43.625Z[Etc/UTC]")
public class V1alpha1TableTriggerStatus {
public static final String SERIALIZED_NAME_TIMESTAMP = "timestamp";
@SerializedName(SERIALIZED_NAME_TIMESTAMP)
Expand Down
7 changes: 7 additions & 0 deletions hoptimator-k8s/src/main/resources/tabletriggers.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ spec:
schedule:
description: Cron schedule, e.g. "@hourly", which causes the trigger to fire on a schedule.
type: string
paused:
description: Whether the trigger is paused.
type: boolean
required:
- schema
- table
Expand All @@ -65,6 +68,10 @@ spec:
subresources:
status: {}
additionalPrinterColumns:
- name: PAUSED
type: boolean
description: Whether trigger is paused.
jsonPath: .spec.paused
- name: SCHEMA
type: string
description: Schema name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@ public void k8sMetadataTables() throws Exception {
public void k8sConditionalJobTemplate() throws Exception {
run("k8s-metadata-beam.id", "hints=flink.app.type=BEAM");
}

@Test
public void k8sTriggerPauseResume() throws Exception {
run("k8s-trigger-pause.id");
}
}
57 changes: 57 additions & 0 deletions hoptimator-k8s/src/test/resources/k8s-trigger-pause.id
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
!set outputformat mysql
!use k8s

create trigger test on ads.ad_clicks as 'my-app' in 'my-mp' with (key 'value');
(0 rows modified)

!update

select name, schema, "TABLE", paused from "k8s".table_triggers where name = 'test';
+------+--------+-----------+--------+
| NAME | SCHEMA | TABLE | PAUSED |
+------+--------+-----------+--------+
| test | ADS | AD_CLICKS | |
+------+--------+-----------+--------+
(1 row)

!ok

pause trigger test;
(0 rows modified)

!update

select name, schema, "TABLE", paused from "k8s".table_triggers where name = 'test';
+------+--------+-----------+--------+
| NAME | SCHEMA | TABLE | PAUSED |
+------+--------+-----------+--------+
| test | ADS | AD_CLICKS | true |
+------+--------+-----------+--------+
(1 row)

!ok

resume trigger test;
(0 rows modified)

!update

select name, schema, "TABLE", paused from "k8s".table_triggers where name = 'test';
+------+--------+-----------+--------+
| NAME | SCHEMA | TABLE | PAUSED |
+------+--------+-----------+--------+
| test | ADS | AD_CLICKS | false |
+------+--------+-----------+--------+
(1 row)

!ok

drop trigger test;
(0 rows modified)

!update

drop trigger if exists test;
(0 rows modified)

!update
Loading