diff --git a/Gemfile b/Gemfile
index 1dd82b5..a15cc9c 100644
--- a/Gemfile
+++ b/Gemfile
@@ -39,6 +39,8 @@ gem 'devise'
 # Взаимодействие с микросервисами
 gem 'httparty'
 gem 'bunny'
+gem 'karafka'
+gem 'ruby-kafka'
 
 group :development, :test do
   gem 'bundler-audit'
diff --git a/Gemfile.lock b/Gemfile.lock
index 27789bb..c69c74b 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -95,9 +95,9 @@ GEM
       uri (>= 0.13.1)
     addressable (2.8.7)
       public_suffix (>= 2.0.2, < 7.0)
-    amq-protocol (2.3.3)
     alba (3.5.0)
       ostruct (~> 0.6)
+    amq-protocol (2.3.3)
     arbre (1.7.0)
       activesupport (>= 3.0.0)
       ruby2_keywords (>= 0.0.2)
@@ -146,6 +146,8 @@ GEM
       responders
       warden (~> 1.2.3)
     diff-lcs (1.5.1)
+    digest-crc (0.7.0)
+      rake (>= 12.0.0, < 14.0.0)
     docile (1.4.1)
     dotenv (3.1.4)
     drb (2.2.1)
@@ -235,6 +237,19 @@ GEM
       activerecord
       kaminari-core (= 1.2.2)
     kaminari-core (1.2.2)
+    karafka (2.4.17)
+      base64 (~> 0.2)
+      karafka-core (>= 2.4.4, < 2.5.0)
+      karafka-rdkafka (>= 0.17.2)
+      waterdrop (>= 2.7.3, < 3.0.0)
+      zeitwerk (~> 2.3)
+    karafka-core (2.4.9)
+      karafka-rdkafka (>= 0.17.6, < 0.19.0)
+      logger (>= 1.6.0)
+    karafka-rdkafka (0.18.1)
+      ffi (~> 1.15)
+      mini_portile2 (~> 2.6)
+      rake (> 12)
     language_server-protocol (3.17.0.3)
     logger (1.6.0)
     loofah (2.23.1)
@@ -251,9 +266,9 @@ GEM
     mini_mime (1.1.5)
     mini_portile2 (2.8.8)
     minitest (5.25.1)
+    multi_json (1.15.0)
     multi_xml (0.7.1)
       bigdecimal (~> 3.1)
-    multi_json (1.15.0)
     net-http (0.6.0)
       uri
     net-imap (0.5.6)
@@ -401,6 +416,8 @@ GEM
       rubocop-rails
     rubocop-rspec (3.0.4)
       rubocop (~> 1.61)
+    ruby-kafka (1.5.0)
+      digest-crc
     ruby-progressbar (1.13.0)
     ruby2_keywords (0.0.5)
     ruby_parser (3.21.1)
@@ -458,6 +475,10 @@ GEM
     useragent (0.16.10)
     warden (1.2.9)
       rack (>= 2.0.9)
+    waterdrop (2.8.2)
+      karafka-core (>= 2.4.3, < 3.0.0)
+      karafka-rdkafka (>= 0.17.5)
+      zeitwerk (~> 2.3)
     web-console (4.2.1)
       actionview (>= 6.0.0)
       activemodel (>= 6.0.0)
@@ -502,6 +523,7 @@ DEPENDENCIES
   fasterer
   ffaker
   httparty
+  karafka
   parallel
   pg
   pry-byebug
@@ -513,6 +535,7 @@ DEPENDENCIES
   rubocop-rails
