From 89d8d728ffc95d30fac909f806a4e1702eca06fc Mon Sep 17 00:00:00 2001 From: Green Sky Date: Sat, 19 Aug 2023 22:37:55 +0200 Subject: [PATCH] after 2 weeks of porting over the ngc_ft1 code to solanaceae and rewriting the highlevel logic (29 commits predate this) --- CMakeLists.txt | 56 + LICENSE | 24 + external/CMakeLists.txt | 4 + external/sha1/CMakeLists.txt | 12 + external/sha1/sha1/package.json | 9 + external/sha1/sha1/sha1.c | 295 +++++ external/sha1/sha1/sha1.h | 52 + solanaceae/ngc_ext/ngcext.cpp | 281 +++++ solanaceae/ngc_ext/ngcext.hpp | 278 +++++ solanaceae/ngc_ft1/ledbat.cpp | 250 ++++ solanaceae/ngc_ft1/ledbat.hpp | 122 ++ solanaceae/ngc_ft1/ngcft1.cpp | 719 +++++++++++ solanaceae/ngc_ft1/ngcft1.hpp | 252 ++++ solanaceae/ngc_ft1/ngcft1_file_kind.hpp | 76 ++ solanaceae/ngc_ft1/rcv_buf.cpp | 44 + solanaceae/ngc_ft1/rcv_buf.hpp | 35 + solanaceae/ngc_ft1/snd_buf.cpp | 16 + solanaceae/ngc_ft1/snd_buf.hpp | 33 + solanaceae/ngc_ft1_sha1/ft1_sha1_info.cpp | 139 +++ solanaceae/ngc_ft1_sha1/ft1_sha1_info.hpp | 55 + solanaceae/ngc_ft1_sha1/hash_utils.cpp | 26 + solanaceae/ngc_ft1_sha1/hash_utils.hpp | 10 + solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp | 1311 +++++++++++++++++++++ solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp | 134 +++ 24 files changed, 4233 insertions(+) create mode 100644 CMakeLists.txt create mode 100644 LICENSE create mode 100644 external/CMakeLists.txt create mode 100644 external/sha1/CMakeLists.txt create mode 100644 external/sha1/sha1/package.json create mode 100644 external/sha1/sha1/sha1.c create mode 100644 external/sha1/sha1/sha1.h create mode 100644 solanaceae/ngc_ext/ngcext.cpp create mode 100644 solanaceae/ngc_ext/ngcext.hpp create mode 100644 solanaceae/ngc_ft1/ledbat.cpp create mode 100644 solanaceae/ngc_ft1/ledbat.hpp create mode 100644 solanaceae/ngc_ft1/ngcft1.cpp create mode 100644 solanaceae/ngc_ft1/ngcft1.hpp create mode 100644 solanaceae/ngc_ft1/ngcft1_file_kind.hpp create mode 100644 solanaceae/ngc_ft1/rcv_buf.cpp create mode 100644 
solanaceae/ngc_ft1/rcv_buf.hpp create mode 100644 solanaceae/ngc_ft1/snd_buf.cpp create mode 100644 solanaceae/ngc_ft1/snd_buf.hpp create mode 100644 solanaceae/ngc_ft1_sha1/ft1_sha1_info.cpp create mode 100644 solanaceae/ngc_ft1_sha1/ft1_sha1_info.hpp create mode 100644 solanaceae/ngc_ft1_sha1/hash_utils.cpp create mode 100644 solanaceae/ngc_ft1_sha1/hash_utils.hpp create mode 100644 solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp create mode 100644 solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..1f91154 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,56 @@ +cmake_minimum_required(VERSION 3.9 FATAL_ERROR) + +add_subdirectory(./external) + +project(solanaceae) + +add_library(solanaceae_ngcext + ./solanaceae/ngc_ext/ngcext.hpp + ./solanaceae/ngc_ext/ngcext.cpp +) +target_compile_features(solanaceae_ngcext PUBLIC cxx_std_17) +target_link_libraries(solanaceae_ngcext PUBLIC + solanaceae_toxcore + solanaceae_util +) + +######################################## + +add_library(solanaceae_ngcft1 + ./solanaceae/ngc_ft1/ngcft1_file_kind.hpp + ./solanaceae/ngc_ft1/ngcft1.hpp + ./solanaceae/ngc_ft1/ngcft1.cpp + + ./solanaceae/ngc_ft1/ledbat.hpp + ./solanaceae/ngc_ft1/ledbat.cpp + + ./solanaceae/ngc_ft1/rcv_buf.hpp + ./solanaceae/ngc_ft1/rcv_buf.cpp + ./solanaceae/ngc_ft1/snd_buf.hpp + ./solanaceae/ngc_ft1/snd_buf.cpp +) +target_compile_features(solanaceae_ngcft1 PUBLIC cxx_std_17) +target_link_libraries(solanaceae_ngcft1 PUBLIC + solanaceae_ngcext +) + +######################################## + +add_library(solanaceae_sha1_ngcft1 + ./solanaceae/ngc_ft1_sha1/hash_utils.hpp + ./solanaceae/ngc_ft1_sha1/hash_utils.cpp + + ./solanaceae/ngc_ft1_sha1/ft1_sha1_info.hpp + ./solanaceae/ngc_ft1_sha1/ft1_sha1_info.cpp + + ./solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp + ./solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp +) +target_compile_features(solanaceae_sha1_ngcft1 PUBLIC cxx_std_17) +target_link_libraries(solanaceae_sha1_ngcft1 PUBLIC 
+ solanaceae_ngcft1 + sha1::sha1 + solanaceae_tox_contacts + solanaceae_message3 +) + diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..2780797 --- /dev/null +++ b/LICENSE @@ -0,0 +1,24 @@ +The Code is under the following License, if not stated otherwise: + +MIT License + +Copyright (c) 2023 Erik Scholz + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
+ diff --git a/external/CMakeLists.txt b/external/CMakeLists.txt new file mode 100644 index 0000000..01a690c --- /dev/null +++ b/external/CMakeLists.txt @@ -0,0 +1,4 @@ +cmake_minimum_required(VERSION 3.9 FATAL_ERROR) + +add_subdirectory(./sha1) + diff --git a/external/sha1/CMakeLists.txt b/external/sha1/CMakeLists.txt new file mode 100644 index 0000000..9d7bfeb --- /dev/null +++ b/external/sha1/CMakeLists.txt @@ -0,0 +1,12 @@ +cmake_minimum_required(VERSION 3.8) + +project(sha1 LANGUAGES C) + +add_library(sha1 STATIC + ./sha1/sha1.h + ./sha1/sha1.c +) +add_library(sha1::sha1 ALIAS sha1) + +target_include_directories(sha1 PUBLIC "sha1") + diff --git a/external/sha1/sha1/package.json b/external/sha1/sha1/package.json new file mode 100644 index 0000000..6a5843d --- /dev/null +++ b/external/sha1/sha1/package.json @@ -0,0 +1,9 @@ +{ + "name": "sha1", + "version": "0.0.1", + "repo": "clibs/sha1", + "description": "sha1 hash algorithm", + "keywords": ["sha1", "hash"], + "license": "public domain", + "src": ["sha1.c", "sha1.h"] +} diff --git a/external/sha1/sha1/sha1.c b/external/sha1/sha1/sha1.c new file mode 100644 index 0000000..76cd6ca --- /dev/null +++ b/external/sha1/sha1/sha1.c @@ -0,0 +1,295 @@ +/* +SHA-1 in C +By Steve Reid +100% Public Domain + +Test Vectors (from FIPS PUB 180-1) +"abc" + A9993E36 4706816A BA3E2571 7850C26C 9CD0D89D +"abcdbcdecdefdefgefghfghighijhijkijkljklmklmnlmnomnopnopq" + 84983E44 1C3BD26E BAAE4AA1 F95129E5 E54670F1 +A million repetitions of "a" + 34AA973C D4C4DAA4 F61EEB2B DBAD2731 6534016F +*/ + +/* #define LITTLE_ENDIAN * This should be #define'd already, if true. */ +/* #define SHA1HANDSOFF * Copies data before messing with it. */ + +#define SHA1HANDSOFF + +#include +#include + +/* for uint32_t */ +#include + +#include "sha1.h" + + +#define rol(value, bits) (((value) << (bits)) | ((value) >> (32 - (bits)))) + +/* blk0() and blk() perform the initial expand. 
*/ +/* I got the idea of expanding during the round function from SSLeay */ +#if BYTE_ORDER == LITTLE_ENDIAN +#define blk0(i) (block->l[i] = (rol(block->l[i],24)&0xFF00FF00) \ + |(rol(block->l[i],8)&0x00FF00FF)) +#elif BYTE_ORDER == BIG_ENDIAN +#define blk0(i) block->l[i] +#else +#error "Endianness not defined!" +#endif +#define blk(i) (block->l[i&15] = rol(block->l[(i+13)&15]^block->l[(i+8)&15] \ + ^block->l[(i+2)&15]^block->l[i&15],1)) + +/* (R0+R1), R2, R3, R4 are the different operations used in SHA1 */ +#define R0(v,w,x,y,z,i) z+=((w&(x^y))^y)+blk0(i)+0x5A827999+rol(v,5);w=rol(w,30); +#define R1(v,w,x,y,z,i) z+=((w&(x^y))^y)+blk(i)+0x5A827999+rol(v,5);w=rol(w,30); +#define R2(v,w,x,y,z,i) z+=(w^x^y)+blk(i)+0x6ED9EBA1+rol(v,5);w=rol(w,30); +#define R3(v,w,x,y,z,i) z+=(((w|x)&y)|(w&x))+blk(i)+0x8F1BBCDC+rol(v,5);w=rol(w,30); +#define R4(v,w,x,y,z,i) z+=(w^x^y)+blk(i)+0xCA62C1D6+rol(v,5);w=rol(w,30); + + +/* Hash a single 512-bit block. This is the core of the algorithm. */ + +void SHA1Transform( + uint32_t state[5], + const unsigned char buffer[64] +) +{ + uint32_t a, b, c, d, e; + + typedef union + { + unsigned char c[64]; + uint32_t l[16]; + } CHAR64LONG16; + +#ifdef SHA1HANDSOFF + CHAR64LONG16 block[1]; /* use array to appear as a pointer */ + + memcpy(block, buffer, 64); +#else + /* The following had better never be used because it causes the + * pointer-to-const buffer to be cast into a pointer to non-const. + * And the result is written through. I threw a "const" in, hoping + * this will cause a diagnostic. + */ + CHAR64LONG16 *block = (const CHAR64LONG16 *) buffer; +#endif + /* Copy context->state[] to working vars */ + a = state[0]; + b = state[1]; + c = state[2]; + d = state[3]; + e = state[4]; + /* 4 rounds of 20 operations each. Loop unrolled. 
*/ + R0(a, b, c, d, e, 0); + R0(e, a, b, c, d, 1); + R0(d, e, a, b, c, 2); + R0(c, d, e, a, b, 3); + R0(b, c, d, e, a, 4); + R0(a, b, c, d, e, 5); + R0(e, a, b, c, d, 6); + R0(d, e, a, b, c, 7); + R0(c, d, e, a, b, 8); + R0(b, c, d, e, a, 9); + R0(a, b, c, d, e, 10); + R0(e, a, b, c, d, 11); + R0(d, e, a, b, c, 12); + R0(c, d, e, a, b, 13); + R0(b, c, d, e, a, 14); + R0(a, b, c, d, e, 15); + R1(e, a, b, c, d, 16); + R1(d, e, a, b, c, 17); + R1(c, d, e, a, b, 18); + R1(b, c, d, e, a, 19); + R2(a, b, c, d, e, 20); + R2(e, a, b, c, d, 21); + R2(d, e, a, b, c, 22); + R2(c, d, e, a, b, 23); + R2(b, c, d, e, a, 24); + R2(a, b, c, d, e, 25); + R2(e, a, b, c, d, 26); + R2(d, e, a, b, c, 27); + R2(c, d, e, a, b, 28); + R2(b, c, d, e, a, 29); + R2(a, b, c, d, e, 30); + R2(e, a, b, c, d, 31); + R2(d, e, a, b, c, 32); + R2(c, d, e, a, b, 33); + R2(b, c, d, e, a, 34); + R2(a, b, c, d, e, 35); + R2(e, a, b, c, d, 36); + R2(d, e, a, b, c, 37); + R2(c, d, e, a, b, 38); + R2(b, c, d, e, a, 39); + R3(a, b, c, d, e, 40); + R3(e, a, b, c, d, 41); + R3(d, e, a, b, c, 42); + R3(c, d, e, a, b, 43); + R3(b, c, d, e, a, 44); + R3(a, b, c, d, e, 45); + R3(e, a, b, c, d, 46); + R3(d, e, a, b, c, 47); + R3(c, d, e, a, b, 48); + R3(b, c, d, e, a, 49); + R3(a, b, c, d, e, 50); + R3(e, a, b, c, d, 51); + R3(d, e, a, b, c, 52); + R3(c, d, e, a, b, 53); + R3(b, c, d, e, a, 54); + R3(a, b, c, d, e, 55); + R3(e, a, b, c, d, 56); + R3(d, e, a, b, c, 57); + R3(c, d, e, a, b, 58); + R3(b, c, d, e, a, 59); + R4(a, b, c, d, e, 60); + R4(e, a, b, c, d, 61); + R4(d, e, a, b, c, 62); + R4(c, d, e, a, b, 63); + R4(b, c, d, e, a, 64); + R4(a, b, c, d, e, 65); + R4(e, a, b, c, d, 66); + R4(d, e, a, b, c, 67); + R4(c, d, e, a, b, 68); + R4(b, c, d, e, a, 69); + R4(a, b, c, d, e, 70); + R4(e, a, b, c, d, 71); + R4(d, e, a, b, c, 72); + R4(c, d, e, a, b, 73); + R4(b, c, d, e, a, 74); + R4(a, b, c, d, e, 75); + R4(e, a, b, c, d, 76); + R4(d, e, a, b, c, 77); + R4(c, d, e, a, b, 78); + R4(b, c, d, e, a, 79); + /* 
Add the working vars back into context.state[] */ + state[0] += a; + state[1] += b; + state[2] += c; + state[3] += d; + state[4] += e; + /* Wipe variables */ + a = b = c = d = e = 0; +#ifdef SHA1HANDSOFF + memset(block, '\0', sizeof(block)); +#endif +} + + +/* SHA1Init - Initialize new context */ + +void SHA1Init( + SHA1_CTX * context +) +{ + /* SHA1 initialization constants */ + context->state[0] = 0x67452301; + context->state[1] = 0xEFCDAB89; + context->state[2] = 0x98BADCFE; + context->state[3] = 0x10325476; + context->state[4] = 0xC3D2E1F0; + context->count[0] = context->count[1] = 0; +} + + +/* Run your data through this. */ + +void SHA1Update( + SHA1_CTX * context, + const unsigned char *data, + uint32_t len +) +{ + uint32_t i; + + uint32_t j; + + j = context->count[0]; + if ((context->count[0] += len << 3) < j) + context->count[1]++; + context->count[1] += (len >> 29); + j = (j >> 3) & 63; + if ((j + len) > 63) + { + memcpy(&context->buffer[j], data, (i = 64 - j)); + SHA1Transform(context->state, context->buffer); + for (; i + 63 < len; i += 64) + { + SHA1Transform(context->state, &data[i]); + } + j = 0; + } + else + i = 0; + memcpy(&context->buffer[j], &data[i], len - i); +} + + +/* Add padding and return the message digest. */ + +void SHA1Final( + unsigned char digest[20], + SHA1_CTX * context +) +{ + unsigned i; + + unsigned char finalcount[8]; + + unsigned char c; + +#if 0 /* untested "improvement" by DHR */ + /* Convert context->count to a sequence of bytes + * in finalcount. Second element first, but + * big-endian order within element. + * But we do it all backwards. + */ + unsigned char *fcp = &finalcount[8]; + + for (i = 0; i < 2; i++) + { + uint32_t t = context->count[i]; + + int j; + + for (j = 0; j < 4; t >>= 8, j++) + *--fcp = (unsigned char) t} +#else + for (i = 0; i < 8; i++) + { + finalcount[i] = (unsigned char) ((context->count[(i >= 4 ? 
0 : 1)] >> ((3 - (i & 3)) * 8)) & 255); /* Endian independent */ + } +#endif + c = 0200; + SHA1Update(context, &c, 1); + while ((context->count[0] & 504) != 448) + { + c = 0000; + SHA1Update(context, &c, 1); + } + SHA1Update(context, finalcount, 8); /* Should cause a SHA1Transform() */ + for (i = 0; i < 20; i++) + { + digest[i] = (unsigned char) + ((context->state[i >> 2] >> ((3 - (i & 3)) * 8)) & 255); + } + /* Wipe variables */ + memset(context, '\0', sizeof(*context)); + memset(&finalcount, '\0', sizeof(finalcount)); +} + +void SHA1( + char *hash_out, + const char *str, + uint32_t len) +{ + SHA1_CTX ctx; + unsigned int ii; + + SHA1Init(&ctx); + for (ii=0; ii + 100% Public Domain + */ + +#include "stdint.h" + +#if defined(__cplusplus) +extern "C" { +#endif + +typedef struct +{ + uint32_t state[5]; + uint32_t count[2]; + unsigned char buffer[64]; +} SHA1_CTX; + +void SHA1Transform( + uint32_t state[5], + const unsigned char buffer[64] + ); + +void SHA1Init( + SHA1_CTX * context + ); + +void SHA1Update( + SHA1_CTX * context, + const unsigned char *data, + uint32_t len + ); + +void SHA1Final( + unsigned char digest[20], + SHA1_CTX * context + ); + +void SHA1( + char *hash_out, + const char *str, + uint32_t len); + +#if defined(__cplusplus) +} +#endif + +#endif /* SHA1_H */ diff --git a/solanaceae/ngc_ext/ngcext.cpp b/solanaceae/ngc_ext/ngcext.cpp new file mode 100644 index 0000000..c94612b --- /dev/null +++ b/solanaceae/ngc_ext/ngcext.cpp @@ -0,0 +1,281 @@ +#include "./ngcext.hpp" + +#include + +NGCEXTEventProvider::NGCEXTEventProvider(ToxEventProviderI& tep) : _tep(tep) { + _tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_CUSTOM_PACKET); + _tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_CUSTOM_PRIVATE_PACKET); +} + +#define _DATA_HAVE(x, error) if ((data_size - curser) < (x)) { error; } + +bool NGCEXTEventProvider::parse_hs1_request_last_ids( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private +) { + return false; 
+} + +bool NGCEXTEventProvider::parse_hs1_response_last_ids( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private +) { + return false; +} + +bool NGCEXTEventProvider::parse_ft1_request( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool // dont care private +) { + Events::NGCEXT_ft1_request e; + e.group_number = group_number; + e.peer_number = peer_number; + size_t curser = 0; + + // - 4 byte (file_kind) + e.file_kind = 0u; + _DATA_HAVE(sizeof(e.file_kind), std::cerr << "NGCEXT: packet too small, missing file_kind\n"; return false) + for (size_t i = 0; i < sizeof(e.file_kind); i++, curser++) { + e.file_kind |= uint32_t(data[curser]) << (i*8); + } + + // - X bytes (file_kind dependent id, differnt sizes) + e.file_id = {data+curser, data+curser+(data_size-curser)}; + + return dispatch( + NGCEXT_Event::FT1_REQUEST, + e + ); +} + +bool NGCEXTEventProvider::parse_ft1_init( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private +) { + if (!_private) { + std::cerr << "NGCEXT: ft1_init cant be public\n"; + return false; + } + + Events::NGCEXT_ft1_init e; + e.group_number = group_number; + e.peer_number = peer_number; + size_t curser = 0; + + // - 4 byte (file_kind) + e.file_kind = 0u; + _DATA_HAVE(sizeof(e.file_kind), std::cerr << "NGCEXT: packet too small, missing file_kind\n"; return false) + for (size_t i = 0; i < sizeof(e.file_kind); i++, curser++) { + e.file_kind |= uint32_t(data[curser]) << (i*8); + } + + // - 8 bytes (data size) + e.file_size = 0u; + _DATA_HAVE(sizeof(e.file_size), std::cerr << "NGCEXT: packet too small, missing file_size\n"; return false) + for (size_t i = 0; i < sizeof(e.file_size); i++, curser++) { + e.file_size |= size_t(data[curser]) << (i*8); + } + + // - 1 byte (temporary_file_tf_id) + _DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false) + 
e.transfer_id = data[curser++]; + + // - X bytes (file_kind dependent id, differnt sizes) + e.file_id = {data+curser, data+curser+(data_size-curser)}; + + return dispatch( + NGCEXT_Event::FT1_INIT, + e + ); +} + +bool NGCEXTEventProvider::parse_ft1_init_ack( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private +) { + if (!_private) { + std::cerr << "NGCEXT: ft1_init_ack cant be public\n"; + return false; + } + + Events::NGCEXT_ft1_init_ack e; + e.group_number = group_number; + e.peer_number = peer_number; + size_t curser = 0; + + // - 1 byte (temporary_file_tf_id) + _DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false) + e.transfer_id = data[curser++]; + + return dispatch( + NGCEXT_Event::FT1_INIT_ACK, + e + ); +} + +bool NGCEXTEventProvider::parse_ft1_data( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private +) { + if (!_private) { + std::cerr << "NGCEXT: ft1_data cant be public\n"; + return false; + } + + Events::NGCEXT_ft1_data e; + e.group_number = group_number; + e.peer_number = peer_number; + size_t curser = 0; + + // - 1 byte (temporary_file_tf_id) + _DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false) + e.transfer_id = data[curser++]; + + // - 2 bytes (sequence_id) + e.sequence_id = 0u; + _DATA_HAVE(sizeof(e.sequence_id), std::cerr << "NGCEXT: packet too small, missing sequence_id\n"; return false) + for (size_t i = 0; i < sizeof(e.sequence_id); i++, curser++) { + e.sequence_id |= uint32_t(data[curser]) << (i*8); + } + + // - X bytes (the data fragment) + // (size is implicit) + e.data = {data+curser, data+curser+(data_size-curser)}; + + return dispatch( + NGCEXT_Event::FT1_DATA, + e + ); +} + +bool NGCEXTEventProvider::parse_ft1_data_ack( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private 
+) { + if (!_private) { + std::cerr << "NGCEXT: ft1_data_ack cant be public\n"; + return false; + } + + Events::NGCEXT_ft1_data_ack e; + e.group_number = group_number; + e.peer_number = peer_number; + size_t curser = 0; + + // - 1 byte (temporary_file_tf_id) + _DATA_HAVE(sizeof(e.transfer_id), std::cerr << "NGCEXT: packet too small, missing transfer_id\n"; return false) + e.transfer_id = data[curser++]; + + while (curser < data_size) { + _DATA_HAVE(sizeof(uint16_t), std::cerr << "NGCEXT: packet too small, missing seq_id\n"; return false) + uint16_t seq_id = data[curser++]; + seq_id |= data[curser++] << (1*8); + e.sequence_ids.push_back(seq_id); + } + + return dispatch( + NGCEXT_Event::FT1_DATA_ACK, + e + ); +} + +bool NGCEXTEventProvider::parse_ft1_message( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private +) { + if (_private) { + std::cerr << "NGCEXT: ft1_message cant be private (yet)\n"; + return false; + } + + Events::NGCEXT_ft1_message e; + e.group_number = group_number; + e.peer_number = peer_number; + size_t curser = 0; + + // - 4 byte (message_id) + e.message_id = 0u; + _DATA_HAVE(sizeof(e.message_id), std::cerr << "NGCEXT: packet too small, missing message_id\n"; return false) + for (size_t i = 0; i < sizeof(e.message_id); i++, curser++) { + e.message_id |= uint32_t(data[curser]) << (i*8); + } + + // - 4 byte (file_kind) + e.file_kind = 0u; + _DATA_HAVE(sizeof(e.file_kind), std::cerr << "NGCEXT: packet too small, missing file_kind\n"; return false) + for (size_t i = 0; i < sizeof(e.file_kind); i++, curser++) { + e.file_kind |= uint32_t(data[curser]) << (i*8); + } + + // - X bytes (file_kind dependent id, differnt sizes) + e.file_id = {data+curser, data+curser+(data_size-curser)}; + + return dispatch( + NGCEXT_Event::FT1_MESSAGE, + e + ); +} + +bool NGCEXTEventProvider::handlePacket( + const uint32_t group_number, + const uint32_t peer_number, + const uint8_t* data, + const size_t data_size, + const bool 
_private +) { + if (data_size < 1) { + return false; // waht + } + + NGCEXT_Event pkg_type = static_cast(data[0]); + + switch (pkg_type) { + case NGCEXT_Event::HS1_REQUEST_LAST_IDS: + return false; + case NGCEXT_Event::HS1_RESPONSE_LAST_IDS: + return false; + case NGCEXT_Event::FT1_REQUEST: + return parse_ft1_request(group_number, peer_number, data+1, data_size-1, _private); + case NGCEXT_Event::FT1_INIT: + return parse_ft1_init(group_number, peer_number, data+1, data_size-1, _private); + case NGCEXT_Event::FT1_INIT_ACK: + return parse_ft1_init_ack(group_number, peer_number, data+1, data_size-1, _private); + case NGCEXT_Event::FT1_DATA: + return parse_ft1_data(group_number, peer_number, data+1, data_size-1, _private); + case NGCEXT_Event::FT1_DATA_ACK: + return parse_ft1_data_ack(group_number, peer_number, data+1, data_size-1, _private); + case NGCEXT_Event::FT1_MESSAGE: + return parse_ft1_message(group_number, peer_number, data+1, data_size-1, _private); + default: + return false; + } + + return false; +} + +bool NGCEXTEventProvider::onToxEvent(const Tox_Event_Group_Custom_Packet* e) { + const auto group_number = tox_event_group_custom_packet_get_group_number(e); + const auto peer_number = tox_event_group_custom_packet_get_peer_id(e); + const uint8_t* data = tox_event_group_custom_packet_get_data(e); + const auto data_length = tox_event_group_custom_packet_get_data_length(e); + + return handlePacket(group_number, peer_number, data, data_length, false); +} + +bool NGCEXTEventProvider::onToxEvent(const Tox_Event_Group_Custom_Private_Packet* e) { + const auto group_number = tox_event_group_custom_private_packet_get_group_number(e); + const auto peer_number = tox_event_group_custom_private_packet_get_peer_id(e); + const uint8_t* data = tox_event_group_custom_private_packet_get_data(e); + const auto data_length = tox_event_group_custom_private_packet_get_data_length(e); + + return handlePacket(group_number, peer_number, data, data_length, true); +} + diff --git 
a/solanaceae/ngc_ext/ngcext.hpp b/solanaceae/ngc_ext/ngcext.hpp new file mode 100644 index 0000000..4932dcf --- /dev/null +++ b/solanaceae/ngc_ext/ngcext.hpp @@ -0,0 +1,278 @@ +#pragma once + +// solanaceae port of tox_ngc_ext + +#include +#include + +#include + +#include +#include + +namespace Events { + + // TODO: implement events as non-owning + + struct NGCEXT_hs1_request_last_ids { + uint32_t group_number; + uint32_t peer_number; + + // - peer_key bytes (peer key we want to know ids for) + ToxKey peer_key; + + // - 1 byte (uint8_t count ids, atleast 1) + uint8_t count_ids; + }; + + struct NGCEXT_hs1_response_last_ids { + uint32_t group_number; + uint32_t peer_number; + + // respond to a request with 0 or more message ids, sorted by newest first + // - peer_key bytes (the msg_ids are from) + ToxKey peer_key; + + // - 1 byte (uint8_t count ids, can be 0) + uint8_t count_ids; + + // - array [ + // - msg_id bytes (the message id) + // - ] + std::vector msg_ids; + }; + + struct NGCEXT_ft1_request { + uint32_t group_number; + uint32_t peer_number; + + // request the other side to initiate a FT + // - 4 byte (file_kind) + uint32_t file_kind; + + // - X bytes (file_kind dependent id, differnt sizes) + std::vector file_id; + }; + + struct NGCEXT_ft1_init { + uint32_t group_number; + uint32_t peer_number; + + // tell the other side you want to start a FT + // - 4 byte (file_kind) + uint32_t file_kind; + + // - 8 bytes (data size) + uint64_t file_size; + + // - 1 byte (temporary_file_tf_id, for this peer only, technically just a prefix to distinguish between simultainious fts) + uint8_t transfer_id; + + // - X bytes (file_kind dependent id, differnt sizes) + std::vector file_id; + + // TODO: max supported lossy packet size + }; + + struct NGCEXT_ft1_init_ack { + uint32_t group_number; + uint32_t peer_number; + + // - 1 byte (transfer_id) + uint8_t transfer_id; + + // TODO: max supported lossy packet size + }; + + struct NGCEXT_ft1_data { + uint32_t group_number; + 
uint32_t peer_number; + + // data fragment + // - 1 byte (temporary_file_tf_id) + uint8_t transfer_id; + + // - 2 bytes (sequece id) + uint16_t sequence_id; + + // - X bytes (the data fragment) + // (size is implicit) + std::vector data; + }; + + struct NGCEXT_ft1_data_ack { + uint32_t group_number; + uint32_t peer_number; + + // - 1 byte (temporary_file_tf_id) + uint8_t transfer_id; + + // - array [ (of sequece ids) + // - 2 bytes (sequece id) + // - ] + std::vector sequence_ids; + }; + + struct NGCEXT_ft1_message { + uint32_t group_number; + uint32_t peer_number; + + // - 4 byte (message_id) + uint32_t message_id; + + // request the other side to initiate a FT + // - 4 byte (file_kind) + uint32_t file_kind; + + // - X bytes (file_kind dependent id, differnt sizes) + std::vector file_id; + }; + +} // Events + +enum class NGCEXT_Event : uint8_t { + //TODO: make it possible to go further back + // request last (few) message_ids for a peer + // - peer_key bytes (peer key we want to know ids for) + // - 1 byte (uint8_t count ids, atleast 1) + HS1_REQUEST_LAST_IDS = 0x80 | 1u, + + // respond to a request with 0 or more message ids, sorted by newest first + // - peer_key bytes (the msg_ids are from) + // - 1 byte (uint8_t count ids, can be 0) + // - array [ + // - msg_id bytes (the message id) + // - ] + HS1_RESPONSE_LAST_IDS, + + // request the other side to initiate a FT + // - 4 byte (file_kind) + // - X bytes (file_kind dependent id, differnt sizes) + FT1_REQUEST = 0x80 | 8u, + + // TODO: request result negative, speed up not found + + // tell the other side you want to start a FT + // TODO: might use id layer instead. 
with it, it would look similar to friends_ft + // - 4 byte (file_kind) + // - 8 bytes (data size, can be 0 if unknown, BUT files have to be atleast 1 byte) + // - 1 byte (temporary_file_tf_id, for this peer only, technically just a prefix to distinguish between simultainious fts) + // - X bytes (file_kind dependent id, differnt sizes) + FT1_INIT, + + // acknowlage init (like an accept) + // like tox ft control continue + // - 1 byte (transfer_id) + FT1_INIT_ACK, + + // TODO: init deny, speed up non acceptance + + // data fragment + // - 1 byte (temporary_file_tf_id) + // - 2 bytes (sequece id) + // - X bytes (the data fragment) + // (size is implicit) + FT1_DATA, + + // acknowlage data fragments + // TODO: last 3 should be sufficient, 5 should be generous + // - 1 byte (temporary_file_tf_id) + // // this is implicit (pkg size)- 1 byte (number of sequence ids to ack, this kind of depends on window size) + // - array [ (of sequece ids) + // - 2 bytes (sequece id) + // - ] + FT1_DATA_ACK, + + // send file as message + // basically the opposite of request + // contains file_kind and file_id (and timestamp?) 
+ // - 4 byte (message_id) + // - 4 byte (file_kind) + // - X bytes (file_kind dependent id, differnt sizes) + FT1_MESSAGE, + + MAX +}; + +struct NGCEXTEventI { + using enumType = NGCEXT_Event; + virtual bool onEvent(const Events::NGCEXT_hs1_request_last_ids&) { return false; } + virtual bool onEvent(const Events::NGCEXT_hs1_response_last_ids&) { return false; } + virtual bool onEvent(const Events::NGCEXT_ft1_request&) { return false; } + virtual bool onEvent(const Events::NGCEXT_ft1_init&) { return false; } + virtual bool onEvent(const Events::NGCEXT_ft1_init_ack&) { return false; } + virtual bool onEvent(const Events::NGCEXT_ft1_data&) { return false; } + virtual bool onEvent(const Events::NGCEXT_ft1_data_ack&) { return false; } + virtual bool onEvent(const Events::NGCEXT_ft1_message&) { return false; } +}; + +using NGCEXTEventProviderI = EventProviderI; + +class NGCEXTEventProvider : public ToxEventI, public NGCEXTEventProviderI { + ToxEventProviderI& _tep; + + public: + NGCEXTEventProvider(ToxEventProviderI& tep/*, ToxI& t*/); + + protected: + bool parse_hs1_request_last_ids( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private + ); + + bool parse_hs1_response_last_ids( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private + ); + + bool parse_ft1_request( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private + ); + + bool parse_ft1_init( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private + ); + + bool parse_ft1_init_ack( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private + ); + + bool parse_ft1_data( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, size_t data_size, + bool _private + ); + + bool parse_ft1_data_ack( + uint32_t group_number, uint32_t peer_number, + const uint8_t* data, 
// ==== solanaceae/ngc_ft1/ledbat.hpp + ledbat.cpp ====
// (reflowed from a whitespace-mangled diff; header placed first so the
//  implementation below it compiles as one unit)
#pragma once

#include <algorithm>
#include <cassert>
#include <chrono>
#include <cmath>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <iostream>
#include <limits>
#include <tuple>
#include <utility>
#include <vector>

// LEDBAT: https://www.rfc-editor.org/rfc/rfc6817
// LEDBAT++: https://www.ietf.org/archive/id/draft-irtf-iccrg-ledbat-plus-plus-01.txt

// Delay-based congestion control (LEDBAT++) for the NGC FT1 data channel.
// All delays are float seconds, all window sizes are bytes.
struct LEDBAT {
	public: // config
		// (transfer_id, sequence_id) - identifies one in-flight segment
		using SeqIDType = std::pair<uint8_t, uint16_t>;

		static constexpr size_t IPV4_HEADER_SIZE {20};
		static constexpr size_t IPV6_HEADER_SIZE {40}; // bru
		static constexpr size_t UDP_HEADER_SIZE {8};

		// TODO: tcp AND IPv6 will be different
		static constexpr size_t SEGMENT_OVERHEAD {
			4+ // ft overhead
			46+ // NOTE(review): presumably tox packet framing - confirm
			UDP_HEADER_SIZE+
			IPV4_HEADER_SIZE
		};

		// TODO: make configurable, set with tox ngc lossy packet size
		const size_t MAXIMUM_SEGMENT_DATA_SIZE {500-4};

		const size_t MAXIMUM_SEGMENT_SIZE {MAXIMUM_SEGMENT_DATA_SIZE + SEGMENT_OVERHEAD};

		// ledbat++ says 60ms, we might need other values if relayed
		const float target_delay {0.030f};

		// TODO: use a factor for multiple of rtt
		static constexpr size_t current_delay_filter_window {16*4};

		float max_byterate_allowed {10*1024*1024}; // 10MiB/s

	public:
		LEDBAT(size_t maximum_segment_data_size);

		// current believed congestion window in bytes (data that may be
		// in flight without overstepping the delay requirement)
		float getCWnD(void) const {
			return _cwnd;
		}

		// how many bytes may be handed to the network right now
		// (0 if either the congestion or the rate window is full),
		// rounded UP to whole segments
		size_t canSend(void) const;

		// seq ids whose segments are considered timed out (>2x current delay)
		std::vector<SeqIDType> getTimeouts(void) const;

	public: // callbacks
		// data size is without overhead
		void onSent(SeqIDType seq, size_t data_size);

		void onAck(std::vector<SeqIDType> seqs);

		// if discard: segment will not be resent and leaves the in-flight set
		void onLoss(SeqIDType seq, bool discard);

	private:
		using clock = std::chrono::steady_clock;

		// seconds since construction (relative keeps float precision usable)
		float getTimeNow(void) const {
			return std::chrono::duration<float>{clock::now() - _time_start_offset}.count();
		}

		// moving avg over the last few delay samples
		// VERY sensitive to bundling acks
		float getCurrentDelay(void) const;

		void addRTT(float new_delay);

		void updateWindows(void);

	private: // state
		float _cwnd {2.f * MAXIMUM_SEGMENT_SIZE}; // congestion window, bytes
		float _base_delay {2.f}; // lowest measured delay, seconds

		float _last_cwnd {0.f}; // timepoint of last cwnd correction
		int64_t _recently_acked_data {0}; // reset on _last_cwnd
		bool _recently_lost_data {false};
		int64_t _recently_sent_bytes {0};

		// rate window; initialized low, corrected on first update
		float _fwnd {0.01f * max_byterate_allowed}; // in bytes

		// spec recommends 10min of history
		// TODO: optimize and divide into spans of 1min (spec recom)
		std::deque<float> _tmp_rtt_buffer; // newest first, moving-avg window
		std::deque<std::pair<float, float>> _rtt_buffer; // (timepoint, delay)
		std::deque<float> _rtt_buffer_minutes; // per-section minima

		// (seq, sent-timestamp, size incl. overhead) of unacked segments
		std::deque<std::tuple<SeqIDType, float, size_t>> _in_flight;
		int64_t _in_flight_bytes {0};

	private: // helper
		clock::time_point _time_start_offset;
};

// ---- implementation (originally ledbat.cpp) ----

// https://youtu.be/0HRwNSA-JYM
inline constexpr bool PLOTTING = false;

LEDBAT::LEDBAT(size_t maximum_segment_data_size) : MAXIMUM_SEGMENT_DATA_SIZE(maximum_segment_data_size) {
	_time_start_offset = clock::now();
}

size_t LEDBAT::canSend(void) const {
	if (_in_flight.empty()) {
		return MAXIMUM_SEGMENT_DATA_SIZE;
	}

	const int64_t mss_data = static_cast<int64_t>(MAXIMUM_SEGMENT_DATA_SIZE);

	// FIX: compare in signed space; the old `int64_t < size_t` comparison
	// promoted a negative window remainder to a huge unsigned value,
	// bypassing the early `return 0` and yielding a huge send budget
	const int64_t cspace = static_cast<int64_t>(_cwnd) - _in_flight_bytes;
	if (cspace < mss_data) {
		return 0u;
	}

	const int64_t fspace = static_cast<int64_t>(_fwnd) - _in_flight_bytes;
	if (fspace < mss_data) {
		return 0u;
	}

	// FIX: divide in floating point; the old code did *integer* division
	// first, which truncated and made the std::ceil a no-op
	const auto segments = static_cast<size_t>(
		std::ceil(static_cast<double>(std::min(cspace, fspace)) / static_cast<double>(mss_data))
	);
	return segments * MAXIMUM_SEGMENT_DATA_SIZE;
}

std::vector<LEDBAT::SeqIDType> LEDBAT::getTimeouts(void) const {
	std::vector<SeqIDType> list;

	// after 2 delays we trigger timeout
	const auto now_adjusted = getTimeNow() - getCurrentDelay()*2.f;

	for (const auto& [seq, time_stamp, size] : _in_flight) {
		if (now_adjusted > time_stamp) {
			list.push_back(seq);
		}
	}

	return list;
}

void LEDBAT::onSent(SeqIDType seq, size_t data_size) {
	if (true) { // debug: make sure the same seq is never in flight twice
		for (const auto& it : _in_flight) {
			assert(std::get<0>(it) != seq);
		}
	}

	_in_flight.push_back({seq, getTimeNow(), data_size + SEGMENT_OVERHEAD});
	_in_flight_bytes += data_size + SEGMENT_OVERHEAD;
	_recently_sent_bytes += data_size + SEGMENT_OVERHEAD;
}

void LEDBAT::onAck(std::vector<SeqIDType> seqs) {
	// newest send-timestamp among the acked segments; doubles as the
	// "did we ack anything at all" flag
	float most_recent {-std::numeric_limits<float>::infinity()};

	const auto now {getTimeNow()};

	for (const auto& seq : seqs) {
		auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
			return std::get<0>(v) == seq;
		});

		if (it == _in_flight.end()) {
			continue; // not found, ignore
		}

		addRTT(now - std::get<1>(*it));

		// TODO: remove
		most_recent = std::max(most_recent, std::get<1>(*it));
		_in_flight_bytes -= std::get<2>(*it);
		_recently_acked_data += std::get<2>(*it);
		assert(_in_flight_bytes >= 0); // TODO: this triggers
		_in_flight.erase(it);
	}

	if (most_recent == -std::numeric_limits<float>::infinity()) {
		return; // nothing was actually acked
	}

	updateWindows();
}

