#include "manager.h"
#include "posts.h"
#include "spdlog/spdlog.h"
#include "state.h"
#include "td/telegram/td_api.h"
#include "uv.h"
#include "vk.h"

#include <algorithm>
#include <functional>
#include <iterator>
#include <limits>
#include <memory>
#include <numeric>
#include <optional>
#include <string>
#include <type_traits>
#include <utility>
#include <variant>
#include <vector>

using namespace manager;

// Timeout and repeat value handed to uv_timer_start for the repost timer
// (libuv timers count in milliseconds).
const unsigned long REPOST_INTERVAL = 2000;
// Timeout handed to uv_timer_start for the one-shot source re-check timer.
const unsigned long VK_CHECK_INTERVAL = 600000;

namespace {

// Type used to identify a VK wall. It is taken from the config entry that is
// passed as `wall` in NewPostFetcher::continue_fetch.
// NOTE(review): this must be the same std::variant<...> that manager.h uses in
// the collect_*_vk_* declarations — confirm against the header.
using VkWallId = std::decay_t<decltype(std::declval<config::AppConfig &>().vkSources[0].id)>;

// Shorthands for the two raw post containers that flow through the callbacks.
using VkPostList = std::vector<vk::Post>;
using TgMessageList = std::vector<td_api::object_ptr<td_api::message>>;

// Moves every element of `src` to the end of `dst`, keeping their order.
void append_posts(std::vector<AbstractPost> &dst, std::vector<AbstractPost> &&src) {
    dst.reserve(dst.size() + src.size());
    dst.insert(dst.end(), std::make_move_iterator(src.begin()), std::make_move_iterator(src.end()));
}

// Stops a timer and asks libuv to close it; the handle is freed in the close
// callback. The handle was allocated as uv_timer_t, so it is cast back to that
// type before `delete` (deleting it through uv_handle_t* is a type mismatch).
void close_timer(uv_timer_t *timer) {
    uv_timer_stop(timer);
    uv_close(reinterpret_cast<uv_handle_t *>(timer), [](uv_handle_t *h) {
        delete reinterpret_cast<uv_timer_t *>(h);
    });
}

} // namespace

// Wires up the VK and Telegram clients on `eventLoop`, stores the state/config
// pointers, and initialises (but does not start) the repost and check timers.
RepostManager::RepostManager(uv_loop_t *eventLoop,
                             tg::AuthCodeProvider tgCodeProvider,
                             tg::PasswordProvider tgPasswordProvider,
                             state::AppState *appState,
                             config::AppConfig *config)
    : m_vk(eventLoop),
      m_tg(eventLoop, config->tgApiId, config->tgApiHash, config->tgPhoneNumber),
      m_fetcher(this) {
    m_appState = appState;
    m_appConfig = config;
    m_tg.authCodeProvider = tgCodeProvider;
    m_tg.passwordProvider = tgPasswordProvider;
    m_vk.set_service_api_key(config->vkServiceKey);

    // Both timers carry `this` in their data pointer so the static callbacks
    // can get back to the manager.
    m_repostTimer = new uv_timer_t;
    uv_timer_init(eventLoop, m_repostTimer);
    m_repostTimer->data = this;

    m_checkTimer = new uv_timer_t;
    uv_timer_init(eventLoop, m_checkTimer);
    m_checkTimer->data = this;
}

// Stops both timers and hands them to libuv for closing.
RepostManager::~RepostManager() {
    if (m_repostTimer) {
        close_timer(m_repostTimer);
    }
    if (m_checkTimer) {
        m_checkTimerStarted = false;
        close_timer(m_checkTimer);
    }
}

// Sends loadChats for the main chat list and repeats while the reply is `ok`.
// An `error` reply is treated as "everything is loaded" and triggers
// on_clients_ready().
void RepostManager::load_more_telegram_chats() {
    m_tg.send_query(
        td_api::make_object<td_api::loadChats>(td_api::make_object<td_api::chatListMain>(), 100000),
        [this](auto result) {
            if (result->get_id() == td_api::ok::ID) {
                spdlog::info("loading more Telegram chats...");
                load_more_telegram_chats();
            } else if (result->get_id() == td_api::error::ID) {
                spdlog::info("loaded all chats");
                on_clients_ready();
            }
        });
}

