Skip to content

Conversation

@bowenli86
Copy link
Member

@bowenli86 bowenli86 commented Jan 13, 2026

What is the purpose of the change

create Dynamic Kafka Source for pyflink

Brief change log

  • create Dynamic Kafka Source and related py classes for pyflink
  • add tests
  • upgrade kafka connector dependency 4.0.1-2.0 so it picks up dynamic source java classes, which existing 3.0.0 connector does not have

Verifying this change

This change added tests and can be verified as follows:

  • Added test_dynamic_kafka.py

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

@flinkbot
Copy link
Collaborator

flinkbot commented Jan 13, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@bowenli86
Copy link
Member Author

bowenli86 commented Jan 14, 2026

test failures are unrelated

@bowenli86 bowenli86 requested a review from dianfu January 14, 2026 05:32
Copy link
Contributor

@dianfu dianfu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bowenli86 Thanks a lot for the work! 👍 Only a few minor comments.

super().__init__(j_service)


class KafkaStreamSubscriber(object):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about also introducing class StreamPatternSubscriber and KafkaStreamSetSubscriber ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer
from pyflink.java_gateway import get_gateway

__all__ = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

@bowenli86 bowenli86 Jan 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(btw, the pyflink kafka connector migration to connector repo which has been left half way done for 2 years is better to be fixed, otherwise we'll be constantly facing the inconsistency among the 2 repos and jump back and forth for all kinds of issues. Do you have time or anyone in OSS or Alibaba can take on it?)

@dianfu dianfu merged commit 4def6ef into master Jan 15, 2026
9 of 11 checks passed
@bowenli86 bowenli86 deleted the dynamick branch January 15, 2026 06:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants