Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ gem 'devise'
# Взаимодействие с микросервисами
gem 'httparty'
gem 'bunny'
gem 'karafka'

group :development, :test do
gem 'bundler-audit'
Expand Down
27 changes: 25 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ GEM
uri (>= 0.13.1)
addressable (2.8.7)
public_suffix (>= 2.0.2, < 7.0)
amq-protocol (2.3.3)
alba (3.5.0)
ostruct (~> 0.6)
amq-protocol (2.3.3)
arbre (1.7.0)
activesupport (>= 3.0.0)
ruby2_keywords (>= 0.0.2)
Expand Down Expand Up @@ -146,6 +146,8 @@ GEM
responders
warden (~> 1.2.3)
diff-lcs (1.5.1)
digest-crc (0.7.0)
rake (>= 12.0.0, < 14.0.0)
docile (1.4.1)
dotenv (3.1.4)
drb (2.2.1)
Expand Down Expand Up @@ -235,6 +237,19 @@ GEM
activerecord
kaminari-core (= 1.2.2)
kaminari-core (1.2.2)
karafka (2.4.17)
base64 (~> 0.2)
karafka-core (>= 2.4.4, < 2.5.0)
karafka-rdkafka (>= 0.17.2)
waterdrop (>= 2.7.3, < 3.0.0)
zeitwerk (~> 2.3)
karafka-core (2.4.9)
karafka-rdkafka (>= 0.17.6, < 0.19.0)
logger (>= 1.6.0)
karafka-rdkafka (0.18.1)
ffi (~> 1.15)
mini_portile2 (~> 2.6)
rake (> 12)
language_server-protocol (3.17.0.3)
logger (1.6.0)
loofah (2.23.1)
Expand All @@ -251,9 +266,9 @@ GEM
mini_mime (1.1.5)
mini_portile2 (2.8.8)
minitest (5.25.1)
multi_json (1.15.0)
multi_xml (0.7.1)
bigdecimal (~> 3.1)
multi_json (1.15.0)
net-http (0.6.0)
uri
net-imap (0.5.6)
Expand Down Expand Up @@ -401,6 +416,8 @@ GEM
rubocop-rails
rubocop-rspec (3.0.4)
rubocop (~> 1.61)
ruby-kafka (1.5.0)
digest-crc
ruby-progressbar (1.13.0)
ruby2_keywords (0.0.5)
ruby_parser (3.21.1)
Expand Down Expand Up @@ -458,6 +475,10 @@ GEM
useragent (0.16.10)
warden (1.2.9)
rack (>= 2.0.9)
waterdrop (2.8.2)
karafka-core (>= 2.4.3, < 3.0.0)
karafka-rdkafka (>= 0.17.5)
zeitwerk (~> 2.3)
web-console (4.2.1)
actionview (>= 6.0.0)
activemodel (>= 6.0.0)
Expand Down Expand Up @@ -502,6 +523,7 @@ DEPENDENCIES
fasterer
ffaker
httparty
karafka
parallel
pg
pry-byebug
Expand All @@ -513,6 +535,7 @@ DEPENDENCIES
rubocop-rails
rubocop-rails-omakase
rubocop-rspec
ruby-kafka
rubyzip
searchkick
shoulda-matchers
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

# Example consumer that prints messages payloads
class ExampleConsumer < ApplicationConsumer
class ExampleConsumer < KafkaConsumer
def consume
messages.each { |message| puts message.payload }
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
# Application consumer from which all Karafka consumers should inherit
# You can rename it if it would conflict with your current code base (in case you're integrating
# Karafka with other frameworks)
class ApplicationConsumer < Karafka::BaseConsumer
# Shared base class for every Karafka consumer in this application
# (e.g. LikesConsumer). Subclass this rather than Karafka::BaseConsumer
# directly so cross-cutting consumer behavior has a single home.
class KafkaConsumer < Karafka::BaseConsumer
end
9 changes: 9 additions & 0 deletions app/consumers/likes_consumer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

# Consumer for like events: prints the raw (unparsed) payload of every
# message in the batch.
class LikesConsumer < KafkaConsumer
  # Process the current batch of messages.
  def consume
    messages.each { |message| puts message.raw_payload }
  end
