From 81b587cef8a3ceef56a5be0f319eebdcb7ad6fd9 Mon Sep 17 00:00:00 2001 From: Slavasil Date: Thu, 21 Nov 2024 00:05:47 +0300 Subject: [PATCH] complete the initial post fetching functionality ---
// NOTE(review): reviewer notes only. This area (after the three-dash line, before the
// first file diff) is free text; none of the hunks below were modified.
//
// Summary of the change as visible in the hunks: on_clients_ready() now builds a
// heap-allocated new_post_fetcher that requests VK and TG posts in growing batches
// (count = count * 3 / 2) until drop_posts_older_than() finds the remembered last post id,
// then merges both lists by date and hands them to repost_all().
//
// 1. onDone merge loop: the condition `tgIdx < 0 || vkPosts[vkIdx].date < tgPosts[tgIdx].date`
//    indexes vkPosts[vkIdx] whenever tgIdx >= 0, including once vkIdx has dropped below 0
//    (e.g. zero VK posts and at least one TG post). That is an out-of-range read; the
//    vkIdx < 0 case needs its own guard.
// 2. onDone runs `delete f` as its first statement, but the lambda itself is stored in
//    f->onDone, and it later calls repost_all() through its captured `this`. Looks like the
//    closure is destroyed while still executing - confirm, and consider releasing f only
//    after the callback body has finished.
// 3. fetch(): `tgState.offset += posts[posts.size() - 1]->id_` adds a message id onto the
//    previous value, while the log line ("starting from #{}") and
//    collect_tg_posts_from__intermediate (which passes oldestId straight through) treat the
//    value as an id. Presumably `=` was intended - verify.
// 4. collect_vk_posts_from__intermediate: callback is invoked only when a chunk is empty.
//    When a chunk satisfies the request (count <= chunkSize) the function neither recurses
//    nor calls callback, so check_vk_posts() would not run for that request. The
//    `if (count > chunkSize)` guard added to collect_tg_posts_from__intermediate may have the
//    same effect; the rest of that function is outside this patch - TODO confirm.
// 5. onError is assigned but never invoked in the hunks shown; the visible error branches
//    only log, so the fetcher would not be freed on failure.
 manager.cpp | 211 +++++++++++++++++++++++++++++++++++++--------------- manager.h | 3 + 2 files changed, 152 insertions(+), 62 deletions(-) diff --git a/manager.cpp b/manager.cpp index b58820b..bfddb41 100644 --- a/manager.cpp +++ b/manager.cpp @@ -41,66 +41,125 @@ void RepostManager::start() { } void RepostManager::on_clients_ready() { - auto posts = std::make_shared< std::optional>[] >(2); - auto doWorkWithPosts = [this, posts]() -> void { - spdlog::info("fetched some posts:"); - - int limit = 3; - for (auto &i : *posts[0]) { - if (limit-- == 0) { - spdlog::info("..."); - break; + struct new_post_fetcher { + struct fetcher_state { + bool ready = false; + bool needRequest = true; + long offset = 0, count = 3; + std::vector posts; + }; + RepostManager *mgr; + fetcher_state vkState, tgState; + new_post_fetcher(RepostManager *m) : mgr(m) {} + std::function &&vkPosts, std::vector &&tgPosts)> onDone; + std::function onError; + void fetch() { + if (vkState.ready && tgState.ready) { + onDone(std::move(vkState.posts), std::move(tgState.posts)); + return; } - spdlog::info("vk[#{}, {}] {}", i.id, i.date, i.text); - } - limit = 3; - for (auto &i : *posts[1]) { - if (limit-- == 0) { - spdlog::info("..."); - break; - } - spdlog::info("tg[#{}, {}] {}", i.id, i.date, i.text); - } - - m_tg.send_query(td_api::make_object(m_appConfig->tgSourceId), [](auto result){ - if (result->get_id() == td_api::chat::ID) { - auto &chat = (td_api::chat&)*result; - spdlog::info("source chat: #{} {}", chat.id_, chat.title_); - } else { - auto &e = (td_api::error&)*result; - spdlog::error("getChat error: {} {}", e.code_, e.message_); - } - }); - }; - auto putVkPosts = [this, posts, doWorkWithPosts](std::vector vkPosts){ - posts[0] = {to_abstract_posts(vkPosts)}; - spdlog::info("fetched {} vk posts", 
posts[0]->size()); - if (posts[1].has_value()) { - doWorkWithPosts(); - } - }; - auto putTgPosts = [this, posts, doWorkWithPosts](std::vector> tgPosts){ - posts[1] = {to_abstract_posts(tgPosts)}; - spdlog::info("fetched {} telegram posts", posts[1]->size()); - if (posts[0].has_value()) { - doWorkWithPosts(); - } - }; - if (m_appState->vkLastPostId == 0) { - spdlog::info("fetching ALL vk posts..."); - collect_all_vk_posts(putVkPosts); - } else { - spdlog::info("fetching last 3 vk posts..."); - collect_last_vk_posts(3, putVkPosts); - } - if (m_appState->tgLastPostId == 0) { - spdlog::info("fetching ALL telegram posts..."); - collect_all_tg_posts(putTgPosts); - } else { - spdlog::info("fetching last 3 telegram posts..."); - collect_last_tg_posts(3, putTgPosts); - } + if (!vkState.ready && vkState.needRequest) { + if (mgr->m_appState->vkLastPostId != 0) { + spdlog::info("fetching {} VK posts at offset {}", vkState.count, vkState.offset); + vkState.needRequest = false; + mgr->collect_vk_posts_from(vkState.offset, vkState.count, [this](auto posts){ check_vk_posts(posts); }); + vkState.offset += vkState.count; + vkState.count = vkState.count * 3 / 2; + } else { + spdlog::info("fetching all VK posts"); + vkState.needRequest = false; + mgr->collect_all_vk_posts([this](auto posts){ + spdlog::info("fetched all {} VK posts", posts.size()); + vkState.ready = true; + std::vector aposts = mgr->to_abstract_posts(posts); + vkState.posts.reserve(vkState.posts.size() + aposts.size()); + for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) { + vkState.posts.emplace_back(std::move(*i)); + } + fetch(); + }); + } + } + + if (!tgState.ready && tgState.needRequest) { + if (mgr->m_appState->tgLastPostId != 0) { + spdlog::info("fetching {} TG posts starting from #{}", tgState.count, tgState.offset); + tgState.needRequest = false; + mgr->collect_tg_posts_from(tgState.offset, tgState.count, [this](auto posts){ + if (posts.empty()) return; + tgState.offset += posts[posts.size() - 
1]->id_; + check_tg_posts(std::move(posts)); + }); + tgState.count = tgState.count * 3 / 2; + } else { + tgState.needRequest = false; + mgr->collect_all_tg_posts([this](auto posts){ + tgState.ready = true; + std::vector aposts = mgr->to_abstract_posts(posts); + tgState.posts.reserve(tgState.posts.size() + aposts.size()); + for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) { + tgState.posts.emplace_back(std::move(*i)); + } + fetch(); + }); + } + } + } + void check_vk_posts(std::vector posts) { + spdlog::info("fetched {} VK posts", posts.size()); + std::vector aposts = mgr->to_abstract_posts(posts); + if (mgr->drop_posts_older_than(aposts, mgr->m_appState->vkLastPostId)) { + spdlog::info("found last remembered VK post"); + vkState.ready = true; + } + vkState.posts.reserve(vkState.posts.size() + aposts.size()); + for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) { + vkState.posts.emplace_back(std::move(*i)); + } + vkState.needRequest = true; + fetch(); + } + void check_tg_posts(std::vector> posts) { + std::vector aposts = mgr->to_abstract_posts(posts); + if (mgr->drop_posts_older_than(aposts, mgr->m_appState->tgLastPostId)) { + spdlog::info("found last remembered TG post"); + tgState.ready = true; + } + tgState.posts.reserve(tgState.posts.size() + aposts.size()); + for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) { + tgState.posts.emplace_back(std::move(*i)); + } + tgState.needRequest = true; + fetch(); + } + }; + new_post_fetcher *f = new new_post_fetcher(this); + + f->onDone = [this, f](auto vkPosts, auto tgPosts){ + delete f; + spdlog::info("collected {} new vk posts and {} new tg posts", vkPosts.size(), tgPosts.size()); + std::vector mergedPosts; + int totalSize = vkPosts.size() + tgPosts.size(); + mergedPosts.reserve(totalSize); + int vkIdx = vkPosts.size() - 1; + int tgIdx = tgPosts.size() - 1; + for (int i = 0; i < totalSize; ++i) { + if (tgIdx < 0 || vkPosts[vkIdx].date < tgPosts[tgIdx].date) { + 
mergedPosts.emplace_back(std::move(vkPosts[vkIdx--])); + } else { + mergedPosts.emplace_back(std::move(tgPosts[tgIdx--])); + } + } + spdlog::info("sorted {} posts", totalSize); + repost_all(mergedPosts); + }; + + f->onError = [f](){ + delete f; + }; + + f->fetch(); } void RepostManager::collect_all_vk_posts(std::function)> callback) { @@ -120,9 +179,27 @@ void RepostManager::collect_last_tg_posts(int count, std::function)> callback) { - m_vk.get_posts(m_appConfig->vkSource, offset, count, [=](std::optional chunk, int err){ + spdlog::debug("COLLECT VK POSTS FROM {} {}", offset, count); + auto result = std::make_shared>(); + collect_vk_posts_from__intermediate(offset, count, result, callback); +} + +void RepostManager::collect_vk_posts_from__intermediate(int offset, int count, std::shared_ptr> intermediateResult, std::function)> callback) { + m_vk.get_posts(m_appConfig->vkSource, offset, count, [=, this](std::optional chunk, int err){ if (err == 0) { - callback(chunk->posts); + if (chunk->posts.size() == 0) { + spdlog::debug("got all posts"); + callback(std::move(*intermediateResult.get())); + return; + } + int chunkSize = chunk->posts.size(); + int oldSize = intermediateResult->size(); + intermediateResult->reserve(oldSize + chunkSize); + for (auto i = chunk->posts.begin(), end = chunk->posts.end(); i != end; ++i) { + intermediateResult->emplace_back(std::move(*i)); + } + if (count > chunkSize) + collect_vk_posts_from__intermediate(offset + chunkSize, count - chunkSize, intermediateResult, callback); } else { spdlog::error("failed to get {} VK posts at offset {}: error {}", count, offset, err); } @@ -152,11 +229,11 @@ void RepostManager::collect_tg_posts_from__intermediate(long from, int count, st long oldestId; intermediateResult->reserve(oldSize + chunkSize); for (auto i = msgs.messages_.begin(), end = msgs.messages_.end(); i != end; ++i) { - //spdlog::debug("moving message {}", (*i)->id_); oldestId = (*i)->id_; intermediateResult->emplace_back(std::move(*i)); } - 
collect_tg_posts_from__intermediate(oldestId, count - chunkSize, intermediateResult, callback); + if (count > chunkSize) + collect_tg_posts_from__intermediate(oldestId, count - chunkSize, intermediateResult, callback); } else { auto &err = (td_api::error&)*obj; spdlog::error("failed to get posts: {} {}", err.code_, err.message_); @@ -164,6 +241,16 @@ void RepostManager::collect_tg_posts_from__intermediate(long from, int count, st }); } +bool RepostManager::drop_posts_older_than(std::vector &posts, long lastPostId) { + auto idx = std::find_if(posts.begin(), posts.end(), [lastPostId](auto &post){ return post.id == lastPostId; }); + if (idx == posts.end()) { + return false; + } else { + posts.erase(idx, posts.end()); + return true; + } +} + std::vector RepostManager::to_abstract_posts(std::vector &posts) { std::vector result; result.reserve(posts.size()); diff --git a/manager.h b/manager.h index 7d44b93..ab2cc79 100644 --- a/manager.h +++ b/manager.h @@ -33,8 +33,11 @@ namespace manager { void collect_vk_posts_from(int offset, int count, std::function)> callback); void collect_tg_posts_from(long from, int count, std::function>)> callback); + void collect_vk_posts_from__intermediate(int offset, int count, std::shared_ptr> intermediateResult, std::function)> callback); void collect_tg_posts_from__intermediate(long from, int count, std::shared_ptr>> intermediateResult, std::function>)> callback); + bool drop_posts_older_than(std::vector &posts, long lastPostId); + std::vector to_abstract_posts(std::vector &posts); std::vector to_abstract_posts(std::vector> &posts);