Sha256: c9b338434623a9df79eb1deca6143e02945bd43ae30cd42feb85a96fd038071b
Contents?: true
Size: 881 Bytes
Versions: 1
Compression:
Stored size: 881 Bytes
Contents
module Umbra
  # Publishes encoded request environments to Redis from a background
  # thread pool, so that the caller of {#call} does not wait on Redis.
  class Publisher
    DEFAULT_MAX_QUEUE = 100
    DEFAULT_MIN_THREADS = 1
    DEFAULT_MAX_THREADS = 1

    # @return [Concurrent::CachedThreadPool] the pool that runs publish tasks
    attr_reader :pool

    # Builds the publisher and its backing thread pool.
    #
    # @param options [Hash] pool sizing options
    # @option options [Integer] :min_threads (DEFAULT_MIN_THREADS)
    # @option options [Integer] :max_threads (DEFAULT_MAX_THREADS)
    # @option options [Integer] :max_thread legacy spelling of +:max_threads+;
    #   only consulted when +:max_threads+ is absent
    # @option options [Integer] :max_queue (DEFAULT_MAX_QUEUE)
    def initialize(**options)
      # Earlier code only read the misspelled :max_thread key, so a caller
      # passing :max_threads was silently ignored. Prefer the correct key and
      # keep the old one as a fallback for backward compatibility.
      max_threads = options.fetch(:max_threads) do
        options.fetch(:max_thread, DEFAULT_MAX_THREADS)
      end

      # NOTE(review): concurrent-ruby describes CachedThreadPool as an
      # unbounded pool and it may override min_threads/max_threads/max_queue.
      # Confirm these options take effect; if not, a plain
      # Concurrent::ThreadPoolExecutor is probably what was intended.
      @pool = Concurrent::CachedThreadPool.new(
        min_threads: options.fetch(:min_threads, DEFAULT_MIN_THREADS),
        max_threads: max_threads,
        max_queue: options.fetch(:max_queue, DEFAULT_MAX_QUEUE),
        fallback_policy: :abort
      )
    end

    # Schedules {#call!} on the pool.
    #
    # @param env [Object] value handed to the encoder
    # @param encoder [#call] defaults to +Umbra.encoder+
    # @param redis [#publish] defaults to +Umbra.redis+
    # @return [Boolean] +true+ when the task was handed to the pool, +false+
    #   when the pool raised Concurrent::RejectedExecutionError (a warning is
    #   logged and the item is dropped)
    def call(env, encoder: Umbra.encoder, redis: Umbra.redis)
      @pool << proc { call!(env, encoder: encoder, redis: redis) }

      true
    rescue Concurrent::RejectedExecutionError
      Umbra.logger.warn "[umbra] Queue at max - dropping items"
      false
    end

    # Encodes +env+ and publishes it on +Umbra::CHANNEL+ in the calling thread.
    #
    # @param env [Object] value handed to the encoder
    # @param encoder [#call] defaults to +Umbra.encoder+
    # @param redis [#publish] defaults to +Umbra.redis+
    # @return [Object] whatever +redis.publish+ returns
    def call!(env, encoder: Umbra.encoder, redis: Umbra.redis)
      redis.publish(Umbra::CHANNEL, encoder.call(env))
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
umbra-rb-0.3.0 | lib/umbra/publisher.rb |