ext/mosquitto/client.c in mosquitto-0.2 vs ext/mosquitto/client.c in mosquitto-0.3
- old
+ new
@@ -1,16 +1,18 @@
#include "mosquitto_ext.h"
static void rb_mosquitto_run_callback(mosquitto_callback_t *callback);
+static void rb_mosquitto_free_callback(mosquitto_callback_t *callback);
+static void rb_mosquitto_client_reap_event_thread(mosquitto_client_wrapper *client);
VALUE mosquitto_tls_password;
static int rb_mosquitto_tls_password_callback(char *buf, int size, int rwflag, void *obj)
{
strncpy(buf, StringValueCStr(mosquitto_tls_password), size);
- return RSTRING_LEN(mosquitto_tls_password);
rb_gc_unregister_address(&mosquitto_tls_password);
+ return RSTRING_LEN(mosquitto_tls_password);
}
/*
* :nodoc:
* Pushes a callback onto the client's callback queue. The callback runs within the context of an event thread.
@@ -45,14 +47,14 @@
* to handle.
*
* Only applicable to clients that run with the threaded Mosquitto::Client#loop_start event loop
*
*/
-static void *mosquitto_wait_for_callbacks(void *w)
+static void *mosquitto_wait_for_callbacks(void *c)
{
- mosquitto_callback_waiting_t *waiter = (mosquitto_callback_waiting_t *)w;
- mosquitto_client_wrapper *client = waiter->client;
+ mosquitto_client_wrapper *client = (mosquitto_client_wrapper *)c;
+ mosquitto_callback_waiting_t *waiter = client->waiter;
pthread_mutex_lock(&client->callback_mutex);
while (!waiter->abort && (waiter->callback = mosquitto_callback_queue_pop(client)) == NULL)
{
pthread_cond_wait(&client->callback_cond, &client->callback_mutex);
@@ -67,17 +69,21 @@
* Unblocking function for the callback poller - invoked when the event thread should exit.
*
* Only applicable to clients that run with the threaded Mosquitto::Client#loop_start event loop
*
*/
-static void mosquitto_stop_waiting_for_callbacks(void *w)
+static void mosquitto_stop_waiting_for_callbacks(void *c)
{
- mosquitto_callback_waiting_t *waiter = (mosquitto_callback_waiting_t *)w;
- mosquitto_client_wrapper *client = waiter->client;
+ mosquitto_client_wrapper *client = (mosquitto_client_wrapper *)c;
+ mosquitto_callback_waiting_t *waiter = client->waiter;
+ mosquitto_callback_t *callback = NULL;
pthread_mutex_lock(&client->callback_mutex);
waiter->abort = 1;
+ while ((callback = mosquitto_callback_queue_pop(client)) != NULL) {
+ rb_mosquitto_free_callback(callback);
+ }
pthread_mutex_unlock(&client->callback_mutex);
pthread_cond_signal(&client->callback_cond);
}
/*
@@ -166,22 +172,14 @@
case ON_CONNECT_CALLBACK: {
on_connect_callback_args_t *cb = (on_connect_callback_args_t *)callback->data;
args[0] = client->connect_cb;
args[1] = (VALUE)1;
args[2] = INT2NUM(cb->rc);
- switch (cb->rc) {
- case 1:
- MosquittoError("connection refused (unacceptable protocol version)");
- break;
- case 2:
- MosquittoError("connection refused (identifier rejected)");
- break;
- case 3:
- MosquittoError("connection refused (broker unavailable)");
- break;
- default:
- rb_mosquitto_funcall_protected(error_tag, args);
+ if (cb->rc == 0) {
+ rb_mosquitto_funcall_protected(error_tag, args);
+ } else {
+ MosquittoError(mosquitto_connack_string(cb->rc));
}
}
break;
case ON_DISCONNECT_CALLBACK: {
@@ -267,23 +265,21 @@
*
*/
static VALUE rb_mosquitto_callback_thread(void *obj)
{
mosquitto_client_wrapper *client = (mosquitto_client_wrapper *)obj;
- mosquitto_callback_waiting_t waiter;
- waiter.callback = NULL;
- waiter.abort = 0;
- waiter.client = client;
- while (!waiter.abort)
+ mosquitto_callback_waiting_t *waiter = client->waiter;
+ waiter->callback = NULL;
+ waiter->abort = 0;
+ while (!waiter->abort)
{
- rb_thread_call_without_gvl(mosquitto_wait_for_callbacks, (void *)&waiter, mosquitto_stop_waiting_for_callbacks, (void *)&waiter);
- if (waiter.callback)
+ rb_thread_call_without_gvl(mosquitto_wait_for_callbacks, (void *)client, mosquitto_stop_waiting_for_callbacks, (void *)client);
+ if (waiter->callback)
{
- rb_mosquitto_run_callback(waiter.callback);
+ rb_mosquitto_run_callback(waiter->callback);
}
}
-
return Qnil;
}
/*
* :nodoc:
@@ -442,11 +438,18 @@
*/
static void rb_mosquitto_free_client(void *ptr)
{
mosquitto_client_wrapper *client = (mosquitto_client_wrapper *)ptr;
if (client) {
- mosquitto_destroy(client->mosq);
+ if (client->mosq != NULL) {
+ if (!NIL_P(client->callback_thread)) {
+ mosquitto_stop_waiting_for_callbacks(client);
+ mosquitto_loop_stop(client->mosq, true);
+ rb_mosquitto_client_reap_event_thread(client);
+ }
+ if (client->mosq != NULL) mosquitto_destroy(client->mosq);
+ }
xfree(client);
}
}
/*
@@ -506,10 +509,11 @@
cl->subscribe_cb = Qnil;
cl->unsubscribe_cb = Qnil;
cl->log_cb = Qnil;
cl->callback_thread = Qnil;
cl->callback_queue = NULL;
+ cl->waiter = NULL;
rb_obj_call_init(client, 0, NULL);
return client;
}
static void *rb_mosquitto_client_reinitialise_nogvl(void *ptr)
@@ -686,11 +690,11 @@
}
}
/*
* call-seq:
- * client.tls_set('/certs/all-ca.crt'), '/certs', '/certs/client.crt'), '/certs/client.key') -> Boolean
+ * client.tls_set('/certs/all-ca.crt'), '/certs', '/certs/client.crt'), '/certs/client.key', 'password') -> Boolean
*
* Configure the client for certificate based SSL/TLS support.
*
* Cannot be used in conjunction with Mosquitto::Client#tls_psk_set.
*
@@ -712,11 +716,11 @@
* @param password [String] password for encrypted keyfile
* @return [true] on success
* @raise [Mosquitto::Error] on invalid input params or when TLS is not supported
* @note This must be called before calling Mosquitto::Client#connect
* @example
- * client.tls_set('/certs/all-ca.crt'), '/certs', '/certs/client.crt'), '/certs/client.key')
+ * client.tls_set('/certs/all-ca.crt'), '/certs', '/certs/client.crt'), '/certs/client.key', 'password')
*
*/
static VALUE rb_mosquitto_client_tls_set(VALUE obj, VALUE cafile, VALUE capath, VALUE certfile, VALUE keyfile, VALUE password)
{
int ret;
@@ -1173,17 +1177,21 @@
*
*/
static VALUE rb_mosquitto_client_disconnect(VALUE obj)
{
int ret;
+ bool retried = false;
+ struct timeval time;
MosquittoGetClient(obj);
+ retry_once:
ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_disconnect_nogvl, (void *)client->mosq, RUBY_UBF_IO, 0);
switch (ret) {
case MOSQ_ERR_INVAL:
MosquittoError("invalid input params");
break;
case MOSQ_ERR_NO_CONN:
+ RetryNotConnectedOnce();
MosquittoError("client not connected to broker");
break;
default:
return Qtrue;
}
@@ -1217,10 +1225,12 @@
*/
static VALUE rb_mosquitto_client_publish(VALUE obj, VALUE mid, VALUE topic, VALUE payload, VALUE qos, VALUE retain)
{
struct nogvl_publish_args args;
int ret, msg_id;
+ struct timeval time;
+ bool retried = false;
MosquittoGetClient(obj);
Check_Type(topic, T_STRING);
MosquittoEncode(topic);
Check_Type(payload, T_STRING);
MosquittoEncode(payload);
@@ -1234,19 +1244,21 @@
args.topic = StringValueCStr(topic);
args.payloadlen = (int)RSTRING_LEN(payload);
args.payload = (const char *)(args.payloadlen == 0 ? NULL : StringValueCStr(payload));
args.qos = NUM2INT(qos);
args.retain = (retain == Qtrue) ? true : false;
+ retry_once:
ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_publish_nogvl, (void *)&args, RUBY_UBF_IO, 0);
switch (ret) {
case MOSQ_ERR_INVAL:
MosquittoError("invalid input params");
break;
case MOSQ_ERR_NOMEM:
rb_memerror();
break;
case MOSQ_ERR_NO_CONN:
+ RetryNotConnectedOnce();
MosquittoError("client not connected to broker");
break;
case MOSQ_ERR_PROTOCOL:
MosquittoError("protocol error communicating with broker");
break;
@@ -1284,10 +1296,12 @@
*/
static VALUE rb_mosquitto_client_subscribe(VALUE obj, VALUE mid, VALUE subscription, VALUE qos)
{
struct nogvl_subscribe_args args;
int ret, msg_id;
+ struct timeval time;
+ bool retried = false;
MosquittoGetClient(obj);
Check_Type(subscription, T_STRING);
MosquittoEncode(subscription);
Check_Type(qos, T_FIXNUM);
if (!NIL_P(mid)) {
@@ -1296,19 +1310,21 @@
}
args.mosq = client->mosq;
args.mid = NIL_P(mid) ? NULL : &msg_id;
args.subscription = StringValueCStr(subscription);
args.qos = NUM2INT(qos);
+ retry_once:
ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_subscribe_nogvl, (void *)&args, RUBY_UBF_IO, 0);
switch (ret) {
case MOSQ_ERR_INVAL:
MosquittoError("invalid input params");
break;
case MOSQ_ERR_NOMEM:
rb_memerror();
break;
case MOSQ_ERR_NO_CONN:
+ RetryNotConnectedOnce();
MosquittoError("client not connected to broker");
break;
default:
return Qtrue;
}
@@ -1338,29 +1354,33 @@
*/
static VALUE rb_mosquitto_client_unsubscribe(VALUE obj, VALUE mid, VALUE subscription)
{
struct nogvl_subscribe_args args;
int ret, msg_id;
+ struct timeval time;
+ bool retried = false;
MosquittoGetClient(obj);
Check_Type(subscription, T_STRING);
MosquittoEncode(subscription);
if (!NIL_P(mid)) {
Check_Type(mid, T_FIXNUM);
msg_id = NUM2INT(mid);
}
args.mosq = client->mosq;
args.mid = NIL_P(mid) ? NULL : &msg_id;
args.subscription = StringValueCStr(subscription);
+ retry_once:
ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_unsubscribe_nogvl, (void *)&args, RUBY_UBF_IO, 0);
switch (ret) {
case MOSQ_ERR_INVAL:
MosquittoError("invalid input params");
break;
case MOSQ_ERR_NOMEM:
rb_memerror();
break;
case MOSQ_ERR_NO_CONN:
+ RetryNotConnectedOnce();
MosquittoError("client not connected to broker");
break;
default:
return Qtrue;
}
@@ -1424,25 +1444,29 @@
*/
static VALUE rb_mosquitto_client_loop(VALUE obj, VALUE timeout, VALUE max_packets)
{
struct nogvl_loop_args args;
int ret;
+ struct timeval time;
+ bool retried = false;
MosquittoGetClient(obj);
Check_Type(timeout, T_FIXNUM);
Check_Type(max_packets, T_FIXNUM);
args.mosq = client->mosq;
args.timeout = NUM2INT(timeout);
args.max_packets = NUM2INT(max_packets);
+ retry_once:
ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_loop_nogvl, (void *)&args, RUBY_UBF_IO, 0);
switch (ret) {
case MOSQ_ERR_INVAL:
MosquittoError("invalid input params");
break;
case MOSQ_ERR_NOMEM:
rb_memerror();
break;
case MOSQ_ERR_NO_CONN:
+ RetryNotConnectedOnce();
MosquittoError("client not connected to broker");
break;
case MOSQ_ERR_CONN_LOST:
MosquittoError("connection to the broker was lost");
break;
@@ -1491,25 +1515,29 @@
*/
static VALUE rb_mosquitto_client_loop_forever(VALUE obj, VALUE timeout, VALUE max_packets)
{
struct nogvl_loop_args args;
int ret;
+ struct timeval time;
+ bool retried = false;
MosquittoGetClient(obj);
Check_Type(timeout, T_FIXNUM);
Check_Type(max_packets, T_FIXNUM);
args.mosq = client->mosq;
args.timeout = NUM2INT(timeout);
args.max_packets = NUM2INT(max_packets);
+ retry_once:
ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_loop_forever_nogvl, (void *)&args, rb_mosquitto_client_loop_forever_ubf, client);
switch (ret) {
case MOSQ_ERR_INVAL:
MosquittoError("invalid input params");
break;
case MOSQ_ERR_NOMEM:
rb_memerror();
break;
case MOSQ_ERR_NO_CONN:
+ RetryNotConnectedOnce();
MosquittoError("client not connected to broker");
break;
case MOSQ_ERR_CONN_LOST:
MosquittoError("connection to the broker was lost");
break;
@@ -1557,12 +1585,13 @@
break;
case MOSQ_ERR_NOT_SUPPORTED :
MosquittoError("thread support is not available");
break;
default:
- pthread_mutex_init(&client->callback_mutex, NULL);
- pthread_cond_init(&client->callback_cond, NULL);
+ client->waiter = MOSQ_ALLOC(mosquitto_callback_waiting_t);
+ if (pthread_mutex_init(&client->callback_mutex, NULL) != 0) MosquittoError("failed to create callback thread mutex");
+ if (pthread_cond_init(&client->callback_cond, NULL) != 0) MosquittoError("failed to create callback thread condition var");
client->callback_thread = rb_thread_create(rb_mosquitto_callback_thread, client);
/* Allow the callback thread some startup time */
time.tv_sec = 0;
time.tv_usec = 100 * 1000; /* 0.1 sec */
rb_thread_wait_for(time);
@@ -1574,13 +1603,27 @@
{
struct nogvl_loop_stop_args *args = ptr;
return (VALUE)mosquitto_loop_stop(args->mosq, args->force);
}
+static void rb_mosquitto_client_reap_event_thread(mosquitto_client_wrapper *client)
+{
+ struct timeval time;
+ mosquitto_stop_waiting_for_callbacks(client);
+ /* Allow the callback thread some shutdown time */
+ time.tv_sec = 0;
+ time.tv_usec = 100 * 1000; /* 0.1 sec */
+ rb_thread_wait_for(time);
+ if (pthread_mutex_destroy(&client->callback_mutex) == EINVAL) MosquittoError("could not destroy callback thread mutex");
+ if (pthread_cond_destroy(&client->callback_cond) == EINVAL) MosquittoError("could not destroy callback condition var");
+ xfree(client->waiter);
+ client->callback_thread = Qnil;
+}
+
/*
* call-seq:
- * client.loop_start -> Boolean
+ * client.loop_stop(true) -> Boolean
*
* This is part of the threaded client interface. Call this once to stop the
* network thread previously created with Mosquitto::Client#loop_start. This call
* will block until the network thread finishes. For the network thread to end,
* you must have previously called Mosquitto::Client#disconnect or have set the force
@@ -1589,11 +1632,11 @@
* @param force [Boolean] set to true to force thread cancellation. If false, Mosquitto::Client#disconnect
* must have already been called.
* @return [true] on success
* @raise [Mosquitto::Error] on invalid input params or if thread support is not available
* @example
- * client.loop_start
+ * client.loop_stop(true)
*
*/
static VALUE rb_mosquitto_client_loop_stop(VALUE obj, VALUE force)
{
struct nogvl_loop_stop_args args;
@@ -1602,20 +1645,17 @@
args.mosq = client->mosq;
args.force = ((force == Qtrue) ? true : false);
ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_loop_stop_nogvl, (void *)&args, RUBY_UBF_IO, 0);
switch (ret) {
case MOSQ_ERR_INVAL:
- MosquittoError("invalid input params");
+ MosquittoError("Threaded main loop not running for this client. Are you sure you haven't already called Mosquitto::Client#loop_stop ?");
break;
case MOSQ_ERR_NOT_SUPPORTED :
MosquittoError("thread support is not available");
break;
default:
- pthread_mutex_destroy(&client->callback_mutex);
- pthread_cond_destroy(&client->callback_cond);
- rb_thread_kill(client->callback_thread);
- client->callback_thread = Qnil;
+ rb_mosquitto_client_reap_event_thread(client);
return Qtrue;
}
}
static void *rb_mosquitto_client_loop_read_nogvl(void *ptr)
@@ -1784,10 +1824,34 @@
return (ret == true) ? Qtrue : Qfalse;
}
/*
* call-seq:
+ * client.destroy -> Boolean
+ *
+ * Free memory associated with a mosquitto client instance. Used in integration tests only.
+ *
+ * @return [true] true when memory freed
+ * @example
+ * client.destroy
+ *
+ */
+static VALUE rb_mosquitto_client_destroy(VALUE obj)
+{
+ MosquittoGetClient(obj);
+ if (!NIL_P(client->callback_thread)) {
+ mosquitto_stop_waiting_for_callbacks(client);
+ mosquitto_loop_stop(client->mosq, true);
+ rb_mosquitto_client_reap_event_thread(client);
+ }
+ mosquitto_destroy(client->mosq);
+ client->mosq = NULL;
+ return Qtrue;
+}
+
+/*
+ * call-seq:
* client.reconnect_delay_set(2, 10, true) -> Boolean
*
* Control the behaviour of the client when it has unexpectedly disconnected in
* Mosquitto::Client#loop_forever or after Mosquitto::Client#loop_start. The default
* behaviour if this function is not used is to repeatedly attempt to reconnect
@@ -2158,6 +2222,10 @@
rb_define_method(rb_cMosquittoClient, "on_publish", rb_mosquitto_client_on_publish, -1);
rb_define_method(rb_cMosquittoClient, "on_message", rb_mosquitto_client_on_message, -1);
rb_define_method(rb_cMosquittoClient, "on_subscribe", rb_mosquitto_client_on_subscribe, -1);
rb_define_method(rb_cMosquittoClient, "on_unsubscribe", rb_mosquitto_client_on_unsubscribe, -1);
rb_define_method(rb_cMosquittoClient, "on_log", rb_mosquitto_client_on_log, -1);
+
+ /* For integration testing only (will) */
+ rb_define_method(rb_cMosquittoClient, "destroy", rb_mosquitto_client_destroy, 0);
+
}