lib/sunscraper/standalone.rb in sunscraper-1.1.0.beta3 vs lib/sunscraper/standalone.rb in sunscraper-1.2.0.beta1
- old
+ new
@@ -1,117 +1,69 @@
require 'socket'
# @private
module Sunscraper
module Standalone
- @last_query_id = 0
+ @rpc_mutex = Mutex.new
+ @rpc_socket = nil
- @rpc_mutex = Mutex.new
- @rpc_waiters = {}
- @rpc_results = {}
- @rpc_thread = nil
-
- RPC_LOAD_HTML = 1
- RPC_LOAD_URL = 2
+ RPC_LOAD_URL = 1
+ RPC_LOAD_HTML = 2
RPC_WAIT = 3
RPC_FETCH = 4
- RPC_DISCARD = 5
class << self
- attr_reader :rpc_mutex, :rpc_waiters, :rpc_results
-
def create
- @rpc_mutex.synchronize do
- @last_query_id += 1
- @last_query_id
- end
+ connect_to_worker
end
- def load_html(query_id, html)
- perform_rpc query_id,
- request: RPC_LOAD_HTML,
- data: html
- end
-
- def load_url(query_id, url)
- perform_rpc query_id,
+ def load_url(socket, url)
+ perform_rpc socket,
request: RPC_LOAD_URL,
data: url
end
- def wait(query_id, timeout)
- perform_rpc query_id,
+ def load_html(socket, html, baseUrl)
+ html, baseUrl = [html, baseUrl].map(&:to_s)
+ perform_rpc socket,
+ request: RPC_LOAD_HTML,
+ data: [html.length, html, baseUrl.length, baseUrl].pack("Na*Na*")
+ end
+
+ def wait(socket, timeout)
+ result = perform_rpc socket,
request: RPC_WAIT,
data: [timeout].pack("N"),
want_result: true
+ code, = result.unpack("N")
+
+ code == 1 # true
end
- def fetch(query_id)
- perform_rpc query_id,
+ def fetch(socket)
+ perform_rpc socket,
request: RPC_FETCH,
want_result: true
end
- def discard(query_id)
- perform_rpc query_id,
- request: RPC_DISCARD
+ def finalize(socket)
+ socket.close
end
private
- def perform_rpc(query_id, options={})
+ def perform_rpc(socket, options={})
data = options[:data] || ""
- block = options[:want_result]
+ socket.write([options[:request], data.length, data].pack("NNa*"))
- @rpc_mutex.synchronize do
- if @rpc_thread.nil?
- @rpc_thread = Standalone::Thread.new(::Thread.current)
-
- # Some fucko decided not to put any semaphores in Ruby,
- # _and_ restrict Mutexes to be unlocked only from the thread
- # which has locked them.
- #
- # Please, kill yourself if you're reading this.
- ::Thread.stop
- end
-
- @rpc_thread.perform(query_id, options[:request], data)
-
- if block
- @rpc_waiters[query_id] = Thread.current
- end
+ if options[:want_result]
+ data_length, = socket.read(4).unpack("N")
+ socket.read(data_length)
end
-
- if block
- Thread.stop
- @rpc_results[query_id]
- end
- ensure
- if block
- @rpc_waiters.delete query_id
- @rpc_results.delete query_id
- end
end
- end
- class Thread < ::Thread
- def initialize(creator)
- @creator = creator
-
- super do
- @parent = Sunscraper::Standalone
- work
- end
- end
-
- def perform(query_id, request, data)
- @socket.write([query_id, request, data.length, data].pack("NNNa*"))
- end
-
- private
-
- def work
+ def spawn_worker
if ::Sunscraper.os_x?
# Fuck you, OS X.
suffix = ".app/Contents/MacOS/sunscraper"
else
suffix = RbConfig::CONFIG["EXEEXT"]
@@ -119,50 +71,44 @@
executable = File.join(Gem.loaded_specs['sunscraper'].full_gem_path,
'ext', 'standalone', "sunscraper#{suffix}")
server_path = "/tmp/sunscraper.#{Process.pid}.sock"
- server = UNIXServer.new(server_path)
+ File.unlink server_path if File.exists? server_path
if Kernel.respond_to? :spawn
- pid = Kernel.spawn "#{executable} #{server_path}"
+ @rpc_pid = Kernel.spawn "#{executable} #{server_path}"
else
# rbx does not have Kernel.spawn (yet). Sigh...
- pid = fork { exec executable, server_path }
+ @rpc_pid = fork { exec executable, server_path }
end
- Process.detach pid
+ # Sigh again. Probably no other way.
+ loop do
+ if File.exists? server_path
+ @rpc_socket = server_path
+ break
+ elsif Process.wait(@rpc_pid, Process::WNOHANG)
+ raise RuntimeError, "Cannot start Sunscraper process"
+ end
- @socket = server.accept
+ sleep 0.1
+ end
- server.close
- FileUtils.rm server_path
+ Process.detach @rpc_pid
- # See above.
- @creator.wakeup
+ at_exit do
+ Process.kill "KILL", @rpc_pid
+ File.unlink @rpc_socket
+ end
+ end
- loop do
- header = @socket.read(4 * 3)
- query_id, request, data_length = header.unpack("NNN")
- data = @socket.read(data_length) if data_length > 0
-
- @parent.rpc_mutex.synchronize do
- if !@parent.rpc_waiters.include?(query_id)
- $stderr.puts "Sunscraper/standalone: no waiter for #{query_id}"
- else
- @parent.rpc_results[query_id] = data
- @parent.rpc_waiters[query_id].wakeup
- end
- end
+ def connect_to_worker
+ @rpc_mutex.synchronize do
+ spawn_worker if @rpc_socket.nil?
end
- rescue Exception => e
- $stderr.puts "Sunscraper error: #{e.class}: #{e.message}"
- e.backtrace.each do |line|
- $stderr.puts " #{line}"
- end
- ensure
- @socket.close
- Process.kill pid
+
+ UNIXSocket.new(@rpc_socket)
end
end
end
end
\ No newline at end of file