Sha256: 09da9eb1effb60986238244874d1d0288f5b30db2e0ef874be14fc3324ed2427

Contents?: true

Size: 1.78 KB

Versions: 6

Compression:

Stored size: 1.78 KB

Contents

module Dynflow
  module Listeners
    module Serialization
      module Protocol

        # Work item carried inside a request. A Job is one of two variants:
        # Event or Execution (see the `variants` call at the bottom).
        Job = Algebrick.type do
          # Carries an execution plan id, a step id and an arbitrary event
          # object. step_id is declared as Integer rather than Fixnum: Fixnum
          # was deprecated in Ruby 2.4 and later removed, while Integer accepts
          # every value Fixnum accepted and matches request_id in Message.
          Event = type do
            fields! execution_plan_id: String,
                    step_id:           Integer,
                    event:             Object
          end

          # Carries only the id of an execution plan.
          Execution = type do
            fields! execution_plan_id: String
          end

          variants Event, Execution
        end

        # Top-level message type: every Message is either a Request or a
        # Response, and each of those is itself a set of variants.
        Message = Algebrick.type do
          # Request has a single variant, Do, pairing a request id with a Job.
          Request = type do
            Do = type { fields request_id: Integer, job: Job }

            variants Do
          end

          # Response variants all carry the request id; Failed additionally
          # carries an error string.
          Response = type do
            Accepted = type { fields request_id: Integer }
            Failed   = type { fields request_id: Integer, error: String }
            Done     = type { fields request_id: Integer }

            variants Accepted, Failed, Done
          end

          variants Request, Response
        end

        # Custom (de)serialization for the `event` field, which may hold any
        # Ruby object and therefore cannot be written to JSON directly.
        module Event
          # Returns the hash built by `super` with :event replaced by the
          # Marshal dump of the event, Base64-encoded (strict, no newlines).
          # Marshal output is binary and may contain bytes that are not valid
          # UTF-8, so it is encoded to plain ASCII before it reaches JSON.
          def to_hash
            super.update event: [Marshal.dump(event)].pack('m0')
          end

          # Rebuilds the value from a hash whose 'event' entry was produced by
          # #to_hash, then delegates to `super` with the event restored.
          #
          # Raises KeyError when 'event' is missing, and ArgumentError when the
          # payload is neither valid strict Base64 nor valid Marshal data.
          #
          # NOTE(review): Marshal.load can instantiate arbitrary objects; this
          # is only safe if the peer writing these messages is trusted.
          def self.product_from_hash(hash)
            payload = hash.fetch('event')
            # Raw Marshal data starts with the major version byte, which can
            # never start a Base64 string; this keeps payloads written by the
            # previous (unencoded) format readable.
            legacy_raw = payload.getbyte(0) == Marshal::MAJOR_VERSION
            payload    = payload.unpack('m0').first unless legacy_raw
            super(hash.merge('event' => Marshal.load(payload)))
          end
        end
      end

      # Serializes +obj+ to a JSON string by way of its #to_hash
      # representation.
      def dump(obj)
        as_hash = obj.to_hash
        MultiJson.dump(as_hash)
      end

      # Parses the JSON string +str+ and builds a Protocol::Message from the
      # resulting hash.
      def load(str)
        parsed = MultiJson.load(str)
        Protocol::Message.from_hash(parsed)
      end

      # Writes +message+ (serialized with #dump) as one line to +io+.
      #
      # io      - object responding to #puts
      # message - object accepted by #dump
      # barrier - optional object responding to #lock / #unlock, held for the
      #           duration of the write
      #
      # Returns true when the line was written, false when the write failed
      # with a SystemCallError (the failure is logged as a warning). Other
      # exceptions propagate.
      def send_message(io, message, barrier = nil)
        # Acquire the barrier before entering the guarded region: if #lock
        # itself raises (e.g. ThreadError on recursive locking), the ensure
        # clause must not release a lock this call never obtained.
        barrier.lock if barrier
        begin
          io.puts dump(message)
          true
        rescue SystemCallError => error
          @logger.warn "message could not be sent #{message} because #{error}"
          false
        ensure
          barrier.unlock if barrier
        end
      end

      # Reads one line from +io+ and decodes it with #load.
      #
      # Returns the decoded message, or nil when +io+ yields no line or
      # reading raises IOError.
      def receive_message(io)
        line = io.gets
        return nil unless line

        load(line)
      rescue IOError
        nil
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
dynflow-0.6.0 lib/dynflow/listeners/serialization.rb
dynflow-0.5.1 lib/dynflow/listeners/serialization.rb
dynflow-0.5.0 lib/dynflow/listeners/serialization.rb
dynflow-0.4.1 lib/dynflow/listeners/serialization.rb
dynflow-0.4.0 lib/dynflow/listeners/serialization.rb
dynflow-0.3.0 lib/dynflow/listeners/serialization.rb