void LEDBAT::onLoss(SeqIDType seq, bool discard) {
	auto it = std::find_if(_in_flight.begin(), _in_flight.end(), [seq](const auto& v) -> bool {
		assert(!std::isnan(std::get<1>(v)));
		return std::get<0>(v) == seq;
	});

	if (it == _in_flight.end()) {
		// error
		return; // not found, ignore ??
	}

	_recently_lost_data = true;

	// TODO: at most once per rtt?

	if (PLOTTING) {
		std::cerr << "CCA: onLoss: TIME: " << getTimeNow() << "\n";
	}

	// "if data lost is not to be retransmitted", drop it from the in-flight set
	if (discard) {
		_in_flight_bytes -= std::get<2>(*it);
		assert(_in_flight_bytes >= 0);
		_in_flight.erase(it);
	}
	// TODO: reset timestamp?

	updateWindows();
}

float LEDBAT::getCurrentDelay(void) const {
	// no samples yet -> infinity, which disables the timeout logic upstream
	if (_tmp_rtt_buffer.empty()) {
		return std::numeric_limits<float>::infinity();
	}

	float sum {0.f};
	for (const float delay : _tmp_rtt_buffer) {
		sum += delay;
	}
	return sum / _tmp_rtt_buffer.size();
}

void LEDBAT::addRTT(float new_delay) {
	const auto now = getTimeNow();

	_base_delay = std::min(_base_delay, new_delay);
	// TODO: use fixed size instead? allocations can ruin perf
	_rtt_buffer.push_back({now, new_delay});

	_tmp_rtt_buffer.push_front(new_delay);
	// HACKY: cap the moving-average window
	if (_tmp_rtt_buffer.size() > current_delay_filter_window) {
		_tmp_rtt_buffer.resize(current_delay_filter_window);
	}

	// close the current section: record its minimum and rebuild _base_delay
	// from the section history (old comment said "1 minute"; code uses 30s)
	if (now - _rtt_buffer.front().first >= 30.f) {
		float new_section_minimum = new_delay;
		for (const auto& it : _rtt_buffer) {
			new_section_minimum = std::min(it.second, new_section_minimum);
		}

		_rtt_buffer_minutes.push_back(new_section_minimum);

		_rtt_buffer.clear();

		if (_rtt_buffer_minutes.size() > 20) {
			_rtt_buffer_minutes.pop_front();
		}

		_base_delay = std::numeric_limits<float>::infinity();
		for (const float it : _rtt_buffer_minutes) {
			_base_delay = std::min(_base_delay, it);
		}
	}
}

