ext/zeromq/src/pipe.hpp in rbczmq-1.7.1 vs ext/zeromq/src/pipe.hpp in rbczmq-1.7.2
- old
+ new
@@ -1,10 +1,7 @@
/*
- Copyright (c) 2009-2011 250bpm s.r.o.
- Copyright (c) 2007-2009 iMatix Corporation
- Copyright (c) 2011 VMware, Inc.
- Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+ Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
@@ -22,11 +19,11 @@
#ifndef __ZMQ_PIPE_HPP_INCLUDED__
#define __ZMQ_PIPE_HPP_INCLUDED__
#include "msg.hpp"
-#include "ypipe.hpp"
+#include "ypipe_base.hpp"
#include "config.hpp"
#include "object.hpp"
#include "stdint.hpp"
#include "array.hpp"
#include "blob.hpp"
@@ -41,21 +38,23 @@
// First HWM is for messages passed from first pipe to the second pipe.
// Second HWM is for messages passed from second pipe to the first pipe.
// Delay specifies how the pipe behaves when the peer terminates. If true
// pipe receives all the pending messages before terminating, otherwise it
// terminates straight away.
+ // If conflate is true, only the most recently arrived message could be
+ // read (older messages are discarded)
int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
- int hwms_ [2], bool delays_ [2]);
+ int hwms_ [2], bool conflate_ [2]);
struct i_pipe_events
{
virtual ~i_pipe_events () {}
virtual void read_activated (zmq::pipe_t *pipe_) = 0;
virtual void write_activated (zmq::pipe_t *pipe_) = 0;
virtual void hiccuped (zmq::pipe_t *pipe_) = 0;
- virtual void terminated (zmq::pipe_t *pipe_) = 0;
+ virtual void pipe_terminated (zmq::pipe_t *pipe_) = 0;
};
// Note that pipe can be stored in three different arrays.
// The array of inbound pipes (1), the array of outbound pipes (2) and
// the generic array of pipes to deallocate (3).
@@ -65,13 +64,13 @@
public array_item_t <1>,
public array_item_t <2>,
public array_item_t <3>
{
// This allows pipepair to create pipe objects.
- friend int pipepair (zmq::object_t *parents_ [2],
- zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]);
-
+ friend int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2],
+ int hwms_ [2], bool conflate_ [2]);
+
public:
// Specifies the object to send events to.
void set_event_sink (i_pipe_events *sink_);
@@ -101,36 +100,42 @@
// Temporaraily disconnects the inbound message stream and drops
// all the messages on the fly. Causes 'hiccuped' event to be generated
// in the peer.
void hiccup ();
+
+ // Ensure the pipe wont block on receiving pipe_term.
+ void set_nodelay ();
// Ask pipe to terminate. The termination will happen asynchronously
// and user will be notified about actual deallocation by 'terminated'
// event. If delay is true, the pending messages will be processed
// before actual shutdown.
void terminate (bool delay_);
+ // set the high water marks.
+ void set_hwms (int inhwm_, int outhwm_);
+
private:
// Type of the underlying lock-free pipe.
- typedef ypipe_t <msg_t, message_pipe_granularity> upipe_t;
+ typedef ypipe_base_t <msg_t, message_pipe_granularity> upipe_t;
// Command handlers.
void process_activate_read ();
void process_activate_write (uint64_t msgs_read_);
void process_hiccup (void *pipe_);
void process_pipe_term ();
void process_pipe_term_ack ();
// Handler for delimiter read from the pipe.
- void delimit ();
+ void process_delimiter ();
// Constructor is private. Pipe can only be created using
// pipepair function.
pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_,
- int inhwm_, int outhwm_, bool delay_);
+ int inhwm_, int outhwm_, bool conflate_);
// Pipepair uses this function to let us know about
// the peer pipe object.
void set_peer (pipe_t *pipe_);
@@ -163,26 +168,28 @@
pipe_t *peer;
// Sink to send events to.
i_pipe_events *sink;
- // State of the pipe endpoint. Active is common state before any
- // termination begins. Delimited means that delimiter was read from
- // pipe before term command was received. Pending means that term
- // command was already received from the peer but there are still
- // pending messages to read. Terminating means that all pending
- // messages were already read and all we are waiting for is ack from
- // the peer. Terminated means that 'terminate' was explicitly called
- // by the user. Double_terminated means that user called 'terminate'
- // and then we've got term command from the peer as well.
+ // States of the pipe endpoint:
+ // active: common state before any termination begins,
+ // delimiter_received: delimiter was read from pipe before
+ // term command was received,
+ // waiting_fo_delimiter: term command was already received
+ // from the peer but there are still pending messages to read,
+ // term_ack_sent: all pending messages were already read and
+ // all we are waiting for is ack from the peer,
+ // term_req_sent1: 'terminate' was explicitly called by the user,
+ // term_req_sent2: user called 'terminate' and then we've got
+ // term command from the peer as well.
enum {
active,
- delimited,
- pending,
- terminating,
- terminated,
- double_terminated
+ delimiter_received,
+ waiting_for_delimiter,
+ term_ack_sent,
+ term_req_sent1,
+ term_req_sent2
} state;
// If true, we receive all the pending inbound messages before
// terminating. If false, we terminate immediately when the peer
// asks us to.
@@ -194,9 +201,11 @@
// Returns true if the message is delimiter; false otherwise.
static bool is_delimiter (msg_t &msg_);
// Computes appropriate low watermark from the given high watermark.
static int compute_lwm (int hwm_);
+
+ bool conflate;
// Disable copying.
pipe_t (const pipe_t&);
const pipe_t &operator = (const pipe_t&);
};