end
13 changes: 13 additions & 0 deletions app/producers/concerns/topicable.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# frozen_string_literal: true

# Mixin that lets a producer class declare the topic/queue it publishes to:
#
#   class LikesProducer < KafkaProducer
#     topic_name :likes
#   end
#
# The declared name is exposed to class-level code via +topic+.
module Topicable
  extend ActiveSupport::Concern

  class_methods do
    # Configured topic name as a String; nil until topic_name is called.
    attr_reader :topic

    # Declares the topic this producer publishes to.
    # Accepts a Symbol or String; stored as a String.
    def topic_name(name)
      @topic = name.to_s
    end
  end
end
11 changes: 11 additions & 0 deletions app/producers/kafka_producer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# frozen_string_literal: true

# Base class for Kafka producers. Subclasses declare their topic via
# Topicable's +topic_name+ and publish with +.send(payload)+.
class KafkaProducer
  include Topicable

  class << self
    # Publishes +payload+ to the configured topic asynchronously.
    # Non-String payloads are serialized to JSON first.
    #
    # NOTE: this deliberately shadows Object#send to keep a queue-agnostic
    # producer interface (mirrors RabbitProducer.send); use __send__ /
    # public_send for dynamic dispatch on these classes.
    def send(payload)
      # Fail fast with a clear error instead of a confusing Karafka failure
      # when a subclass forgot to declare its topic.
      raise ArgumentError, "#{name}: no topic configured (call topic_name)" if topic.nil?

      payload = payload.to_json unless payload.is_a?(String)
      Karafka.producer.produce_async(topic: topic, payload: payload)
    end
  end
end
5 changes: 5 additions & 0 deletions app/producers/likes_producer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# frozen_string_literal: true

# Publishes like events to the :likes topic.
class LikesProducer < KafkaProducer # < RabbitProducer — swap the parent if queues need to migrate gradually from RabbitMQ to Kafka
  topic_name :likes
end
30 changes: 30 additions & 0 deletions app/producers/rabbit_producer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true

# RabbitMQ counterpart of KafkaProducer: exposes the same `.send(payload)`
# interface, publishing to a durable queue named after the Topicable topic.
class RabbitProducer
  include Topicable

  class << self
    # Publishes +payload+ (JSON-serialized unless already a String) to the
    # queue via the default exchange.
    # NOTE: deliberately shadows Object#send to mirror KafkaProducer.send.
    def send(payload)
      payload = payload.to_json unless payload.is_a?(String)
      queue.publish(payload)
    end

    # Memoized durable, non-auto-delete queue named by the declared topic.
    def queue
      @queue ||= channel.queue(topic, durable: true, auto_delete: false)
    end

    # Memoized AMQP channel on the shared connection.
    def channel
      @channel ||= connection.create_channel
    end

    # Allows tests (or alternate setups) to inject a connection.
    attr_writer :connection

    # Lazily opened, memoized Bunny connection. The port is coerced with
    # Integer() because ENV values are Strings when RABBITMQ_PORT is set.
    def connection
      @connection ||= Bunny.new(
        host: ENV.fetch('RABBITMQ_HOST', 'rabbitmq'),
        port: Integer(ENV.fetch('RABBITMQ_PORT', 5672)),
        user: ENV.fetch('RABBITMQ_USER', nil),
        password: ENV.fetch('RABBITMQ_PASSWORD', nil)
      ).tap(&:start)
    end
  end
end
28 changes: 28 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,33 @@ services:
volumes:
- es-data:/usr/share/elasticsearch/data

redpanda:
  image: docker.redpanda.com/redpandadata/redpanda:latest
  container_name: redpanda_dev
  command:
    - redpanda start
    - --smp 1
    - --memory 1G
    - --overprovisioned
    - --node-id 1
    - --check=false
    # Two listeners: containers on the compose network reach the broker as
    # redpanda:9092; host tools connect via localhost:19092. Advertising
    # only 127.0.0.1:9092 would make the broker unreachable from other
    # containers (e.g. the console below).
    - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
    - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
  ports: ["9092:9092", "19092:19092"]
  volumes:
    - redpanda_data:/var/lib/redpanda/data
  environment:
    - REDPANDA_ENVIRONMENT=development

