lib/scales-worker/worker.rb in scales-worker-0.0.1.beta.2 vs lib/scales-worker/worker.rb in scales-worker-0.0.4

- old
+ new

@@ -1,13 +1,15 @@ module Scales module Worker class Worker attr_reader :app attr_reader :type + attr_reader :pool + attr_reader :status def initialize(type = Application::Rails) - @type, @app, @status = type, type.app, Status.new("localhost") + @type, @app, @status, @pool = type, type.app, Status.new("localhost"), [] at_exit{ @status.stop! } end def parse(job) Job.to_env(job) @@ -19,61 +21,78 @@ @status.took_request_from_queue!(env) begin response = @app.call(env) - response.last.close + response.last.close if response.last.respond_to?(:close) [id, Response.to_job(id, response)] - rescue - [id, [500, {}, ""]] + rescue Exception => e + [id, [500, {}, e.to_s]] end end def post_process!(job) env = parse(job) while path = Thread.current[:post_process_queue].pop request = Path.to_env(path, env) begin response = @app.call(request) - response.last.close + response.last.close if response.last.respond_to?(:close) rescue Exception => e puts e end end end # Wait for a request, process it, publish the response and exit - def process_request!(should_wait_for_request_to_finish = false) - job = Scales::Queue::Sync.pop + def process_request! + job = Thread.current[:redis_blocking].brpop(Scales::Storage::REQUEST_QUEUE, 0).last id, response = nil, nil - Thread.abort_on_exception = true - thread = Thread.new do - Thread.current[:post_process_queue] = [] - id, response = process!(job) - post_process!(job) - @status.put_response_in_queue!(response) - Scales::PubSub::Sync.publish("scales_response_#{id}", JSON.generate(response)) - end + Thread.current[:post_process_queue] = [] + id, response = process!(job) + post_process!(job) + @status.put_response_in_queue!(response) + Thread.current[:redis_nonblocking].publish(Scales::Storage::RESPONSE_CHANNEL, JSON.generate(response)) - thread.join if should_wait_for_request_to_finish - [id, response] end + def start_pool!(size = Scales.config.worker_threads) + Thread.abort_on_exception = true + size.times do + @pool << Thread.new do + Thread.current[:redis_blocking] = Scales::Storage::Sync.new_connection! + Thread.current[:redis_nonblocking] = Scales::Storage::Sync.new_connection! + loop do + begin + process_request! + rescue Exception => e + @status.logger.error(e.to_s) + raise e if Scales.env == "test" + end + end + end + end + sleep + end + # Loop the processing of requests def work! @status.start! puts "Environment: #{Scales.env}".green puts "Application: #{@type.name}".green puts "Path: #{Dir.pwd}".green + puts "Log Path: #{@status.log_path}".green + puts "Threads: #{Scales.config.worker_threads}".green puts "Redis: #{Scales.config.host}:#{Scales.config.port}/#{Scales.config.database}".green begin - loop{ process_request! } + start_pool! rescue Interrupt => e + @pool.map(&:exit) puts "Goodbye".green end end end \ No newline at end of file