require 'eventmachine' require 'rflow' class RFlow module Components module IRC # Assumes a single connection, meaning that it can choose to be # 'promiscuous' and not limit its sending to only those messages # derived from one of its own class Client < RFlow::Component input_port :to_server output_port :from_server attr_accessor :config attr_accessor :client_connection attr_accessor :server, :port DEFAULT_CONFIG = { 'server' => '127.0.0.1', 'port' => 6667, 'server_password' => nil, 'nickname' => 'rflow', 'username' => 'rflow', 'oper_user' => nil, 'oper_password' => nil, 'nickserv_password' => nil, 'reconnect_interval' => 2, 'promiscuous' => true, } def configure!(config) @config = DEFAULT_CONFIG.merge config @config['port'] = @config['port'].to_i @config['reconnect_interval'] = @config['reconnect_interval'].to_i # TODO: More config sanitization @server = @config['server'] @port = @config['port'].to_i end def run! @client_connection = EM.connect(@server, @port, Connection) do |conn| conn.client = self end end def process_message(input_port, input_port_key, connection, message) RFlow.logger.debug "Received a message" return unless message.data_type_name == 'RFlow::Message::Data::IRC::Message' if config['promiscuous'] RFlow.logger.debug "Received an IRC::Message message, sending to client connection" client_connection.send_irc_message message else RFlow.logger.debug "Received an IRC::Message message, determining if its mine" my_events = message.provenance.find_all {|processing_event| processing_event.component_instance_uuid == instance_uuid} RFlow.logger.debug "Found #{my_events.size} processing events from me" # Attempt to send the data to each context match. TODO: # check for correctness/ my_events.each do |processing_event| RFlow.logger.debug "Inspecting context #{client_connection.signature.to_s} == #{processing_event.context.to_s}" if client_connection.signature.to_s == processing_event.context.to_s RFlow.logger.debug "Found connection for #{processing_event.context}, sending message to associated client connection" client_connection.send_irc_message message end end end end def shutdown! end def cleanup! end class Connection < EventMachine::Connection include EventMachine::Protocols::LineText2 attr_accessor :client attr_reader :client_ip, :client_port, :server_ip, :server_port def post_init @client_port, @client_ip = Socket.unpack_sockaddr_in(get_peername) rescue ["?", "?.?.?.?"] @server_port, @server_ip = Socket.unpack_sockaddr_in(get_sockname) rescue ["?", "?.?.?.?"] RFlow.logger.debug "Connection from #{@client_ip}:#{@client_port} to #{@server_ip}:#{@server_port}" super end def connection_completed @reconnecting = false @connected = true RFlow.logger.info("Connected to IRC server #{client.config['server']}:#{client.config['port']}") command "PASS", [client.config['server_password']] unless client.config['server_password'].nil? command "NICK", [client.config['nickname']] command "USER", [client.config['username'], client.config['username'], client.config['username'], client.config['username']] command "NickServ IDENTIFY", [client.config['nickname'], client.config['nickserv_password']] unless client.config['nickserv_password'].nil? command "OPER", [client.config['oper_user'] || client.config['nickname'], client.config['oper_password']] unless client.config['oper_password'].nil? end def receive_line(line) RFlow.logger.debug("IRCClient#receive_line: #{line}") processing_event = RFlow::Message::ProcessingEvent.new(client.instance_uuid, Time.now.utc) prefix, cmd, params = IRC.parse_irc_line(line) # Now have an optional prefix, required cmd, and optional # param array case cmd when /PING/ command('PONG', params) else # create an IRC message here and send it along RFlow.logger.debug("Sending IRC message '#{line}', signature '#{signature.class}:#{signature}', '#{signature.to_s.class}:#{signature.to_s}'") irc_message = RFlow::Message.new('RFlow::Message::Data::IRC::Message') irc_message.data.prefix = prefix irc_message.data.command = cmd irc_message.data.parameters = params processing_event.context = signature.to_s processing_event.completed_at = Time.now.utc irc_message.provenance << processing_event client.from_server.send_message irc_message end end def unbind(reason=nil) if @connected or @reconnecting RFlow.logger.error("Disconnected from IRC server #{client.config['server']}:#{client.config['port']} due to '#{reason}', reconnecting ...") EM.add_timer(client.config['reconnect_interval']) do RFlow.logger.error "Attempting reconnect to IRC server #{client.config['server']}:#{client.config['port']}" reconnect(client.config['server'], client.config['port']) end @connected = false @reconnecting = true else error_message = "Unable to connect to IRC server #{client.config['server']}:#{client.config['port']} due to '#{reason}'" RFlow.logger.error error_message raise RuntimeError, error_message end end def send_irc_message(irc_message) RFlow.logger.debug "Sending an IRC message to #{client_ip}:#{client_port}" command irc_message.data.command, irc_message.data.parameters, irc_message.data.prefix end def send_irc_line(line) RFlow.logger.debug "Sending line '#{line}'" send_data "#{line}\r\n" end def command(cmd, args=[], prefix=nil) RFlow.logger.debug("command: '#{cmd}' with args ['#{args.join("', '")}'] and prefix '#{prefix}'") line = '' if prefix line << ":#{prefix} " end line << cmd.upcase last_arg = args.pop unless args.empty? line << " #{args.join ' '}" end if last_arg =~ /\s/ line << ' :' << last_arg else line << ' ' << last_arg end send_irc_line line end end end end end end