// Resets the per-source "last loaded" markers to the persisted "last forwarded"
// ones, installs the Telegram update handler and starts Telegram authentication.
void RepostManager::start() {
    m_nRequiredChats = 100500; // m_appConfig->tgSources.size() + 1;

    for (auto &appState : m_appState->vkRepostState) {
        appState.lastLoadedPostDate = appState.lastForwardedPostDate;
    }
    for (auto &appState : m_appState->tgRepostState) {
        appState.lastLoadedPostDate = appState.lastForwardedPostDate;
    }

    m_tg.add_update_handler([this](void *, td_api::Object &obj) {
        if (obj.get_id() == td_api::updateAuthorizationState::ID) {
            // Chat loading starts as soon as authorization is ready.
            auto &authState = static_cast<td_api::updateAuthorizationState &>(obj);
            if (authState.authorization_state_->get_id() == td_api::authorizationStateReady::ID) {
                spdlog::info("loading Telegram chats...");
                load_more_telegram_chats();
            }
        } else if (obj.get_id() == td_api::updateNewChat::ID) {
            // Only counts and logs the chats we care about; readiness is
            // signalled from load_more_telegram_chats().
            auto &update = static_cast<td_api::updateNewChat &>(obj);
            if (update.chat_->id_ == m_appConfig->tgDestinationId) {
                ++m_nLoadedRequiredChats;
                spdlog::info("destination chat {} loaded (loaded {}/{} chats)",
                             m_appConfig->tgDestinationId, m_nLoadedRequiredChats, m_nRequiredChats);
            }
            for (auto &src : m_appConfig->tgSources) {
                if (update.chat_->id_ == src.id) {
                    ++m_nLoadedRequiredChats;
                    spdlog::info("source chat {} loaded (loaded {}/{} chats)",
                                 src.id, m_nLoadedRequiredChats, m_nRequiredChats);
                }
            }
            if (m_nLoadedRequiredChats >= m_nRequiredChats) {
                spdlog::info("all chats loaded");
                // on_clients_ready();
            }
        } else if (obj.get_id() == td_api::updateNewMessage::ID) {
            auto &update = static_cast<td_api::updateNewMessage &>(obj);
            for (auto &src : m_appConfig->tgSources) {
                if (update.message_->chat_id_ == src.id) {
                    on_tg_message(update);
                }
            }
        }
    });

    spdlog::info("starting Telegram authentication");
    m_tg.start();
}

// Starts a fetch over the configured VK and/or Telegram sources. Does nothing
// when a fetch is already running. `onDone` receives the merged post list once
// every source is ready.
void NewPostFetcher::fetch(bool fetchVk, bool fetchTg, decltype(onDone) onDone, decltype(onError) onError) {
    if (working) return;
    working = true;

    // Start from a clean slate. The merge in continue_fetch() moves posts out
    // of the per-source lists but leaves the (moved-from) elements and the
    // `ready` flags in place, so state left over from a previous fetch would
    // be merged again on every later run.
    vkState.clear();
    tgState.clear();

    if (fetchVk) {
        const int nSources = static_cast<int>(mgr->m_appConfig->vkSources.size());
        for (int i = 0; i < nSources; ++i) {
            fetcher_state state{};
            state.sourceIndex = i;
            vkState.emplace_back(std::move(state));
        }
    }
    if (fetchTg) {
        const int nSources = static_cast<int>(mgr->m_appConfig->tgSources.size());
        for (int i = 0; i < nSources; ++i) {
            fetcher_state state{};
            state.sourceIndex = i;
            tgState.emplace_back(std::move(state));
        }
    }

    this->onDone = onDone;
    this->onError = onError;
    continue_fetch();
}

// While a fetch is running, replaces every per-source state with a
// default-constructed one. No-op otherwise.
void NewPostFetcher::reset_state() {
    if (!working) return;
    for (auto &state : vkState) {
        state = fetcher_state();
    }
    for (auto &state : tgState) {
        state = fetcher_state();
    }
}

