From e221d3624d2b29ee77122b66423531e6180cf9a9 Mon Sep 17 00:00:00 2001 From: Slavasil Date: Wed, 20 Nov 2024 14:12:01 +0300 Subject: [PATCH] add TelegramClient and RepostManager with functionality to fetch posts --- CMakeLists.txt | 2 +- main.cpp | 29 ++++++++ manager.cpp | 193 +++++++++++++++++++++++++++++++++++++++++++++++++ manager.h | 48 ++++++++++++ tg.cpp | 174 ++++++++++++++++++++++++++++++++++++++++++++ tg.h | 84 +++++++++++++++++++++ vk.cpp | 17 ++++- 7 files changed, 543 insertions(+), 4 deletions(-) create mode 100644 manager.cpp create mode 100644 manager.h create mode 100644 tg.cpp create mode 100644 tg.h diff --git a/CMakeLists.txt b/CMakeLists.txt index f62ebd5..c860817 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,7 +7,7 @@ add_subdirectory(libuv) add_subdirectory(spdlog) add_subdirectory(td) -add_executable(${PROJECT_NAME} main.cpp config.cpp http.cpp state.cpp vk.cpp) +add_executable(${PROJECT_NAME} main.cpp config.cpp http.cpp manager.cpp state.cpp tg.cpp vk.cpp) target_compile_options(${PROJECT_NAME} PRIVATE -std=c++2b) diff --git a/main.cpp b/main.cpp index a05653e..993dc84 100644 --- a/main.cpp +++ b/main.cpp @@ -1,6 +1,9 @@ +#include "config.h" +#include "manager.h" #include "spdlog/spdlog.h" #include "state.h" #include +#include #include #include #include @@ -18,6 +21,7 @@ void create_signal_handles(uv_loop_t *loop, uv_signal_t handles[2]) { int main() { uv_loop_t *loop = uv_default_loop(); + spdlog::set_level(spdlog::level::trace); uv_signal_t signalHandles[2] = {}; create_signal_handles(loop, signalHandles); @@ -31,6 +35,31 @@ int main() { return 1; } + config::AppConfig config("bridge_config.json"); + if (config.tgApiId == 0 || config.tgApiHash.empty() || config.tgPhoneNumber.empty() + || config.vkServiceKey.empty() || (config.vkSource.index() == 0 && std::get(config.vkSource) == 0) + || config.tgSourceId == 0 || config.tgDestinationId == 0) { + spdlog::error("incomplete config file"); + return 2; + } + + std::function)> tgAuthCodeProvider = [](auto set_code){ + spdlog::warn("/!\\ hanging event loop to request code"); + std::string code; + std::cin >> code; + set_code(code); + }; + + std::function)> tgPasswordProvider = [](auto set_password){ + spdlog::warn("/!\\ hanging event loop to request password"); + std::string password; + std::cin >> password; + set_password(password); + }; + + manager::RepostManager manager(loop, tgAuthCodeProvider, tgPasswordProvider, &state, &config); + manager.start(); + uv_run(loop, UV_RUN_DEFAULT); spdlog::info("event loop ended"); state.save(); diff --git a/manager.cpp b/manager.cpp new file mode 100644 index 0000000..b58820b --- /dev/null +++ b/manager.cpp @@ -0,0 +1,193 @@ +#include "manager.h" +#include "spdlog/spdlog.h" +#include "state.h" +#include "td/telegram/td_api.h" +#include "vk.h" +#include +#include +#include +#include + +using namespace manager; + +RepostManager::RepostManager(uv_loop_t *eventLoop, tg::AuthCodeProvider tgCodeProvider, tg::PasswordProvider tgPasswordProvider, state::AppState *appState, config::AppConfig *config) + : m_vk(eventLoop), m_tg(eventLoop, config->tgApiId, config->tgApiHash, config->tgPhoneNumber) { + m_appState = appState; + m_appConfig = config; + m_tg.authCodeProvider = tgCodeProvider; + m_tg.passwordProvider = tgPasswordProvider; + m_vk.set_service_api_key(config->vkServiceKey); +} + +void RepostManager::start() { + m_tg.add_update_handler([this](void*, td_api::Object &obj){ + if (obj.get_id() == td_api::updateAuthorizationState::ID) { + auto &authState = (td_api::updateAuthorizationState&)obj; + if (authState.authorization_state_->get_id() == td_api::authorizationStateReady::ID) { + //on_clients_ready(); + spdlog::info("loading Telegram chats..."); + m_tg.send_query(td_api::make_object(td_api::make_object(), 1000), [this](auto result){ + spdlog::debug("loadChats done"); + on_clients_ready(); + }); + } + } else if (obj.get_id() == td_api::updateNewChat::ID) { + auto &update = (td_api::updateNewChat&)obj; + //spdlog::debug("chat loaded: {}", update.chat_->title_); + } + }); + spdlog::info("starting Telegram authentication"); + m_tg.start(); +} + +void RepostManager::on_clients_ready() { + auto posts = std::make_shared< std::optional>[] >(2); + auto doWorkWithPosts = [this, posts]() -> void { + spdlog::info("fetched some posts:"); + + int limit = 3; + for (auto &i : *posts[0]) { + if (limit-- == 0) { + spdlog::info("..."); + break; + } + spdlog::info("vk[#{}, {}] {}", i.id, i.date, i.text); + } + limit = 3; + for (auto &i : *posts[1]) { + if (limit-- == 0) { + spdlog::info("..."); + break; + } + spdlog::info("tg[#{}, {}] {}", i.id, i.date, i.text); + } + + m_tg.send_query(td_api::make_object(m_appConfig->tgSourceId), [](auto result){ + if (result->get_id() == td_api::chat::ID) { + auto &chat = (td_api::chat&)*result; + spdlog::info("source chat: #{} {}", chat.id_, chat.title_); + } else { + auto &e = (td_api::error&)*result; + spdlog::error("getChat error: {} {}", e.code_, e.message_); + } + }); + }; + auto putVkPosts = [this, posts, doWorkWithPosts](std::vector vkPosts){ + posts[0] = {to_abstract_posts(vkPosts)}; + spdlog::info("fetched {} vk posts", posts[0]->size()); + if (posts[1].has_value()) { + doWorkWithPosts(); + } + }; + auto putTgPosts = [this, posts, doWorkWithPosts](std::vector> tgPosts){ + posts[1] = {to_abstract_posts(tgPosts)}; + spdlog::info("fetched {} telegram posts", posts[1]->size()); + if (posts[0].has_value()) { + doWorkWithPosts(); + } + }; + if (m_appState->vkLastPostId == 0) { + spdlog::info("fetching ALL vk posts..."); + collect_all_vk_posts(putVkPosts); + } else { + spdlog::info("fetching last 3 vk posts..."); + collect_last_vk_posts(3, putVkPosts); + } + + if (m_appState->tgLastPostId == 0) { + spdlog::info("fetching ALL telegram posts..."); + collect_all_tg_posts(putTgPosts); + } else { + spdlog::info("fetching last 3 telegram posts..."); + collect_last_tg_posts(3, putTgPosts); + } +} + +void RepostManager::collect_all_vk_posts(std::function)> callback) { + collect_last_vk_posts(std::numeric_limits::max(), callback); +} + +void RepostManager::collect_last_vk_posts(int count, std::function)> callback) { + collect_vk_posts_from(0, count, callback); +} + +void RepostManager::collect_all_tg_posts(std::function>)> callback) { + collect_last_tg_posts(std::numeric_limits::max(), callback); +} + +void RepostManager::collect_last_tg_posts(int count, std::function>)> callback) { + collect_tg_posts_from(0, count, callback); +} + +void RepostManager::collect_vk_posts_from(int offset, int count, std::function)> callback) { + m_vk.get_posts(m_appConfig->vkSource, offset, count, [=](std::optional chunk, int err){ + if (err == 0) { + callback(chunk->posts); + } else { + spdlog::error("failed to get {} VK posts at offset {}: error {}", count, offset, err); + } + }); +} + +void RepostManager::collect_tg_posts_from(long from, int count, std::function>)> callback) { + spdlog::debug("collecting {} telegram posts starting from {}", count, from); + auto result = std::make_shared>>(); + collect_tg_posts_from__intermediate(from, count, result, callback); +} + +void RepostManager::collect_tg_posts_from__intermediate(long from, int count, std::shared_ptr>> intermediateResult, std::function>)> callback) { + spdlog::debug("getting some messages ({})", count); + m_tg.send_query(td_api::make_object(m_appConfig->tgSourceId, from, 0, count, false), [=, this](auto obj){ + if (obj->get_id() == td_api::messages::ID) { + td_api::messages &msgs = (td_api::messages&)*obj; + if (msgs.messages_.size() == 0) { + spdlog::debug("got all posts"); + auto resultPtr = intermediateResult.get(); + callback(std::move(*resultPtr)); + return; + } + spdlog::debug("got {} posts", msgs.messages_.size()); + size_t chunkSize = msgs.messages_.size(); + size_t oldSize = intermediateResult->size(); + long oldestId; + intermediateResult->reserve(oldSize + chunkSize); + for (auto i = msgs.messages_.begin(), end = msgs.messages_.end(); i != end; ++i) { + //spdlog::debug("moving message {}", (*i)->id_); + oldestId = (*i)->id_; + intermediateResult->emplace_back(std::move(*i)); + } + collect_tg_posts_from__intermediate(oldestId, count - chunkSize, intermediateResult, callback); + } else { + auto &err = (td_api::error&)*obj; + spdlog::error("failed to get posts: {} {}", err.code_, err.message_); + } + }); +} + +std::vector RepostManager::to_abstract_posts(std::vector &posts) { + std::vector result; + result.reserve(posts.size()); + for (auto &post : posts) { + result.emplace_back(post.id, post.date, post.text); + } + return result; +} + +std::vector RepostManager::to_abstract_posts(std::vector> &posts) { + std::vector result; + result.reserve(posts.size()); + for (auto &post : posts) { + // we don't want any posts other than plain text (yet) + if (post->content_->get_id() == td_api::messageText::ID) { + auto &content = (td_api::messageText&) *post->content_; + result.emplace_back(post->id_, post->date_, content.text_->text_); + } + } + return result; +} + +void RepostManager::repost_all(std::vector posts) { + for (auto &post : posts) { + // TODO + } +} \ No newline at end of file diff --git a/manager.h b/manager.h new file mode 100644 index 0000000..7d44b93 --- /dev/null +++ b/manager.h @@ -0,0 +1,48 @@ +#pragma once + +#include "config.h" +#include "state.h" +#include "tg.h" +#include "vk.h" +#include +#include +#include + +namespace manager { + namespace td_api = td::td_api; + + struct AbstractPost { + inline AbstractPost(long id, long date, std::string text) : id(id), date(date), text(text) {} + long id; + long date; + std::string text; + }; + + class RepostManager { + public: + RepostManager(uv_loop_t *eventLoop, tg::AuthCodeProvider tgCodeProvider, tg::PasswordProvider tgPasswordProvider, state::AppState *appState, config::AppConfig *config); + RepostManager(RepostManager&) = delete; + void start(); + private: + void on_clients_ready(); + + void collect_all_vk_posts(std::function)> callback); + void collect_all_tg_posts(std::function>)> callback); + void collect_last_vk_posts(int count, std::function)> callback); + void collect_last_tg_posts(int count, std::function>)> callback); + void collect_vk_posts_from(int offset, int count, std::function)> callback); + void collect_tg_posts_from(long from, int count, std::function>)> callback); + + void collect_tg_posts_from__intermediate(long from, int count, std::shared_ptr>> intermediateResult, std::function>)> callback); + + std::vector to_abstract_posts(std::vector &posts); + std::vector to_abstract_posts(std::vector> &posts); + + void repost_all(std::vector posts); + + state::AppState *m_appState; + config::AppConfig *m_appConfig; + vk::VKClient m_vk; + tg::TelegramClient m_tg; + }; +} diff --git a/tg.cpp b/tg.cpp new file mode 100644 index 0000000..2baf625 --- /dev/null +++ b/tg.cpp @@ -0,0 +1,174 @@ +#include +#include + +#include + +#include "tg.h" +#include "td/telegram/td_api.h" + +using namespace tg; + +TelegramClient::TelegramClient(uv_loop_t *eventLoop, long apiId, std::string apiHash, std::string phoneNumber) + : m_eventLoop(eventLoop), m_apiId(apiId), m_apiHash(apiHash), m_phoneNumber(phoneNumber) {} + +bool TelegramClient::start() { + spdlog::debug("tg: start"); + if (m_running) return false; + m_running = true; + + spdlog::debug("tg: creating uv handle"); + m_uvHandle = new uv_async_t; + m_uvHandle->data = (void*)this; + uv_async_init(m_eventLoop, m_uvHandle, TelegramClient::uv_callback); + + spdlog::debug("tg: starting telegram thread"); + m_thread = std::thread([this](){ + run_thread(); + }); + return true; +} + +TelegramClient::~TelegramClient() { + if (m_thread.joinable()) { + send_query(td_api::make_object(), {}); + m_thread.join(); + + if (m_uvHandle) + uv_close((uv_handle_t*)m_uvHandle, TelegramClient::uv_close_handle_callback); + } +} + +void TelegramClient::run_thread() { + spdlog::debug("tg: i am telegram thread"); + td::ClientManager::execute(td_api::make_object(0)); + m_clientManager = std::make_unique(); + m_clientId = m_clientManager->create_client_id(); + send_query(td_api::make_object("version"), {}); + run_telegram_main_loop(); +} + +void TelegramClient::run_telegram_main_loop() { + spdlog::info("tg: begin telegram main loop"); + while (m_running) { + if (!process_response(m_clientManager->receive(500))) { + spdlog::info("tg: closed"); + return; + } + } +} + +void TelegramClient::send_query(td_api::object_ptr f, std::function callback) { + std::uint64_t query_id = m_nextQueryId++; + if (callback) { + m_handlers.emplace(query_id, std::move(callback)); + } + m_clientManager->send(m_clientId, query_id, std::move(f)); +} + +bool TelegramClient::process_response(td::ClientManager::Response response) { + if (!response.object) { + return true; + } + if (response.object->get_id() == td_api::updateAuthorizationState::ID) { + auto authUpdate = (td_api::updateAuthorizationState*) response.object.get(); + spdlog::debug("updateAuthorizationState: {}", authUpdate->authorization_state_->get_id()); + switch (authUpdate->authorization_state_->get_id()) { + case td_api::authorizationStateClosed::ID: + spdlog::debug("closed"); + return false; + case td_api::authorizationStateWaitTdlibParameters::ID: + spdlog::debug("setting tdlib parameters"); + set_tdlib_parameters(); + return true; + case td_api::authorizationStateWaitPhoneNumber::ID: + spdlog::debug("setting phone number"); + set_phone_number(); + return true; + case td_api::authorizationStateWaitPassword::ID: + spdlog::debug("setting password"); + if (passwordProvider) + passwordProvider(std::bind(&TelegramClient::set_password, this, std::placeholders::_1)); + case td_api::authorizationStateWaitCode::ID: + spdlog::debug("setting 2FA code"); + if (authCodeProvider) + authCodeProvider(std::bind(&TelegramClient::set_code, this, std::placeholders::_1)); + return true; + } + } + m_responseQueueMutex.lock(); + m_responseQueue.push_back(std::move(response)); + m_responseQueueMutex.unlock(); + + uv_async_send(m_uvHandle); + return true; +} + +void TelegramClient::set_tdlib_parameters() { + auto query = td_api::make_object(); + query->database_directory_ = "tdata"; + query->use_message_database_ = true; + query->use_secret_chats_ = false; + query->api_id_ = m_apiId; + query->api_hash_ = m_apiHash; + query->system_language_code_ = "en"; + query->device_model_ = "server"; + query->application_version_ = "0.1.0"; + send_query(std::move(query), {}); +} + +void TelegramClient::set_phone_number() { + auto query = td_api::make_object(); + query->phone_number_ = m_phoneNumber; + send_query(std::move(query), {}); +} + +void TelegramClient::set_code(std::string code) { + spdlog::debug("setting auth code {}", code); + send_query(td_api::make_object(code), {}); +} + +void TelegramClient::set_password(std::string password) { + spdlog::debug("setting password {}", password); + send_query(td_api::make_object(password), {}); +} + +void TelegramClient::uv_callback(uv_async_t *h) { + //spdlog::debug("tg: libuv callback"); + TelegramClient *self = (TelegramClient*)h->data; + + self->m_responseQueueMutex.lock(); + while (self->m_responseQueue.size() > 0) { + auto response = std::move(self->m_responseQueue.front()); + self->m_responseQueue.pop_front(); + if (response.request_id == 0) { + self->dispatch_update(response.object); + } else { + self->dispatch_response(response.request_id, std::move(response.object)); + } + } + self->m_responseQueueMutex.unlock(); +} + +void TelegramClient::dispatch_update(TgObject &update) { + //spdlog::debug("tg: update"); + for (auto i = m_updateHandlers.begin(); i != m_updateHandlers.end(); i++) { + i->first(i->second, *update.get()); + } +} + +void TelegramClient::dispatch_response(std::uint64_t queryId, TgObject response) { + spdlog::debug("tg: response to {}", queryId); + auto queryHandler = m_handlers.find(queryId); + if (queryHandler != m_handlers.end()) { + queryHandler->second(std::move(response)); + } +} + +void TelegramClient::uv_close_handle_callback(uv_handle_t *h) { + spdlog::debug("tg: closing uv handle"); + delete h; +} + +void TelegramClient::add_update_handler(std::function handler, void *context) { + m_updateHandlers.push_back({std::move(handler), context}); +} diff --git a/tg.h b/tg.h new file mode 100644 index 0000000..065f670 --- /dev/null +++ b/tg.h @@ -0,0 +1,84 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace tg { + namespace td_api = td::td_api; + using TgObject = td_api::object_ptr; + + namespace detail { + template + struct overload; + + template + struct overload : public F { + explicit overload(F f) : F(f) { + } + }; + template + struct overload + : public overload + , public overload { + overload(F f, Fs... fs) : overload(f), overload(fs...) { + } + using overload::operator(); + using overload::operator(); + }; + } // namespace detail + + typedef std::function)> AuthCodeProvider; + typedef std::function)> PasswordProvider; + + class TelegramClient { + public: + TelegramClient(uv_loop_t *eventLoop, long apiId, std::string apiHash, std::string phoneNumber); + ~TelegramClient(); + void add_update_handler(std::function handler, void *context = nullptr); + void send_query(td_api::object_ptr f, std::function callback); + bool start(); + + AuthCodeProvider authCodeProvider; + PasswordProvider passwordProvider; + private: + static void uv_callback(uv_async_t *h); + static void uv_close_handle_callback(uv_handle_t *h); + void run_thread(); + void run_telegram_main_loop(); + bool process_response(td::ClientManager::Response response); + void dispatch_response(std::uint64_t queryId, TgObject response); + void dispatch_update(TgObject &update); + void set_tdlib_parameters(); + void set_phone_number(); + void set_code(std::string code); + void set_password(std::string password); + + bool m_running = false; + bool m_authorized = false; + std::uint64_t m_nextQueryId = 0; + std::map> m_handlers; + std::vector, void*>> m_updateHandlers; + std::thread m_thread; + std::unique_ptr m_clientManager; + std::int32_t m_clientId; + uv_loop_t *m_eventLoop; + uv_async_t *m_uvHandle = nullptr; + long m_apiId; + std::string m_apiHash; + std::string m_phoneNumber; + std::string m_password; + std::deque m_responseQueue; + std::mutex m_responseQueueMutex; + }; + } \ No newline at end of file diff --git a/vk.cpp b/vk.cpp index 5a8a776..6523e2a 100644 --- a/vk.cpp +++ b/vk.cpp @@ -2,6 +2,7 @@ #include "curl/curl.h" #include "http.h" #include "spdlog/sinks/stdout_color_sinks.h" +#include #include #include @@ -41,7 +42,9 @@ void VKClient::get_posts(std::variant wall, int offset, int c url += "&offset="; url += std::to_string(offset); url += "&count="; - url += std::to_string(count); + // + 1 here to handle the pinned post + int increasedCount = count < std::numeric_limits::max() ? count + 1 : count; + url += std::to_string(increasedCount); if (wall.index() == 0) { url += "&owner_id="; url += std::to_string(std::get(wall)); @@ -51,7 +54,7 @@ void VKClient::get_posts(std::variant wall, int offset, int c } m_logger->debug("using URL: {}", url); - m_httpClient.send_request("GET", url, {}, [this, callback](std::unique_ptr resp, CURLcode r){ + m_httpClient.send_request("GET", url, {}, [this, callback, requestedCount = count](std::unique_ptr resp, CURLcode r){ if (r == 0) { auto parsedResponse = json::parse(resp->body); @@ -65,8 +68,16 @@ void VKClient::get_posts(std::variant wall, int offset, int c auto responsePayload = parsedResponse["response"]; int count = responsePayload["count"]; std::vector posts; + int checkedPosts = 0; for (auto post : responsePayload["items"]) { - posts.emplace_back(post["id"], post["date"], post["edited"], post["from_id"], post["type"], post["text"]); + if (checkedPosts == requestedCount) continue; + ++checkedPosts; + if (post.contains("is_pinned") && post["is_pinned"] == true) continue; + if (!post.contains("id") || !post.contains("date") || !post.contains("from_id") || !post.contains("type") || !post.contains("text")) { + m_logger->warn("strange post: {}", post.dump()); + continue; + } + posts.emplace_back(post["id"], post["date"], post.contains("edited") ? post["edited"].get() : 0, post["from_id"], post["type"], post["text"]); } callback({{count, std::move(posts)}}, 0);