Sha256: 78ff7ff287b3f1857c64b79f956fa502a060093c8a10df4786f3ef1fb5cb399a
Contents?: true
Size: 1.98 KB
Versions: 1
Compression:
Stored size: 1.98 KB
Contents
require 'metaractor' require 'excon' require 'json' require 'diplomat' require 'consul_stockpile/consul_lock' require 'consul_stockpile/logger' module ConsulStockpile class WatchEvent include Metaractor EVENT = 'kv_update'.freeze URL = 'http://127.0.0.1:8500/v1/event/list'.freeze KEY = 'event/kv_update'.freeze LOCK_KEY = 'event/kv_update/lock'.freeze required :handler def call Logger.tagged('WatchEvent') do Logger.info "Watching for event: #{EVENT}" response = Excon.get( URL, query: {name: EVENT}, expects: [200], connect_timeout: 5, read_timeout: 5, write_timeout: 5, tcp_nodelay: true ) handle_events(response.body) loop do index = response.headers['X-Consul-Index'] response = Excon.get( URL, query: {name: EVENT, index: index}, expects: [200], connect_timeout: 5, read_timeout: 86400, write_timeout: 5, tcp_nodelay: true ) handle_events(response.body) end end end private def handler context.handler end def handle_events(events) events = JSON.parse(events) return if events.empty? sift(events) do |event| handler.call(event) end end def sift(events) ConsulLock.with_lock(key: LOCK_KEY) do last_worked = Diplomat::Kv.get(KEY, {}, :return) # Work around diplomat returning "" instead of nil last_worked = nil if last_worked.empty? last_worked = last_worked.to_i unless last_worked.nil? event = events.last Logger.info "Sifting event: #{event.inspect}" ltime = event['LTime'] if last_worked.nil? || ltime > last_worked yield event Diplomat::Kv.put(KEY, ltime.to_s) else Logger.info 'Skipping duplicate event' end end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
consul_stockpile-0.1.5 | lib/consul_stockpile/watch_event.rb |