void LEDBAT::updateWindows(void) {
	const auto now {getTimeNow()};

	const float current_delay {getCurrentDelay()};

	// only correct the windows about once per RTT
	if (now - _last_cwnd >= current_delay) {
		const float queuing_delay {current_delay - _base_delay};

		_fwnd = max_byterate_allowed * current_delay;
		_fwnd *= 1.3f; // try to balance the conservative algo a bit

		float gain {1.f / std::min(16.f, std::ceil(2.f*target_delay/_base_delay))};
		gain *= _recently_acked_data/5.f; // from packets to bytes ~

		if (_recently_lost_data) {
			_cwnd = std::clamp(
				_cwnd / 2.f,
				2.f * MAXIMUM_SEGMENT_SIZE,
				_cwnd
			);
		} else {
			// LEDBAT++ (the Rethinking the LEDBAT Protocol paper)
			// "Multiplicative decrease"
			const float constant {2.f}; // spec recommends 1
			if (queuing_delay < target_delay) {
				_cwnd = std::min(
					_cwnd + gain,
					_fwnd
				);
			} else if (queuing_delay > target_delay) {
				_cwnd = std::clamp(
					_cwnd + std::max(
						gain - constant * _cwnd * (queuing_delay / target_delay - 1.f),
						-_cwnd/2.f // at most halve
					),

					// never drop below 2 "packets" in flight
					2.f * MAXIMUM_SEGMENT_SIZE,

					// cap rate
					_fwnd
				);
			} // no else, we on point. very unlikely with float
		}

		if (PLOTTING) { // plotting
			std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " cwnd: " << _cwnd << "\n";
			std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " fwnd: " << _fwnd << "\n";
			std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " current_delay: " << current_delay << "\n";
			std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " base_delay: " << _base_delay << "\n";
			std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " gain: " << gain << "\n";
			std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " speed: " << (_recently_sent_bytes / (now - _last_cwnd)) / (1024*1024) << "\n";
			std::cerr << std::fixed << "CCA: onAck: TIME: " << now << " in_flight_bytes: " << _in_flight_bytes << "\n";
		}

		_last_cwnd = now;
		_recently_acked_data = 0;
		_recently_lost_data = false;
		_recently_sent_bytes = 0;
	}
}
i++) { + pkg.push_back(file_id[i]); + } + + // lossless + return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK; +} + +bool NGCFT1::sendPKG_FT1_INIT( + uint32_t group_number, uint32_t peer_number, + uint32_t file_kind, + uint64_t file_size, + uint8_t transfer_id, + const uint8_t* file_id, size_t file_id_size +) { + // - 1 byte packet id + // - 4 byte (file_kind) + // - 8 bytes (data size) + // - 1 byte (temporary_file_tf_id, for this peer only, technically just a prefix to distinguish between simultainious fts) + // - X bytes (file_kind dependent id, differnt sizes) + + std::vector pkg; + pkg.push_back(static_cast(NGCEXT_Event::FT1_INIT)); + for (size_t i = 0; i < sizeof(file_kind); i++) { + pkg.push_back((file_kind>>(i*8)) & 0xff); + } + for (size_t i = 0; i < sizeof(file_size); i++) { + pkg.push_back((file_size>>(i*8)) & 0xff); + } + pkg.push_back(transfer_id); + for (size_t i = 0; i < file_id_size; i++) { + pkg.push_back(file_id[i]); + } + + // lossless + return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK; +} + +bool NGCFT1::sendPKG_FT1_INIT_ACK( + uint32_t group_number, uint32_t peer_number, + uint8_t transfer_id +) { + // - 1 byte packet id + // - 1 byte transfer_id + std::vector pkg; + pkg.push_back(static_cast(NGCEXT_Event::FT1_INIT_ACK)); + pkg.push_back(transfer_id); + + // lossless + return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK; +} + +bool NGCFT1::sendPKG_FT1_DATA( + uint32_t group_number, uint32_t peer_number, + uint8_t transfer_id, + uint16_t sequence_id, + const uint8_t* data, size_t data_size +) { + assert(data_size > 0); + + // TODO + // check header_size+data_size <= max pkg size + + std::vector pkg; + pkg.push_back(static_cast(NGCEXT_Event::FT1_DATA)); + pkg.push_back(transfer_id); + pkg.push_back(sequence_id & 0xff); + 
pkg.push_back((sequence_id >> (1*8)) & 0xff); + + // TODO: optimize + for (size_t i = 0; i < data_size; i++) { + pkg.push_back(data[i]); + } + + // lossy + return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, false, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK; +} + +bool NGCFT1::sendPKG_FT1_DATA_ACK( + uint32_t group_number, uint32_t peer_number, + uint8_t transfer_id, + const uint16_t* seq_ids, size_t seq_ids_size +) { + std::vector pkg; + pkg.push_back(static_cast(NGCEXT_Event::FT1_DATA_ACK)); + pkg.push_back(transfer_id); + + // TODO: optimize + for (size_t i = 0; i < seq_ids_size; i++) { + pkg.push_back(seq_ids[i] & 0xff); + pkg.push_back((seq_ids[i] >> (1*8)) & 0xff); + } + + // lossy + return _t.toxGroupSendCustomPrivatePacket(group_number, peer_number, false, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PRIVATE_PACKET_OK; +} + +bool NGCFT1::sendPKG_FT1_MESSAGE( + uint32_t group_number, + uint32_t message_id, + uint32_t file_kind, + const uint8_t* file_id, size_t file_id_size +) { + std::vector pkg; + pkg.push_back(static_cast(NGCEXT_Event::FT1_MESSAGE)); + + for (size_t i = 0; i < sizeof(message_id); i++) { + pkg.push_back((message_id>>(i*8)) & 0xff); + } + for (size_t i = 0; i < sizeof(file_kind); i++) { + pkg.push_back((file_kind>>(i*8)) & 0xff); + } + for (size_t i = 0; i < file_id_size; i++) { + pkg.push_back(file_id[i]); + } + + // lossless + return _t.toxGroupSendCustomPacket(group_number, true, pkg) == TOX_ERR_GROUP_SEND_CUSTOM_PACKET_OK; +} + +void NGCFT1::updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set) { + auto& tf_opt = peer.send_transfers.at(idx); + assert(tf_opt.has_value()); + auto& tf = tf_opt.value(); + + tf.time_since_activity += time_delta; + + switch (tf.state) { + using State = Group::Peer::SendTransfer::State; + case State::INIT_SENT: + if (tf.time_since_activity >= init_retry_timeout_after) { + if (tf.inits_sent >= 3) { + // delete, 
timed out 3 times + std::cerr << "NGCFT1 warning: ft init timed out, deleting\n"; + dispatch( + NGCFT1_Event::send_done, + Events::NGCFT1_send_done{ + group_number, peer_number, + static_cast(idx), + } + ); + tf_opt.reset(); + } else { + // timed out, resend + std::cerr << "NGCFT1 warning: ft init timed out, resending\n"; + sendPKG_FT1_INIT(group_number, peer_number, tf.file_kind, tf.file_size, idx, tf.file_id.data(), tf.file_id.size()); + tf.inits_sent++; + tf.time_since_activity = 0.f; + } + } + //break; + return; + case State::SENDING: { + tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { + // no ack after 5 sec -> resend + //if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) { + if (timeouts_set.count({idx, id})) { + // TODO: can fail + sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size()); + peer.cca->onLoss({idx, id}, false); + time_since_activity = 0.f; + timeouts_set.erase({idx, id}); + } + }); + + if (tf.time_since_activity >= sending_give_up_after) { + // no ack after 30sec, close ft + std::cerr << "NGCFT1 warning: sending ft in progress timed out, deleting\n"; + dispatch( + NGCFT1_Event::send_done, + Events::NGCFT1_send_done{ + group_number, peer_number, + static_cast(idx), + } + ); + + // clean up cca + tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { + peer.cca->onLoss({idx, id}, true); + timeouts_set.erase({idx, id}); + }); + + tf_opt.reset(); + //continue; // dangerous control flow + return; + } + + // if chunks in flight < window size (2) + //while (tf.ssb.size() < ngc_ft1_ctx->options.packet_window_size) { + int64_t can_packet_size {static_cast(peer.cca->canSend())}; + //if (can_packet_size) { + //std::cerr << "FT: can_packet_size: " << can_packet_size; + //} + size_t count {0}; + while (can_packet_size > 0 && tf.file_size > 0) { + std::vector new_data; + + // TODO: parameterize packet size? 
-> only if JF increases lossy packet size >:) + //size_t chunk_size = std::min(496u, tf.file_size - tf.file_size_current); + //size_t chunk_size = std::min(can_packet_size, tf.file_size - tf.file_size_current); + size_t chunk_size = std::min({ + //496u, + //996u, + peer.cca->MAXIMUM_SEGMENT_DATA_SIZE, + static_cast(can_packet_size), + tf.file_size - tf.file_size_current + }); + if (chunk_size == 0) { + tf.state = State::FINISHING; + break; // we done + } + + new_data.resize(chunk_size); + + //ngc_ft1_ctx->cb_send_data[tf.file_kind]( + //tox, + //group_number, peer_number, + //idx, + //tf.file_size_current, + //new_data.data(), new_data.size(), + //ngc_ft1_ctx->ud_send_data.count(tf.file_kind) ? ngc_ft1_ctx->ud_send_data.at(tf.file_kind) : nullptr + //); + assert(idx <= 0xffu); + // TODO: check return value + dispatch( + NGCFT1_Event::send_data, + Events::NGCFT1_send_data{ + group_number, peer_number, + static_cast(idx), + tf.file_size_current, + new_data.data(), new_data.size(), + } + ); + + uint16_t seq_id = tf.ssb.add(std::move(new_data)); + sendPKG_FT1_DATA(group_number, peer_number, idx, seq_id, tf.ssb.entries.at(seq_id).data.data(), tf.ssb.entries.at(seq_id).data.size()); + peer.cca->onSent({idx, seq_id}, chunk_size); + +#if defined(EXTRA_LOGGING) && EXTRA_LOGGING == 1 + fprintf(stderr, "FT: sent data size: %ld (seq %d)\n", chunk_size, seq_id); +#endif + + tf.file_size_current += chunk_size; + can_packet_size -= chunk_size; + count++; + } + //if (count) { + //std::cerr << " split over " << count << "\n"; + //} + } + break; + case State::FINISHING: // we still have unacked packets + tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { + // no ack after 5 sec -> resend + //if (time_since_activity >= ngc_ft1_ctx->options.sending_resend_without_ack_after) { + if (timeouts_set.count({idx, id})) { + sendPKG_FT1_DATA(group_number, peer_number, idx, id, data.data(), data.size()); + peer.cca->onLoss({idx, id}, false); + 
time_since_activity = 0.f; + timeouts_set.erase({idx, id}); + } + }); + if (tf.time_since_activity >= sending_give_up_after) { + // no ack after 30sec, close ft + // TODO: notify app + std::cerr << "NGCFT1 warning: sending ft finishing timed out, deleting\n"; + + // clean up cca + tf.ssb.for_each(time_delta, [&](uint16_t id, const std::vector& data, float& time_since_activity) { + peer.cca->onLoss({idx, id}, true); + timeouts_set.erase({idx, id}); + }); + + tf_opt.reset(); + } + break; + default: // invalid state, delete + std::cerr << "NGCFT1 error: ft in invalid state, deleting\n"; + tf_opt.reset(); + //continue; + return; + } +} + +void NGCFT1::iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer) { + auto timeouts = peer.cca->getTimeouts(); + std::set timeouts_set{timeouts.cbegin(), timeouts.cend()}; + + for (size_t idx = 0; idx < peer.send_transfers.size(); idx++) { + if (peer.send_transfers.at(idx).has_value()) { + updateSendTransfer(time_delta, group_number, peer_number, peer, idx, timeouts_set); + } + } + + // TODO: receiving tranfers? 
+} + +NGCFT1::NGCFT1( + ToxI& t, + ToxEventProviderI& tep, + NGCEXTEventProviderI& neep +) : _t(t), _tep(tep), _neep(neep) +{ + _neep.subscribe(this, NGCEXT_Event::FT1_REQUEST); + _neep.subscribe(this, NGCEXT_Event::FT1_INIT); + _neep.subscribe(this, NGCEXT_Event::FT1_INIT_ACK); + _neep.subscribe(this, NGCEXT_Event::FT1_DATA); + _neep.subscribe(this, NGCEXT_Event::FT1_DATA_ACK); + _neep.subscribe(this, NGCEXT_Event::FT1_MESSAGE); + + _tep.subscribe(this, Tox_Event::TOX_EVENT_GROUP_PEER_EXIT); +} + +void NGCFT1::iterate(float time_delta) { + for (auto& [group_number, group] : groups) { + for (auto& [peer_number, peer] : group.peers) { + iteratePeer(time_delta, group_number, peer_number, peer); + } + } +} + +void NGCFT1::NGC_FT1_send_request_private( + uint32_t group_number, uint32_t peer_number, + uint32_t file_kind, + const uint8_t* file_id, size_t file_id_size +) { + // TODO: error check + sendPKG_FT1_REQUEST(group_number, peer_number, file_kind, file_id, file_id_size); +} + +bool NGCFT1::NGC_FT1_send_init_private( + uint32_t group_number, uint32_t peer_number, + uint32_t file_kind, + const uint8_t* file_id, size_t file_id_size, + size_t file_size, + uint8_t* transfer_id +) { + if (std::get<0>(_t.toxGroupPeerGetConnectionStatus(group_number, peer_number)).value_or(TOX_CONNECTION_NONE) == TOX_CONNECTION_NONE) { + std::cerr << "NGCFT1 error: cant init ft, peer offline\n"; + return false; + } + + auto& peer = groups[group_number].peers[peer_number]; + + // allocate transfer_id + size_t idx = peer.next_send_transfer_idx; + peer.next_send_transfer_idx = (peer.next_send_transfer_idx + 1) % 256; + { // TODO: extract + size_t i = idx; + bool found = false; + do { + if (!peer.send_transfers[i].has_value()) { + // free slot + idx = i; + found = true; + break; + } + + i = (i + 1) % 256; + } while (i != idx); + + if (!found) { + std::cerr << "NGCFT1 error: cant init ft, no free transfer slot\n"; + return false; + } + } + + // TODO: check return value + 
sendPKG_FT1_INIT(group_number, peer_number, file_kind, file_size, idx, file_id, file_id_size); + + peer.send_transfers[idx] = Group::Peer::SendTransfer{ + file_kind, + std::vector(file_id, file_id+file_id_size), + Group::Peer::SendTransfer::State::INIT_SENT, + 1, + 0.f, + file_size, + 0, + {}, // ssb + }; + + if (transfer_id != nullptr) { + *transfer_id = idx; + } + + return true; +} + +bool NGCFT1::NGC_FT1_send_message_public( + uint32_t group_number, + uint32_t& message_id, + uint32_t file_kind, + const uint8_t* file_id, size_t file_id_size +) { + // create msg_id + message_id = randombytes_random(); + + // TODO: check return value + return sendPKG_FT1_MESSAGE(group_number, message_id, file_kind, file_id, file_id_size); +} + +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_request& e) { +//#if !NDEBUG + std::cout << "NGCFT1: FT1_REQUEST fk:" << e.file_kind << " [" << bin2hex(e.file_id) << "]\n"; +//#endif + + // .... just rethrow?? + // TODO: dont + return dispatch( + NGCFT1_Event::recv_request, + Events::NGCFT1_recv_request{ + e.group_number, e.peer_number, + static_cast(e.file_kind), + e.file_id.data(), e.file_id.size() + } + ); +} + +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init& e) { +//#if !NDEBUG + std::cout << "NGCFT1: FT1_INIT fk:" << e.file_kind << " fs:" << e.file_size << " tid:" << int(e.transfer_id) << " [" << bin2hex(e.file_id) << "]\n"; +//#endif + + bool accept = false; + dispatch( + NGCFT1_Event::recv_init, + Events::NGCFT1_recv_init{ + e.group_number, e.peer_number, + static_cast(e.file_kind), + e.file_id.data(), e.file_id.size(), + e.transfer_id, + e.file_size, + accept + } + ); + + if (!accept) { + std::cout << "NGCFT1: rejected init\n"; + return true; // return true? 
+ } + + sendPKG_FT1_INIT_ACK(e.group_number, e.peer_number, e.transfer_id); + + std::cout << "NGCFT1: accepted init\n"; + + auto& peer = groups[e.group_number].peers[e.peer_number]; + if (peer.recv_transfers[e.transfer_id].has_value()) { + std::cerr << "NGCFT1 warning: overwriting existing recv_transfer " << int(e.transfer_id) << "\n"; + } + + peer.recv_transfers[e.transfer_id] = Group::Peer::RecvTransfer{ + e.file_kind, + e.file_id, + Group::Peer::RecvTransfer::State::INITED, + e.file_size, + 0u, + {} // rsb + }; + + return true; +} + +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_init_ack& e) { +//#if !NDEBUG + std::cout << "NGCFT1: FT1_INIT_ACK\n"; +//#endif + + // we now should start sending data + + if (!groups.count(e.group_number)) { + std::cerr << "NGCFT1 warning: init_ack for unknown group\n"; + return true; + } + + Group::Peer& peer = groups[e.group_number].peers[e.peer_number]; + if (!peer.send_transfers[e.transfer_id].has_value()) { + std::cerr << "NGCFT1 warning: init_ack for unknown transfer\n"; + return true; + } + + Group::Peer::SendTransfer& transfer = peer.send_transfers[e.transfer_id].value(); + + using State = Group::Peer::SendTransfer::State; + if (transfer.state != State::INIT_SENT) { + std::cerr << "NGCFT1 error: inti_ack but not in INIT_SENT state\n"; + return true; + } + + // iterate will now call NGC_FT1_send_data_cb + transfer.state = State::SENDING; + transfer.time_since_activity = 0.f; + + return true; +} + +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data& e) { +#if !NDEBUG + std::cout << "NGCFT1: FT1_DATA\n"; +#endif + + if (e.data.empty()) { + std::cerr << "NGCFT1 error: data of size 0!\n"; + return true; + } + + if (!groups.count(e.group_number)) { + std::cerr << "NGCFT1 warning: data for unknown group\n"; + return true; + } + + Group::Peer& peer = groups[e.group_number].peers[e.peer_number]; + if (!peer.recv_transfers[e.transfer_id].has_value()) { + std::cerr << "NGCFT1 warning: data for unknown transfer\n"; + return true; + } + 
+ auto& transfer = peer.recv_transfers[e.transfer_id].value(); + + // do reassembly, ignore dups + transfer.rsb.add(e.sequence_id, std::vector(e.data)); // TODO: ugly explicit copy for what should just be a move + + // loop for chunks without holes + while (transfer.rsb.canPop()) { + auto data = transfer.rsb.pop(); + + // TODO: check return value + dispatch( + NGCFT1_Event::recv_data, + Events::NGCFT1_recv_data{ + e.group_number, e.peer_number, + e.transfer_id, + transfer.file_size_current, + data.data(), data.size() + } + ); + + transfer.file_size_current += data.size(); + } + + // send acks + std::vector ack_seq_ids(transfer.rsb.ack_seq_ids.cbegin(), transfer.rsb.ack_seq_ids.cend()); + // TODO: check if this caps at max acks + if (!ack_seq_ids.empty()) { + // TODO: check return value + sendPKG_FT1_DATA_ACK(e.group_number, e.peer_number, e.transfer_id, ack_seq_ids.data(), ack_seq_ids.size()); + } + + + if (transfer.file_size_current == transfer.file_size) { + // TODO: set all data received, and clean up + //transfer.state = Group::Peer::RecvTransfer::State::RECV; + dispatch( + NGCFT1_Event::recv_done, + Events::NGCFT1_recv_done{ + e.group_number, e.peer_number, + e.transfer_id + } + ); + } + + return true; +} + +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_data_ack& e) { +#if !NDEBUG + std::cout << "NGCFT1: FT1_DATA_ACK\n"; +#endif + + if (!groups.count(e.group_number)) { + std::cerr << "NGCFT1 warning: data_ack for unknown group\n"; + return true; + } + + Group::Peer& peer = groups[e.group_number].peers[e.peer_number]; + if (!peer.send_transfers[e.transfer_id].has_value()) { + std::cerr << "NGCFT1 warning: data_ack for unknown transfer\n"; + return true; + } + + Group::Peer::SendTransfer& transfer = peer.send_transfers[e.transfer_id].value(); + + using State = Group::Peer::SendTransfer::State; + if (transfer.state != State::SENDING && transfer.state != State::FINISHING) { + std::cerr << "NGCFT1 error: data_ack but not in SENDING or FINISHING state (" << 
int(transfer.state) << ")\n"; + return true; + } + + //if ((length - curser) % sizeof(uint16_t) != 0) { + //fprintf(stderr, "FT: data_ack with misaligned data\n"); + //return; + //} + + transfer.time_since_activity = 0.f; + + std::vector seqs; + for (const auto it : e.sequence_ids) { + // TODO: improve this o.o + seqs.push_back({e.transfer_id, it}); + transfer.ssb.erase(it); + } + peer.cca->onAck(seqs); + + // delete if all packets acked + if (transfer.file_size == transfer.file_size_current && transfer.ssb.size() == 0) { + std::cout << "NGCFT1: " << int(e.transfer_id) << " done\n"; + dispatch( + NGCFT1_Event::send_done, + Events::NGCFT1_send_done{ + e.group_number, e.peer_number, + e.transfer_id, + } + ); + peer.send_transfers[e.transfer_id].reset(); + } + + return true; +} + +bool NGCFT1::onEvent(const Events::NGCEXT_ft1_message& e) { + std::cout << "NGCFT1: FT1_MESSAGE mid:" << e.message_id << " fk:" << e.file_kind << " [" << bin2hex(e.file_id) << "]\n"; + + // .... just rethrow?? + // TODO: dont + return dispatch( + NGCFT1_Event::recv_message, + Events::NGCFT1_recv_message{ + e.group_number, e.peer_number, + e.message_id, + static_cast(e.file_kind), + e.file_id.data(), e.file_id.size() + } + ); +} + +bool NGCFT1::onToxEvent(const Tox_Event_Group_Peer_Exit* e) { + const auto group_number = tox_event_group_peer_exit_get_group_number(e); + const auto peer_number = tox_event_group_peer_exit_get_peer_id(e); + + // peer disconnected, end all transfers + + if (!groups.count(group_number)) { + return false; + } + + auto& group = groups.at(group_number); + + if (!group.peers.count(peer_number)) { + return false; + } + + auto& peer = group.peers.at(peer_number); + + for (size_t i = 0; i < peer.send_transfers.size(); i++) { + auto& it_opt = peer.send_transfers.at(i); + if (!it_opt.has_value()) { + continue; + } + + std::cout << "NGCFT1: sending " << int(i) << " canceled bc peer offline\n"; + dispatch( + NGCFT1_Event::send_done, + Events::NGCFT1_send_done{ + group_number, 
peer_number, + static_cast(i), + } + ); + + it_opt.reset(); + } + + for (size_t i = 0; i < peer.recv_transfers.size(); i++) { + auto& it_opt = peer.recv_transfers.at(i); + if (!it_opt.has_value()) { + continue; + } + + std::cout << "NGCFT1: receiving " << int(i) << " canceled bc peer offline\n"; + dispatch( + NGCFT1_Event::recv_done, + Events::NGCFT1_recv_done{ + group_number, peer_number, + static_cast(i), + } + ); + + it_opt.reset(); + } + + // reset cca + peer.cca = std::make_unique(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4 + + return false; +} + diff --git a/solanaceae/ngc_ft1/ngcft1.hpp b/solanaceae/ngc_ft1/ngcft1.hpp new file mode 100644 index 0000000..86b59db --- /dev/null +++ b/solanaceae/ngc_ft1/ngcft1.hpp @@ -0,0 +1,252 @@ +#pragma once + +// solanaceae port of tox_ngc_ft1 + +#include +#include + +#include "./ngcext.hpp" +#include "./ledbat.hpp" + +#include "./rcv_buf.hpp" +#include "./snd_buf.hpp" + +#include "./ngcft1_file_kind.hpp" + +#include +#include +#include + +namespace Events { + + struct NGCFT1_recv_request { + uint32_t group_number; + uint32_t peer_number; + + NGCFT1_file_kind file_kind; + + const uint8_t* file_id; + size_t file_id_size; + }; + + struct NGCFT1_recv_init { + uint32_t group_number; + uint32_t peer_number; + + NGCFT1_file_kind file_kind; + + const uint8_t* file_id; + size_t file_id_size; + + const uint8_t transfer_id; + const size_t file_size; + + // return true to accept, false to deny + bool& accept; + }; + + struct NGCFT1_recv_data { + uint32_t group_number; + uint32_t peer_number; + + uint8_t transfer_id; + + size_t data_offset; + const uint8_t* data; + size_t data_size; + }; + + // request to fill data_size bytes into data + struct NGCFT1_send_data { + uint32_t group_number; + uint32_t peer_number; + + uint8_t transfer_id; + + size_t data_offset; + uint8_t* data; + size_t data_size; + }; + + struct NGCFT1_recv_done { + uint32_t group_number; + uint32_t peer_number; + + uint8_t transfer_id; + 
// TODO: reason + }; + + struct NGCFT1_send_done { + uint32_t group_number; + uint32_t peer_number; + + uint8_t transfer_id; + // TODO: reason + }; + + struct NGCFT1_recv_message { + uint32_t group_number; + uint32_t peer_number; + + uint32_t message_id; + + NGCFT1_file_kind file_kind; + + const uint8_t* file_id; + size_t file_id_size; + }; + +} // Events + +enum class NGCFT1_Event : uint8_t { + recv_request, + recv_init, + + recv_data, + send_data, + + recv_done, + send_done, + + recv_message, + + MAX +}; + +struct NGCFT1EventI { + using enumType = NGCFT1_Event; + virtual bool onEvent(const Events::NGCFT1_recv_request&) { return false; } + virtual bool onEvent(const Events::NGCFT1_recv_init&) { return false; } + virtual bool onEvent(const Events::NGCFT1_recv_data&) { return false; } + virtual bool onEvent(const Events::NGCFT1_send_data&) { return false; } // const? + virtual bool onEvent(const Events::NGCFT1_recv_done&) { return false; } + virtual bool onEvent(const Events::NGCFT1_send_done&) { return false; } + virtual bool onEvent(const Events::NGCFT1_recv_message&) { return false; } +}; + +using NGCFT1EventProviderI = EventProviderI; + +class NGCFT1 : public ToxEventI, public NGCEXTEventI, public NGCFT1EventProviderI { + ToxI& _t; + ToxEventProviderI& _tep; + NGCEXTEventProviderI& _neep; + + // TODO: config + size_t acks_per_packet {3u}; // 3 + float init_retry_timeout_after {5.f}; // 10sec + float sending_give_up_after {30.f}; // 30sec + + + struct Group { + struct Peer { + std::unique_ptr cca = std::make_unique(500-4); // TODO: replace with tox_group_max_custom_lossy_packet_length()-4 + + struct RecvTransfer { + uint32_t file_kind; + std::vector file_id; + + enum class State { + INITED, //init acked, but no data received yet (might be dropped) + RECV, // receiving data + } state; + + // float time_since_last_activity ? 
+ size_t file_size {0}; + size_t file_size_current {0}; + + // sequence id based reassembly + RecvSequenceBuffer rsb; + }; + std::array, 256> recv_transfers; + size_t next_recv_transfer_idx {0}; // next id will be 0 + + struct SendTransfer { + uint32_t file_kind; + std::vector file_id; + + enum class State { + INIT_SENT, // keep this state until ack or deny or giveup + + SENDING, // we got the ack and are now sending data + + FINISHING, // we sent all data but acks still outstanding???? + + // delete + } state; + + size_t inits_sent {1}; // is sent when creating + + float time_since_activity {0.f}; + size_t file_size {0}; + size_t file_size_current {0}; + + // sequence array + // list of sent but not acked seq_ids + SendSequenceBuffer ssb; + }; + std::array, 256> send_transfers; + size_t next_send_transfer_idx {0}; // next id will be 0 + }; + std::map peers; + }; + std::map groups; + + protected: + bool sendPKG_FT1_REQUEST(uint32_t group_number, uint32_t peer_number, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size); + bool sendPKG_FT1_INIT(uint32_t group_number, uint32_t peer_number, uint32_t file_kind, uint64_t file_size, uint8_t transfer_id, const uint8_t* file_id, size_t file_id_size); + bool sendPKG_FT1_INIT_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id); + bool sendPKG_FT1_DATA(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, uint16_t sequence_id, const uint8_t* data, size_t data_size); + bool sendPKG_FT1_DATA_ACK(uint32_t group_number, uint32_t peer_number, uint8_t transfer_id, const uint16_t* seq_ids, size_t seq_ids_size); + bool sendPKG_FT1_MESSAGE(uint32_t group_number, uint32_t message_id, uint32_t file_kind, const uint8_t* file_id, size_t file_id_size); + + void updateSendTransfer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer, size_t idx, std::set& timeouts_set); + void iteratePeer(float time_delta, uint32_t group_number, uint32_t peer_number, Group::Peer& peer); + 
+ public: + NGCFT1( + ToxI& t, + ToxEventProviderI& tep, + NGCEXTEventProviderI& neep + ); + + void iterate(float delta); + + public: // ft1 api + // TODO: public variant? + void NGC_FT1_send_request_private( + uint32_t group_number, uint32_t peer_number, + uint32_t file_kind, + const uint8_t* file_id, size_t file_id_size + ); + + // public does not make sense here + bool NGC_FT1_send_init_private( + uint32_t group_number, uint32_t peer_number, + uint32_t file_kind, + const uint8_t* file_id, size_t file_id_size, + size_t file_size, + uint8_t* transfer_id + ); + + // sends the message and fills in message_id + bool NGC_FT1_send_message_public( + uint32_t group_number, + uint32_t& message_id, + uint32_t file_kind, + const uint8_t* file_id, size_t file_id_size + ); + + protected: + bool onEvent(const Events::NGCEXT_ft1_request&) override; + bool onEvent(const Events::NGCEXT_ft1_init&) override; + bool onEvent(const Events::NGCEXT_ft1_init_ack&) override; + bool onEvent(const Events::NGCEXT_ft1_data&) override; + bool onEvent(const Events::NGCEXT_ft1_data_ack&) override; + bool onEvent(const Events::NGCEXT_ft1_message&) override; + + protected: + bool onToxEvent(const Tox_Event_Group_Peer_Exit* e) override; + //bool onToxEvent(const Tox_Event_Group_Custom_Packet* e) override; + //bool onToxEvent(const Tox_Event_Group_Custom_Private_Packet* e) override; +}; + diff --git a/solanaceae/ngc_ft1/ngcft1_file_kind.hpp b/solanaceae/ngc_ft1/ngcft1_file_kind.hpp new file mode 100644 index 0000000..e4427b7 --- /dev/null +++ b/solanaceae/ngc_ft1/ngcft1_file_kind.hpp @@ -0,0 +1,76 @@ +#pragma once + +#include + +// uint32_t - same as tox friend ft +// TODO: fill in toxfriend file kinds +enum class NGCFT1_file_kind : uint32_t { + //INVALID = 0u, // DATA? 
+ + // id: + // group (implicit) + // peer pub key + msg_id + NGC_HS1_MESSAGE_BY_ID = 1u, // history sync PoC 1 + // TODO: oops, 1 should be avatar v1 + + // id: TOX_FILE_ID_LENGTH (32) bytes + // this is basically and id and probably not a hash, like the tox friend api + // this id can be unique between 2 peers + ID = 8u, + + // id: hash of the info, like a torrent infohash (using the same hash as the data) + // TODO: determain internal format + // draft: (for single file) + // - 256 bytes | filename + // - 8bytes | file size + // - 4bytes | chunk size + // - array of chunk hashes (ids) [ + // - SHA1 bytes (20) + // - ] + HASH_SHA1_INFO, + // draft: (for single file) v2 + // - c-string | filename + // - 8bytes | file size + // - 4bytes | chunk size + // - array of chunk hashes (ids) [ + // - SHA1 bytes (20) + // - ] + HASH_SHA1_INFO2, + // draft: multiple files + // - 4bytes | number of filenames + // - array of filenames (variable length c-strings) [ + // - c-string | filename (including path and '/' as dir seperator) + // - ] + // - 256 bytes | filename + // - 8bytes | file size + // - fixed chunk size of 4kb + // - array of chunk hashes (ids) [ + // - SHAX bytes + // - ] + HASH_SHA1_INFO3, + HASH_SHA2_INFO, // hm? + + // id: hash of the content + // TODO: fixed chunk size or variable (defined in info) + // if "variable" sized, it can be aliased with TORRENT_V1_CHUNK in the implementation + HASH_SHA1_CHUNK, + HASH_SHA2_CHUNK, + + // TODO: design the same thing again for tox? (msg_pack instead of bencode?) + // id: infohash + TORRENT_V1_METAINFO, + // id: sha1 + TORRENT_V1_PIECE, // alias with SHA1_CHUNK? 
+ + // TODO: fix all the v2 stuff here + // id: infohash + // in v2, metainfo contains only the root hashes of the merkletree(s) + TORRENT_V2_METAINFO, + // id: root hash + // contains all the leaf hashes for a file root hash + TORRENT_V2_FILE_HASHES, + // id: sha256 + // always of size 16KiB, except if last piece in file + TORRENT_V2_PIECE, +}; + diff --git a/solanaceae/ngc_ft1/rcv_buf.cpp b/solanaceae/ngc_ft1/rcv_buf.cpp new file mode 100644 index 0000000..21fc336 --- /dev/null +++ b/solanaceae/ngc_ft1/rcv_buf.cpp @@ -0,0 +1,44 @@ +#include "./rcv_buf.hpp" + +#include + +void RecvSequenceBuffer::erase(uint16_t seq) { + entries.erase(seq); +} + +// inflight chunks +size_t RecvSequenceBuffer::size(void) const { + return entries.size(); +} + +void RecvSequenceBuffer::add(uint16_t seq_id, std::vector&& data) { + entries[seq_id] = {data}; + ack_seq_ids.push_back(seq_id); + if (ack_seq_ids.size() > 3) { // TODO: magic + ack_seq_ids.pop_front(); + } +} + +bool RecvSequenceBuffer::canPop(void) const { + return entries.count(next_seq_id); +} + +std::vector RecvSequenceBuffer::pop(void) { + assert(canPop()); + auto tmp_data = entries.at(next_seq_id).data; + erase(next_seq_id); + next_seq_id++; + return tmp_data; +} + +// for acking, might be bad since its front +std::vector RecvSequenceBuffer::frontSeqIDs(size_t count) const { + std::vector seq_ids; + auto it = entries.cbegin(); + for (size_t i = 0; i < count && it != entries.cend(); i++, it++) { + seq_ids.push_back(it->first); + } + + return seq_ids; +} + diff --git a/solanaceae/ngc_ft1/rcv_buf.hpp b/solanaceae/ngc_ft1/rcv_buf.hpp new file mode 100644 index 0000000..1ac7187 --- /dev/null +++ b/solanaceae/ngc_ft1/rcv_buf.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include +#include +#include +#include + +struct RecvSequenceBuffer { + struct RSBEntry { + std::vector data; + }; + + // sequence_id -> entry + std::map entries; + + uint16_t next_seq_id {0}; + + // list of seq_ids to ack, this is seperate bc rsbentries are deleted 
once processed + std::deque ack_seq_ids; + + void erase(uint16_t seq); + + // inflight chunks + size_t size(void) const; + + void add(uint16_t seq_id, std::vector&& data); + + bool canPop(void) const; + + std::vector pop(void); + + // for acking, might be bad since its front + std::vector frontSeqIDs(size_t count = 5) const; +}; + diff --git a/solanaceae/ngc_ft1/snd_buf.cpp b/solanaceae/ngc_ft1/snd_buf.cpp new file mode 100644 index 0000000..810cd2c --- /dev/null +++ b/solanaceae/ngc_ft1/snd_buf.cpp @@ -0,0 +1,16 @@ +#include "./snd_buf.hpp" + +void SendSequenceBuffer::erase(uint16_t seq) { + entries.erase(seq); +} + +// inflight chunks +size_t SendSequenceBuffer::size(void) const { + return entries.size(); +} + +uint16_t SendSequenceBuffer::add(std::vector&& data) { + entries[next_seq_id] = {data, 0.f}; + return next_seq_id++; +} + diff --git a/solanaceae/ngc_ft1/snd_buf.hpp b/solanaceae/ngc_ft1/snd_buf.hpp new file mode 100644 index 0000000..416f051 --- /dev/null +++ b/solanaceae/ngc_ft1/snd_buf.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include + +struct SendSequenceBuffer { + struct SSBEntry { + std::vector data; // the data (variable size, but smaller than 500) + float time_since_activity {0.f}; + }; + + // sequence_id -> entry + std::map entries; + + uint16_t next_seq_id {0}; + + void erase(uint16_t seq); + + // inflight chunks + size_t size(void) const; + + uint16_t add(std::vector&& data); + + template + void for_each(float time_delta, FN&& fn) { + for (auto& [id, entry] : entries) { + entry.time_since_activity += time_delta; + fn(id, entry.data, entry.time_since_activity); + } + } +}; + diff --git a/solanaceae/ngc_ft1_sha1/ft1_sha1_info.cpp b/solanaceae/ngc_ft1_sha1/ft1_sha1_info.cpp new file mode 100644 index 0000000..0daaadb --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/ft1_sha1_info.cpp @@ -0,0 +1,139 @@ +#include "./ft1_sha1_info.hpp" + +#include + +SHA1Digest::SHA1Digest(const std::vector& v) { + assert(v.size() == data.size()); + for 
(size_t i = 0; i < data.size(); i++) { + data[i] = v[i]; + } +} + +SHA1Digest::SHA1Digest(const uint8_t* d, size_t s) { + assert(s == data.size()); + for (size_t i = 0; i < data.size(); i++) { + data[i] = d[i]; + } +} + +std::ostream& operator<<(std::ostream& out, const SHA1Digest& v) { + std::string str{}; + str.resize(v.size()*2, '?'); + + // HECK, std is 1 larger than size returns ('\0') + sodium_bin2hex(str.data(), str.size()+1, v.data.data(), v.data.size()); + + out << str; + + return out; +} + +size_t FT1InfoSHA1::chunkSize(size_t chunk_index) const { + if (chunk_index+1 == chunks.size()) { + // last chunk + return file_size - chunk_index * chunk_size; + } else { + return chunk_size; + } +} + +std::vector FT1InfoSHA1::toBuffer(void) const { + std::vector buffer; + + assert(!file_name.empty()); + // TODO: optimize + for (size_t i = 0; i < 256; i++) { + if (i < file_name.size()) { + buffer.push_back(file_name.at(i)); + } else { + buffer.push_back(0); + } + } + assert(buffer.size() == 256); + + { // HACK: endianess + buffer.push_back((file_size>>(0*8)) & 0xff); + buffer.push_back((file_size>>(1*8)) & 0xff); + buffer.push_back((file_size>>(2*8)) & 0xff); + buffer.push_back((file_size>>(3*8)) & 0xff); + buffer.push_back((file_size>>(4*8)) & 0xff); + buffer.push_back((file_size>>(5*8)) & 0xff); + buffer.push_back((file_size>>(6*8)) & 0xff); + buffer.push_back((file_size>>(7*8)) & 0xff); + } + assert(buffer.size() == 256+8); + + // chunk size + { // HACK: endianess + buffer.push_back((chunk_size>>(0*8)) & 0xff); + buffer.push_back((chunk_size>>(1*8)) & 0xff); + buffer.push_back((chunk_size>>(2*8)) & 0xff); + buffer.push_back((chunk_size>>(3*8)) & 0xff); + } + + assert(buffer.size() == 256+8+4); + + for (const auto& chunk : chunks) { + for (size_t i = 0; i < chunk.data.size(); i++) { + buffer.push_back(chunk.data[i]); + } + } + assert(buffer.size() == 256+8+4+20*chunks.size()); + + return buffer; +} + +void FT1InfoSHA1::fromBuffer(const std::vector& buffer) { + 
assert(buffer.size() >= 256+8+4); + + // TODO: optimize + file_name.clear(); + for (size_t i = 0; i < 256; i++) { + char next_char = static_cast(buffer[i]); + if (next_char == 0) { + break; + } + file_name.push_back(next_char); + } + + { // HACK: endianess + file_size = 0; + file_size |= uint64_t(buffer[256+0]) << (0*8); + file_size |= uint64_t(buffer[256+1]) << (1*8); + file_size |= uint64_t(buffer[256+2]) << (2*8); + file_size |= uint64_t(buffer[256+3]) << (3*8); + file_size |= uint64_t(buffer[256+4]) << (4*8); + file_size |= uint64_t(buffer[256+5]) << (5*8); + file_size |= uint64_t(buffer[256+6]) << (6*8); + file_size |= uint64_t(buffer[256+7]) << (7*8); + } + + { // HACK: endianess + chunk_size = 0; + chunk_size |= uint32_t(buffer[256+8+0]) << (0*8); + chunk_size |= uint32_t(buffer[256+8+1]) << (1*8); + chunk_size |= uint32_t(buffer[256+8+2]) << (2*8); + chunk_size |= uint32_t(buffer[256+8+3]) << (3*8); + } + + assert((buffer.size()-(256+8+4)) % 20 == 0); + + for (size_t offset = 256+8+4; offset < buffer.size();) { + assert(buffer.size() >= offset + 20); + + auto& chunk = chunks.emplace_back(); + for (size_t i = 0; i < chunk.size(); i++, offset++) { + chunk.data[i] = buffer.at(offset); + } + // TODO: error/leftover checking + } +} + +std::ostream& operator<<(std::ostream& out, const FT1InfoSHA1& v) { + out << " file_name: " << v.file_name << "\n"; + out << " file_size: " << v.file_size << "\n"; + out << " chunk_size: " << v.chunk_size << "\n"; + out << " chunks.size(): " << v.chunks.size() << "\n"; + return out; +} + diff --git a/solanaceae/ngc_ft1_sha1/ft1_sha1_info.hpp b/solanaceae/ngc_ft1_sha1/ft1_sha1_info.hpp new file mode 100644 index 0000000..17efca7 --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/ft1_sha1_info.hpp @@ -0,0 +1,55 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +struct SHA1Digest { + std::array data; + + SHA1Digest(void) = default; + SHA1Digest(const std::vector& v); + SHA1Digest(const uint8_t* d, 
size_t s); + + bool operator==(const SHA1Digest& other) const { return data == other.data; } + bool operator!=(const SHA1Digest& other) const { return data != other.data; } + + size_t size(void) const { return data.size(); } +}; + +std::ostream& operator<<(std::ostream& out, const SHA1Digest& v); + +namespace std { // inject + template<> struct hash { + std::size_t operator()(const SHA1Digest& h) const noexcept { + return + size_t(h.data[0]) << (0*8) | + size_t(h.data[1]) << (1*8) | + size_t(h.data[2]) << (2*8) | + size_t(h.data[3]) << (3*8) | + size_t(h.data[4]) << (4*8) | + size_t(h.data[5]) << (5*8) | + size_t(h.data[6]) << (6*8) | + size_t(h.data[7]) << (7*8) + ; + } + }; +} // std + +struct FT1InfoSHA1 { + std::string file_name; + uint64_t file_size {0}; + uint32_t chunk_size {128*1024}; // 128KiB for now + std::vector chunks; + + size_t chunkSize(size_t chunk_index) const; + + std::vector toBuffer(void) const; + void fromBuffer(const std::vector& buffer); +}; +std::ostream& operator<<(std::ostream& out, const FT1InfoSHA1& v); + diff --git a/solanaceae/ngc_ft1_sha1/hash_utils.cpp b/solanaceae/ngc_ft1_sha1/hash_utils.cpp new file mode 100644 index 0000000..9039447 --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/hash_utils.cpp @@ -0,0 +1,26 @@ +#include "./hash_utils.hpp" + +#include + +// returns the 20bytes sha1 hash +std::vector hash_sha1(const uint8_t* data, size_t size) { + SHA1_CTX ctx; + SHA1Init(&ctx); + + { // lib only takes uint32_t sizes, so chunk it + constexpr size_t hash_block_size {0xffffffff}; + size_t i = 0; + for (; i + hash_block_size < size; i += hash_block_size) { + SHA1Update(&ctx, reinterpret_cast(data) + i, hash_block_size); + } + + if (i < size) { + SHA1Update(&ctx, reinterpret_cast(data) + i, size - i); + } + } + + std::vector sha1_hash(20); + SHA1Final(sha1_hash.data(), &ctx); + return sha1_hash; +} + diff --git a/solanaceae/ngc_ft1_sha1/hash_utils.hpp b/solanaceae/ngc_ft1_sha1/hash_utils.hpp new file mode 100644 index 0000000..4de52ec --- 
/dev/null +++ b/solanaceae/ngc_ft1_sha1/hash_utils.hpp @@ -0,0 +1,10 @@ +#pragma once + +#include +#include + +// returns the 20bytes sha1 hash +std::vector hash_sha1(const uint8_t* data, size_t size); + +inline std::vector hash_sha1(const char* data, size_t size) { return hash_sha1(reinterpret_cast(data), size); } + diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp new file mode 100644 index 0000000..c55579b --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.cpp @@ -0,0 +1,1311 @@ +#include "./sha1_ngcft1.hpp" + +#include + +#include +#include +#include +#include + +#include +#include + +#include "./ft1_sha1_info.hpp" +#include "./hash_utils.hpp" + +#include + +#include + +#include +#include +#include + +namespace Message::Components { + + using Content = ContentHandle; + +} // Message::Components + +// TODO: rename to content components +namespace Components { + + struct Messages { + std::vector messages; + }; + + using FT1InfoSHA1 = FT1InfoSHA1; + + struct FT1InfoSHA1Data { + std::vector data; + }; + + struct FT1InfoSHA1Hash { + std::vector hash; + }; + + struct FT1ChunkSHA1Cache { + std::vector have_chunk; + bool have_all {false}; + size_t have_count {0}; + entt::dense_map> chunk_hash_to_index; + + std::vector chunkIndices(const SHA1Digest& hash) const; + bool haveChunk(const SHA1Digest& hash) const; + }; + + struct FT1ChunkSHA1Requested { + // requested chunks with a timer since last request + entt::dense_map chunks; + }; + + struct SuspectedParticipants { + entt::dense_set participants; + }; + + struct ReRequestInfoTimer { + float timer {0.f}; + }; + + struct ReadHeadHint { + // points to the first byte we want + // this is just a hint, that can be set from outside + // to guide the sequential "piece picker" strategy + // the strategy *should* set this to the first byte we dont yet have + uint64_t offset_into_file {0u}; + }; + +} // Components + +std::vector Components::FT1ChunkSHA1Cache::chunkIndices(const 
SHA1Digest& hash) const { + const auto it = chunk_hash_to_index.find(hash); + if (it != chunk_hash_to_index.cend()) { + return it->second; + } else { + return {}; + } +} + +bool Components::FT1ChunkSHA1Cache::haveChunk(const SHA1Digest& hash) const { + if (have_all) { // short cut + return true; + } + + if (auto i_vec = chunkIndices(hash); !i_vec.empty()) { + // TODO: should i test all? + return have_chunk[i_vec.front()]; + } + + // not part of this file + return false; +} + +static size_t chunkSize(const FT1InfoSHA1& sha1_info, size_t chunk_index) { + if (chunk_index+1 == sha1_info.chunks.size()) { + // last chunk + return sha1_info.file_size - chunk_index * sha1_info.chunk_size; + } else { + return sha1_info.chunk_size; + } +} + +void SHA1_NGCFT1::queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ContentHandle content, const SHA1Digest& hash) { + // TODO: transfers + for (auto& [i_g, i_p, i_m, i_h, i_t] : _queue_requested_chunk) { + // if already in queue + if (i_g == group_number && i_p == peer_number && i_h == hash) { + // update timer + i_t = 0.f; + return; + } + } + + // not in queue yet + _queue_requested_chunk.push_back(std::make_tuple(group_number, peer_number, content, hash, 0.f)); +} + +uint64_t SHA1_NGCFT1::combineIds(const uint32_t group_number, const uint32_t peer_number) { + return (uint64_t(group_number) << 32) | peer_number; +} + +void SHA1_NGCFT1::updateMessages(ContentHandle ce) { + assert(ce.all_of()); + + for (auto msg : ce.get().messages) { + if (ce.all_of() && !msg.all_of()) { + msg.emplace(ce.get()); + } + if (ce.all_of()) { + msg.emplace_or_replace(ce.get()); + } + if (ce.all_of()) { + msg.emplace_or_replace(ce.get()); + } + if (ce.all_of()) { + msg.emplace_or_replace(ce.get()); + } + if (ce.all_of()) { + msg.emplace_or_replace(); + } else { + msg.remove(); + } + if (auto* cc = ce.try_get(); cc != nullptr && cc->have_all) { + msg.emplace_or_replace(); + } + + _rmm.throwEventUpdate(msg); + } +} + +std::optional> 
// Wires this module into its dependencies and subscribes to every
// event stream it consumes: message updates from the message registry
// (for accepted transfers) and the full NGCFT1 transfer event set.
SHA1_NGCFT1::SHA1_NGCFT1(
	Contact3Registry& cr,
	RegistryMessageModel& rmm,
	NGCFT1& nft,
	ToxContactModel2& tcm
) :
	_cr(cr),
	_rmm(rmm),
	_nft(nft),
	_tcm(tcm)
{
	// TODO: also create and destroy
	_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);

	// transfer protocol events (see the onEvent overloads)
	_nft.subscribe(this, NGCFT1_Event::recv_request);
	_nft.subscribe(this, NGCFT1_Event::recv_init);
	_nft.subscribe(this, NGCFT1_Event::recv_data);
	_nft.subscribe(this, NGCFT1_Event::send_data);
	_nft.subscribe(this, NGCFT1_Event::recv_done);
	_nft.subscribe(this, NGCFT1_Event::send_done);
	_nft.subscribe(this, NGCFT1_Event::recv_message);

	//_rmm.subscribe(this, RegistryMessageModel_Event::message_construct);
	//_rmm.subscribe(this, RegistryMessageModel_Event::message_updated);
	//_rmm.subscribe(this, RegistryMessageModel_Event::message_destroy);

	// outgoing file offers initiated by the user
	_rmm.subscribe(this, RegistryMessageModel_Event::send_file_path);
}
- no, timer comp will continue ticking, even if loading + //it->second.v + it = peer_it->second.erase(it); + } else { + it++; + } + } + + if (peer_it->second.empty()) { + // cleanup unused peers too agressive? + peer_it = _receiving_transfers.erase(peer_it); + } else { + peer_it++; + } + } + + // queued requests + for (auto it = _queue_requested_chunk.begin(); it != _queue_requested_chunk.end();) { + float& timer = std::get(*it); + timer += delta; + + if (timer >= 10.f) { + it = _queue_requested_chunk.erase(it); + } else { + it++; + } + } + + { // requested info timers + std::vector timed_out; + _contentr.view().each([delta, &timed_out](Content e, Components::ReRequestInfoTimer& rrit) { + rrit.timer += delta; + + // 15sec, TODO: config + if (rrit.timer >= 15.f) { + timed_out.push_back(e); + } + }); + for (const auto e : timed_out) { + // TODO: avoid dups + _queue_content_want_info.push_back({_contentr, e}); + _contentr.remove(e); + } + } + { // requested chunk timers + _contentr.view().each([delta](Components::FT1ChunkSHA1Requested& ftchunk_requested) { + for (auto it = ftchunk_requested.chunks.begin(); it != ftchunk_requested.chunks.end();) { + it->second += delta; + + // 20sec, TODO: config + if (it->second >= 20.f) { + it = ftchunk_requested.chunks.erase(it); + } else { + it++; + } + } + }); + } + } + + // if we have not reached the total cap for transfers + // count running transfers + size_t running_sending_transfer_count {0}; + for (const auto& [_, transfers] : _sending_transfers) { + running_sending_transfer_count += transfers.size(); + } + size_t running_receiving_transfer_count {0}; + for (const auto& [_, transfers] : _receiving_transfers) { + running_receiving_transfer_count += transfers.size(); + } + + if (running_sending_transfer_count < _max_concurrent_out) { + // TODO: for each peer? transfer cap per peer? 
+ // TODO: info queue + if (!_queue_requested_chunk.empty()) { // then check for chunk requests + const auto [group_number, peer_number, ce, chunk_hash, _] = _queue_requested_chunk.front(); + + auto chunk_idx_vec = ce.get().chunkIndices(chunk_hash); + if (!chunk_idx_vec.empty()) { + + // check if already sending + bool already_sending_to_this_peer = false; + if (_sending_transfers.count(combineIds(group_number, peer_number))) { + for (const auto& [_2, t] : _sending_transfers.at(combineIds(group_number, peer_number))) { + if (std::holds_alternative(t.v)) { + const auto& v = std::get(t.v); + if (v.content == ce && v.chunk_index == chunk_idx_vec.front()) { + // already sending + already_sending_to_this_peer = true; + break; + } + } + } + } + + if (!already_sending_to_this_peer) { + const auto& info = ce.get(); + + uint8_t transfer_id {0}; + if (_nft.NGC_FT1_send_init_private( + group_number, peer_number, + static_cast(NGCFT1_file_kind::HASH_SHA1_CHUNK), + chunk_hash.data.data(), chunk_hash.size(), + chunkSize(info, chunk_idx_vec.front()), + &transfer_id + )) { + _sending_transfers + [combineIds(group_number, peer_number)] + [transfer_id] // TODO: also save index? 
+ .v = SendingTransfer::Chunk{ce, chunk_idx_vec.front()}; + } + } // else just remove from queue + } + // remove from queue regardless + _queue_requested_chunk.pop_front(); + } + } + + if (running_receiving_transfer_count < _max_concurrent_in) { + // strictly priorize info + if (!_queue_content_want_info.empty()) { + const auto ce = _queue_content_want_info.front(); + + // make sure we are missing the info + assert(!ce.all_of()); + assert(!ce.all_of()); + assert(!ce.all_of()); + assert(!ce.all_of()); + assert(ce.all_of()); + + auto selected_peer_opt = selectPeerForRequest(ce); + if (selected_peer_opt.has_value()) { + const auto [group_number, peer_number] = selected_peer_opt.value(); + + //const auto& info = msg.get(); + const auto& info_hash = ce.get().hash; + + _nft.NGC_FT1_send_request_private( + group_number, peer_number, + static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), + info_hash.data(), info_hash.size() + ); + ce.emplace(0.f); + + _queue_content_want_info.pop_front(); + + std::cout << "SHA1_NGCFT1: sent info request for [" << SHA1Digest{info_hash} << "] to " << group_number << ":" << peer_number << "\n"; + } + } else if (!_queue_content_want_chunk.empty()) { + const auto ce = _queue_content_want_chunk.front(); + + auto& requested_chunks = ce.get_or_emplace().chunks; + if (requested_chunks.size() < _max_pending_requests) { + + // select chunk/make sure we still need one + auto selected_peer_opt = selectPeerForRequest(ce); + if (selected_peer_opt.has_value()) { + const auto [group_number, peer_number] = selected_peer_opt.value(); + //std::cout << "SHA1_NGCFT1: should ask " << group_number << ":" << peer_number << " for content here\n"; + auto& cc = ce.get(); + const auto& info = ce.get(); + + // naive, choose first chunk we dont have (double requests!!) 
+ for (size_t chunk_idx = 0; chunk_idx < cc.have_chunk.size(); chunk_idx++) { + if (cc.have_chunk[chunk_idx]) { + continue; + } + + // check by hash + if (cc.haveChunk(info.chunks.at(chunk_idx))) { + // TODO: fix this, a completed chunk should fill all the indecies it occupies + cc.have_chunk[chunk_idx] = true; + cc.have_count += 1; + if (cc.have_count == info.chunks.size()) { + cc.have_all = true; + cc.have_chunk.clear(); + break; + } + continue; + } + + if (requested_chunks.count(chunk_idx)) { + // already requested + continue; + } + + // request chunk_idx + _nft.NGC_FT1_send_request_private( + group_number, peer_number, + static_cast(NGCFT1_file_kind::HASH_SHA1_CHUNK), + info.chunks.at(chunk_idx).data.data(), info.chunks.at(chunk_idx).size() + ); + requested_chunks[chunk_idx] = 0.f; + std::cout << "SHA1_NGCFT1: requesting chunk [" << info.chunks.at(chunk_idx) << "] from " << group_number << ":" << peer_number << "\n"; + + break; + } + + // ... + + // TODO: properly determine + if (!cc.have_all) { + _queue_content_want_chunk.push_back(ce); + } + _queue_content_want_chunk.pop_front(); + } + } + } + } +} + +bool SHA1_NGCFT1::onEvent(const Message::Events::MessageUpdated& e) { + // see tox_transfer_manager.cpp for reference + if (!e.e.all_of()) { + return false; + } + + //accept(e.e, e.e.get().save_to_path); + auto ce = e.e.get(); + + //if (!ce.all_of()) { + if (!ce.all_of()) { + // not ready to load yet, skip + return false; + } + assert(!ce.all_of()); + assert(!ce.all_of()); + + // first, open file for write(+readback) + std::string full_file_path{e.e.get().save_to_path}; + // TODO: replace with filesystem or something + // TODO: ensure dir exists + if (full_file_path.back() != '/') { + full_file_path += "/"; + } + + std::filesystem::create_directories(full_file_path); + + const auto& info = ce.get(); + full_file_path += info.file_name; + + ce.emplace(std::vector{full_file_path}); + + std::unique_ptr file_impl; + const bool file_exists = 
std::filesystem::exists(full_file_path); + + { + const bool truncate = !file_exists; + file_impl = std::make_unique(full_file_path, info.file_size, truncate); + } + + if (!file_impl->isGood()) { + std::cerr << "SHA1_NGCFT1 error: failed opening file '" << full_file_path << "'!\n"; + //e.e.remove(); // stop + return false; + } + + { // next, create chuck cache and check for existing data + auto& cc = ce.emplace(); + cc.have_all = false; + cc.have_count = 0; + + cc.chunk_hash_to_index.clear(); // if copy pasta + + if (file_exists) { + // iterate existing file + for (size_t i = 0; i < info.chunks.size(); i++) { + auto existing_data = file_impl->read(i*info.chunk_size, info.chunkSize(i)); + // TODO: avoid copy + cc.have_chunk.push_back( + SHA1Digest{hash_sha1(existing_data.data(), existing_data.size())} == info.chunks.at(i) + ); + if (cc.have_chunk.back()) { + cc.have_count += 1; + } + + _chunks[info.chunks[i]] = ce; + cc.chunk_hash_to_index[info.chunks[i]].push_back(i); + } + + if (cc.have_count == info.chunks.size()) { + cc.have_all = true; + } + } else { + for (size_t i = 0; i < info.chunks.size(); i++) { + cc.have_chunk.push_back(false); + _chunks[info.chunks[i]] = ce; + cc.chunk_hash_to_index[info.chunks[i]].push_back(i); + } + } + + if (!cc.have_all) { + // now, enque + _queue_content_want_chunk.push_back(ce); + } + } + + ce.emplace(std::move(file_impl)); + + ce.remove(); + + // should? 
+ e.e.remove(); + + updateMessages(ce); + + return false; +} + +bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_request& e) { + // only interested in sha1 + if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) { + return false; + } + + //std::cout << "SHA1_NGCFT1: FT1_REQUEST fk:" << int(e.file_kind) << " [" << bin2hex({e.file_id, e.file_id+e.file_id_size}) << "]\n"; + + if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) { + if (e.file_id_size != 20) { + // error + return false; + } + + SHA1Digest info_hash{e.file_id, e.file_id_size}; + if (!_info_to_content.count(info_hash)) { + // we dont know about this + return false; + } + + auto content = _info_to_content.at(info_hash); + + if (!content.all_of()) { + // we dont have the info for that infohash (yet?) + return false; + } + + // TODO: queue instead + //queueUpRequestInfo(e.group_number, e.peer_number, info_hash); + uint8_t transfer_id {0}; + _nft.NGC_FT1_send_init_private( + e.group_number, e.peer_number, + static_cast(e.file_kind), + e.file_id, e.file_id_size, + content.get().data.size(), + &transfer_id + ); + + _sending_transfers + [combineIds(e.group_number, e.peer_number)] + [transfer_id] + .v = SendingTransfer::Info{content.get().data}; + } else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) { + if (e.file_id_size != 20) { + // error + return false; + } + + SHA1Digest chunk_hash{e.file_id, e.file_id_size}; + + if (!_chunks.count(chunk_hash)) { + // we dont know about this + return false; + } + + auto ce = _chunks.at(chunk_hash); + + { // they advertise interest in the content + const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); + ce.get_or_emplace().participants.emplace(c); + } + + assert(ce.all_of()); + + if (!ce.get().haveChunk(chunk_hash)) { + // we dont have the chunk + return false; + } + + // queue good request + queueUpRequestChunk(e.group_number, e.peer_number, ce, chunk_hash); + } else { + assert(false && 
"unhandled case"); + } + + return true; +} + +bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_init& e) { + // only interested in sha1 + if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO && e.file_kind != NGCFT1_file_kind::HASH_SHA1_CHUNK) { + return false; + } + + // TODO: make sure we requested this? + + if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_INFO) { + SHA1Digest sha1_info_hash {e.file_id, e.file_id_size}; + if (!_info_to_content.count(sha1_info_hash)) { + // no idea about this content + return false; + } + + auto ce = _info_to_content.at(sha1_info_hash); + + if (ce.any_of()) { + // we already have the info (should) + return false; + } + + // TODO: check if e.file_size too large / ask for permission + if (e.file_size > 100*1024*1024) { + // a info size of 100MiB is ~640GiB for a 128KiB chunk size (default) + return false; + } + + _receiving_transfers + [combineIds(e.group_number, e.peer_number)] + [e.transfer_id] + .v = ReceivingTransfer::Info{ce, std::vector(e.file_size)}; + + e.accept = true; + } else if (e.file_kind == NGCFT1_file_kind::HASH_SHA1_CHUNK) { + SHA1Digest sha1_chunk_hash {e.file_id, e.file_id_size}; + + if (!_chunks.count(sha1_chunk_hash)) { + // no idea about this content + return false; + } + + auto ce = _chunks.at(sha1_chunk_hash); + + // CHECK IF TRANSFER IN PROGESS!! 
+ + { // they have the content (probably, might be fake, should move this to done) + const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); + ce.get_or_emplace().participants.emplace(c); + } + + assert(ce.all_of()); + assert(ce.all_of()); + + const auto& cc = ce.get(); + if (cc.haveChunk(sha1_chunk_hash)) { + std::cout << "SHA1_NGCFT1: chunk rejected, already have [" << SHA1Digest{sha1_chunk_hash} << "]\n"; + // we have the chunk + return false; + } + // TODO: cache position + + // calc offset_into_file + auto idx_vec = cc.chunkIndices(sha1_chunk_hash); + assert(!idx_vec.empty()); + + const auto& info = ce.get(); + + // TODO: check e.file_size + assert(e.file_size == info.chunkSize(idx_vec.front())); + + _receiving_transfers + [combineIds(e.group_number, e.peer_number)] + [e.transfer_id] + .v = ReceivingTransfer::Chunk{ce, idx_vec}; + + e.accept = true; + + std::cout << "SHA1_NGCFT1: accepted chunk [" << SHA1Digest{sha1_chunk_hash} << "]\n"; + } else { + assert(false && "unhandled case"); + } + + return true; +} + +bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_data& e) { + if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) { + return false; + } + + auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number)); + if (!peer_transfers.count(e.transfer_id)) { + return false; + } + + auto& tv = peer_transfers[e.transfer_id].v; + if (std::holds_alternative(tv)) { + auto& info_data = std::get(tv).info_data; + for (size_t i = 0; i < e.data_size && i + e.data_offset < info_data.size(); i++) { + info_data[i+e.data_offset] = e.data[i]; + } + } else if (std::holds_alternative(tv)) { + auto ce = std::get(tv).content; + + assert(ce.all_of()); + auto* file = ce.get().get(); + assert(file != nullptr); + + for (const auto chunk_index : std::get(tv).chunk_indices) { + const auto offset_into_file = chunk_index* ce.get().chunk_size; + + // TODO: avoid temporary copy + // TODO: check return + 
file->write(offset_into_file + e.data_offset, {e.data, e.data + e.data_size}); + } + } else { + assert(false && "unhandled case"); + } + + return true; +} + +bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_data& e) { + if (!_sending_transfers.count(combineIds(e.group_number, e.peer_number))) { + return false; + } + + auto& peer = _sending_transfers.at(combineIds(e.group_number, e.peer_number)); + + if (!peer.count(e.transfer_id)) { + return false; + } + + auto& transfer = peer.at(e.transfer_id); + if (std::holds_alternative(transfer.v)) { + auto& info_transfer = std::get(transfer.v); + for (size_t i = 0; i < e.data_size && (i + e.data_offset) < info_transfer.info_data.size(); i++) { + e.data[i] = info_transfer.info_data[i + e.data_offset]; + } + + if (e.data_offset + e.data_size >= info_transfer.info_data.size()) { + // was last read (probably TODO: add transfer destruction event) + peer.erase(e.transfer_id); + } + } else if (std::holds_alternative(transfer.v)) { + auto& chunk_transfer = std::get(transfer.v); + const auto& info = chunk_transfer.content.get(); + // TODO: should we really use file? + const auto data = chunk_transfer.content.get()->read((chunk_transfer.chunk_index * info.chunk_size) + e.data_offset, e.data_size); + + // TODO: optimize + for (size_t i = 0; i < e.data_size && i < data.size(); i++) { + e.data[i] = data[i]; + } + + chunk_transfer.content.get_or_emplace().total += data.size(); + // TODO: add event to propergate to messages + //_rmm.throwEventUpdate(transfer); // should we? 
+ + //if (e.data_offset + e.data_size >= *insert chunk size here*) { + //// was last read (probably TODO: add transfer destruction event) + //peer.erase(e.transfer_id); + //} + } else { + assert(false && "not implemented?"); + } + + transfer.time_since_activity = 0.f; + + return true; +} + +bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_done& e) { + if (!_receiving_transfers.count(combineIds(e.group_number, e.peer_number))) { + return false; + } + + auto& peer_transfers = _receiving_transfers.at(combineIds(e.group_number, e.peer_number)); + if (!peer_transfers.count(e.transfer_id)) { + return false; + } + + const auto& tv = peer_transfers[e.transfer_id].v; + if (std::holds_alternative(tv)) { + auto& info = std::get(tv); + auto ce = info.content; + + if (ce.any_of()) { + // we already have the info, discard + peer_transfers.erase(e.transfer_id); + return true; + } + + // check if data matches hash + auto hash = hash_sha1(info.info_data.data(), info.info_data.size()); + + assert(ce.all_of()); + if (ce.get().hash != hash) { + std::cerr << "SHA1_NGCFT1 error: got info data mismatching its hash\n"; + // requeue info request + peer_transfers.erase(e.transfer_id); + return true; + } + + const auto& info_data = ce.emplace_or_replace(std::move(info.info_data)).data; + auto& ft_info = ce.emplace_or_replace(); + ft_info.fromBuffer(info_data); + + { // file info + // TODO: not overwrite fi? since same? 
+ auto& file_info = ce.emplace_or_replace(); + file_info.file_list.emplace_back() = {ft_info.file_name, ft_info.file_size}; + file_info.total_size = ft_info.file_size; + } + + std::cout << "SHA1_NGCFT1: got info for [" << SHA1Digest{hash} << "]\n" << ft_info << "\n"; + + ce.remove(); + if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) { + _queue_content_want_info.erase(it); + } + + ce.emplace_or_replace(); + + updateMessages(ce); + } else if (std::holds_alternative(tv)) { + auto ce = std::get(tv).content; + const auto& info = ce.get(); + auto& cc = ce.get(); + + // HACK: only check first chunk (they *should* all be the same) + const auto chunk_index = std::get(tv).chunk_indices.front(); + const auto offset_into_file = chunk_index * info.chunk_size; + + assert(chunk_index < info.chunks.size()); + const auto chunk_size = info.chunkSize(chunk_index); + assert(offset_into_file+chunk_size <= info.file_size); + + const auto chunk_data = ce.get()->read(offset_into_file, chunk_size); + + // check hash of chunk + auto got_hash = hash_sha1(chunk_data.data(), chunk_data.size()); + if (info.chunks.at(chunk_index) == got_hash) { + std::cout << "SHA1_NGCFT1: got chunk [" << SHA1Digest{got_hash} << "]\n"; + + // remove from requested + // TODO: remove at init and track running transfers differently + for (const auto it : std::get(tv).chunk_indices) { + ce.get_or_emplace().chunks.erase(it); + } + + if (!cc.have_all) { + for (const auto inner_chunk_index : std::get(tv).chunk_indices) { + if (!cc.have_all && !cc.have_chunk.at(inner_chunk_index)) { + cc.have_chunk.at(inner_chunk_index) = true; + cc.have_count += 1; + if (cc.have_count == info.chunks.size()) { + // debug check + for ([[maybe_unused]] const bool it : cc.have_chunk) { + assert(it); + } + + cc.have_all = true; + cc.have_chunk.clear(); // not wasting memory + std::cout << "SHA1_NGCFT1: got all chunks for \n" << info << "\n"; + } + + // good 
chunk + // TODO: have wasted + metadata + ce.get_or_emplace().total += chunk_data.size(); + } + } + } else { + std::cout << "SHA1_NGCFT1 warning: got chunk duplicate\n"; + } + } else { + // bad chunk + // TODO: requeue? + } + + updateMessages(ce); // mostly for received bytes + } + + peer_transfers.erase(e.transfer_id); + + return true; +} + +bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_send_done& e) { + if (!_sending_transfers.count(combineIds(e.group_number, e.peer_number))) { + return false; + } + + auto& peer_transfers = _sending_transfers.at(combineIds(e.group_number, e.peer_number)); + if (!peer_transfers.count(e.transfer_id)) { + return false; + } + + const auto& tv = peer_transfers[e.transfer_id].v; + if (std::holds_alternative(tv)) { + updateMessages(std::get(tv).content); // mostly for sent bytes + } + peer_transfers.erase(e.transfer_id); + + return true; +} + +bool SHA1_NGCFT1::onEvent(const Events::NGCFT1_recv_message& e) { + if (e.file_kind != NGCFT1_file_kind::HASH_SHA1_INFO) { + return false; + } + + uint64_t ts = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + + const auto c = _tcm.getContactGroupPeer(e.group_number, e.peer_number); + const auto self_c = c.get().self; + + auto* reg_ptr = _rmm.get(c); + if (reg_ptr == nullptr) { + std::cerr << "SHA1_NGCFT1 error: cant find reg\n"; + return false; + } + + Message3Registry& reg = *reg_ptr; + // TODO: check for existence, hs or other syncing mechanics might have sent it already (or like, it arrived 2x or whatever) + auto new_msg_e = reg.create(); + + { // contact + // from + reg.emplace(new_msg_e, c); + + // to + reg.emplace(new_msg_e, c.get().parent); + } + + reg.emplace(new_msg_e, e.message_id); + + reg.emplace(new_msg_e); // add sending? + + reg.emplace(new_msg_e, ts); + //reg.emplace(new_msg_e, 0); + reg.emplace(new_msg_e, ts); // reactive? 
+ + { // by whom + auto& synced_by = reg.get_or_emplace(new_msg_e).list; + synced_by.emplace(self_c); + } + + // check if content exists + const auto sha1_info_hash = std::vector{e.file_id, e.file_id+e.file_id_size}; + ContentHandle ce; + if (_info_to_content.count(sha1_info_hash)) { + ce = _info_to_content.at(sha1_info_hash); + std::cout << "SHA1_NGCFT1: new message has existing content\n"; + } else { + ce = {_contentr, _contentr.create()}; + _info_to_content[sha1_info_hash] = ce; + std::cout << "SHA1_NGCFT1: new message has new content\n"; + + //ce.emplace(sha1_info); + //ce.emplace(sha1_info_data); // keep around? or file? + ce.emplace(sha1_info_hash); + //{ // lookup tables and have + //auto& cc = ce.emplace(); + //cc.have_all = true; + //// skip have vec, since all + ////cc.have_chunk + //cc.have_count = sha1_info.chunks.size(); // need? + + //_info_to_content[sha1_info_hash] = ce; + //for (size_t i = 0; i < sha1_info.chunks.size(); i++) { + //_chunks[sha1_info.chunks[i]] = ce; + //cc.chunk_hash_to_index[sha1_info.chunks[i]] = i; + //} + //} + + // TODO: ft1 specific comp + //ce.emplace(std::move(file_impl)); + } + ce.get_or_emplace().messages.push_back({reg, new_msg_e}); + reg_ptr->emplace(new_msg_e, ce); + + ce.get_or_emplace().participants.emplace(c); + + if (!ce.all_of() && !ce.all_of()) { + // TODO: check if already receiving + _queue_content_want_info.push_back(ce); + } + + // TODO: queue info dl + + //reg_ptr->emplace(e, sha1_info); + //reg_ptr->emplace(e, sha1_info_data); // keep around? or file? 
+ //reg.emplace(new_msg_e, std::vector{e.file_id, e.file_id+e.file_id_size}); + + if (auto* cc = ce.try_get(); cc != nullptr && cc->have_all) { + reg_ptr->emplace(new_msg_e); + } + + if (ce.all_of()) { + reg_ptr->emplace(new_msg_e, ce.get()); + } + if (ce.all_of()) { + reg_ptr->emplace(new_msg_e, ce.get()); + } + if (ce.all_of()) { + reg_ptr->emplace(new_msg_e, ce.get()); + } + + // TODO: queue info/check if we already have info + + _rmm.throwEventConstruct(reg, new_msg_e); + + return true; // false? +} + +bool SHA1_NGCFT1::sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) { + if ( + // TODO: add support of offline queuing + !_cr.all_of(c) + ) { + return false; + } + + std::cout << "SHA1_NGCFT1: got sendFilePath()\n"; + + auto* reg_ptr = _rmm.get(c); + if (reg_ptr == nullptr) { + return false; + } + + // TODO: rw? + // TODO: memory mapped would be king + auto file_impl = std::make_unique(file_path); + if (!file_impl->isGood()) { + std::cerr << "SHA1_NGCFT1 error: failed opening file '" << file_path << "'!\n"; + return true; + } + + // get current time unix epoch utc + uint64_t ts = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + + // 1. build info by hashing all chunks + + FT1InfoSHA1 sha1_info; + // build info + sha1_info.file_name = file_name; + sha1_info.file_size = file_impl->_file_size; + + { // build chunks + // HACK: load file fully + // TODO: the speed is truly horrid + const auto file_data = file_impl->read(0, file_impl->_file_size); + size_t i = 0; + for (; i + sha1_info.chunk_size < file_data.size(); i += sha1_info.chunk_size) { + sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, sha1_info.chunk_size)); + } + + if (i < file_data.size()) { + sha1_info.chunks.push_back(hash_sha1(file_data.data()+i, file_data.size()-i)); + } + } + + // 2. 
hash info + std::vector sha1_info_data; + std::vector sha1_info_hash; + + std::cout << "SHA1_NGCFT1 info is: \n" << sha1_info; + sha1_info_data = sha1_info.toBuffer(); + std::cout << "SHA1_NGCFT1 sha1_info size: " << sha1_info_data.size() << "\n"; + sha1_info_hash = hash_sha1(sha1_info_data.data(), sha1_info_data.size()); + std::cout << "SHA1_NGCFT1 sha1_info_hash: " << bin2hex(sha1_info_hash) << "\n"; + + // check if content exists + ContentHandle ce; + if (_info_to_content.count(sha1_info_hash)) { + ce = _info_to_content.at(sha1_info_hash); + + // TODO: check if content is incomplete and use file instead + if (!ce.all_of()) { + ce.emplace(sha1_info); + } + if (!ce.all_of()) { + ce.emplace(sha1_info_data); + } + + // hash has to be set already + // Components::FT1InfoSHA1Hash + + { // lookup tables and have + auto& cc = ce.get_or_emplace(); + cc.have_all = true; + // skip have vec, since all + //cc.have_chunk + cc.have_count = sha1_info.chunks.size(); // need? + + _info_to_content[sha1_info_hash] = ce; + cc.chunk_hash_to_index.clear(); // for cpy pst + for (size_t i = 0; i < sha1_info.chunks.size(); i++) { + _chunks[sha1_info.chunks[i]] = ce; + cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); + } + } + + { // file info + // TODO: not overwrite fi? since same? 
+ auto& file_info = ce.emplace_or_replace(); + file_info.file_list.emplace_back() = {std::string{file_name}, file_impl->_file_size}; + file_info.total_size = file_impl->_file_size; + + ce.emplace_or_replace(std::vector{std::string{file_path}}); + } + + // cleanup file + if (ce.all_of()) { + // replace + ce.remove(); + } + ce.emplace(std::move(file_impl)); + + if (!ce.all_of()) { + ce.emplace(0u); + } + + ce.remove(); + + // we dont want the info anymore + ce.remove(); + if (auto it = std::find(_queue_content_want_info.begin(), _queue_content_want_info.end(), ce); it != _queue_content_want_info.end()) { + _queue_content_want_info.erase(it); + } + + // TODO: we dont want chunks anymore + + // TODO: make sure to abort every receiving transfer (sending info and chunk should be fine, info uses copy and chunk handle) + auto it = _queue_content_want_chunk.begin(); + while ( + it != _queue_content_want_chunk.end() && + (it = std::find(it, _queue_content_want_chunk.end(), ce)) != _queue_content_want_chunk.end() + ) { + it = _queue_content_want_chunk.erase(it); + } + } else { + ce = {_contentr, _contentr.create()}; + _info_to_content[sha1_info_hash] = ce; + + ce.emplace(sha1_info); + ce.emplace(sha1_info_data); // keep around? or file? + ce.emplace(sha1_info_hash); + { // lookup tables and have + auto& cc = ce.emplace(); + cc.have_all = true; + // skip have vec, since all + //cc.have_chunk + cc.have_count = sha1_info.chunks.size(); // need? 
+ + _info_to_content[sha1_info_hash] = ce; + cc.chunk_hash_to_index.clear(); // for cpy pst + for (size_t i = 0; i < sha1_info.chunks.size(); i++) { + _chunks[sha1_info.chunks[i]] = ce; + cc.chunk_hash_to_index[sha1_info.chunks[i]].push_back(i); + } + } + + { // file info + auto& file_info = ce.emplace(); + //const auto& file = ce.get(); + file_info.file_list.emplace_back() = {std::string{file_name}, file_impl->_file_size}; + file_info.total_size = file_impl->_file_size; + + ce.emplace(std::vector{std::string{file_path}}); + } + + ce.emplace(std::move(file_impl)); + + ce.emplace(0u); + } + + const auto c_self = _cr.get(c).self; + if (!_cr.valid(c_self)) { + std::cerr << "SHA1_NGCFT1 error: failed to get self!\n"; + return true; + } + + const auto msg_e = reg_ptr->create(); + reg_ptr->emplace(msg_e, c); + reg_ptr->emplace(msg_e, c_self); + reg_ptr->emplace(msg_e, ts); // reactive? + + reg_ptr->emplace(msg_e); + reg_ptr->emplace(msg_e); + + ce.get_or_emplace().messages.push_back({*reg_ptr, msg_e}); + + + //reg_ptr->emplace(e, file_kind); + // file id would be sha1_info hash or something + //reg_ptr->emplace(e, file_id); + + // remove? done in updateMessages() anyway + if (ce.all_of()) { + reg_ptr->emplace(msg_e, ce.get()); + } + if (ce.all_of()) { + reg_ptr->emplace(msg_e, ce.get()); + } + if (ce.all_of()) { + reg_ptr->emplace(msg_e, ce.get()); + } + + // TODO: determine if this is true + //reg_ptr->emplace(e); + + if (_cr.any_of(c)) { + const uint32_t group_number = _cr.get(c).group_number; + uint32_t message_id = 0; + + // TODO: check return + _nft.NGC_FT1_send_message_public(group_number, message_id, static_cast(NGCFT1_file_kind::HASH_SHA1_INFO), sha1_info_hash.data(), sha1_info_hash.size()); + reg_ptr->emplace(msg_e, message_id); + + // TODO: generalize? 
+ auto& synced_by = reg_ptr->emplace(msg_e).list; + synced_by.emplace(c_self); + } else if ( + // non online group + _cr.any_of(c) + ) { + // create msg_id + const uint32_t message_id = randombytes_random(); + reg_ptr->emplace(msg_e, message_id); + + // TODO: generalize? + auto& synced_by = reg_ptr->emplace(msg_e).list; + synced_by.emplace(c_self); + } + + _rmm.throwEventConstruct(*reg_ptr, msg_e); + + // TODO: place in iterate? + updateMessages(ce); + + return true; +} + diff --git a/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp new file mode 100644 index 0000000..9565008 --- /dev/null +++ b/solanaceae/ngc_ft1_sha1/sha1_ngcft1.hpp @@ -0,0 +1,134 @@ +#pragma once + +// solanaceae port of sha1 fts for NGCFT1 + +#include +#include +#include + +#include "./ngcft1.hpp" + +#include "./ft1_sha1_info.hpp" + +#include +#include +#include + +#include +#include + +enum class Content : uint32_t {}; +using ContentRegistry = entt::basic_registry; +using ContentHandle = entt::basic_handle; + +class SHA1_NGCFT1 : public RegistryMessageModelEventI, public NGCFT1EventI { + Contact3Registry& _cr; + RegistryMessageModel& _rmm; + NGCFT1& _nft; + ToxContactModel2& _tcm; + + std::minstd_rand _rng {1337*11}; + + // registry per group? + ContentRegistry _contentr; + + // limit this to each group? + entt::dense_map _info_to_content; + + // sha1 chunk index + // TODO: optimize lookup + // TODO: multiple contents. hashes might be unique, but data is not + entt::dense_map _chunks; + + // group_number, peer_number, content, chunk_hash, timer + std::deque> _queue_requested_chunk; + //void queueUpRequestInfo(uint32_t group_number, uint32_t peer_number, const SHA1Digest& hash); + void queueUpRequestChunk(uint32_t group_number, uint32_t peer_number, ContentHandle content, const SHA1Digest& hash); + + struct SendingTransfer { + struct Info { + // copy of info data + // too large? 
+ std::vector info_data; + }; + + struct Chunk { + ContentHandle content; + size_t chunk_index; // <.< remove offset_into_file + //uint64_t offset_into_file; + // or data? + // if memmapped, this would be just a pointer + }; + + std::variant v; + + float time_since_activity {0.f}; + }; + // key is groupid + peerid + entt::dense_map> _sending_transfers; + + struct ReceivingTransfer { + struct Info { + ContentHandle content; + // copy of info data + // too large? + std::vector info_data; + }; + + struct Chunk { + ContentHandle content; + std::vector chunk_indices; + // or data? + // if memmapped, this would be just a pointer + }; + + std::variant v; + + float time_since_activity {0.f}; + }; + // key is groupid + peerid + entt::dense_map> _receiving_transfers; + + // makes request rotate around open content + std::deque _queue_content_want_info; + std::deque _queue_content_want_chunk; + + static uint64_t combineIds(const uint32_t group_number, const uint32_t peer_number); + + void updateMessages(ContentHandle ce); + + std::optional> selectPeerForRequest(ContentHandle ce); + + public: // TODO: config + bool _udp_only {false}; + + size_t _max_concurrent_in {4}; + size_t _max_concurrent_out {6}; + // TODO: probably also includes running transfers rn (meh) + size_t _max_pending_requests {16}; // per content + + public: + SHA1_NGCFT1( + Contact3Registry& cr, + RegistryMessageModel& rmm, + NGCFT1& nft, + ToxContactModel2& tcm + ); + + void iterate(float delta); + + protected: // rmm events (actions) + bool onEvent(const Message::Events::MessageUpdated&) override; + + protected: // events + bool onEvent(const Events::NGCFT1_recv_request&) override; + bool onEvent(const Events::NGCFT1_recv_init&) override; + bool onEvent(const Events::NGCFT1_recv_data&) override; + bool onEvent(const Events::NGCFT1_send_data&) override; // const? 
+ bool onEvent(const Events::NGCFT1_recv_done&) override; + bool onEvent(const Events::NGCFT1_send_done&) override; + bool onEvent(const Events::NGCFT1_recv_message&) override; + + bool sendFilePath(const Contact3 c, std::string_view file_name, std::string_view file_path) override; +}; +