ext/mosquitto/client.c in mosquitto-0.2 vs ext/mosquitto/client.c in mosquitto-0.3

- old
+ new

@@ -1,16 +1,18 @@ #include "mosquitto_ext.h" static void rb_mosquitto_run_callback(mosquitto_callback_t *callback); +static void rb_mosquitto_free_callback(mosquitto_callback_t *callback); +static void rb_mosquitto_client_reap_event_thread(mosquitto_client_wrapper *client); VALUE mosquitto_tls_password; static int rb_mosquitto_tls_password_callback(char *buf, int size, int rwflag, void *obj) { strncpy(buf, StringValueCStr(mosquitto_tls_password), size); - return RSTRING_LEN(mosquitto_tls_password); rb_gc_unregister_address(&mosquitto_tls_password); + return RSTRING_LEN(mosquitto_tls_password); } /* * :nodoc: * Pushes a callback onto the client's callback queue. The callback runs within the context of an event thread. @@ -45,14 +47,14 @@ * to handle. * * Only applicable to clients that run with the threaded Mosquitto::Client#loop_start event loop * */ -static void *mosquitto_wait_for_callbacks(void *w) +static void *mosquitto_wait_for_callbacks(void *c) { - mosquitto_callback_waiting_t *waiter = (mosquitto_callback_waiting_t *)w; - mosquitto_client_wrapper *client = waiter->client; + mosquitto_client_wrapper *client = (mosquitto_client_wrapper *)c; + mosquitto_callback_waiting_t *waiter = client->waiter; pthread_mutex_lock(&client->callback_mutex); while (!waiter->abort && (waiter->callback = mosquitto_callback_queue_pop(client)) == NULL) { pthread_cond_wait(&client->callback_cond, &client->callback_mutex); @@ -67,17 +69,21 @@ * Unblocking function for the callback poller - invoked when the event thread should exit. * * Only applicable to clients that run with the threaded Mosquitto::Client#loop_start event loop * */ -static void mosquitto_stop_waiting_for_callbacks(void *w) +static void mosquitto_stop_waiting_for_callbacks(void *c) { - mosquitto_callback_waiting_t *waiter = (mosquitto_callback_waiting_t *)w; - mosquitto_client_wrapper *client = waiter->client; + mosquitto_client_wrapper *client = (mosquitto_client_wrapper *)c; + mosquitto_callback_waiting_t *waiter = client->waiter; + mosquitto_callback_t *callback = NULL; pthread_mutex_lock(&client->callback_mutex); waiter->abort = 1; + while ((callback = mosquitto_callback_queue_pop(client)) != NULL) { + rb_mosquitto_free_callback(callback); + } pthread_mutex_unlock(&client->callback_mutex); pthread_cond_signal(&client->callback_cond); } /* @@ -166,22 +172,14 @@ case ON_CONNECT_CALLBACK: { on_connect_callback_args_t *cb = (on_connect_callback_args_t *)callback->data; args[0] = client->connect_cb; args[1] = (VALUE)1; args[2] = INT2NUM(cb->rc); - switch (cb->rc) { - case 1: - MosquittoError("connection refused (unacceptable protocol version)"); - break; - case 2: - MosquittoError("connection refused (identifier rejected)"); - break; - case 3: - MosquittoError("connection refused (broker unavailable)"); - break; - default: - rb_mosquitto_funcall_protected(error_tag, args); + if (cb->rc == 0) { + rb_mosquitto_funcall_protected(error_tag, args); + } else { + MosquittoError(mosquitto_connack_string(cb->rc)); } } break; case ON_DISCONNECT_CALLBACK: { @@ -267,23 +265,21 @@ * */ static VALUE rb_mosquitto_callback_thread(void *obj) { mosquitto_client_wrapper *client = (mosquitto_client_wrapper *)obj; - mosquitto_callback_waiting_t waiter; - waiter.callback = NULL; - waiter.abort = 0; - waiter.client = client; - while (!waiter.abort) + mosquitto_callback_waiting_t *waiter = client->waiter; + waiter->callback = NULL; + waiter->abort = 0; + while (!waiter->abort) { - rb_thread_call_without_gvl(mosquitto_wait_for_callbacks, (void *)&waiter, mosquitto_stop_waiting_for_callbacks, (void *)&waiter); - if (waiter.callback) + rb_thread_call_without_gvl(mosquitto_wait_for_callbacks, (void *)client, mosquitto_stop_waiting_for_callbacks, (void *)client); + if (waiter->callback) { - rb_mosquitto_run_callback(waiter.callback); + rb_mosquitto_run_callback(waiter->callback); } } - return Qnil; } /* * :nodoc: @@ -442,11 +438,18 @@ */ static void rb_mosquitto_free_client(void *ptr) { mosquitto_client_wrapper *client = (mosquitto_client_wrapper *)ptr; if (client) { - mosquitto_destroy(client->mosq); + if (client->mosq != NULL) { + if (!NIL_P(client->callback_thread)) { + mosquitto_stop_waiting_for_callbacks(client); + mosquitto_loop_stop(client->mosq, true); + rb_mosquitto_client_reap_event_thread(client); + } + if (client->mosq != NULL) mosquitto_destroy(client->mosq); + } xfree(client); } } /* @@ -506,10 +509,11 @@ cl->subscribe_cb = Qnil; cl->unsubscribe_cb = Qnil; cl->log_cb = Qnil; cl->callback_thread = Qnil; cl->callback_queue = NULL; + cl->waiter = NULL; rb_obj_call_init(client, 0, NULL); return client; } static void *rb_mosquitto_client_reinitialise_nogvl(void *ptr) @@ -686,11 +690,11 @@ } } /* * call-seq: - * client.tls_set('/certs/all-ca.crt'), '/certs', '/certs/client.crt'), '/certs/client.key') -> Boolean + * client.tls_set('/certs/all-ca.crt'), '/certs', '/certs/client.crt'), '/certs/client.key', 'password') -> Boolean * * Configure the client for certificate based SSL/TLS support. * * Cannot be used in conjunction with Mosquitto::Client#tls_psk_set. * @@ -712,11 +716,11 @@ * @param password [String] password for encrypted keyfile * @return [true] on success * @raise [Mosquitto::Error] on invalid input params or when TLS is not supported * @note This must be called before calling Mosquitto::Client#connect * @example - * client.tls_set('/certs/all-ca.crt'), '/certs', '/certs/client.crt'), '/certs/client.key') + * client.tls_set('/certs/all-ca.crt'), '/certs', '/certs/client.crt'), '/certs/client.key', 'password') * */ static VALUE rb_mosquitto_client_tls_set(VALUE obj, VALUE cafile, VALUE capath, VALUE certfile, VALUE keyfile, VALUE password) { int ret; @@ -1173,17 +1177,21 @@ * */ static VALUE rb_mosquitto_client_disconnect(VALUE obj) { int ret; + bool retried = false; + struct timeval time; MosquittoGetClient(obj); + retry_once: ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_disconnect_nogvl, (void *)client->mosq, RUBY_UBF_IO, 0); switch (ret) { case MOSQ_ERR_INVAL: MosquittoError("invalid input params"); break; case MOSQ_ERR_NO_CONN: + RetryNotConnectedOnce(); MosquittoError("client not connected to broker"); break; default: return Qtrue; } @@ -1217,10 +1225,12 @@ */ static VALUE rb_mosquitto_client_publish(VALUE obj, VALUE mid, VALUE topic, VALUE payload, VALUE qos, VALUE retain) { struct nogvl_publish_args args; int ret, msg_id; + struct timeval time; + bool retried = false; MosquittoGetClient(obj); Check_Type(topic, T_STRING); MosquittoEncode(topic); Check_Type(payload, T_STRING); MosquittoEncode(payload); @@ -1234,19 +1244,21 @@ args.topic = StringValueCStr(topic); args.payloadlen = (int)RSTRING_LEN(payload); args.payload = (const char *)(args.payloadlen == 0 ? NULL : StringValueCStr(payload)); args.qos = NUM2INT(qos); args.retain = (retain == Qtrue) ? true : false; + retry_once: ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_publish_nogvl, (void *)&args, RUBY_UBF_IO, 0); switch (ret) { case MOSQ_ERR_INVAL: MosquittoError("invalid input params"); break; case MOSQ_ERR_NOMEM: rb_memerror(); break; case MOSQ_ERR_NO_CONN: + RetryNotConnectedOnce(); MosquittoError("client not connected to broker"); break; case MOSQ_ERR_PROTOCOL: MosquittoError("protocol error communicating with broker"); break; @@ -1284,10 +1296,12 @@ */ static VALUE rb_mosquitto_client_subscribe(VALUE obj, VALUE mid, VALUE subscription, VALUE qos) { struct nogvl_subscribe_args args; int ret, msg_id; + struct timeval time; + bool retried = false; MosquittoGetClient(obj); Check_Type(subscription, T_STRING); MosquittoEncode(subscription); Check_Type(qos, T_FIXNUM); if (!NIL_P(mid)) { @@ -1296,19 +1310,21 @@ } args.mosq = client->mosq; args.mid = NIL_P(mid) ? NULL : &msg_id; args.subscription = StringValueCStr(subscription); args.qos = NUM2INT(qos); + retry_once: ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_subscribe_nogvl, (void *)&args, RUBY_UBF_IO, 0); switch (ret) { case MOSQ_ERR_INVAL: MosquittoError("invalid input params"); break; case MOSQ_ERR_NOMEM: rb_memerror(); break; case MOSQ_ERR_NO_CONN: + RetryNotConnectedOnce(); MosquittoError("client not connected to broker"); break; default: return Qtrue; } @@ -1338,29 +1354,33 @@ */ static VALUE rb_mosquitto_client_unsubscribe(VALUE obj, VALUE mid, VALUE subscription) { struct nogvl_subscribe_args args; int ret, msg_id; + struct timeval time; + bool retried = false; MosquittoGetClient(obj); Check_Type(subscription, T_STRING); MosquittoEncode(subscription); if (!NIL_P(mid)) { Check_Type(mid, T_FIXNUM); msg_id = NUM2INT(mid); } args.mosq = client->mosq; args.mid = NIL_P(mid) ? NULL : &msg_id; args.subscription = StringValueCStr(subscription); + retry_once: ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_unsubscribe_nogvl, (void *)&args, RUBY_UBF_IO, 0); switch (ret) { case MOSQ_ERR_INVAL: MosquittoError("invalid input params"); break; case MOSQ_ERR_NOMEM: rb_memerror(); break; case MOSQ_ERR_NO_CONN: + RetryNotConnectedOnce(); MosquittoError("client not connected to broker"); break; default: return Qtrue; } @@ -1424,25 +1444,29 @@ */ static VALUE rb_mosquitto_client_loop(VALUE obj, VALUE timeout, VALUE max_packets) { struct nogvl_loop_args args; int ret; + struct timeval time; + bool retried = false; MosquittoGetClient(obj); Check_Type(timeout, T_FIXNUM); Check_Type(max_packets, T_FIXNUM); args.mosq = client->mosq; args.timeout = NUM2INT(timeout); args.max_packets = NUM2INT(max_packets); + retry_once: ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_loop_nogvl, (void *)&args, RUBY_UBF_IO, 0); switch (ret) { case MOSQ_ERR_INVAL: MosquittoError("invalid input params"); break; case MOSQ_ERR_NOMEM: rb_memerror(); break; case MOSQ_ERR_NO_CONN: + RetryNotConnectedOnce(); MosquittoError("client not connected to broker"); break; case MOSQ_ERR_CONN_LOST: MosquittoError("connection to the broker was lost"); break; @@ -1491,25 +1515,29 @@ */ static VALUE rb_mosquitto_client_loop_forever(VALUE obj, VALUE timeout, VALUE max_packets) { struct nogvl_loop_args args; int ret; + struct timeval time; + bool retried = false; MosquittoGetClient(obj); Check_Type(timeout, T_FIXNUM); Check_Type(max_packets, T_FIXNUM); args.mosq = client->mosq; args.timeout = NUM2INT(timeout); args.max_packets = NUM2INT(max_packets); + retry_once: ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_loop_forever_nogvl, (void *)&args, rb_mosquitto_client_loop_forever_ubf, client); switch (ret) { case MOSQ_ERR_INVAL: MosquittoError("invalid input params"); break; case MOSQ_ERR_NOMEM: rb_memerror(); break; case MOSQ_ERR_NO_CONN: + RetryNotConnectedOnce(); MosquittoError("client not connected to broker"); break; case MOSQ_ERR_CONN_LOST: MosquittoError("connection to the broker was lost"); break; @@ -1557,12 +1585,13 @@ break; case MOSQ_ERR_NOT_SUPPORTED : MosquittoError("thread support is not available"); break; default: - pthread_mutex_init(&client->callback_mutex, NULL); - pthread_cond_init(&client->callback_cond, NULL); + client->waiter = MOSQ_ALLOC(mosquitto_callback_waiting_t); + if (pthread_mutex_init(&client->callback_mutex, NULL) != 0) MosquittoError("failed to create callback thread mutex"); + if (pthread_cond_init(&client->callback_cond, NULL) != 0) MosquittoError("failed to create callback thread condition var"); client->callback_thread = rb_thread_create(rb_mosquitto_callback_thread, client); /* Allow the callback thread some startup time */ time.tv_sec = 0; time.tv_usec = 100 * 1000; /* 0.1 sec */ rb_thread_wait_for(time); @@ -1574,13 +1603,27 @@ { struct nogvl_loop_stop_args *args = ptr; return (VALUE)mosquitto_loop_stop(args->mosq, args->force); } +static void rb_mosquitto_client_reap_event_thread(mosquitto_client_wrapper *client) +{ + struct timeval time; + mosquitto_stop_waiting_for_callbacks(client); + /* Allow the callback thread some shutdown time */ + time.tv_sec = 0; + time.tv_usec = 100 * 1000; /* 0.1 sec */ + rb_thread_wait_for(time); + if (pthread_mutex_destroy(&client->callback_mutex) == EINVAL) MosquittoError("could not destroy callback thread mutex"); + if (pthread_cond_destroy(&client->callback_cond) == EINVAL) MosquittoError("could not destroy callback condition var"); + xfree(client->waiter); + client->callback_thread = Qnil; +} + /* * call-seq: - * client.loop_start -> Boolean + * client.loop_stop(true) -> Boolean * * This is part of the threaded client interface. Call this once to stop the * network thread previously created with Mosquitto::Client#loop_start. This call * will block until the network thread finishes. For the network thread to end, * you must have previously called Mosquitto::Client#disconnect or have set the force @@ -1589,11 +1632,11 @@ * @param force [Boolean] set to true to force thread cancellation. If false, Mosquitto::Client#disconnect * must have already been called. * @return [true] on success * @raise [Mosquitto::Error] on invalid input params or if thread support is not available * @example - * client.loop_start + * client.loop_stop(true) * */ static VALUE rb_mosquitto_client_loop_stop(VALUE obj, VALUE force) { struct nogvl_loop_stop_args args; @@ -1602,20 +1645,17 @@ args.mosq = client->mosq; args.force = ((force == Qtrue) ? true : false); ret = (int)rb_thread_call_without_gvl(rb_mosquitto_client_loop_stop_nogvl, (void *)&args, RUBY_UBF_IO, 0); switch (ret) { case MOSQ_ERR_INVAL: - MosquittoError("invalid input params"); + MosquittoError("Threaded main loop not running for this client. Are you sure you haven't already called Mosquitto::Client#loop_stop ?"); break; case MOSQ_ERR_NOT_SUPPORTED : MosquittoError("thread support is not available"); break; default: - pthread_mutex_destroy(&client->callback_mutex); - pthread_cond_destroy(&client->callback_cond); - rb_thread_kill(client->callback_thread); - client->callback_thread = Qnil; + rb_mosquitto_client_reap_event_thread(client); return Qtrue; } } static void *rb_mosquitto_client_loop_read_nogvl(void *ptr) @@ -1784,10 +1824,34 @@ return (ret == true) ? Qtrue : Qfalse; } /* * call-seq: + * client.destroy -> Boolean + * + * Free memory associated with a mosquitto client instance. Used in integration tests only. + * + * @return [true] true when memory freed + * @example + * client.destroy + * + */ +static VALUE rb_mosquitto_client_destroy(VALUE obj) +{ + MosquittoGetClient(obj); + if (!NIL_P(client->callback_thread)) { + mosquitto_stop_waiting_for_callbacks(client); + mosquitto_loop_stop(client->mosq, true); + rb_mosquitto_client_reap_event_thread(client); + } + mosquitto_destroy(client->mosq); + client->mosq = NULL; + return Qtrue; +} + +/* + * call-seq: * client.reconnect_delay_set(2, 10, true) -> Boolean * * Control the behaviour of the client when it has unexpectedly disconnected in * Mosquitto::Client#loop_forever or after Mosquitto::Client#loop_start. The default * behaviour if this function is not used is to repeatedly attempt to reconnect @@ -2158,6 +2222,10 @@ rb_define_method(rb_cMosquittoClient, "on_publish", rb_mosquitto_client_on_publish, -1); rb_define_method(rb_cMosquittoClient, "on_message", rb_mosquitto_client_on_message, -1); rb_define_method(rb_cMosquittoClient, "on_subscribe", rb_mosquitto_client_on_subscribe, -1); rb_define_method(rb_cMosquittoClient, "on_unsubscribe", rb_mosquitto_client_on_unsubscribe, -1); rb_define_method(rb_cMosquittoClient, "on_log", rb_mosquitto_client_on_log, -1); + + /* For integration testing only (will) */ + rb_define_method(rb_cMosquittoClient, "destroy", rb_mosquitto_client_destroy, 0); + }