Sha256: 927cafbc9eddd3e9e148720ba6c0e44da3610195180ab4a104d944bebb45af67
Contents?: true
Size: 1.44 KB
Versions: 2
Compression:
Stored size: 1.44 KB
Contents
# frozen_string_literal: true require_relative "consumer" module Ikibana class ApplicationConsumer private_class_method def self.inherited(subclass) subclass.include Ikibana::Consumer end def initialize super ObjectSpace.define_finalizer(self, self.class.method(:destructor).to_proc) end def self.call new.call end def call return if locked? return run_in_sync if sync? run_async end private def run_async Thread.new do loop do sub.fetch(1).each do |msg| perform(msg) msg.ack end rescue NATS::IO::Timeout puts "Awaiting messages for #{convert_namespace_to_path}..." end end end def run_in_sync # code here end def sub @sub ||= js.pull_subscribe(convert_namespace_to_path, self.class.to_s.sub("::", "_")) end def convert_namespace_to_path self.class.to_s.split("::").map(&:downcase).join(".").sub("consumer", "") end def logger @logger = Ikibana::Config.instance.logger end def js @js ||= Ikibana::Config.instance.js end def locked? = cache.read("#{self.class}_locked") def sync? = cache.read("#{self.class}_sync") def cache @cache = Ikibana::Config.instance.cache end def perform(msg) raise NotImplementedError, "Subclasses must implement a `perform` method" end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
ikibana-0.1.1 | lib/ikibana/application_consumer.rb |
ikibana-0.1.0 | lib/ikibana/application_consumer.rb |