389 lines
16 KiB
C++
389 lines
16 KiB
C++
#include "manager.h"
|
|
#include "posts.h"
|
|
#include "spdlog/spdlog.h"
|
|
#include "state.h"
|
|
#include "td/telegram/td_api.h"
|
|
#include "uv.h"
|
|
#include "vk.h"
|
|
#include <algorithm>
|
|
#include <limits>
|
|
#include <memory>
|
|
#include <optional>
|
|
|
|
using namespace manager;
|
|
|
|
const unsigned long REPOST_INTERVAL = 2000;
|
|
|
|
RepostManager::RepostManager(uv_loop_t *eventLoop, tg::AuthCodeProvider tgCodeProvider, tg::PasswordProvider tgPasswordProvider, state::AppState *appState, config::AppConfig *config)
|
|
: m_vk(eventLoop), m_tg(eventLoop, config->tgApiId, config->tgApiHash, config->tgPhoneNumber) {
|
|
m_appState = appState;
|
|
m_appConfig = config;
|
|
m_tg.authCodeProvider = tgCodeProvider;
|
|
m_tg.passwordProvider = tgPasswordProvider;
|
|
m_vk.set_service_api_key(config->vkServiceKey);
|
|
|
|
m_repostTimer = new uv_timer_t;
|
|
uv_timer_init(eventLoop, m_repostTimer);
|
|
m_repostTimer->data = this;
|
|
}
|
|
|
|
RepostManager::~RepostManager() {
|
|
if (m_repostTimer) {
|
|
uv_timer_stop(m_repostTimer);
|
|
uv_close((uv_handle_t*)m_repostTimer, [](uv_handle_t *h){ delete h; });
|
|
}
|
|
}
|
|
|
|
void RepostManager::load_more_telegram_chats() {
|
|
m_tg.send_query(td_api::make_object<td_api::loadChats>(td_api::make_object<td_api::chatListMain>(), 1000), [this](auto result){
|
|
if (result->get_id() == td_api::ok::ID) {
|
|
spdlog::info("loading more Telegram chats...");
|
|
load_more_telegram_chats();
|
|
} else if (result->get_id() == td_api::error::ID) {
|
|
spdlog::info("loaded all chats");
|
|
on_clients_ready();
|
|
}
|
|
});
|
|
}
|
|
|
|
void RepostManager::start() {
|
|
m_tg.add_update_handler([this](void*, td_api::Object &obj){
|
|
if (obj.get_id() == td_api::updateAuthorizationState::ID) {
|
|
auto &authState = (td_api::updateAuthorizationState&)obj;
|
|
if (authState.authorization_state_->get_id() == td_api::authorizationStateReady::ID) {
|
|
spdlog::info("loading Telegram chats...");
|
|
load_more_telegram_chats();
|
|
}
|
|
} else if (obj.get_id() == td_api::updateNewChat::ID) {
|
|
auto &update = (td_api::updateNewChat&)obj;
|
|
if (update.chat_->id_ == m_appConfig->tgDestinationId)
|
|
spdlog::info("destination chat loaded");
|
|
if (update.chat_->id_ == m_appConfig->tgSourceId)
|
|
spdlog::info("source chat loaded");
|
|
}
|
|
});
|
|
spdlog::info("starting Telegram authentication");
|
|
m_tg.start();
|
|
}
|
|
|
|
void RepostManager::on_clients_ready() {
|
|
m_appState->vkLastLoadedPostId = m_appState->vkLastPostId;
|
|
m_appState->tgLastLoadedPostId = m_appState->tgLastPostId;
|
|
|
|
struct new_post_fetcher {
|
|
struct fetcher_state {
|
|
bool ready = false;
|
|
bool needRequest = true;
|
|
long offset = 0, count = 3;
|
|
std::vector<AbstractPost> posts;
|
|
};
|
|
RepostManager *mgr;
|
|
fetcher_state vkState, tgState;
|
|
new_post_fetcher(RepostManager *m) : mgr(m) {}
|
|
std::function<void(std::vector<AbstractPost> &&vkPosts, std::vector<AbstractPost> &&tgPosts)> onDone;
|
|
std::function<void()> onError;
|
|
void fetch() {
|
|
if (vkState.ready && tgState.ready) {
|
|
onDone(std::move(vkState.posts), std::move(tgState.posts));
|
|
return;
|
|
}
|
|
|
|
if (!vkState.ready && vkState.needRequest) {
|
|
if (mgr->m_appState->vkLastLoadedPostId != 0) {
|
|
spdlog::info("fetching {} VK posts at offset {}", vkState.count, vkState.offset);
|
|
vkState.needRequest = false;
|
|
mgr->collect_vk_posts_from(vkState.offset, vkState.count, [this](auto posts){
|
|
vkState.offset += posts.size();
|
|
vkState.count = vkState.count * 3 / 2;
|
|
check_vk_posts(posts);
|
|
});
|
|
} else {
|
|
spdlog::info("fetching all VK posts");
|
|
vkState.needRequest = false;
|
|
mgr->collect_all_vk_posts([this](auto posts){
|
|
spdlog::info("fetched all {} VK posts", posts.size());
|
|
if (posts.size() > 0) {
|
|
spdlog::info("last vk post id is now {}", posts[0].id);
|
|
mgr->m_appState->vkLastLoadedPostId = posts[0].id;
|
|
}
|
|
vkState.ready = true;
|
|
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
|
|
vkState.posts.reserve(vkState.posts.size() + aposts.size());
|
|
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
|
vkState.posts.emplace_back(std::move(*i));
|
|
}
|
|
fetch();
|
|
});
|
|
}
|
|
}
|
|
|
|
if (!tgState.ready && tgState.needRequest) {
|
|
if (mgr->m_appState->tgLastLoadedPostId != 0) {
|
|
spdlog::info("fetching {} TG posts starting from #{}", tgState.count, tgState.offset);
|
|
tgState.needRequest = false;
|
|
mgr->collect_tg_posts_from(tgState.offset, tgState.count, [this](auto posts){
|
|
if (posts.empty()) return;
|
|
tgState.offset += posts[posts.size() - 1]->id_;
|
|
check_tg_posts(std::move(posts));
|
|
});
|
|
tgState.count = tgState.count * 3 / 2;
|
|
} else {
|
|
tgState.needRequest = false;
|
|
mgr->collect_all_tg_posts([this](auto posts){
|
|
spdlog::info("fetched all {} TG posts", posts.size());
|
|
if (posts.size() > 0) {
|
|
spdlog::info("last telegram post id is now {}", posts[0]->id_);
|
|
mgr->m_appState->tgLastLoadedPostId = posts[0]->id_;
|
|
}
|
|
tgState.ready = true;
|
|
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
|
|
tgState.posts.reserve(tgState.posts.size() + aposts.size());
|
|
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
|
tgState.posts.emplace_back(std::move(*i));
|
|
}
|
|
fetch();
|
|
});
|
|
}
|
|
}
|
|
}
|
|
void check_vk_posts(std::vector<vk::Post> posts) {
|
|
spdlog::info("fetched {} VK posts", posts.size());
|
|
long oldLastPostId = mgr->m_appState->vkLastLoadedPostId;
|
|
if (posts.size() > 0) {
|
|
spdlog::info("last vk post id is now {}", posts[0].id);
|
|
//mgr->m_appState->vkLastLoadedPostId = posts[0].id;
|
|
}
|
|
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
|
|
spdlog::info("looking for id {}, have {} - {}", oldLastPostId, aposts[0].id, aposts[aposts.size() - 1].id);
|
|
if (mgr->drop_posts_older_than(aposts, oldLastPostId)) {
|
|
spdlog::info("found last remembered VK post");
|
|
vkState.ready = true;
|
|
}
|
|
vkState.posts.reserve(vkState.posts.size() + aposts.size());
|
|
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
|
vkState.posts.emplace_back(std::move(*i));
|
|
}
|
|
vkState.needRequest = true;
|
|
if (vkState.ready) {
|
|
spdlog::debug("last loaded vk post id is now {}", vkState.posts[0].id);
|
|
mgr->m_appState->vkLastLoadedPostId = vkState.posts[0].id;
|
|
}
|
|
fetch();
|
|
}
|
|
void check_tg_posts(std::vector<td::tl::unique_ptr<td_api::message>> posts) {
|
|
spdlog::info("fetched {} TG posts", posts.size());
|
|
long oldLastPostId = mgr->m_appState->tgLastLoadedPostId;
|
|
if (posts.size() > 0) {
|
|
spdlog::info("last telegram post id is now {}", posts[0]->id_);
|
|
//mgr->m_appState->tgLastLoadedPostId = posts[0]->id_;
|
|
}
|
|
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
|
|
if (mgr->drop_posts_older_than(aposts, oldLastPostId)) {
|
|
spdlog::info("found last remembered TG post");
|
|
tgState.ready = true;
|
|
}
|
|
tgState.posts.reserve(tgState.posts.size() + aposts.size());
|
|
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
|
tgState.posts.emplace_back(std::move(*i));
|
|
}
|
|
tgState.needRequest = true;
|
|
if (tgState.ready) {
|
|
spdlog::debug("last loaded tg post id is now {}", tgState.posts[0].id);
|
|
mgr->m_appState->tgLastLoadedPostId = tgState.posts[0].id;
|
|
}
|
|
fetch();
|
|
}
|
|
};
|
|
new_post_fetcher *f = new new_post_fetcher(this);
|
|
|
|
f->onDone = [this, f](auto vkPosts, auto tgPosts){
|
|
delete f;
|
|
spdlog::info("collected {} new vk posts and {} new tg posts", vkPosts.size(), tgPosts.size());
|
|
std::vector<AbstractPost> mergedPosts;
|
|
int totalSize = vkPosts.size() + tgPosts.size();
|
|
mergedPosts.reserve(totalSize);
|
|
int vkIdx = vkPosts.size() - 1;
|
|
int tgIdx = tgPosts.size() - 1;
|
|
for (int i = 0; i < totalSize; ++i) {
|
|
if (tgIdx < 0 || vkPosts[vkIdx].date < tgPosts[tgIdx].date) {
|
|
mergedPosts.emplace_back(std::move(vkPosts[vkIdx--]));
|
|
} else {
|
|
mergedPosts.emplace_back(std::move(tgPosts[tgIdx--]));
|
|
}
|
|
}
|
|
spdlog::info("sorted {} posts", totalSize);
|
|
enqueue_for_repost(mergedPosts);
|
|
};
|
|
|
|
f->onError = [f](){
|
|
delete f;
|
|
};
|
|
|
|
f->fetch();
|
|
}
|
|
|
|
void RepostManager::collect_all_vk_posts(std::function<void(std::vector<vk::Post>)> callback) {
|
|
collect_last_vk_posts(std::numeric_limits<int>::max(), callback);
|
|
}
|
|
|
|
void RepostManager::collect_last_vk_posts(int count, std::function<void(std::vector<vk::Post>)> callback) {
|
|
collect_vk_posts_from(0, count, callback);
|
|
}
|
|
|
|
void RepostManager::collect_all_tg_posts(std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
|
|
collect_last_tg_posts(std::numeric_limits<int>::max(), callback);
|
|
}
|
|
|
|
void RepostManager::collect_last_tg_posts(int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
|
|
collect_tg_posts_from(0, count, callback);
|
|
}
|
|
|
|
void RepostManager::collect_vk_posts_from(int offset, int count, std::function<void(std::vector<vk::Post>)> callback) {
|
|
spdlog::debug("COLLECT VK POSTS FROM {} {}", offset, count);
|
|
auto result = std::make_shared<std::vector<vk::Post>>();
|
|
collect_vk_posts_from__intermediate(offset, count, result, callback);
|
|
}
|
|
|
|
void RepostManager::collect_vk_posts_from__intermediate(int offset, int count, std::shared_ptr<std::vector<vk::Post>> intermediateResult, std::function<void(std::vector<vk::Post>)> callback) {
|
|
m_vk.get_posts(m_appConfig->vkSource, offset, count, [=, this](std::optional<vk::WallChunk> chunk, int err){
|
|
if (err == 0) {
|
|
if (chunk->posts.size() == 0) {
|
|
spdlog::debug("got all posts");
|
|
callback(std::move(*intermediateResult.get()));
|
|
return;
|
|
}
|
|
int chunkSize = chunk->posts.size();
|
|
int oldSize = intermediateResult->size();
|
|
intermediateResult->reserve(oldSize + chunkSize);
|
|
for (auto i = chunk->posts.begin(), end = chunk->posts.end(); i != end; ++i) {
|
|
intermediateResult->emplace_back(std::move(*i));
|
|
}
|
|
if (count > chunkSize) {
|
|
collect_vk_posts_from__intermediate(offset + chunkSize, count - chunkSize, intermediateResult, callback);
|
|
} else {
|
|
callback(std::move(*intermediateResult.get()));
|
|
}
|
|
} else {
|
|
spdlog::error("failed to get {} VK posts at offset {}: error {}", count, offset, err);
|
|
}
|
|
});
|
|
}
|
|
|
|
void RepostManager::collect_tg_posts_from(long from, int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
|
|
spdlog::debug("collecting {} telegram posts starting from {}", count, from);
|
|
auto result = std::make_shared<std::vector<td::tl::unique_ptr<td_api::message>>>();
|
|
collect_tg_posts_from__intermediate(from, count, result, callback);
|
|
}
|
|
|
|
void RepostManager::collect_tg_posts_from__intermediate(long from, int count, std::shared_ptr<std::vector<td::tl::unique_ptr<td_api::message>>> intermediateResult, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
|
|
spdlog::debug("getting some messages ({})", count);
|
|
m_tg.send_query(td_api::make_object<td_api::getChatHistory>(m_appConfig->tgSourceId, from, 0, count, false), [=, this](auto obj){
|
|
if (obj->get_id() == td_api::messages::ID) {
|
|
td_api::messages &msgs = (td_api::messages&)*obj;
|
|
if (msgs.messages_.size() == 0) {
|
|
spdlog::debug("got all posts");
|
|
auto resultPtr = intermediateResult.get();
|
|
callback(std::move(*resultPtr));
|
|
return;
|
|
}
|
|
spdlog::debug("got {} posts", msgs.messages_.size());
|
|
size_t chunkSize = msgs.messages_.size();
|
|
size_t oldSize = intermediateResult->size();
|
|
long oldestId;
|
|
intermediateResult->reserve(oldSize + chunkSize);
|
|
for (auto i = msgs.messages_.begin(), end = msgs.messages_.end(); i != end; ++i) {
|
|
oldestId = (*i)->id_;
|
|
intermediateResult->emplace_back(std::move(*i));
|
|
}
|
|
if (count > chunkSize) {
|
|
collect_tg_posts_from__intermediate(oldestId, count - chunkSize, intermediateResult, callback);
|
|
} else {
|
|
callback(std::move(*intermediateResult.get()));
|
|
}
|
|
} else {
|
|
auto &err = (td_api::error&)*obj;
|
|
spdlog::error("failed to get posts: {} {}", err.code_, err.message_);
|
|
}
|
|
});
|
|
}
|
|
|
|
bool RepostManager::drop_posts_older_than(std::vector<AbstractPost> &posts, long lastPostId) {
|
|
auto idx = std::find_if(posts.begin(), posts.end(), [lastPostId](auto &post){ return post.id == lastPostId; });
|
|
if (idx == posts.end()) {
|
|
return false;
|
|
} else {
|
|
posts.erase(idx, posts.end());
|
|
return true;
|
|
}
|
|
}
|
|
|
|
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<vk::Post> &posts) {
|
|
std::vector<AbstractPost> result;
|
|
result.reserve(posts.size());
|
|
for (auto &post : posts) {
|
|
result.emplace_back(posts::SRC_VK, post.id, post.date, post.text);
|
|
}
|
|
return result;
|
|
}
|
|
|
|
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<td::tl::unique_ptr<td_api::message>> &posts) {
|
|
std::vector<AbstractPost> result;
|
|
result.reserve(posts.size());
|
|
for (auto &post : posts) {
|
|
// we don't want any posts other than plain text (yet)
|
|
if (post->content_->get_id() == td_api::messageText::ID) {
|
|
auto &content = (td_api::messageText&) *post->content_;
|
|
result.emplace_back(posts::SRC_TELEGRAM, post->id_, post->date_, content.text_->text_);
|
|
}
|
|
}
|
|
return result;
|
|
}
|
|
|
|
// TODO timer
|
|
void RepostManager::enqueue_for_repost(std::vector<AbstractPost> posts) {
|
|
for (auto &post : posts) {
|
|
if (m_repostQueue.empty()) {
|
|
uv_timer_start(m_repostTimer, &RepostManager::repost_timer_callback, REPOST_INTERVAL, REPOST_INTERVAL);
|
|
}
|
|
m_repostQueue.push(post);
|
|
}
|
|
}
|
|
|
|
void RepostManager::repost_timer_callback(uv_timer_t *h) {
|
|
spdlog::debug("repost timer");
|
|
auto self = reinterpret_cast<RepostManager*>(h->data);
|
|
if (!self->m_repostQueue.empty()) {
|
|
self->repost(self->m_repostQueue.front());
|
|
self->m_repostQueue.pop();
|
|
}
|
|
if (self->m_repostQueue.empty()) {
|
|
spdlog::debug("repost queue empty, stopping timer");
|
|
uv_timer_stop(h);
|
|
} else {
|
|
spdlog::debug("{} posts left to repost", self->m_repostQueue.size());
|
|
}
|
|
}
|
|
|
|
void RepostManager::repost(AbstractPost &post) {
|
|
spdlog::debug("reposting (post length {})", post.text.length());
|
|
std::string_view signature = posts::add_signature(post, m_appConfig);
|
|
spdlog::debug(post.text);
|
|
int signatureStart = post.text.length() - signature.length();
|
|
int signatureLength = signature.length();
|
|
spdlog::debug("post length {}, signature start {}, signature length {}", post.text.length(), signatureStart, signatureLength);
|
|
std::vector<td::tl::unique_ptr<td_api::textEntity>> entities;
|
|
//entities.push_back(std::move(td_api::make_object<td_api::textEntity>(signatureStart, signatureLength, td_api::make_object<td_api::textEntityTypeSpoiler>())));
|
|
auto content = td_api::make_object<td_api::inputMessageText>(td_api::make_object<td_api::formattedText>(post.text, std::move(entities)), td_api::make_object<td_api::linkPreviewOptions>(true, std::string(""), false, false, false), false);
|
|
m_tg.send_query(td_api::make_object<td_api::sendMessage>(m_appConfig->tgDestinationId, 0, nullptr, nullptr, nullptr, std::move(content)), [this, postId = post.id, src = post.source](auto result){
|
|
if (result->get_id() == td_api::error::ID) {
|
|
auto &err = (td_api::error&)*result;
|
|
spdlog::error("sendMessage error: {} {}", err.code_, err.message_);
|
|
uv_timer_stop(m_repostTimer);
|
|
} else {
|
|
if (src == posts::SRC_VK)
|
|
m_appState->vkLastPostId = postId;
|
|
else
|
|
m_appState->tgLastPostId = postId;
|
|
}
|
|
});
|
|
} |