rename new_post_fetcher and make it a standalone struct

This commit is contained in:
Slavasil 2024-11-22 10:20:53 +03:00
parent efd188bc8a
commit 48dca58dfc
2 changed files with 144 additions and 125 deletions

View File

@ -25,6 +25,10 @@ RepostManager::RepostManager(uv_loop_t *eventLoop, tg::AuthCodeProvider tgCodePr
m_repostTimer = new uv_timer_t; m_repostTimer = new uv_timer_t;
uv_timer_init(eventLoop, m_repostTimer); uv_timer_init(eventLoop, m_repostTimer);
m_repostTimer->data = this; m_repostTimer->data = this;
m_checkTimer = new uv_timer_t;
uv_timer_init(eventLoop, m_checkTimer);
m_checkTimer->data = this;
} }
RepostManager::~RepostManager() { RepostManager::~RepostManager() {
@ -66,135 +70,123 @@ void RepostManager::start() {
m_tg.start(); m_tg.start();
} }
void NewPostFetcher::fetch() {
if (vkState.ready && tgState.ready) {
onDone(std::move(vkState.posts), std::move(tgState.posts));
return;
}
if (!vkState.ready && vkState.needRequest) {
if (mgr->m_appState->vkLastLoadedPostId != 0) {
spdlog::info("fetching {} VK posts at offset {}", vkState.count, vkState.offset);
vkState.needRequest = false;
mgr->collect_vk_posts_from(vkState.offset, vkState.count, [this](auto posts){
vkState.offset += posts.size();
vkState.count = vkState.count * 3 / 2;
check_vk_posts(posts);
});
} else {
spdlog::info("fetching all VK posts");
vkState.needRequest = false;
mgr->collect_all_vk_posts([this](auto posts){
spdlog::info("fetched all {} VK posts", posts.size());
if (posts.size() > 0) {
spdlog::info("last vk post id is now {}", posts[0].id);
mgr->m_appState->vkLastLoadedPostId = posts[0].id;
}
vkState.ready = true;
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
vkState.posts.reserve(vkState.posts.size() + aposts.size());
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
vkState.posts.emplace_back(std::move(*i));
}
fetch();
});
}
}
if (!tgState.ready && tgState.needRequest) {
if (mgr->m_appState->tgLastLoadedPostId != 0) {
spdlog::info("fetching {} TG posts starting from #{}", tgState.count, tgState.offset);
tgState.needRequest = false;
mgr->collect_tg_posts_from(tgState.offset, tgState.count, [this](auto posts){
if (posts.empty()) return;
tgState.offset += posts[posts.size() - 1]->id_;
check_tg_posts(std::move(posts));
});
tgState.count = tgState.count * 3 / 2;
} else {
tgState.needRequest = false;
mgr->collect_all_tg_posts([this](auto posts){
spdlog::info("fetched all {} TG posts", posts.size());
if (posts.size() > 0) {
spdlog::info("last telegram post id is now {}", posts[0]->id_);
mgr->m_appState->tgLastLoadedPostId = posts[0]->id_;
}
tgState.ready = true;
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
tgState.posts.reserve(tgState.posts.size() + aposts.size());
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
tgState.posts.emplace_back(std::move(*i));
}
fetch();
});
}
}
}
void NewPostFetcher::check_vk_posts(std::vector<vk::Post> posts) {
spdlog::info("fetched {} VK posts", posts.size());
long oldLastPostId = mgr->m_appState->vkLastLoadedPostId;
if (posts.size() > 0) {
spdlog::info("last vk post id is now {}", posts[0].id);
}
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
spdlog::info("looking for id {}, have {} - {}", oldLastPostId, aposts[0].id, aposts[aposts.size() - 1].id);
if (mgr->drop_posts_older_than(aposts, oldLastPostId)) {
spdlog::info("found last remembered VK post");
vkState.ready = true;
}
vkState.posts.reserve(vkState.posts.size() + aposts.size());
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
vkState.posts.emplace_back(std::move(*i));
}
vkState.needRequest = true;
if (vkState.ready) {
spdlog::debug("last loaded vk post id is now {}", vkState.posts[0].id);
mgr->m_appState->vkLastLoadedPostId = vkState.posts[0].id;
}
fetch();
}
void NewPostFetcher::check_tg_posts(std::vector<td::tl::unique_ptr<td_api::message>> posts) {
spdlog::info("fetched {} TG posts", posts.size());
long oldLastPostId = mgr->m_appState->tgLastLoadedPostId;
if (posts.size() > 0) {
spdlog::info("last telegram post id is now {}", posts[0]->id_);
}
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
if (mgr->drop_posts_older_than(aposts, oldLastPostId)) {
spdlog::info("found last remembered TG post");
tgState.ready = true;
}
tgState.posts.reserve(tgState.posts.size() + aposts.size());
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
tgState.posts.emplace_back(std::move(*i));
}
tgState.needRequest = true;
if (tgState.ready) {
spdlog::debug("last loaded tg post id is now {}", tgState.posts[0].id);
mgr->m_appState->tgLastLoadedPostId = tgState.posts[0].id;
}
fetch();
}
void RepostManager::on_clients_ready() { void RepostManager::on_clients_ready() {
m_appState->vkLastLoadedPostId = m_appState->vkLastPostId; m_appState->vkLastLoadedPostId = m_appState->vkLastPostId;
m_appState->tgLastLoadedPostId = m_appState->tgLastPostId; m_appState->tgLastLoadedPostId = m_appState->tgLastPostId;
struct new_post_fetcher { NewPostFetcher *f = new NewPostFetcher(this);
struct fetcher_state {
bool ready = false;
bool needRequest = true;
long offset = 0, count = 3;
std::vector<AbstractPost> posts;
};
RepostManager *mgr;
fetcher_state vkState, tgState;
new_post_fetcher(RepostManager *m) : mgr(m) {}
std::function<void(std::vector<AbstractPost> &&vkPosts, std::vector<AbstractPost> &&tgPosts)> onDone;
std::function<void()> onError;
void fetch() {
if (vkState.ready && tgState.ready) {
onDone(std::move(vkState.posts), std::move(tgState.posts));
return;
}
if (!vkState.ready && vkState.needRequest) {
if (mgr->m_appState->vkLastLoadedPostId != 0) {
spdlog::info("fetching {} VK posts at offset {}", vkState.count, vkState.offset);
vkState.needRequest = false;
mgr->collect_vk_posts_from(vkState.offset, vkState.count, [this](auto posts){
vkState.offset += posts.size();
vkState.count = vkState.count * 3 / 2;
check_vk_posts(posts);
});
} else {
spdlog::info("fetching all VK posts");
vkState.needRequest = false;
mgr->collect_all_vk_posts([this](auto posts){
spdlog::info("fetched all {} VK posts", posts.size());
if (posts.size() > 0) {
spdlog::info("last vk post id is now {}", posts[0].id);
mgr->m_appState->vkLastLoadedPostId = posts[0].id;
}
vkState.ready = true;
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
vkState.posts.reserve(vkState.posts.size() + aposts.size());
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
vkState.posts.emplace_back(std::move(*i));
}
fetch();
});
}
}
if (!tgState.ready && tgState.needRequest) {
if (mgr->m_appState->tgLastLoadedPostId != 0) {
spdlog::info("fetching {} TG posts starting from #{}", tgState.count, tgState.offset);
tgState.needRequest = false;
mgr->collect_tg_posts_from(tgState.offset, tgState.count, [this](auto posts){
if (posts.empty()) return;
tgState.offset += posts[posts.size() - 1]->id_;
check_tg_posts(std::move(posts));
});
tgState.count = tgState.count * 3 / 2;
} else {
tgState.needRequest = false;
mgr->collect_all_tg_posts([this](auto posts){
spdlog::info("fetched all {} TG posts", posts.size());
if (posts.size() > 0) {
spdlog::info("last telegram post id is now {}", posts[0]->id_);
mgr->m_appState->tgLastLoadedPostId = posts[0]->id_;
}
tgState.ready = true;
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
tgState.posts.reserve(tgState.posts.size() + aposts.size());
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
tgState.posts.emplace_back(std::move(*i));
}
fetch();
});
}
}
}
void check_vk_posts(std::vector<vk::Post> posts) {
spdlog::info("fetched {} VK posts", posts.size());
long oldLastPostId = mgr->m_appState->vkLastLoadedPostId;
if (posts.size() > 0) {
spdlog::info("last vk post id is now {}", posts[0].id);
//mgr->m_appState->vkLastLoadedPostId = posts[0].id;
}
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
spdlog::info("looking for id {}, have {} - {}", oldLastPostId, aposts[0].id, aposts[aposts.size() - 1].id);
if (mgr->drop_posts_older_than(aposts, oldLastPostId)) {
spdlog::info("found last remembered VK post");
vkState.ready = true;
}
vkState.posts.reserve(vkState.posts.size() + aposts.size());
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
vkState.posts.emplace_back(std::move(*i));
}
vkState.needRequest = true;
if (vkState.ready) {
spdlog::debug("last loaded vk post id is now {}", vkState.posts[0].id);
mgr->m_appState->vkLastLoadedPostId = vkState.posts[0].id;
}
fetch();
}
void check_tg_posts(std::vector<td::tl::unique_ptr<td_api::message>> posts) {
spdlog::info("fetched {} TG posts", posts.size());
long oldLastPostId = mgr->m_appState->tgLastLoadedPostId;
if (posts.size() > 0) {
spdlog::info("last telegram post id is now {}", posts[0]->id_);
//mgr->m_appState->tgLastLoadedPostId = posts[0]->id_;
}
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
if (mgr->drop_posts_older_than(aposts, oldLastPostId)) {
spdlog::info("found last remembered TG post");
tgState.ready = true;
}
tgState.posts.reserve(tgState.posts.size() + aposts.size());
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
tgState.posts.emplace_back(std::move(*i));
}
tgState.needRequest = true;
if (tgState.ready) {
spdlog::debug("last loaded tg post id is now {}", tgState.posts[0].id);
mgr->m_appState->tgLastLoadedPostId = tgState.posts[0].id;
}
fetch();
}
};
new_post_fetcher *f = new new_post_fetcher(this);
f->onDone = [this, f](auto vkPosts, auto tgPosts){ f->onDone = [this, f](auto vkPosts, auto tgPosts){
delete f; delete f;
@ -364,6 +356,10 @@ void RepostManager::repost_timer_callback(uv_timer_t *h) {
} }
} }
void RepostManager::check_timer_callback(uv_timer_t *h) {
auto self = reinterpret_cast<RepostManager*>(h->data);
}
void RepostManager::repost(AbstractPost &post) { void RepostManager::repost(AbstractPost &post) {
spdlog::debug("reposting (post length {})", post.text.length()); spdlog::debug("reposting (post length {})", post.text.length());
std::string_view signature = posts::add_signature(post, m_appConfig); std::string_view signature = posts::add_signature(post, m_appConfig);

View File

@ -14,7 +14,29 @@ namespace manager {
using posts::AbstractPost; using posts::AbstractPost;
class RepostManager;
struct NewPostFetcher {
struct fetcher_state {
bool ready = false;
bool needRequest = true;
long offset = 0, count = 3;
std::vector<AbstractPost> posts;
};
RepostManager *mgr;
fetcher_state vkState, tgState;
std::function<void(std::vector<AbstractPost> &&vkPosts, std::vector<AbstractPost> &&tgPosts)> onDone;
std::function<void()> onError;
inline NewPostFetcher(RepostManager *m) : mgr(m) {}
void fetch();
void check_vk_posts(std::vector<vk::Post> posts);
void check_tg_posts(std::vector<td::tl::unique_ptr<td_api::message>> posts);
};
class RepostManager { class RepostManager {
friend struct NewPostFetcher;
public: public:
RepostManager(uv_loop_t *eventLoop, tg::AuthCodeProvider tgCodeProvider, tg::PasswordProvider tgPasswordProvider, state::AppState *appState, config::AppConfig *config); RepostManager(uv_loop_t *eventLoop, tg::AuthCodeProvider tgCodeProvider, tg::PasswordProvider tgPasswordProvider, state::AppState *appState, config::AppConfig *config);
RepostManager(RepostManager&) = delete; RepostManager(RepostManager&) = delete;
@ -41,6 +63,7 @@ namespace manager {
void enqueue_for_repost(std::vector<AbstractPost> posts); void enqueue_for_repost(std::vector<AbstractPost> posts);
static void repost_timer_callback(uv_timer_t *h); static void repost_timer_callback(uv_timer_t *h);
static void check_timer_callback(uv_timer_t *h);
void repost(AbstractPost &post); void repost(AbstractPost &post);
state::AppState *m_appState; state::AppState *m_appState;