From 8670c7440a5122cbec100cf46f5a0318fbf7ca3f Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 13 Mar 2025 18:00:16 +0530 Subject: [PATCH 01/10] initial changes --- .../postgresql/PostgresConnectorConfig.java | 12 +++++-- .../postgresql/YugabyteDBConnector.java | 33 +++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 515b708c0a2..8a6847123e3 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -393,6 +393,12 @@ public boolean isParallel() { public boolean isParallel() { return true; } + }, + PARALLEL_SLOT("PARALLEL_SLOT") { + @Override + public boolean isParallel() { + return false; + } }; @@ -765,8 +771,10 @@ public static SchemaRefreshMode parse(String value) { .withImportance(Importance.LOW) .withDescription("Semi-colon separated values for hash ranges to be polled by tasks.") .withValidation((config, field, output) -> { - if (!config.getString(field, "").isEmpty() && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) { - output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel'"); + if (!config.getString(field, "").isEmpty() + && (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel") + || !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel_slot"))) { + output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel' or 'parallel_slot'"); return 1; } return 0; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java 
b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java index 835ec354189..995666b18e6 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java @@ -94,6 +94,34 @@ protected List> getTaskConfigsForParallelStreaming(List> getTaskConfigsForParallelSlot(List slotRanges) { + List> taskConfigs = new ArrayList<>(); + +// if (connectorConfig.getSnapshotter().shouldSnapshot()) { +// props.put(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name(), props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name())); +// } + + for (int i = 0; i < slotRanges.size(); ++i) { + Map taskProps = new HashMap<>(this.props); + + taskProps.put(PostgresConnectorConfig.TASK_ID, String.valueOf(i)); + taskProps.put(PostgresConnectorConfig.STREAM_PARAMS.name(), "hash_range=" + slotRanges.get(i)); + +// if (connectorConfig.getSnapshotter().shouldSnapshot()) { +// String[] splitRange = slotRanges.get(i).split(","); +// String query = getParallelSnapshotQuery(splitRange[0], splitRange[1]); +// taskProps.put( +// PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + "." 
+ taskProps.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()), +// query +// ); +// } + + taskConfigs.add(taskProps); + } + + return taskConfigs; + } + @Override public List> taskConfigs(int maxTasks) { if (props == null) { @@ -117,6 +145,11 @@ public List> taskConfigs(int maxTasks) { YBValidate.completeRangesProvided(slotRanges); return getTaskConfigsForParallelStreaming(slotNames, publicationNames, slotRanges); + } else if (connectorConfig.streamingMode().equals(PostgresConnectorConfig.StreamingMode.PARALLEL_SLOT)) { + LOGGER.info("Initialising parallel slot streaming mode"); + validateSingleTableProvided(tableIncludeList, false /* isSnapshot */); + + return getTaskConfigsForParallelSlot(connectorConfig.getSlotRanges()); } // TODO Vaibhav (#26106): The following code block is not needed now, remove in a separate PR. From ca3ce81ff7bcd1051fd314ad0863f1f392e8c411 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 20 Mar 2025 15:27:12 +0530 Subject: [PATCH 02/10] changes --- .../postgresql/PostgresConnectorConfig.java | 26 +++++++++++++++++++ .../postgresql/PostgresPartition.java | 9 ++++--- .../connector/postgresql/connection/Lsn.java | 20 +++++++++++++- 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 8a6847123e3..26997638357 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -8,9 +8,12 @@ import java.time.Duration; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.regex.Matcher; import java.util.regex.Pattern; import 
io.debezium.DebeziumException; @@ -1241,6 +1244,29 @@ public StreamingMode streamingMode() { return StreamingMode.parse(getConfig().getString(STREAMING_MODE)); } + /** + * Reads the stream params specified and returns the slot boundaries. + * @return a {@link List} where the list represents the range of a + * tablet i.e. {@code [startHashCode, endHashCode)} + */ + public List getSlotBounds() { + String regex = "hash_range=(\\d+),(\\d+)"; + + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(streamParams()); + + if (matcher.find()) { + // Extract the numbers + String start = matcher.group(1); + String end = matcher.group(2); + + return List.of(Integer.valueOf(start), Integer.valueOf(end)); + } + + // Return the full list assuming that we are in the old model. + return List.of(0, 65536); + } + protected boolean dropSlotOnStop() { if (getConfig().hasKey(DROP_SLOT_ON_STOP.name())) { return getConfig().getBoolean(DROP_SLOT_ON_STOP); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java index 0166ba72479..a5e5d9e0e8d 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java @@ -23,12 +23,15 @@ public class PostgresPartition extends AbstractPartition implements Partition { private final String serverName; private final String taskId; private final String slotName; + private final String startHashCode; - public PostgresPartition(String serverName, String databaseName, String taskId, String slotName) { + public PostgresPartition(String serverName, String databaseName, String taskId, String slotName, + String startHashCode) { super(databaseName); this.serverName = serverName; this.taskId = taskId; this.slotName = slotName; + 
this.startHashCode = startHashCode; } @Override @@ -59,7 +62,7 @@ public String toString() { } public String getPartitionIdentificationKey() { - return String.format("%s_%s_%s", serverName, taskId, slotName); + return String.format("%s_%s_%s_%s", serverName, taskId, slotName, startHashCode); } static class Provider implements Partition.Provider { @@ -75,7 +78,7 @@ static class Provider implements Partition.Provider { public Set getPartitions() { return Collections.singleton(new PostgresPartition( connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()), - connectorConfig.getTaskId(), connectorConfig.slotName())); + connectorConfig.getTaskId(), connectorConfig.slotName(), String.valueOf(connectorConfig.getSlotBounds().get(0)))); } } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java index d2cedf2a033..5d7c1d63e78 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.postgresql.connection; +import java.math.BigInteger; import java.nio.ByteBuffer; import com.yugabyte.replication.LogSequenceNumber; @@ -96,6 +97,23 @@ public long asLong() { return value; } + /** + * Return a BigInteger equal to the unsigned value of the argument. + * Code taken from Long.java + */ + public BigInteger asUnsignedBigInteger() { + if (i >= 0L) + return BigInteger.valueOf(value); + else { + int upper = (int) (value >>> 32); + int lower = (int) value; + + // return (upper << 32) + lower + return (BigInteger.valueOf(Integer.toUnsignedLong(upper))).shiftLeft(32). 
+ add(BigInteger.valueOf(Integer.toUnsignedLong(lower))); + } + } + /** * @return PostgreSQL JDBC driver representation of position in the write-ahead log stream */ @@ -144,7 +162,7 @@ public boolean isValid() { @Override public String toString() { - return "LSN{" + asLong() + '}'; + return "LSN{" + asUnsignedBigInteger().toString() + '}'; } @Override From 0efecb793016915dce5e4fb72fb08f82151a007d Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 26 Mar 2025 12:52:11 +0530 Subject: [PATCH 03/10] changes --- .../java/io/debezium/connector/postgresql/connection/Lsn.java | 2 +- .../debezium/connector/postgresql/PostgresPartitionTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java index 5d7c1d63e78..c7caa06030f 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java @@ -102,7 +102,7 @@ public long asLong() { * Code taken from Long.java */ public BigInteger asUnsignedBigInteger() { - if (i >= 0L) + if (value >= 0L) return BigInteger.valueOf(value); else { int upper = (int) (value >>> 32); diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java index 6969bf56b21..604bbdcb94c 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresPartitionTest.java @@ -11,11 +11,11 @@ public class PostgresPartitionTest extends AbstractPartitionTest Date: Tue, 1 Apr 2025 12:19:37 +0530 Subject: [PATCH 04/10] 
divide hash ranges equally to tasks --- .../postgresql/YugabyteDBConnector.java | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java index 995666b18e6..eaf023c3f99 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java @@ -94,27 +94,25 @@ protected List> getTaskConfigsForParallelStreaming(List> getTaskConfigsForParallelSlot(List slotRanges) { + protected List> getTaskConfigsForParallelSlot(int maxTasks) { List> taskConfigs = new ArrayList<>(); -// if (connectorConfig.getSnapshotter().shouldSnapshot()) { -// props.put(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name(), props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name())); -// } + final long upperBoundExclusive = 64 * 1024; + final long rangeSize = upperBoundExclusive / maxTasks; - for (int i = 0; i < slotRanges.size(); ++i) { + for (int i = 0; i < maxTasks; ++i) { Map taskProps = new HashMap<>(this.props); - taskProps.put(PostgresConnectorConfig.TASK_ID, String.valueOf(i)); - taskProps.put(PostgresConnectorConfig.STREAM_PARAMS.name(), "hash_range=" + slotRanges.get(i)); + // The range defined here will be [lowerBound, upperBound) and for the last range, + // we will ensure that we are covering the final boundary. + long lowerBound = i * rangeSize; + long upperBound = (i == maxTasks - 1) ? 
upperBoundExclusive : (lowerBound + rangeSize); -// if (connectorConfig.getSnapshotter().shouldSnapshot()) { -// String[] splitRange = slotRanges.get(i).split(","); -// String query = getParallelSnapshotQuery(splitRange[0], splitRange[1]); -// taskProps.put( -// PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + "." + taskProps.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()), -// query -// ); -// } + LOGGER.info("Creating task {} with range [{}, {})", i, lowerBound, upperBound); + + taskProps.put(PostgresConnectorConfig.TASK_ID, String.valueOf(i)); + taskProps.put(PostgresConnectorConfig.STREAM_PARAMS.name(), + String.format("hash_range=%d,%d", lowerBound, upperBound)); taskConfigs.add(taskProps); } @@ -149,7 +147,7 @@ public List> taskConfigs(int maxTasks) { LOGGER.info("Initialising parallel slot streaming mode"); validateSingleTableProvided(tableIncludeList, false /* isSnapshot */); - return getTaskConfigsForParallelSlot(connectorConfig.getSlotRanges()); + return getTaskConfigsForParallelSlot(maxTasks); } // TODO Vaibhav (#26106): The following code block is not needed now, remove in a separate PR. 
From 463853501eb82f0c44cc51e607e4c4ba1edacddc Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Tue, 1 Apr 2025 12:49:28 +0530 Subject: [PATCH 05/10] partition is now identified without task ID --- .../io/debezium/connector/postgresql/PostgresPartition.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java index a5e5d9e0e8d..3017b38babb 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java @@ -62,7 +62,7 @@ public String toString() { } public String getPartitionIdentificationKey() { - return String.format("%s_%s_%s_%s", serverName, taskId, slotName, startHashCode); + return String.format("%s_%s_%s", serverName, slotName, startHashCode); } static class Provider implements Partition.Provider { From 976ea042da9680d3121c366ea25ec0a5117577b3 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Fri, 11 Apr 2025 20:31:37 +0530 Subject: [PATCH 06/10] added multi partition setup --- .../PostgresStreamingChangeEventSource.java | 25 +++++++++++++------ 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 3d04b5796e0..d38d60b1291 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -5,6 +5,7 @@ */ package io.debezium.connector.postgresql; +import 
java.math.BigInteger; import java.sql.SQLException; import java.util.Map; import java.util.Objects; @@ -313,10 +314,18 @@ private void processMessages(ChangeEventSourceContext context, PostgresPartition } } + public static Short getTabletStartHashCode(Lsn lsn) { + return (short) ((lsn.asLong() >> 48) & 0xFFFF); + } + private void processReplicationMessages(PostgresPartition partition, PostgresOffsetContext offsetContext, ReplicationStream stream, ReplicationMessage message) throws SQLException, InterruptedException { final Lsn lsn = stream.lastReceivedLsn(); + final PostgresPartition messagePartition = + new PostgresPartition(connectorConfig.getConnectorName(), connectorConfig.databaseName(), + connectorConfig.getTaskId(), connectorConfig.slotName(), + String.valueOf(getTabletStartHashCode((lsn)))); if (message.isLastEventForLsn()) { lastCompletelyProcessedLsn = lsn; @@ -346,7 +355,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff // Don't skip on BEGIN message as it would flush LSN for the whole transaction // too early if (message.getOperation() == Operation.COMMIT) { - commitMessage(partition, offsetContext, lsn, message); + commitMessage(messagePartition, offsetContext, lsn, message); } return; } @@ -356,11 +365,11 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff null, message.getOperation()); if (message.getOperation() == Operation.BEGIN) { - dispatcher.dispatchTransactionStartedEvent(partition, toString(message.getTransactionId()), offsetContext, message.getCommitTime()); + dispatcher.dispatchTransactionStartedEvent(messagePartition, toString(message.getTransactionId()), offsetContext, message.getCommitTime()); } else if (message.getOperation() == Operation.COMMIT) { - commitMessage(partition, offsetContext, lsn, message); - dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, message.getCommitTime()); + commitMessage(messagePartition, offsetContext, lsn, message); + 
dispatcher.dispatchTransactionCommittedEvent(messagePartition, offsetContext, message.getCommitTime()); } maybeWarnAboutGrowingWalBacklog(true); } @@ -371,11 +380,11 @@ else if (message.getOperation() == Operation.MESSAGE) { // non-transactional message that will not be followed by a COMMIT message if (message.isLastEventForLsn()) { - commitMessage(partition, offsetContext, lsn, message); + commitMessage(messagePartition, offsetContext, lsn, message); } dispatcher.dispatchLogicalDecodingMessage( - partition, + messagePartition, offsetContext, clock.currentTimeAsInstant().toEpochMilli(), (LogicalDecodingMessage) message); @@ -399,10 +408,10 @@ else if (message.getOperation() == Operation.MESSAGE) { message.getOperation()); boolean dispatched = message.getOperation() != Operation.NOOP && dispatcher.dispatchDataChangeEvent( - partition, + messagePartition, tableId, new PostgresChangeRecordEmitter( - partition, + messagePartition, offsetContext, clock, connectorConfig, From 9fa033c1ed03799ff639b52de7f69c889e09d03f Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Fri, 28 Mar 2025 06:51:11 +0000 Subject: [PATCH 07/10] changes for parallel slot poc --- .../postgresql/PostgresConnectorConfig.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 26997638357..dd2bf4a3280 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -772,16 +772,16 @@ public static SchemaRefreshMode parse(String value) { public static final Field SLOT_RANGES = Field.create("slot.ranges") .withDisplayName("Ranges on which a slot is supposed to operate") 
.withImportance(Importance.LOW) - .withDescription("Semi-colon separated values for hash ranges to be polled by tasks.") - .withValidation((config, field, output) -> { - if (!config.getString(field, "").isEmpty() - && (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel") - || !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel_slot"))) { - output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel' or 'parallel_slot'"); - return 1; - } - return 0; - }); + .withDescription("Semi-colon separated values for hash ranges to be polled by tasks."); + // .withValidation((config, field, output) -> { + // if (!config.getString(field, "").isEmpty() + // && (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel") + // || !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel_slot"))) { + // output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel' or 'parallel_slot'"); + // return 1; + // } + // return 0; + // }); public static final Field YB_LOAD_BALANCE_CONNECTIONS = Field.create("yb.load.balance.connections") .withDisplayName("YB load balance connections") From 59fe6983eba7f6cb11747527ae6375824a899e6a Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 16 Apr 2025 10:25:41 +0000 Subject: [PATCH 08/10] added multi partition handling --- .../postgresql/PostgresConnectorConfig.java | 16 +++ .../postgresql/PostgresOffsetContext.java | 86 ++++++++----- .../PostgresSnapshotChangeEventSource.java | 9 +- .../PostgresStreamingChangeEventSource.java | 118 ++++++++++++------ .../AbstractYugabyteDBTaskMetrics.java | 11 +- .../postgresql/PostgresConnectorIT.java | 26 ++++ 6 files changed, 195 insertions(+), 71 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index dd2bf4a3280..2df86c4afa8 100755 --- 
a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -390,18 +390,33 @@ public enum StreamingMode implements EnumeratedValue { public boolean isParallel() { return false; } + + @Override + public boolean isParallelSlot() { + return false; + } }, PARALLEL("PARALLEL") { @Override public boolean isParallel() { return true; } + + @Override + public boolean isParallelSlot() { + return false; + } }, PARALLEL_SLOT("PARALLEL_SLOT") { @Override public boolean isParallel() { return false; } + + @Override + public boolean isParallelSlot() { + return true; + } }; @@ -421,6 +436,7 @@ public String getValue() { } public abstract boolean isParallel(); + public abstract boolean isParallelSlot(); } public enum LsnType implements EnumeratedValue { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java index 53348ba1915..74bd9009e3b 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java @@ -9,6 +9,7 @@ import java.time.Instant; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; @@ -43,6 +44,8 @@ public class PostgresOffsetContext extends CommonOffsetContext { private Lsn streamingStoppingLsn = null; private final TransactionContext transactionContext; private final IncrementalSnapshotContext incrementalSnapshotContext; + private final Map partitionSourceInfo; + private final PostgresConnectorConfig connectorConfig; private 
PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn, Lsn lastCompletelyProcessedLsn, Lsn lastCommitLsn, Long txId, Operation messageType, Instant time, @@ -66,37 +69,48 @@ private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn, } this.transactionContext = transactionContext; this.incrementalSnapshotContext = incrementalSnapshotContext; + this.connectorConfig = connectorConfig; + this.partitionSourceInfo = new ConcurrentHashMap<>(); } @Override public Map getOffset() { + // Map result = new HashMap<>(); + // if (sourceInfo.timestamp() != null) { + // result.put(SourceInfo.TIMESTAMP_USEC_KEY, Conversions.toEpochMicros(sourceInfo.timestamp())); + // } + // if (sourceInfo.txId() != null) { + // result.put(SourceInfo.TXID_KEY, sourceInfo.txId()); + // } + // if (sourceInfo.lsn() != null) { + // result.put(SourceInfo.LSN_KEY, sourceInfo.lsn().asLong()); + // } + // if (sourceInfo.xmin() != null) { + // result.put(SourceInfo.XMIN_KEY, sourceInfo.xmin()); + // } + // if (sourceInfo.isSnapshot()) { + // result.put(SourceInfo.SNAPSHOT_KEY, true); + // result.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, lastSnapshotRecord); + // } + // if (lastCompletelyProcessedLsn != null) { + // result.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, lastCompletelyProcessedLsn.asLong()); + // } + // if (lastCommitLsn != null) { + // result.put(LAST_COMMIT_LSN_KEY, lastCommitLsn.asLong()); + // } + // if (sourceInfo.messageType() != null) { + // result.put(SourceInfo.MSG_TYPE_KEY, sourceInfo.messageType().toString()); + // } + // return sourceInfo.isSnapshot() ? 
result : incrementalSnapshotContext.store(transactionContext.store(result)); + Map result = new HashMap<>(); - if (sourceInfo.timestamp() != null) { - result.put(SourceInfo.TIMESTAMP_USEC_KEY, Conversions.toEpochMicros(sourceInfo.timestamp())); - } - if (sourceInfo.txId() != null) { - result.put(SourceInfo.TXID_KEY, sourceInfo.txId()); - } - if (sourceInfo.lsn() != null) { - result.put(SourceInfo.LSN_KEY, sourceInfo.lsn().asLong()); - } - if (sourceInfo.xmin() != null) { - result.put(SourceInfo.XMIN_KEY, sourceInfo.xmin()); - } - if (sourceInfo.isSnapshot()) { - result.put(SourceInfo.SNAPSHOT_KEY, true); - result.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, lastSnapshotRecord); - } - if (lastCompletelyProcessedLsn != null) { - result.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, lastCompletelyProcessedLsn.asLong()); - } - if (lastCommitLsn != null) { - result.put(LAST_COMMIT_LSN_KEY, lastCommitLsn.asLong()); - } - if (sourceInfo.messageType() != null) { - result.put(SourceInfo.MSG_TYPE_KEY, sourceInfo.messageType().toString()); + + for (Map.Entry entry : this.partitionSourceInfo.entrySet()) { + // Key here is the identification key of the partition. + result.put(entry.getKey(), entry.getValue().lsn().asLong()); } - return sourceInfo.isSnapshot() ? 
result : incrementalSnapshotContext.store(transactionContext.store(result)); + + return result; } @Override @@ -120,21 +134,33 @@ public void preSnapshotCompletion() { lastSnapshotRecord = true; } - public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, TableId tableId, Operation messageType) { + public void updateWalPosition(PostgresPartition partition, Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, TableId tableId, Operation messageType) { this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn; - sourceInfo.update(lsn, commitTime, txId, xmin, tableId, messageType); + SourceInfo info = this.partitionSourceInfo.get(partition.getPartitionIdentificationKey()); + if (info == null) { + info = new SourceInfo(this.connectorConfig); + this.partitionSourceInfo.put(partition.getPartitionIdentificationKey(), info); + } + info.update(lsn, commitTime, txId, xmin, tableId, messageType); } /** * update wal position for lsn events that do not have an associated table or schema */ - public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, Operation messageType) { - updateWalPosition(lsn, lastCompletelyProcessedLsn, commitTime, txId, xmin, null, messageType); + public void updateWalPosition(PostgresPartition partition, Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, Operation messageType) { + updateWalPosition(partition, lsn, lastCompletelyProcessedLsn, commitTime, txId, xmin, null, messageType); } - public void updateCommitPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn) { + public void updateCommitPosition(PostgresPartition partition, Lsn lsn, Lsn lastCompletelyProcessedLsn) { this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn; this.lastCommitLsn = lsn; + + SourceInfo info = this.partitionSourceInfo.get(partition.getPartitionIdentificationKey()); + if (info == null) { + info = new 
SourceInfo(this.connectorConfig); + this.partitionSourceInfo.put(partition.getPartitionIdentificationKey(), info); + } + sourceInfo.updateLastCommit(lsn); } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java index ff875750395..7fd70a14451 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java @@ -176,20 +176,21 @@ protected void determineSnapshotOffset(RelationalSnapshotContext commitTimes; + protected ConcurrentHashMap commitLsnMap; /** * For DEBUGGING @@ -108,6 +110,7 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi this.replicationConnection = (PostgresReplicationConnection) replicationConnection; this.connectionProbeTimer = ElapsedTimeStrategy.constant(Clock.system(), connectorConfig.statusUpdateInterval()); this.commitTimes = new ConcurrentLinkedQueue<>(); + this.commitLsnMap = new ConcurrentHashMap<>(); } @Override @@ -202,7 +205,17 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio connection.commit(); } + // TODO Vaibhav: Can we initialize all of our partition LSN values to this? 
this.lastCompletelyProcessedLsn = replicationStream.get().startLsn(); + // Lsn lastLsn = this.replicationStream.get().lastReceivedLsn(); + + LOGGER.info("Start LSN for partition with hash code {} is {}", getTabletStartHashCode(this.lastCompletelyProcessedLsn.asLong()), getLsnValue(this.lastCompletelyProcessedLsn.asLong())); + // LOGGER.info("Last LSN for partition with hash code {} is {}", getTabletStartHashCode(lastLsn.asLong()), getLsnValue(lastLsn.asLong())); + + // if (connectorConfig.streamingMode().isParallelSlot()) { + // LOGGER.info("Searching WAL position for parallel slot"); + // searchWalPosition(context, partition, this.effectiveOffset, stream, walPosition); + // } // Against YB, filtering of records based on Wal position is only enabled when connector config provide.transaction.metadata is set to false. if (!YugabyteDBServer.isEnabled() || (YugabyteDBServer.isEnabled() && !connectorConfig.shouldProvideTransactionMetadata())) { @@ -314,8 +327,12 @@ private void processMessages(ChangeEventSourceContext context, PostgresPartition } } - public static Short getTabletStartHashCode(Lsn lsn) { - return (short) ((lsn.asLong() >> 48) & 0xFFFF); + public static long getTabletStartHashCode(long lsnLongValue) { + return ((lsnLongValue >> 48) & 0xFFFF); + } + + public static long getLsnValue(long lsnLongValue) { + return (lsnLongValue & 0x0000FFFFFFFFFFFFL); } private void processReplicationMessages(PostgresPartition partition, PostgresOffsetContext offsetContext, ReplicationStream stream, ReplicationMessage message) @@ -325,7 +342,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff final PostgresPartition messagePartition = new PostgresPartition(connectorConfig.getConnectorName(), connectorConfig.databaseName(), connectorConfig.getTaskId(), connectorConfig.slotName(), - String.valueOf(getTabletStartHashCode((lsn)))); + String.valueOf(getTabletStartHashCode((lsn.asLong())))); if (message.isLastEventForLsn()) { 
lastCompletelyProcessedLsn = lsn; @@ -360,7 +377,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff return; } - offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()), + offsetContext.updateWalPosition(messagePartition, lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()), taskContext.getSlotXmin(connection), null, message.getOperation()); @@ -374,7 +391,7 @@ else if (message.getOperation() == Operation.COMMIT) { maybeWarnAboutGrowingWalBacklog(true); } else if (message.getOperation() == Operation.MESSAGE) { - offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()), + offsetContext.updateWalPosition(messagePartition, lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()), taskContext.getSlotXmin(connection), message.getOperation()); @@ -402,11 +419,6 @@ else if (message.getOperation() == Operation.MESSAGE) { Objects.requireNonNull(tableId); } - offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()), - taskContext.getSlotXmin(connection), - tableId, - message.getOperation()); - boolean dispatched = message.getOperation() != Operation.NOOP && dispatcher.dispatchDataChangeEvent( messagePartition, tableId, @@ -419,6 +431,11 @@ else if (message.getOperation() == Operation.MESSAGE) { connection, tableId, message)); + + offsetContext.updateWalPosition(messagePartition, lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()), + taskContext.getSlotXmin(connection), + tableId, + message.getOperation()); maybeWarnAboutGrowingWalBacklog(dispatched); } @@ -474,7 +491,7 @@ private void probeConnectionIfNeeded() throws SQLException { private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, 
final Lsn lsn, ReplicationMessage message) throws SQLException, InterruptedException { lastCompletelyProcessedLsn = lsn; - offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn); + offsetContext.updateCommitPosition(partition, lsn, lastCompletelyProcessedLsn); if (this.connectorConfig.slotLsnType().isHybridTime()) { if (message.getOperation() == Operation.COMMIT) { @@ -527,37 +544,66 @@ private void maybeWarnAboutGrowingWalBacklog(boolean dispatched) { public void commitOffset(Map partition, Map offset) { try { ReplicationStream replicationStream = this.replicationStream.get(); - final Lsn commitLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY)); - final Lsn changeLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); - final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn; - - LOGGER.debug("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn); - if (replicationStream != null && lsn != null) { - if (!lsnFlushingAllowed) { - LOGGER.info("Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet", lsn); + if (this.connectorConfig.streamingMode().isParallelSlot()) { + // In parallel mode, we don't have a single offset to commit, but rather + // a set of offsets for each partition. We need to commit the offset for + // the partition that is being processed. + if (replicationStream == null) { + LOGGER.debug("Streaming has already stopped, ignoring commit callback..."); + } else if (!lsnFlushingAllowed) { + LOGGER.info("Received offset commit request but ignoring it. 
LSN flushing is not allowed yet"); return; - } - - Lsn finalLsn; - if (this.connectorConfig.slotLsnType().isHybridTime()) { - finalLsn = getLsnToBeFlushed(lsn); } else { - finalLsn = lsn; - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Flushing LSN to server: {}", finalLsn); - } - // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments - replicationStream.flushLsn(finalLsn); + for (Map.Entry entry : offset.entrySet()) { + final long lsnAck = (Long) entry.getValue(); + final long startHash = getTabletStartHashCode(lsnAck); + final long lsnValue = getLsnValue(lsnAck); + if (this.commitLsnMap.contains(startHash) && this.commitLsnMap.get(startHash) > lsnValue) { + // Current offset is less than the last flushed offset for this partition. + LOGGER.info("{} | Skipping commit LSN {} for partition with start hash {}", taskContext.getTaskId(), lsnValue, startHash); + continue; + } - if (this.connectorConfig.slotLsnType().isHybridTime()) { - lastSentFeedback = finalLsn; - cleanCommitTimeQueue(finalLsn); + // TODO Vaibhav: This log is only at info level during testing. + LOGGER.info("{} | Flushing lsn {} for partition with start hash {}", taskContext.getTaskId(), lsnValue, startHash); + this.commitLsnMap.put(startHash, lsnValue); + replicationStream.flushLsn(Lsn.valueOf(lsnAck)); + } } } else { - LOGGER.debug("Streaming has already stopped, ignoring commit callback..."); + final Lsn commitLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY)); + final Lsn changeLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); + final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn; + + LOGGER.debug("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn); + if (replicationStream != null && lsn != null) { + if (!lsnFlushingAllowed) { + LOGGER.info("Received offset commit request on '{}', but ignoring it. 
LSN flushing is not allowed yet", lsn); + return; + } + + Lsn finalLsn; + if (this.connectorConfig.slotLsnType().isHybridTime()) { + finalLsn = getLsnToBeFlushed(lsn); + } else { + finalLsn = lsn; + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Flushing LSN to server: {}", finalLsn); + } + // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments + replicationStream.flushLsn(finalLsn); + + if (this.connectorConfig.slotLsnType().isHybridTime()) { + lastSentFeedback = finalLsn; + cleanCommitTimeQueue(finalLsn); + } + } + else { + LOGGER.debug("Streaming has already stopped, ignoring commit callback..."); + } } } catch (SQLException e) { diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/AbstractYugabyteDBTaskMetrics.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/AbstractYugabyteDBTaskMetrics.java index 8cf03d36e52..ed64b62d349 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/AbstractYugabyteDBTaskMetrics.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/metrics/AbstractYugabyteDBTaskMetrics.java @@ -11,6 +11,8 @@ import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Collect; import org.apache.kafka.connect.data.Struct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.HashMap; @@ -20,9 +22,11 @@ abstract class AbstractYugabyteDBTaskMetrics extends YugabyteDBMetrics implements ChangeEventSourceMetrics, YugabyteDBTaskMetricsMXBean { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractYugabyteDBTaskMetrics.class); private final ChangeEventQueueMetrics changeEventQueueMetrics; private final Map beans = new HashMap<>(); + private final Function beanFactory; AbstractYugabyteDBTaskMetrics(CdcSourceTaskContext taskContext, String contextName, @@ -35,6 +39,8 @@ 
abstract class AbstractYugabyteDBTaskMetrics handler) { B bean = beans.get(partition); if (bean == null) { - throw new IllegalArgumentException("MBean for partition " + partition + " are not registered"); + LOGGER.info("MBean for partition {} are not registered, registering them now", partition); + beans.put(partition, beanFactory.apply(partition)); + bean = beans.get(partition); + bean.register(); } handler.accept(bean); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 03a5531fea0..d196ff117f8 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -344,6 +344,32 @@ public void shouldProduceEventsWithInitialSnapshot() throws Exception { assertRecordsAfterInsert(2, 3, 3); } + @Test + public void testParallelSlot() throws Exception { + TestHelper.execute("DROP SCHEMA IF EXISTS parallel CASCADE;"); + TestHelper.execute("CREATE SCHEMA parallel;"); + TestHelper.execute("CREATE TABLE parallel.test_table (id INT PRIMARY KEY) SPLIT INTO 4 TABLETS;"); + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) + .with(PostgresConnectorConfig.SLOT_NAME, "test_slot") + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "parallel.test_table") + .with(PostgresConnectorConfig.STREAMING_MODE, PostgresConnectorConfig.StreamingMode.PARALLEL_SLOT.getValue()); + start(YugabyteDBConnector.class, configBuilder.build()); + assertConnectorIsRunning(); + + TestHelper.waitFor(Duration.ofSeconds(15)); + + // Insert 50 records. 
+ for (int i = 0; i < 50; i++) { + TestHelper.execute("INSERT INTO parallel.test_table (id) VALUES (" + i + ");"); + } + + // Log that we are waiting and wait for 50 seconds for the records to be consumed. + LOGGER.info("Waiting for 50 seconds for the records to be consumed."); + TestHelper.waitFor(Duration.ofSeconds(50)); + } + @Test public void initialSnapshotWithExistingSlot() throws Exception { TestHelper.execute(SETUP_TABLES_STMT); From 91a191fad54469762f829e174eefd4fbd62ebedf Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 14 May 2025 10:14:21 +0000 Subject: [PATCH 09/10] added cdc state table wrapper --- .../postgresql/CDCStateTableTest.java | 40 ++++++ .../postgresql/YugabyteDBParallelSlotIT.java | 124 ++++++++++++++++++ .../postgresql/cdcstate/CDCStateRow.java | 99 ++++++++++++++ .../postgresql/cdcstate/CDCStateTable.java | 88 +++++++++++++ .../connector/postgresql/cdcstate/OpId.java | 35 +++++ 5 files changed, 386 insertions(+) create mode 100644 debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CDCStateTableTest.java create mode 100644 debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/YugabyteDBParallelSlotIT.java create mode 100644 debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/cdcstate/CDCStateRow.java create mode 100644 debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/cdcstate/CDCStateTable.java create mode 100644 debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/cdcstate/OpId.java diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CDCStateTableTest.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CDCStateTableTest.java new file mode 100644 index 00000000000..042bd127925 --- /dev/null +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/CDCStateTableTest.java @@ -0,0 +1,40 @@ +package io.debezium.connector.postgresql; 
+ +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.Test; + +import io.debezium.connector.postgresql.cdcstate.CDCStateRow; +import io.debezium.connector.postgresql.cdcstate.CDCStateTable; + +public class CDCStateTableTest extends PostgresConnectorIT { + public final static String CREATE_STMT = + "DROP SCHEMA IF EXISTS cdc_state_schema CASCADE; " + + "CREATE SCHEMA cdc_state_schema; " + + "CREATE TABLE cdc_state_schema.test_cdc_state_wrapper (id INT PRIMARY KEY) SPLIT INTO 2 TABLETS;"; + + @Test + public void testCDCStateTableWhenEmpty() { + CDCStateTable cdcStateTable = CDCStateTable.createCdcState(); + + // Since we have not created any replication slot, the cdc_state table + // will not even exist and hence this will be empty. + assertThat(cdcStateTable.isEmpty()).isTrue(); + } + + @Test + public void shouldHaveLastReplicationTimeWithoutConsumption() { + TestHelper.execute(CREATE_STMT); + TestHelper.createPublicationForAllTables(); + TestHelper.createDefaultReplicationSlot(); + + CDCStateTable cdcStateTable = CDCStateTable.createCdcState(); + for (CDCStateRow row : cdcStateTable.getCdcStateRows()) { + if (row.getTabletId().equals("dummy_id_for_replication_slot")) { + assertThat(row.getLastReplicationTime()).isNull(); + } else { + assertThat(row.getLastReplicationTime()).isNotNull(); + } + } + } +} diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/YugabyteDBParallelSlotIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/YugabyteDBParallelSlotIT.java new file mode 100644 index 00000000000..a3e5b3d5d3c --- /dev/null +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/YugabyteDBParallelSlotIT.java @@ -0,0 +1,124 @@ +package io.debezium.connector.postgresql; + +import static org.junit.Assert.assertEquals; + +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + 
+import org.apache.kafka.connect.data.Struct; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.debezium.config.Configuration; +import io.debezium.connector.postgresql.PostgresConnectorConfig.SnapshotMode; +import io.debezium.connector.postgresql.cdcstate.CDCStateRow; +import io.debezium.connector.postgresql.cdcstate.CDCStateTable; +import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.data.Envelope; + +public class YugabyteDBParallelSlotIT extends PostgresConnectorIT { + private static final Logger LOGGER = LoggerFactory.getLogger(YugabyteDBParallelSlotIT.class); + + @Before + public void before() { + super.before(); + + TestHelper.execute("DROP SCHEMA IF EXISTS parallel_slot CASCADE;"); + TestHelper.execute("CREATE SCHEMA parallel_slot;"); + } + + @Test + public void shouldStreamRecordsFromMultiTabletTable() throws Exception { + PostgresStreamingChangeEventSource.TEST_maintainMapForFlushedLsn = true; + + // Create the table with 4 tablets. + TestHelper.execute("CREATE TABLE parallel_slot.test_multi_tablets (id INT PRIMARY KEY) SPLIT INTO 4 TABLETS;"); + + // Start connector with the table include list. + Configuration.Builder configBuilder = TestHelper.defaultConfig() + .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) + .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE) + .with(PostgresConnectorConfig.SLOT_NAME, "multi_slot") + .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "parallel_slot.test_multi_tablets") + .with(PostgresConnectorConfig.STREAMING_MODE, PostgresConnectorConfig.StreamingMode.PARALLEL_SLOT.getValue()); + + start(YugabyteDBConnector.class, configBuilder.build()); + assertConnectorIsRunning(); + + // Wait for replication slot to initialize and start sending data. + TestHelper.waitFor(Duration.ofSeconds(15)); + + // Insert 100 records. 
+ for (int i = 0; i < 100; i++) { + TestHelper.execute("INSERT INTO parallel_slot.test_multi_tablets (id) VALUES (" + i + ");"); + } + + // Log and wait for 60 seconds to allow records to be streamed. + Set primaryKeys = new HashSet<>(); + + int noMessageIterations = 0; + while (noMessageIterations < 10) { + int consumed = consumeAvailableRecords(record -> { + Struct after = ((Struct) record.value()).getStruct(Envelope.FieldName.AFTER); + if (after != null) { + primaryKeys.add(after.getInt32("id")); + } + }); + + if (consumed == 0) { + noMessageIterations++; + TestHelper.waitFor(Duration.ofSeconds(2)); + } else { + noMessageIterations = 0; + } + } + + LOGGER.info("Waiting for 60 seconds for 100 records to be consumed."); + TestHelper.waitFor(Duration.ofSeconds(60)); + + Map tabletToHashCodeMap = getTabletToHashCodeMap("parallel_slot", "test_multi_tablets"); + CDCStateTable cdcStateTable = CDCStateTable.createCdcState(); + + // Since there are 4 tablets, we expect 5 entries in the cdc_state table (assuming that we have a single slot only) + // (4 for each tablet and 1 for the replication slot i.e. dummy_id_for_replication_slot). + + for (CDCStateRow row : cdcStateTable.getCdcStateRows()) { + if (row.getTabletId().equals("dummy_id_for_replication_slot")) { + continue; + } + + Long expectedLsn = PostgresStreamingChangeEventSource.TEST_commitLsnMap.get(tabletToHashCodeMap.get(row.getTabletId())); + Long actualLsn = row.getData().get("confirmed_flush_lsn").asLong(); + assertEquals(expectedLsn, actualLsn); + } + + // Stop the connector. 
+ stopConnector(); + } + + protected Map getTabletToHashCodeMap(String schemaName, String tableName) { + Map tabletToHashCodeMap = new HashMap<>(); + + try (PostgresConnection connection = TestHelper.create()) { + final String query = "SELECT tablet_id, COALESCE((('x'||encode(partition_key_start, 'hex'))::BIT(16)::INT), 0) partition_key_start_int " + + "FROM yb_local_tablets WHERE ysql_schema_name = '" + schemaName + "' AND table_name = '" + tableName + "';"; + connection.query(query, resultSet -> { + while (resultSet.next()) { + String tabletId = resultSet.getString("tablet_id"); + long partitionKeyStart = resultSet.getLong("partition_key_start_int"); + tabletToHashCodeMap.put(tabletId, partitionKeyStart); + } + }); + } catch (Exception e) { + LOGGER.error("Error fetching tablet to hash code mapping", e); + } + + return tabletToHashCodeMap; + } +} diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/cdcstate/CDCStateRow.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/cdcstate/CDCStateRow.java new file mode 100644 index 00000000000..ee4d02b651b --- /dev/null +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/cdcstate/CDCStateRow.java @@ -0,0 +1,99 @@ +package io.debezium.connector.postgresql.cdcstate; + +import java.sql.Timestamp; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Map; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper;; + +/** + * CDCStateRow represents a row in the CDC state table. + * It contains information about the tablet ID, stream ID, operation ID, data, and last replication time. + */ +public class CDCStateRow { + private String tabletId; + private String streamId; + private OpId opId; + private JsonNode data; + private Timestamp lastReplicationTime; + + /** + * Constructor for CDCStateRow. 
+ * + * @param tabletId The ID of the tablet. + * @param streamId The ID of the stream. + * @param opId The operation ID. + * @param data The JSON string representation of the CDC state. + * @param lastReplicationTime The last replication time as a string. + */ + public CDCStateRow(String tabletId, String streamId, OpId opId, Map data, Instant lastReplicationTime) { + this.tabletId = tabletId; + this.streamId = streamId; + this.opId = opId; + this.data = parseData(data); + this.lastReplicationTime = lastReplicationTime == null ? null : Timestamp.from(lastReplicationTime); + } + + public String getTabletId() { + return tabletId; + } + + public String getStreamId() { + return streamId; + } + + public OpId getOpId() { + return opId; + } + + public JsonNode getData() { + return data; + } + + public Timestamp getLastReplicationTime() { + return lastReplicationTime; + } + + @Override + public String toString() { + return "CDCStateRow{" + + "tabletId='" + tabletId + '\'' + + ", streamId='" + streamId + '\'' + + ", opId=" + opId + + ", data=" + data + + ", lastReplicationTime=" + lastReplicationTime + + '}'; + } + + private JsonNode parseData(Map data) { + ObjectMapper objectMapper = new ObjectMapper(); + try { + return objectMapper.valueToTree(data); + } catch (Exception e) { + throw new RuntimeException("Failed to parse JSON data", e); + } + } + + public static Timestamp parseLastReplicationTime(String tsString) { + // Cases when the cdc_state table entry hasn't been updated even once. 
+ if (tsString == null || tsString.isEmpty()) { + return null; + } + + // This is the pattern we get the lastReplicationTime: + // yyyy-MM-dd date + // HH:mm:ss time + // .SSSSSS six-digit fraction + // XX zone offset without colon (“+0000”) + DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSXX"); + + // parse into an OffsetDateTime + OffsetDateTime odt = OffsetDateTime.parse(tsString, fmt); + + // convert to java.sql.Timestamp (in UTC) + return Timestamp.from(odt.toInstant()); + } +} diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/cdcstate/CDCStateTable.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/cdcstate/CDCStateTable.java new file mode 100644 index 00000000000..242bf7cbcb6 --- /dev/null +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/cdcstate/CDCStateTable.java @@ -0,0 +1,88 @@ +package io.debezium.connector.postgresql.cdcstate; + +import java.net.InetSocketAddress; +import java.sql.Timestamp; +import java.util.List; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.ResultSet; + +/** + * CDCStateWrapper is a wrapper class for managing the CDC state in a YCQL database. + * It provides methods to create and retrieve the CDC state rows. + * + * @author Vaibhav Kushwaha (vkushwaha@yugabyte.com) + */ +public class CDCStateTable { + private static final Logger LOGGER = LoggerFactory.getLogger(CDCStateTable.class); + private static final String GET_CDC_STATE_STMT = "SELECT * FROM system.cdc_state"; + + private List cdcStateRows; + private Timestamp lastRefreshTime; + + public CDCStateTable(List cdcStateRows) { + this.cdcStateRows = cdcStateRows; + + // Initial creation will not indicate a refresh. 
+ this.lastRefreshTime = null; + } + + public boolean isEmpty() { + return cdcStateRows == null || cdcStateRows.isEmpty(); + } + + public void refresh() { + LOGGER.info("Refreshing CDC state from service..."); + // Refresh the CDC state rows by fetching them again from the database. + this.cdcStateRows = fetchCdcStateRows(); + + // Update the last refresh time to the current time. + this.lastRefreshTime = new Timestamp(System.currentTimeMillis()); + } + + public List getCdcStateRows() { + return cdcStateRows; + } + + public static CDCStateTable createCdcState() { + return new CDCStateTable(fetchCdcStateRows()); + } + + protected static List fetchCdcStateRows() { + return fetchCdcStateRows("127.0.0.1", 9042, "cassandra", "cassandra", "yugabyte"); + } + + protected static List fetchCdcStateRows( + String hostName, int port, + String userName, String password, String keyspace) { + try { + CqlSession session = CqlSession.builder() + .addContactPoint(new InetSocketAddress(hostName, port)) + .withKeyspace(keyspace) + .withAuthCredentials(userName, password) + .withLocalDatacenter("datacenter1") + .build(); + + ResultSet resultSet = session.execute(GET_CDC_STATE_STMT); + List cdcStateRows = resultSet.all().stream() + .map(row -> new CDCStateRow( + row.getString("tablet_id"), + row.getString("stream_id"), + OpId.fromString(row.getString("checkpoint")), + row.getMap("data", String.class, String.class), + row.getInstant("last_replication_time") + )).collect(Collectors.toList()); + + return cdcStateRows; + } catch (Exception e) { + // Handle exceptions, such as connection errors or query execution errors + LOGGER.error("Error while fetching CDC state rows: " + e.getMessage()); + + throw new RuntimeException("Failed to fetch CDC state rows", e); + } + } +} diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/cdcstate/OpId.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/cdcstate/OpId.java new file mode 100644 
index 00000000000..7ed6e04fcf5 --- /dev/null +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/cdcstate/OpId.java @@ -0,0 +1,35 @@ +package io.debezium.connector.postgresql.cdcstate; + +public class OpId { + private final long term; + private final long index; + + public OpId(long term, long index) { + this.term = term; + this.index = index; + } + + public long getTerm() { + return term; + } + + public long getIndex() { + return index; + } + + public static OpId fromString(String opId) { + if (opId == null || opId.isEmpty()) { + // OpId will be null for the entry where tablet ID value is "dummy_id_for_replication_slot" + return null; + } + + + String[] parts = opId.split("\\."); + + if (parts.length != 2) { + throw new IllegalArgumentException("Invalid OpId format: " + opId); + } + + return new OpId(Long.parseLong(parts[0]), Long.parseLong(parts[1])); + } +} From 9098dcba79d819a081ab7fe3e9a665305dcbdd82 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 15 May 2025 04:40:58 +0000 Subject: [PATCH 10/10] compilation changes --- debezium-connector-postgres/pom.xml | 7 ++ .../connector/postgresql/ObjectUtil.java | 33 +++++++++ .../postgresql/PostgresConnectorConfig.java | 14 ++++ .../postgresql/PostgresOffsetContext.java | 25 ++++++- .../postgresql/PostgresPartition.java | 25 +++++++ .../PostgresSnapshotChangeEventSource.java | 6 +- .../PostgresStreamingChangeEventSource.java | 22 +++++- .../postgresql/YugabyteDBConnector.java | 73 ++++++++++++++++++- .../postgresql/PostgresConnectorIT.java | 8 +- 9 files changed, 202 insertions(+), 11 deletions(-) create mode 100644 debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/ObjectUtil.java diff --git a/debezium-connector-postgres/pom.xml b/debezium-connector-postgres/pom.xml index f81d9dd06b1..c46a47d1eea 100644 --- a/debezium-connector-postgres/pom.xml +++ b/debezium-connector-postgres/pom.xml @@ -155,6 +155,13 @@ rest-assured test + + com.yugabyte + 
java-driver-core + 4.15.0-yb-1 + test + + diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/ObjectUtil.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/ObjectUtil.java new file mode 100644 index 00000000000..758f0207b57 --- /dev/null +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/ObjectUtil.java @@ -0,0 +1,33 @@ +package io.debezium.connector.postgresql; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Base64; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class ObjectUtil { + + public static String serializeObjectToString(Object object) throws IOException { + try (ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream(); + GZIPOutputStream gzipOutputStream = new GZIPOutputStream(arrayOutputStream); + ObjectOutputStream objectOutputStream = new ObjectOutputStream(gzipOutputStream);) { + objectOutputStream.writeObject(object); + objectOutputStream.flush(); + gzipOutputStream.close(); + return Base64.getEncoder().encodeToString(arrayOutputStream.toByteArray()); + } + } + + public static Object deserializeObjectFromString(String objectString) throws IOException, ClassNotFoundException { + try ( + ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(Base64.getDecoder().decode(objectString)); + GZIPInputStream gzipInputStream = new GZIPInputStream(arrayInputStream); + ObjectInputStream objectInputStream = new ObjectInputStream(gzipInputStream)) { + return objectInputStream.readObject(); + } + } +} \ No newline at end of file diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 
2df86c4afa8..04b8dd6111a 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -798,6 +798,20 @@ public static SchemaRefreshMode parse(String value) { // } // return 0; // }); + + public static final Field SLOT_BOUNDS_INTERNAL = Field.create("slot.bounds.internal") + .withDisplayName("Ranges on which a slot is supposed to operate") + .withImportance(Importance.LOW) + .withDescription("Internal use only."); + // .withValidation((config, field, output) -> { + // if (!config.getString(field, "").isEmpty() + // && (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel") + // || !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel_slot"))) { + // output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel' or 'parallel_slot'"); + // return 1; + // } + // return 0; + // }); public static final Field YB_LOAD_BALANCE_CONNECTIONS = Field.create("yb.load.balance.connections") .withDisplayName("YB load balance connections") diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java index 74bd9009e3b..23f248f4959 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java @@ -9,6 +9,7 @@ import java.time.Instant; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.kafka.connect.data.Schema; @@ -134,6 +135,16 @@ public void preSnapshotCompletion() { lastSnapshotRecord = true; } + public void initSourceInfo(PostgresPartition partition, Lsn lsn, Instant commitTime, Long 
txId, Long xmin, TableId tableId) { + SourceInfo info = this.partitionSourceInfo.get(partition.getPartitionIdentificationKey()); + if (info == null) { + info = new SourceInfo(this.connectorConfig); + this.partitionSourceInfo.put(partition.getPartitionIdentificationKey(), info); + } + info.update(lsn, commitTime, txId, xmin, tableId, null); + info.updateLastCommit(lsn); + } + public void updateWalPosition(PostgresPartition partition, Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, TableId tableId, Operation messageType) { this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn; SourceInfo info = this.partitionSourceInfo.get(partition.getPartitionIdentificationKey()); @@ -249,18 +260,18 @@ public String toString() { + ", incrementalSnapshotContext=" + incrementalSnapshotContext + "]"; } - public static PostgresOffsetContext initialContext(PostgresConnectorConfig connectorConfig, PostgresConnection jdbcConnection, Clock clock) { - return initialContext(connectorConfig, jdbcConnection, clock, null, null); + public static PostgresOffsetContext initialContext(PostgresConnectorConfig connectorConfig, PostgresConnection jdbcConnection, Clock clock, Set partitions) { + return initialContext(connectorConfig, jdbcConnection, clock, null, null, partitions); } public static PostgresOffsetContext initialContext(PostgresConnectorConfig connectorConfig, PostgresConnection jdbcConnection, Clock clock, Lsn lastCommitLsn, - Lsn lastCompletelyProcessedLsn) { + Lsn lastCompletelyProcessedLsn, Set partitions) { try { LOGGER.info("Creating initial offset context"); final Lsn lsn = Lsn.valueOf(jdbcConnection.currentXLogLocation()); final Long txId = jdbcConnection.currentTransactionId(); LOGGER.info("Read xlogStart at '{}' from transaction '{}'", lsn, txId); - return new PostgresOffsetContext( + PostgresOffsetContext offsetContext = new PostgresOffsetContext( connectorConfig, lsn, lastCompletelyProcessedLsn, @@ -272,6 +283,12 @@ public static 
PostgresOffsetContext initialContext(PostgresConnectorConfig conne false, new TransactionContext(), new SignalBasedIncrementalSnapshotContext<>(false)); + + for (PostgresPartition partition : partitions) { + offsetContext.initSourceInfo(partition, lastCommitLsn, clock.currentTimeAsInstant(), txId, null, null); + } + + return offsetContext; } catch (SQLException e) { throw new ConnectException("Database processing error", e); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java index 3017b38babb..4d6ab07e9f8 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java @@ -8,6 +8,8 @@ import static io.debezium.relational.RelationalDatabaseConnectorConfig.DATABASE_NAME; import java.util.Collections; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -80,5 +82,28 @@ public Set getPartitions() { connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()), connectorConfig.getTaskId(), connectorConfig.slotName(), String.valueOf(connectorConfig.getSlotBounds().get(0)))); } + + // Todo Vaibhav: This SLOT_BOUNDS_INTERNAL should be populated by the yb_get_tablets_to_poll method in YugabyteDBConnector + // before distributing the partitions to the tasks. 
+ public Set getPartitionsFromConfig() throws Exception { + String hashRangesSerializedString = taskConfig.getString(PostgresConnectorConfig.SLOT_BOUNDS_INTERNAL.name()); + List hashRanges = (List) ObjectUtil.deserializeObjectFromString(hashRangesSerializedString); + + Set partitions = new HashSet<>(); + + for (String hashRange : hashRanges) { + String[] parts = hashRange.split("_"); + String startHashCode = parts[0]; + String endHashCode = parts[1]; // We are not interested in this value. + + PostgresPartition partition = new PostgresPartition( + connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()), + connectorConfig.getTaskId(), connectorConfig.slotName(), startHashCode); + + partitions.add(partition); + } + + return partitions; + } } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java index 7fd70a14451..56900f939d5 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java @@ -15,6 +15,7 @@ import java.util.stream.Collectors; import io.debezium.DebeziumException; +import io.debezium.config.Configuration; import io.debezium.pipeline.spi.ChangeRecordEmitter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -161,6 +162,7 @@ protected void releaseSchemaSnapshotLocks(RelationalSnapshotContext ctx, PostgresOffsetContext previousOffset) throws Exception { PostgresOffsetContext offset = ctx.offset; + Set partitions = new PostgresPartition.Provider(connectorConfig, (Configuration) connectorConfig).getPartitionsFromConfig(); if (offset == null) { if (previousOffset != null && !snapshotter.shouldStreamEventsStartingFromSnapshot()) { // The connect framework, not 
the connector, manages triggering committing offset state so the @@ -168,10 +170,10 @@ protected void determineSnapshotOffset(RelationalSnapshotContext { + public static boolean TEST_maintainMapForFlushedLsn = false; + public static Map TEST_commitLsnMap; private static final String KEEP_ALIVE_THREAD_NAME = "keep-alive"; @@ -111,12 +116,20 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi this.connectionProbeTimer = ElapsedTimeStrategy.constant(Clock.system(), connectorConfig.statusUpdateInterval()); this.commitTimes = new ConcurrentLinkedQueue<>(); this.commitLsnMap = new ConcurrentHashMap<>(); + + if (TEST_maintainMapForFlushedLsn) { + TEST_commitLsnMap = new HashMap<>(); + } } @Override public void init(PostgresOffsetContext offsetContext) { - - this.effectiveOffset = offsetContext == null ? PostgresOffsetContext.initialContext(connectorConfig, connection, clock) : offsetContext; + try { + Set partitions = new PostgresPartition.Provider(connectorConfig, (Configuration) connectorConfig).getPartitionsFromConfig(); + this.effectiveOffset = offsetContext == null ? PostgresOffsetContext.initialContext(connectorConfig, connection, clock, partitions) : offsetContext; + } catch (Exception e) { + throw new DebeziumException("Error while initializing the offset context", e); + } // refresh the schema so we have a latest view of the DB tables initSchema(); } @@ -567,6 +580,11 @@ public void commitOffset(Map partition, Map offset) { // TODO Vaibhav: This log is only at info level during testing. 
LOGGER.info("{} | Flushing lsn {} for partition with start hash {}", taskContext.getTaskId(), lsnValue, startHash); this.commitLsnMap.put(startHash, lsnValue); + + if (TEST_maintainMapForFlushedLsn) { + TEST_commitLsnMap.put(startHash, lsnValue); + } + replicationStream.flushLsn(Lsn.valueOf(lsnAck)); } } diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java index eaf023c3f99..42796b5a2a7 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java @@ -12,6 +12,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.stream.Collectors; import org.apache.kafka.common.config.ConfigDef; @@ -21,10 +22,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + import io.debezium.DebeziumException; import io.debezium.config.Configuration; import io.debezium.connector.common.RelationalBaseSourceConnector; import io.debezium.connector.postgresql.connection.PostgresConnection; +import io.debezium.connector.postgresql.transforms.yugabytedb.Pair; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; @@ -94,25 +101,87 @@ protected List> getTaskConfigsForParallelStreaming(List> getHashRanges() { + String[] tableNameParts = props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()).split("\\."); + try (PostgresConnection connection = new PostgresConnection(connectorConfig.getJdbcConfig(), + 
PostgresConnection.CONNECTION_GENERAL, connectorConfig.ybShouldLoadBalanceConnections())) { + final String oidQuery = String.format("SELECT c.oid FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE c.relname = '%s' AND n.nspname = '%s' AND c.relkind = 'r';", tableNameParts[1], tableNameParts[0]); + int tableOid = connection.queryAndMap( + oidQuery, + connection.singleResultMapper(rs -> rs.getInt("oid"), "Could not fetch table OID")); + + // Now that we have the table OID, we can get the hash ranges. + final String hashRangeQuery = String.format("SELECT * from yb_get_tablets_to_poll('%s');", props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name())); + String hashRangeString = connection.queryAndMap( + hashRangeQuery, + connection.singleResultMapper(rs -> rs.getString("yb_get_tablets_to_poll"), "Could not fetch hash ranges")); + + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode json; + + try { + json = objectMapper.readTree(hashRangeString); + } catch (JsonProcessingException e) { + throw new DebeziumException("Error parsing JSON response from YSQL", e); + } + + List> hashRanges = new ArrayList<>(); + JsonNode arrayNode = json.get(String.valueOf(tableOid)); + if (arrayNode != null && arrayNode.isArray()) { + for (JsonNode node : arrayNode) { + long start = node.get(0).asLong(); + long end = node.get(1).asLong(); + hashRanges.add(new Pair<>(start, end)); + } + } else { + throw new DebeziumException("Invalid JSON response from YSQL"); + } + LOGGER.info("Returning the ranges: {}", hashRanges); + return hashRanges; + } + catch (SQLException e) { + throw new DebeziumException(e); + } + } + protected List> getTaskConfigsForParallelSlot(int maxTasks) { List> taskConfigs = new ArrayList<>(); final long upperBoundExclusive = 64 * 1024; final long rangeSize = upperBoundExclusive / maxTasks; + // TODO: Currently the code will only work when hash ranges are equal + // to the number of tasks. 
+ List> hashRanges = getHashRanges(); + + if (hashRanges.size() != maxTasks) { + throw new DebeziumException("Number of hash ranges is not equal to the number of tasks"); + } + for (int i = 0; i < maxTasks; ++i) { Map taskProps = new HashMap<>(this.props); // The range defined here will be [lowerBound, upperBound) and for the last range, // we will ensure that we are covering the final boundary. - long lowerBound = i * rangeSize; - long upperBound = (i == maxTasks - 1) ? upperBoundExclusive : (lowerBound + rangeSize); + // long lowerBound = i * rangeSize; + // long upperBound = (i == maxTasks - 1) ? upperBoundExclusive : (lowerBound + rangeSize); + + long lowerBound = hashRanges.get(i).getFirst(); + long upperBound = hashRanges.get(i).getSecond(); LOGGER.info("Creating task {} with range [{}, {})", i, lowerBound, upperBound); taskProps.put(PostgresConnectorConfig.TASK_ID, String.valueOf(i)); taskProps.put(PostgresConnectorConfig.STREAM_PARAMS.name(), String.format("hash_range=%d,%d", lowerBound, upperBound)); + + try { + taskProps.put(PostgresConnectorConfig.SLOT_BOUNDS_INTERNAL.name(), + ObjectUtil.serializeObjectToString(List.of(hashRanges.get(i)))); + } catch (Exception e) { + throw new DebeziumException("Error serializing slot bounds", e); + } taskConfigs.add(taskProps); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index d196ff117f8..11286452840 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -3097,6 +3097,12 @@ public void testTableWithCompositePrimaryKey() throws Exception { assertTombstone(records.get(3)); } + @Test + public void shouldValidateWorkingWithParallelSlotMode() { + TestHelper.dropDefaultReplicationSlot(); + 
+ } + @Test public void shouldNotWorkWithReplicaIdentityChangeAndPgOutput() throws Exception { final Configuration.Builder configBuilder = TestHelper.defaultConfig() @@ -4395,4 +4401,4 @@ protected void assertConnectorIsRunning() { protected void assertInsert(SourceRecord record, String pkField, int pk) { YBVerifyRecord.isValidInsert(record, pkField, pk); } -} +} \ No newline at end of file