Sha256: cb53a6479c45521f073970efc8758c2c10113e89e2ee7ba6c4d25629cdc71f99
Contents?: true
Size: 1.69 KB
Versions: 78
Compression:
Stored size: 1.69 KB
Contents
# frozen_string_literal: true module SplitIoClient module SSE module Workers class SegmentsWorker def initialize(synchronizer, config, segments_repository) @synchronizer = synchronizer @config = config @segments_repository = segments_repository @queue = Queue.new @running = Concurrent::AtomicBoolean.new(false) end def add_to_queue(change_number, segment_name) item = { change_number: change_number, segment_name: segment_name } @config.logger.debug("SegmentsWorker add to queue #{item}") @queue.push(item) end def start if @running.value @config.logger.debug('segments worker already running.') return end @running.make_true perform_thread end def stop unless @running.value @config.logger.debug('segments worker not running.') return end @running.make_false SplitIoClient::Helpers::ThreadHelper.stop(:segment_update_worker, @config) end private def perform while (item = @queue.pop) segment_name = item[:segment_name] cn = item[:change_number] @config.logger.debug("SegmentsWorker change_number dequeue #{segment_name}, #{cn}") @synchronizer.fetch_segment(segment_name, cn) end end def perform_thread @config.threads[:segment_update_worker] = Thread.new do @config.logger.debug('Starting segments worker ...') if @config.debug_enabled perform end end end end end end
Version data entries
78 entries across 78 versions & 1 rubygems