// Drives the fetch: issues the next request for every source that is not ready
// and has no request in flight; once all sources are ready, merges their posts
// (plus any queued unprocessed Telegram posts) by date and calls onDone.
// Re-entered from the request callbacks.
void NewPostFetcher::continue_fetch() {
    bool vkReady = true;
    spdlog::info("fetch called");

    for (int i = 0; i < static_cast<int>(vkState.size()); ++i) {
        auto &state = vkState[i];
        if (state.ready) continue;
        vkReady = false;
        if (!state.needRequest) continue;

        auto &wall = mgr->m_appConfig->vkSources[state.sourceIndex].id;
        if (mgr->m_appState->vkRepostState[state.sourceIndex].lastLoadedPostDate != 0) {
            // A last-loaded date is known: page through the wall in growing
            // chunks; check_vk_posts() decides when to stop.
            spdlog::info("[vk:{}] fetching {} VK posts at offset {}", i, state.count, state.offset);
            state.needRequest = false;
            mgr->collect_vk_posts_from(wall, state.offset, state.count, [this, i, &state](auto posts) mutable {
                for (auto &p : posts) {
                    spdlog::debug("[vk:{}] got post dated {}", i, p.date);
                }
                state.offset += posts.size();
                state.count = state.count * 3 / 2;
                check_vk_posts(i, posts);
            });
        } else {
            // Nothing loaded yet for this source: take the whole wall.
            spdlog::info("fetching all VK posts");
            state.needRequest = false;
            mgr->collect_all_vk_posts(wall, [this, i, &state](auto posts) mutable {
                spdlog::info("[vk:{}] fetched all {} posts", i, posts.size());
                if (posts.size() > 0) {
                    spdlog::info("[vk:{}] last loaded post date is now {}", i, posts[0].date);
                    mgr->m_appState->vkRepostState[state.sourceIndex].lastLoadedPostDate = posts[0].date;
                }
                state.ready = true;
                append_posts(state.posts, mgr->to_abstract_posts(posts, state.sourceIndex));
                continue_fetch();
            });
        }
    }

    bool tgReady = true;
    for (int i = 0; i < static_cast<int>(tgState.size()); ++i) {
        auto &state = tgState[i];
        if (state.ready) continue;
        tgReady = false;
        if (!state.needRequest) continue;

        long channel = mgr->m_appConfig->tgSources[state.sourceIndex].id;
        if (mgr->m_appState->tgRepostState[state.sourceIndex].lastLoadedPostDate != 0) {
            spdlog::info("[tg:{}] fetching {} posts starting from #{}", i, state.count, state.offset);
            state.needRequest = false;
            mgr->collect_tg_posts_from(channel, state.offset, state.count, [this, i, &state](auto posts) mutable {
                for (auto &p : posts) {
                    spdlog::debug("[tg:{}] got post dated {}", i, p->date_);
                }
                if (posts.size() > 0) {
                    // The next page starts from the last message id received.
                    state.offset = posts[posts.size() - 1]->id_;
                    spdlog::info("[tg:{}] setting from to id {}", i, posts[posts.size() - 1]->id_);
                }
                check_tg_posts(i, std::move(posts));
            });
            state.count = state.count * 3 / 2;
        } else {
            state.needRequest = false;
            mgr->collect_all_tg_posts(channel, [this, i, &state](auto posts) mutable {
                spdlog::info("[tg:{}] fetched all {} posts", i, posts.size());
                if (posts.size() > 0) {
                    spdlog::info("[tg:{}] last loaded post date is now {}", i, posts[0]->date_);
                    mgr->m_appState->tgRepostState[state.sourceIndex].lastLoadedPostDate = posts[0]->date_;
                }
                state.ready = true;
                append_posts(state.posts, mgr->to_abstract_posts(posts, state.sourceIndex));
                continue_fetch();
            });
        }
    }

    if (!(tgReady && vkReady)) return;

    // --- All sources are ready: merge the per-source lists. ---
    // Each list is consumed from its back; on every step the list whose
    // current back element has the smallest date wins (first list wins ties),
    // so `merged` is filled in ascending date order.
    const int nVkLists = static_cast<int>(vkState.size());
    const int nTgLists = static_cast<int>(tgState.size());
    const int nLists = nVkLists + nTgLists;

    // VK lists occupy indices [0, nVkLists), Telegram lists follow them.
    auto postsOf = [&](int list) -> auto & {
        return list < nVkLists ? vkState[list].posts : tgState[list - nVkLists].posts;
    };

    auto addPostCount = [](int n, const fetcher_state &p) {
        return n + static_cast<int>(p.posts.size());
    };
    const int total = std::accumulate(vkState.begin(), vkState.end(), 0, addPostCount)
                    + std::accumulate(tgState.begin(), tgState.end(), 0, addPostCount);

    std::vector<AbstractPost> merged;
    merged.reserve(total + mgr->m_unprocessedTgPosts.size());

    // indexes[list] is the position of the next post to take, -1 when exhausted.
    std::vector<int> indexes;
    indexes.reserve(nLists);
    for (int list = 0; list < nLists; ++list) {
        indexes.push_back(static_cast<int>(postsOf(list).size()) - 1);
    }

    for (int k = 0; k < total; ++k) {
        long minPostDate = std::numeric_limits<long>::max();
        int minPostListIdx = -1;
        for (int list = 0; list < nLists; ++list) {
            if (indexes[list] < 0) continue;
            const long d = postsOf(list)[indexes[list]].date;
            if (d < minPostDate) {
                minPostDate = d;
                minPostListIdx = list;
            }
        }
        if (minPostListIdx < 0) break; // no candidate found; never index with an unset value
        merged.emplace_back(std::move(postsOf(minPostListIdx)[indexes[minPostListIdx]]));
        --indexes[minPostListIdx];
    }

    // Fold in Telegram posts that arrived as live updates: skip one whose date
    // is already present, otherwise insert before the first later-dated post,
    // or append when there is none.
    while (mgr->m_unprocessedTgPosts.size() > 0) {
        AbstractPost tgPost = std::move(mgr->m_unprocessedTgPosts.front());
        mgr->m_unprocessedTgPosts.pop();

        bool duplicate = false, inserted = false;
        for (int i = 0; i < static_cast<int>(merged.size()); ++i) {
            if (merged[i].date == tgPost.date) {
                spdlog::debug("not inserting duplicate unproc tg post");
                duplicate = true;
                break;
            } else if (merged[i].date > tgPost.date) {
                spdlog::debug("inserting unproc tg post at pos {}", i);
                merged.insert(merged.begin() + i, std::move(tgPost));
                inserted = true;
                break;
            }
        }
        if (!duplicate && !inserted) {
            spdlog::debug("appending unproc tg post");
            merged.push_back(std::move(tgPost));
        }
    }

    working = false;
    onDone(std::move(merged));
}

