Sha256: 05c57af8370131f5f7a18b39e03d993605cde0d252d616d4abfb01d37f1f52d4
Contents?: true
Size: 1.57 KB
Versions: 2
Compression:
Stored size: 1.57 KB
Contents
module Sad class Server class << self def run(queue) ::Sad.logger.info("#{'#'*5} Sad server start. #{'#'*5}") @_shutdown = false register_signal fetch(Sad::Config.queue(queue)) end def fetch(queue) ::Sad::Procline.set("Wainting for #{queue}") request = ::Sad::Config.redis.blpop(queue, 30) request.callback{|_, data| ::Sad::Procline.set("Fetched #{queue} - #{Time.now.strftime('%Y-%m-%d %H:%M:%S')}") if data ::Sad.logger.info '-'*15 + data.inspect + '-'*15 payload = Payload.decode(data) payload_call(payload) end fetch_with_interval(queue) } request.errback{ ::Sad.logger.error "error with redis request.\n#{request.inspect}" fetch_with_interval(queue) } end def fetch_with_interval(queue) EM.add_timer(::Sad::Config.interval){ fetch(queue) unless shutdown? } end def payload_call(payload) # 如果该任务有延时执行要求, # 则在定时器执行时将其延时的key删掉, # 再重新入队 if payload.sad_args['delay'] and payload.sad_args['delay'] != '' and payload.sad_args['delay'] != 0 EM.add_timer(payload.sad_args['delay'].to_i){ payload.sad_args.delete('delay') payload.enqueue } else payload.perform end end def register_signal trap('TERM') { shutdown! } trap('INT') { shutdown! } trap('QUIT') { shutdown } end def shutdown! EM.stop exit(0) end def shutdown? @_shutdown end def shutdown @_shutdown = true EM.stop end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
sad-1.5.5 | lib/sad/server.rb |
sad-1.5.4 | lib/sad/server.rb |