require 'wukong'

module Wukong
  # Connects Wukong to Storm.
  #
  # Registers as a Wukong plugin (via `include Plugin`) and exposes the
  # two plugin hooks defined below: `configure` and `boot`.
  module Storm
    include Plugin

    # Configure the given settings object for use with Wukong::Storm.
    #
    # Which settings get defined depends on the executing program:
    #
    # * `wu-bolt`  -- the `:run` and `:delimiter` settings.
    # * `wu-storm` -- topology, input/output, Storm, Kafka, Zookeeper and
    #   installation-path settings.
    #
    # Any other program name leaves `settings` untouched.
    #
    # @param [Configliere::Param] settings the settings to configure
    # @param [String] program the name of the currently executing program
    # @return [Object, nil] the value of the last `settings.define` call, or
    #   nil when `program` matches neither branch
    def self.configure(settings, program)
      case program
      when 'wu-bolt'
        settings.define :run,       description: 'Name of the processor or dataflow to use. Defaults to basename of the given path', flag: 'r'
        settings.define :delimiter, description: 'Emitted as a single record to mark the end of the batch ', default: 'X', flag: 't'
      when 'wu-storm'
        # Options for launching and killing the topology itself.
        settings.define :name,           wukong_storm: true, description: "Name for the launched topology"
        settings.define :command_prefix, wukong_storm: true, description: "Prefix to insert before all Wukong commands"
        settings.define :bolt_command,   wukong_storm: true, description: "Command-line to run within the spawned Storm bolt"
        settings.define :dry_run,        wukong_storm: true, description: "Echo commands that will be run, but don't run them", type: :boolean, default: false
        settings.define :wait,           wukong_storm: true, description: "How many seconds to wait when killing a topology", type: Integer, default: 300
        settings.define :rm,             wukong_storm: true, description: "Will kill any running topology of the same name before launching", type: :boolean, default: false
        settings.define :delimiter,      wukong_storm: true, description: "Batch delimiter to use with wu-bolt"
        settings.define :parallelism,    wukong_storm: true, description: "Parallelism hint for wu-bolt", default: 1

        # Options describing the topology's input and where to start reading it.
        settings.define :input,             wukong_storm: true, description: "Input URI for the topology. The scheme of the URI determines the type of spout."
        settings.define :input_parallelism, wukong_storm: true, description: "Parallelism (number of simultaneous threads) reading input. Only used by some spouts.", default: 1
        settings.define :offset,            wukong_storm: true, description: "Offset to use when starting to read from input. Interpreted in a spout-dependent way."
        settings.define :from_beginning,    wukong_storm: true, description: "Start reading from the beginning of the input.", type: :boolean, default: false
        settings.define :from_end,          wukong_storm: true, description: "Start reading from the end of the input.", type: :boolean, default: false
        settings.define :resume,            wukong_storm: true, description: "Start reading from where the topology left off. This is the default behavior.", type: :boolean, default: true
        settings.define :kafka_partitions,  wukong_storm: true, description: "Number of Kafka partitions on the input topic", default: 1
        settings.define :kafka_batch,       wukong_storm: true, description: "Batch size when reading from input topic (bytes)", default: 1_048_576
        settings.define :aws_key,           wukong_storm: true, description: "AWS access key. (Required for S3 input)"
        settings.define :aws_secret,        wukong_storm: true, description: "AWS secret key. (Required for S3 input)"
        settings.define :aws_region,        wukong_storm: true, description: "AWS region, one of: us-east-1, us-west-[1,2], eu-west-1, ap-southeast-[1,2], ap-northeast-1, sa-east-1. (Required for S3 input)", default: 'us-east-1'

        # Option describing the topology's output.
        settings.define :output, wukong_storm: true, description: "Output URI for the topology. The scheme of the URI determines the type of state used."

        # Settings flagged `storm: true`. Each description is a dotted key
        # such as 'topology.debug' rather than prose.
        # NOTE(review): presumably these are the Storm configuration keys the
        # settings are passed through as -- confirm against the runner.
        settings.define :debug,       wukong_storm: true, storm: true, description: 'topology.debug'
        settings.define :optimize,    wukong_storm: true, storm: true, description: 'topology.optimize'
        settings.define :timeout,     wukong_storm: true, storm: true, description: 'topology.message.timeout.secs'
        settings.define :workers,     wukong_storm: true, storm: true, description: 'topology.workers'
        settings.define :worker_opts, wukong_storm: true, storm: true, description: 'topology.worker.childopts'
        settings.define :ackers,      wukong_storm: true, storm: true, description: 'topology.acker.executors'
        settings.define :sample_rate, wukong_storm: true, storm: true, description: 'topology.stats.sample.rate'
        settings.define :nimbus_host, wukong_storm: true, storm: true, description: 'nimbus.host', default: 'localhost'
        settings.define :nimbus_port, wukong_storm: true, storm: true, description: 'nimbus.thrift.port', default: 6627

        # Locations of the Kafka and Zookeeper hosts.
        settings.define :kafka_hosts,     wukong_storm: true, description: "Comma-separated list of Kafka hosts", default: 'localhost'
        settings.define :zookeeper_hosts, wukong_storm: true, description: "Comma-separated list of Zookeeper hosts", default: 'localhost'

        # Location of the local Storm installation; :storm_home may also be
        # supplied through the STORM_HOME environment variable.
        settings.define :storm_home,   wukong_storm: true, description: "Path to Storm installation", env_var: "STORM_HOME", default: "/usr/lib/storm"
        settings.define :storm_runner, wukong_storm: true, description: "Path to Storm executable. Use this for non-standard Storm installations"
      end
    end

    # Boots the Wukong::Storm plugin.
    #
    # Currently a no-op: the body is empty, so neither argument is used.
    #
    # @param [Configliere::Param] settings the settings to boot from
    # @param [String] root the root directory to boot in
    # @return [nil]
    def self.boot(settings, root)
    end
  end
end

require 'wukong-storm/storm_runner'
require 'wukong-storm/bolt_runner'