Sha256: e93e5b395e14b344dcb38892fd7ef72868e118bb171c1a304369286c89373fcc

Contents?: true

Size: 1.37 KB

Versions: 3

Compression:

Stored size: 1.37 KB

Contents

module Sad
	# Queue worker loop: pops encoded payloads from a Redis list and runs them.
	#
	# Every method here is a singleton method of the class. The code relies on
	# +logger+, +Payload+ and +Sad::Config+, none of which are defined in this
	# file.
	# NOTE(review): EM.add_timer / EM.stop are used throughout, so +run+ is
	# presumably called from inside a running EventMachine reactor -- confirm.
	class Server
		class << self
			# Resets the shutdown flag, installs the signal handlers and starts
			# polling the queue that Sad::Config.queue resolves +queue+ to.
			def run(queue)
				@_shutdown = false
				register_signal
				fetch(Sad::Config.queue(queue))
			end

			# Issues one blpop on +queue+ (60 is passed as its timeout argument)
			# and wires up the deferred result:
			# * success with data  -> log, decode and dispatch the payload
			# * success with no data, or failure -> nothing to run
			# In every case the next poll is scheduled via fetch_with_interval.
			def fetch(queue)
				request = ::Sad::Config.redis.blpop(queue, 60)
				request.callback do |_, data|
					if data
						logger.info '-'*15 + data.inspect + '-'*15
						begin
							payload_call(Payload.decode(data))
						rescue StandardError => e
							# A malformed payload or a failing job must not end the
							# polling loop, so report it and carry on.
							logger.error "error processing payload.\n#{e.class}: #{e.message}"
						end
					end
					fetch_with_interval(queue)
				end
				request.errback do
					logger.error "error with redis request.\n#{request.inspect}"
					fetch_with_interval(queue)
				end
			end

			# Schedules the next fetch after Sad::Config.interval. The shutdown
			# flag is checked when the timer fires, not when it is created.
			def fetch_with_interval(queue)
				EM.add_timer(::Sad::Config.interval) do
					fetch(queue) unless shutdown?
				end
			end

			# Runs +payload+ now, or defers it when it carries a delay.
			#
			# If the job asks for delayed execution, the 'delay' key is removed
			# when the timer fires and the payload is put back on the queue.
			# The delay is normalised with to_i first, so nil, false, '', 0, '0'
			# and other non-positive values all mean "perform immediately"
			# instead of scheduling a zero-second re-enqueue.
			def payload_call(payload)
				raw_delay = payload.sad_args['delay']
				delay = raw_delay ? raw_delay.to_i : 0

				if delay > 0
					EM.add_timer(delay) do
						payload.sad_args.delete('delay')
						payload.enqueue
					end
				else
					payload.perform
				end
			end

			# TERM and INT stop the reactor and exit the process; QUIT sets the
			# shutdown flag and stops the reactor without calling exit.
			def register_signal
				trap('TERM') { shutdown! }
				trap('INT')  { shutdown! }
				trap('QUIT') { shutdown }
			end

			# Stops EventMachine and exits the process with status 0.
			def shutdown!
				EM.stop
				exit(0)
			end

			# Current value of the shutdown flag (set by +shutdown+, reset by +run+).
			def shutdown?
				@_shutdown
			end

			# Sets the shutdown flag, which keeps pending timers from starting
			# another fetch, then stops EventMachine.
			def shutdown
				@_shutdown = true
				EM.stop
			end
		end
	end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
sad-1.5.2 lib/sad/server.rb
sad-1.5.1 lib/sad/server.rb
sad-1.5.0 lib/sad/server.rb