1
0
mirror of https://github.com/Tha14/toxic.git synced 2024-11-22 21:43:02 +01:00

Refactor message queue

We now attempt to send all queued messages per call to cqueue_try_send() instead
of just the oldest message in the queue. This speeds things up substantially.

Also fixed a very unlikely bug where the read receipt might wrap around to zero
which we used as a reserved value for an unsent message
This commit is contained in:
jfreegman 2020-11-11 16:28:50 -05:00
parent 68e1ba312d
commit 77ab71f26f
No known key found for this signature in database
GPG Key ID: 3627F3144076AE63
3 changed files with 35 additions and 19 deletions

View File

@ -59,7 +59,7 @@ void cqueue_add(struct chat_queue *q, const char *msg, size_t len, uint8_t type,
new_m->type = type; new_m->type = type;
new_m->line_id = line_id; new_m->line_id = line_id;
new_m->last_send_try = 0; new_m->last_send_try = 0;
new_m->receipt = 0; new_m->receipt = -1;
new_m->next = NULL; new_m->next = NULL;
if (q->root == NULL) { if (q->root == NULL) {
@ -135,23 +135,37 @@ void cqueue_remove(ToxWindow *self, Tox *m, uint32_t receipt)
} }
} }
#define CQUEUE_TRY_SEND_INTERVAL 10 /* We use knowledge of toxcore internals (bad!) to determine that if we haven't received a read receipt
* for a sent packet after this amount of time, the connection has been severed and the packet needs
* to be re-sent despite toxcore not returning an error on its initial send.
*/
#define TRY_SEND_TIMEOUT 32
/* Tries to send the oldest unsent message in queue. */ /*
* Tries to send all messages in the send queue in sequential order.
* If a message fails to send the function will immediately return.
*/
void cqueue_try_send(ToxWindow *self, Tox *m) void cqueue_try_send(ToxWindow *self, Tox *m)
{ {
struct chat_queue *q = self->chatwin->cqueue; struct chat_queue *q = self->chatwin->cqueue;
struct cqueue_msg *msg = q->root; struct cqueue_msg *msg = q->root;
if (!msg) { while (msg) {
return; if (msg->receipt == -1) {
} TOX_ERR_FRIEND_SEND_MESSAGE err;
Tox_Message_Type type = msg->type == OUT_MSG ? TOX_MESSAGE_TYPE_NORMAL : TOX_MESSAGE_TYPE_ACTION;
uint32_t receipt = tox_friend_send_message(m, self->num, type, (uint8_t *) msg->message, msg->len, &err);
if (msg->receipt != 0 && !timed_out(msg->last_send_try, CQUEUE_TRY_SEND_INTERVAL)) { if (err != TOX_ERR_FRIEND_SEND_MESSAGE_OK) {
return; return;
} }
Tox_Message_Type type = msg->type == OUT_MSG ? TOX_MESSAGE_TYPE_NORMAL : TOX_MESSAGE_TYPE_ACTION; msg->receipt = receipt;
msg->receipt = tox_friend_send_message(m, self->num, type, (uint8_t *) msg->message, msg->len, NULL); msg->last_send_try = get_unix_time();
msg->last_send_try = get_unix_time(); } else if (timed_out(msg->last_send_try, TRY_SEND_TIMEOUT)) { // unlikely but possible
msg->receipt = -1;
}
msg = msg->next;
}
} }

View File

@ -27,9 +27,9 @@ struct cqueue_msg {
char message[MAX_STR_SIZE]; char message[MAX_STR_SIZE];
size_t len; size_t len;
int line_id; int line_id;
uint8_t type;
uint32_t receipt;
time_t last_send_try; time_t last_send_try;
uint8_t type;
int64_t receipt;
struct cqueue_msg *next; struct cqueue_msg *next;
struct cqueue_msg *prev; struct cqueue_msg *prev;
}; };
@ -42,7 +42,10 @@ struct chat_queue {
void cqueue_cleanup(struct chat_queue *q); void cqueue_cleanup(struct chat_queue *q);
void cqueue_add(struct chat_queue *q, const char *msg, size_t len, uint8_t type, int line_id); void cqueue_add(struct chat_queue *q, const char *msg, size_t len, uint8_t type, int line_id);
/* Tries to send the oldest unsent message in queue. */ /*
* Tries to send all messages in the send queue in sequential order.
* If a message fails to send the function will immediately return.
*/
void cqueue_try_send(ToxWindow *self, Tox *m); void cqueue_try_send(ToxWindow *self, Tox *m);
/* removes message with matching receipt from queue, writes to log and updates line to show the message was received. */ /* removes message with matching receipt from queue, writes to log and updates line to show the message was received. */

View File

@ -947,6 +947,7 @@ static void do_toxic(Tox *m)
tox_iterate(m, NULL); tox_iterate(m, NULL);
do_tox_connection(m); do_tox_connection(m);
pthread_mutex_unlock(&Winthread.lock); pthread_mutex_unlock(&Winthread.lock);
} }
@ -985,9 +986,7 @@ void *thread_cqueue(void *data)
while (true) { while (true) {
pthread_mutex_lock(&Winthread.lock); pthread_mutex_lock(&Winthread.lock);
size_t i; for (size_t i = 2; i < MAX_WINDOWS_NUM; ++i) {
for (i = 2; i < MAX_WINDOWS_NUM; ++i) {
ToxWindow *toxwin = get_window_ptr(i); ToxWindow *toxwin = get_window_ptr(i);
if ((toxwin != NULL) && (toxwin->type == WINDOW_TYPE_CHAT) if ((toxwin != NULL) && (toxwin->type == WINDOW_TYPE_CHAT)
@ -998,7 +997,7 @@ void *thread_cqueue(void *data)
pthread_mutex_unlock(&Winthread.lock); pthread_mutex_unlock(&Winthread.lock);
sleep_thread(4000L); sleep_thread(750000L); // 0.75 seconds
} }
} }