ext/zeromq/src/pipe.cpp in rbczmq-1.7.1 vs ext/zeromq/src/pipe.cpp in rbczmq-1.7.2

- old
+ new

@@ -1,10 +1,7 @@ /* - Copyright (c) 2009-2011 250bpm s.r.o. - Copyright (c) 2007-2009 iMatix Corporation - Copyright (c) 2011 VMware, Inc. - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by @@ -24,36 +21,51 @@ #include <stddef.h> #include "pipe.hpp" #include "err.hpp" +#include "ypipe.hpp" +#include "ypipe_conflate.hpp" + int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2]) + int hwms_ [2], bool conflate_ [2]) { // Creates two pipe objects. These objects are connected by two ypipes, // each to pass messages in one direction. - pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t (); + typedef ypipe_t <msg_t, message_pipe_granularity> upipe_normal_t; + typedef ypipe_conflate_t <msg_t, message_pipe_granularity> upipe_conflate_t; + + pipe_t::upipe_t *upipe1; + if(conflate_ [0]) + upipe1 = new (std::nothrow) upipe_conflate_t (); + else + upipe1 = new (std::nothrow) upipe_normal_t (); alloc_assert (upipe1); - pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t (); + + pipe_t::upipe_t *upipe2; + if(conflate_ [1]) + upipe2 = new (std::nothrow) upipe_conflate_t (); + else + upipe2 = new (std::nothrow) upipe_normal_t (); alloc_assert (upipe2); pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, - hwms_ [1], hwms_ [0], delays_ [0]); + hwms_ [1], hwms_ [0], conflate_ [0]); alloc_assert (pipes_ [0]); pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, - hwms_ [0], hwms_ [1], delays_ [1]); + hwms_ [0], hwms_ [1], conflate_ [1]); alloc_assert (pipes_ [1]); pipes_ [0]->set_peer (pipes_ [1]); pipes_ [1]->set_peer (pipes_ [0]); return 0; } zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_) : + int inhwm_, int outhwm_, bool conflate_) : object_t (parent_), inpipe (inpipe_), outpipe (outpipe_), in_active (true), out_active (true), @@ -63,11 +75,12 @@ msgs_written (0), peers_msgs_read (0), peer (NULL), sink (NULL), state (active), - delay (delay_) + delay (true), + conflate (conflate_) { } zmq::pipe_t::~pipe_t () { @@ -97,12 +110,14 @@ return identity; } bool zmq::pipe_t::check_read () { - if (unlikely (!in_active || (state != active && state != pending))) + if (unlikely (!in_active)) return false; + if (unlikely (state != active && state != waiting_for_delimiter)) + return false; // Check if there's an item in the pipe. if (!inpipe->check_read ()) { in_active = false; return false; @@ -112,30 +127,32 @@ // initiate termination process. if (inpipe->probe (is_delimiter)) { msg_t msg; bool ok = inpipe->read (&msg); zmq_assert (ok); - delimit (); + process_delimiter (); return false; } return true; } bool zmq::pipe_t::read (msg_t *msg_) { - if (unlikely (!in_active || (state != active && state != pending))) + if (unlikely (!in_active)) return false; + if (unlikely (state != active && state != waiting_for_delimiter)) + return false; if (!inpipe->read (msg_)) { in_active = false; return false; } // If delimiter was read, start termination process of the pipe. if (msg_->is_delimiter ()) { - delimit (); + process_delimiter (); return false; } if (!(msg_->flags () & msg_t::more)) msgs_read++; @@ -177,31 +194,31 @@ void zmq::pipe_t::rollback () { // Remove incomplete message from the outbound pipe. msg_t msg; if (outpipe) { - while (outpipe->unwrite (&msg)) { - zmq_assert (msg.flags () & msg_t::more); - int rc = msg.close (); - errno_assert (rc == 0); - } + while (outpipe->unwrite (&msg)) { + zmq_assert (msg.flags () & msg_t::more); + int rc = msg.close (); + errno_assert (rc == 0); + } } } void zmq::pipe_t::flush () { // The peer does not exist anymore at this point. - if (state == terminating) + if (state == term_ack_sent) return; if (outpipe && !outpipe->flush ()) send_activate_read (peer); } void zmq::pipe_t::process_activate_read () { - if (!in_active && (state == active || state == pending)) { + if (!in_active && (state == active || state == waiting_for_delimiter)) { in_active = true; sink->read_activated (this); } } @@ -241,38 +258,38 @@ void zmq::pipe_t::process_pipe_term () { // This is the simple case of peer-induced termination. If there are no // more pending messages to read, or if the pipe was configured to drop - // pending messages, we can move directly to the terminating state. - // Otherwise we'll hang up in pending state till all the pending messages - // are sent. + // pending messages, we can move directly to the term_ack_sent state. + // Otherwise we'll hang up in waiting_for_delimiter state till all + // pending messages are read. if (state == active) { if (!delay) { - state = terminating; + state = term_ack_sent; outpipe = NULL; send_pipe_term_ack (peer); } else - state = pending; + state = waiting_for_delimiter; return; } // Delimiter happened to arrive before the term command. Now we have the - // term command as well, so we can move straight to terminating state. - if (state == delimited) { - state = terminating; + // term command as well, so we can move straight to term_ack_sent state. + if (state == delimiter_received) { + state = term_ack_sent; outpipe = NULL; send_pipe_term_ack (peer); return; } // This is the case where both ends of the pipe are closed in parallel. // We simply reply to the request by ack and continue waiting for our // own ack. - if (state == terminated) { - state = double_terminated; + if (state == term_req_sent1) { + state = term_req_sent2; outpipe = NULL; send_pipe_term_ack (peer); return; } @@ -282,78 +299,92 @@ void zmq::pipe_t::process_pipe_term_ack () { // Notify the user that all the references to the pipe should be dropped. zmq_assert (sink); - sink->terminated (this); + sink->pipe_terminated (this); - // In terminating and double_terminated states there's nothing to do. - // Simply deallocate the pipe. In terminated state we have to ack the - // peer before deallocating this side of the pipe. All the other states - // are invalid. - if (state == terminated) { + // In term_ack_sent and term_req_sent2 states there's nothing to do. + // Simply deallocate the pipe. In term_req_sent1 state we have to ack + // the peer before deallocating this side of the pipe. + // All the other states are invalid. + if (state == term_req_sent1) { outpipe = NULL; send_pipe_term_ack (peer); } else - zmq_assert (state == terminating || state == double_terminated); + zmq_assert (state == term_ack_sent || state == term_req_sent2); // We'll deallocate the inbound pipe, the peer will deallocate the outbound // pipe (which is an inbound pipe from its point of view). // First, delete all the unread messages in the pipe. We have to do it by // hand because msg_t doesn't have automatic destructor. Then deallocate // the ypipe itself. - msg_t msg; - while (inpipe->read (&msg)) { - int rc = msg.close (); - errno_assert (rc == 0); + + if (!conflate) { + msg_t msg; + while (inpipe->read (&msg)) { + int rc = msg.close (); + errno_assert (rc == 0); + } } + delete inpipe; // Deallocate the pipe object delete this; } +void zmq::pipe_t::set_nodelay () +{ + this->delay = false; +} + void zmq::pipe_t::terminate (bool delay_) { // Overload the value specified at pipe creation. delay = delay_; // If terminate was already called, we can ignore the duplicit invocation. - if (state == terminated || state == double_terminated) + if (state == term_req_sent1 || state == term_req_sent2) return; // If the pipe is in the final phase of async termination, it's going to // closed anyway. No need to do anything special here. - else if (state == terminating) + else + if (state == term_ack_sent) return; // The simple sync termination case. Ask the peer to terminate and wait // for the ack. - else if (state == active) { + else + if (state == active) { send_pipe_term (peer); - state = terminated; + state = term_req_sent1; } // There are still pending messages available, but the user calls // 'terminate'. We can act as if all the pending messages were read. - else if (state == pending && !delay) { + else + if (state == waiting_for_delimiter && !delay) { outpipe = NULL; send_pipe_term_ack (peer); - state = terminating; + state = term_ack_sent; } // If there are pending messages still availabe, do nothing. - else if (state == pending) { + else + if (state == waiting_for_delimiter) { } // We've already got delimiter, but not term command yet. We can ignore // the delimiter and ack synchronously terminate as if we were in // active state. - else if (state == delimited) { + else + if (state == delimiter_received) { send_pipe_term (peer); - state = terminated; + state = term_req_sent1; } // There are no other states. else zmq_assert (false); @@ -361,19 +392,19 @@ // Stop outbound flow of messages. out_active = false; if (outpipe) { - // Drop any unfinished outbound messages. - rollback (); + // Drop any unfinished outbound messages. + rollback (); - // Write the delimiter into the pipe. Note that watermarks are not - // checked; thus the delimiter can be written even when the pipe is full. - msg_t msg; - msg.init_delimiter (); - outpipe->write (msg, false); - flush (); + // Write the delimiter into the pipe. Note that watermarks are not + // checked; thus the delimiter can be written even when the pipe is full. + msg_t msg; + msg.init_delimiter (); + outpipe->write (msg, false); + flush (); } } bool zmq::pipe_t::is_delimiter (msg_t &msg_) { @@ -406,26 +437,22 @@ hwm_ - max_wm_delta : (hwm_ + 1) / 2; return result; } -void zmq::pipe_t::delimit () +void zmq::pipe_t::process_delimiter () { - if (state == active) { - state = delimited; - return; - } + zmq_assert (state == active + || state == waiting_for_delimiter); - if (state == pending) { + if (state == active) + state = delimiter_received; + else { outpipe = NULL; send_pipe_term_ack (peer); - state = terminating; - return; + state = term_ack_sent; } - - // Delimiter in any other state is invalid. - zmq_assert (false); } void zmq::pipe_t::hiccup () { // If termination is already under way do nothing. @@ -435,13 +462,24 @@ // We'll drop the pointer to the inpipe. From now on, the peer is // responsible for deallocating it. inpipe = NULL; // Create new inpipe. - inpipe = new (std::nothrow) pipe_t::upipe_t (); + if (conflate) + inpipe = new (std::nothrow) + ypipe_t <msg_t, message_pipe_granularity> (); + else + inpipe = new (std::nothrow) + ypipe_conflate_t <msg_t, message_pipe_granularity> (); + alloc_assert (inpipe); in_active = true; // Notify the peer about the hiccup. send_hiccup (peer, (void*) inpipe); } +void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_) +{ + lwm = compute_lwm (inhwm_); + hwm = outhwm_; +}