console:
  image: docker.redpanda.com/redpandadata/console:latest
  container_name: redpanda_console_dev
  ports: ["8080:8080"]
  environment:
    # The console runs in its own container, so it must use the internal
    # listener, not 127.0.0.1 (which would point at the console itself).
    - KAFKA_BROKERS=redpanda:9092
  depends_on:
    - redpanda

web:
tty: true
stdin_open: true
Expand Down Expand Up @@ -50,5 +77,6 @@ services:
POSTGRES_PASSWORD: 'postgres'
volumes:
db:
redpanda_data:
es-data: # Определение тома
driver: local
76 changes: 76 additions & 0 deletions karafka.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# frozen_string_literal: true

# Karafka application: broker connection, instrumentation and topic routing.
class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = { 'bootstrap.servers': ENV.fetch('KAFKA_HOST', '127.0.0.1:9092') }
    config.client_id = 'example_app'
    # Recreate consumers with each batch. This will allow Rails code reload
    # to work in the development mode. Otherwise Karafka process would not
    # be aware of code changes.
    config.consumer_persistence = !Rails.env.development?
  end

  # Comment out this part if you are not using instrumentation and/or you are not
  # interested in logging events for certain environments. Since instrumentation
  # notifications add extra boilerplate, if you want to achieve max performance,
  # listen to only what you really need for given environment.
  Karafka.monitor.subscribe(
    Karafka::Instrumentation::LoggerListener.new(
      # Karafka, when the logger is set to info, produces logs each time it polls data from an
      # internal messages queue. This can be extensive, so you can turn it off by setting below
      # to false.
      log_polling: true
    )
  )
  # Karafka.monitor.subscribe(Karafka::Instrumentation::ProctitleListener.new)

  # This logger prints the producer development info using the Karafka logger.
  # It is similar to the consumer logger listener but producer oriented.
  Karafka.producer.monitor.subscribe(
    WaterDrop::Instrumentation::LoggerListener.new(
      # Log producer operations using the Karafka logger
      Karafka.logger,
      # If you set this to true, logs will contain each message details
      # Please note, that this can be extensive
      log_messages: false
    )
  )

  # You can subscribe to all consumer related errors and record/track them that way
  #
  # Karafka.monitor.subscribe 'error.occurred' do |event|
  #   type = event[:type]
  #   error = event[:error]
  #   details = (error.backtrace || []).join("\n")
  #   ErrorTracker.send_error(error, type, details)
  # end

  # You can subscribe to all producer related errors and record/track them that way
  # Please note, that producer and consumer have their own notifications pipeline so you need to
  # setup error tracking independently for each of them
  #
  # Karafka.producer.monitor.subscribe('error.occurred') do |event|
  #   type = event[:type]
  #   error = event[:error]
  #   details = (error.backtrace || []).join("\n")
  #   ErrorTracker.send_error(error, type, details)
  # end

  routes.draw do
    # Uncomment this if you use Karafka with ActiveJob
    # You need to define the topic per each queue name you use
    # active_job_topic :default
    topic :example do
      # Uncomment this if you want Karafka to manage your topics configuration
      # Managing topics configuration via routing will allow you to ensure config consistency
      # across multiple environments
      #
      # config(partitions: 2, 'cleanup.policy': 'compact')
      consumer ExampleConsumer
    end

    # Route for the topic that LikesProducer publishes to (topic_name :likes);
    # without this route LikesConsumer is never subscribed to anything.
    topic :likes do
      consumer LikesConsumer
    end
  end
end

# Karafka now features a Web UI!
# Visit the setup documentation to get started and enhance your experience.
#
# https://karafka.io/docs/Web-UI-Getting-Started
6 changes: 1 addition & 5 deletions lib/tasks/like.rake
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,5 @@ desc 'Ставим like'
task like: :environment do
  fixture_path = Rails.root.join('spec', 'fixtures', 'like.json').to_path
  like_event = JSON.parse(File.read(fixture_path))
  # The producer encapsulates the transport, so this task needs no rewrite
  # when migrating from RabbitMQ to Kafka (or whatever comes next).
  LikesProducer.send(like_event)
end