// mmcs-quotes-bridge/manager.cpp
// (listing metadata: 615 lines, 23 KiB, C++)

#include "manager.h"
#include "posts.h"
#include "spdlog/spdlog.h"
#include "state.h"
#include "td/telegram/td_api.h"
#include "uv.h"
#include "vk.h"
#include <algorithm>
#include <limits>
#include <memory>
#include <numeric>
#include <optional>
using namespace manager;
// Delay between two consecutive reposts, in milliseconds (passed to uv_timer_start).
constexpr unsigned long REPOST_INTERVAL = 2 * 1000;
// Delay before the next VK check is run, in milliseconds (10 minutes; passed to uv_timer_start).
constexpr unsigned long VK_CHECK_INTERVAL = 10 * 60 * 1000;
RepostManager::RepostManager(uv_loop_t *eventLoop, tg::AuthCodeProvider tgCodeProvider, tg::PasswordProvider tgPasswordProvider, state::AppState *appState, config::AppConfig *config)
    : m_vk(eventLoop),
      m_tg(eventLoop, config->tgApiId, config->tgApiHash, config->tgPhoneNumber),
      m_fetcher(this) {
    // Keep pointers to the shared configuration and application state
    // (they are not deleted by ~RepostManager).
    m_appConfig = config;
    m_appState = appState;
    // Interactive providers handed to the Telegram client (presumably used during login).
    m_tg.passwordProvider = tgPasswordProvider;
    m_tg.authCodeProvider = tgCodeProvider;
    m_vk.set_service_api_key(config->vkServiceKey);
    // Both timers are heap-allocated and carry a back-pointer to this manager in
    // `data`, so the static libuv callbacks can recover it. ~RepostManager closes them.
    const auto makeTimer = [this, eventLoop]() {
        auto *timer = new uv_timer_t;
        uv_timer_init(eventLoop, timer);
        timer->data = this;
        return timer;
    };
    m_repostTimer = makeTimer();
    m_checkTimer = makeTimer();
}
// Stops and closes both libuv timers. libuv handles must be closed asynchronously,
// so the memory is released from the uv_close callback once the loop processes it.
RepostManager::~RepostManager() {
    auto closeTimer = [](uv_timer_t *timer) {
        uv_timer_stop(timer);
        uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t *handle) {
            // The handle was allocated with `new uv_timer_t`, so it must be deleted
            // through that type. Deleting via uv_handle_t* (an unrelated C struct, not a
            // base class) is undefined behavior and hands the wrong size to sized delete.
            delete reinterpret_cast<uv_timer_t*>(handle);
        });
    };
    if (m_repostTimer) {
        closeTimer(m_repostTimer);
        m_repostTimer = nullptr;
    }
    if (m_checkTimer) {
        closeTimer(m_checkTimer);
        m_checkTimerStarted = false;
        m_checkTimer = nullptr;
    }
}
void RepostManager::load_more_telegram_chats() {
m_tg.send_query(td_api::make_object<td_api::loadChats>(td_api::make_object<td_api::chatListMain>(), 100000), [this](auto result){
if (result->get_id() == td_api::ok::ID) {
spdlog::info("loading more Telegram chats...");
load_more_telegram_chats();
} else if (result->get_id() == td_api::error::ID) {
spdlog::info("loaded all chats");
on_clients_ready();
}
});
}
void RepostManager::start() {
m_nRequiredChats = 100500;//m_appConfig->tgSources.size() + 1;
for (auto &appState : m_appState->vkRepostState) {
appState.lastLoadedPostDate = appState.lastForwardedPostDate;
}
for (auto &appState : m_appState->tgRepostState) {
appState.lastLoadedPostDate = appState.lastForwardedPostDate;
}
m_tg.add_update_handler([this](void*, td_api::Object &obj){
if (obj.get_id() == td_api::updateAuthorizationState::ID) {
auto &authState = (td_api::updateAuthorizationState&)obj;
if (authState.authorization_state_->get_id() == td_api::authorizationStateReady::ID) {
spdlog::info("loading Telegram chats...");
load_more_telegram_chats();
}
} else if (obj.get_id() == td_api::updateNewChat::ID) {
auto &update = (td_api::updateNewChat&)obj;
if (update.chat_->id_ == m_appConfig->tgDestinationId) {
++m_nLoadedRequiredChats;
spdlog::info("destination chat {} loaded (loaded {}/{} chats)", m_appConfig->tgDestinationId, m_nLoadedRequiredChats, m_nRequiredChats);
}
for (auto &src : m_appConfig->tgSources) {
if (update.chat_->id_ == src.id) {
++m_nLoadedRequiredChats;
spdlog::info("source chat {} loaded (loaded {}/{} chats)", src.id, m_nLoadedRequiredChats, m_nRequiredChats);
}
}
if (m_nLoadedRequiredChats >= m_nRequiredChats) {
spdlog::info("all chats loaded");
//on_clients_ready();
}
} else if (obj.get_id() == td_api::updateNewMessage::ID) {
auto &update = (td_api::updateNewMessage&)obj;
for (auto &src : m_appConfig->tgSources) {
if (update.message_->chat_id_ == src.id) {
on_tg_message(update);
}
}
}
});
spdlog::info("starting Telegram authentication");
m_tg.start();
}
// Starts a new fetch run over all VK sources (if fetchVk) and/or all Telegram sources
// (if fetchTg). Does nothing if a run is already in progress. When every requested
// source is done, onDone receives the merged, date-ordered list of new posts.
void NewPostFetcher::fetch(bool fetchVk, bool fetchTg, decltype(onDone) onDone, decltype(onError) onError) {
    if (working) return;
    working = true;
    // Start every run from a clean slate. Without this, the states of earlier runs
    // (already `ready`, with posts that were moved out into the previous merge) stay in
    // the vectors and get merged again by continue_fetch(). Clearing is safe here: a
    // run only finishes (working == false) after all of its request callbacks returned.
    vkState.clear();
    tgState.clear();
    if (fetchVk) {
        const int nVkSources = static_cast<int>(mgr->m_appConfig->vkSources.size());
        vkState.reserve(nVkSources);
        for (int i = 0; i < nVkSources; ++i) {
            fetcher_state state{};
            state.sourceIndex = i;
            vkState.emplace_back(std::move(state));
        }
    }
    if (fetchTg) {
        const int nTgSources = static_cast<int>(mgr->m_appConfig->tgSources.size());
        tgState.reserve(nTgSources);
        for (int i = 0; i < nTgSources; ++i) {
            fetcher_state state{};
            state.sourceIndex = i;
            tgState.emplace_back(std::move(state));
        }
    }
    this->onDone = std::move(onDone);
    this->onError = std::move(onError);
    continue_fetch();
}
// While a run is in progress, replaces every per-source fetch state with a
// default-constructed fetcher_state. Outside a run it is a no-op.
// NOTE(review): this also overwrites sourceIndex with fetcher_state's default
// (fetch() assigns it explicitly after construction). Confirm this is intended.
void NewPostFetcher::reset_state() {
    if (working) {
        for (auto &vkEntry : vkState) {
            vkEntry = fetcher_state();
        }
        for (auto &tgEntry : tgState) {
            tgEntry = fetcher_state();
        }
    }
}
// Drives one fetch run forward. Called once from fetch() and then again from every
// request callback. For each source that is not yet `ready` and `needRequest`s more
// data, it issues the next request. Once every VK and Telegram source is ready, it
// merges all collected posts (plus any queued m_unprocessedTgPosts) into one list
// ordered by ascending date, clears `working` and hands the list to onDone.
//
// NOTE(review): the request callbacks below capture `state` by reference into
// vkState/tgState, so those vectors must not be resized while requests are in flight.
// NOTE(review): collect_*_posts_from only log request errors and never invoke their
// callback, so a failed request leaves `working` set and onError is never called.
void NewPostFetcher::continue_fetch() {
bool vkReady = true;
spdlog::info("fetch called");
for (int i = 0; i < vkState.size(); ++i) {
auto &state = vkState[i];
if (state.ready) continue;
vkReady = false;
// needRequest == false means a request for this source is already in flight.
if (state.needRequest) {
auto &wall = mgr->m_appConfig->vkSources[state.sourceIndex].id;
// A remembered post exists: page backwards (offset advances by the page size,
// count grows 1.5x per page) until check_vk_posts finds the remembered post.
// Otherwise the whole wall is loaded in one go and the source is ready at once.
if (mgr->m_appState->vkRepostState[state.sourceIndex].lastLoadedPostDate != 0) {
spdlog::info("[vk:{}] fetching {} VK posts at offset {}", i, state.count, state.offset);
state.needRequest = false;
mgr->collect_vk_posts_from(wall, state.offset, state.count, [&, i](auto posts) mutable {
for (auto &p : posts) {
spdlog::debug("[vk:{}] got post dated {}", i, p.date);
}
state.offset += posts.size();
state.count = state.count * 3 / 2;
check_vk_posts(i, posts);
});
} else {
spdlog::info("fetching all VK posts");
state.needRequest = false;
mgr->collect_all_vk_posts(wall, [&, i](auto posts) mutable {
spdlog::info("[vk:{}] fetched all {} posts", i, posts.size());
if (posts.size() > 0) {
spdlog::info("[vk:{}] last loaded post date is now {}", i, posts[0].date);
mgr->m_appState->vkRepostState[state.sourceIndex].lastLoadedPostDate = posts[0].date;
}
state.ready = true;
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts, state.sourceIndex);
state.posts.reserve(state.posts.size() + aposts.size());
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
state.posts.emplace_back(std::move(*i));
}
continue_fetch();
});
}
}
}
bool tgReady = true;
for (int i = 0; i < tgState.size(); ++i) {
auto &state = tgState[i];
if (state.ready) continue;
tgReady = false;
if (state.needRequest) {
long channel = mgr->m_appConfig->tgSources[state.sourceIndex].id;
// Same two modes as VK. Here `offset` holds the message id to continue the history
// from (the last id of the previous page), not a post count.
if (mgr->m_appState->tgRepostState[state.sourceIndex].lastLoadedPostDate != 0) {
spdlog::info("[tg:{}] fetching {} posts starting from #{}", i, state.count, state.offset);
state.needRequest = false;
mgr->collect_tg_posts_from(channel, state.offset, state.count, [this, i, &state](auto posts) mutable {
for (auto &p : posts) {
spdlog::debug("[tg:{}] got post dated {}", i, p->date_);
}
if (posts.size() > 0) {
state.offset = posts[posts.size() - 1]->id_;
spdlog::info("[tg:{}] setting from to id {}", i, posts[posts.size() - 1]->id_);
}
check_tg_posts(i, std::move(posts));
});
// Grow the page size for the next request (this request already uses the old count).
state.count = state.count * 3 / 2;
} else {
state.needRequest = false;
mgr->collect_all_tg_posts(channel, [this, i, &state](auto posts) mutable {
spdlog::info("[tg:{}] fetched all {} posts", i, posts.size());
if (posts.size() > 0) {
spdlog::info("[tg:{}] last loaded post date is now {}", i, posts[0]->date_);
mgr->m_appState->tgRepostState[state.sourceIndex].lastLoadedPostDate = posts[0]->date_;
}
state.ready = true;
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts, state.sourceIndex);
state.posts.reserve(state.posts.size() + aposts.size());
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
state.posts.emplace_back(std::move(*i));
}
continue_fetch();
});
}
}
}
if (tgReady && vkReady) {
// k-way merge of all per-source lists. Each list is consumed from its back towards
// index 0. The lists appear to be newest-first (posts[0] is what gets recorded as the
// last loaded post), so the back holds the oldest post. Repeatedly taking the
// smallest date therefore produces `merged` in ascending date order.
std::vector<AbstractPost> merged;
int nVkLists = vkState.size();
int nTgLists = tgState.size();
int nLists = nVkLists + nTgLists;
auto addPostCount = [](int n, fetcher_state &p){
return n + p.posts.size();
};
int total =
std::accumulate(vkState.begin(), vkState.end(), 0, addPostCount)
+ std::accumulate(tgState.begin(), tgState.end(), 0, addPostCount);
merged.reserve(total + mgr->m_unprocessedTgPosts.size());
// indexes[j] is the next (oldest remaining) position in list j; -1 means exhausted.
// VK lists occupy slots [0, nVkLists) and Telegram lists follow them.
std::vector<int> indexes;
indexes.reserve(nLists);
for (int i = 0; i < nVkLists; ++i) {
indexes.push_back(vkState[i].posts.size() - 1);
}
for (int i = 0; i < nTgLists; ++i) {
indexes.push_back(tgState[i].posts.size() - 1);
}
int k = 0;
while (k < total) {
long minPostDate = std::numeric_limits<long>::max();
// Always assigned below: k < total means some list still has a post
// (assuming no post is dated LONG_MAX).
int minPostListIdx;
for (int i = 0; i < nVkLists; ++i) {
if (indexes[i] < 0) continue;
long d = vkState[i].posts[indexes[i]].date;
if (d < minPostDate) {
minPostDate = d;
minPostListIdx = i;
}
}
for (int i = 0; i < nTgLists; ++i) {
if (indexes[nVkLists + i] < 0) continue;
long d = tgState[i].posts[indexes[nVkLists + i]].date;
if (d < minPostDate) {
minPostDate = d;
minPostListIdx = nVkLists + i;
}
}
if (minPostListIdx < nVkLists) {
merged.emplace_back(std::move(vkState[minPostListIdx].posts[indexes[minPostListIdx]]));
} else {
merged.emplace_back(std::move(tgState[minPostListIdx - nVkLists].posts[indexes[minPostListIdx]]));
}
--indexes[minPostListIdx];
++k;
}
// Splice in Telegram posts queued by on_tg_message, keeping `merged` sorted by date.
// A queued post with the same date as an already merged post is treated as a
// duplicate and dropped.
while (mgr->m_unprocessedTgPosts.size() > 0) {
AbstractPost tgPost = mgr->m_unprocessedTgPosts.front();
bool duplicate = false, inserted = false;
for (int i = 0; i < merged.size(); ++i) {
if (merged[i].date == tgPost.date) {
spdlog::debug("not inserting duplicate unproc tg post");
duplicate = true;
break;
} else if (merged[i].date > tgPost.date) {
spdlog::debug("inserting unproc tg post at pos {}", i);
merged.insert(merged.begin() + i, std::move(tgPost));
inserted = true;
break;
}
}
if (!duplicate && !inserted) {
spdlog::debug("appending unproc tg post");
merged.push_back(std::move(tgPost));
}
mgr->m_unprocessedTgPosts.pop();
}
// Cleared before onDone so that a new fetch() started from onDone is not rejected.
working = false;
onDone(std::move(merged));
}
}
void NewPostFetcher::check_vk_posts(int index, std::vector<vk::Post> posts) {
spdlog::info("[vk:{}] fetched {} posts", index, posts.size());
auto &state = vkState[index];
auto &appState = mgr->m_appState->vkRepostState[state.sourceIndex];
long oldLastPostDate = appState.lastLoadedPostDate;
if (posts.size() > 0) {
spdlog::info("[vk:{}] last post date is now {}", index, posts[0].date);
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts, state.sourceIndex);
spdlog::info("[vk:{}] looking for date {}, have {} - {}", index, oldLastPostDate, aposts[0].date, aposts[aposts.size() - 1].date);
if (mgr->drop_posts_older_than(aposts, oldLastPostDate)) {
spdlog::info("[vk:{}] found last remembered post", index);
state.ready = true;
}
state.posts.reserve(state.posts.size() + aposts.size());
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
state.posts.emplace_back(std::move(*i));
}
state.needRequest = true;
if (state.ready && !state.posts.empty()) {
spdlog::debug("[vk:{}] last loaded post date is now {}", index, state.posts[0].date);
appState.lastLoadedPostDate = state.posts[0].date;
}
} else {
state.ready = true;
}
continue_fetch();
}
void NewPostFetcher::check_tg_posts(int index, std::vector<td::tl::unique_ptr<td_api::message>> posts) {
spdlog::info("[tg:{}] fetched {} posts", index, posts.size());
auto &state = tgState[index];
auto &appState = mgr->m_appState->tgRepostState[state.sourceIndex];
long oldLastPostDate = appState.lastLoadedPostDate;
if (posts.size() > 0) {
spdlog::info("[tg:{}] last post date is now {}", index, posts[0]->date_);
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts, state.sourceIndex);
if (mgr->drop_posts_older_than(aposts, oldLastPostDate)) {
spdlog::info("[tg:{}] found last remembered post", index);
state.ready = true;
}
state.posts.reserve(state.posts.size() + aposts.size());
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
state.posts.emplace_back(std::move(*i));
}
state.needRequest = true;
if (state.ready && !state.posts.empty()) {
spdlog::debug("[tg:{}] last loaded post date is now {}", index, state.posts[0].date);
appState.lastLoadedPostDate = state.posts[0].id;
}
} else {
state.ready = true;
}
continue_fetch();
}
// Runs the initial check of every VK and Telegram source once the clients are usable.
// The collected posts are passed on to on_new_posts.
void RepostManager::on_clients_ready() {
    spdlog::info("checking all sources");
    auto handlePosts = [this](auto posts){
        on_new_posts(posts);
    };
    auto handleFailure = [](){
        // TODO error handling
        spdlog::error("first post check failed");
    };
    const bool includeVk = true;
    const bool includeTg = true;
    m_fetcher.fetch(includeVk, includeTg, handlePosts, handleFailure);
}
// Queues freshly collected posts for reposting and, unless a check is already pending,
// arms the one-shot timer for the next VK check.
void RepostManager::on_new_posts(std::vector<AbstractPost> posts) {
    spdlog::info("collected {} new posts", posts.size());
    enqueue_for_repost(posts);
    if (m_checkTimerStarted) {
        return;
    }
    spdlog::info("scheduling next check");
    uv_timer_start(m_checkTimer, &RepostManager::check_timer_callback, VK_CHECK_INTERVAL, 0);
    m_checkTimerStarted = true;
}
// Collects every post of `wall` by requesting an effectively unbounded count.
void RepostManager::collect_all_vk_posts(const std::variant<long, std::string> wall, std::function<void(std::vector<vk::Post>)> callback) {
    constexpr int unbounded = std::numeric_limits<int>::max();
    collect_last_vk_posts(wall, unbounded, std::move(callback));
}
// Collects up to `count` posts of `wall`, starting at offset 0.
void RepostManager::collect_last_vk_posts(const std::variant<long, std::string> wall, int count, std::function<void(std::vector<vk::Post>)> callback) {
    constexpr int fromStart = 0;
    collect_vk_posts_from(wall, fromStart, count, std::move(callback));
}
// Collects the whole history of `channel` by requesting an effectively unbounded count.
void RepostManager::collect_all_tg_posts(long channel, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
    constexpr int unbounded = std::numeric_limits<int>::max();
    collect_last_tg_posts(channel, unbounded, std::move(callback));
}
// Collects up to `count` messages of `channel`, starting from message id 0
// (which getChatHistory treats as "start from the latest message").
void RepostManager::collect_last_tg_posts(long channel, int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
    constexpr long fromLatest = 0;
    collect_tg_posts_from(channel, fromLatest, count, std::move(callback));
}
// Collects up to `count` posts of `wall` starting at `offset`. Pages are accumulated
// into a shared buffer by collect_vk_posts_from__intermediate.
void RepostManager::collect_vk_posts_from(const std::variant<long, std::string> wall, int offset, int count, std::function<void(std::vector<vk::Post>)> callback) {
    spdlog::debug("COLLECT VK POSTS FROM {} {}", offset, count);
    collect_vk_posts_from__intermediate(wall, offset, count, std::make_shared<std::vector<vk::Post>>(), std::move(callback));
}
void RepostManager::collect_vk_posts_from__intermediate(const std::variant<long, std::string> wall, int offset, int count, std::shared_ptr<std::vector<vk::Post>> intermediateResult, std::function<void(std::vector<vk::Post>)> callback) {
m_vk.get_posts(wall, offset, count, [=, this](std::optional<vk::WallChunk> chunk, int err){
if (err == 0) {
if (chunk->posts.size() == 0) {
spdlog::debug("got all posts");
callback(std::move(*intermediateResult.get()));
return;
}
int chunkSize = chunk->posts.size();
int oldSize = intermediateResult->size();
intermediateResult->reserve(oldSize + chunkSize);
for (auto i = chunk->posts.begin(), end = chunk->posts.end(); i != end; ++i) {
intermediateResult->emplace_back(std::move(*i));
}
if (count > chunkSize) {
collect_vk_posts_from__intermediate(wall, offset + chunkSize, count - chunkSize, intermediateResult, callback);
} else {
callback(std::move(*intermediateResult.get()));
}
} else {
spdlog::error("failed to get {} VK posts at offset {}: error {}", count, offset, err);
}
});
}
// Collects up to `count` messages of `channel`, continuing the history from message id
// `from`. Pages are accumulated into a shared buffer by
// collect_tg_posts_from__intermediate.
void RepostManager::collect_tg_posts_from(long channel, long from, int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
    spdlog::debug("collecting {} telegram posts starting from {}", count, from);
    collect_tg_posts_from__intermediate(channel, from, count, std::make_shared<std::vector<td::tl::unique_ptr<td_api::message>>>(), std::move(callback));
}
// Requests one page of chat history via getChatHistory and appends it to
// `intermediateResult`. It recurses from the oldest received message id until `count`
// messages were received or an empty page signals the end, then passes the accumulated
// messages to `callback`.
// NOTE(review): on a TDLib error this only logs; `callback` is never invoked.
void RepostManager::collect_tg_posts_from__intermediate(long channel, long from, int count, std::shared_ptr<std::vector<td::tl::unique_ptr<td_api::message>>> intermediateResult, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
    spdlog::debug("getting some messages ({})", count);
    auto request = td_api::make_object<td_api::getChatHistory>(channel, from, 0, count, false);
    m_tg.send_query(std::move(request), [=, this](auto obj){
        if (obj->get_id() != td_api::messages::ID) {
            auto &err = static_cast<td_api::error&>(*obj);
            spdlog::error("failed to get posts: {} {}", err.code_, err.message_);
            return;
        }
        auto &batch = static_cast<td_api::messages&>(*obj).messages_;
        if (batch.size() == 0) {
            spdlog::debug("got all posts");
            callback(std::move(*intermediateResult));
            return;
        }
        spdlog::debug("got {} posts", batch.size());
        const size_t batchSize = batch.size();
        intermediateResult->reserve(intermediateResult->size() + batchSize);
        long oldestId = 0;
        for (auto &message : batch) {
            // History arrives in decreasing id order, so the last id seen is the oldest.
            oldestId = message->id_;
            intermediateResult->emplace_back(std::move(message));
        }
        if (static_cast<size_t>(count) > batchSize) {
            collect_tg_posts_from__intermediate(channel, oldestId, static_cast<int>(count - batchSize), intermediateResult, callback);
        } else {
            callback(std::move(*intermediateResult));
        }
    });
}
// Truncates `posts` at the first post whose date is <= lastPostDate, removing that post
// and everything after it. Returns true if such a post was found (the list was cut),
// false if every post is newer and the vector is left unchanged.
bool RepostManager::drop_posts_older_than(std::vector<AbstractPost> &posts, long lastPostDate) {
    const auto firstSeen = std::find_if(posts.begin(), posts.end(), [lastPostDate](const auto &candidate) {
        return candidate.date <= lastPostDate;
    });
    const bool found = firstSeen != posts.end();
    posts.erase(firstSeen, posts.end());
    return found;
}
// Wraps a VK post as an AbstractPost tagged with posts::SRC_VK and its source index.
// Every VK post is convertible, so the result is never empty.
std::optional<AbstractPost> RepostManager::to_abstract_post(const vk::Post &post, int sourceIndex) {
    return std::make_optional<AbstractPost>(posts::SRC_VK, sourceIndex, post.id, post.date, post.text);
}
// Converts a Telegram message into an AbstractPost tagged with posts::SRC_TELEGRAM.
// Only plain text messages (messageText) qualify; any other content yields std::nullopt.
std::optional<AbstractPost> RepostManager::to_abstract_post(const td_api::message &post, int sourceIndex) {
    if (post.content_->get_id() != td_api::messageText::ID) {
        return std::nullopt;
    }
    const auto &textContent = static_cast<const td_api::messageText&>(*post.content_);
    return AbstractPost(posts::SRC_TELEGRAM, sourceIndex, post.id_, post.date_, textContent.text_->text_);
}
// Converts every VK post into an AbstractPost, preserving order (one output per input).
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<vk::Post> &posts, int sourceIndex) {
    std::vector<AbstractPost> converted;
    converted.reserve(posts.size());
    std::for_each(posts.begin(), posts.end(), [&converted, sourceIndex](const vk::Post &vkPost) {
        converted.emplace_back(posts::SRC_VK, sourceIndex, vkPost.id, vkPost.date, vkPost.text);
    });
    return converted;
}
// Converts Telegram messages into AbstractPosts, preserving order. Messages that are
// not plain text are skipped, so the result may be shorter than the input.
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<td::tl::unique_ptr<td_api::message>> &posts, int sourceIndex) {
    std::vector<AbstractPost> converted;
    converted.reserve(posts.size());
    for (const auto &message : posts) {
        // we don't want any posts other than plain text (yet)
        if (message->content_->get_id() != td_api::messageText::ID) {
            continue;
        }
        const auto &textContent = static_cast<const td_api::messageText&>(*message->content_);
        converted.emplace_back(posts::SRC_TELEGRAM, sourceIndex, message->id_, message->date_, textContent.text_->text_);
    }
    return converted;
}
// TODO timer
void RepostManager::enqueue_for_repost(std::vector<AbstractPost> posts) {
for (auto &post : posts) {
if (m_repostQueue.empty()) {
uv_timer_start(m_repostTimer, &RepostManager::repost_timer_callback, REPOST_INTERVAL, REPOST_INTERVAL);
}
m_repostQueue.push(post);
}
}
// libuv callback of the repeating repost timer. Sends the post at the head of the
// queue, if any, and stops the timer once the queue is empty.
void RepostManager::repost_timer_callback(uv_timer_t *h) {
    spdlog::debug("repost timer");
    auto *manager = static_cast<RepostManager*>(h->data);
    auto &queue = manager->m_repostQueue;
    if (!queue.empty()) {
        manager->repost(queue.front());
        queue.pop();
    }
    if (!queue.empty()) {
        spdlog::debug("{} posts left to repost", queue.size());
        return;
    }
    spdlog::debug("repost queue empty, stopping timer");
    uv_timer_stop(h);
}
// libuv callback of the one-shot VK check timer: runs a VK-only recheck.
void RepostManager::check_timer_callback(uv_timer_t *h) {
    spdlog::debug("vk recheck timer");
    auto *self = static_cast<RepostManager*>(h->data);
    // The timer was started with repeat = 0, so it is no longer pending once it fires.
    // Clear the flag so that on_new_posts() (run when the recheck, or a fetch already in
    // progress, completes) schedules the next check. Otherwise VK would be rechecked
    // only once.
    self->m_checkTimerStarted = false;
    self->recheck_vk_posts({});
}
// Starts a VK-only fetch run. The new posts go to on_new_posts, after which the optional
// `onDone` is invoked. Returns false (and starts nothing) if a fetch run is already in
// progress, true otherwise.
bool RepostManager::recheck_vk_posts(std::function<void()> onDone) {
    if (m_fetcher.working) {
        spdlog::error("can't recheck VK posts: another check is in progress");
        return false;
    }
    spdlog::info("checking VK posts");
    m_fetcher.fetch(
        true, false,
        [this, onDone](std::vector<AbstractPost> &&posts){
            spdlog::info("checked VK posts");
            this->on_new_posts(posts);
            if (onDone) {
                onDone();
            }
        },
        [](){
            spdlog::error("failed to check VK posts");
        });
    return true;
}
// Sends `post` (after posts::add_signature has been applied to it) as a text message to
// the configured Telegram destination with link previews disabled. On success, it
// records the post date as the source's lastForwardedPostDate and saves the app state.
// On failure, it logs the error and stops the repost timer.
void RepostManager::repost(AbstractPost &post) {
    spdlog::debug("reposting (post length {})", post.text.length());
    // Presumably add_signature appends the signature to post.text in place; only the
    // returned view's length is used below.
    std::string_view signature = posts::add_signature(post, m_appConfig);
    int signatureLength = signature.length();
    int signatureStart = post.text.length() - signature.length();
    spdlog::debug("post length {}, signature start {}, signature length {}", post.text.length(), signatureStart, signatureLength);
    std::vector<td::tl::unique_ptr<td_api::textEntity>> entities;
    // Spoiler entity for the signature is currently disabled.
    // NOTE(review): TDLib entity offsets/lengths are in UTF-16 code units, while
    // signatureStart/Length are byte counts; convert before re-enabling.
    //entities.push_back(std::move(td_api::make_object<td_api::textEntity>(signatureStart, signatureLength, td_api::make_object<td_api::textEntityTypeSpoiler>())));
    auto formatted = td_api::make_object<td_api::formattedText>(post.text, std::move(entities));
    auto previewOptions = td_api::make_object<td_api::linkPreviewOptions>(true, std::string(""), false, false, false);
    auto content = td_api::make_object<td_api::inputMessageText>(std::move(formatted), std::move(previewOptions), false);
    spdlog::info("reposting to {}", m_appConfig->tgDestinationId);
    auto onSent = [this, postDate = post.date, srcType = post.sourceType, src = post.source](auto result){
        if (result->get_id() != td_api::error::ID) {
            if (srcType == posts::SRC_VK)
                m_appState->vkRepostState[src].lastForwardedPostDate = postDate;
            else
                m_appState->tgRepostState[src].lastForwardedPostDate = postDate;
            m_appState->save();
            return;
        }
        auto &err = static_cast<td_api::error&>(*result);
        spdlog::error("sendMessage error: {} {}", err.code_, err.message_);
        uv_timer_stop(m_repostTimer);
    };
    m_tg.send_query(td_api::make_object<td_api::sendMessage>(m_appConfig->tgDestinationId, 0, nullptr, nullptr, nullptr, std::move(content)), std::move(onSent));
}
// Handles a new Telegram message from a configured source chat. Text messages are queued
// in m_unprocessedTgPosts; the queue is drained (merged by date with VK posts) when the
// current or next fetch run completes. If no run is active, the pending VK check is
// cancelled and an immediate VK recheck starts, so VK posts older than the Telegram post
// are reposted first.
void RepostManager::on_tg_message(td_api::updateNewMessage &update) {
    spdlog::info("received message from Telegram");
    const auto &sources = m_appConfig->tgSources;
    const auto chatId = update.message_->chat_id_;
    const auto match = std::find_if(sources.begin(), sources.end(), [chatId](auto &src) {
        return src.id == chatId;
    });
    if (match == sources.end()) {
        return;
    }
    const int sourceIndex = static_cast<int>(std::distance(sources.begin(), match));
    std::optional<AbstractPost> post = to_abstract_post(*update.message_, sourceIndex);
    if (!post) {
        spdlog::debug("tg message is not a post");
        return;
    }
    spdlog::debug("adding tg post to the unprocessed tg post queue");
    m_unprocessedTgPosts.push(std::move(*post));
    if (m_fetcher.working) {
        // The running fetch drains m_unprocessedTgPosts when it finishes.
        return;
    }
    if (m_checkTimerStarted) {
        uv_timer_stop(m_checkTimer);
        m_checkTimerStarted = false;
    }
    spdlog::debug("rechecking vk posts before processing the new tg post");
    recheck_vk_posts({});
}