Sha256: 3449c81356ea6caded152ef3bd07161e5d4064b53c9c533211ef6921870b0e12

Contents?: true

Size: 1.91 KB

Versions: 2

Compression:

Stored size: 1.91 KB

Contents

# frozen_string_literal: true

require 'aws-sdk-sqs'
require 'google/cloud/pubsub'
require 'grpc'
require 'toiler/utils/environment_loader'
require 'toiler/utils/logging'
require 'toiler/utils/argument_parser'
require 'toiler/worker'
require 'toiler/cli'
require 'toiler/version'

# Top-level Toiler namespace.
#
# Holds shared state in module-level instance variables (the worker registry,
# the options hash, per-queue fetchers and processor pools, and the AWS/GCP
# client slots) and exposes both that state and a handful of helpers as
# module functions, e.g. +Toiler.options+ or +Toiler.register_worker+.
module Toiler
  # Queue => worker mapping, populated through register_worker.
  @worker_class_registry = {}
  # Options hash; starts out with empty :aws and :gcp sections.
  @options = { aws: {}, gcp: {} }
  # Fetchers stored under :"fetcher_<queue>" keys (see set_fetcher).
  @fetchers = {}
  # Processor pools stored under :"processor_pool_<queue>" keys
  # (see set_processor_pool).
  @processor_pools = {}
  # Client slots; nil until assigned through aws_client= / gcp_client=.
  @aws_client = nil
  @gcp_client = nil

  attr_accessor :worker_class_registry, :aws_client, :gcp_client
  attr_reader :options, :fetchers, :processor_pools

  # Publish the generated accessors as module functions so they read and
  # write the module-level instance variables initialised above.
  module_function :worker_class_registry, :worker_class_registry=,
                  :options, :fetchers, :processor_pools,
                  :aws_client, :aws_client=, :gcp_client, :gcp_client=

  # Everything defined from here on is a module function as well.
  module_function

  # Returns the logger provided by Toiler::Utils::Logging.
  def logger
    Toiler::Utils::Logging.logger
  end

  # Returns the keys of the worker registry, i.e. the registered queues.
  def queues
    worker_class_registry.keys
  end

  # Returns the queue => worker mapping limited to options[:active_queues].
  #
  # When no :active_queues option is set, the full registry object itself is
  # returned. Otherwise a new hash is built; an active queue that has no
  # registered worker is left out and reported with a warning.
  def active_worker_class_registry
    selected = options[:active_queues]
    return @worker_class_registry unless selected

    selected.each_with_object({}) do |queue_name, active|
      klass = @worker_class_registry[queue_name]
      if klass.nil?
        logger.warn "No worker assigned to queue: #{queue_name}"
        next
      end

      active[queue_name] = klass
    end
  end

  # Looks up the fetcher stored for +queue+ (nil when none was stored).
  def fetcher(queue)
    fetchers[:"fetcher_#{queue}"]
  end

  # Stores +val+ as the fetcher for +queue+ and returns +val+.
  def set_fetcher(queue, val)
    fetchers[:"fetcher_#{queue}"] = val
  end

  # Looks up the processor pool stored for +queue+ (nil when none was stored).
  def processor_pool(queue)
    processor_pools[:"processor_pool_#{queue}"]
  end

  # Stores +val+ as the processor pool for +queue+ and returns +val+.
  def set_processor_pool(queue, val)
    processor_pools[:"processor_pool_#{queue}"] = val
  end

  # Returns a newly built hash of default settings on every call, so callers
  # may mutate the result without affecting later calls.
  def default_options
    {
      auto_visibility_timeout: false,
      deadline_extension:      false,
      concurrency:             1,
      auto_delete:             false,
      shutdown_timeout:        5
    }
  end

  # Registers +worker+ under +queue+ in the registry and returns +worker+.
  def register_worker(queue, worker)
    @worker_class_registry[queue] = worker
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
toiler-0.7.1 lib/toiler.rb
toiler-0.7.0 lib/toiler.rb