# -*- coding: utf-8 -*- $LOAD_PATH.unshift File.expand_path('../../../../../trema/ruby', __FILE__) require 'fileutils' require 'isono' require 'ipaddress' require 'trema' module Dcmgr module NodeModules class ServiceOpenFlow < Isono::NodeModules::Base include Dcmgr::Logger include Dcmgr::Helpers::NicHelper attr_reader :networks config_section do desc "configuration file for ofc..." end initialize_hook do @networks = {} # Trema hack... $verbose = true Dcmgr.run_initializers('sequel') EH = CustomEventHandler.new EH.set_as_handler Trema::Util::cleanup_current_session rule = { :port_status => "OpenFlowController", :packet_in => "OpenFlowController", :state_notify => "OpenFlowController", :vendor => "OpenFlowController" } unix_socket = "#{node.manifest.config.ovs_run_dir}/#{node.manifest.config.bridge_novlan}.controller" FileUtils.remove_file(unix_socket, true) @switch_manager = Trema::SwitchManager.new( rule, nil, unix_socket ) myinstance.worker_thread.pass { @switch_manager.run! myinstance.openflow_controller.init_trema myinstance.openflow_controller.run_immediate! } event = Isono::NodeModules::EventChannel.new(node) event.subscribe('hva/instance_started', '#') do |args| myinstance.worker_thread.pass { logger.info("refresh on instance_started: #{args.inspect}") inst_id = args[0] myinstance.add_openflow_by_instance_id(inst_id) } end event.subscribe('hva/instance_terminated', '#') do |args| myinstance.worker_thread.pass { logger.info("refresh on instance_terminated: #{args.inspect}") inst_id = args[0] myinstance.delete_openflow_by_instance_id(inst_id) } end event.subscribe('hva/openflow_updated', '#') do |args| myinstance.worker_thread.pass { logger.info("refresh on openflow_updated: #{args.inspect}") openflow_group_id = args[0] myinstance.refresh_openflow_by_joined_openflow_group_id(openflow_group_id) } end end terminate_hook do myinstance.worker_thread.pass { myinstance.openflow_controller.stop_immediate! myinstance.openflow_controller.shutdown! } Trema::Util::cleanup_current_session end def add_openflow_by_instance_id(inst_id) port = get_port_from_instance_id inst_id add_instance_now port[1] if not port.nil? end def delete_openflow_by_instance_id(inst_id) port = get_port_from_instance_id inst_id delete_instance_now port[1] if not port.nil? end def refresh_openflow_by_joined_openflow_group_id(openflow_group_id) raise "UnknownOpenflowGroupID" if openflow_group_id.nil? logger.info "Refresh Openflow..." end # # # def add_eth port worker_thread.pass { add_eth_now port } end def add_eth_now port logger.info "adding eth #{port.port_info.name}." return if port.object_id != openflow_controller.ports[port.port_info.number].object_id return if not port.is_active logger.info "port: #{port.port_info.number}" logger.info "mac: #{port.port_info.hw_addr.to_s}" logger.info "config: #{port.port_info.config}" logger.info "state: #{port.port_info.state}" logger.info "curr: #{port.port_info.curr}" logger.info "advertised: #{port.port_info.advertised}" logger.info "supported: #{port.port_info.supported}" logger.info "peer: #{port.port_info.peer}" port_number = port.port_info.number networks_map = get_physical_networks raise "A single (and only a single) physical network must be registered. (With ipv4_gw set)" unless networks_map.one? network = get_network_from_map networks_map[0] port.lock.synchronize { return if not port.is_active network.add_port port_number, true openflow_controller.install_eth port openflow_controller.update_network network port.network = network openflow_controller.ovs_ofctl.add_flows_from_list port.queued_flows port.queued_flows.clear } end def add_instance port worker_thread.pass { add_instance_now port } end def add_instance_now port logger.info "adding instance #{port.port_info.name}." return if port.object_id != openflow_controller.ports[port.port_info.number].object_id return if port.has_instance or not port.is_active inst_map = rpc.request('hva-collector', 'get_instance_of_nic', port.port_info.name) raise ArgumentError, "Unknown Nic: #{port.port_info.name}" if inst_map.nil? vif_map = inst_map[:vif].detect { |vif| vif[:vif_id] == port.port_info.name } raise ArgumentError, "Unknown Nic: #{port.port_info.name}" if vif_map.nil? ip_map = vif_map[:ipv4] raise ArgumentError, "Unknown Nic: #{port.port_info.name}" if ip_map.nil? # logger.info "inst_map: #{inst_map.inspect}" logger.debug "vif_map: #{vif_map.inspect}" ip_lease = ip_map[:address] mac_lease = clean_mac(vif_map[:mac_addr]) port.ip = ip_lease port.mac = mac_lease port.has_instance = true logger.info "port: #{port.port_info.number}" logger.info "mac: #{port.port_info.hw_addr.to_s} <=> #{mac_lease}" logger.info "ip: #{ip_lease}" logger.info "config: #{port.port_info.config}" logger.info "state: #{port.port_info.state}" logger.info "curr: #{port.port_info.curr}" logger.info "advertised: #{port.port_info.advertised}" logger.info "supported: #{port.port_info.supported}" logger.info "peer: #{port.port_info.peer}" port_number = port.port_info.number network = get_network_from_map ip_map[:network] if not network.virtual openflow_controller.install_route port, mac_lease, ip_lease openflow_controller.install_arp_antispoof port, mac_lease, ip_lease openflow_controller.install_static_d_transport 17, port, mac_lease, ip_lease, ip_map[:network][:dns_server], 53 if not ip_map[:network][:dns_server].nil? if not ip_map[:network][:dns_server].nil? openflow_controller.install_static_d_transport 17, port, mac_lease, ip_lease, ip_map[:network][:dhcp_server], 67 openflow_controller.install_static_d_transport 17, port, mac_lease, ip_lease, ip_map[:network][:dhcp_server], 68 else openflow_controller.install_static_d_transport 17, port, mac_lease, ip_lease, "0.0.0.0/0", 67 openflow_controller.install_static_d_transport 17, port, mac_lease, ip_lease, "0.0.0.0/0", 68 end add_security_group port, inst_map[:uuid], vif_map # Testing guest -> * openflow_controller.install_local_icmp port, mac_lease, ip_lease openflow_controller.install_local_transport 6, port, mac_lease, ip_lease openflow_controller.install_local_transport 17, port, mac_lease, ip_lease else openflow_controller.install_virtual_route network, port, mac_lease, ip_lease end port.lock.synchronize { return if not port.is_active network.add_port port_number, true openflow_controller.update_network network port.network = network openflow_controller.ovs_ofctl.add_flows_from_list port.queued_flows port.queued_flows.clear } end # Always call in the worker thread. def delete_instance port worker_thread.pass { delete_instance_now port } end def delete_instance_now port logger.info "deleting instance #{port.port_info.name}." return if not port.has_instance ip_lease = port.ip mac_lease = port.mac port.ip = nil port.mac = nil port.has_instance = false logger.info "port: #{port.port_info.number}" logger.info "mac: #{port.port_info.hw_addr.to_s} <=> #{mac_lease}" logger.info "ip: #{ip_lease}" logger.info "config: #{port.port_info.config}" logger.info "state: #{port.port_info.state}" logger.info "curr: #{port.port_info.curr}" logger.info "advertised: #{port.port_info.advertised}" logger.info "supported: #{port.port_info.supported}" logger.info "peer: #{port.port_info.peer}" networks.each { |network| network.remove_port port.port_info.number } end def add_tunnel port worker_thread.pass { add_tunnel_now port } end def add_tunnel_now port logger.info "Got tunnel port: name:#{port.port_info.name}." return if port.object_id != openflow_controller.ports[port.port_info.number].object_id return if not port.is_active logger.info "port: #{port.port_info.number}" logger.info "mac: #{port.port_info.hw_addr.to_s}" logger.info "config: #{port.port_info.config}" logger.info "state: #{port.port_info.state}" logger.info "curr: #{port.port_info.curr}" logger.info "advertised: #{port.port_info.advertised}" logger.info "supported: #{port.port_info.supported}" logger.info "peer: #{port.port_info.peer}" # Note that vnet_id may be different from the actual GRE # tunnel id used. vnet_id = port.port_info.name[/^gre-[a-z]*-([0-9]*)$/, 1].to_i raise "GRE tunnel interface name must match 'gre-[a-z]*-[0-9]*'." if vnet_id.nil? or vnet_id == 0 network = get_network_from_id vnet_id port.lock.synchronize { return if not port.is_active network.add_port port.port_info.number, false openflow_controller.update_network network port.network = network openflow_controller.install_gre_tunnel network.id, port openflow_controller.ovs_ofctl.add_flows_from_list port.queued_flows port.queued_flows.clear } end def delete_tunnel port worker_thread.pass { delete_tunnel_now port } end def delete_tunnel_now port return if not port.has_instance end # def rebuild_networks # worker_thread.pass { rebuild_networks_now } # end # def rebuild_networks_now # networks_map = rpc.request('hva-collector', 'get_networks') # raise "Failed to retrieve networks." if networks_map.nil? # # networks.clear # networks_map.each { |network| # } # end def get_physical_networks networks_map = rpc.request('hva-collector', 'get_networks') raise "Failed to retrieve networks." if networks_map.nil? networks_map.select { |network| not network[:ipv4_gw].nil? } end def get_network_from_id network_id return networks[network_id] if networks.has_key? network_id network_map = rpc.request('hva-collector', 'get_network', network_id) raise "Failed to retrieve network #{network_id}." if network_map.nil? logger.debug "get network from: id:#{network_id} map:#{network_map.inspect}." create_network network_map end def get_network_from_map network_map if networks.has_key? network_map[:id] networks[network_map[:id]] else create_network network_map end end def create_network network_map throw "Network map is invalid: #{network_map.inspect}." if network_map.nil? or network_map[:id] <= 0 network_id = network_map[:id] throw "Network already created" if networks.has_key? network_id and not networks[network_id].nil? if not network_map[:ipv4_gw].nil? logger.info "Creating physical network: id:#{network_id} link_interface:#{network_map[:link_interface]}." # Do more here... network = networks[network_id] = OpenFlowNetwork.new(network_id) network.add_port OpenFlowController::OFPP_LOCAL, true openflow_controller.install_physical_network network else logger.info "Creating virtual network: id:#{network_id} link_interface:#{network_map[:link_interface]}." raise "No valid IPv4 network defined." if network_map[:ipv4_network].nil? or not network_map[:ipv4_network] =~ /\.0$/ dhcp_ip = IPAddr.new(network_map[:ipv4_network]) | IPAddr.new("0.0.0.1") network = networks[network_id] = OpenFlowNetwork.new(network_id) network.dhcp_hw = openflow_controller.local_hw network.dhcp_ip = dhcp_ip network.ipv4_network = IPAddr.new(network_map[:ipv4_network]) network.prefix = network_map[:prefix] network.virtual = true openflow_controller.install_virtual_network network end network end # # Cut-n-paste from ServiceNetfilter # def add_security_group port, inst_id, vif_map ng_maps = rpc.request('hva-collector', 'get_security_groups_of_instance', inst_id) rules = ng_maps.map { |ng_map| ng_map[:rules].map { |rule| rule[:permission] } }.flatten # security group build_rule(rules).each do |rule| case rule[:ip_protocol] when 'tcp', 'udp' if rule[:ip_fport] == rule[:ip_tport] openflow_controller.install_static_transport 6, port, port.mac, port.ip, rule[:ip_fport], rule[:ip_source] elsif rule[:ip_fport] <= 1 and rule[:ip_tport] >= 65535 openflow_controller.install_static_transport 6, port, port.mac, port.ip, 0, rule[:ip_source] else logger.info "add_security_group: No support for port ranges yet: ip_source:#{rule[:ip_source]} ports:#{rule[:ip_fport]}-#{rule[:ip_tport]}" end when 'icmp' # icmp # This extension can be used if `--protocol icmp' is specified. It provides the following option: # [!] --icmp-type {type[/code]|typename} # This allows specification of the ICMP type, which can be a numeric ICMP type, type/code pair, or one of the ICMP type names shown by the command # iptables -p icmp -h openflow_controller.install_static_icmp rule[:icmp_type], rule[:icmp_code], port, port.mac, port.ip, rule[:ip_source] end end end def build_rule(rules = []) rule_maps = [] rules.each do |rule| rule = rule.strip.gsub(/[\s\t]+/, '') from_group = false ipv4s = [] # ex. # "tcp:22,22,ip4:0.0.0.0" # "udp:53,53,ip4:0.0.0.0" # "icmp:-1,-1,ip4:0.0.0.0" # 1st phase # ip_tport : tcp,udp? 1 - 16bit, icmp: -1 # id_port has been separeted in first phase. from_pair, ip_tport, source_pair = rule.split(',') next if from_pair.nil? next if ip_tport.nil? next if source_pair.nil? # 2nd phase # ip_protocol : [ tcp | udp | icmp ] # ip_fport : tcp,udp? 1 - 16bit, icmp: -1 ip_protocol, ip_fport = from_pair.split(':') # protocol : [ ip4 | ip6 | #{account_id} ] # ip_source : ip4? xxx.xxx.xxx.xxx./[0-32], ip6? (not yet supprted), #{netfilter_group_id} protocol, ip_source = source_pair.split(':') begin s = StringScanner.new(protocol) until s.eos? case when s.scan(/ip6/) # TODO#FUTURE: support IPv6 address format next when s.scan(/ip4/) # IPAddress doesn't support prefix '0'. ip_addr, prefix = ip_source.split('/', 2) if prefix.to_i == 0 ip_source = ip_addr end when s.scan(/a-\w{8}/) from_group = true inst_maps = rpc.request('hva-collector', 'get_instances_of_account_netfilter_group', protocol, ip_source) inst_maps.each { |inst_map| ipv4s << inst_map[:ips] } else raise "unexpected protocol '#{s.peep(20)}'" end end rescue Exception => e p e next end begin if from_group == false #p "from_group:(#{from_group}) ip_source -> #{ip_source}" ip = IPAddress(ip_source) ip_source = case ip.u32 when 0 "#{ip.address}/0" else "#{ip.address}/#{ip.prefix}" end else ipv4s = ipv4s.flatten.uniq end rescue Exception => e p e next end case ip_protocol when 'tcp', 'udp' ip_fport = ip_fport.to_i ip_tport = ip_tport.to_i # validate port range [ ip_fport, ip_tport ].each do |port| next unless port >= 1 && port <= 65535 end if ip_fport <= ip_tport if from_group == false rule_maps << { :ip_protocol => ip_protocol, :ip_fport => ip_fport, :ip_tport => ip_tport, :protocol => protocol, :ip_source => ip_source, } else ipv4s.each { |ip| rule_maps << { :ip_protocol => ip_protocol, :ip_fport => ip_fport, :ip_tport => ip_tport, :protocol => 'ip4', :ip_source => ip, } } end end when 'icmp' # via http://docs.amazonwebservices.com/AWSEC2/latest/CommandLineReference/ # # For the ICMP protocol, the ICMP type and code must be specified. # This must be specified in the format type:code where both are integers. # Type, code, or both can be specified as -1, which is a wildcard. icmp_type = ip_fport.to_i icmp_code = ip_tport.to_i # icmp_type case icmp_type when -1 when 0, 3, 5, 8, 11, 12, 13, 14, 15, 16, 17, 18 else next end # icmp_code case icmp_code when -1 when 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 # when icmp_type equals -1 icmp_code must equal -1. next if icmp_type == -1 else next end if from_group == false rule_maps << { :ip_protocol => ip_protocol, :icmp_type => ip_tport.to_i, # ip_tport.to_i, # -1 or 0, 3, 5, 8, 11, 12, 13, 14, 15, 16, 17, 18 :icmp_code => ip_fport.to_i, # ip_fport.to_i, # -1 or 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 :protocol => protocol, :ip_source => ip_source, } else ipv4s.each { |ip| rule_maps << { :ip_protocol => ip_protocol, :icmp_type => ip_tport.to_i, # ip_tport.to_i, # -1 or 0, 3, 5, 8, 11, 12, 13, 14, 15, 16, 17, 18 :icmp_code => ip_fport.to_i, # ip_fport.to_i, # -1 or 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15 :protocol => 'ip4', :ip_source => ip, } } end end end rule_maps end def openflow_controller @openflow_controller ||= OpenFlowController.new(self) end def rpc @rpc ||= Isono::NodeModules::RpcChannel.new(@node) end def worker_thread @worker_thread ||= Isono::ThreadPool.new(1, 'Openflow') end def get_port_from_instance_id inst_id raise ArgumentError, "Unknown Instance ID: #{inst_id}" if inst_id.nil? inst_map = rpc.request('hva-collector', 'get_instance', inst_id) raise ArgumentError, "Unknown Instance ID: #{inst_id}" if inst_map.nil? port = openflow_controller.ports.detect { |f| f[1].port_info.name == inst_map[:instance_nics].first[:uuid] } end end # # Event handler glue for Trema. # module EventFdConnection def initialize connection @connection = connection end def notify_readable @connection.notify_readable end def notify_writable @connection.notify_writable end def unbind # EM.next_tick do # # socket is detached from the eventloop, but still open # data = @io.read # end end end class CustomEventHandler < Trema::EventHandler include Dcmgr::Logger @@fd_set = [] def init_event_handler logger.debug "Called CustomEventHandler init_event_handler." end def finalize_event_handler logger.debug "Called CustomEventHandler finalize_event_handler." end def stop_event_handler logger.debug "Called CustomEventHandler stop_event_handler." EM.stop end def run_event_handler_once logger.debug "Called CustomEventHandler run_event_handler_once." end def set_fd_handler fd, connection logger.debug "Called CustomEventHandler immediate set_fd_handler, #{fd}." raise "CustomEventHandler event handler already registered." if not @@fd_set[ fd ].nil? @@fd_set[ fd ] = EM.watch fd, EventFdConnection, connection end def delete_fd_handler fd logger.debug "Called CustomEventHandler delete_fd_handler, #{fd}." connection = @@fd_set[ fd ] @@fd_set[ fd ] = nil connection.detach end def set_readable fd, state # logger.debug "Called CustomEventHandler set_readable, #{fd}." if fd < 0 logger.error "Called set_readable with negative fd: #{fd}, #{state}." return end @@fd_set[ fd ].notify_readable = state end def set_writable fd, state # logger.debug "Called CustomEventHandler set_writable, #{fd}." if fd < 0 logger.error "Called set_writable with negative fd: #{fd}, #{state}." return end @@fd_set[ fd ].notify_writable = state end def readable fd logger.debug "Called CustomEventHandler readable, #{fd}." end def writable fd logger.debug "Called CustomEventHandler writable, #{fd}." end # Timer event handlers: def init_timer logger.debug "Called CustomEventHandler init_timer." end def finalize_timer logger.debug "Called CustomEventHandler finalize_timer." end def add_timer_event_callback timer logger.debug "Called CustomEventHandler: first:#{timer.inspect} interval:#{timer.interval} expiration:#{timer.expiration}." timer.handle.cancel if not timer.handle.nil? # timer.handle = EventMachine::Timer.new(timer.interval) do timer.handle = EventMachine::Timer.new(timer.expiration) do logger.debug "Calling timer event: first:#{timer.inspect} interval:#{timer.interval} expiration:#{timer.expiration}." timer.call end end def add_periodic_event_callback interval, timer logger.debug "Called CustomEventHandler: timer:#{timer.inspect} interval:#{interval}." timer.handle.cancel if not timer.handle.nil? timer.handle = EventMachine::PeriodicTimer.new(interval) do # logger.debug "Calling periodic timer event: timer:#{timer.inspect} interval:#{interval}." timer.call end end def delete_timer_event timer logger.debug "Called CustomEventHandler: timer:#{timer.inspect}." timer.handle.cancel if not timer.handle.nil? timer.handle = nil end def execute_timer_events logger.debug "Called CustomEventHandler execute_timer_events." end end end end