rubocop-rails-omakase rubocop-rspec + ruby-kafka rubyzip searchkick shoulda-matchers diff --git a/docs/kafkatry-ruby/app/consumers/example_consumer.rb b/app/consumers/example_consumer.rb similarity index 88% rename from docs/kafkatry-ruby/app/consumers/example_consumer.rb rename to app/consumers/example_consumer.rb index 9b82d9a..04395e1 100644 --- a/docs/kafkatry-ruby/app/consumers/example_consumer.rb +++ b/app/consumers/example_consumer.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true # Example consumer that prints messages payloads -class ExampleConsumer < ApplicationConsumer +class ExampleConsumer < KafkaConsumer def consume messages.each { |message| puts message.payload } end diff --git a/docs/kafkatry-ruby/app/consumers/application_consumer.rb b/app/consumers/kafka_consumer.rb similarity index 82% rename from docs/kafkatry-ruby/app/consumers/application_consumer.rb rename to app/consumers/kafka_consumer.rb index 2214b6c..94595b8 100644 --- a/docs/kafkatry-ruby/app/consumers/application_consumer.rb +++ b/app/consumers/kafka_consumer.rb @@ -3,5 +3,5 @@ # Application consumer from which all Karafka consumers should inherit # You can rename it if it would conflict with your current code base (in case you're integrating # Karafka with other frameworks) -class ApplicationConsumer < Karafka::BaseConsumer +class KafkaConsumer < Karafka::BaseConsumer end diff --git a/app/consumers/likes_consumer.rb b/app/consumers/likes_consumer.rb new file mode 100644 index 0000000..3f1dc03 --- /dev/null +++ b/app/consumers/likes_consumer.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +class LikesConsumer < KafkaConsumer + def consume + messages.each do |message| + puts message.raw_payload # Обработка сообщения + end + end +end diff --git a/app/producers/concerns/topicable.rb b/app/producers/concerns/topicable.rb new file mode 100644 index 0000000..c7b1812 --- /dev/null +++ b/app/producers/concerns/topicable.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Topicable 
+  extend ActiveSupport::Concern
+
+  class_methods do
+    attr_reader :topic
+
+    def topic_name(q)
+      @topic = q.to_s
+    end
+  end
+end
diff --git a/app/producers/kafka_producer.rb b/app/producers/kafka_producer.rb
new file mode 100644
index 0000000..f561900
--- /dev/null
+++ b/app/producers/kafka_producer.rb
@@ -0,0 +1,12 @@
+# frozen_string_literal: true
+
+class KafkaProducer
+  include Topicable
+  class << self
+    # NOTE: intentionally shadows Object#send on the singleton; use __send__ for dynamic dispatch
+    def send(payload)
+      payload = payload.to_json unless payload.is_a?(String)
+      Karafka.producer.produce_async(topic: topic, payload: payload)
+    end
+  end
+end
diff --git a/app/producers/likes_producer.rb b/app/producers/likes_producer.rb
new file mode 100644
index 0000000..fc9011e
--- /dev/null
+++ b/app/producers/likes_producer.rb
@@ -0,0 +1,5 @@
+# frozen_string_literal: true
+
+class LikesProducer < KafkaProducer # < RabbitProducer - switch the superclass back if queues must be migrated from RabbitMQ to Kafka gradually
+  topic_name :likes
+end
diff --git a/app/producers/rabbit_producer.rb b/app/producers/rabbit_producer.rb
new file mode 100644
index 0000000..3aaee1d
--- /dev/null
+++ b/app/producers/rabbit_producer.rb
@@ -0,0 +1,31 @@
+# frozen_string_literal: true
+
+class RabbitProducer
+  include Topicable
+  class << self
+    # NOTE: intentionally shadows Object#send on the singleton; use __send__ for dynamic dispatch
+    def send(payload)
+      payload = payload.to_json unless payload.is_a?(String)
+      queue.publish(payload)
+    end
+
+    def queue
+      @queue ||= channel.queue(topic, durable: true, auto_delete: false)
+    end
+
+    def channel
+      @channel ||= connection.create_channel
+    end
+
+    attr_writer :connection
+
+    def connection
+      @connection ||= Bunny.new(
+        host: ENV.fetch('RABBITMQ_HOST', 'rabbitmq'),
+        port: ENV.fetch('RABBITMQ_PORT', 5672),
+        user: ENV.fetch('RABBITMQ_USER', nil),
+        password: ENV.fetch('RABBITMQ_PASSWORD', nil)
+      ).tap(&:start)
+    end
+  end
+end
diff --git a/docker-compose.yml b/docker-compose.yml
index 5f059c9..6794f5d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -21,6 +21,33 @@ services:
     volumes:
       - es-data:/usr/share/elasticsearch/data
 
+  redpanda:
+    image: docker.redpanda.com/redpandadata/redpanda:latest
+    container_name: redpanda_dev
+    command:
+      - redpanda start
+      - --smp 1
+      - --memory 1G
+      - --overprovisioned
+      - --node-id 1
+      - --check=false
+      - --advertise-kafka-addr redpanda:9092
+    ports: ["9092:9092"]
+    volumes:
+      - redpanda_data:/var/lib/redpanda/data
+    environment:
+      - REDPANDA_ENVIRONMENT=development
+
+  console:
+    image: docker.redpanda.com/redpandadata/console:latest
+    container_name: redpanda_console_dev
+    ports: ["8080:8080"]
+    environment:
+      - KAFKA_BROKERS=redpanda:9092
+    depends_on:
+      - redpanda
+
+
   web:
     tty: true
     stdin_open: true
