# encoding: UTF-8

module Izanami
  # Opens a communication channel between workers to handle the output
  # of a command.
  #
  # A channel does two things: it publishes and reads the data sent by a
  # worker through a Redis pub/sub channel, and it stores that data when
  # the channel is closed. That way, a process can read the output of a
  # command while it is being executed or once it has finished.
  class Channel
    # End Of Channel: sentinel message published when an input is closed.
    EOC = '--EOC--'.freeze

    # Separator placed between payloads when the written content is joined.
    SEPARATOR = "\n".freeze

    # Handles the input written to the channel.
    class Input
      # @param [Izanami::Mapper::Command] mapper the instance which talks to redis.
      # @param [String] id the command id.
      def initialize(mapper, id)
        @mapper  = mapper
        @id      = id
        @content = []
      end

      # Adds new content to the channel: the payload is published through
      # the mapper and also kept locally so it can be stored on {#close}.
      #
      # @note the return value is the internal buffer, not the input itself,
      #   so do not chain calls (+input << a << b+): the chained payload
      #   would be buffered without being published.
      #
      # @param [String] payload the new content to add.
      #
      # @return [Array<String>] every payload written so far.
      def <<(payload)
        @mapper.publish(@id, payload)
        @content << payload
      end
      alias_method :add, :<<

      # Closes the channel.
      #
      # It stores all the content sent under the command's 'output' field
      # and then publishes an {EOC} signal.
      #
      # @return [String] all the written content.
      def close
        string = to_s

        # Store first, then signal: the output is already saved by the
        # time {EOC} is published.
        @mapper.update(@id, 'output', string)
        @mapper.publish(@id, EOC)

        string
      end

      # Content that has been written.
      #
      # @note each line of the content is separated by {SEPARATOR}.
      #
      # @return [String]
      def to_s
        @content.join(SEPARATOR)
      end
      alias_method :read, :to_s
    end

    # @param [Izanami::Mapper::Command] mapper the instance which talks to redis.
    def initialize(mapper)
      @mapper = mapper
    end

    # Opens the channel to write.
    #
    # @note if a block is given, the input is closed when the block
    #   finishes, even if it raises (the error is re-raised after closing).
    #   If no block is given, the client of the channel must close it.
    #
    # @param [String] id the command id.
    #
    # @yield [Izanami::Channel::Input] content handler.
    #
    # @return [Izanami::Channel::Input]
    def write(id)
      input = Input.new(@mapper, id)
      return input unless block_given?

      begin
        yield input
      ensure
        # Always close: otherwise {EOC} is never published and a reader
        # subscribed through {#read} is never told to unsubscribe.
        input.close
      end

      input
    end

    # Opens the channel for reading.
    #
    # If the command already has a stored 'output', its lines are read from
    # there; otherwise they are read from the Redis channel.
    #
    # @param [String] id the command id.
    #
    # @yield [String] each of the lines read from the channel.
    def read(id, &block)
      command = @mapper.find(id) || {}
      output  = command['output']

      if output
        read_string(output, &block)
      else
        read_buffer(id, &block)
      end
    end

    # Reads the content from a string.
    #
    # @param [String] output the stored command's output.
    #
    # @yield [String] each of the lines read from the stored output.
    def read_string(output, &block)
      output.split(SEPARATOR).each(&block)
    end
    protected :read_string

    # Reads the content from the Redis channel.
    #
    # @param [String] id the command id.
    #
    # @yield [String] each of the lines read from the Redis channel.
    #   It stops when an {EOC} flag is read.
    def read_buffer(id)
      @mapper.subscribe(id) do |on|
        on.message do |_channel, line|
          # Plain equality matches the sentinel exactly, without building
          # a Regexp for every incoming message.
          if line == EOC
            @mapper.unsubscribe(id)
          else
            yield line
          end
        end
      end
    end
    protected :read_buffer
  end
end