diff --git a/debezium-connector-postgres/pom.xml b/debezium-connector-postgres/pom.xml
index f81d9dd06b1..c46a47d1eea 100644
--- a/debezium-connector-postgres/pom.xml
+++ b/debezium-connector-postgres/pom.xml
@@ -155,6 +155,13 @@
rest-assured
test
+        <dependency>
+            <groupId>com.yugabyte</groupId>
+            <artifactId>java-driver-core</artifactId>
+            <version>4.15.0-yb-1</version>
+            <scope>test</scope>
+        </dependency>
+
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/ObjectUtil.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/ObjectUtil.java
new file mode 100644
index 00000000000..758f0207b57
--- /dev/null
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/ObjectUtil.java
@@ -0,0 +1,33 @@
+package io.debezium.connector.postgresql;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Base64;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+public class ObjectUtil {
+
+ public static String serializeObjectToString(Object object) throws IOException {
+ try (ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
+ GZIPOutputStream gzipOutputStream = new GZIPOutputStream(arrayOutputStream);
+ ObjectOutputStream objectOutputStream = new ObjectOutputStream(gzipOutputStream);) {
+ objectOutputStream.writeObject(object);
+ objectOutputStream.flush();
+ gzipOutputStream.finish();
+ return Base64.getEncoder().encodeToString(arrayOutputStream.toByteArray());
+ }
+ }
+
+ public static Object deserializeObjectFromString(String objectString) throws IOException, ClassNotFoundException {
+ try (
+ ByteArrayInputStream arrayInputStream = new ByteArrayInputStream(Base64.getDecoder().decode(objectString));
+ GZIPInputStream gzipInputStream = new GZIPInputStream(arrayInputStream);
+ ObjectInputStream objectInputStream = new ObjectInputStream(gzipInputStream)) {
+ return objectInputStream.readObject();
+ }
+ }
+}
\ No newline at end of file
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
index 515b708c0a2..04b8dd6111a 100755
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java
@@ -8,9 +8,12 @@
import java.time.Duration;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import io.debezium.DebeziumException;
@@ -387,12 +390,33 @@ public enum StreamingMode implements EnumeratedValue {
public boolean isParallel() {
return false;
}
+
+ @Override
+ public boolean isParallelSlot() {
+ return false;
+ }
},
PARALLEL("PARALLEL") {
@Override
public boolean isParallel() {
return true;
}
+
+ @Override
+ public boolean isParallelSlot() {
+ return false;
+ }
+ },
+ PARALLEL_SLOT("PARALLEL_SLOT") {
+ @Override
+ public boolean isParallel() {
+ return false;
+ }
+
+ @Override
+ public boolean isParallelSlot() {
+ return true;
+ }
};
@@ -412,6 +436,7 @@ public String getValue() {
}
public abstract boolean isParallel();
+ public abstract boolean isParallelSlot();
}
public enum LsnType implements EnumeratedValue {
@@ -763,14 +788,30 @@ public static SchemaRefreshMode parse(String value) {
public static final Field SLOT_RANGES = Field.create("slot.ranges")
.withDisplayName("Ranges on which a slot is supposed to operate")
.withImportance(Importance.LOW)
- .withDescription("Semi-colon separated values for hash ranges to be polled by tasks.")
- .withValidation((config, field, output) -> {
- if (!config.getString(field, "").isEmpty() && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) {
- output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel'");
- return 1;
- }
- return 0;
- });
+ .withDescription("Semi-colon separated values for hash ranges to be polled by tasks.");
+ // .withValidation((config, field, output) -> {
+ // if (!config.getString(field, "").isEmpty()
+ // && (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")
+ // && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel_slot"))) {
+ // output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel' or 'parallel_slot'");
+ // return 1;
+ // }
+ // return 0;
+ // });
+
+ public static final Field SLOT_BOUNDS_INTERNAL = Field.create("slot.bounds.internal")
+ .withDisplayName("Ranges on which a slot is supposed to operate")
+ .withImportance(Importance.LOW)
+ .withDescription("Internal use only.");
+ // .withValidation((config, field, output) -> {
+ // if (!config.getString(field, "").isEmpty()
+ // && (!config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")
+ // && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel_slot"))) {
+ // output.accept(field, "", "slot.bounds.internal is only valid with streaming.mode 'parallel' or 'parallel_slot'");
+ // return 1;
+ // }
+ // return 0;
+ // });
public static final Field YB_LOAD_BALANCE_CONNECTIONS = Field.create("yb.load.balance.connections")
.withDisplayName("YB load balance connections")
@@ -1233,6 +1274,29 @@ public StreamingMode streamingMode() {
return StreamingMode.parse(getConfig().getString(STREAMING_MODE));
}
+ /**
+ * Reads the stream params specified and returns the slot boundaries.
+ * @return a {@link List} where the list represents the range of a
+ * tablet i.e. {@code [startHashCode, endHashCode)}
+ */
+ public List getSlotBounds() {
+ String regex = "hash_range=(\\d+),(\\d+)";
+
+ Pattern pattern = Pattern.compile(regex);
+ Matcher matcher = pattern.matcher(streamParams());
+
+ if (matcher.find()) {
+ // Extract the numbers
+ String start = matcher.group(1);
+ String end = matcher.group(2);
+
+ return List.of(Integer.valueOf(start), Integer.valueOf(end));
+ }
+
+ // Return the full list assuming that we are in the old model.
+ return List.of(0, 65536);
+ }
+
protected boolean dropSlotOnStop() {
if (getConfig().hasKey(DROP_SLOT_ON_STOP.name())) {
return getConfig().getBoolean(DROP_SLOT_ON_STOP);
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java
index 53348ba1915..23f248f4959 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresOffsetContext.java
@@ -9,6 +9,8 @@
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
@@ -43,6 +45,8 @@ public class PostgresOffsetContext extends CommonOffsetContext {
private Lsn streamingStoppingLsn = null;
private final TransactionContext transactionContext;
private final IncrementalSnapshotContext incrementalSnapshotContext;
+ private final Map partitionSourceInfo;
+ private final PostgresConnectorConfig connectorConfig;
private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn, Lsn lastCompletelyProcessedLsn, Lsn lastCommitLsn, Long txId, Operation messageType,
Instant time,
@@ -66,37 +70,48 @@ private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn,
}
this.transactionContext = transactionContext;
this.incrementalSnapshotContext = incrementalSnapshotContext;
+ this.connectorConfig = connectorConfig;
+ this.partitionSourceInfo = new ConcurrentHashMap<>();
}
@Override
public Map getOffset() {
+ // Map result = new HashMap<>();
+ // if (sourceInfo.timestamp() != null) {
+ // result.put(SourceInfo.TIMESTAMP_USEC_KEY, Conversions.toEpochMicros(sourceInfo.timestamp()));
+ // }
+ // if (sourceInfo.txId() != null) {
+ // result.put(SourceInfo.TXID_KEY, sourceInfo.txId());
+ // }
+ // if (sourceInfo.lsn() != null) {
+ // result.put(SourceInfo.LSN_KEY, sourceInfo.lsn().asLong());
+ // }
+ // if (sourceInfo.xmin() != null) {
+ // result.put(SourceInfo.XMIN_KEY, sourceInfo.xmin());
+ // }
+ // if (sourceInfo.isSnapshot()) {
+ // result.put(SourceInfo.SNAPSHOT_KEY, true);
+ // result.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, lastSnapshotRecord);
+ // }
+ // if (lastCompletelyProcessedLsn != null) {
+ // result.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, lastCompletelyProcessedLsn.asLong());
+ // }
+ // if (lastCommitLsn != null) {
+ // result.put(LAST_COMMIT_LSN_KEY, lastCommitLsn.asLong());
+ // }
+ // if (sourceInfo.messageType() != null) {
+ // result.put(SourceInfo.MSG_TYPE_KEY, sourceInfo.messageType().toString());
+ // }
+ // return sourceInfo.isSnapshot() ? result : incrementalSnapshotContext.store(transactionContext.store(result));
+
Map result = new HashMap<>();
- if (sourceInfo.timestamp() != null) {
- result.put(SourceInfo.TIMESTAMP_USEC_KEY, Conversions.toEpochMicros(sourceInfo.timestamp()));
- }
- if (sourceInfo.txId() != null) {
- result.put(SourceInfo.TXID_KEY, sourceInfo.txId());
- }
- if (sourceInfo.lsn() != null) {
- result.put(SourceInfo.LSN_KEY, sourceInfo.lsn().asLong());
- }
- if (sourceInfo.xmin() != null) {
- result.put(SourceInfo.XMIN_KEY, sourceInfo.xmin());
- }
- if (sourceInfo.isSnapshot()) {
- result.put(SourceInfo.SNAPSHOT_KEY, true);
- result.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, lastSnapshotRecord);
- }
- if (lastCompletelyProcessedLsn != null) {
- result.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, lastCompletelyProcessedLsn.asLong());
- }
- if (lastCommitLsn != null) {
- result.put(LAST_COMMIT_LSN_KEY, lastCommitLsn.asLong());
- }
- if (sourceInfo.messageType() != null) {
- result.put(SourceInfo.MSG_TYPE_KEY, sourceInfo.messageType().toString());
+
+ for (Map.Entry entry : this.partitionSourceInfo.entrySet()) {
+ // Key here is the identification key of the partition.
+ result.put(entry.getKey(), entry.getValue().lsn().asLong());
}
- return sourceInfo.isSnapshot() ? result : incrementalSnapshotContext.store(transactionContext.store(result));
+
+ return result;
}
@Override
@@ -120,21 +135,43 @@ public void preSnapshotCompletion() {
lastSnapshotRecord = true;
}
- public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, TableId tableId, Operation messageType) {
+ public void initSourceInfo(PostgresPartition partition, Lsn lsn, Instant commitTime, Long txId, Long xmin, TableId tableId) {
+ SourceInfo info = this.partitionSourceInfo.get(partition.getPartitionIdentificationKey());
+ if (info == null) {
+ info = new SourceInfo(this.connectorConfig);
+ this.partitionSourceInfo.put(partition.getPartitionIdentificationKey(), info);
+ }
+ info.update(lsn, commitTime, txId, xmin, tableId, null);
+ info.updateLastCommit(lsn);
+ }
+
+ public void updateWalPosition(PostgresPartition partition, Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, TableId tableId, Operation messageType) {
this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn;
- sourceInfo.update(lsn, commitTime, txId, xmin, tableId, messageType);
+ SourceInfo info = this.partitionSourceInfo.get(partition.getPartitionIdentificationKey());
+ if (info == null) {
+ info = new SourceInfo(this.connectorConfig);
+ this.partitionSourceInfo.put(partition.getPartitionIdentificationKey(), info);
+ }
+ info.update(lsn, commitTime, txId, xmin, tableId, messageType);
}
/**
* update wal position for lsn events that do not have an associated table or schema
*/
- public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, Operation messageType) {
- updateWalPosition(lsn, lastCompletelyProcessedLsn, commitTime, txId, xmin, null, messageType);
+ public void updateWalPosition(PostgresPartition partition, Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, Operation messageType) {
+ updateWalPosition(partition, lsn, lastCompletelyProcessedLsn, commitTime, txId, xmin, null, messageType);
}
- public void updateCommitPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn) {
+ public void updateCommitPosition(PostgresPartition partition, Lsn lsn, Lsn lastCompletelyProcessedLsn) {
this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn;
this.lastCommitLsn = lsn;
+
+ SourceInfo info = this.partitionSourceInfo.get(partition.getPartitionIdentificationKey());
+ if (info == null) {
+ info = new SourceInfo(this.connectorConfig);
+ this.partitionSourceInfo.put(partition.getPartitionIdentificationKey(), info);
+ }
+
- sourceInfo.updateLastCommit(lsn);
+ info.updateLastCommit(lsn);
}
@@ -223,18 +260,18 @@ public String toString() {
+ ", incrementalSnapshotContext=" + incrementalSnapshotContext + "]";
}
- public static PostgresOffsetContext initialContext(PostgresConnectorConfig connectorConfig, PostgresConnection jdbcConnection, Clock clock) {
- return initialContext(connectorConfig, jdbcConnection, clock, null, null);
+ public static PostgresOffsetContext initialContext(PostgresConnectorConfig connectorConfig, PostgresConnection jdbcConnection, Clock clock, Set partitions) {
+ return initialContext(connectorConfig, jdbcConnection, clock, null, null, partitions);
}
public static PostgresOffsetContext initialContext(PostgresConnectorConfig connectorConfig, PostgresConnection jdbcConnection, Clock clock, Lsn lastCommitLsn,
- Lsn lastCompletelyProcessedLsn) {
+ Lsn lastCompletelyProcessedLsn, Set partitions) {
try {
LOGGER.info("Creating initial offset context");
final Lsn lsn = Lsn.valueOf(jdbcConnection.currentXLogLocation());
final Long txId = jdbcConnection.currentTransactionId();
LOGGER.info("Read xlogStart at '{}' from transaction '{}'", lsn, txId);
- return new PostgresOffsetContext(
+ PostgresOffsetContext offsetContext = new PostgresOffsetContext(
connectorConfig,
lsn,
lastCompletelyProcessedLsn,
@@ -246,6 +283,12 @@ public static PostgresOffsetContext initialContext(PostgresConnectorConfig conne
false,
new TransactionContext(),
new SignalBasedIncrementalSnapshotContext<>(false));
+
+ for (PostgresPartition partition : partitions) {
+ offsetContext.initSourceInfo(partition, lastCommitLsn, clock.currentTimeAsInstant(), txId, null, null);
+ }
+
+ return offsetContext;
}
catch (SQLException e) {
throw new ConnectException("Database processing error", e);
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java
index 0166ba72479..4d6ab07e9f8 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresPartition.java
@@ -8,6 +8,8 @@
import static io.debezium.relational.RelationalDatabaseConnectorConfig.DATABASE_NAME;
import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -23,12 +25,15 @@ public class PostgresPartition extends AbstractPartition implements Partition {
private final String serverName;
private final String taskId;
private final String slotName;
+ private final String startHashCode;
- public PostgresPartition(String serverName, String databaseName, String taskId, String slotName) {
+ public PostgresPartition(String serverName, String databaseName, String taskId, String slotName,
+ String startHashCode) {
super(databaseName);
this.serverName = serverName;
this.taskId = taskId;
this.slotName = slotName;
+ this.startHashCode = startHashCode;
}
@Override
@@ -59,7 +64,7 @@ public String toString() {
}
public String getPartitionIdentificationKey() {
- return String.format("%s_%s_%s", serverName, taskId, slotName);
+ return String.format("%s_%s_%s", serverName, slotName, startHashCode);
}
static class Provider implements Partition.Provider {
@@ -75,7 +80,30 @@ static class Provider implements Partition.Provider {
public Set getPartitions() {
return Collections.singleton(new PostgresPartition(
connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()),
- connectorConfig.getTaskId(), connectorConfig.slotName()));
+ connectorConfig.getTaskId(), connectorConfig.slotName(), String.valueOf(connectorConfig.getSlotBounds().get(0))));
+ }
+
+ // Todo Vaibhav: This SLOT_BOUNDS_INTERNAL should be populated by the yb_get_tablets_to_poll method in YugabyteDBConnector
+ // before distributing the partitions to the tasks.
+ public Set getPartitionsFromConfig() throws Exception {
+ String hashRangesSerializedString = taskConfig.getString(PostgresConnectorConfig.SLOT_BOUNDS_INTERNAL.name());
+ List hashRanges = (List) ObjectUtil.deserializeObjectFromString(hashRangesSerializedString);
+
+ Set partitions = new HashSet<>();
+
+ for (String hashRange : hashRanges) {
+ String[] parts = hashRange.split("_");
+ String startHashCode = parts[0];
+ String endHashCode = parts[1]; // We are not interested in this value.
+
+ PostgresPartition partition = new PostgresPartition(
+ connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name()),
+ connectorConfig.getTaskId(), connectorConfig.slotName(), startHashCode);
+
+ partitions.add(partition);
+ }
+
+ return partitions;
}
}
}
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java
index ff875750395..56900f939d5 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java
@@ -15,6 +15,7 @@
import java.util.stream.Collectors;
import io.debezium.DebeziumException;
+import io.debezium.config.Configuration;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -161,6 +162,7 @@ protected void releaseSchemaSnapshotLocks(RelationalSnapshotContext ctx, PostgresOffsetContext previousOffset)
throws Exception {
PostgresOffsetContext offset = ctx.offset;
+ Set partitions = new PostgresPartition.Provider(connectorConfig, (Configuration) connectorConfig).getPartitionsFromConfig();
if (offset == null) {
if (previousOffset != null && !snapshotter.shouldStreamEventsStartingFromSnapshot()) {
// The connect framework, not the connector, manages triggering committing offset state so the
@@ -168,28 +170,29 @@ protected void determineSnapshotOffset(RelationalSnapshotContext {
+ public static boolean TEST_maintainMapForFlushedLsn = false;
+ public static Map TEST_commitLsnMap;
private static final String KEEP_ALIVE_THREAD_NAME = "keep-alive";
@@ -85,6 +92,7 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS
private PostgresOffsetContext effectiveOffset;
protected ConcurrentLinkedQueue commitTimes;
+ protected ConcurrentHashMap commitLsnMap;
/**
* For DEBUGGING
@@ -107,12 +115,21 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi
this.replicationConnection = (PostgresReplicationConnection) replicationConnection;
this.connectionProbeTimer = ElapsedTimeStrategy.constant(Clock.system(), connectorConfig.statusUpdateInterval());
this.commitTimes = new ConcurrentLinkedQueue<>();
+ this.commitLsnMap = new ConcurrentHashMap<>();
+
+ if (TEST_maintainMapForFlushedLsn) {
+ TEST_commitLsnMap = new HashMap<>();
+ }
}
@Override
public void init(PostgresOffsetContext offsetContext) {
-
- this.effectiveOffset = offsetContext == null ? PostgresOffsetContext.initialContext(connectorConfig, connection, clock) : offsetContext;
+ try {
+ Set partitions = new PostgresPartition.Provider(connectorConfig, connectorConfig.getConfig()).getPartitionsFromConfig();
+ this.effectiveOffset = offsetContext == null ? PostgresOffsetContext.initialContext(connectorConfig, connection, clock, partitions) : offsetContext;
+ } catch (Exception e) {
+ throw new DebeziumException("Error while initializing the offset context", e);
+ }
// refresh the schema so we have a latest view of the DB tables
initSchema();
}
@@ -201,7 +218,17 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio
connection.commit();
}
+ // TODO Vaibhav: Can we initialize all of our partition LSN values to this?
this.lastCompletelyProcessedLsn = replicationStream.get().startLsn();
+ // Lsn lastLsn = this.replicationStream.get().lastReceivedLsn();
+
+ LOGGER.info("Start LSN for partition with hash code {} is {}", getTabletStartHashCode(this.lastCompletelyProcessedLsn.asLong()), getLsnValue(this.lastCompletelyProcessedLsn.asLong()));
+ // LOGGER.info("Last LSN for partition with hash code {} is {}", getTabletStartHashCode(lastLsn.asLong()), getLsnValue(lastLsn.asLong()));
+
+ // if (connectorConfig.streamingMode().isParallelSlot()) {
+ // LOGGER.info("Searching WAL position for parallel slot");
+ // searchWalPosition(context, partition, this.effectiveOffset, stream, walPosition);
+ // }
// Against YB, filtering of records based on Wal position is only enabled when connector config provide.transaction.metadata is set to false.
if (!YugabyteDBServer.isEnabled() || (YugabyteDBServer.isEnabled() && !connectorConfig.shouldProvideTransactionMetadata())) {
@@ -313,10 +340,22 @@ private void processMessages(ChangeEventSourceContext context, PostgresPartition
}
}
+ public static long getTabletStartHashCode(long lsnLongValue) {
+ return ((lsnLongValue >> 48) & 0xFFFF);
+ }
+
+ public static long getLsnValue(long lsnLongValue) {
+ return (lsnLongValue & 0x0000FFFFFFFFFFFFL);
+ }
+
private void processReplicationMessages(PostgresPartition partition, PostgresOffsetContext offsetContext, ReplicationStream stream, ReplicationMessage message)
throws SQLException, InterruptedException {
final Lsn lsn = stream.lastReceivedLsn();
+ final PostgresPartition messagePartition =
+ new PostgresPartition(connectorConfig.getConnectorName(), connectorConfig.databaseName(),
+ connectorConfig.getTaskId(), connectorConfig.slotName(),
+ String.valueOf(getTabletStartHashCode((lsn.asLong()))));
if (message.isLastEventForLsn()) {
lastCompletelyProcessedLsn = lsn;
@@ -346,36 +385,36 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff
// Don't skip on BEGIN message as it would flush LSN for the whole transaction
// too early
if (message.getOperation() == Operation.COMMIT) {
- commitMessage(partition, offsetContext, lsn, message);
+ commitMessage(messagePartition, offsetContext, lsn, message);
}
return;
}
- offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()),
+ offsetContext.updateWalPosition(messagePartition, lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()),
taskContext.getSlotXmin(connection),
null,
message.getOperation());
if (message.getOperation() == Operation.BEGIN) {
- dispatcher.dispatchTransactionStartedEvent(partition, toString(message.getTransactionId()), offsetContext, message.getCommitTime());
+ dispatcher.dispatchTransactionStartedEvent(messagePartition, toString(message.getTransactionId()), offsetContext, message.getCommitTime());
}
else if (message.getOperation() == Operation.COMMIT) {
- commitMessage(partition, offsetContext, lsn, message);
- dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, message.getCommitTime());
+ commitMessage(messagePartition, offsetContext, lsn, message);
+ dispatcher.dispatchTransactionCommittedEvent(messagePartition, offsetContext, message.getCommitTime());
}
maybeWarnAboutGrowingWalBacklog(true);
}
else if (message.getOperation() == Operation.MESSAGE) {
- offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()),
+ offsetContext.updateWalPosition(messagePartition, lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()),
taskContext.getSlotXmin(connection),
message.getOperation());
// non-transactional message that will not be followed by a COMMIT message
if (message.isLastEventForLsn()) {
- commitMessage(partition, offsetContext, lsn, message);
+ commitMessage(messagePartition, offsetContext, lsn, message);
}
dispatcher.dispatchLogicalDecodingMessage(
- partition,
+ messagePartition,
offsetContext,
clock.currentTimeAsInstant().toEpochMilli(),
(LogicalDecodingMessage) message);
@@ -393,16 +432,11 @@ else if (message.getOperation() == Operation.MESSAGE) {
Objects.requireNonNull(tableId);
}
- offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()),
- taskContext.getSlotXmin(connection),
- tableId,
- message.getOperation());
-
boolean dispatched = message.getOperation() != Operation.NOOP && dispatcher.dispatchDataChangeEvent(
- partition,
+ messagePartition,
tableId,
new PostgresChangeRecordEmitter(
- partition,
+ messagePartition,
offsetContext,
clock,
connectorConfig,
@@ -410,6 +444,11 @@ else if (message.getOperation() == Operation.MESSAGE) {
connection,
tableId,
message));
+
+ offsetContext.updateWalPosition(messagePartition, lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()),
+ taskContext.getSlotXmin(connection),
+ tableId,
+ message.getOperation());
maybeWarnAboutGrowingWalBacklog(dispatched);
}
@@ -465,7 +504,7 @@ private void probeConnectionIfNeeded() throws SQLException {
private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn, ReplicationMessage message) throws SQLException, InterruptedException {
lastCompletelyProcessedLsn = lsn;
- offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn);
+ offsetContext.updateCommitPosition(partition, lsn, lastCompletelyProcessedLsn);
if (this.connectorConfig.slotLsnType().isHybridTime()) {
if (message.getOperation() == Operation.COMMIT) {
@@ -518,37 +557,71 @@ private void maybeWarnAboutGrowingWalBacklog(boolean dispatched) {
public void commitOffset(Map partition, Map offset) {
try {
ReplicationStream replicationStream = this.replicationStream.get();
- final Lsn commitLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY));
- final Lsn changeLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY));
- final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn;
-
- LOGGER.debug("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn);
- if (replicationStream != null && lsn != null) {
- if (!lsnFlushingAllowed) {
- LOGGER.info("Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet", lsn);
+ if (this.connectorConfig.streamingMode().isParallelSlot()) {
+ // In parallel mode, we don't have a single offset to commit, but rather
+ // a set of offsets for each partition. We need to commit the offset for
+ // the partition that is being processed.
+ if (replicationStream == null) {
+ LOGGER.debug("Streaming has already stopped, ignoring commit callback...");
+ } else if (!lsnFlushingAllowed) {
+ LOGGER.info("Received offset commit request but ignoring it. LSN flushing is not allowed yet");
return;
- }
-
- Lsn finalLsn;
- if (this.connectorConfig.slotLsnType().isHybridTime()) {
- finalLsn = getLsnToBeFlushed(lsn);
} else {
- finalLsn = lsn;
- }
+ for (Map.Entry entry : offset.entrySet()) {
+ final long lsnAck = (Long) entry.getValue();
+ final long startHash = getTabletStartHashCode(lsnAck);
+ final long lsnValue = getLsnValue(lsnAck);
+ if (this.commitLsnMap.containsKey(startHash) && this.commitLsnMap.get(startHash) > lsnValue) {
+ // Current offset is less than the last flushed offset for this partition.
+ LOGGER.info("{} | Skipping commit LSN {} for partition with start hash {}", taskContext.getTaskId(), lsnValue, startHash);
+ continue;
+ }
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Flushing LSN to server: {}", finalLsn);
- }
- // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
- replicationStream.flushLsn(finalLsn);
+ // TODO Vaibhav: This log is only at info level during testing.
+ LOGGER.info("{} | Flushing lsn {} for partition with start hash {}", taskContext.getTaskId(), lsnValue, startHash);
+ this.commitLsnMap.put(startHash, lsnValue);
+
+ if (TEST_maintainMapForFlushedLsn) {
+ TEST_commitLsnMap.put(startHash, lsnValue);
+ }
- if (this.connectorConfig.slotLsnType().isHybridTime()) {
- lastSentFeedback = finalLsn;
- cleanCommitTimeQueue(finalLsn);
+ replicationStream.flushLsn(Lsn.valueOf(lsnAck));
+ }
}
}
else {
- LOGGER.debug("Streaming has already stopped, ignoring commit callback...");
+ final Lsn commitLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY));
+ final Lsn changeLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY));
+ final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn;
+
+ LOGGER.debug("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn);
+ if (replicationStream != null && lsn != null) {
+ if (!lsnFlushingAllowed) {
+ LOGGER.info("Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet", lsn);
+ return;
+ }
+
+ Lsn finalLsn;
+ if (this.connectorConfig.slotLsnType().isHybridTime()) {
+ finalLsn = getLsnToBeFlushed(lsn);
+ } else {
+ finalLsn = lsn;
+ }
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Flushing LSN to server: {}", finalLsn);
+ }
+ // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
+ replicationStream.flushLsn(finalLsn);
+
+ if (this.connectorConfig.slotLsnType().isHybridTime()) {
+ lastSentFeedback = finalLsn;
+ cleanCommitTimeQueue(finalLsn);
+ }
+ }
+ else {
+ LOGGER.debug("Streaming has already stopped, ignoring commit callback...");
+ }
}
}
catch (SQLException e) {
diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java
index 835ec354189..42796b5a2a7 100644
--- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java
+++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/YugabyteDBConnector.java
@@ -12,6 +12,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.kafka.common.config.ConfigDef;
@@ -21,10 +22,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.common.RelationalBaseSourceConnector;
import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.connector.postgresql.transforms.yugabytedb.Pair;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
@@ -94,6 +101,94 @@ protected List