Sha256: 95ab4e25d2de977a8ca6d0a414e5c99ef560c30f4104a108bcd6c4465659dd88
Contents?: true
Size: 1006 Bytes
Versions: 11
Compression:
Stored size: 1006 Bytes
Contents
require 'active_support/inflector'
require_relative './workers/event'
require_relative './materializer_factory'

module Materialist
  # Receives events and hands each one to Materialist::Workers::Event,
  # configured with Sidekiq options resolved for the event's topic.
  class EventHandler
    # Baseline options; later sources in #sidekiq_options override these.
    DEFAULT_SIDEKIQ_OPTIONS = { retry: 10 }.freeze

    # Dispatches every event in the batch whose topic is listed in
    # Materialist.configuration.topics; other events are skipped.
    #
    # @param batch [#each] collection of events (each indexable by 'topic')
    # @return whatever `batch.each` returns
    def on_events_received(batch)
      batch.each do |event|
        next unless should_materialize?(topic(event))

        call(event)
      end
    end

    # Hands a single event to the worker via `perform_async`. No topic
    # filtering is applied here; that happens in #on_events_received.
    #
    # @param event [#[]] event indexable by the 'topic' key
    # @return the result of `perform_async`
    def call(event)
      event_topic = topic(event)
      configured_worker = worker(event_topic)
      configured_worker.perform_async(event)
    end

    private

    # Reads the 'topic' entry of the event as a String
    # (a missing/nil topic becomes the empty string via `to_s`).
    def topic(event)
      raw_topic = event['topic']
      raw_topic.to_s
    end

    # True when the topic is included in the configured topics.
    def should_materialize?(topic)
      configured_topics = Materialist.configuration.topics
      configured_topics.include?(topic)
    end

    # Merges the option sources in increasing precedence: defaults, then the
    # global configuration, then the options declared by the topic's
    # materializer class.
    def sidekiq_options(topic)
      # Both sources are read before any merge takes place.
      global_options = Materialist.configuration.sidekiq_options
      materializer_options = materializer_sidekiq_options(topic)

      DEFAULT_SIDEKIQ_OPTIONS
        .merge(global_options)
        .merge(materializer_options)
    end

    # The event worker with the topic's resolved options applied via `set`.
    def worker(topic)
      resolved_options = sidekiq_options(topic)
      Materialist::Workers::Event.set(resolved_options)
    end

    # Options exposed through `_sidekiq_options` by the class that
    # MaterializerFactory.class_from_topic returns for the topic.
    def materializer_sidekiq_options(topic)
      materializer_class = Materialist::MaterializerFactory.class_from_topic(topic)
      materializer_class._sidekiq_options
    end
  end
end
Version data entries
11 entries across 11 versions & 1 rubygems