add TelegramClient and RepostManager with functionality to fetch posts

This commit is contained in:
Slavasil 2024-11-20 14:12:01 +03:00
parent 5c93d48bb8
commit e221d3624d
7 changed files with 543 additions and 4 deletions

View File

@ -7,7 +7,7 @@ add_subdirectory(libuv)
add_subdirectory(spdlog)
add_subdirectory(td)
add_executable(${PROJECT_NAME} main.cpp config.cpp http.cpp state.cpp vk.cpp)
add_executable(${PROJECT_NAME} main.cpp config.cpp http.cpp manager.cpp state.cpp tg.cpp vk.cpp)
target_compile_options(${PROJECT_NAME} PRIVATE -std=c++2b)

View File

@ -1,6 +1,9 @@
#include "config.h"
#include "manager.h"
#include "spdlog/spdlog.h"
#include "state.h"
#include <filesystem>
#include <iostream>
#include <uv.h>
#include <td/telegram/td_api.h>
#include <td/telegram/td_api.hpp>
@ -18,6 +21,7 @@ void create_signal_handles(uv_loop_t *loop, uv_signal_t handles[2]) {
int main() {
uv_loop_t *loop = uv_default_loop();
spdlog::set_level(spdlog::level::trace);
uv_signal_t signalHandles[2] = {};
create_signal_handles(loop, signalHandles);
@ -31,6 +35,31 @@ int main() {
return 1;
}
config::AppConfig config("bridge_config.json");
if (config.tgApiId == 0 || config.tgApiHash.empty() || config.tgPhoneNumber.empty()
|| config.vkServiceKey.empty() || (config.vkSource.index() == 0 && std::get<long>(config.vkSource) == 0)
|| config.tgSourceId == 0 || config.tgDestinationId == 0) {
spdlog::error("incomplete config file");
return 2;
}
std::function<void(std::function<void(std::string)>)> tgAuthCodeProvider = [](auto set_code){
spdlog::warn("/!\\ hanging event loop to request code");
std::string code;
std::cin >> code;
set_code(code);
};
std::function<void(std::function<void(std::string)>)> tgPasswordProvider = [](auto set_password){
spdlog::warn("/!\\ hanging event loop to request password");
std::string password;
std::cin >> password;
set_password(password);
};
manager::RepostManager manager(loop, tgAuthCodeProvider, tgPasswordProvider, &state, &config);
manager.start();
uv_run(loop, UV_RUN_DEFAULT);
spdlog::info("event loop ended");
state.save();

193
manager.cpp Normal file
View File

@ -0,0 +1,193 @@
#include "manager.h"
#include "spdlog/spdlog.h"
#include "state.h"
#include "td/telegram/td_api.h"
#include "vk.h"
#include <algorithm>
#include <limits>
#include <memory>
#include <optional>
using namespace manager;
RepostManager::RepostManager(uv_loop_t *eventLoop, tg::AuthCodeProvider tgCodeProvider, tg::PasswordProvider tgPasswordProvider, state::AppState *appState, config::AppConfig *config)
: m_vk(eventLoop), m_tg(eventLoop, config->tgApiId, config->tgApiHash, config->tgPhoneNumber) {
m_appState = appState;
m_appConfig = config;
m_tg.authCodeProvider = tgCodeProvider;
m_tg.passwordProvider = tgPasswordProvider;
m_vk.set_service_api_key(config->vkServiceKey);
}
void RepostManager::start() {
m_tg.add_update_handler([this](void*, td_api::Object &obj){
if (obj.get_id() == td_api::updateAuthorizationState::ID) {
auto &authState = (td_api::updateAuthorizationState&)obj;
if (authState.authorization_state_->get_id() == td_api::authorizationStateReady::ID) {
//on_clients_ready();
spdlog::info("loading Telegram chats...");
m_tg.send_query(td_api::make_object<td_api::loadChats>(td_api::make_object<td_api::chatListMain>(), 1000), [this](auto result){
spdlog::debug("loadChats done");
on_clients_ready();
});
}
} else if (obj.get_id() == td_api::updateNewChat::ID) {
auto &update = (td_api::updateNewChat&)obj;
//spdlog::debug("chat loaded: {}", update.chat_->title_);
}
});
spdlog::info("starting Telegram authentication");
m_tg.start();
}
void RepostManager::on_clients_ready() {
auto posts = std::make_shared< std::optional<std::vector<AbstractPost>>[] >(2);
auto doWorkWithPosts = [this, posts]() -> void {
spdlog::info("fetched some posts:");
int limit = 3;
for (auto &i : *posts[0]) {
if (limit-- == 0) {
spdlog::info("...");
break;
}
spdlog::info("vk[#{}, {}] {}", i.id, i.date, i.text);
}
limit = 3;
for (auto &i : *posts[1]) {
if (limit-- == 0) {
spdlog::info("...");
break;
}
spdlog::info("tg[#{}, {}] {}", i.id, i.date, i.text);
}
m_tg.send_query(td_api::make_object<td_api::getChat>(m_appConfig->tgSourceId), [](auto result){
if (result->get_id() == td_api::chat::ID) {
auto &chat = (td_api::chat&)*result;
spdlog::info("source chat: #{} {}", chat.id_, chat.title_);
} else {
auto &e = (td_api::error&)*result;
spdlog::error("getChat error: {} {}", e.code_, e.message_);
}
});
};
auto putVkPosts = [this, posts, doWorkWithPosts](std::vector<vk::Post> vkPosts){
posts[0] = {to_abstract_posts(vkPosts)};
spdlog::info("fetched {} vk posts", posts[0]->size());
if (posts[1].has_value()) {
doWorkWithPosts();
}
};
auto putTgPosts = [this, posts, doWorkWithPosts](std::vector<td::tl::unique_ptr<td_api::message>> tgPosts){
posts[1] = {to_abstract_posts(tgPosts)};
spdlog::info("fetched {} telegram posts", posts[1]->size());
if (posts[0].has_value()) {
doWorkWithPosts();
}
};
if (m_appState->vkLastPostId == 0) {
spdlog::info("fetching ALL vk posts...");
collect_all_vk_posts(putVkPosts);
} else {
spdlog::info("fetching last 3 vk posts...");
collect_last_vk_posts(3, putVkPosts);
}
if (m_appState->tgLastPostId == 0) {
spdlog::info("fetching ALL telegram posts...");
collect_all_tg_posts(putTgPosts);
} else {
spdlog::info("fetching last 3 telegram posts...");
collect_last_tg_posts(3, putTgPosts);
}
}
void RepostManager::collect_all_vk_posts(std::function<void(std::vector<vk::Post>)> callback) {
collect_last_vk_posts(std::numeric_limits<int>::max(), callback);
}
void RepostManager::collect_last_vk_posts(int count, std::function<void(std::vector<vk::Post>)> callback) {
collect_vk_posts_from(0, count, callback);
}
void RepostManager::collect_all_tg_posts(std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
collect_last_tg_posts(std::numeric_limits<int>::max(), callback);
}
void RepostManager::collect_last_tg_posts(int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
collect_tg_posts_from(0, count, callback);
}
void RepostManager::collect_vk_posts_from(int offset, int count, std::function<void(std::vector<vk::Post>)> callback) {
m_vk.get_posts(m_appConfig->vkSource, offset, count, [=](std::optional<vk::WallChunk> chunk, int err){
if (err == 0) {
callback(chunk->posts);
} else {
spdlog::error("failed to get {} VK posts at offset {}: error {}", count, offset, err);
}
});
}
void RepostManager::collect_tg_posts_from(long from, int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
spdlog::debug("collecting {} telegram posts starting from {}", count, from);
auto result = std::make_shared<std::vector<td::tl::unique_ptr<td_api::message>>>();
collect_tg_posts_from__intermediate(from, count, result, callback);
}
void RepostManager::collect_tg_posts_from__intermediate(long from, int count, std::shared_ptr<std::vector<td::tl::unique_ptr<td_api::message>>> intermediateResult, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
spdlog::debug("getting some messages ({})", count);
m_tg.send_query(td_api::make_object<td_api::getChatHistory>(m_appConfig->tgSourceId, from, 0, count, false), [=, this](auto obj){
if (obj->get_id() == td_api::messages::ID) {
td_api::messages &msgs = (td_api::messages&)*obj;
if (msgs.messages_.size() == 0) {
spdlog::debug("got all posts");
auto resultPtr = intermediateResult.get();
callback(std::move(*resultPtr));
return;
}
spdlog::debug("got {} posts", msgs.messages_.size());
size_t chunkSize = msgs.messages_.size();
size_t oldSize = intermediateResult->size();
long oldestId;
intermediateResult->reserve(oldSize + chunkSize);
for (auto i = msgs.messages_.begin(), end = msgs.messages_.end(); i != end; ++i) {
//spdlog::debug("moving message {}", (*i)->id_);
oldestId = (*i)->id_;
intermediateResult->emplace_back(std::move(*i));
}
collect_tg_posts_from__intermediate(oldestId, count - chunkSize, intermediateResult, callback);
} else {
auto &err = (td_api::error&)*obj;
spdlog::error("failed to get posts: {} {}", err.code_, err.message_);
}
});
}
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<vk::Post> &posts) {
std::vector<AbstractPost> result;
result.reserve(posts.size());
for (auto &post : posts) {
result.emplace_back(post.id, post.date, post.text);
}
return result;
}
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<td::tl::unique_ptr<td_api::message>> &posts) {
std::vector<AbstractPost> result;
result.reserve(posts.size());
for (auto &post : posts) {
// we don't want any posts other than plain text (yet)
if (post->content_->get_id() == td_api::messageText::ID) {
auto &content = (td_api::messageText&) *post->content_;
result.emplace_back(post->id_, post->date_, content.text_->text_);
}
}
return result;
}
void RepostManager::repost_all(std::vector<AbstractPost> posts) {
for (auto &post : posts) {
// TODO
}
}

