Sha256: c1a3d5176ce590560cb78041b016fc4c4f1200a2fe010801dcb4115536c6c0a4
Contents?: true
Size: 1.7 KB
Versions: 3
Compression:
Stored size: 1.7 KB
Contents
# Copyright 2007, OpenRain, LLC. All rights reserved. module Journeta # An outgoing message tube. Messages may or may not arrive at the destination, but if they do they'll be in order. class PeerConnection < Journeta::Asynchronous include Logger attr_accessor :uuid attr_accessor :ip_address attr_accessor :peer_port attr_accessor :version attr_accessor :created_at attr_accessor :updated_at attr_accessor :groups def initialize(engine) super(engine) @queue = Queue.new @settings_lock = Mutex.new end # Adds the given payload to the outbound message queue and immediately returns. def send_payload(payload) raise "Don't try to send nil payloads!" if payload.nil? @queue.push payload end def update_settings(other) @settings_lock.synchronize do self.ip_address = other.ip_address self.peer_port = other.peer_port self.version = other.version self.created_at = other.created_at self.updated_at = other.updated_at end end def go begin while true # TODO Reuse TCP connections between pops! payload = @queue.pop s = nil @settings_lock.synchronize do # To prevent corruption of settings. s = TCPSocket.new(ip_address, peer_port) end data = YAML::dump(payload) # pp data s.send(data , 0) s.close end rescue putsd "Peer #{uuid} has gone away. Deregistering self." # Yeah... kindof wierd, I know. Thread.new { @engine.unregister_peer(self) } end end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
journeta-0.0.3 | lib/journeta/peer_connection.rb |
journeta-0.0.4 | lib/journeta/peer_connection.rb |
journeta-0.0.5 | lib/journeta/peer_connection.rb |