diff --git a/http.cpp b/http.cpp index abd3ca3..c48c01e 100644 --- a/http.cpp +++ b/http.cpp @@ -1,6 +1,7 @@ #include "http.h" #include "curl/curl.h" #include "curl/easy.h" +#include "curl/multi.h" #include #include @@ -45,13 +46,31 @@ HttpClient::~HttpClient() { }); } -bool HttpClient::send_request(std::string method, std::string url, HttpOptions opts, ResponseCallback cb) { +void HttpClient::cancel_request(request_id id) { + CURL *requestHandle = reinterpret_cast(id); + auto request = m_requests.find(requestHandle); + if (request == m_requests.end()) { + m_logger->warn("cancel_request: not found"); + return; + } + curl_multi_remove_handle(m_curlMulti, requestHandle); + if (request->second.socketData->pollHandle) { + m_logger->debug("closing poll handle"); + uv_poll_stop(request->second.socketData->pollHandle); + uv_close((uv_handle_t*)request->second.socketData->pollHandle, [](uv_handle_t *h){ + delete h; + }); + } + m_requests.erase(request); +} + +request_id HttpClient::send_request(std::string method, std::string url, HttpOptions opts, ResponseCallback cb) { m_logger->debug("send request {} {}", method, url); CURL *requestHandle = curl_easy_init(); std::pair insertResult = m_requests.emplace(requestHandle, this); if (!insertResult.second) { curl_easy_cleanup(requestHandle); - return false; + return nullptr; } auto requestData = insertResult.first; requestData->second.callback = cb; @@ -83,7 +102,8 @@ bool HttpClient::send_request(std::string method, std::string url, HttpOptions o if (opts.body) { curl_easy_setopt(requestHandle, CURLOPT_POSTFIELDS, opts.body->c_str()); } - return CURLM_OK == curl_multi_add_handle(m_curlMulti, requestHandle); + curl_multi_add_handle(m_curlMulti, requestHandle); + return reinterpret_cast(requestHandle); } int HttpClient::curl_socket_cb(CURL *curl, curl_socket_t curlSocket, int action, HttpClient *self, void *socketPtr) { diff --git a/http.h b/http.h index a6c04f8..44d66ff 100644 --- a/http.h +++ b/http.h @@ -9,6 +9,8 @@ #include namespace http { + typedef void *request_id; + struct HttpResponse { int status; std::string body; @@ -24,7 +26,8 @@ namespace http { HttpClient(HttpClient&&) = delete; HttpClient(HttpClient&) = delete; ~HttpClient(); - bool send_request(std::string method, std::string url, HttpOptions opts, ResponseCallback cb); + request_id send_request(std::string method, std::string url, HttpOptions opts, ResponseCallback cb); + void cancel_request(request_id id); private: void check_curl_messages(); static int curl_socket_cb(CURL *curl, curl_socket_t curlSocket, int action, HttpClient *self, void *socketPtr); diff --git a/manager.cpp b/manager.cpp index 677ba0b..6ed908a 100644 --- a/manager.cpp +++ b/manager.cpp @@ -17,7 +17,8 @@ const unsigned long REPOST_INTERVAL = 2000; const unsigned long VK_CHECK_INTERVAL = 600000; RepostManager::RepostManager(uv_loop_t *eventLoop, tg::AuthCodeProvider tgCodeProvider, tg::PasswordProvider tgPasswordProvider, state::AppState *appState, config::AppConfig *config) - : m_vk(eventLoop), m_tg(eventLoop, config->tgApiId, config->tgApiHash, config->tgPhoneNumber) { + : m_vk(eventLoop), m_tg(eventLoop, config->tgApiId, config->tgApiHash, config->tgPhoneNumber), + m_fetcher(this) { m_appState = appState; m_appConfig = config; m_tg.authCodeProvider = tgCodeProvider; @@ -40,6 +41,7 @@ RepostManager::~RepostManager() { } if (m_checkTimer) { uv_timer_stop(m_checkTimer); + m_checkTimerStarted = false; uv_close((uv_handle_t*)m_checkTimer, [](uv_handle_t *h){ delete h; }); } } @@ -95,24 +97,39 @@ void RepostManager::start() { m_tg.start(); } -NewPostFetcher::NewPostFetcher(RepostManager *m, bool fetchVk, bool fetchTg) : mgr(m) { +void NewPostFetcher::fetch(bool fetchVk, bool fetchTg, decltype(onDone) onDone, decltype(onError) onError) { + if (working) return; + working = true; if (fetchVk) { - for (int i = 0; i < m->m_appConfig->vkSources.size(); ++i) { + for (int i = 0; i < mgr->m_appConfig->vkSources.size(); ++i) { fetcher_state &&state {}; state.sourceIndex = i; vkState.emplace_back(state); } } if (fetchTg) { - for (int i = 0; i < m->m_appConfig->tgSources.size(); ++i) { + for (int i = 0; i < mgr->m_appConfig->tgSources.size(); ++i) { fetcher_state &&state {}; state.sourceIndex = i; tgState.emplace_back(state); } } + this->onDone = onDone; + this->onError = onError; + continue_fetch(); } -void NewPostFetcher::fetch() { +void NewPostFetcher::reset_state() { + if (!working) return; + for (int i = 0; i < vkState.size(); ++i) { + vkState[i] = fetcher_state(); + } + for (int i = 0; i < tgState.size(); ++i) { + tgState[i] = fetcher_state(); + } +} + +void NewPostFetcher::continue_fetch() { bool vkReady = true; spdlog::info("fetch called"); for (int i = 0; i < vkState.size(); ++i) { @@ -147,7 +164,7 @@ void NewPostFetcher::fetch() { for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) { state.posts.emplace_back(std::move(*i)); } - fetch(); + continue_fetch(); }); } } @@ -186,7 +203,7 @@ void NewPostFetcher::fetch() { for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) { state.posts.emplace_back(std::move(*i)); } - fetch(); + continue_fetch(); }); } } @@ -203,7 +220,7 @@ void NewPostFetcher::fetch() { int total = std::accumulate(vkState.begin(), vkState.end(), 0, addPostCount) + std::accumulate(tgState.begin(), tgState.end(), 0, addPostCount); - merged.reserve(total); + merged.reserve(total + mgr->m_unprocessedTgPosts.size()); std::vector indexes; indexes.reserve(nLists); @@ -241,6 +258,28 @@ void NewPostFetcher::fetch() { --indexes[minPostListIdx]; ++k; } + + while (mgr->m_unprocessedTgPosts.size() > 0) { + AbstractPost tgPost = mgr->m_unprocessedTgPosts.front(); + bool duplicate = false, inserted = false; + for (int i = 0; i < merged.size(); ++i) { + if (merged[i].date == tgPost.date) { + spdlog::debug("not inserting duplicate unproc tg post"); + duplicate = true; + break; + } else if (merged[i].date > tgPost.date) { + spdlog::debug("inserting unproc tg post at pos {}", i); + merged.insert(merged.begin() + i, std::move(tgPost)); + inserted = true; + break; + } + } + if (!duplicate && !inserted) { + spdlog::debug("appending unproc tg post"); + merged.push_back(std::move(tgPost)); + } + mgr->m_unprocessedTgPosts.pop(); + } onDone(std::move(merged)); } } @@ -268,7 +307,7 @@ void NewPostFetcher::check_vk_posts(int index, std::vector posts) { spdlog::debug("[vk:{}] last loaded post date is now {}", index, state.posts[0].date); appState.lastLoadedPostDate = state.posts[0].date; } - fetch(); + continue_fetch(); } void NewPostFetcher::check_tg_posts(int index, std::vector> posts) { @@ -293,7 +332,7 @@ void NewPostFetcher::check_tg_posts(int index, std::vectoronDone = [this, f](auto posts){ - delete f; - on_new_posts(posts); - }; - - f->onError = [f](){ - delete f; - }; - - f->fetch(); + spdlog::info("checking all sources"); + m_fetcher.fetch( + true, true, + [this](auto posts){ + on_new_posts(posts); + }, + [](){ + // TODO error handling + spdlog::error("first post check failed"); + }); } void RepostManager::on_new_posts(std::vector posts) { spdlog::info("collected {} new posts", posts.size()); enqueue_for_repost(posts); - spdlog::info("scheduling next check"); - uv_timer_start(m_checkTimer, &RepostManager::check_timer_callback, VK_CHECK_INTERVAL, 0); + if (!m_checkTimerStarted) { + spdlog::info("scheduling next check"); + uv_timer_start(m_checkTimer, &RepostManager::check_timer_callback, VK_CHECK_INTERVAL, 0); + m_checkTimerStarted = true; + } } void RepostManager::collect_all_vk_posts(const std::variant wall, std::function)> callback) { @@ -421,6 +461,19 @@ bool RepostManager::drop_posts_older_than(std::vector &posts, long } } +std::optional to_abstract_post(const vk::Post &post, int sourceIndex) { + return { AbstractPost(posts::SRC_VK, sourceIndex, post.id, post.date, post.text) }; +} + +std::optional to_abstract_post(const td_api::message &post, int sourceIndex) { + if (post.content_->get_id() == td_api::messageText::ID) { + auto &content = (td_api::messageText&) *post.content_; + return { AbstractPost(posts::SRC_TELEGRAM, sourceIndex, post.id_, post.date_, content.text_->text_) }; + } else { + return {}; + } +} + std::vector RepostManager::to_abstract_posts(std::vector &posts, int sourceIndex) { std::vector result; result.reserve(posts.size()); @@ -474,21 +527,23 @@ void RepostManager::check_timer_callback(uv_timer_t *h) { self->recheck_vk_posts({}); } -void RepostManager::recheck_vk_posts(std::function onDone) { +bool RepostManager::recheck_vk_posts(std::function onDone) { + if (m_fetcher.working) { + spdlog::error("can't recheck VK posts: another check is in progress"); + return false; + } spdlog::info("checking VK posts"); - NewPostFetcher *f = new NewPostFetcher(this, true, false); - f->onDone = [this, f, onDone](std::vector &&posts){ + auto onFetchDone = [this, onDone](std::vector &&posts){ spdlog::info("checked VK posts"); this->on_new_posts(posts); if (onDone) onDone(); - delete f; }; - f->onError = [f](){ - delete f; + auto onFetchError = [](){ spdlog::error("failed to check VK posts"); }; - f->fetch(); + m_fetcher.fetch(true, false, onFetchDone, onFetchError); + return true; } void RepostManager::repost(AbstractPost &post) { @@ -530,11 +585,25 @@ void RepostManager::on_tg_message(td_api::updateNewMessage &update) { if (sourceIndex == m_appConfig->tgSources.size()) { return; } + std::optional post = to_abstract_post(*update.message_, sourceIndex); + if (!post) { + spdlog::debug("tg message is not a post"); + return; + } + + spdlog::debug("adding tg post to the unprocessed tg post queue"); + m_unprocessedTgPosts.push(*post); - uv_timer_stop(m_checkTimer); - std::vector> v; - v.push_back(std::move(update.message_)); - recheck_vk_posts([this, post = to_abstract_posts(v, sourceIndex)](){ - on_new_posts(post); - }); + if (!m_fetcher.working) { + if (m_checkTimerStarted) { + uv_timer_stop(m_checkTimer); + m_checkTimerStarted = false; + } + std::vector posts = { *post }; + spdlog::debug("rechecking vk posts before processing the new tg post"); + recheck_vk_posts([this, posts](){ + spdlog::debug("processing the new tg post"); + on_new_posts(posts); + }); + } } diff --git a/manager.h b/manager.h index 6258207..a3c2f60 100644 --- a/manager.h +++ b/manager.h @@ -3,6 +3,7 @@ #include "config.h" #include "posts.h" #include "state.h" +#include "td/tl/TlObject.h" #include "tg.h" #include "vk.h" #include @@ -25,13 +26,17 @@ namespace manager { std::vector posts; }; + bool working = false; RepostManager *mgr; std::vector vkState, tgState; std::function&&)> onDone; std::function onError; - NewPostFetcher(RepostManager *m, bool fetchVk, bool fetchTg); - void fetch(); + inline NewPostFetcher(RepostManager *m) : mgr(m) {}; + void fetch(bool fetchVk, bool fetchTg, decltype(onDone) onDone, decltype(onError) onError); + private: + void reset_state(); + void continue_fetch(); void check_vk_posts(int index, std::vector posts); void check_tg_posts(int index, std::vector> posts); }; @@ -48,7 +53,7 @@ namespace manager { void load_more_telegram_chats(); void on_new_posts(std::vector posts); void on_tg_message(td_api::updateNewMessage &update); - void recheck_vk_posts(std::function onDone); + bool recheck_vk_posts(std::function onDone); void collect_all_vk_posts(const std::variant wall, std::function)> callback); void collect_all_tg_posts(long channel, std::function>)> callback); @@ -62,6 +67,8 @@ namespace manager { bool drop_posts_older_than(std::vector &posts, long lastPostId); + std::optional to_abstract_post(const vk::Post &post, int sourceIndex); + std::optional to_abstract_post(const td_api::message &post, int sourceIndex); std::vector to_abstract_posts(std::vector &posts, int sourceIndex); std::vector to_abstract_posts(std::vector> &posts, int sourceIndex); @@ -74,8 +81,11 @@ namespace manager { config::AppConfig *m_appConfig; vk::VKClient m_vk; tg::TelegramClient m_tg; + NewPostFetcher m_fetcher; std::queue m_repostQueue; + std::queue m_unprocessedTgPosts; uv_timer_t *m_repostTimer = nullptr; + bool m_checkTimerStarted = false; uv_timer_t *m_checkTimer = nullptr; int m_nRequiredChats; int m_nLoadedRequiredChats = 0;