#include "manager.h" #include "posts.h" #include "spdlog/spdlog.h" #include "state.h" #include "td/telegram/td_api.h" #include "uv.h" #include "vk.h" #include #include #include #include using namespace manager; const unsigned long REPOST_INTERVAL = 2000; const unsigned long VK_CHECK_INTERVAL = 600000; RepostManager::RepostManager(uv_loop_t *eventLoop, tg::AuthCodeProvider tgCodeProvider, tg::PasswordProvider tgPasswordProvider, state::AppState *appState, config::AppConfig *config) : m_vk(eventLoop), m_tg(eventLoop, config->tgApiId, config->tgApiHash, config->tgPhoneNumber) { m_appState = appState; m_appConfig = config; m_tg.authCodeProvider = tgCodeProvider; m_tg.passwordProvider = tgPasswordProvider; m_vk.set_service_api_key(config->vkServiceKey); m_repostTimer = new uv_timer_t; uv_timer_init(eventLoop, m_repostTimer); m_repostTimer->data = this; m_checkTimer = new uv_timer_t; uv_timer_init(eventLoop, m_checkTimer); m_checkTimer->data = this; } RepostManager::~RepostManager() { if (m_repostTimer) { uv_timer_stop(m_repostTimer); uv_close((uv_handle_t*)m_repostTimer, [](uv_handle_t *h){ delete h; }); } } void RepostManager::load_more_telegram_chats() { m_tg.send_query(td_api::make_object(td_api::make_object(), 1000), [this](auto result){ if (result->get_id() == td_api::ok::ID) { spdlog::info("loading more Telegram chats..."); load_more_telegram_chats(); } else if (result->get_id() == td_api::error::ID) { spdlog::info("loaded all chats"); on_clients_ready(); } }); } void RepostManager::start() { m_tg.add_update_handler([this](void*, td_api::Object &obj){ if (obj.get_id() == td_api::updateAuthorizationState::ID) { auto &authState = (td_api::updateAuthorizationState&)obj; if (authState.authorization_state_->get_id() == td_api::authorizationStateReady::ID) { spdlog::info("loading Telegram chats..."); load_more_telegram_chats(); } } else if (obj.get_id() == td_api::updateNewChat::ID) { auto &update = (td_api::updateNewChat&)obj; if (update.chat_->id_ == m_appConfig->tgDestinationId) spdlog::info("destination chat loaded"); if (update.chat_->id_ == m_appConfig->tgSourceId) spdlog::info("source chat loaded"); } else if (obj.get_id() == td_api::updateNewMessage::ID) { auto &update = (td_api::updateNewMessage&)obj; if (update.message_->chat_id_ == m_appConfig->tgSourceId) { on_tg_message(update); } } }); spdlog::info("starting Telegram authentication"); m_tg.start(); } NewPostFetcher::NewPostFetcher(RepostManager *m, bool fetchVk, bool fetchTg) : mgr(m) { if (!fetchVk) vkState.ready = true; if (!fetchTg) tgState.ready = true; } void NewPostFetcher::fetch() { if (vkState.ready && tgState.ready) { onDone(std::move(vkState.posts), std::move(tgState.posts)); return; } if (!vkState.ready && vkState.needRequest) { if (mgr->m_appState->vkLastLoadedPostId != 0) { spdlog::info("fetching {} VK posts at offset {}", vkState.count, vkState.offset); vkState.needRequest = false; mgr->collect_vk_posts_from(vkState.offset, vkState.count, [this](auto posts){ vkState.offset += posts.size(); vkState.count = vkState.count * 3 / 2; check_vk_posts(posts); }); } else { spdlog::info("fetching all VK posts"); vkState.needRequest = false; mgr->collect_all_vk_posts([this](auto posts){ spdlog::info("fetched all {} VK posts", posts.size()); if (posts.size() > 0) { spdlog::info("last vk post id is now {}", posts[0].id); mgr->m_appState->vkLastLoadedPostId = posts[0].id; } vkState.ready = true; std::vector aposts = mgr->to_abstract_posts(posts); vkState.posts.reserve(vkState.posts.size() + aposts.size()); for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) { vkState.posts.emplace_back(std::move(*i)); } fetch(); }); } } if (!tgState.ready && tgState.needRequest) { if (mgr->m_appState->tgLastLoadedPostId != 0) { spdlog::info("fetching {} TG posts starting from #{}", tgState.count, tgState.offset); tgState.needRequest = false; mgr->collect_tg_posts_from(tgState.offset, tgState.count, [this](auto posts){ if (posts.empty()) return; tgState.offset += posts[posts.size() - 1]->id_; check_tg_posts(std::move(posts)); }); tgState.count = tgState.count * 3 / 2; } else { tgState.needRequest = false; mgr->collect_all_tg_posts([this](auto posts){ spdlog::info("fetched all {} TG posts", posts.size()); if (posts.size() > 0) { spdlog::info("last telegram post id is now {}", posts[0]->id_); mgr->m_appState->tgLastLoadedPostId = posts[0]->id_; } tgState.ready = true; std::vector aposts = mgr->to_abstract_posts(posts); tgState.posts.reserve(tgState.posts.size() + aposts.size()); for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) { tgState.posts.emplace_back(std::move(*i)); } fetch(); }); } } } void NewPostFetcher::check_vk_posts(std::vector posts) { spdlog::info("fetched {} VK posts", posts.size()); long oldLastPostId = mgr->m_appState->vkLastLoadedPostId; if (posts.size() > 0) { spdlog::info("last vk post id is now {}", posts[0].id); } std::vector aposts = mgr->to_abstract_posts(posts); spdlog::info("looking for id {}, have {} - {}", oldLastPostId, aposts[0].id, aposts[aposts.size() - 1].id); if (mgr->drop_posts_older_than(aposts, oldLastPostId)) { spdlog::info("found last remembered VK post"); vkState.ready = true; } vkState.posts.reserve(vkState.posts.size() + aposts.size()); for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) { vkState.posts.emplace_back(std::move(*i)); } vkState.needRequest = true; if (vkState.ready && !vkState.posts.empty()) { spdlog::debug("last loaded vk post id is now {}", vkState.posts[0].id); mgr->m_appState->vkLastLoadedPostId = vkState.posts[0].id; } fetch(); } void NewPostFetcher::check_tg_posts(std::vector> posts) { spdlog::info("fetched {} TG posts", posts.size()); long oldLastPostId = mgr->m_appState->tgLastLoadedPostId; if (posts.size() > 0) { spdlog::info("last telegram post id is now {}", posts[0]->id_); } std::vector aposts = mgr->to_abstract_posts(posts); if (mgr->drop_posts_older_than(aposts, oldLastPostId)) { spdlog::info("found last remembered TG post"); tgState.ready = true; } tgState.posts.reserve(tgState.posts.size() + aposts.size()); for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) { tgState.posts.emplace_back(std::move(*i)); } tgState.needRequest = true; if (tgState.ready && !tgState.posts.empty()) { spdlog::debug("last loaded tg post id is now {}", tgState.posts[0].id); mgr->m_appState->tgLastLoadedPostId = tgState.posts[0].id; } fetch(); } void RepostManager::on_clients_ready() { m_appState->vkLastLoadedPostId = m_appState->vkLastPostId; m_appState->tgLastLoadedPostId = m_appState->tgLastPostId; NewPostFetcher *f = new NewPostFetcher(this, true, true); f->onDone = [this, f](auto vkPosts, auto tgPosts){ delete f; on_new_posts(vkPosts, tgPosts); }; f->onError = [f](){ delete f; }; f->fetch(); } void RepostManager::on_new_posts(std::vector vkPosts, std::vector tgPosts) { spdlog::info("collected {} new vk posts and {} new tg posts", vkPosts.size(), tgPosts.size()); std::vector mergedPosts; int totalSize = vkPosts.size() + tgPosts.size(); mergedPosts.reserve(totalSize); int vkIdx = vkPosts.size() - 1; int tgIdx = tgPosts.size() - 1; for (int i = 0; i < totalSize; ++i) { if (tgIdx < 0 || vkIdx >= 0 && vkPosts[vkIdx].date < tgPosts[tgIdx].date) { mergedPosts.emplace_back(std::move(vkPosts[vkIdx--])); } else { mergedPosts.emplace_back(std::move(tgPosts[tgIdx--])); } } spdlog::info("sorted {} posts", totalSize); enqueue_for_repost(mergedPosts); spdlog::info("scheduling next check"); uv_timer_start(m_checkTimer, &RepostManager::check_timer_callback, VK_CHECK_INTERVAL, 0); } void RepostManager::collect_all_vk_posts(std::function)> callback) { collect_last_vk_posts(std::numeric_limits::max(), callback); } void RepostManager::collect_last_vk_posts(int count, std::function)> callback) { collect_vk_posts_from(0, count, callback); } void RepostManager::collect_all_tg_posts(std::function>)> callback) { collect_last_tg_posts(std::numeric_limits::max(), callback); } void RepostManager::collect_last_tg_posts(int count, std::function>)> callback) { collect_tg_posts_from(0, count, callback); } void RepostManager::collect_vk_posts_from(int offset, int count, std::function)> callback) { spdlog::debug("COLLECT VK POSTS FROM {} {}", offset, count); auto result = std::make_shared>(); collect_vk_posts_from__intermediate(offset, count, result, callback); } void RepostManager::collect_vk_posts_from__intermediate(int offset, int count, std::shared_ptr> intermediateResult, std::function)> callback) { m_vk.get_posts(m_appConfig->vkSource, offset, count, [=, this](std::optional chunk, int err){ if (err == 0) { if (chunk->posts.size() == 0) { spdlog::debug("got all posts"); callback(std::move(*intermediateResult.get())); return; } int chunkSize = chunk->posts.size(); int oldSize = intermediateResult->size(); intermediateResult->reserve(oldSize + chunkSize); for (auto i = chunk->posts.begin(), end = chunk->posts.end(); i != end; ++i) { intermediateResult->emplace_back(std::move(*i)); } if (count > chunkSize) { collect_vk_posts_from__intermediate(offset + chunkSize, count - chunkSize, intermediateResult, callback); } else { callback(std::move(*intermediateResult.get())); } } else { spdlog::error("failed to get {} VK posts at offset {}: error {}", count, offset, err); } }); } void RepostManager::collect_tg_posts_from(long from, int count, std::function>)> callback) { spdlog::debug("collecting {} telegram posts starting from {}", count, from); auto result = std::make_shared>>(); collect_tg_posts_from__intermediate(from, count, result, callback); } void RepostManager::collect_tg_posts_from__intermediate(long from, int count, std::shared_ptr>> intermediateResult, std::function>)> callback) { spdlog::debug("getting some messages ({})", count); m_tg.send_query(td_api::make_object(m_appConfig->tgSourceId, from, 0, count, false), [=, this](auto obj){ if (obj->get_id() == td_api::messages::ID) { td_api::messages &msgs = (td_api::messages&)*obj; if (msgs.messages_.size() == 0) { spdlog::debug("got all posts"); auto resultPtr = intermediateResult.get(); callback(std::move(*resultPtr)); return; } spdlog::debug("got {} posts", msgs.messages_.size()); size_t chunkSize = msgs.messages_.size(); size_t oldSize = intermediateResult->size(); long oldestId; intermediateResult->reserve(oldSize + chunkSize); for (auto i = msgs.messages_.begin(), end = msgs.messages_.end(); i != end; ++i) { oldestId = (*i)->id_; intermediateResult->emplace_back(std::move(*i)); } if (count > chunkSize) { collect_tg_posts_from__intermediate(oldestId, count - chunkSize, intermediateResult, callback); } else { callback(std::move(*intermediateResult.get())); } } else { auto &err = (td_api::error&)*obj; spdlog::error("failed to get posts: {} {}", err.code_, err.message_); } }); } bool RepostManager::drop_posts_older_than(std::vector &posts, long lastPostId) { auto idx = std::find_if(posts.begin(), posts.end(), [lastPostId](auto &post){ return post.id == lastPostId; }); if (idx == posts.end()) { return false; } else { posts.erase(idx, posts.end()); return true; } } std::vector RepostManager::to_abstract_posts(std::vector &posts) { std::vector result; result.reserve(posts.size()); for (auto &post : posts) { result.emplace_back(posts::SRC_VK, post.id, post.date, post.text); } return result; } std::vector RepostManager::to_abstract_posts(std::vector> &posts) { std::vector result; result.reserve(posts.size()); for (auto &post : posts) { // we don't want any posts other than plain text (yet) if (post->content_->get_id() == td_api::messageText::ID) { auto &content = (td_api::messageText&) *post->content_; result.emplace_back(posts::SRC_TELEGRAM, post->id_, post->date_, content.text_->text_); } } return result; } // TODO timer void RepostManager::enqueue_for_repost(std::vector posts) { for (auto &post : posts) { if (m_repostQueue.empty()) { uv_timer_start(m_repostTimer, &RepostManager::repost_timer_callback, REPOST_INTERVAL, REPOST_INTERVAL); } m_repostQueue.push(post); } } void RepostManager::repost_timer_callback(uv_timer_t *h) { spdlog::debug("repost timer"); auto self = reinterpret_cast(h->data); if (!self->m_repostQueue.empty()) { self->repost(self->m_repostQueue.front()); self->m_repostQueue.pop(); } if (self->m_repostQueue.empty()) { spdlog::debug("repost queue empty, stopping timer"); uv_timer_stop(h); } else { spdlog::debug("{} posts left to repost", self->m_repostQueue.size()); } } void RepostManager::check_timer_callback(uv_timer_t *h) { spdlog::debug("vk recheck timer"); auto self = reinterpret_cast(h->data); self->recheck_vk_posts({}); } void RepostManager::recheck_vk_posts(std::function onDone) { spdlog::info("checking VK posts"); NewPostFetcher *f = new NewPostFetcher(this, true, false); f->onDone = [this, f, onDone](std::vector &&vkPosts, std::vector &&tgPosts){ spdlog::info("checked VK posts"); this->on_new_posts(vkPosts, tgPosts); if (onDone) onDone(); delete f; }; f->onError = [f](){ delete f; spdlog::error("failed to check VK posts"); }; f->fetch(); } void RepostManager::repost(AbstractPost &post) { spdlog::debug("reposting (post length {})", post.text.length()); std::string_view signature = posts::add_signature(post, m_appConfig); int signatureStart = post.text.length() - signature.length(); int signatureLength = signature.length(); spdlog::debug("post length {}, signature start {}, signature length {}", post.text.length(), signatureStart, signatureLength); std::vector> entities; //entities.push_back(std::move(td_api::make_object(signatureStart, signatureLength, td_api::make_object()))); auto content = td_api::make_object(td_api::make_object(post.text, std::move(entities)), td_api::make_object(true, std::string(""), false, false, false), false); m_tg.send_query(td_api::make_object(m_appConfig->tgDestinationId, 0, nullptr, nullptr, nullptr, std::move(content)), [this, postId = post.id, src = post.source](auto result){ if (result->get_id() == td_api::error::ID) { auto &err = (td_api::error&)*result; spdlog::error("sendMessage error: {} {}", err.code_, err.message_); uv_timer_stop(m_repostTimer); } else { if (src == posts::SRC_VK) m_appState->vkLastPostId = postId; else m_appState->tgLastPostId = postId; } }); } void RepostManager::on_tg_message(td_api::updateNewMessage &update) { spdlog::info("received message from Telegram"); uv_timer_stop(m_checkTimer); std::vector> v; v.push_back(std::move(update.message_)); recheck_vk_posts([this, post = to_abstract_posts(v)](){ on_new_posts({}, post); }); }