lib/scales-worker/worker.rb in scales-worker-0.0.1.beta.2 vs lib/scales-worker/worker.rb in scales-worker-0.0.4
- old
+ new
@@ -1,13 +1,15 @@
module Scales
module Worker
class Worker
attr_reader :app
attr_reader :type
+ attr_reader :pool
+ attr_reader :status
def initialize(type = Application::Rails)
- @type, @app, @status = type, type.app, Status.new("localhost")
+ @type, @app, @status, @pool = type, type.app, Status.new("localhost"), []
at_exit{ @status.stop! }
end
def parse(job)
Job.to_env(job)
@@ -19,61 +21,78 @@
@status.took_request_from_queue!(env)
begin
response = @app.call(env)
- response.last.close
+ response.last.close if response.last.respond_to?(:close)
[id, Response.to_job(id, response)]
- rescue
- [id, [500, {}, ""]]
+ rescue Exception => e
+ [id, [500, {}, e.to_s]]
end
end
def post_process!(job)
env = parse(job)
while path = Thread.current[:post_process_queue].pop
request = Path.to_env(path, env)
begin
response = @app.call(request)
- response.last.close
+ response.last.close if response.last.respond_to?(:close)
rescue Exception => e
puts e
end
end
end
# Wait for a request, process it, publish the response and exit
- def process_request!(should_wait_for_request_to_finish = false)
- job = Scales::Queue::Sync.pop
+ def process_request!
+ job = Thread.current[:redis_blocking].brpop(Scales::Storage::REQUEST_QUEUE, 0).last
id, response = nil, nil
- Thread.abort_on_exception = true
- thread = Thread.new do
- Thread.current[:post_process_queue] = []
- id, response = process!(job)
- post_process!(job)
- @status.put_response_in_queue!(response)
- Scales::PubSub::Sync.publish("scales_response_#{id}", JSON.generate(response))
- end
+ Thread.current[:post_process_queue] = []
+ id, response = process!(job)
+ post_process!(job)
+ @status.put_response_in_queue!(response)
+ Thread.current[:redis_nonblocking].publish(Scales::Storage::RESPONSE_CHANNEL, JSON.generate(response))
- thread.join if should_wait_for_request_to_finish
-
[id, response]
end
+ def start_pool!(size = Scales.config.worker_threads)
+ Thread.abort_on_exception = true
+ size.times do
+ @pool << Thread.new do
+ Thread.current[:redis_blocking] = Scales::Storage::Sync.new_connection!
+ Thread.current[:redis_nonblocking] = Scales::Storage::Sync.new_connection!
+ loop do
+ begin
+ process_request!
+ rescue Exception => e
+ @status.logger.error(e.to_s)
+ raise e if Scales.env == "test"
+ end
+ end
+ end
+ end
+ sleep
+ end
+
# Loop the processing of requests
def work!
@status.start!
puts "Environment: #{Scales.env}".green
puts "Application: #{@type.name}".green
puts "Path: #{Dir.pwd}".green
+ puts "Log Path: #{@status.log_path}".green
+ puts "Threads: #{Scales.config.worker_threads}".green
puts "Redis: #{Scales.config.host}:#{Scales.config.port}/#{Scales.config.database}".green
begin
- loop{ process_request! }
+ start_pool!
rescue Interrupt => e
+ @pool.map(&:exit)
puts "Goodbye".green
end
end
end
\ No newline at end of file