lib/sunscraper/standalone.rb in sunscraper-1.1.0.beta3 vs lib/sunscraper/standalone.rb in sunscraper-1.2.0.beta1

- old
+ new

@@ -1,117 +1,69 @@ require 'socket' # @private module Sunscraper module Standalone - @last_query_id = 0 + @rpc_mutex = Mutex.new + @rpc_socket = nil - @rpc_mutex = Mutex.new - @rpc_waiters = {} - @rpc_results = {} - @rpc_thread = nil - - RPC_LOAD_HTML = 1 - RPC_LOAD_URL = 2 + RPC_LOAD_URL = 1 + RPC_LOAD_HTML = 2 RPC_WAIT = 3 RPC_FETCH = 4 - RPC_DISCARD = 5 class << self - attr_reader :rpc_mutex, :rpc_waiters, :rpc_results - def create - @rpc_mutex.synchronize do - @last_query_id += 1 - @last_query_id - end + connect_to_worker end - def load_html(query_id, html) - perform_rpc query_id, - request: RPC_LOAD_HTML, - data: html - end - - def load_url(query_id, url) - perform_rpc query_id, + def load_url(socket, url) + perform_rpc socket, request: RPC_LOAD_URL, data: url end - def wait(query_id, timeout) - perform_rpc query_id, + def load_html(socket, html, baseUrl) + html, baseUrl = [html, baseUrl].map(&:to_s) + perform_rpc socket, + request: RPC_LOAD_HTML, + data: [html.length, html, baseUrl.length, baseUrl].pack("Na*Na*") + end + + def wait(socket, timeout) + result = perform_rpc socket, request: RPC_WAIT, data: [timeout].pack("N"), want_result: true + code, = result.unpack("N") + + code == 1 # true end - def fetch(query_id) - perform_rpc query_id, + def fetch(socket) + perform_rpc socket, request: RPC_FETCH, want_result: true end - def discard(query_id) - perform_rpc query_id, - request: RPC_DISCARD + def finalize(socket) + socket.close end private - def perform_rpc(query_id, options={}) + def perform_rpc(socket, options={}) data = options[:data] || "" - block = options[:want_result] + socket.write([options[:request], data.length, data].pack("NNa*")) - @rpc_mutex.synchronize do - if @rpc_thread.nil? - @rpc_thread = Standalone::Thread.new(::Thread.current) - - # Some fucko decided not to put any semaphores in Ruby, - # _and_ restrict Mutexes to be unlocked only from the thread - # which has locked them. - # - # Please, kill yourself if you're reading this. - ::Thread.stop - end - - @rpc_thread.perform(query_id, options[:request], data) - - if block - @rpc_waiters[query_id] = Thread.current - end + if options[:want_result] + data_length, = socket.read(4).unpack("N") + socket.read(data_length) end - - if block - Thread.stop - @rpc_results[query_id] - end - ensure - if block - @rpc_waiters.delete query_id - @rpc_results.delete query_id - end end - end - class Thread < ::Thread - def initialize(creator) - @creator = creator - - super do - @parent = Sunscraper::Standalone - work - end - end - - def perform(query_id, request, data) - @socket.write([query_id, request, data.length, data].pack("NNNa*")) - end - - private - - def work + def spawn_worker if ::Sunscraper.os_x? # Fuck you, OS X. suffix = ".app/Contents/MacOS/sunscraper" else suffix = RbConfig::CONFIG["EXEEXT"] @@ -119,50 +71,44 @@ executable = File.join(Gem.loaded_specs['sunscraper'].full_gem_path, 'ext', 'standalone', "sunscraper#{suffix}") server_path = "/tmp/sunscraper.#{Process.pid}.sock" - server = UNIXServer.new(server_path) + File.unlink server_path if File.exists? server_path if Kernel.respond_to? :spawn - pid = Kernel.spawn "#{executable} #{server_path}" + @rpc_pid = Kernel.spawn "#{executable} #{server_path}" else # rbx does not have Kernel.spawn (yet). Sigh... - pid = fork { exec executable, server_path } + @rpc_pid = fork { exec executable, server_path } end - Process.detach pid + # Sigh again. Probably no other way. + loop do + if File.exists? server_path + @rpc_socket = server_path + break + elsif Process.wait(@rpc_pid, Process::WNOHANG) + raise RuntimeError, "Cannot start Sunscraper process" + end - @socket = server.accept + sleep 0.1 + end - server.close - FileUtils.rm server_path + Process.detach @rpc_pid - # See above. - @creator.wakeup + at_exit do + Process.kill "KILL", @rpc_pid + File.unlink @rpc_socket + end + end - loop do - header = @socket.read(4 * 3) - query_id, request, data_length = header.unpack("NNN") - data = @socket.read(data_length) if data_length > 0 - - @parent.rpc_mutex.synchronize do - if !@parent.rpc_waiters.include?(query_id) - $stderr.puts "Sunscraper/standalone: no waiter for #{query_id}" - else - @parent.rpc_results[query_id] = data - @parent.rpc_waiters[query_id].wakeup - end - end + def connect_to_worker + @rpc_mutex.synchronize do + spawn_worker if @rpc_socket.nil? end - rescue Exception => e - $stderr.puts "Sunscraper error: #{e.class}: #{e.message}" - e.backtrace.each do |line| - $stderr.puts " #{line}" - end - ensure - @socket.close - Process.kill pid + + UNIXSocket.new(@rpc_socket) end end end end \ No newline at end of file