Sha256: 8849be871b4ba7ac74665b276ba7216231c51ef890dfe50046496629aa0dfed6

Contents?: true

Size: 1.08 KB

Versions: 1

Compression:

Stored size: 1.08 KB

Contents

module SimpleWorker
  class EventServer
    include Observable
    include RedisSupport

    def initialize(redis, namespace, jobid)
      @redis     = redis
      @namespace = namespace
      @jobid     = jobid
      load_lua_scripts
    end

    def pull_events
      log, processing, remaining = @redis.multi do
        @redis.evalsha @lpopall_sha, [log_key]
        @redis.evalsha @expired_tasks_sha, [active_tasks_key]
        @redis.llen tasks_key
      end

      log.map { |str| JSON.parse(str) }.each do |event|
        fire(*event)
      end

      processing[0].each do |key|
        hostname, task = parse_active_task_key(key)
        fire('on_task_expire', hostname, task)
      end

      processing[1].each do |key|
        hostname, task = parse_active_task_key(key)
        fire('on_task_active', hostname, task)
      end

      remaining + processing[0].size
    end

    private

    def parse_active_task_key(str)
      hostname, *task = str.split(':')[3..-1]
      [hostname, task.join(':')]
    end

    def fire(*args)
      changed
      notify_observers *args
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
simpleworker-0.1.1 lib/simpleworker/event_server.rb