
Running PR to keep track of custom changes to support YB#127

Open
fourpointfour wants to merge 78 commits into 2.5.2.Final from ybdb-debezium-2.5.2

Conversation


@fourpointfour fourpointfour commented May 30, 2024

Note

High Risk
Touches critical CDC connector paths (schema generation, snapshot/streaming coordination, offset/LSN flushing, and retry semantics) and changes defaults/drivers toward YugabyteDB, so regressions could impact correctness or recoverability of change capture.

Overview
This PR adds release/packaging automation for the YugabyteDB connector: new GitHub Actions workflows to build and publish artifacts (GitHub releases, Confluent zip packaging, and Quay image pushes) plus a new Dockerfile that builds a slimmer Debezium Connect image with Java 17 and bundled Yugabyte/Confluent dependencies.

It shifts the connector toward YugabyteDB defaults and distribution: many Maven modules now inherit ${revision}, the PG connector artifact is renamed to yugabytedb-source-connector, adds Yugabyte JDBC dependencies and S3 distributionManagement + an s3-deploy profile, and updates test connection defaults for local YB (port/user/db) while swapping org.postgresql.* driver types for com.yugabyte.*.

Core runtime behavior is updated for YugabyteDB: introduces LogicalDecoder.YBOUTPUT and a YB-specific PGTableSchemaBuilder/value wrapping model, adds parallel snapshot/streaming configuration fields, changes partition identity to include taskId/slotName, adds snapshot-to-streaming coordination that waits for snapshot completion offsets, and extends streaming/offset logic for YB (hybrid-time vs sequence LSN handling, origin metadata propagation, replica-identity handling, and revised retry/heartbeat behavior).

Written by Cursor Bugbot for commit 5dd9e17. This will update automatically on new commits.

fourpointfour and others added 20 commits March 18, 2024 15:26
Initial changes required for the Debezium Connector for Postgres to work with YugabyteDB source.
…st YugabyteDB (#105)

This PR includes the changes required for the tests so that they can
work against YugabyteDB.

YugabyteDB issue: yugabyte/yugabyte-db#21394
Modified Dockerfile to package custom log4j.properties so that the log
files can be rolled over when their size exceeds 100MB.

Also changed the Kafka connect JDBC jar being used - this new jar has a
custom change to log every sink record going to the target database.
Changes in this PR:
1. Modification of the Dockerfile to include Aiven transformers at the time of docker image compilation
   a. Aiven source: https://github.com/Aiven-Open/transforms-for-apache-kafka-connect
…BC driver (#107)

## Problem

The Debezium connector for Postgres uses a single host model where the
JDBC driver connects to a PG instance and continues execution. However,
when we move to YugabyteDB where we have a multi node deployment, the
current model can fail in case the node it has connected to goes down.

## Solution

To address that, we have made changes in this PR and replaced the
Postgres JDBC driver with [YugabyteDB smart
driver](https://github.com/yugabyte/pgjdbc) which allows us to specify
multiple hosts in the JDBC url so that the connector does not fail or
run into any fatal error while maintaining the High Availability aspect
of YugabyteDB.

Changes in this PR include:
1. Changing of version in `pom.xml` from `2.5.2.Final` to
`2.5.2.ybpg.20241-SNAPSHOT`
a. This is done to ensure that upon image compilation, the changed code
from Debezium Code is picked up.
2. Replacing of all packages from `org.postgresql.*` to `com.yugabyte.*`
to comply with the new JDBC driver.
3. Masking the validator method in debezium-core which disallowed
characters like `: (colon)` in the configuration property
`database.hostname`
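As an illustrative sketch of what the smart driver enables (host names, port, and database are hypothetical, not taken from this PR), the JDBC URL can list multiple endpoints so the driver can fail over to a surviving node:

```
jdbc:yugabytedb://node1:5433,node2:5433,node3:5433/yugabyte?load-balance=true
```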
**Summary**
This PR is to support consistent snapshot in the case of an existing
slot.

In this case, the consistent_point hybrid time is determined from the
pg_replication_slots view, specifically from the yb_restart_commit_ht
column.

There is an assumption here that this slot has not been used for
streaming till this point. If this holds, then the history retention
barrier will be in place as of the consistent snapshot time
(consistent_point). The snapshot query will be run as of the
consistent_point and subsequent streaming will start from the
consistent_point of the slot.

**Test Plan**
Added new test
`mvn -Dtest=PostgresConnectorIT#initialSnapshotWithExistingSlot test`
**Changes:**
1. Providing JMX Exporter jar to KAFKA_OPTS to be further provided to
java options.
2. Modifying `metrics.yaml` to include correct regex to be scraped as
per Postgres connector.
…peruser (#115)

**Summary**
This PR adds the support for a non superuser to be configured as the
connector user (database.user).

Such a user is required to have the privileges listed in
https://debezium.io/documentation/reference/2.5/connectors/postgresql.html#postgresql-permissions

Specifically, the changes in this revision relate to how the
consistent_point is specified to the YugabyteDB server in order
to execute a consistent snapshot.

**Test Plan**
Added new test
`mvn -Dtest=PostgresConnectorIT#nonSuperUserSnapshotAndStreaming test`
…nectorTask (#114)

This PR adds a higher-level retry whenever there is a failure while starting a PostgresConnectorTask. The failures can include, but are not limited to, the following:
1. Failure of creating JDBC connection
2. Failure to execute query
3. Tserver/master restart
4. Node restart
5. Connection failure
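The retry described above can be sketched as a bounded loop around the task start logic. This is a hypothetical illustration (class and method names are not from the PR), assuming a fixed backoff between attempts:

```java
public class RetryingStarter {
    interface Task { void start() throws Exception; }

    // Retry the start logic on any failure, up to maxAttempts.
    static void startWithRetries(Task task, int maxAttempts, long backoffMs) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                task.start();
                return;
            } catch (Exception e) {
                if (attempt >= maxAttempts) {
                    throw e; // retries exhausted, surface the failure
                }
                Thread.sleep(backoffMs); // simple fixed backoff between attempts
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice (simulating transient JDBC/node errors), succeeds on the third try.
        startWithRetries(() -> {
            if (++calls[0] < 3) throw new RuntimeException("connection failure");
        }, 5, 1);
        System.out.println("started after " + calls[0] + " attempts");
    }
}
```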
…streaming (#116)

## Problem

PG connector does not wait for acknowledgement of snapshot completion
offset before transitioning to streaming. This can lead to an issue if
there is a connector restart in the streaming phase and it goes for a
snapshot on restart. In streaming phase, as soon as the 1st GetChanges
call is made on the server, the retention barriers are lifted and so the
server can no longer serve the snapshot records on a restart. Therefore
it is important that the connector waits for acknowledgement of snapshot
completion offset before it actually transitions to streaming.

## Solution

This PR introduces a waiting mechanism for acknowledgement of snapshot
completion offset before transitioning to streaming.

We have introduced a custom heartbeat implementation that dispatches a heartbeat when the forced-heartbeat method is called but dispatches nothing when the normal heartbeat method is called.

With this PR, the connector will dispatch heartbeats while waiting for the snapshot completion offset, i.e. during the transition phase. For these heartbeat calls there is no need to set `heartbeat.interval.ms`, since forced heartbeat calls do not rely on this config. Note, this heartbeat call is only required to support applications using the Debezium engine/embedded engine. It is not required when the connector is run with Kafka Connect.
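A minimal sketch of the forced-only heartbeat behavior described above (class and method names are hypothetical, not the PR's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class ForcedOnlyHeartbeat {
    private final List<String> emitted = new ArrayList<>();

    void heartbeat(String partition) {
        // Regular interval-based heartbeat: intentionally dispatch nothing.
    }

    void forcedHeartbeat(String partition) {
        // Forced heartbeat: always dispatch, independent of heartbeat.interval.ms.
        emitted.add(partition);
    }

    public static void main(String[] args) {
        ForcedOnlyHeartbeat hb = new ForcedOnlyHeartbeat();
        hb.heartbeat("p0");        // swallowed
        hb.forcedHeartbeat("p0");  // dispatched while waiting for the snapshot offset
        System.out.println(hb.emitted.size());
    }
}
```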

### Test Plan
Manually deployed connector in a docker container and tested two
scenarios: 0 snapshot records & non-zero snapshot records. Unit tests
corresponding to these scenarios will be added in a separate PR.
#119)

**Summary**
This PR adds support for the INITIAL_ONLY snapshot mode for Yugabyte.

In the case of Yugabyte as well, the snapshot is consumed by executing a snapshot query (a SELECT statement). To ensure that the streaming phase
continues exactly from where the snapshot left, this snapshot query is
executed as of a specific database state. In YB, this database state is
represented by a value of HybridTime. Changes due to transactions with
commit_time strictly greater than this snapshot HybridTime will be
consumed during the streaming phase.

This value for HybridTime is the value of the "yb_restart_commit_ht"
column of the pg_replication_slots view of the associated slot. Thus, in
the case of Yugabyte, even for the INITIAL_ONLY snapshot mode, a slot
needs to be created if one does not exist.
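The boundary hybrid time described above can be inspected directly from the view named in this PR; a minimal sketch (the slot name is illustrative):

```
SELECT yb_restart_commit_ht
FROM pg_replication_slots
WHERE slot_name = 'my_slot';
```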

With this approach, a connector can be deployed in INITIAL_ONLY mode to
consume the initial snapshot. This can be followed by the deployment of
another connector in NEVER mode. This connector will continue the
streaming from exactly where the snapshot left.

**Test Plan**
1. Added new test - `mvn -Dtest=PostgresConnectorIT#snapshotInitialOnlyFollowedByNever test`
2. Enabled existing test - `mvn -Dtest=PostgresConnectorIT#shouldNotProduceEventsWithInitialOnlySnapshot test`
3. Enabled existing test - `mvn -Dtest=PostgresConnectorIT#shouldPerformSnapshotOnceForInitialOnlySnapshotMode test`
… image (#118)

This PR adds the dependencies for the `AvroConverter` to function in the
Kafka Connect environment. The dependencies will only be added at the
time of building the docker image.
This PR adds a log line which prints the IP of the node every time a connection is created.
Retry in case of failures while the task is restarting. Right now, any kind of failure will lead to the task throwing a RetriableException, causing a task restart.
…te source (#120)

**Summary**
This PR enables 30/33 tests in IncrementalSnapshotIT for Yugabyte source

The tests that are excluded are:

1. updates
2. updatesLargeChunk
3. updatesWithRestart

**Test Plan**
`mvn -Dtest=IncrementalSnapshotIT test`
This PR comments out the part of init_database (the startup script used during tests) where some extensions are installed - it takes more than 2 minutes at this stage and, since we do not need it in the tests we use, it can be skipped.
Throw a retriable exception for all failures. In the future, we will need to throw a runtime exception for wrong configurations.
@github-actions

Hi @vaibhav-yb, thanks for your contribution. Please prefix the commit message(s) with the DBZ-xxx JIRA issue key.

…r image (#129)

This PR only changes the link in the `Dockerfile` to fetch the latest
custom sink connector jar from GitHub.

According to PR yugabyte/kafka-connect-jdbc#3,
changes include the following:
1. Addition of 3 new configuration properties:
   * `log.table.balance`:
     i. Default is `false`, but when set to `true`, the sink connector will execute a query to get the table balance from the target table.
     ii. Note that this is only applicable for consistency-related tests where the given query is applicable - it will fail if set in any other tests.
   * `expected.total.balance`:
     i. Default is `1000000` (1M), which can be changed to whatever value we are expecting the total balance to be in the target table.
   * `tables.for.balance`:
     i. This takes a comma-separated string value of all the table names from which the sink connector is supposed to extract balances.
     ii. This property will only be valid when `log.table.balance` is specified as `true`.
     iii. There is no default for this property, so if `log.table.balance` is set to `true` and `tables.for.balance` is not specified, we will throw a `RuntimeException`.
2. Log additions to aid debugging.
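As an illustrative fragment of how the three sink properties might be set together (the table names are hypothetical):

```
"log.table.balance": "true",
"expected.total.balance": "1000000",
"tables.for.balance": "accounts,savings"
```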

…is set (#131)

## Problem
PG connector filters out record based on its starting point (WAL
position), which in turn depends on the offset received from Kafka. So,
in case, the starting point corresponds to a record in the middle of a
transaction, PG connector will filter out the records of that
transaction with LSN < starting point.

This creates a problem when the downstream pipeline expects consistency of data. Filtering of records leads to the PG connector shipping a transaction with missing records. When such a transaction is applied on the sink, consistency breaks.

## Solution
When 'provide.transaction.metadata' connector configuration is set, PG
connector ships transaction boundary records BEGIN/COMMIT. Based on
these boundary records, sink connector writes data maintaining
consistency. Therefore, when this connector config is set, we will
disable filtering records based on WAL Resume position.
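The revised skip decision can be sketched as follows; this is a hypothetical illustration (method and parameter names are not from the PR), not the connector's actual code:

```java
public class ResumeFilter {
    // When transaction metadata is enabled, never drop records by WAL resume
    // position: the sink relies on BEGIN/COMMIT boundary records instead.
    static boolean shouldSkip(long recordLsn, long resumeLsn, boolean provideTransactionMetadata) {
        if (provideTransactionMetadata) {
            return false; // ship every record of the partially-resumed transaction
        }
        // Default behavior: drop records before the WAL resume position.
        return recordLsn < resumeLsn;
    }

    public static void main(String[] args) {
        // A record in the middle of a transaction, before the resume LSN.
        System.out.println(shouldSkip(100, 200, false)); // filtered out
        System.out.println(shouldSkip(100, 200, true));  // shipped to the sink
    }
}
```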

## Testing
Manual testing - Ran the connector with this fix in our QA runs where the issue was first discovered. All 10 triggered runs passed.
Unit testing - Cannot reproduce the above-mentioned scenario in a unit test.

This PR adds a connector configuration, `yb.consistent.snapshot`, that lets the user disable consistent snapshot; it is enabled by default. Setting a consistent snapshot means establishing the boundary between snapshot records (records that existed at the time of stream creation) and streaming records.

If `yb.consistent.snapshot` is disabled i.e. set to `false`:
- We will not be setting a boundary for snapshot
- If the connector restarts after taking a snapshot but before
acknowledging the streaming LSN, the snapshot will be taken again. This
can result in some records being received both during the snapshot phase
and the streaming phase.
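As an illustrative connector-configuration fragment (other required properties omitted), disabling the boundary would look like:

```
"yb.consistent.snapshot": "false"
```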

This PR adds the following two tests with respect to partitioned tables:
1. Addition of a partition table to a publication and ensuring that we
are receiving records for the newly added table.
2. Removal of a partition table from a publication and ensuring that we
are not receiving records for removed table.

… connection (#179)

## Problem

A block of code currently sets the GUC
`yb_disable_catalog_version_check` with every connection it creates. The
problem here is that if the user is a non-superuser, this block will
produce an error log which doesn't cause any failure but makes for a bad user experience.

## Solution

Removing the code block from the creation flow of a connection resolves
the issue and now:
1. The GUC will not be set when consistent snapshot is disabled by the
config `yb.consistent.snapshot=false` or when snapshot is skipped i.e.
`snapshot.mode=never`.
2. When using consistent snapshot with a non-superuser, the GUC will
only be set when a procedure `disable_catalog_version_check()` is
accessible to the connector user.

### Note

The procedure can be created by the superuser and be granted to the
connector user by executing the following statements:

```
CREATE OR REPLACE PROCEDURE connector_user.disable_catalog_version_check()
LANGUAGE plpgsql
AS $$
BEGIN
  EXECUTE 'SET yb_disable_catalog_version_check = true';
END;
$$
SECURITY DEFINER;


REVOKE EXECUTE ON PROCEDURE connector_user.disable_catalog_version_check FROM PUBLIC; 
GRANT EXECUTE ON PROCEDURE connector_user.disable_catalog_version_check TO connector_user;
```

### Testing

The changes were tested using the existing tests for snapshot which fail
if this catalog version check is not disabled while setting
`yb_read_time`.

This PR fixes a reported vulnerability CVE-2024-7254 in the repository
by upgrading the dependency version to `3.25.5`.

This PR changes the artifact ID for the logical replication connector
for YugabyteDB.

…erty (#185)

### Problem

With YugabyteDB's YSQL upgrade to PG 15, the catalog tables are deleted and recreated during the upgrade process. As a result, the walsender fails when it tries to read the catalog tables by setting `yb_read_time` pointing to the PG 11 era.

### Solution

With this PR we are introducing a new config property called
`ysql.major.upgrade`. The default value of this property is false. Any
user experiencing an error due to failure in walsender while reading
catalog table can set this property to true. When set, the connector
task will set the `skip_yb_read_time_in_walsender` GUC in the
walsender session, whereby the catalog reads will be performed as of the
current time. Once the restart time crosses the update time, this config
property should be reset to false.

### Test Plan
Tested the changes manually by orchestrating the upgrade process
manually on yugabyted cluster and using custom docker image for the
connector.

…nstead of boolean (#184)

### Problem

Currently, the yb.load.balance.connections config is a boolean and so
only accepts either true or false. However, load balancing property of
smart drivers accepts additional values such as `any`, `only-primary`,
`only-rr`, `prefer-primary` and `prefer-rr`. Currently, the value `any` is an alias for yb.load.balance.connections' value `true`.

### Solution

To support these additional values of load balancing property, this PR
converts yb.load.balance.connections to string with default value being
**`only-primary`**. PR also changes the JDBC smart driver version to
`42.7.3-yb-4` since load-balance property (including advanced options
like `any`, `only-primary`, `only-rr`, etc.) is supported in version
`42.7.3-yb-1` and later.

### Test Plan

To validate the change, a YugabyteDB cluster was deployed and connected
to a custom docker image of the connector. The connection was
established to the correct node as per the value of the
yb.load.balance.connections property provided in connector creation
request (or defaulted to `only-primary` when no value of it was
provided).

Upgraded jackson version to 2.19.0 to fix CVE-2025-52999

---------

Co-authored-by: Cloud User <ec2-user@ip-172-165-30-206.ap-south-1.compute.internal>

… jar to YB s3 maven (#187)

This PR adds dependencies required to publish the yugabytedb connector
to Yugabyte's s3 hosted maven repository.

#### List of changes made in this PR
1. Bumped up the version in all the pom files to the appropriate latest
value.
2. Added plugins necessary to publish the jar in correct format to
Yugabyte's s3 hosted maven repository.
3. Suppressed / removed the plugins inherited from the upstream code
which are used for pushing the jar to maven central repository. This was
needed because otherwise `mvn deploy` tries to publish to Maven Central and fails, thus skipping publishing to Yugabyte's s3 hosted Maven repository.
#### Steps to publish a connector jar to s3 maven repository
1. Change the revision property in the root pom file to the appropriate value.
2. Compile the jar.
3. Update the s3 credentials either in the env variables or in the `~/.s2/credentials` file. Set AWS_REGION=us-east-1; this is needed because the s3 bucket is in the us-east-1 region.
4. From inside the project root directory run `mvn clean deploy -Ps3-deploy -Dquick -pl debezium-connector-postgres -B`. This will publish a zip file, a jar and a pom file to Yugabyte's s3 hosted maven repository. The list and format of artefacts is kept the same as seen with `debezium-connector-postgres` on the Maven Central repository.

…7 in YB connector (#190)

## Problem 
The current YB connector is based on Java 11. This PR upgrades the Java
version and necessary dependencies to Java 17-compatible versions.
The upstream Debezium project upgraded to Java 17 in version 3.0. While we are currently on version 2.5.2, upgrading to Java 17 in this version is safe and beneficial because it simplifies cherry-picking: patches from upstream (which uses Java 17) will not require Java version modifications.


## Solution
To upgrade the Java version we will do the following:

- Update `pom.xml` and change the min. jdk version and maven release
version
- Update `debezium-schema-generator/pom.xml` and change
maven-plugin-plugin from 3.6.0 to 3.9.0. Required because 3.6.0 uses ASM
7.x which cannot read Java 17 bytecode.
- Update `Dockerfile` - Install Java 17 runtime

## Test Plan
Manual testing plus a few existing tests.

## Problem
The current YugabyteDB connector is based on Debezium version 2.5.2,
which does not include support for pgVector.

## Solution
We will cherry-pick the upstream commit debezium@7cf7af5, which adds support for pgvector in the Postgres connector in version 3.0.

Test Compatibility:
- Updated VectorDatabaseIT.java to use YugabyteDB-specific classes:
  - PostgresConnector.class → YugabyteDBConnector.class
  - SnapshotMode.NO_DATA → SnapshotMode.NEVER

Additional changes:
For Java 17:
- Upgrade impsort version to 1.12.0 in `pom.xml`
- Upgrade format.imports.source.compliance to 17 in
`debezium-parent/pom.xml`

## Test Plan
```
mvn -pl debezium-connector-postgres -Dtest=VectorDatabaseTest,VectorDatabaseIT test

```

---------

Co-authored-by: Jiri Pechanec <jpechane@redhat.com>

…OT TO EXPORT_SNAPSHOT (#188)

## Problem
With 2025.2.2 and later branches, we plan to enable EXPORT_SNAPSHOT by
default instead of USE_SNAPSHOT. However, we need to maintain backward
compatibility with old branches where we didn't have EXPORT_SNAPSHOT on
by default and needed a g-flag to be set to true for it to even work.
Otherwise, it ends up throwing the below error.
```
ERROR:  cannot export or import snapshot when ysql_enable_pg_export_snapshot is disabled.
```

It is recommended to read [yugabyte/debezium#113](https://github.com/yugabyte/debezium/pull/113) for better context on the change.


## Solution

1. Add a variable (`exportSnapshotUsed`) to `SlotCreationResult` which
will tell the connector if the slot was created with EXPORT_SNAPSHOT or
USE_SNAPSHOT.
2. First, to maintain backward compatibility with versions where the
flag `ysql_enable_pg_export_snapshot` was not enabled, we will add a
fallback mechanism.
i. First, the connector will try to create a replication slot by
explicitly mentioning `EXPORT_SNAPSHOT` in the command (or not if
parallel streaming is off). If this fails with an error `cannot export
or import snapshot when ysql_enable_pg_export_snapshot is disabled.`,
then we fallback to the old code of the connector which will create a
replication slot by explicitly mentioning `USE_SNAPSHOT` (or not if
parallel streaming is off).
ii. **Note**: Even if we don't pass anything in the create replication
slot command, it uses the default, which in 2025.2.2 and above versions
is `EXPORT_SNAPSHOT` and in 2025.2.1 and below versions is
`USE_SNAPSHOT`.
3. When a new replication slot is created, we store all the data in
`slotCreatedInfo` including whether EXPORT_SNAPSHOT was used.
i. For versions where `EXPORT_SNAPSHOT` is enabled,
`slotCreatedInfo.isExportSnapshotUsed()` will be `true` and
snapshot_name will be the snapshot ID.
```
yugabyte=# CREATE_REPLICATION_SLOT test_replication_slot_99d2fc58624f4aa29f1317d434e LOGICAL pgoutput;
                     slot_name                     | consistent_point |                           snapshot_name                           | output_plugin 
---------------------------------------------------+------------------+-------------------------------------------------------------------+---------------
 test_replication_slot_99d2fc58624f4aa29f1317d434e | 0/2              | e928c3c6c4d54deca3c449dae1e00233-ae0666b99cc6d3be93494b0d1022eb8f | pgoutput
```
and then use this snapshot ID to set it in the snapshot session:
```
String setSnapshotQuery = "SET TRANSACTION SNAPSHOT '" + slotCreatedInfo.snapshotName() + "';";
jdbcConnection.executeWithoutCommitting(setSnapshotQuery);
```


ii. For versions where `EXPORT_SNAPSHOT` is **not** enabled,
snapshot_name will be the hybrid time.
```
yugabyte=# CREATE_REPLICATION_SLOT test_replication_slot_99d2fc58624f4aa29f1317d434e4831c LOGICAL pgoutput;
                       slot_name                        | consistent_point |    snapshot_name    | output_plugin 
--------------------------------------------------------+------------------+---------------------+---------------
 test_replication_slot_99d2fc58624f4aa29f1317d434e4831c | 0/2              | 7227389791448002560 | pgoutput
```

and then we use this hybrid time to set the local read time of the
snapshot session (Note: This is pre-existing code):
```
private String ybSnapshotStatement(String ybReadTime) {
        return String.format("DO " +
                             "LANGUAGE plpgsql $$ " +
                             "BEGIN " +
                                "SET LOCAL yb_read_time TO '%s ht'; "  +
                             "EXCEPTION " +
                                "WHEN OTHERS THEN " +
                                    "CALL set_yb_read_time('%s ht'); " +
                             "END $$;",
                             ybReadTime, ybReadTime);
    }
```

4. For the connector restart case:
   i. If the slot is re-created (parallel streaming mode drops and recreates the slot on mid-snapshot restart): steps 1, 2 & 3 will be repeated.
   ii. If the slot already exists (slotCreatedInfo is null): for EXPORT_SNAPSHOT, we cannot use the old snapshot ID because the moment the session closes, the snapshot is deleted, and when the connector is restarted there is no way to access it. So in this case we fall back to the old code where we set the hybrid time from the slot (`slotState.slotRestartCommitHT()`) as the local read time.


## Test Plan

```

./mvnw test -pl debezium-connector-postgres -Dtest="PostgresConnectorIT#shouldStreamWithExportSnapshotDisabledAndPreCreatedSlotWithUseSnapshot,PostgresConnectorIT#shouldStreamWithDefaultFlagsAndConnectorCreatedSlot,PostgresConnectorIT#shouldStreamWithDefaultFlagsAndPreCreatedSlotViaReplicationConnection"
```

… in Yugabyte Connector (#191)

## Problem 
PostgreSQL's logical replication protocol includes ORIGIN messages that
identify the source of transactions in cascaded replication setups. This
is particularly important for hub-and-spoke replication topologies where
changes may originate from different clusters and need to be tracked to
prevent replication loops.

**Current Limitation: When a PostgreSQL connector receives an ORIGIN
message, it simply acknowledges and skips it without processing.**

Read more about the Origin message format at [Logical Replication
Message
Formats](https://www.postgresql.org/docs/12/protocol-logicalrep-message-formats.html)

## Solution

Extend the PostgreSQL connector to capture and propagate replication
origin metadata from PostgreSQL's logical replication stream.

### Implementation Details

1. **Add ORIGIN as a ReplicationMessage Operation**
   - Added `ORIGIN` to the `ReplicationMessage.Operation` enum
- Created `OriginMessage` class (similar to `TransactionMessage`) to
represent ORIGIN messages

2. **Parse and Emit ORIGIN Messages in PgOutputMessageDecoder**
- Parse `origin_name` and `origin_lsn` when an ORIGIN message is
received
- ORIGIN message format: Int64 (origin LSN) followed by null-terminated
string (origin name)
- Emit `OriginMessage` to the processor instead of caching values in the
decoder

3. **Handle ORIGIN in PostgresStreamingChangeEventSource**
- Process `Operation.ORIGIN` messages to update origin state in
`PostgresOffsetContext`
- ORIGIN messages are "swallowed" (not dispatched to EventDispatcher) -
they only update state
- Subsequent DML events automatically include origin info from
`SourceInfo`

4. **Propagate Origin to Change Events**
   - `SourceInfo` stores origin metadata (`originName`, `originLsn`)
- `PostgresSourceInfoStructMaker` adds optional `origin` and
`origin_lsn` fields to the source schema
- All DML events within a transaction include the origin info in their
source block

5. **Restart Recovery Handling**
- Always process ORIGIN messages even during WAL position recovery (skip
phase)
- `shouldMessageBeSkipped()` explicitly returns `false` for ORIGIN
messages
- This ensures that when replaying from transaction BEGIN after a crash,
the origin state is populated before processing resumed events
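The ORIGIN wire format described in step 2 (an Int64 origin LSN followed by a null-terminated origin name) can be sketched with a small standalone decoder; class and method names here are hypothetical, not the PR's actual `PgOutputMessageDecoder` code:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OriginDecoder {
    record Origin(long lsn, String name) {}

    // Decode the ORIGIN message body: Int64 LSN, then a NUL-terminated name.
    static Origin decode(ByteBuffer buf) {
        long lsn = buf.getLong();          // Int64 origin LSN (big-endian)
        StringBuilder name = new StringBuilder();
        byte b;
        while ((b = buf.get()) != 0) {     // read bytes until the NUL terminator
            name.append((char) b);
        }
        return new Origin(lsn, name.toString());
    }

    public static void main(String[] args) {
        // Build a sample ORIGIN body matching the example event below.
        ByteBuffer buf = ByteBuffer.allocate(32);
        buf.putLong(100006L);
        buf.put("origin1".getBytes(StandardCharsets.US_ASCII));
        buf.put((byte) 0);
        buf.flip();

        Origin o = decode(buf);
        System.out.println(o.lsn() + " " + o.name());
    }
}
```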

Example:
```
{
  "before": null,
  "after": {
    "id": {
      "value": 48417,
      "set": true
    }
  },
  "source": {
    "version": "dz.2.5.2.yb.2025.1.SNAPSHOT.3",
    "connector": "postgresql",
    "name": "dbserver2",
    "ts_ms": 1767343556846,
    "snapshot": "false",
    "db": "yugabyte",
    "sequence": "[\"5\",\"38423\"]",
    "schema": "public",
    "table": "t1",
    "txId": 3,
    "lsn": 38423,
    "xmin": null,
    "origin": "origin1",  // ← New field added
    "origin_lsn": 100006        // ← New field added
  },
  "op": "c",
  "ts_ms": 1767343722393,
  "transaction": null
}
```


## Test plan

```
# Unit tests
SourceInfoTest#originInfoIsNullByDefault+originInfoCanBeSetAndCleared+originInfoIncludedInToString

# Integration tests
YBRecordsStreamProducerIT#shouldIncludeOriginInfoInSourceMetadataWhenOriginIsSet+shouldHaveNullOriginInfoWhenNoOriginIsSet+shouldNotLeakOriginInfoBetweenTransactions+shouldCorrectlyTrackDifferentOriginsAcrossTransactions+shouldPreserveOriginInfoAfterConnectorRestartMidTransaction
```

---------

Signed-off-by: Shishir Sharma <ssharma@yugabyte.com>

…hot is disabled (#192)

## Problem
When ysql_enable_pg_export_snapshot & streamingMode.isParallel() are
disabled, the current implementation doesn't properly fall back to the
alternative snapshot mechanism, resulting in the error below.
```
Caused by: com.yugabyte.util.PSQLException: ERROR: cannot export or import snapshot when ysql_enable_pg_export_snapshot is disabled.
```

## Solution
Determine the snapshot option based on canExportSnapshot; if it is false, fall back to streamingMode.isParallel().

…tXLogLocation() for YugabyteDB (#193)

## Problem
With the commit
yugabyte/yugabyte-db@f1d0a1fb6158 we have
disabled `pg_current_wal_lsn()`, since each tablet maintains its own WAL, making a single global WAL LSN unsupported. Before this change, the
function returned a garbage value (e.g. 0/1000118) that was never
meaningful for YugabyteDB. Now calling it throws an error, causing the
connector to fail during the streaming init phase:
```
Caused by: com.yugabyte.util.PSQLException: ERROR: pg_current_wal_lsn() is not yet supported
  Hint: See yugabyte/yugabyte-db#30243. React with thumbs up to raise its priority.
```

## Solution

Skip calling `currentXLogLocation()` in `initialContext()` when
YugabyteDB is enabled, passing `null` as the LSN instead. This is safe
because the value is never meaningfully used for YugabyteDB:


- **Snapshot path** (`determineSnapshotOffset`): The LSN from
`initialContext()` is immediately overwritten by
`updateOffsetForSnapshot()` → `getTransactionStartLsn()`, which already
has a YB-specific path returning `slotLastFlushedLsn()`.
- **Streaming init path** (when `offsetContext` is null): The streaming
start position is determined by the replication slot, not this value
(`hasStartLsnStoredInContext` is false →
`replicationConnection.startStreaming(walPosition)` without an LSN).
- **Pre-snapshot catch-up streaming**
(`updateOffsetForPreSnapshotCatchUpStreaming`): Never entered because
`shouldStreamEventsStartingFromSnapshot()` defaults to `true` and no
snapshotter overrides it.
- **`getTransactionStartLsn()` fallback**: Never reached for YugabyteDB
due to the early return at the `YugabyteDBServer.isEnabled()` branch.

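A minimal sketch of the guard described above, with a `Supplier` standing in for the JDBC call; the real method and class names in the connector differ.

```java
import java.util.function.Supplier;

// Hedged sketch: skip pg_current_wal_lsn() entirely when running against
// YugabyteDB, where the call now raises an error, and start with a null LSN.
final class InitialLsn {
    static Long resolve(boolean yugabyteEnabled, Supplier<Long> currentXLogLocation) {
        if (yugabyteEnabled) {
            // Never meaningfully used on the YB paths listed above.
            return null;
        }
        return currentXLogLocation.get();
    }
}
```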

## Test Plan
```
mvn test -pl debezium-connector-postgres -Dtest=PostgresConnectionIT#shouldReportValidXLogPos
```

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Medium Risk**
> Changes startup offset initialization behavior by allowing a `null`
LSN for YugabyteDB, which could affect any code paths that implicitly
assume a non-null starting LSN.
> 
> **Overview**
> Prevents the connector from failing during streaming initialization on
YugabyteDB by skipping `currentXLogLocation()` in
`PostgresOffsetContext.initialContext()` and initializing the starting
LSN as `null` when `YugabyteDBServer.isEnabled()`.
> 
> Updates `PostgresConnectionIT#shouldReportValidXLogPos` to assert that
`currentXLogLocation()` throws a `SQLException` with the expected
“pg_current_wal_lsn() is not yet supported” message (reflecting
YugabyteDB behavior).
> 
> <sup>Written by [Cursor
Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit
e7f550a. This will update automatically
on new commits. Configure
[here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->


@cursor cursor bot left a comment


Cursor Bugbot has reviewed your changes and found 4 potential issues.



```java
protected boolean isRetryRemaining() {
    return (connectorConfig.getMaxRetriesOnError() == -1) || getRetries() <= connectorConfig.getMaxRetriesOnError();
}
```

Off-by-one in retry count comparison logic

Medium Severity

isRetryRemaining() uses getRetries() <= getMaxRetriesOnError() while the base class hasMoreRetries() uses retries < maxRetries. When retries equals maxRetries, setProducerThrowable calls hasMoreRetries() which returns false and queues a ConnectException, but then isRetryRemaining() returns true, causing the start() catch block to throw a RetriableException. This allows one extra retry beyond the configured maximum.
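One way to remove the off-by-one is to give `isRetryRemaining()` the same strict comparison the base class uses, with `-1` meaning retry forever. The standalone method below is a sketch with illustrative parameters, not the connector's actual signature.

```java
// Hedged sketch: align isRetryRemaining() with hasMoreRetries() by using a
// strict "retries < max" check, so both checks flip at the same count.
final class RetryCheck {
    static boolean isRetryRemaining(int retries, int maxRetriesOnError) {
        return maxRetriesOnError == -1 || retries < maxRetriesOnError;
    }
}
```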



```java
public List<String> getSlotRanges() {
    return List.of(getConfig().getString(SLOT_RANGES).trim().split(";"));
}
```

NPE in parallel config getters with no defaults

Medium Severity

getSlotNames(), getPublicationNames(), and getSlotRanges() call .trim() on the result of getConfig().getString() which returns null when the corresponding fields (SLOT_NAMES, PUBLICATION_NAMES, SLOT_RANGES) are not configured, since they have no default values. The validation only checks that these fields aren't used outside parallel mode — it doesn't require them when streaming.mode is parallel. This causes a NullPointerException if a user enables parallel streaming without setting these fields.
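A null-safe variant of these getters would guard before calling `.trim()`. The helper below is hypothetical; only the semicolon separator is taken from the snippet above.

```java
import java.util.Collections;
import java.util.List;

// Hedged sketch: return an empty list when the field is unset instead of
// letting .trim() throw a NullPointerException on a null config value.
final class ParallelModeConfig {
    static List<String> splitList(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return Collections.emptyList();
        }
        return List.of(raw.trim().split(";"));
    }
}
```

Alternatively, validation could require these fields whenever `streaming.mode` is parallel, so a misconfiguration fails fast with a clear message rather than an NPE.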


```java
public SchemaBuilder datatypeSparseVectorSchema() {
    return SchemaBuilder.struct()
            .name(SparseVector.LOGICAL_NAME)
            .name(SparseVector.LOGICAL_NAME)
```

Duplicate .name() call in sparse vector schema

Low Severity

datatypeSparseVectorSchema() calls .name(SparseVector.LOGICAL_NAME) twice in succession on the SchemaBuilder. This is a redundant duplicate call that appears to be a copy-paste error.


```diff
 @Override
 public Map<String, String> getSourcePartition() {
-    return Collect.hashMapOf(SERVER_PARTITION_KEY, serverName);
+    return Collect.hashMapOf(SERVER_PARTITION_KEY, getPartitionIdentificationKey());
```

Partition equality inconsistent with source partition key

Medium Severity

getSourcePartition() now returns a key derived from serverName, taskId, and slotName via getPartitionIdentificationKey(), but equals() and hashCode() still compare only serverName. In parallel mode, partitions with different taskId/slotName are considered equal by equals() and produce the same hashCode(), despite having distinct source partition keys used for offset storage. This semantic inconsistency could cause subtle bugs when partitions are used in hash-based collections.
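A consistent identity would derive `equals()`/`hashCode()` from the same fields as the source-partition key. The class below is a self-contained sketch; its name, key constant, and constructor are illustrative, not the connector's actual code.

```java
import java.util.Map;
import java.util.Objects;

// Hedged sketch: keep equals()/hashCode() consistent with the key returned
// by getSourcePartition(), so offset storage and hash-based collections
// agree on partition identity in parallel mode.
final class YBPartition {
    private static final String SERVER_PARTITION_KEY = "server";

    private final String serverName;
    private final int taskId;
    private final String slotName;

    YBPartition(String serverName, int taskId, String slotName) {
        this.serverName = serverName;
        this.taskId = taskId;
        this.slotName = slotName;
    }

    String getPartitionIdentificationKey() {
        return serverName + "_" + taskId + "_" + slotName;
    }

    Map<String, String> getSourcePartition() {
        return Map.of(SERVER_PARTITION_KEY, getPartitionIdentificationKey());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof YBPartition)) {
            return false;
        }
        YBPartition other = (YBPartition) o;
        // Same fields as the source-partition key.
        return taskId == other.taskId
                && serverName.equals(other.serverName)
                && slotName.equals(other.slotName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(serverName, taskId, slotName);
    }
}
```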

