Sha256: 5e2051ad3a6423ac6af14b0bf274233282420ed330e5aa4f2a1ea644fb0a3da2

Contents?: true

Size: 688 Bytes

Versions: 3

Compression:

Stored size: 688 Bytes

Contents

require 'active_support/inflector'
require_relative './event_worker'

module Materialist
  # Receives batches of events and hands the ones whose topic is configured
  # on this handler to Materialist::EventWorker for asynchronous processing.
  #
  # Recognised options (all optional):
  #   :topics - list of topic names to accept; compared as strings. When
  #             omitted the list is empty, so #on_events_received enqueues
  #             nothing.
  #   :queue  - passed to Materialist::EventWorker.set when present
  #   :retry  - passed to Materialist::EventWorker.set (defaults to 10)
  class EventHandler

    # 10 retries takes approximately 6 hours
    # Frozen so the shared defaults cannot be mutated in place; each handler
    # works on its own merged copy.
    DEFAULT_OPTIONS = { retry: 10 }.freeze

    # @param options [Hash] overrides merged on top of DEFAULT_OPTIONS
    def initialize(options={})
      @options = DEFAULT_OPTIONS.merge(options)
    end

    # Enqueues every event in the batch whose 'topic' is one of the
    # configured topics; all other events are skipped.
    #
    # @param batch [Enumerable] events, each responding to ['topic']
    def on_events_received(batch)
      batch.each { |event| call(event) if topics.include?(event['topic'].to_s) }
    end

    # Enqueues a single event without applying the topic filter.
    #
    # @param event [Object] payload passed to the worker's perform_async
    # @return whatever the worker's perform_async returns
    def call(event)
      worker.perform_async(event)
    end

    private

    attr_reader :options

    # Configured topics normalised to strings, memoised on first use.
    def topics
      @_topics ||= options.fetch(:topics, []).map(&:to_s)
    end

    # EventWorker configured with only the :queue and :retry options.
    # NOTE(review): the set/perform_async pair looks like the Sidekiq worker
    # API - confirm against event_worker.rb.
    def worker
      Materialist::EventWorker.set(options.slice(:queue, :retry))
    end
  end
end

Version data entries

3 entries across 3 versions and 1 rubygem

Version Path
materialist-2.3.1 lib/materialist/event_handler.rb
materialist-2.3.0 lib/materialist/event_handler.rb
materialist-2.2.0 lib/materialist/event_handler.rb