48
manager.h Normal file
View File

@ -0,0 +1,48 @@
#pragma once
#include "config.h"
#include "state.h"
#include "tg.h"
#include "vk.h"
#include <functional>
#include <string>
#include <vector>
namespace manager {
namespace td_api = td::td_api;
struct AbstractPost {
inline AbstractPost(long id, long date, std::string text) : id(id), date(date), text(text) {}
long id;
long date;
std::string text;
};
class RepostManager {
public:
RepostManager(uv_loop_t *eventLoop, tg::AuthCodeProvider tgCodeProvider, tg::PasswordProvider tgPasswordProvider, state::AppState *appState, config::AppConfig *config);
RepostManager(RepostManager&) = delete;
void start();
private:
void on_clients_ready();
void collect_all_vk_posts(std::function<void(std::vector<vk::Post>)> callback);
void collect_all_tg_posts(std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
void collect_last_vk_posts(int count, std::function<void(std::vector<vk::Post>)> callback);
void collect_last_tg_posts(int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
void collect_vk_posts_from(int offset, int count, std::function<void(std::vector<vk::Post>)> callback);
void collect_tg_posts_from(long from, int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
void collect_tg_posts_from__intermediate(long from, int count, std::shared_ptr<std::vector<td::tl::unique_ptr<td_api::message>>> intermediateResult, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
std::vector<AbstractPost> to_abstract_posts(std::vector<vk::Post> &posts);
std::vector<AbstractPost> to_abstract_posts(std::vector<td::tl::unique_ptr<td_api::message>> &posts);
void repost_all(std::vector<AbstractPost> posts);
state::AppState *m_appState;
config::AppConfig *m_appConfig;
vk::VKClient m_vk;
tg::TelegramClient m_tg;
};
}

174
tg.cpp Normal file
View File

@ -0,0 +1,174 @@
#include <functional>
#include <unistd.h>
#include <spdlog/spdlog.h>
#include "tg.h"
#include "td/telegram/td_api.h"
using namespace tg;
TelegramClient::TelegramClient(uv_loop_t *eventLoop, long apiId, std::string apiHash, std::string phoneNumber)
: m_eventLoop(eventLoop), m_apiId(apiId), m_apiHash(apiHash), m_phoneNumber(phoneNumber) {}
bool TelegramClient::start() {
spdlog::debug("tg: start");
if (m_running) return false;
m_running = true;
spdlog::debug("tg: creating uv handle");
m_uvHandle = new uv_async_t;
m_uvHandle->data = (void*)this;
uv_async_init(m_eventLoop, m_uvHandle, TelegramClient::uv_callback);
spdlog::debug("tg: starting telegram thread");
m_thread = std::thread([this](){
run_thread();
});
return true;
}
TelegramClient::~TelegramClient() {
if (m_thread.joinable()) {
send_query(td_api::make_object<td_api::close>(), {});
m_thread.join();
if (m_uvHandle)
uv_close((uv_handle_t*)m_uvHandle, TelegramClient::uv_close_handle_callback);
}
}
void TelegramClient::run_thread() {
spdlog::debug("tg: i am telegram thread");
td::ClientManager::execute(td_api::make_object<td_api::setLogVerbosityLevel>(0));
m_clientManager = std::make_unique<td::ClientManager>();
m_clientId = m_clientManager->create_client_id();
send_query(td_api::make_object<td_api::getOption>("version"), {});
run_telegram_main_loop();
}
void TelegramClient::run_telegram_main_loop() {
spdlog::info("tg: begin telegram main loop");
while (m_running) {
if (!process_response(m_clientManager->receive(500))) {
spdlog::info("tg: closed");
return;
}
}
}
void TelegramClient::send_query(td_api::object_ptr<td_api::Function> f, std::function<void(TgObject)> callback) {
std::uint64_t query_id = m_nextQueryId++;
if (callback) {
m_handlers.emplace(query_id, std::move(callback));
}
m_clientManager->send(m_clientId, query_id, std::move(f));
}
bool TelegramClient::process_response(td::ClientManager::Response response) {
if (!response.object) {
return true;
}
if (response.object->get_id() == td_api::updateAuthorizationState::ID) {
auto authUpdate = (td_api::updateAuthorizationState*) response.object.get();
spdlog::debug("updateAuthorizationState: {}", authUpdate->authorization_state_->get_id());
switch (authUpdate->authorization_state_->get_id()) {
case td_api::authorizationStateClosed::ID:
spdlog::debug("closed");
return false;
case td_api::authorizationStateWaitTdlibParameters::ID:
spdlog::debug("setting tdlib parameters");
set_tdlib_parameters();
return true;
case td_api::authorizationStateWaitPhoneNumber::ID:
spdlog::debug("setting phone number");
set_phone_number();
return true;
case td_api::authorizationStateWaitPassword::ID:
spdlog::debug("setting password");
if (passwordProvider)
passwordProvider(std::bind(&TelegramClient::set_password, this, std::placeholders::_1));
case td_api::authorizationStateWaitCode::ID:
spdlog::debug("setting 2FA code");
if (authCodeProvider)
authCodeProvider(std::bind(&TelegramClient::set_code, this, std::placeholders::_1));
return true;
}
}
m_responseQueueMutex.lock();
m_responseQueue.push_back(std::move(response));
m_responseQueueMutex.unlock();
uv_async_send(m_uvHandle);
return true;
}
void TelegramClient::set_tdlib_parameters() {
auto query = td_api::make_object<td_api::setTdlibParameters>();
query->database_directory_ = "tdata";
query->use_message_database_ = true;
query->use_secret_chats_ = false;
query->api_id_ = m_apiId;
query->api_hash_ = m_apiHash;
query->system_language_code_ = "en";
query->device_model_ = "server";
query->application_version_ = "0.1.0";
send_query(std::move(query), {});
}
void TelegramClient::set_phone_number() {
auto query = td_api::make_object<td_api::setAuthenticationPhoneNumber>();
query->phone_number_ = m_phoneNumber;
send_query(std::move(query), {});
}
void TelegramClient::set_code(std::string code) {
spdlog::debug("setting auth code {}", code);
send_query(td_api::make_object<td_api::checkAuthenticationCode>(code), {});
}
void TelegramClient::set_password(std::string password) {
spdlog::debug("setting password {}", password);
send_query(td_api::make_object<td_api::checkAuthenticationPassword>(password), {});
}
void TelegramClient::uv_callback(uv_async_t *h) {
//spdlog::debug("tg: libuv callback");
TelegramClient *self = (TelegramClient*)h->data;
self->m_responseQueueMutex.lock();
while (self->m_responseQueue.size() > 0) {
auto response = std::move(self->m_responseQueue.front());
self->m_responseQueue.pop_front();
if (response.request_id == 0) {
self->dispatch_update(response.object);
} else {
self->dispatch_response(response.request_id, std::move(response.object));
}
}
self->m_responseQueueMutex.unlock();
}
void TelegramClient::dispatch_update(TgObject &update) {
//spdlog::debug("tg: update");
for (auto i = m_updateHandlers.begin(); i != m_updateHandlers.end(); i++) {
i->first(i->second, *update.get());
}
}
void TelegramClient::dispatch_response(std::uint64_t queryId, TgObject response) {
spdlog::debug("tg: response to {}", queryId);
auto queryHandler = m_handlers.find(queryId);
if (queryHandler != m_handlers.end()) {
queryHandler->second(std::move(response));
}
}
void TelegramClient::uv_close_handle_callback(uv_handle_t *h) {
spdlog::debug("tg: closing uv handle");
delete h;
}
void TelegramClient::add_update_handler(std::function<void(void*, td_api::Object&)> handler, void *context) {
m_updateHandlers.push_back({std::move(handler), context});
}

84
tg.h Normal file
View File

@ -0,0 +1,84 @@
#include <cstdint>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <uv.h>
#include <td/telegram/Client.h>
#include <td/telegram/td_api.h>
#include <td/telegram/td_api.hpp>
namespace tg {
namespace td_api = td::td_api;
using TgObject = td_api::object_ptr<td_api::Object>;
namespace detail {
template <class... Fs>
struct overload;
template <class F>
struct overload<F> : public F {
explicit overload(F f) : F(f) {
}
};
template <class F, class... Fs>
struct overload<F, Fs...>
: public overload<F>
, public overload<Fs...> {
overload(F f, Fs... fs) : overload<F>(f), overload<Fs...>(fs...) {
}
using overload<F>::operator();
using overload<Fs...>::operator();
};
} // namespace detail
typedef std::function<void(std::function<void(std::string)>)> AuthCodeProvider;
typedef std::function<void(std::function<void(std::string)>)> PasswordProvider;
class TelegramClient {
public:
TelegramClient(uv_loop_t *eventLoop, long apiId, std::string apiHash, std::string phoneNumber);
~TelegramClient();
void add_update_handler(std::function<void(void*, td_api::Object&)> handler, void *context = nullptr);
void send_query(td_api::object_ptr<td_api::Function> f, std::function<void(TgObject)> callback);
bool start();
AuthCodeProvider authCodeProvider;
PasswordProvider passwordProvider;
private:
static void uv_callback(uv_async_t *h);
static void uv_close_handle_callback(uv_handle_t *h);
void run_thread();
void run_telegram_main_loop();
bool process_response(td::ClientManager::Response response);
void dispatch_response(std::uint64_t queryId, TgObject response);
void dispatch_update(TgObject &update);
void set_tdlib_parameters();
void set_phone_number();
void set_code(std::string code);
void set_password(std::string password);
bool m_running = false;
bool m_authorized = false;
std::uint64_t m_nextQueryId = 0;
std::map<std::uint64_t, std::function<void(TgObject)>> m_handlers;
std::vector<std::pair<std::function<void(void*, td_api::Object&)>, void*>> m_updateHandlers;
std::thread m_thread;
std::unique_ptr<td::ClientManager> m_clientManager;
std::int32_t m_clientId;
uv_loop_t *m_eventLoop;
uv_async_t *m_uvHandle = nullptr;
long m_apiId;
std::string m_apiHash;
std::string m_phoneNumber;
std::string m_password;
std::deque<td::ClientManager::Response> m_responseQueue;
std::mutex m_responseQueueMutex;
};
}

17
vk.cpp
View File

@ -2,6 +2,7 @@
#include "curl/curl.h"
#include "http.h"
#include "spdlog/sinks/stdout_color_sinks.h"
#include <limits>
#include <memory>
#include <nlohmann/json.hpp>
@ -41,7 +42,9 @@ void VKClient::get_posts(std::variant<long, std::string> wall, int offset, int c
url += "&offset=";
url += std::to_string(offset);
url += "&count=";
url += std::to_string(count);
// + 1 here to handle the pinned post
int increasedCount = count < std::numeric_limits<int>::max() ? count + 1 : count;
url += std::to_string(increasedCount);
if (wall.index() == 0) {
url += "&owner_id=";
url += std::to_string(std::get<long>(wall));
@ -51,7 +54,7 @@ void VKClient::get_posts(std::variant<long, std::string> wall, int offset, int c
}
m_logger->debug("using URL: {}", url);
m_httpClient.send_request("GET", url, {}, [this, callback](std::unique_ptr<http::HttpResponse> resp, CURLcode r){
m_httpClient.send_request("GET", url, {}, [this, callback, requestedCount = count](std::unique_ptr<http::HttpResponse> resp, CURLcode r){
if (r == 0) {
auto parsedResponse = json::parse(resp->body);
@ -65,8 +68,16 @@ void VKClient::get_posts(std::variant<long, std::string> wall, int offset, int c
auto responsePayload = parsedResponse["response"];
int count = responsePayload["count"];
std::vector<Post> posts;
int checkedPosts = 0;
for (auto post : responsePayload["items"]) {
posts.emplace_back(post["id"], post["date"], post["edited"], post["from_id"], post["type"], post["text"]);
if (checkedPosts == requestedCount) continue;
++checkedPosts;
if (post.contains("is_pinned") && post["is_pinned"] == true) continue;
if (!post.contains("id") || !post.contains("date") || !post.contains("from_id") || !post.contains("type") || !post.contains("text")) {
m_logger->warn("strange post: {}", post.dump());
continue;
}
posts.emplace_back(post["id"], post["date"], post.contains("edited") ? post["edited"].get<long>() : 0, post["from_id"], post["type"], post["text"]);
}
callback({{count, std::move(posts)}}, 0);