// Handles one page of VK posts for source slot `index`: keeps the posts newer
// than the remembered date, marks the slot ready when the remembered post was
// reached (or the page was empty), then resumes the fetch.
void NewPostFetcher::check_vk_posts(int index, std::vector<vk::Post> posts) {
    spdlog::info("[vk:{}] fetched {} posts", index, posts.size());
    auto &state = vkState[index];
    auto &appState = mgr->m_appState->vkRepostState[state.sourceIndex];
    long oldLastPostDate = appState.lastLoadedPostDate;

    if (posts.size() > 0) {
        spdlog::info("[vk:{}] last post date is now {}", index, posts[0].date);
        std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts, state.sourceIndex);
        spdlog::info("[vk:{}] looking for date {}, have {} - {}",
                     index, oldLastPostDate, aposts[0].date, aposts[aposts.size() - 1].date);
        if (mgr->drop_posts_older_than(aposts, oldLastPostDate)) {
            spdlog::info("[vk:{}] found last remembered post", index);
            state.ready = true;
        }
        append_posts(state.posts, std::move(aposts));
        state.needRequest = true;
        if (state.ready && !state.posts.empty()) {
            spdlog::debug("[vk:{}] last loaded post date is now {}", index, state.posts[0].date);
            appState.lastLoadedPostDate = state.posts[0].date;
        }
    } else {
        // An empty page means there is nothing more to load.
        state.ready = true;
    }
    continue_fetch();
}

// Telegram counterpart of check_vk_posts(). Non-text messages are dropped by
// to_abstract_posts(), so the converted list may be shorter than `posts`.
void NewPostFetcher::check_tg_posts(int index, std::vector<td_api::object_ptr<td_api::message>> posts) {
    spdlog::info("[tg:{}] fetched {} posts", index, posts.size());
    auto &state = tgState[index];
    auto &appState = mgr->m_appState->tgRepostState[state.sourceIndex];
    long oldLastPostDate = appState.lastLoadedPostDate;

    if (posts.size() > 0) {
        spdlog::info("[tg:{}] last post date is now {}", index, posts[0]->date_);
        std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts, state.sourceIndex);
        if (mgr->drop_posts_older_than(aposts, oldLastPostDate)) {
            spdlog::info("[tg:{}] found last remembered post", index);
            state.ready = true;
        }
        append_posts(state.posts, std::move(aposts));
        state.needRequest = true;
        if (state.ready && !state.posts.empty()) {
            spdlog::debug("[tg:{}] last loaded post date is now {}", index, state.posts[0].date);
            // Store the date, not the message id: this field is compared
            // against post dates in drop_posts_older_than().
            appState.lastLoadedPostDate = state.posts[0].date;
        }
    } else {
        state.ready = true;
    }
    continue_fetch();
}

