From 77ab71f26f764d305aacbbb5c4bda0476d6859bf Mon Sep 17 00:00:00 2001 From: jfreegman Date: Wed, 11 Nov 2020 16:28:50 -0500 Subject: [PATCH] Refactor message queue We now attempt to send all queued messages per call to cqueue_try_send() instead of just the oldest message in the queue. This speeds things up substantially. Also fixed a very unlikely bug where the read receipt might wrap around to zero which we used as a reserved value for an unsent message --- src/message_queue.c | 38 ++++++++++++++++++++++++++------------ src/message_queue.h | 9 ++++++--- src/toxic.c | 7 +++---- 3 files changed, 35 insertions(+), 19 deletions(-) diff --git a/src/message_queue.c b/src/message_queue.c index 61bee24..a19e417 100644 --- a/src/message_queue.c +++ b/src/message_queue.c @@ -59,7 +59,7 @@ void cqueue_add(struct chat_queue *q, const char *msg, size_t len, uint8_t type, new_m->type = type; new_m->line_id = line_id; new_m->last_send_try = 0; - new_m->receipt = 0; + new_m->receipt = -1; new_m->next = NULL; if (q->root == NULL) { @@ -135,23 +135,37 @@ void cqueue_remove(ToxWindow *self, Tox *m, uint32_t receipt) } } -#define CQUEUE_TRY_SEND_INTERVAL 10 +/* We use knowledge of toxcore internals (bad!) to determine that if we haven't received a read receipt + * for a sent packet after this amount of time, the connection has been severed and the packet needs + * to be re-sent despite toxcore not returning an error on its initial send. + */ +#define TRY_SEND_TIMEOUT 32 -/* Tries to send the oldest unsent message in queue. */ +/* + * Tries to send all messages in the send queue in sequential order. + * If a message fails to send the function will immediately return. + */ void cqueue_try_send(ToxWindow *self, Tox *m) { struct chat_queue *q = self->chatwin->cqueue; struct cqueue_msg *msg = q->root; - if (!msg) { - return; - } + while (msg) { + if (msg->receipt == -1) { + TOX_ERR_FRIEND_SEND_MESSAGE err; + Tox_Message_Type type = msg->type == OUT_MSG ? TOX_MESSAGE_TYPE_NORMAL : TOX_MESSAGE_TYPE_ACTION; + uint32_t receipt = tox_friend_send_message(m, self->num, type, (uint8_t *) msg->message, msg->len, &err); - if (msg->receipt != 0 && !timed_out(msg->last_send_try, CQUEUE_TRY_SEND_INTERVAL)) { - return; - } + if (err != TOX_ERR_FRIEND_SEND_MESSAGE_OK) { + return; + } - Tox_Message_Type type = msg->type == OUT_MSG ? TOX_MESSAGE_TYPE_NORMAL : TOX_MESSAGE_TYPE_ACTION; - msg->receipt = tox_friend_send_message(m, self->num, type, (uint8_t *) msg->message, msg->len, NULL); - msg->last_send_try = get_unix_time(); + msg->receipt = receipt; + msg->last_send_try = get_unix_time(); + } else if (timed_out(msg->last_send_try, TRY_SEND_TIMEOUT)) { // unlikely but possible + msg->receipt = -1; + } + + msg = msg->next; + } } diff --git a/src/message_queue.h b/src/message_queue.h index 7c9bbb0..460c334 100644 --- a/src/message_queue.h +++ b/src/message_queue.h @@ -27,9 +27,9 @@ struct cqueue_msg { char message[MAX_STR_SIZE]; size_t len; int line_id; - uint8_t type; - uint32_t receipt; time_t last_send_try; + uint8_t type; + int64_t receipt; struct cqueue_msg *next; struct cqueue_msg *prev; }; @@ -42,7 +42,10 @@ struct chat_queue { void cqueue_cleanup(struct chat_queue *q); void cqueue_add(struct chat_queue *q, const char *msg, size_t len, uint8_t type, int line_id); -/* Tries to send the oldest unsent message in queue. */ +/* + * Tries to send all messages in the send queue in sequential order. + * If a message fails to send the function will immediately return. + */ void cqueue_try_send(ToxWindow *self, Tox *m); /* removes message with matching receipt from queue, writes to log and updates line to show the message was received. */ diff --git a/src/toxic.c b/src/toxic.c index 4a2ff6d..2236471 100644 --- a/src/toxic.c +++ b/src/toxic.c @@ -947,6 +947,7 @@ static void do_toxic(Tox *m) tox_iterate(m, NULL); do_tox_connection(m); + pthread_mutex_unlock(&Winthread.lock); } @@ -985,9 +986,7 @@ void *thread_cqueue(void *data) while (true) { pthread_mutex_lock(&Winthread.lock); - size_t i; - - for (i = 2; i < MAX_WINDOWS_NUM; ++i) { + for (size_t i = 2; i < MAX_WINDOWS_NUM; ++i) { ToxWindow *toxwin = get_window_ptr(i); if ((toxwin != NULL) && (toxwin->type == WINDOW_TYPE_CHAT) @@ -998,7 +997,7 @@ void *thread_cqueue(void *data) pthread_mutex_unlock(&Winthread.lock); - sleep_thread(4000L); + sleep_thread(750000L); // 0.75 seconds } }