diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Source.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Source.java index 03ed6ddc..06d01ad9 100644 --- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Source.java +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Source.java @@ -46,7 +46,7 @@ public List path() { return path; } - protected String pathString() { + public String pathString() { return String.join(".", path); } diff --git a/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java b/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java index fd311c63..98f82462 100644 --- a/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java +++ b/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java @@ -82,30 +82,29 @@ public Command parseCommand(List lines, List content, final Stri @Override public void execute(Context context, boolean execute) throws Exception { if (execute) { - try (Connection connection = context.connection()) { - if (!(connection instanceof HoptimatorConnection)) { - throw new IllegalArgumentException("This connection doesn't support `!specify`."); - } - String sql = context.previousSqlCommand().sql; - HoptimatorConnection conn = (HoptimatorConnection) connection; - RelRoot root = HoptimatorDriver.convert(conn, sql).root; - String[] parts = line.split(" ", 2); - String pipelineName = parts.length == 2 ? parts[1] : "test"; - Properties properties = new Properties(); - properties.putAll(conn.connectionProperties()); - properties.put(DeploymentService.PIPELINE_OPTION, pipelineName); - Pipeline pipeline = DeploymentService.plan(root, Collections.emptyList(), properties) - .pipeline(pipelineName, conn); - List specs = new ArrayList<>(); - for (Source source : pipeline.sources()) { - specs.addAll(DeploymentService.specify(source, conn)); - } - specs.addAll(DeploymentService.specify(pipeline.sink(), conn)); - specs.addAll(DeploymentService.specify(pipeline.job(), conn)); - String joined = specs.stream().sorted().collect(Collectors.joining("---\n")); - String[] lines = joined.replaceAll(";\n", "\n").split("\n"); - context.echo(Arrays.asList(lines)); + Connection connection = context.connection(); + if (!(connection instanceof HoptimatorConnection)) { + throw new IllegalArgumentException("This connection doesn't support `!specify`."); } + String sql = context.previousSqlCommand().sql; + HoptimatorConnection conn = (HoptimatorConnection) connection; + RelRoot root = HoptimatorDriver.convert(conn, sql).root; + String[] parts = line.split(" ", 2); + String pipelineName = parts.length == 2 ? parts[1] : "test"; + Properties properties = new Properties(); + properties.putAll(conn.connectionProperties()); + properties.put(DeploymentService.PIPELINE_OPTION, pipelineName); + Pipeline pipeline = DeploymentService.plan(root, Collections.emptyList(), properties) + .pipeline(pipelineName, conn); + List specs = new ArrayList<>(); + for (Source source : pipeline.sources()) { + specs.addAll(DeploymentService.specify(source, conn)); + } + specs.addAll(DeploymentService.specify(pipeline.sink(), conn)); + specs.addAll(DeploymentService.specify(pipeline.job(), conn)); + String joined = specs.stream().sorted().collect(Collectors.joining("---\n")); + String[] lines = joined.replaceAll(";\n", "\n").split("\n"); + context.echo(Arrays.asList(lines)); } else { context.echo(content); } diff --git a/hoptimator-k8s/src/test/resources/k8s-ddl.id b/hoptimator-k8s/src/test/resources/k8s-ddl.id index a75076ca..5ce64abd 100644 --- a/hoptimator-k8s/src/test/resources/k8s-ddl.id +++ b/hoptimator-k8s/src/test/resources/k8s-ddl.id @@ -162,4 +162,25 @@ spec: parallelism: 1 upgradeMode: stateless state: running -!specify PAGE_VIEWS \ No newline at end of file +!specify PAGE_VIEWS + +insert into ads.ad_clicks select * from ads.ad_clicks; +apiVersion: flink.apache.org/v1beta1 +kind: FlinkSessionJob +metadata: + name: ads-database-adclicks +spec: + deploymentName: basic-session-deployment + job: + entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner + args: + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_source` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') + - INSERT INTO `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN`, `MEMBER_URN`) SELECT * FROM `ADS`.`AD_CLICKS_source` + jarURI: file:///opt/hoptimator-flink-runner.jar + parallelism: 1 + upgradeMode: stateless + state: running +!specify AD_CLICKS diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java index 7e8ee78a..f48e4c0c 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java @@ -9,6 +9,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import org.apache.calcite.plan.Convention; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.rel2sql.RelToSqlConverter; @@ -114,15 +115,37 @@ public Pipeline pipeline(String name, Connection connection) throws SQLException private ScriptImplementor script(Connection connection) throws SQLException { ScriptImplementor script = ScriptImplementor.empty(); + // Check if we need to add suffixes to avoid table name collisions + boolean needsSuffixes = hasTableNameCollision(); + for (Map.Entry source : sources.entrySet()) { script = script.catalog(source.getKey().catalog()); script = script.database(source.getKey().catalog(), source.getKey().schema()); Map configs = ConnectionService.configure(source.getKey(), connection); - script = script.connector(source.getKey().catalog(), source.getKey().schema(), source.getKey().table(), source.getValue(), configs); + String suffix = needsSuffixes ? "_source" : null; + script = script.connector(source.getKey().catalog(), source.getKey().schema(), source.getKey().table(), suffix, source.getValue(), configs); } return script; } + /** + * Checks if there's a collision between source and sink table names. + * A collision occurs when a source table has the same catalog, schema, and table name as the sink. + */ + private boolean hasTableNameCollision() { + if (sink == null) { + return false; + } + for (Source source : sources.keySet()) { + if (Objects.equals(source.catalog(), sink.catalog()) + && Objects.equals(source.schema(), sink.schema()) + && Objects.equals(source.table(), sink.table())) { + return true; + } + } + return false; + } + /** SQL script ending in an INSERT INTO */ public ThrowingFunction sql(Connection connection) throws SQLException { return wrap(x -> { @@ -136,8 +159,22 @@ public ThrowingFunction sql(Connection connection) throws SQ Map sinkConfigs = ConnectionService.configure(sink, connection); script = script.catalog(sink.catalog()); script = script.database(sink.catalog(), sink.schema()); - script = script.connector(sink.catalog(), sink.schema(), sink.table(), targetRowType, sinkConfigs); - script = script.insert(sink.catalog(), sink.schema(), sink.table(), query, targetFields); + // Check if we need to add suffixes to avoid table name collisions + boolean needsSuffixes = hasTableNameCollision(); + String sinkSuffix = needsSuffixes ? "_sink" : null; + script = script.connector(sink.catalog(), sink.schema(), sink.table(), sinkSuffix, targetRowType, sinkConfigs); + + // Build table name replacement map for the query + Map tableNameReplacements = new HashMap<>(); + if (needsSuffixes) { + for (Source source : sources.keySet()) { + String qualifiedName = source.pathString(); + String suffixedTable = source.table() + "_source"; + tableNameReplacements.put(qualifiedName, suffixedTable); + } + } + + script = script.insert(sink.catalog(), sink.schema(), sink.table(), sinkSuffix, query, targetFields, tableNameReplacements); return script.sql(x); }); } diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java index 10ad2993..166e01fb 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java @@ -85,12 +85,23 @@ default ScriptImplementor with(ScriptImplementor next) { /** Append a query */ default ScriptImplementor query(RelNode relNode) { - return with(new QueryImplementor(relNode)); + return query(relNode, Collections.emptyMap()); + } + + /** Append a query with table name replacements */ + default ScriptImplementor query(RelNode relNode, Map tableNameReplacements) { + return with(new QueryImplementor(relNode, tableNameReplacements)); } /** Append a connector definition, e.g. `CREATE TABLE ... WITH (...)` */ default ScriptImplementor connector(@Nullable String catalog, String schema, String table, RelDataType rowType, Map connectorConfig) { - return with(new ConnectorImplementor(catalog, schema, table, rowType, connectorConfig)); + return connector(catalog, schema, table, null, rowType, connectorConfig); + } + + /** Append a connector definition with an optional suffix, e.g. `CREATE TABLE ... WITH (...)` */ + default ScriptImplementor connector(@Nullable String catalog, String schema, String table, @Nullable String suffix, RelDataType rowType, + Map connectorConfig) { + return with(new ConnectorImplementor(catalog, schema, table, suffix, rowType, connectorConfig)); } /** Append a database definition, e.g. `CREATE CATALOG ...` */ @@ -105,7 +116,19 @@ default ScriptImplementor database(@Nullable String catalog, String database) { /** Append an insert statement, e.g. `INSERT INTO ... SELECT ...` */ default ScriptImplementor insert(@Nullable String catalog, String schema, String table, RelNode relNode, ImmutablePairList targetFields) { - return with(new InsertImplementor(catalog, schema, table, relNode, targetFields)); + return insert(catalog, schema, table, null, relNode, targetFields, Collections.emptyMap()); + } + + /** Append an insert statement with an optional suffix for the target table, e.g. `INSERT INTO ... SELECT ...` */ + default ScriptImplementor insert(@Nullable String catalog, String schema, String table, @Nullable String suffix, RelNode relNode, + ImmutablePairList targetFields) { + return insert(catalog, schema, table, suffix, relNode, targetFields, Collections.emptyMap()); + } + + /** Append an insert statement with an optional suffix and table name replacements for the query, e.g. `INSERT INTO ... SELECT ...` */ + default ScriptImplementor insert(@Nullable String catalog, String schema, String table, @Nullable String suffix, RelNode relNode, + ImmutablePairList targetFields, Map tableNameReplacements) { + return with(new InsertImplementor(catalog, schema, table, suffix, relNode, targetFields, tableNameReplacements)); } /** Render the script as DDL/SQL in the default dialect */ @@ -155,9 +178,15 @@ public void implement(SqlWriter w) { /** Implements an arbitrary RelNode as a query */ class QueryImplementor implements ScriptImplementor { private final RelNode relNode; + private final Map tableNameReplacements; public QueryImplementor(RelNode relNode) { + this(relNode, Collections.emptyMap()); + } + + public QueryImplementor(RelNode relNode, Map tableNameReplacements) { this.relNode = relNode; + this.tableNameReplacements = tableNameReplacements; } @Override @@ -169,6 +198,10 @@ public void implement(SqlWriter w) { SqlSelect select = (SqlSelect) node; select.setSelectList((SqlNodeList) Objects.requireNonNull(select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR))); } + // Apply table name replacements if any + if (!tableNameReplacements.isEmpty()) { + node = node.accept(new TableNameReplacer(tableNameReplacements)); + } node.unparse(w, 0, 0); } @@ -190,6 +223,33 @@ public SqlNode visit(SqlCall call) { }; } + /** + * A SqlShuttle that replaces table names in SQL nodes. + * Used to add suffixes to table names when there are source/sink collisions. + */ + class TableNameReplacer extends SqlShuttle { + private final Map replacements; + + public TableNameReplacer(Map replacements) { + this.replacements = replacements; + } + + @Override + public SqlNode visit(SqlIdentifier id) { + if (id.names.size() >= 2) { + // Build the qualified name to check for replacement + String qualifiedName = String.join(".", id.names); + if (replacements.containsKey(qualifiedName)) { + // Replace the last component (table name) with the suffixed version + List newNames = new ArrayList<>(id.names); + newNames.set(newNames.size() - 1, replacements.get(qualifiedName)); + return new SqlIdentifier(newNames, id.getParserPosition()); + } + } + return id; + } + } + /** * Implements a CREATE TABLE...WITH... DDL statement. *

@@ -201,13 +261,15 @@ class ConnectorImplementor implements ScriptImplementor { private final String catalog; private final String schema; private final String table; + private final String suffix; private final RelDataType rowType; private final Map connectorConfig; - public ConnectorImplementor(String catalog, String schema, String table, RelDataType rowType, Map connectorConfig) { + public ConnectorImplementor(String catalog, String schema, String table, String suffix, RelDataType rowType, Map connectorConfig) { this.catalog = catalog; this.schema = schema; this.table = table; + this.suffix = suffix; this.rowType = rowType; this.connectorConfig = connectorConfig; } @@ -215,7 +277,8 @@ public ConnectorImplementor(String catalog, String schema, String table, RelData @Override public void implement(SqlWriter w) { w.keyword("CREATE TABLE IF NOT EXISTS"); - (new CompoundIdentifierImplementor(catalog, schema, table)).implement(w); + String effectiveTable = suffix != null ? table + suffix : table; + (new CompoundIdentifierImplementor(catalog, schema, effectiveTable)).implement(w); SqlWriter.Frame frame1 = w.startList("(", ")"); (new RowTypeSpecImplementor(rowType)).implement(w); if (rowType.getField("PRIMARY_KEY", true, false) != null) { @@ -262,24 +325,30 @@ class InsertImplementor implements ScriptImplementor { private final String catalog; private final String schema; private final String table; + private final String suffix; private final RelNode relNode; private final ImmutablePairList targetFields; + private final Map tableNameReplacements; - public InsertImplementor(@Nullable String catalog, String schema, String table, RelNode relNode, ImmutablePairList targetFields) { + public InsertImplementor(@Nullable String catalog, String schema, String table, @Nullable String suffix, RelNode relNode, + ImmutablePairList targetFields, Map tableNameReplacements) { this.catalog = catalog; this.schema = schema; this.table = table; + this.suffix = suffix; this.relNode = relNode; this.targetFields = targetFields; + this.tableNameReplacements = tableNameReplacements; } @Override public void implement(SqlWriter w) { w.keyword("INSERT INTO"); - (new CompoundIdentifierImplementor(catalog, schema, table)).implement(w); + String effectiveTable = suffix != null ? table + suffix : table; + (new CompoundIdentifierImplementor(catalog, schema, effectiveTable)).implement(w); RelNode project = dropFields(relNode, targetFields); (new ColumnListImplementor(project.getRowType())).implement(w); - (new QueryImplementor(project)).implement(w); + (new QueryImplementor(project, tableNameReplacements)).implement(w); w.literal(";"); } diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java index 75214c1d..f379c045 100644 --- a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java +++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/TestDataTypeUtils.java @@ -42,9 +42,9 @@ public void flattenUnflatten() { flattenedNames); RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory); RelOptUtil.eq("original", rowType, "flattened-unflattened", unflattenedType, Litmus.THROW); - String originalConnector = new ScriptImplementor.ConnectorImplementor("C", "S", "T1", + String originalConnector = new ScriptImplementor.ConnectorImplementor("C", "S", "T1", null, rowType, Collections.emptyMap()).sql(); - String unflattenedConnector = new ScriptImplementor.ConnectorImplementor("C", "S", "T1", + String unflattenedConnector = new ScriptImplementor.ConnectorImplementor("C", "S", "T1", null, unflattenedType, Collections.emptyMap()).sql(); Assertions.assertEquals(originalConnector, unflattenedConnector, "Flattening and unflattening data types should have no impact on connector"); @@ -77,7 +77,7 @@ public void flattenUnflattenNestedArrays() { .collect(Collectors.toList()); Assertions.assertIterableEquals(Arrays.asList("FOO", "FOO$QUX", "FOO$QIZ", "BAR", "BAR$BAZ", "CAR", "DAY", "DAY$__ARRTYPE__", "DAY$__ARRTYPE__$__ARRTYPE__"), flattenedNames); - String flattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", + String flattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", null, flattenedType, Collections.emptyMap()).sql(); Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` (" + "`FOO` ANY ARRAY, `FOO_QUX` VARCHAR, `FOO_QIZ` VARCHAR ARRAY, " @@ -88,9 +88,9 @@ public void flattenUnflattenNestedArrays() { RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory); RelOptUtil.eq("original", rowType, "flattened-unflattened", unflattenedType, Litmus.THROW); - String originalConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", + String originalConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", null, rowType, Collections.emptyMap()).sql(); - String unflattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", + String unflattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", null, unflattenedType, Collections.emptyMap()).sql(); Assertions.assertEquals(originalConnector, unflattenedConnector, "Flattening and unflattening data types should have no impact on connector"); @@ -123,7 +123,7 @@ public void flattenUnflattenComplexMap() { List flattenedNames = flattenedType.getFieldList().stream().map(RelDataTypeField::getName) .collect(Collectors.toList()); Assertions.assertIterableEquals(Arrays.asList("FOO$__MAPKEYTYPE__", "FOO$__MAPVALUETYPE__$QIZ$BAR", "FOO$__MAPVALUETYPE__$QIZ$CAR"), flattenedNames); - String flattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", + String flattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", null, flattenedType, Collections.emptyMap()).sql(); Assertions.assertEquals("CREATE TABLE IF NOT EXISTS `S`.`T1` (" + "`FOO___MAPKEYTYPE__` ROW(`QUX` VARCHAR), " @@ -133,9 +133,9 @@ public void flattenUnflattenComplexMap() { RelDataType unflattenedType = DataTypeUtils.unflatten(flattenedType, typeFactory); RelOptUtil.eq("original", rowType, "flattened-unflattened", unflattenedType, Litmus.THROW); - String originalConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", + String originalConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", null, rowType, Collections.emptyMap()).sql(); - String unflattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", + String unflattenedConnector = new ScriptImplementor.ConnectorImplementor(null, "S", "T1", null, unflattenedType, Collections.emptyMap()).sql(); Assertions.assertEquals(originalConnector, unflattenedConnector, "Flattening and unflattening data types should have no impact on connector"); diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorSuffixTest.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorSuffixTest.java new file mode 100644 index 00000000..3ce50987 --- /dev/null +++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorSuffixTest.java @@ -0,0 +1,174 @@ +package com.linkedin.hoptimator.util.planner; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.runtime.ImmutablePairList; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.impl.AbstractSchema; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.tools.Frameworks; +import org.apache.calcite.tools.RelBuilder; +import org.junit.jupiter.api.Test; + +import java.util.AbstractMap; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for ScriptImplementor suffix functionality to handle source/sink table name collisions. + */ +public class ScriptImplementorSuffixTest { + @Test + public void testConnectorWithSuffix() { + RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataType rowType = typeFactory.builder() + .add("CAMPAIGN_URN", typeFactory.createSqlType(SqlTypeName.VARCHAR)) + .add("MEMBER_URN", typeFactory.createSqlType(SqlTypeName.VARCHAR)) + .build(); + + Map config = new HashMap<>(); + config.put("connector", "datagen"); + config.put("number-of-rows", "10"); + + String sql = ScriptImplementor.empty() + .connector(null, "ADS", "AD_CLICKS", "_source", rowType, config) + .sql(); + + assertTrue(sql.contains("CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_source`"), + "Should create table with _source suffix. Got: " + sql); + assertTrue(sql.contains("'connector'='datagen'"), + "Should include connector config. Got: " + sql); + } + + @Test + public void testInsertWithSuffix() { + // Create a simple RelNode for testing + RelBuilder builder = RelBuilder.create( + Frameworks.newConfigBuilder() + .defaultSchema(Frameworks.createRootSchema(true)) + .build()); + + RelNode scan = builder + .values(new String[]{"CAMPAIGN_URN", "MEMBER_URN"}, "urn1", "urn2") + .build(); + + ImmutablePairList targetFields = ImmutablePairList.copyOf(Arrays.asList( + new AbstractMap.SimpleEntry<>(0, "CAMPAIGN_URN"), + new AbstractMap.SimpleEntry<>(1, "MEMBER_URN") + )); + + String sql = ScriptImplementor.empty() + .insert(null, "ADS", "AD_CLICKS", "_sink", scan, targetFields) + .sql(); + + assertTrue(sql.contains("INSERT INTO `ADS`.`AD_CLICKS_sink`"), + "Should insert into table with _sink suffix. Got: " + sql); + } + + @Test + public void testTableNameReplacements() { + RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + + // Create a schema with a table to scan from + SchemaPlus rootSchema = Frameworks.createRootSchema(true); + SchemaPlus adsSchema = rootSchema.add("ADS", new AbstractSchema()); + + // Add a mock table + RelDataType tableType = typeFactory.builder() + .add("CAMPAIGN_URN", typeFactory.createSqlType(SqlTypeName.VARCHAR)) + .add("MEMBER_URN", typeFactory.createSqlType(SqlTypeName.VARCHAR)) + .build(); + + adsSchema.add("AD_CLICKS", new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return tableType; + } + }); + + RelBuilder builder = RelBuilder.create( + Frameworks.newConfigBuilder() + .defaultSchema(rootSchema) + .build()); + + // Create a scan that references ADS.AD_CLICKS + RelNode scan = builder + .scan("ADS", "AD_CLICKS") + .build(); + + ImmutablePairList targetFields = ImmutablePairList.copyOf(Arrays.asList( + new AbstractMap.SimpleEntry<>(0, "CAMPAIGN_URN"), + new AbstractMap.SimpleEntry<>(1, "MEMBER_URN") + )); + + // Test that table name replacement works in the SELECT query + Map tableReplacements = new HashMap<>(); + tableReplacements.put("ADS.AD_CLICKS", "AD_CLICKS_source"); + + String sql = ScriptImplementor.empty() + .insert(null, "ADS", "AD_CLICKS", "_sink", scan, targetFields, tableReplacements) + .sql(); + + assertTrue(sql.contains("INSERT INTO `ADS`.`AD_CLICKS_sink`"), + "Should insert into table with _sink suffix. Got: " + sql); + assertTrue(sql.contains("FROM `ADS`.`AD_CLICKS_source`"), + "Should select from table with _source suffix (table name replaced). Got: " + sql); + } + + @Test + public void testFullPipelineWithCollision() { + RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataType rowType = typeFactory.builder() + .add("CAMPAIGN_URN", typeFactory.createSqlType(SqlTypeName.VARCHAR)) + .add("MEMBER_URN", typeFactory.createSqlType(SqlTypeName.VARCHAR)) + .build(); + + Map sourceConfig = new HashMap<>(); + sourceConfig.put("connector", "datagen"); + sourceConfig.put("number-of-rows", "10"); + + Map sinkConfig = new HashMap<>(); + sinkConfig.put("connector", "blackhole"); + + // Create a simple RelNode + RelBuilder builder = RelBuilder.create( + Frameworks.newConfigBuilder() + .defaultSchema(Frameworks.createRootSchema(true)) + .build()); + + RelNode scan = builder + .values(new String[]{"CAMPAIGN_URN", "MEMBER_URN"}, "urn1", "urn2") + .build(); + + ImmutablePairList targetFields = ImmutablePairList.copyOf(Arrays.asList( + new AbstractMap.SimpleEntry<>(0, "CAMPAIGN_URN"), + new AbstractMap.SimpleEntry<>(1, "MEMBER_URN") + )); + + String sql = ScriptImplementor.empty() + .database(null, "ADS") + .connector(null, "ADS", "AD_CLICKS", "_source", rowType, sourceConfig) + .database(null, "ADS") + .connector(null, "ADS", "AD_CLICKS", "_sink", rowType, sinkConfig) + .insert(null, "ADS", "AD_CLICKS", "_sink", scan, targetFields) + .sql(); + + // Verify both tables are created with different suffixes + assertTrue(sql.contains("CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_source`"), + "Should create source table with suffix. Got: " + sql); + assertTrue(sql.contains("CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_sink`"), + "Should create sink table with suffix. Got: " + sql); + assertTrue(sql.contains("'connector'='datagen'"), + "Should include datagen connector. Got: " + sql); + assertTrue(sql.contains("'connector'='blackhole'"), + "Should include blackhole connector. Got: " + sql); + assertTrue(sql.contains("INSERT INTO `ADS`.`AD_CLICKS_sink`"), + "Should insert into sink table. Got: " + sql); + } +}