// Runs the first check over all VK and Telegram sources.
void RepostManager::on_clients_ready() {
    spdlog::info("checking all sources");
    m_fetcher.fetch(
        true, true,
        [this](auto posts) { on_new_posts(std::move(posts)); },
        []() {
            // TODO error handling
            spdlog::error("first post check failed");
        });
}

// Queues freshly collected posts for reposting and arms the one-shot check
// timer unless it is already armed.
void RepostManager::on_new_posts(std::vector<AbstractPost> posts) {
    spdlog::info("collected {} new posts", posts.size());
    enqueue_for_repost(std::move(posts));
    if (!m_checkTimerStarted) {
        spdlog::info("scheduling next check");
        uv_timer_start(m_checkTimer, &RepostManager::check_timer_callback, VK_CHECK_INTERVAL, 0);
        m_checkTimerStarted = true;
    }
}

// Collects every post of a VK wall (count is set to INT_MAX).
void RepostManager::collect_all_vk_posts(const VkWallId wall, std::function<void(VkPostList)> callback) {
    collect_last_vk_posts(wall, std::numeric_limits<int>::max(), callback);
}

// Collects up to `count` posts of a VK wall starting at offset 0.
void RepostManager::collect_last_vk_posts(const VkWallId wall, int count, std::function<void(VkPostList)> callback) {
    collect_vk_posts_from(wall, 0, count, callback);
}

// Collects every message of a Telegram channel (count is set to INT_MAX).
void RepostManager::collect_all_tg_posts(long channel, std::function<void(TgMessageList)> callback) {
    collect_last_tg_posts(channel, std::numeric_limits<int>::max(), callback);
}

// Collects up to `count` messages of a Telegram channel starting from message id 0.
void RepostManager::collect_last_tg_posts(long channel, int count, std::function<void(TgMessageList)> callback) {
    collect_tg_posts_from(channel, 0, count, callback);
}

// Collects up to `count` VK posts starting at `offset`, requesting as many
// chunks as needed, and passes the accumulated list to `callback`.
void RepostManager::collect_vk_posts_from(const VkWallId wall, int offset, int count,
                                          std::function<void(VkPostList)> callback) {
    spdlog::debug("COLLECT VK POSTS FROM {} {}", offset, count);
    auto result = std::make_shared<VkPostList>();
    collect_vk_posts_from__intermediate(wall, offset, count, result, callback);
}

// One step of collect_vk_posts_from(): requests a chunk, appends it to
// `intermediateResult` and either recurses for the remainder or delivers the
// result. An empty chunk ends the collection early.
// NOTE: on failure the error is only logged; `callback` is never invoked.
void RepostManager::collect_vk_posts_from__intermediate(const VkWallId wall, int offset, int count,
                                                        std::shared_ptr<VkPostList> intermediateResult,
                                                        std::function<void(VkPostList)> callback) {
    // `chunk` is the optional chunk object supplied by m_vk.get_posts; only its
    // `posts` member is used here.
    m_vk.get_posts(wall, offset, count, [=, this](auto chunk, int err) {
        if (err != 0 || !chunk) {
            spdlog::error("failed to get {} VK posts at offset {}: error {}", count, offset, err);
            return;
        }
        if (chunk->posts.size() == 0) {
            spdlog::debug("got all posts");
            callback(std::move(*intermediateResult));
            return;
        }

        const int chunkSize = static_cast<int>(chunk->posts.size());
        intermediateResult->reserve(intermediateResult->size() + chunk->posts.size());
        for (auto &post : chunk->posts) {
            intermediateResult->emplace_back(std::move(post));
        }

        if (count > chunkSize) {
            collect_vk_posts_from__intermediate(wall, offset + chunkSize, count - chunkSize,
                                                intermediateResult, callback);
        } else {
            callback(std::move(*intermediateResult));
        }
    });
}

// Collects up to `count` Telegram messages of `channel` starting from message
// id `from`, requesting as many chunks as needed.
void RepostManager::collect_tg_posts_from(long channel, long from, int count,
                                          std::function<void(TgMessageList)> callback) {
    spdlog::debug("collecting {} telegram posts starting from {}", count, from);
    auto result = std::make_shared<TgMessageList>();
    collect_tg_posts_from__intermediate(channel, from, count, result, callback);
}

