require 'shellwords' module Wukong module Storm # This module defines several methods that generate command lines # that interact with Storm using the `storm` program. module StormInvocation # # == Topology Structure & Properties # # Return the name of the Storm topology from the given settings # and/or commandline args. # # @return [String] the name of the Storm topology def topology_name settings[:name] || dataflow end # Name of the Wukong dataflow to be launched. # # Obtained from either the first non-option argument passed to # `wu-storm` or the `--run` option. # # @return [String] def dataflow_name args.first || settings[:run] end # The input URI for the topology. Will determine the Trident # spout that will be used. # # @return [URI] def input_uri @input_uri ||= URI.parse(settings[:input]) end # Does this topology read from Kafka? # # @return [true, false] def kafka_input? ! blob_input? end # Does this topology read from a filesystem? # # @return [true, false] def blob_input? s3_input? || file_input? end # Does this topology read from Amazon's S3? # # @return [true, false] def s3_input? input_uri.scheme == 's3' end # Does this topology read from a local filesystem? # # @return [true, false] def file_input? input_uri.scheme == 'file' end # The input URI for the topology. Will determine the Trident # state that will be used. # # @return [URI] def output_uri @output_uri ||= URI.parse(settings[:output]) end # Does this topology write to Kafka? # # @return [true, false] def kafka_output? true # only option right now end # # == Interaction w/Storm == # # Generates a commandline that can be used to launch a new Storm # topology based on the given dataflow, input and output topics, # and settings. # # @return [String] def storm_launch_commandline [ storm_runner, "jar #{wukong_topology_submitter_jar}", fully_qualified_class_name, native_storm_options, storm_topology_options, ].flatten.compact.join("\ \t\\\n ") end # Generates a commandline that can be used to kill a running # Storm topology based on the given topology name. # # @return [String] def storm_kill_commandline "#{storm_runner} kill #{topology_name} #{storm_kill_options} > /dev/null 2>&1" end # Generates the commandline that will be used to launch wu-bolt # within each bolt of the Storm topology. # # @return [String] def wu_bolt_commandline return settings[:bolt_command] if settings[:bolt_command] [settings[:command_prefix], 'wu-bolt', dataflow_name, non_wukong_storm_params_string].compact.map(&:to_s).reject(&:empty?).join(' ') end # Return the path to the `storm` program. # # Will pay attention to `--storm_runner` and `--storm_home` # options. # # @return [String] def storm_runner explicit_runner = settings[:storm_runner] home_runner = File.join(settings[:storm_home], 'bin/storm') default_runner = 'storm' case when explicit_runner then explicit_runner when File.exist?(home_runner) then home_runner else default_runner end end # Path to the Java jar file containing the submitter class. # # @return [String] # # @see #fully_qualified_class_name def wukong_topology_submitter_jar File.expand_path("wukong-storm.jar", File.dirname(__FILE__)) end # The default Java Submitter class. # # @see #fully_qualified_class_name TOPOLOGY_SUBMITTER_CLASS = "com.infochimps.wukong.storm.TopologySubmitter" # Returns the fully qualified name of the Java submitter class. # # @see TOPOLOGY_SUBMITTER_CLASS def fully_qualified_class_name TOPOLOGY_SUBMITTER_CLASS end # Return Java `-D` options constructed from mapping the passed # in "friendly" options (`--timeout`) to native, Storm options # (`topology.message.timeout.secs`). # # @return [Array<String>] an array of each `-D` option def native_storm_options settings.params_with(:storm).map do |option, value| defn = settings.definition_of(option, :description) [defn, settings[option.to_sym]] end.map { |option, value| java_option(option, value) } end # Return Java `-D` options for Wukong-specific options. # # @return [Array<String>] def storm_topology_options (services_options + topology_options + spout_options + dataflow_options + state_options).reject do |pair| key, value = pair value.nil? || value.to_s.strip.empty? end.map { |pair| java_option(*pair) }.sort end # Return Java `-D` option key-value pairs related to services # used by the topology. # # @return [Array<Array>] an Array of key-value pairs def services_options [ ["wukong.kafka.hosts", settings[:kafka_hosts]], ["wukong.zookeeper.hosts", settings[:zookeeper_hosts]], ] end # Return Java `-D` option key-value pairs related to the overall # topology. # # @return [Array<Array>] an Array of key-value pairs def topology_options [ ["wukong.topology", topology_name], ] end # Return Java `-D` option key-value pairs related to the # topology's spout. # # @return [Array<Array>] an Array of key-value pairs def spout_options case when blob_input? blob_spout_options + (s3_input? ? s3_spout_options : file_spout_options) else kafka_spout_options end end # Return Java `-D` option key-value pairs related to the # topology's spout if it is reading from a generic filesystem. # # @return [Array<Array>] an Array of key-value pairs def blob_spout_options [ ["wukong.input.type", "blob"], ].tap do |so| so << ["wukong.input.blob.marker", settings[:offset]] if settings[:offset] so << case when settings[:from_beginning] ["wukong.input.blob.start", "EARLIEST"] when settings[:from_end] ["wukong.input.blob.start", "LATEST"] when settings[:offset] ["wukong.input.blob.start", "EXPLICIT"] else ["wukong.input.blob.start", "RESUME"] end end end # Return Java `-D` option key-value pairs related to the # topology's spout if it is reading from S3. # # @return [Array<Array>] an Array of key-value pairs def s3_spout_options [ ["wukong.input.blob.type", "s3"], ["wukong.input.blob.path", input_uri.path.gsub(%r{^/},'')], ["wukong.input.blob.s3_bucket", input_uri.host], ["wukong.input.blob.aws_key", settings[:aws_key]], ["wukong.input.blob.aws_secret", settings[:aws_secret]], ["wukong.input.blob.s3_endpoint", s3_endpoint] ] end # The AWS endpoint used to communicate with AWS for S3 access. # # Determined by the AWS region the S3 bucket was declared to be # in. # # @see http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region def s3_endpoint case settings[:aws_region] when 'us-east-1' then 's3.amazonaws.com' when 'us-west-1' then 's3-us-west-1.amazonaws.com' when 'us-west-2' then 's3-us-west-2.amazonaws.com' when /EU/, 'eu-west-1' then 's3-eu-west-1.amazonaws.com' when 'ap-southeast-1' then 's3-ap-southeast-1.amazonaws.com' when 'ap-southeast-2' then 's3-ap-southeast-2.amazonaws.com' when 'ap-northeast-1' then 's3-ap-northeast-1.amazonaws.com' when 'sa-east-1' then 's3-sa-east-1.amazonaws.com' end end # Return Java `-D` option key-value pairs related to the # topology's spout if it is reading from a local file. # # @return [Array<Array>] an Array of key-value pairs def file_spout_options [ ["wukong.input.blob.type", "file"], ["wukong.input.blob.path", input_uri.path], ] end # Return Java `-D` option key-value pairs related to the # topology's spout if it is reading from Kafka. # # @return [Array<Array>] an Array of key-value pairs def kafka_spout_options [ ["wukong.input.type", 'kafka'], ["wukong.input.kafka.topic", settings[:input]], ["wukong.input.kafka.partitions", settings[:kafka_partitions]], ["wukong.input.kafka.batch", settings[:kafka_batch]], ["wukong.input.parallelism", settings[:input_parallelism]], case when settings[:from_beginning] ["wukong.input.kafka.offset", "-2"] when settings[:from_end] ["wukong.input.kafka.offset", "-1"] when settings[:offset] ["wukong.input.kafka.offset", settings[:offset]] else # Do *not* set anything and the spout will attempt to # resume and, finding no prior offset, will start from the # end, as though we'd passed "-1" end ] end # Return Java `-D` option key-value pairs related to the Wukong # dataflow run by the topology. # # @return [Array<Array>] an Array of key-value pairs def dataflow_options [ ["wukong.directory", Dir.pwd], ["wukong.dataflow", dataflow_name], ["wukong.command", wu_bolt_commandline], ["wukong.parallelism", settings[:parallelism]], ].tap do |opts| opts << ["wukong.environment", settings[:environment]] if settings[:environment] end end # Return Java `-D` option key-value pairs related to the final # state used by the topology. # # @return [Array<Array>] an Array of key-value pairs def state_options case when kafka_output? kafka_state_options end end # Return Java `-D` option key-value pairs related to the final # state used by the topology when it is writing to Kafka. # # @return [Array<Array>] an Array of key-value pairs def kafka_state_options [ ["wukong.output.kafka.topic", settings[:output]], ] end protected # Return a String of options used when attempting to kill a # running Storm topology. # # @return [String] def storm_kill_options "-w #{settings[:wait]}" end # Format the given `option` and `value` into a Java option # (`-D`). # # @param [Object] option # @param [Object] value # @return [String] def java_option option, value return unless value return if value.to_s.strip.empty? "-D#{option}=#{Shellwords.escape(value.to_s)}" end # Parameters that should be passed onto subprocesses. # # @return [Configliere::Param] def params_to_pass settings end # Return a String stripped of any `wu-storm`-specific params but # still including any other params. # # @return [String] def non_wukong_storm_params_string params_to_pass.reject do |param, val| (params_to_pass.definition_of(param, :wukong_storm) || params_to_pass.definition_of(param, :wukong)) end.map do |param, val| "--#{param}=#{Shellwords.escape(val.to_s)}" end.join(" ") end end end end