Sha256: c9b338434623a9df79eb1deca6143e02945bd43ae30cd42feb85a96fd038071b

Contents?: true

Size: 881 Bytes

Versions: 1

Compression:

Stored size: 881 Bytes

Contents

module Umbra
  class Publisher
    DEFAULT_MAX_QUEUE = 100
    DEFAULT_MIN_THREADS = 1
    DEFAULT_MAX_THREADS = 1

    attr_reader :pool

    def initialize(**options)
      @pool = Concurrent::CachedThreadPool.new(
        min_threads: options.fetch(:min_threads, DEFAULT_MIN_THREADS),
        max_threads: options.fetch(:max_thread, DEFAULT_MAX_THREADS),
        max_queue: options.fetch(:max_queue, DEFAULT_MAX_QUEUE),
        fallback_policy: :abort
      )
    end

    def call(env, encoder: Umbra.encoder, redis: Umbra.redis)
      @pool << proc { call!(env, encoder: encoder, redis: redis) }

      true
    rescue Concurrent::RejectedExecutionError
      Umbra.logger.warn "[umbra] Queue at max - dropping items"

      false
    end

    def call!(env, encoder: Umbra.encoder, redis: Umbra.redis)
      redis.publish(Umbra::CHANNEL, encoder.call(env))
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
umbra-rb-0.3.0 lib/umbra/publisher.rb