require 'eventmachine' require 'rflow' class RFlow # @!parse # # Fake classes in this tree to document the actual message types. # class Message # # Fake classes in this tree to document the actual message types. # class Data # # IRC messages. # module IRC # # RFlow format defined for IRC messages which can be emitted by # # {RFlow::Components::IRC::Client}. Of course the real class # # is {RFlow::Message} with type +RFlow::Message::Data::IRC::Message+. # class Message # # @!attribute timestamp # # The timestamp of the message. Accepts a +Time+ or +String+. # # @return [String] in XML schema format # # # # @!attribute prefix # # The message prefix. # # @return [String] # # # # @!attribute command # # The IRC command. # # @return [String] # # # # @!attribute parameters # # IRC parameters. # # @return [String] # # # # Just here to force Yard to create documentation. # # @!visibility private # def initialize; end # end # end # end # end module Components module IRC # Component that subscribes to an IRC channel and issues +RFlow::Message+s # of type {RFlow::Message::Data::IRC::Message} when it receives messages. # # Assumes a single connection, meaning that it can choose to be # 'promiscuous' and not limit its sending to only those messages # derived from one of its own # # Accepts config parameters: # - +server+ - server address # - +port+ - server port # - +server_password+ - server password, not sent if nil # - +nickname+ - IRC nickname # - +username+ - server username # - +oper_user+ - op username, defaults to nickname if nil # - +oper_password+ - op password # - +nickserv_password+ - nickserv password, not sent if nil # - +reconnect_interval+ - reconnect interval in seconds # - +promiscuous+ - true to listen to all messages; false to listen only to those with provenance linking to us class Client < RFlow::Component # @!attribute [r] to_server # Expects +RFlow::Message+s of type {RFlow::Message::Data::IRC::Message} # to be sent along the IRC channel. # # @return [RFlow::Component::InputPort] input_port :to_server # @!attribute [r] from_server # Outputs +RFlow::Message+s of type {RFlow::Message::Data::IRC::Message} # when receiving an IRC message. # # @return [RFlow::Component::OutputPort] output_port :from_server # @!attribute [r] log_port # Outputs +RFlow::Message+s of type {RFlow::Message:Data::Log} # when there are log messages to be emitted, as with reconnection problems. # # @return [RFlow::Component::OutputPort] output_port :log_port # @!visibility private attr_accessor :config, :client_connection, :server, :port # Default configuration. DEFAULT_CONFIG = { 'server' => '127.0.0.1', 'port' => 6667, 'server_password' => nil, 'nickname' => 'rflow', 'username' => 'rflow', 'oper_user' => nil, 'oper_password' => nil, 'nickserv_password' => nil, 'reconnect_interval' => 2, 'promiscuous' => true, } # RFlow-called method at startup. # @return [void] def configure!(config) @config = DEFAULT_CONFIG.merge config @config['port'] = @config['port'].to_i @config['reconnect_interval'] = @config['reconnect_interval'].to_i # TODO: More config sanitization @server = @config['server'] @port = @config['port'].to_i end # RFlow-called method at startup. # @return [void] def run! @client_connection = EM.connect(@server, @port, Connection) do |conn| conn.client = self RFlow.logger.debug { "#{name}: Connection from #{@client_ip}:#{@client_port} to #{@server_ip}:#{@server_port}" } end end # RFlow-called method at message arrival. # @return [void] def process_message(input_port, input_port_key, connection, message) RFlow.logger.debug { "#{name}: Received a message" } return unless message.data_type_name == 'RFlow::Message::Data::IRC::Message' if config['promiscuous'] RFlow.logger.debug { "#{name}: Received an IRC::Message message, sending to client connection" } client_connection.send_irc_message message else RFlow.logger.debug { "#{name}: Received an IRC::Message message, determining if it's mine" } my_events = message.provenance.find_all {|event| event.component_instance_uuid == uuid} RFlow.logger.debug { "#{name}: Found #{my_events.size} processing events from me" } # Attempt to send the data to each context match. # TODO: check for correctness my_events.each do |event| RFlow.logger.debug { "#{name}: Inspecting context #{client_connection.signature.to_s} == #{event.context.to_s}" } if client_connection.signature.to_s == event.context.to_s RFlow.logger.debug { "#{name}: Found connection for #{event.context}, sending message to associated client connection" } client_connection.send_irc_message message end end end end # @!visibility private class Connection < EventMachine::Connection include EventMachine::Protocols::LineText2 # @!visibility private attr_accessor :client # @!visibility private attr_reader :client_ip, :client_port, :server_ip, :server_port # @!visibility private def post_init @client_port, @client_ip = Socket.unpack_sockaddr_in(get_peername) rescue ["?", "?.?.?.?"] @server_port, @server_ip = Socket.unpack_sockaddr_in(get_sockname) rescue ["?", "?.?.?.?"] super end # @!visibility private def connection_completed @reconnecting = false @connected = true RFlow.logger.info "#{client.name}: Connected to IRC server #{client.config['server']}:#{client.config['port']}" command "PASS", [client.config['server_password']] unless client.config['server_password'].nil? command "NICK", [client.config['nickname']] command "USER", [client.config['username'], client.config['username'], client.config['username'], client.config['username']] command "NickServ IDENTIFY", [client.config['nickname'], client.config['nickserv_password']] unless client.config['nickserv_password'].nil? command "OPER", [client.config['oper_user'] || client.config['nickname'], client.config['oper_password']] unless client.config['oper_password'].nil? end # @!visibility private def receive_line(line) RFlow.logger.debug { "#{client.name}: IRCClient#receive_line: #{line}" } prefix, cmd, params = IRC.parse_irc_line(line) # Now have an optional prefix, required cmd, and optional param array case cmd when /PING/ command('PONG', params) else # create an IRC message here and send it along RFlow.logger.debug { "#{client.name}: Sending IRC message '#{line}', signature '#{signature.class}:#{signature}', '#{signature.to_s.class}:#{signature.to_s}'" } client.from_server.send_message(RFlow::Message.new('RFlow::Message::Data::IRC::Message').tap do |m| m.data.prefix = prefix m.data.command = cmd m.data.parameters = params m.provenance << RFlow::Message::ProcessingEvent.new(client.uuid, Time.now.utc).tap do |e| e.context = signature.to_s e.completed_at = Time.now.utc end end) end end # @!visibility private def unbind(reason = nil) if @connected || @reconnecting RFlow.logger.error "#{client.name}: Disconnected from IRC server #{client.config['server']}:#{client.config['port']} due to '#{reason}', reconnecting ..." client.log_port.send_message(RFlow::Message.new('RFlow::Message::Data::Log').tap do |m| m.data.timestamp = Integer(Time.now.to_f * 1000) # ms since epoch m.data.level = 'INFO' m.data.text = "IRC disconnected. Reconnecting in #{client.config['reconnect_interval']} seconds." end) EM.add_timer(client.config['reconnect_interval']) do RFlow.logger.info "#{client.name}: Attempting reconnect to IRC server #{client.config['server']}:#{client.config['port']}" reconnect(client.config['server'], client.config['port']) end @connected = false @reconnecting = true else raise RuntimeError, "Unable to connect to IRC server #{client.config['server']}:#{client.config['port']} due to '#{reason}'" end end # @!visibility private def send_irc_message(irc_message) RFlow.logger.debug { "#{client.name}: Sending an IRC message to #{client_ip}:#{client_port}" } command irc_message.data.command, irc_message.data.parameters, irc_message.data.prefix end # @!visibility private def send_irc_line(line) RFlow.logger.debug { "#{client.name}: Sending line '#{line}'" } send_data "#{line}\r\n" end # @!visibility private def command(cmd, args = [], prefix = nil) RFlow.logger.debug { "#{client.name}: command: '#{cmd}' with args ['#{args.join("', '")}'] and prefix '#{prefix}'" } line = '' if prefix line << ":#{prefix} " end line << cmd.upcase last_arg = args.pop line << " #{args.join ' '}" unless args.empty? if last_arg =~ /\s/; line << ' :' << last_arg else line << ' ' << last_arg end send_irc_line line end end end end end end