ext/zeromq/src/pipe.hpp in rbczmq-1.7.1 vs ext/zeromq/src/pipe.hpp in rbczmq-1.7.2

- old
+ new

@@ -1,10 +1,7 @@ /* - Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2009 iMatix Corporation - Copyright (c) 2011 VMware, Inc. - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by @@ -22,11 +19,11 @@ #ifndef __ZMQ_PIPE_HPP_INCLUDED__ #define __ZMQ_PIPE_HPP_INCLUDED__ #include "msg.hpp" -#include "ypipe.hpp" +#include "ypipe_base.hpp" #include "config.hpp" #include "object.hpp" #include "stdint.hpp" #include "array.hpp" #include "blob.hpp" @@ -41,21 +38,23 @@ // First HWM is for messages passed from first pipe to the second pipe. // Second HWM is for messages passed from second pipe to the first pipe. // Delay specifies how the pipe behaves when the peer terminates. If true // pipe receives all the pending messages before terminating, otherwise it // terminates straight away. + // If conflate is true, only the most recently arrived message could be + // read (older messages are discarded) int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2]); + int hwms_ [2], bool conflate_ [2]); struct i_pipe_events { virtual ~i_pipe_events () {} virtual void read_activated (zmq::pipe_t *pipe_) = 0; virtual void write_activated (zmq::pipe_t *pipe_) = 0; virtual void hiccuped (zmq::pipe_t *pipe_) = 0; - virtual void terminated (zmq::pipe_t *pipe_) = 0; + virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0; }; // Note that pipe can be stored in three different arrays. // The array of inbound pipes (1), the array of outbound pipes (2) and // the generic array of pipes to deallocate (3). @@ -65,13 +64,13 @@ public array_item_t <1>, public array_item_t <2>, public array_item_t <3> { // This allows pipepair to create pipe objects. - friend int pipepair (zmq::object_t *parents_ [2], - zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); - + friend int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], + int hwms_ [2], bool conflate_ [2]); + public: // Specifies the object to send events to. void set_event_sink (i_pipe_events *sink_); @@ -101,36 +100,42 @@ // Temporaraily disconnects the inbound message stream and drops // all the messages on the fly. Causes 'hiccuped' event to be generated // in the peer. void hiccup (); + + // Ensure the pipe wont block on receiving pipe_term. + void set_nodelay (); // Ask pipe to terminate. The termination will happen asynchronously // and user will be notified about actual deallocation by 'terminated' // event. If delay is true, the pending messages will be processed // before actual shutdown. void terminate (bool delay_); + // set the high water marks. + void set_hwms (int inhwm_, int outhwm_); + private: // Type of the underlying lock-free pipe. - typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t; + typedef ypipe_base_t <msg_t, message_pipe_granularity> upipe_t; // Command handlers. void process_activate_read (); void process_activate_write (uint64_t msgs_read_); void process_hiccup (void *pipe_); void process_pipe_term (); void process_pipe_term_ack (); // Handler for delimiter read from the pipe. - void delimit (); + void process_delimiter (); // Constructor is private. Pipe can only be created using // pipepair function. pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_); + int inhwm_, int outhwm_, bool conflate_); // Pipepair uses this function to let us know about // the peer pipe object. void set_peer (pipe_t *pipe_); @@ -163,26 +168,28 @@ pipe_t *peer; // Sink to send events to. i_pipe_events *sink; - // State of the pipe endpoint. Active is common state before any - // termination begins. Delimited means that delimiter was read from - // pipe before term command was received. Pending means that term - // command was already received from the peer but there are still - // pending messages to read. Terminating means that all pending - // messages were already read and all we are waiting for is ack from - // the peer. Terminated means that 'terminate' was explicitly called - // by the user. Double_terminated means that user called 'terminate' - // and then we've got term command from the peer as well. + // States of the pipe endpoint: + // active: common state before any termination begins, + // delimiter_received: delimiter was read from pipe before + // term command was received, + // waiting_fo_delimiter: term command was already received + // from the peer but there are still pending messages to read, + // term_ack_sent: all pending messages were already read and + // all we are waiting for is ack from the peer, + // term_req_sent1: 'terminate' was explicitly called by the user, + // term_req_sent2: user called 'terminate' and then we've got + // term command from the peer as well. enum { active, - delimited, - pending, - terminating, - terminated, - double_terminated + delimiter_received, + waiting_for_delimiter, + term_ack_sent, + term_req_sent1, + term_req_sent2 } state; // If true, we receive all the pending inbound messages before // terminating. If false, we terminate immediately when the peer // asks us to. @@ -194,9 +201,11 @@ // Returns true if the message is delimiter; false otherwise. static bool is_delimiter (msg_t &msg_); // Computes appropriate low watermark from the given high watermark. static int compute_lwm (int hwm_); + + bool conflate; // Disable copying. pipe_t (const pipe_t&); const pipe_t &operator = (const pipe_t&); };