lib/rabbit/twitter.rb in rabbit-1.0.8 vs lib/rabbit/twitter.rb in rabbit-1.0.9

- old
+ new

@@ -1,7 +1,8 @@ -require 'shellwords' -require 'pathname' +require "shellwords" +require "pathname" +require "gio2" module Rabbit class Twitter CONSUMER_KEY = "wT9WSC0afRw94fxUw0iIKw" CONSUMER_SECRET = "mwY35vfQfmWde9lZbyNNB15QzCq3k2VwGj3X1IAkQ8" @@ -47,12 +48,10 @@ def start(*filters, &block) register_listener(&block) if block_given? setup if @oauth_parameters.nil? return if @oauth_parameters.nil? - require 'socket' - require 'openssl' begin require 'twitter/json_stream' rescue LoadError @logger.warn(_("twitter-stream gem is missing. " \ "Install it by 'gem install twitter-stream'.")) @@ -60,19 +59,17 @@ end stream_options = { :oauth => @oauth_parameters, :user_agent => "Rabitter #{Rabbit::VERSION}", - :host => "stream.twitter.com", - :port => 443, - :path => "/1/statuses/filter.json", :method => "POST", - :ssl => true, :filters => filters, } - @stream = ::Twitter::JSONStream.new(:signature, stream_options) - @connection = GLibConnection.new(@logger, @stream, stream_options) + @stream = ::Twitter::JSONStream.allocate + @stream.send(:initialize, stream_options) + @stream.send(:reset_state) + @connection = GLibConnection.new(@logger, @stream) @stream.each_item do |item| status = JSON.parse(item) @listeners.each do |listener| listener.call(status) @@ -118,98 +115,88 @@ config_file.puts(YAML.dump(oauth_parameters)) end end class GLibConnection - def initialize(logger, handler, options) + def initialize(logger, handler) @logger = logger - @options = options @handler = handler - @tcp_socket = nil - @ssl_socket = nil - @channel = nil + @options = @handler.instance_variable_get("@options") + @client = nil + @connection = nil + @socket = nil @source_ids = [] end def connect close - @tcp_socket = TCPSocket.new(@options[:host], "https") - @ssl_socket = OpenSSL::SSL::SSLSocket.new(@tcp_socket) - @ssl_socket.sync_close = true - @ssl_socket.connect - if GLib.const_defined?(:IOChannelWin32Socket) - @channel = GLib::IOChannelWin32Socket.new(@tcp_socket.fileno) - else - @channel = GLib::IOChannel.new(@tcp_socket.fileno) - end - begin - @channel.flags = GLib::IOChannel::FLAG_NONBLOCK - rescue GLib::IOChannelError - @logger.warn("[twitter][read][error] " + - "failed to set non-blocking mode: " + - "#{$!.message}(#{$!.class})") - end - reader_id = @channel.add_watch(GLib::IOChannel::IN) do |io, condition| + @client = Gio::SocketClient.new + @client.tls = @options[:ssl] + @connection = @client.connect_to_host(@options[:host], @options[:port]) + @socket = @connection.socket + @socket.blocking = false + @input = @connection.input_stream + @output = @connection.output_stream + + reader_source = @socket.create_source(:in) do |socket, condition| @logger.debug("[twitter][read][start]") - data = @ssl_socket.read(8192) + data = @input.read(8192) || "" @logger.debug("[twitter][read][done] #{data.bytesize}") if data.empty? - @source_ids.reject! {|id| id == reader_id} + @source_ids.reject! {|id| id == reader_source.id} @logger.debug("[twitter][read][eof]") false else @handler.receive_data(data) true end end - @source_ids << reader_id - error_id = @channel.add_watch(GLib::IOChannel::ERR) do |io, condition| + @source_ids << reader_source.attach + + error_source = @socket.create_source(:err) do |socket, condition| @handler.receive_error(condition) true end - @source_ids << error_id + @source_ids << error_source.attach + @handler.extend(GLibAdapter) @handler.connection = self @handler.connection_completed end def send_data(data) rest = data.bytesize - writer_id = @channel.add_watch(GLib::IOChannel::OUT) do |io, condition| + writer_source = @socket.create_source(:out) do |socket, condition| if rest.zero? @logger.debug("[twitter][flush][start]") - @ssl_socket.flush + @output.flush @logger.debug("[twitter][flush][done]") - @source_ids.reject! {|id| id == writer_id} + @source_ids.reject! {|id| id == writer_source.id} false else @logger.debug("[twitter][write][start]") - written_size = @ssl_socket.write(data) - if written_size.is_a?(Numeric) - @logger.debug("[twitter][write][done] #{written_size}") - rest -= written_size - data[0, written_size] = "" - else - # for Ruby/GLib2 < 0.90.9 - rest = 0 - data.replace("") - end + written_size = @output.write(data) + @logger.debug("[twitter][write][done] #{written_size}") + rest -= written_size + data[0, written_size] = "" true end end - @source_ids << writer_id + @source_ids << writer_source.attach end def close - return if @ssl_socket.nil? + return if @client.nil? @source_ids.reject! do |id| GLib::Source.remove(id) true end - @channel = nil - @ssl_socket.close - @tcp_socket = nil - @ssl_socket = nil + @socket.close + @socket = nil + @input = nil + @output = nil + @connection = nil + @client = nil end def reconnect(options={}) close after = options[:after] || 0