Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions debezium-connector-postgres/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,13 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.yugabyte</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.15.0-yb-1</version>
<scope>test</scope>
</dependency>


<!-- Used for unit testing with Kafka -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.debezium.connector.postgresql;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/**
 * Helper for round-tripping Java objects through a compact textual form:
 * the object is Java-serialized, GZIP-compressed, then Base64-encoded.
 *
 * <p>NOTE(review): {@link ObjectInputStream} over untrusted input is a known
 * deserialization attack vector. Only feed {@link #deserializeObjectFromString}
 * strings that were produced by {@link #serializeObjectToString} from a
 * trusted source (e.g. the connector's own offset storage).
 */
public class ObjectUtil {

    private ObjectUtil() {
        // Static helpers only; not instantiable.
    }

    /**
     * Serializes the given object to a GZIP-compressed, Base64-encoded string.
     *
     * @param object the object to serialize; must be {@link java.io.Serializable}
     * @return Base64 text holding the compressed serialized form
     * @throws IOException if serialization or compression fails
     */
    public static String serializeObjectToString(Object object) throws IOException {
        // ByteArrayOutputStream needs no closing. The ObjectOutputStream (and the
        // GZIP stream it wraps) is fully closed BEFORE the buffer is read, instead
        // of the fragile original pattern of manually closing the GZIP stream
        // mid-scope and then letting try-with-resources close it a second time.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(new GZIPOutputStream(buffer))) {
            objectOutputStream.writeObject(object);
        }
        return Base64.getEncoder().encodeToString(buffer.toByteArray());
    }

    /**
     * Reverses {@link #serializeObjectToString}: Base64-decodes, decompresses and
     * deserializes the given string back into an object.
     *
     * @param objectString Base64 text produced by {@link #serializeObjectToString}
     * @return the reconstructed object
     * @throws IOException if the input is not valid Base64/GZIP/serialized data
     * @throws ClassNotFoundException if the serialized class is not on the classpath
     */
    public static Object deserializeObjectFromString(String objectString) throws IOException, ClassNotFoundException {
        try (ObjectInputStream objectInputStream = new ObjectInputStream(
                new GZIPInputStream(new ByteArrayInputStream(Base64.getDecoder().decode(objectString))))) {
            return objectInputStream.readObject();
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.debezium.DebeziumException;
Expand Down Expand Up @@ -387,12 +390,33 @@ public enum StreamingMode implements EnumeratedValue {
public boolean isParallel() {
return false;
}

@Override
public boolean isParallelSlot() {
return false;
}
},
PARALLEL("PARALLEL") {
@Override
public boolean isParallel() {
return true;
}

@Override
public boolean isParallelSlot() {
return false;
}
},
PARALLEL_SLOT("PARALLEL_SLOT") {
@Override
public boolean isParallel() {
return false;
}

@Override
public boolean isParallelSlot() {
return true;
}
};


Expand All @@ -412,6 +436,7 @@ public String getValue() {
}

public abstract boolean isParallel();
public abstract boolean isParallelSlot();
}

public enum LsnType implements EnumeratedValue {
Expand Down Expand Up @@ -763,14 +788,30 @@ public static SchemaRefreshMode parse(String value) {
public static final Field SLOT_RANGES = Field.create("slot.ranges")
.withDisplayName("Ranges on which a slot is supposed to operate")
.withImportance(Importance.LOW)
.withDescription("Semi-colon separated values for hash ranges to be polled by tasks.")
.withValidation((config, field, output) -> {
if (!config.getString(field, "").isEmpty() && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")) {
output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel'");
return 1;
}
return 0;
});
.withDescription("Semi-colon separated values for hash ranges to be polled by tasks.");
// .withValidation((config, field, output) -> {
//     if (!config.getString(field, "").isEmpty()
//             && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")
//             && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel_slot")) {
//         output.accept(field, "", "slot.ranges is only valid with streaming.mode 'parallel' or 'parallel_slot'");
//         return 1;
//     }
//     return 0;
// });
// NOTE(review): the original disabled condition used (!parallel || !parallel_slot),
// which is always true whenever the mode is a single value — the correct form
// (mode is NEITHER parallel NOR parallel_slot) requires && as written above.

public static final Field SLOT_BOUNDS_INTERNAL = Field.create("slot.bounds.internal")
.withDisplayName("Ranges on which a slot is supposed to operate")
.withImportance(Importance.LOW)
.withDescription("Internal use only.");
// .withValidation((config, field, output) -> {
//     if (!config.getString(field, "").isEmpty()
//             && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel")
//             && !config.getString(STREAMING_MODE).equalsIgnoreCase("parallel_slot")) {
//         output.accept(field, "", "slot.bounds.internal is only valid with streaming.mode 'parallel' or 'parallel_slot'");
//         return 1;
//     }
//     return 0;
// });
// NOTE(review): the original disabled condition used ||, which is always true for
// any single mode value (De Morgan error), and its message referenced
// "slot.ranges" although this field is slot.bounds.internal; both fixed above.

public static final Field YB_LOAD_BALANCE_CONNECTIONS = Field.create("yb.load.balance.connections")
.withDisplayName("YB load balance connections")
Expand Down Expand Up @@ -1233,6 +1274,29 @@ public StreamingMode streamingMode() {
return StreamingMode.parse(getConfig().getString(STREAMING_MODE));
}

/**
 * Reads the configured stream params and extracts this slot's hash-range
 * boundaries.
 *
 * @return a two-element {@link List} representing the range
 *         {@code [startHashCode, endHashCode)} parsed from a
 *         {@code hash_range=start,end} entry in the stream params;
 *         {@code [0, 65536)} when no explicit range is present
 *         (assumed to be the old model covering the whole hash space)
 */
public List<Integer> getSlotBounds() {
    final Matcher rangeMatcher = Pattern.compile("hash_range=(\\d+),(\\d+)").matcher(streamParams());

    if (rangeMatcher.find()) {
        final int lowerBound = Integer.parseInt(rangeMatcher.group(1));
        final int upperBound = Integer.parseInt(rangeMatcher.group(2));
        return List.of(lowerBound, upperBound);
    }

    // No explicit range configured: fall back to the full range used by the
    // old model.
    return List.of(0, 65536);
}

protected boolean dropSlotOnStop() {
if (getConfig().hasKey(DROP_SLOT_ON_STOP.name())) {
return getConfig().getBoolean(DROP_SLOT_ON_STOP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
Expand Down Expand Up @@ -43,6 +45,8 @@ public class PostgresOffsetContext extends CommonOffsetContext<SourceInfo> {
private Lsn streamingStoppingLsn = null;
private final TransactionContext transactionContext;
private final IncrementalSnapshotContext<TableId> incrementalSnapshotContext;
private final Map<String, SourceInfo> partitionSourceInfo;
private final PostgresConnectorConfig connectorConfig;

private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn, Lsn lastCompletelyProcessedLsn, Lsn lastCommitLsn, Long txId, Operation messageType,
Instant time,
Expand All @@ -66,37 +70,48 @@ private PostgresOffsetContext(PostgresConnectorConfig connectorConfig, Lsn lsn,
}
this.transactionContext = transactionContext;
this.incrementalSnapshotContext = incrementalSnapshotContext;
this.connectorConfig = connectorConfig;
this.partitionSourceInfo = new ConcurrentHashMap<>();
}

@Override
public Map<String, ?> getOffset() {
// Map<String, Object> result = new HashMap<>();
// if (sourceInfo.timestamp() != null) {
// result.put(SourceInfo.TIMESTAMP_USEC_KEY, Conversions.toEpochMicros(sourceInfo.timestamp()));
// }
// if (sourceInfo.txId() != null) {
// result.put(SourceInfo.TXID_KEY, sourceInfo.txId());
// }
// if (sourceInfo.lsn() != null) {
// result.put(SourceInfo.LSN_KEY, sourceInfo.lsn().asLong());
// }
// if (sourceInfo.xmin() != null) {
// result.put(SourceInfo.XMIN_KEY, sourceInfo.xmin());
// }
// if (sourceInfo.isSnapshot()) {
// result.put(SourceInfo.SNAPSHOT_KEY, true);
// result.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, lastSnapshotRecord);
// }
// if (lastCompletelyProcessedLsn != null) {
// result.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, lastCompletelyProcessedLsn.asLong());
// }
// if (lastCommitLsn != null) {
// result.put(LAST_COMMIT_LSN_KEY, lastCommitLsn.asLong());
// }
// if (sourceInfo.messageType() != null) {
// result.put(SourceInfo.MSG_TYPE_KEY, sourceInfo.messageType().toString());
// }
// return sourceInfo.isSnapshot() ? result : incrementalSnapshotContext.store(transactionContext.store(result));

Map<String, Object> result = new HashMap<>();
if (sourceInfo.timestamp() != null) {
result.put(SourceInfo.TIMESTAMP_USEC_KEY, Conversions.toEpochMicros(sourceInfo.timestamp()));
}
if (sourceInfo.txId() != null) {
result.put(SourceInfo.TXID_KEY, sourceInfo.txId());
}
if (sourceInfo.lsn() != null) {
result.put(SourceInfo.LSN_KEY, sourceInfo.lsn().asLong());
}
if (sourceInfo.xmin() != null) {
result.put(SourceInfo.XMIN_KEY, sourceInfo.xmin());
}
if (sourceInfo.isSnapshot()) {
result.put(SourceInfo.SNAPSHOT_KEY, true);
result.put(SourceInfo.LAST_SNAPSHOT_RECORD_KEY, lastSnapshotRecord);
}
if (lastCompletelyProcessedLsn != null) {
result.put(LAST_COMPLETELY_PROCESSED_LSN_KEY, lastCompletelyProcessedLsn.asLong());
}
if (lastCommitLsn != null) {
result.put(LAST_COMMIT_LSN_KEY, lastCommitLsn.asLong());
}
if (sourceInfo.messageType() != null) {
result.put(SourceInfo.MSG_TYPE_KEY, sourceInfo.messageType().toString());

for (Map.Entry<String, SourceInfo> entry : this.partitionSourceInfo.entrySet()) {
// Key here is the identification key of the partition.
result.put(entry.getKey(), entry.getValue().lsn().asLong());
}
return sourceInfo.isSnapshot() ? result : incrementalSnapshotContext.store(transactionContext.store(result));

return result;
}

@Override
Expand All @@ -120,21 +135,43 @@ public void preSnapshotCompletion() {
lastSnapshotRecord = true;
}

public void updateWalPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, TableId tableId, Operation messageType) {
/**
 * Seeds the per-partition {@link SourceInfo} for the given partition with an
 * initial WAL position and commit LSN so the partition appears in emitted
 * offsets from the start.
 *
 * @param partition  partition whose identification key indexes the source info
 * @param lsn        initial LSN recorded as both position and last commit
 * @param commitTime timestamp associated with the initial position
 * @param txId       transaction id, may be null
 * @param xmin       xmin value, may be null
 * @param tableId    table id, may be null for non-table events
 */
public void initSourceInfo(PostgresPartition partition, Lsn lsn, Instant commitTime, Long txId, Long xmin, TableId tableId) {
    // computeIfAbsent is atomic on the ConcurrentHashMap, unlike the original
    // get-then-put sequence, which could race with a concurrent caller for the
    // same partition and silently discard one of the two SourceInfo instances.
    SourceInfo info = this.partitionSourceInfo.computeIfAbsent(
            partition.getPartitionIdentificationKey(), key -> new SourceInfo(this.connectorConfig));
    info.update(lsn, commitTime, txId, xmin, tableId, null);
    info.updateLastCommit(lsn);
}

// Records a new WAL position both in the shared sourceInfo and in the
// per-partition SourceInfo keyed by the partition's identification key.
// NOTE(review): this updates the shared sourceInfo AND the partition-scoped
// info with identical arguments — confirm the double update is intentional
// and not leftover from the pre-partition implementation.
public void updateWalPosition(PostgresPartition partition, Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, TableId tableId, Operation messageType) {
this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn;
sourceInfo.update(lsn, commitTime, txId, xmin, tableId, messageType);
// Lazily create the partition's SourceInfo on first sight of the partition.
// NOTE(review): get-then-put on the ConcurrentHashMap is not atomic; a
// concurrent caller for the same partition could race here — verify the
// threading model or consider computeIfAbsent.
SourceInfo info = this.partitionSourceInfo.get(partition.getPartitionIdentificationKey());
if (info == null) {
info = new SourceInfo(this.connectorConfig);
this.partitionSourceInfo.put(partition.getPartitionIdentificationKey(), info);
}
info.update(lsn, commitTime, txId, xmin, tableId, messageType);
}

/**
 * Updates the WAL position for LSN events that do not have an associated table or schema.
 * Delegates to the full overload with a {@code null} {@code tableId}.
 */
public void updateWalPosition(PostgresPartition partition, Lsn lsn, Lsn lastCompletelyProcessedLsn, Instant commitTime, Long txId, Long xmin, Operation messageType) {
updateWalPosition(partition, lsn, lastCompletelyProcessedLsn, commitTime, txId, xmin, null, messageType);
}

public void updateCommitPosition(Lsn lsn, Lsn lastCompletelyProcessedLsn) {
// Records a commit: advances the context-level lastCompletelyProcessedLsn and
// lastCommitLsn, ensures the partition has a SourceInfo entry, and updates the
// shared sourceInfo's last-commit LSN.
public void updateCommitPosition(PostgresPartition partition, Lsn lsn, Lsn lastCompletelyProcessedLsn) {
this.lastCompletelyProcessedLsn = lastCompletelyProcessedLsn;
this.lastCommitLsn = lsn;

// NOTE(review): 'info' is looked up / created here but never receives 'lsn';
// only the shared 'sourceInfo' below is updated. If per-partition commit
// tracking is intended, this should likely call info.updateLastCommit(lsn) —
// confirm against the partition-offset design before changing.
SourceInfo info = this.partitionSourceInfo.get(partition.getPartitionIdentificationKey());
if (info == null) {
info = new SourceInfo(this.connectorConfig);
this.partitionSourceInfo.put(partition.getPartitionIdentificationKey(), info);
}

sourceInfo.updateLastCommit(lsn);
}

Expand Down Expand Up @@ -223,18 +260,18 @@ public String toString() {
+ ", incrementalSnapshotContext=" + incrementalSnapshotContext + "]";
}

public static PostgresOffsetContext initialContext(PostgresConnectorConfig connectorConfig, PostgresConnection jdbcConnection, Clock clock) {
return initialContext(connectorConfig, jdbcConnection, clock, null, null);
public static PostgresOffsetContext initialContext(PostgresConnectorConfig connectorConfig, PostgresConnection jdbcConnection, Clock clock, Set<PostgresPartition> partitions) {
return initialContext(connectorConfig, jdbcConnection, clock, null, null, partitions);
}

public static PostgresOffsetContext initialContext(PostgresConnectorConfig connectorConfig, PostgresConnection jdbcConnection, Clock clock, Lsn lastCommitLsn,
Lsn lastCompletelyProcessedLsn) {
Lsn lastCompletelyProcessedLsn, Set<PostgresPartition> partitions) {
try {
LOGGER.info("Creating initial offset context");
final Lsn lsn = Lsn.valueOf(jdbcConnection.currentXLogLocation());
final Long txId = jdbcConnection.currentTransactionId();
LOGGER.info("Read xlogStart at '{}' from transaction '{}'", lsn, txId);
return new PostgresOffsetContext(
PostgresOffsetContext offsetContext = new PostgresOffsetContext(
connectorConfig,
lsn,
lastCompletelyProcessedLsn,
Expand All @@ -246,6 +283,12 @@ public static PostgresOffsetContext initialContext(PostgresConnectorConfig conne
false,
new TransactionContext(),
new SignalBasedIncrementalSnapshotContext<>(false));

for (PostgresPartition partition : partitions) {
offsetContext.initSourceInfo(partition, lastCommitLsn, clock.currentTimeAsInstant(), txId, null, null);
}

return offsetContext;
}
catch (SQLException e) {
throw new ConnectException("Database processing error", e);
Expand Down
Loading