Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public List<String> path() {
return path;
}

protected String pathString() {
public String pathString() {
return String.join(".", path);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,30 +82,29 @@ public Command parseCommand(List<String> lines, List<String> content, final Stri
@Override
public void execute(Context context, boolean execute) throws Exception {
if (execute) {
try (Connection connection = context.connection()) {
if (!(connection instanceof HoptimatorConnection)) {
throw new IllegalArgumentException("This connection doesn't support `!specify`.");
}
String sql = context.previousSqlCommand().sql;
HoptimatorConnection conn = (HoptimatorConnection) connection;
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
String[] parts = line.split(" ", 2);
String pipelineName = parts.length == 2 ? parts[1] : "test";
Properties properties = new Properties();
properties.putAll(conn.connectionProperties());
properties.put(DeploymentService.PIPELINE_OPTION, pipelineName);
Pipeline pipeline = DeploymentService.plan(root, Collections.emptyList(), properties)
.pipeline(pipelineName, conn);
List<String> specs = new ArrayList<>();
for (Source source : pipeline.sources()) {
specs.addAll(DeploymentService.specify(source, conn));
}
specs.addAll(DeploymentService.specify(pipeline.sink(), conn));
specs.addAll(DeploymentService.specify(pipeline.job(), conn));
String joined = specs.stream().sorted().collect(Collectors.joining("---\n"));
String[] lines = joined.replaceAll(";\n", "\n").split("\n");
context.echo(Arrays.asList(lines));
Connection connection = context.connection();
if (!(connection instanceof HoptimatorConnection)) {
throw new IllegalArgumentException("This connection doesn't support `!specify`.");
}
String sql = context.previousSqlCommand().sql;
HoptimatorConnection conn = (HoptimatorConnection) connection;
RelRoot root = HoptimatorDriver.convert(conn, sql).root;
String[] parts = line.split(" ", 2);
String pipelineName = parts.length == 2 ? parts[1] : "test";
Properties properties = new Properties();
properties.putAll(conn.connectionProperties());
properties.put(DeploymentService.PIPELINE_OPTION, pipelineName);
Pipeline pipeline = DeploymentService.plan(root, Collections.emptyList(), properties)
.pipeline(pipelineName, conn);
List<String> specs = new ArrayList<>();
for (Source source : pipeline.sources()) {
specs.addAll(DeploymentService.specify(source, conn));
}
specs.addAll(DeploymentService.specify(pipeline.sink(), conn));
specs.addAll(DeploymentService.specify(pipeline.job(), conn));
String joined = specs.stream().sorted().collect(Collectors.joining("---\n"));
String[] lines = joined.replaceAll(";\n", "\n").split("\n");
context.echo(Arrays.asList(lines));
} else {
context.echo(content);
}
Expand Down
23 changes: 22 additions & 1 deletion hoptimator-k8s/src/test/resources/k8s-ddl.id
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,25 @@ spec:
parallelism: 1
upgradeMode: stateless
state: running
!specify PAGE_VIEWS
!specify PAGE_VIEWS

insert into ads.ad_clicks select * from ads.ad_clicks;
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
name: ads-database-adclicks
spec:
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
- CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_source` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10')
- CREATE DATABASE IF NOT EXISTS `ADS` WITH ()
- CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole')
- INSERT INTO `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN`, `MEMBER_URN`) SELECT * FROM `ADS`.`AD_CLICKS_source`
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
state: running
!specify AD_CLICKS
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.calcite.plan.Convention;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.rel2sql.RelToSqlConverter;
Expand Down Expand Up @@ -114,15 +115,37 @@ public Pipeline pipeline(String name, Connection connection) throws SQLException

private ScriptImplementor script(Connection connection) throws SQLException {
ScriptImplementor script = ScriptImplementor.empty();
// Check if we need to add suffixes to avoid table name collisions
boolean needsSuffixes = hasTableNameCollision();

for (Map.Entry<Source, RelDataType> source : sources.entrySet()) {
script = script.catalog(source.getKey().catalog());
script = script.database(source.getKey().catalog(), source.getKey().schema());
Map<String, String> configs = ConnectionService.configure(source.getKey(), connection);
script = script.connector(source.getKey().catalog(), source.getKey().schema(), source.getKey().table(), source.getValue(), configs);
String suffix = needsSuffixes ? "_source" : null;
script = script.connector(source.getKey().catalog(), source.getKey().schema(), source.getKey().table(), suffix, source.getValue(), configs);
}
return script;
}

/**
* Checks if there's a collision between source and sink table names.
* A collision occurs when a source table has the same catalog, schema, and table name as the sink.
*/
private boolean hasTableNameCollision() {
if (sink == null) {
return false;
}
for (Source source : sources.keySet()) {
if (Objects.equals(source.catalog(), sink.catalog())
&& Objects.equals(source.schema(), sink.schema())
&& Objects.equals(source.table(), sink.table())) {
return true;
}
}
return false;
}

/** SQL script ending in an INSERT INTO */
public ThrowingFunction<SqlDialect, String> sql(Connection connection) throws SQLException {
return wrap(x -> {
Expand All @@ -136,8 +159,22 @@ public ThrowingFunction<SqlDialect, String> sql(Connection connection) throws SQ
Map<String, String> sinkConfigs = ConnectionService.configure(sink, connection);
script = script.catalog(sink.catalog());
script = script.database(sink.catalog(), sink.schema());
script = script.connector(sink.catalog(), sink.schema(), sink.table(), targetRowType, sinkConfigs);
script = script.insert(sink.catalog(), sink.schema(), sink.table(), query, targetFields);
// Check if we need to add suffixes to avoid table name collisions
boolean needsSuffixes = hasTableNameCollision();
String sinkSuffix = needsSuffixes ? "_sink" : null;
script = script.connector(sink.catalog(), sink.schema(), sink.table(), sinkSuffix, targetRowType, sinkConfigs);

// Build table name replacement map for the query
Map<String, String> tableNameReplacements = new HashMap<>();
if (needsSuffixes) {
for (Source source : sources.keySet()) {
String qualifiedName = source.pathString();
String suffixedTable = source.table() + "_source";
tableNameReplacements.put(qualifiedName, suffixedTable);
}
}

script = script.insert(sink.catalog(), sink.schema(), sink.table(), sinkSuffix, query, targetFields, tableNameReplacements);
return script.sql(x);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,23 @@ default ScriptImplementor with(ScriptImplementor next) {

/** Append a query */
default ScriptImplementor query(RelNode relNode) {
return with(new QueryImplementor(relNode));
return query(relNode, Collections.emptyMap());
}

/** Append a query with table name replacements */
default ScriptImplementor query(RelNode relNode, Map<String, String> tableNameReplacements) {
return with(new QueryImplementor(relNode, tableNameReplacements));
}

/** Append a connector definition, e.g. `CREATE TABLE ... WITH (...)` */
default ScriptImplementor connector(@Nullable String catalog, String schema, String table, RelDataType rowType, Map<String, String> connectorConfig) {
return with(new ConnectorImplementor(catalog, schema, table, rowType, connectorConfig));
return connector(catalog, schema, table, null, rowType, connectorConfig);
}

/** Append a connector definition with an optional suffix, e.g. `CREATE TABLE ... WITH (...)` */
default ScriptImplementor connector(@Nullable String catalog, String schema, String table, @Nullable String suffix, RelDataType rowType,
Map<String, String> connectorConfig) {
return with(new ConnectorImplementor(catalog, schema, table, suffix, rowType, connectorConfig));
}

/** Append a database definition, e.g. `CREATE CATALOG ...` */
Expand All @@ -105,7 +116,19 @@ default ScriptImplementor database(@Nullable String catalog, String database) {

/** Append an insert statement, e.g. `INSERT INTO ... SELECT ...` */
default ScriptImplementor insert(@Nullable String catalog, String schema, String table, RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
return with(new InsertImplementor(catalog, schema, table, relNode, targetFields));
return insert(catalog, schema, table, null, relNode, targetFields, Collections.emptyMap());
}

/** Append an insert statement with an optional suffix for the target table, e.g. `INSERT INTO ... SELECT ...` */
default ScriptImplementor insert(@Nullable String catalog, String schema, String table, @Nullable String suffix, RelNode relNode,
ImmutablePairList<Integer, String> targetFields) {
return insert(catalog, schema, table, suffix, relNode, targetFields, Collections.emptyMap());
}

/** Append an insert statement with an optional suffix and table name replacements for the query, e.g. `INSERT INTO ... SELECT ...` */
default ScriptImplementor insert(@Nullable String catalog, String schema, String table, @Nullable String suffix, RelNode relNode,
ImmutablePairList<Integer, String> targetFields, Map<String, String> tableNameReplacements) {
return with(new InsertImplementor(catalog, schema, table, suffix, relNode, targetFields, tableNameReplacements));
}

/** Render the script as DDL/SQL in the default dialect */
Expand Down Expand Up @@ -155,9 +178,15 @@ public void implement(SqlWriter w) {
/** Implements an arbitrary RelNode as a query */
class QueryImplementor implements ScriptImplementor {
private final RelNode relNode;
private final Map<String, String> tableNameReplacements;

public QueryImplementor(RelNode relNode) {
this(relNode, Collections.emptyMap());
}

public QueryImplementor(RelNode relNode, Map<String, String> tableNameReplacements) {
this.relNode = relNode;
this.tableNameReplacements = tableNameReplacements;
}

@Override
Expand All @@ -169,6 +198,10 @@ public void implement(SqlWriter w) {
SqlSelect select = (SqlSelect) node;
select.setSelectList((SqlNodeList) Objects.requireNonNull(select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR)));
}
// Apply table name replacements if any
if (!tableNameReplacements.isEmpty()) {
node = node.accept(new TableNameReplacer(tableNameReplacements));
}
node.unparse(w, 0, 0);
}

Expand All @@ -190,6 +223,33 @@ public SqlNode visit(SqlCall call) {
};
}

/**
* A SqlShuttle that replaces table names in SQL nodes.
* Used to add suffixes to table names when there are source/sink collisions.
*/
class TableNameReplacer extends SqlShuttle {
private final Map<String, String> replacements;

public TableNameReplacer(Map<String, String> replacements) {
this.replacements = replacements;
}

@Override
public SqlNode visit(SqlIdentifier id) {
if (id.names.size() >= 2) {
// Build the qualified name to check for replacement
String qualifiedName = String.join(".", id.names);
if (replacements.containsKey(qualifiedName)) {
// Replace the last component (table name) with the suffixed version
List<String> newNames = new ArrayList<>(id.names);
newNames.set(newNames.size() - 1, replacements.get(qualifiedName));
return new SqlIdentifier(newNames, id.getParserPosition());
}
}
return id;
}
}

/**
* Implements a CREATE TABLE...WITH... DDL statement.
* <p>
Expand All @@ -201,21 +261,24 @@ class ConnectorImplementor implements ScriptImplementor {
private final String catalog;
private final String schema;
private final String table;
private final String suffix;
private final RelDataType rowType;
private final Map<String, String> connectorConfig;

public ConnectorImplementor(String catalog, String schema, String table, RelDataType rowType, Map<String, String> connectorConfig) {
public ConnectorImplementor(String catalog, String schema, String table, String suffix, RelDataType rowType, Map<String, String> connectorConfig) {
this.catalog = catalog;
this.schema = schema;
this.table = table;
this.suffix = suffix;
this.rowType = rowType;
this.connectorConfig = connectorConfig;
}

@Override
public void implement(SqlWriter w) {
w.keyword("CREATE TABLE IF NOT EXISTS");
(new CompoundIdentifierImplementor(catalog, schema, table)).implement(w);
String effectiveTable = suffix != null ? table + suffix : table;
(new CompoundIdentifierImplementor(catalog, schema, effectiveTable)).implement(w);
SqlWriter.Frame frame1 = w.startList("(", ")");
(new RowTypeSpecImplementor(rowType)).implement(w);
if (rowType.getField("PRIMARY_KEY", true, false) != null) {
Expand Down Expand Up @@ -262,24 +325,30 @@ class InsertImplementor implements ScriptImplementor {
private final String catalog;
private final String schema;
private final String table;
private final String suffix;
private final RelNode relNode;
private final ImmutablePairList<Integer, String> targetFields;
private final Map<String, String> tableNameReplacements;

public InsertImplementor(@Nullable String catalog, String schema, String table, RelNode relNode, ImmutablePairList<Integer, String> targetFields) {
public InsertImplementor(@Nullable String catalog, String schema, String table, @Nullable String suffix, RelNode relNode,
ImmutablePairList<Integer, String> targetFields, Map<String, String> tableNameReplacements) {
this.catalog = catalog;
this.schema = schema;
this.table = table;
this.suffix = suffix;
this.relNode = relNode;
this.targetFields = targetFields;
this.tableNameReplacements = tableNameReplacements;
}

@Override
public void implement(SqlWriter w) {
w.keyword("INSERT INTO");
(new CompoundIdentifierImplementor(catalog, schema, table)).implement(w);
String effectiveTable = suffix != null ? table + suffix : table;
(new CompoundIdentifierImplementor(catalog, schema, effectiveTable)).implement(w);
RelNode project = dropFields(relNode, targetFields);
(new ColumnListImplementor(project.getRowType())).implement(w);
(new QueryImplementor(project)).implement(w);
(new QueryImplementor(project, tableNameReplacements)).implement(w);
w.literal(";");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public void flattenUnflatten() {
flattenedNames);
RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory);
RelOptUtil.eq("original", rowType, "flattened-unflattened", unflattenedType, Litmus.THROW);
String originalConnector = new ScriptImplementor.ConnectorImplementor("C", "S", "T1",
String originalConnector = new ScriptImplementor.ConnectorImplementor("C", "S", "T1", null,
rowType, Collections.emptyMap()).sql();
String unflattenedConnector = new ScriptImplementor.ConnectorImplementor("C", "S", "T1",
String unflattenedConnector = new ScriptImplementor.ConnectorImplementor("C", "S", "T1", null,
unflattenedType, Collections.emptyMap()).sql();
Assertions.assertEquals(originalConnector, unflattenedConnector,
"Flattening and unflattening data types should have no impact on connector");
Expand Down Expand Up @@ -77,7 +77,7 @@ public void flattenUnflattenNestedArrays() {
.collect(Collectors.toList());
Assertions.assertIterableEquals(Arrays.asList("FOO", "FOO$QUX", "FOO$QIZ", "BAR", "BAR$BAZ", "CAR", "DAY",
"DAY$__ARRTYPE__", "DAY$__ARRTYPE__$__ARRTYPE__"), flattenedNames);
String flattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1",
String flattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", null,
flattenedType, Collections.emptyMap()).sql();
Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` ("
+ "`FOO` ANY ARRAY, `FOO_QUX` VARCHAR, `FOO_QIZ` VARCHAR ARRAY, "
Expand All @@ -88,9 +88,9 @@ public void flattenUnflattenNestedArrays() {

RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory);
RelOptUtil.eq("original", rowType, "flattened-unflattened", unflattenedType, Litmus.THROW);
String originalConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1",
String originalConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", null,
rowType, Collections.emptyMap()).sql();
String unflattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1",
String unflattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", null,
unflattenedType, Collections.emptyMap()).sql();
Assertions.assertEquals(originalConnector, unflattenedConnector,
"Flattening and unflattening data types should have no impact on connector");
Expand Down Expand Up @@ -123,7 +123,7 @@ public void flattenUnflattenComplexMap() {
List<String> flattenedNames = flattenedType.getFieldList().stream().map(RelDataTypeField::getName)
.collect(Collectors.toList());
Assertions.assertIterableEquals(Arrays.asList("FOO$__MAPKEYTYPE__", "FOO$__MAPVALUETYPE__$QIZ$BAR", "FOO$__MAPVALUETYPE__$QIZ$CAR"), flattenedNames);
String flattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1",
String flattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", null,
flattenedType, Collections.emptyMap()).sql();
Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` ("
+ "`FOO___MAPKEYTYPE__` ROW(`QUX` VARCHAR), "
Expand All @@ -133,9 +133,9 @@ public void flattenUnflattenComplexMap() {

RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory);
RelOptUtil.eq("original", rowType, "flattened-unflattened", unflattenedType, Litmus.THROW);
String originalConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1",
String originalConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", null,
rowType, Collections.emptyMap()).sql();
String unflattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1",
String unflattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", null,
unflattenedType, Collections.emptyMap()).sql();
Assertions.assertEquals(originalConnector, unflattenedConnector,
"Flattening and unflattening data types should have no impact on connector");
Expand Down
Loading