// One step of collect_tg_posts_from(): sends getChatHistory, appends the
// returned messages and continues from the id of the last message received.
// NOTE: a non-`messages` reply is logged as an error; `callback` is never invoked.
void RepostManager::collect_tg_posts_from__intermediate(long channel, long from, int count,
                                                        std::shared_ptr<TgMessageList> intermediateResult,
                                                        std::function<void(TgMessageList)> callback) {
    spdlog::debug("getting some messages ({})", count);
    m_tg.send_query(
        td_api::make_object<td_api::getChatHistory>(channel, from, 0, count, false),
        [=, this](auto obj) {
            if (obj->get_id() != td_api::messages::ID) {
                auto &err = static_cast<td_api::error &>(*obj);
                spdlog::error("failed to get posts: {} {}", err.code_, err.message_);
                return;
            }

            auto &msgs = static_cast<td_api::messages &>(*obj);
            if (msgs.messages_.size() == 0) {
                spdlog::debug("got all posts");
                callback(std::move(*intermediateResult));
                return;
            }
            spdlog::debug("got {} posts", msgs.messages_.size());

            const int chunkSize = static_cast<int>(msgs.messages_.size());
            intermediateResult->reserve(intermediateResult->size() + msgs.messages_.size());
            long oldestId = from;
            for (auto &msg : msgs.messages_) {
                oldestId = msg->id_; // read the id before the pointer is moved away
                intermediateResult->emplace_back(std::move(msg));
            }

            if (count > chunkSize) {
                collect_tg_posts_from__intermediate(channel, oldestId, count - chunkSize,
                                                    intermediateResult, callback);
            } else {
                callback(std::move(*intermediateResult));
            }
        });
}

// Finds the first post whose date is <= `lastPostDate` and erases it together
// with everything after it. Returns true when such a post was found.
bool RepostManager::drop_posts_older_than(std::vector<AbstractPost> &posts, long lastPostDate) {
    auto firstOld = std::find_if(posts.begin(), posts.end(), [lastPostDate](auto &post) {
        return post.date <= lastPostDate;
    });
    if (firstOld == posts.end()) {
        return false;
    }
    posts.erase(firstOld, posts.end());
    return true;
}

// Converts a VK post; always yields a value.
std::optional<AbstractPost> RepostManager::to_abstract_post(const vk::Post &post, int sourceIndex) {
    return { AbstractPost(posts::SRC_VK, sourceIndex, post.id, post.date, post.text) };
}

// Converts a Telegram message; yields nothing unless its content is messageText.
std::optional<AbstractPost> RepostManager::to_abstract_post(const td_api::message &post, int sourceIndex) {
    if (post.content_->get_id() != td_api::messageText::ID) {
        return {};
    }
    auto &content = static_cast<td_api::messageText &>(*post.content_);
    return { AbstractPost(posts::SRC_TELEGRAM, sourceIndex, post.id_, post.date_, content.text_->text_) };
}

// Converts a list of VK posts, preserving order.
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<vk::Post> &posts, int sourceIndex) {
    std::vector<AbstractPost> result;
    result.reserve(posts.size());
    for (auto &post : posts) {
        result.emplace_back(posts::SRC_VK, sourceIndex, post.id, post.date, post.text);
    }
    return result;
}

// Converts a list of Telegram messages, preserving order and skipping
// everything that is not plain text.
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<td_api::object_ptr<td_api::message>> &posts,
                                                           int sourceIndex) {
    std::vector<AbstractPost> result;
    result.reserve(posts.size());
    for (auto &post : posts) {
        // we don't want any posts other than plain text (yet)
        if (post->content_->get_id() == td_api::messageText::ID) {
            auto &content = static_cast<td_api::messageText &>(*post->content_);
            result.emplace_back(posts::SRC_TELEGRAM, sourceIndex, post->id_, post->date_, content.text_->text_);
        }
    }
    return result;
}

// TODO timer
// Appends the posts to the repost queue. The repost timer is (re)started
// whenever a post is pushed onto an empty queue.
void RepostManager::enqueue_for_repost(std::vector<AbstractPost> posts) {
    for (auto &post : posts) {
        if (m_repostQueue.empty()) {
            uv_timer_start(m_repostTimer, &RepostManager::repost_timer_callback, REPOST_INTERVAL, REPOST_INTERVAL);
        }
        m_repostQueue.push(std::move(post));
    }
}

