ext/czmq/src/zsocket.c in rbczmq-1.7.1 vs ext/czmq/src/zsocket.c in rbczmq-1.7.2

- old
+ new

@@ -7,20 +7,20 @@ This file is part of CZMQ, the high-level C binding for 0MQ: http://czmq.zeromq.org. This is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by the - Free Software Foundation; either version 3 of the License, or (at your + the terms of the GNU Lesser General Public License as published by the + Free Software Foundation; either version 3 of the License, or (at your option) any later version. This software is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL- - ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General + ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. - You should have received a copy of the GNU Lesser General Public License + You should have received a copy of the GNU Lesser General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. ========================================================================= */ /* @@ -33,11 +33,11 @@ */ #include "../include/czmq.h" // -------------------------------------------------------------------------- -// Create a new socket within our czmq context, replaces zmq_socket. +// Create a new socket within our CZMQ context, replaces zmq_socket. // Use this to get automatic management of the socket at shutdown. // Note: SUB sockets do not automatically subscribe to everything; you // must set filters explicitly. void * @@ -63,11 +63,11 @@ // -------------------------------------------------------------------------- // Bind a socket to a formatted endpoint. If the port is specified as // '*', binds to any free port from ZSOCKET_DYNFROM to ZSOCKET_DYNTO // and returns the actual port number used. Always returns the // port number if successful. Note that if a previous process or thread -// used the same port, peers may connect to the caller thinking it was +// used the same port, peers may connect to the caller thinking it was // the previous process/thread. int zsocket_bind (void *self, const char *format, ...) { @@ -102,10 +102,31 @@ } } // -------------------------------------------------------------------------- +// Unbind a socket from a formatted endpoint. +// Returns 0 if OK, -1 if the endpoint was invalid or the function +// isn't supported. + +int +zsocket_unbind (void *self, const char *format, ...) +{ +#if (ZMQ_VERSION >= ZMQ_MAKE_VERSION(3,2,0)) + char endpoint [256]; + va_list argptr; + va_start (argptr, format); + vsnprintf (endpoint, 256, format, argptr); + va_end (argptr); + return zmq_unbind (self, endpoint); +#else + return -1; +#endif +} + + +// -------------------------------------------------------------------------- // Connect a socket to a formatted endpoint // Returns 0 if the endpoint is valid, -1 if the connect failed. int zsocket_connect (void *self, const char *format, ...) @@ -157,14 +178,18 @@ zsocket_type_str (void *self) { char *type_name [] = { "PAIR", "PUB", "SUB", "REQ", "REP", "DEALER", "ROUTER", "PULL", "PUSH", - "XPUB", "XSUB" + "XPUB", "XSUB", "STREAM" }; - int type = zsockopt_type (self); + int type = zsocket_type (self); +#if ZMQ_VERSION_MAJOR == 4 + if (type < 0 || type > ZMQ_STREAM) +#else if (type < 0 || type > ZMQ_XSUB) +#endif return "UNKNOWN"; else return type_name [type]; } @@ -175,56 +200,22 @@ int zsocket_sendmem (void *zocket, const void* data, size_t size, int flags) { assert (zocket); assert (size == 0 || data); - + int snd_flags = (flags & ZFRAME_MORE)? ZMQ_SNDMORE : 0; snd_flags |= (flags & ZFRAME_DONTWAIT)? ZMQ_DONTWAIT : 0; - + zmq_msg_t msg; zmq_msg_init_size (&msg, size); memcpy (zmq_msg_data (&msg), data, size); - - int rc = zmq_sendmsg (zocket, &msg, snd_flags); - return rc == -1? -1: 0; -} - -// -------------------------------------------------------------------------- -// Send data over a socket as a single message frame -// Accepts these flags: ZFRAME_MORE and ZFRAME_DONTWAIT. -// NOTE: this method is DEPRECATED and is slated for removal. These are the -// problems with the method: -// - premature optimization: do we really need this? It makes the API more -// complex; high-performance applications would not use this in any case, -// they would work directly with zmq_msg objects. -// - selftest method leaks memory -// (PH, 2013/05/18) - -int -zsocket_sendmem_zero_copy (void *zocket, void *data, size_t size, - zsocket_free_fn *free_fn, void *hint, int flags) -{ - assert (zocket); - assert (size == 0 || data); - - int snd_flags = (flags & ZFRAME_MORE)? ZMQ_SNDMORE : 0; - snd_flags |= (flags & ZFRAME_DONTWAIT)? ZMQ_DONTWAIT : 0; - - zmq_msg_t msg; - zmq_msg_init_data (&msg, data, size, free_fn, hint); int rc = zmq_sendmsg (zocket, &msg, snd_flags); return rc == -1? -1: 0; } -static void -s_test_free_str_cb (void *str, void *arg) -{ - assert (str); - free (str); -} // -------------------------------------------------------------------------- // Selftest int @@ -247,18 +238,34 @@ assert (reader); assert (streq (zsocket_type_str (writer), "PUSH")); assert (streq (zsocket_type_str (reader), "PULL")); int rc = zsocket_bind (writer, "tcp://%s:%d", interf, service); assert (rc == service); + +#if (ZMQ_VERSION >= ZMQ_MAKE_VERSION (3,2,0)) + // Check unbind + rc = zsocket_unbind (writer, "tcp://%s:%d", interf, service); + assert (rc == 0); + + // In some cases and especially when running under Valgrind, doing + // a bind immediately after an unbind causes an EADDRINUSE error. + // Even a short sleep allows the OS to release the port for reuse. + zclock_sleep (100); + + // Bind again + rc = zsocket_bind (writer, "tcp://%s:%d", interf, service); + assert (rc == service); +#endif + rc = zsocket_connect (reader, "tcp://%s:%d", domain, service); assert (rc == 0); zstr_send (writer, "HELLO"); char *message = zstr_recv (reader); assert (message); assert (streq (message, "HELLO")); free (message); - + // Test binding to ports int port = zsocket_bind (writer, "tcp://%s:*", interf); assert (port >= ZSOCKET_DYNFROM && port <= ZSOCKET_DYNTO); assert (zsocket_poll (writer, 100) == false); @@ -269,36 +276,16 @@ // Test sending frames to socket rc = zsocket_sendmem (writer,"ABC", 3, ZFRAME_MORE); assert (rc == 0); rc = zsocket_sendmem (writer, "DEFG", 4, 0); assert (rc == 0); - + zframe_t *frame = zframe_recv (reader); assert (frame); assert (zframe_streq (frame, "ABC")); assert (zframe_more (frame)); zframe_destroy (&frame); - - frame = zframe_recv (reader); - assert (frame); - assert (zframe_streq (frame, "DEFG")); - assert (!zframe_more (frame)); - zframe_destroy (&frame); - // Test zframe_sendmem_zero_copy - rc = zsocket_sendmem_zero_copy (writer, strdup ("ABC"), 3, - s_test_free_str_cb, NULL, ZFRAME_MORE); - assert (rc == 0); - rc = zsocket_sendmem_zero_copy (writer, strdup ("DEFG"), 4, - s_test_free_str_cb, NULL, 0); - assert (rc == 0); - - frame = zframe_recv (reader); - assert (frame); - assert (zframe_streq (frame, "ABC")); - assert (zframe_more (frame)); - zframe_destroy (&frame); - frame = zframe_recv (reader); assert (frame); assert (zframe_streq (frame, "DEFG")); assert (!zframe_more (frame)); zframe_destroy (&frame);