Sha256: 927cafbc9eddd3e9e148720ba6c0e44da3610195180ab4a104d944bebb45af67

Contents?: true

Size: 1.44 KB

Versions: 2

Compression:

Stored size: 1.44 KB

Contents

# frozen_string_literal: true

require_relative "consumer"

module Ikibana
  # Base class for message consumers.
  #
  # Every subclass automatically gets Ikibana::Consumer mixed in and is
  # expected to override #perform. The subject a consumer pulls from is derived
  # from the subclass name (see #convert_namespace_to_path).
  class ApplicationConsumer
    # Mixes Ikibana::Consumer into each subclass as it is defined.
    #
    # @param subclass [Class] the class that is being defined
    # @return [void]
    private_class_method def self.inherited(subclass)
      super # keep any other `inherited` hooks in the ancestor chain working
      subclass.include Ikibana::Consumer
    end

    # Registers the class-level `destructor` as a finalizer for this instance.
    #
    # The finalizer is built from a class method so the proc does not capture
    # `self`. `destructor` is not defined in this file.
    # NOTE(review): presumably supplied by Ikibana::Consumer — confirm.
    def initialize
      super
      ObjectSpace.define_finalizer(self, self.class.method(:destructor).to_proc)
    end

    # Convenience entry point: instantiates the consumer and starts it.
    #
    # @return [Thread, nil] see #call
    def self.call
      new.call
    end

    # Starts consuming unless the consumer is flagged as locked in the cache.
    #
    # @return [Thread, nil] the worker thread when running asynchronously;
    #   nil when the consumer is locked or runs in sync mode
    def call
      return if locked?
      return run_in_sync if sync?

      run_async
    end

    private

    # Spawns a thread that endlessly fetches one message at a time, hands it
    # to #perform and acknowledges it afterwards.
    #
    # Only NATS::IO::Timeout is rescued; the `rescue` belongs to the `loop`
    # block, so after a timeout the loop simply starts its next iteration.
    # Any other exception propagates and terminates the thread.
    #
    # @return [Thread]
    def run_async
      Thread.new do
        loop do
          sub.fetch(1).each do |msg|
            perform(msg)
            msg.ack
          end
        rescue NATS::IO::Timeout
          puts "Awaiting messages for #{convert_namespace_to_path}..."
        end
      end
    end

    # Placeholder for synchronous processing; currently does nothing.
    #
    # @return [nil]
    def run_in_sync
      # code here
    end

    # Memoized pull subscription for this consumer's subject.
    #
    # NOTE(review): `sub("::", "_")` replaces only the first "::", so classes
    # nested more than one level deep keep "::" in the durable name. Left
    # unchanged on purpose: renaming a durable would create a new consumer.
    def sub
      @sub ||= js.pull_subscribe(convert_namespace_to_path, self.class.to_s.sub("::", "_"))
    end

    # Derives the subject from the class name: namespaces are downcased and
    # joined with dots, and the trailing "consumer" of the class name is
    # dropped, e.g. `Orders::CreatedConsumer` => "orders.created".
    #
    # Only the suffix is removed, so a namespace that itself contains
    # "consumer" (e.g. `Consumers::OrderConsumer`) stays intact.
    #
    # @return [String]
    def convert_namespace_to_path
      self.class.to_s.split("::").map(&:downcase).join(".").delete_suffix("consumer")
    end

    # @return [Object] the logger from Ikibana::Config, memoized
    def logger
      @logger ||= Ikibana::Config.instance.logger
    end

    # @return [Object] the value of Ikibana::Config.instance.js, memoized
    def js
      @js ||= Ikibana::Config.instance.js
    end

    # Truthy when the cache holds a "<ClassName>_locked" entry.
    def locked? = cache.read("#{self.class}_locked")

    # Truthy when the cache holds a "<ClassName>_sync" entry.
    def sync? = cache.read("#{self.class}_sync")

    # @return [Object] the cache store from Ikibana::Config, memoized
    def cache
      @cache ||= Ikibana::Config.instance.cache
    end

    # Handles a single fetched message. Must be overridden by subclasses.
    #
    # @param _msg [Object] the message yielded by the subscription's fetch
    # @raise [NotImplementedError] always, in this base implementation
    def perform(_msg)
      raise NotImplementedError, "Subclasses must implement a `perform` method"
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
ikibana-0.1.1 lib/ikibana/application_consumer.rb
ikibana-0.1.0 lib/ikibana/application_consumer.rb