// Repost timer tick: reposts the post at the head of the queue and stops the
// timer once the queue is empty.
void RepostManager::repost_timer_callback(uv_timer_t *h) {
    spdlog::debug("repost timer");
    auto self = static_cast<RepostManager *>(h->data);
    if (!self->m_repostQueue.empty()) {
        self->repost(self->m_repostQueue.front());
        self->m_repostQueue.pop();
    }
    if (self->m_repostQueue.empty()) {
        spdlog::debug("repost queue empty, stopping timer");
        uv_timer_stop(h);
    } else {
        spdlog::debug("{} posts left to repost", self->m_repostQueue.size());
    }
}

// Check timer tick: runs a VK re-check.
void RepostManager::check_timer_callback(uv_timer_t *h) {
    spdlog::debug("vk recheck timer");
    auto self = static_cast<RepostManager *>(h->data);
    // The timer is one-shot (started with repeat 0), so it is no longer armed
    // at this point. Clear the flag so on_new_posts() schedules the next check;
    // otherwise the periodic check would run only once.
    self->m_checkTimerStarted = false;
    self->recheck_vk_posts({});
}

// Starts a VK-only fetch; the result goes through on_new_posts() and then
// `onDone` (if set) is called. Returns false without doing anything when a
// fetch is already in progress.
bool RepostManager::recheck_vk_posts(std::function<void()> onDone) {
    if (m_fetcher.working) {
        spdlog::error("can't recheck VK posts: another check is in progress");
        return false;
    }
    spdlog::info("checking VK posts");
    auto onFetchDone = [this, onDone](std::vector<AbstractPost> &&posts) {
        spdlog::info("checked VK posts");
        this->on_new_posts(std::move(posts));
        if (onDone) onDone();
    };
    auto onFetchError = []() {
        spdlog::error("failed to check VK posts");
    };
    m_fetcher.fetch(true, false, onFetchDone, onFetchError);
    return true;
}
void RepostManager::repost(AbstractPost &post) { spdlog::debug("reposting (post length {})", post.text.length()); std::string_view signature = posts::add_signature(post, m_appConfig); int signatureStart = post.text.length() - signature.length(); int signatureLength = signature.length(); spdlog::debug("post length {}, signature start {}, signature length {}", post.text.length(), signatureStart, signatureLength); std::vector> entities; //entities.push_back(std::move(td_api::make_object(signatureStart, signatureLength, td_api::make_object()))); auto content = td_api::make_object(td_api::make_object(post.text, std::move(entities)), td_api::make_object(true, std::string(""), false, false, false), false); spdlog::info("reposting to {}", m_appConfig->tgDestinationId); m_tg.send_query(td_api::make_object(m_appConfig->tgDestinationId, 0, nullptr, nullptr, nullptr, std::move(content)), [this, postDate = post.date, srcType = post.sourceType, src = post.source](auto result){ if (result->get_id() == td_api::error::ID) { auto &err = (td_api::error&)*result; spdlog::error("sendMessage error: {} {}", err.code_, err.message_); uv_timer_stop(m_repostTimer); } else { if (srcType == posts::SRC_VK) m_appState->vkRepostState[src].lastForwardedPostDate = postDate; else m_appState->tgRepostState[src].lastForwardedPostDate = postDate; m_appState->save(); } }); } void RepostManager::on_tg_message(td_api::updateNewMessage &update) { spdlog::info("received message from Telegram"); int sourceIndex = std::distance( m_appConfig->tgSources.begin(), std::find_if(m_appConfig->tgSources.begin(), m_appConfig->tgSources.end(), [chatId = update.message_->chat_id_](auto &src) { return src.id == chatId; } ) ); if (sourceIndex == m_appConfig->tgSources.size()) { return; } std::optional post = to_abstract_post(*update.message_, sourceIndex); if (!post) { spdlog::debug("tg message is not a post"); return; } spdlog::debug("adding tg post to the unprocessed tg post queue"); m_unprocessedTgPosts.push(*post); 
if (!m_fetcher.working) { if (m_checkTimerStarted) { uv_timer_stop(m_checkTimer); m_checkTimerStarted = false; } std::vector posts = { *post }; spdlog::debug("rechecking vk posts before processing the new tg post"); recheck_vk_posts({}); } }