@@ -50,5 +77,6 @@ services:
     POSTGRES_PASSWORD: 'postgres'
 volumes:
   db:
+  redpanda_data:
   es-data: # Определение тома
     driver: local
diff --git a/karafka.rb b/karafka.rb
new file mode 100644
index 0000000..c4362a9
--- /dev/null
+++ b/karafka.rb
@@ -0,0 +1,80 @@
+# frozen_string_literal: true
+
+class KarafkaApp < Karafka::App
+  setup do |config|
+    config.kafka = { 'bootstrap.servers': ENV.fetch('KAFKA_HOST', '127.0.0.1:9092') }
+    config.client_id = 'example_app'
+    # Recreate consumers with each batch. This will allow Rails code reload to work in the
+    # development mode. Otherwise Karafka process would not be aware of code changes
+    config.consumer_persistence = !Rails.env.development?
+  end
+
+  # Comment out this part if you are not using instrumentation and/or you are not
+  # interested in logging events for certain environments. Since instrumentation
+  # notifications add extra boilerplate, if you want to achieve max performance,
+  # listen to only what you really need for given environment.
+  Karafka.monitor.subscribe(
+    Karafka::Instrumentation::LoggerListener.new(
+      # Karafka, when the logger is set to info, produces logs each time it polls data from an
+      # internal messages queue. This can be extensive, so you can turn it off by setting below
+      # to false.
+      log_polling: true
+    )
+  )
+  # Karafka.monitor.subscribe(Karafka::Instrumentation::ProctitleListener.new)
+
+  # This logger prints the producer development info using the Karafka logger.
+  # It is similar to the consumer logger listener but producer oriented.
+  Karafka.producer.monitor.subscribe(
+    WaterDrop::Instrumentation::LoggerListener.new(
+      # Log producer operations using the Karafka logger
+      Karafka.logger,
+      # If you set this to true, logs will contain each message details
+      # Please note, that this can be extensive
+      log_messages: false
+    )
+  )
+
+  # You can subscribe to all consumer related errors and record/track them that way
+  #
+  # Karafka.monitor.subscribe 'error.occurred' do |event|
+  #   type = event[:type]
+  #   error = event[:error]
+  #   details = (error.backtrace || []).join("\n")
+  #   ErrorTracker.send_error(error, type, details)
+  # end
+
+  # You can subscribe to all producer related errors and record/track them that way
+  # Please note, that producer and consumer have their own notifications pipeline so you need to
+  # setup error tracking independently for each of them
+  #
+  # Karafka.producer.monitor.subscribe('error.occurred') do |event|
+  #   type = event[:type]
+  #   error = event[:error]
+  #   details = (error.backtrace || []).join("\n")
+  #   ErrorTracker.send_error(error, type, details)
+  # end
+
+  routes.draw do
+    # Uncomment this if you use Karafka with ActiveJob
+    # You need to define the topic per each queue name you use
+    # active_job_topic :default
+    topic :example do
+      # Uncomment this if you want Karafka to manage your topics configuration
+      # Managing topics configuration via routing will allow you to ensure config consistency
+      # across multiple environments
+      #
+      # config(partitions: 2, 'cleanup.policy': 'compact')
+      consumer ExampleConsumer
+    end
+
+    topic :likes do
+      consumer LikesConsumer
+    end
+  end
+end
+
+# Karafka now features a Web UI!
+# Visit the setup documentation to get started and enhance your experience.
+#
+# https://karafka.io/docs/Web-UI-Getting-Started
diff --git a/lib/tasks/like.rake b/lib/tasks/like.rake
index 7c30d74..b0443da 100644
--- a/lib/tasks/like.rake
+++ b/lib/tasks/like.rake
@@ -2,9 +2,5 @@
 desc 'Ставим like'
 task like: :environment do
   json_path = Rails.root.join('spec', 'fixtures', 'like.json').to_path
   message = JSON.parse(File.read(json_path))
-  connection = Bunny.new(Settings.rabbitmq.to_hash).start
-  channel = connection.create_channel
-  exchange = channel.topic('services', durable: true)
-  exchange.publish(message.to_json, routing_key: Settings.sneakers.queue)
-  connection.close
+  LikesProducer.send(message) # Encapsulating the producer logic means this code will not need rewriting when moving from RabbitMQ to Kafka, or when something else comes along in a couple of years