complete the initial post fetching functionality
This commit is contained in:
parent
2e543e6c48
commit
81b587cef8
193
manager.cpp
193
manager.cpp
@ -41,66 +41,125 @@ void RepostManager::start() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void RepostManager::on_clients_ready() {
|
void RepostManager::on_clients_ready() {
|
||||||
auto posts = std::make_shared< std::optional<std::vector<AbstractPost>>[] >(2);
|
struct new_post_fetcher {
|
||||||
auto doWorkWithPosts = [this, posts]() -> void {
|
struct fetcher_state {
|
||||||
spdlog::info("fetched some posts:");
|
bool ready = false;
|
||||||
|
bool needRequest = true;
|
||||||
int limit = 3;
|
long offset = 0, count = 3;
|
||||||
for (auto &i : *posts[0]) {
|
std::vector<AbstractPost> posts;
|
||||||
if (limit-- == 0) {
|
};
|
||||||
spdlog::info("...");
|
RepostManager *mgr;
|
||||||
break;
|
fetcher_state vkState, tgState;
|
||||||
}
|
new_post_fetcher(RepostManager *m) : mgr(m) {}
|
||||||
spdlog::info("vk[#{}, {}] {}", i.id, i.date, i.text);
|
std::function<void(std::vector<AbstractPost> &&vkPosts, std::vector<AbstractPost> &&tgPosts)> onDone;
|
||||||
}
|
std::function<void()> onError;
|
||||||
limit = 3;
|
void fetch() {
|
||||||
for (auto &i : *posts[1]) {
|
if (vkState.ready && tgState.ready) {
|
||||||
if (limit-- == 0) {
|
onDone(std::move(vkState.posts), std::move(tgState.posts));
|
||||||
spdlog::info("...");
|
return;
|
||||||
break;
|
|
||||||
}
|
|
||||||
spdlog::info("tg[#{}, {}] {}", i.id, i.date, i.text);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
m_tg.send_query(td_api::make_object<td_api::getChat>(m_appConfig->tgSourceId), [](auto result){
|
if (!vkState.ready && vkState.needRequest) {
|
||||||
if (result->get_id() == td_api::chat::ID) {
|
if (mgr->m_appState->vkLastPostId != 0) {
|
||||||
auto &chat = (td_api::chat&)*result;
|
spdlog::info("fetching {} VK posts at offset {}", vkState.count, vkState.offset);
|
||||||
spdlog::info("source chat: #{} {}", chat.id_, chat.title_);
|
vkState.needRequest = false;
|
||||||
|
mgr->collect_vk_posts_from(vkState.offset, vkState.count, [this](auto posts){ check_vk_posts(posts); });
|
||||||
|
vkState.offset += vkState.count;
|
||||||
|
vkState.count = vkState.count * 3 / 2;
|
||||||
} else {
|
} else {
|
||||||
auto &e = (td_api::error&)*result;
|
spdlog::info("fetching all VK posts");
|
||||||
spdlog::error("getChat error: {} {}", e.code_, e.message_);
|
vkState.needRequest = false;
|
||||||
|
mgr->collect_all_vk_posts([this](auto posts){
|
||||||
|
spdlog::info("fetched all {} VK posts", posts.size());
|
||||||
|
vkState.ready = true;
|
||||||
|
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
|
||||||
|
vkState.posts.reserve(vkState.posts.size() + aposts.size());
|
||||||
|
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
||||||
|
vkState.posts.emplace_back(std::move(*i));
|
||||||
}
|
}
|
||||||
|
fetch();
|
||||||
});
|
});
|
||||||
};
|
|
||||||
auto putVkPosts = [this, posts, doWorkWithPosts](std::vector<vk::Post> vkPosts){
|
|
||||||
posts[0] = {to_abstract_posts(vkPosts)};
|
|
||||||
spdlog::info("fetched {} vk posts", posts[0]->size());
|
|
||||||
if (posts[1].has_value()) {
|
|
||||||
doWorkWithPosts();
|
|
||||||
}
|
}
|
||||||
};
|
|
||||||
auto putTgPosts = [this, posts, doWorkWithPosts](std::vector<td::tl::unique_ptr<td_api::message>> tgPosts){
|
|
||||||
posts[1] = {to_abstract_posts(tgPosts)};
|
|
||||||
spdlog::info("fetched {} telegram posts", posts[1]->size());
|
|
||||||
if (posts[0].has_value()) {
|
|
||||||
doWorkWithPosts();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if (m_appState->vkLastPostId == 0) {
|
|
||||||
spdlog::info("fetching ALL vk posts...");
|
|
||||||
collect_all_vk_posts(putVkPosts);
|
|
||||||
} else {
|
|
||||||
spdlog::info("fetching last 3 vk posts...");
|
|
||||||
collect_last_vk_posts(3, putVkPosts);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (m_appState->tgLastPostId == 0) {
|
if (!tgState.ready && tgState.needRequest) {
|
||||||
spdlog::info("fetching ALL telegram posts...");
|
if (mgr->m_appState->tgLastPostId != 0) {
|
||||||
collect_all_tg_posts(putTgPosts);
|
spdlog::info("fetching {} TG posts starting from #{}", tgState.count, tgState.offset);
|
||||||
|
tgState.needRequest = false;
|
||||||
|
mgr->collect_tg_posts_from(tgState.offset, tgState.count, [this](auto posts){
|
||||||
|
if (posts.empty()) return;
|
||||||
|
tgState.offset += posts[posts.size() - 1]->id_;
|
||||||
|
check_tg_posts(std::move(posts));
|
||||||
|
});
|
||||||
|
tgState.count = tgState.count * 3 / 2;
|
||||||
} else {
|
} else {
|
||||||
spdlog::info("fetching last 3 telegram posts...");
|
tgState.needRequest = false;
|
||||||
collect_last_tg_posts(3, putTgPosts);
|
mgr->collect_all_tg_posts([this](auto posts){
|
||||||
|
tgState.ready = true;
|
||||||
|
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
|
||||||
|
tgState.posts.reserve(tgState.posts.size() + aposts.size());
|
||||||
|
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
||||||
|
tgState.posts.emplace_back(std::move(*i));
|
||||||
}
|
}
|
||||||
|
fetch();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
void check_vk_posts(std::vector<vk::Post> posts) {
|
||||||
|
spdlog::info("fetched {} VK posts", posts.size());
|
||||||
|
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
|
||||||
|
if (mgr->drop_posts_older_than(aposts, mgr->m_appState->vkLastPostId)) {
|
||||||
|
spdlog::info("found last remembered VK post");
|
||||||
|
vkState.ready = true;
|
||||||
|
}
|
||||||
|
vkState.posts.reserve(vkState.posts.size() + aposts.size());
|
||||||
|
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
||||||
|
vkState.posts.emplace_back(std::move(*i));
|
||||||
|
}
|
||||||
|
vkState.needRequest = true;
|
||||||
|
fetch();
|
||||||
|
}
|
||||||
|
void check_tg_posts(std::vector<td::tl::unique_ptr<td_api::message>> posts) {
|
||||||
|
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
|
||||||
|
if (mgr->drop_posts_older_than(aposts, mgr->m_appState->tgLastPostId)) {
|
||||||
|
spdlog::info("found last remembered TG post");
|
||||||
|
tgState.ready = true;
|
||||||
|
}
|
||||||
|
tgState.posts.reserve(tgState.posts.size() + aposts.size());
|
||||||
|
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
||||||
|
tgState.posts.emplace_back(std::move(*i));
|
||||||
|
}
|
||||||
|
tgState.needRequest = true;
|
||||||
|
fetch();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
new_post_fetcher *f = new new_post_fetcher(this);
|
||||||
|
|
||||||
|
f->onDone = [this, f](auto vkPosts, auto tgPosts){
|
||||||
|
delete f;
|
||||||
|
spdlog::info("collected {} new vk posts and {} new tg posts", vkPosts.size(), tgPosts.size());
|
||||||
|
std::vector<AbstractPost> mergedPosts;
|
||||||
|
int totalSize = vkPosts.size() + tgPosts.size();
|
||||||
|
mergedPosts.reserve(totalSize);
|
||||||
|
int vkIdx = vkPosts.size() - 1;
|
||||||
|
int tgIdx = tgPosts.size() - 1;
|
||||||
|
for (int i = 0; i < totalSize; ++i) {
|
||||||
|
if (tgIdx < 0 || vkPosts[vkIdx].date < tgPosts[tgIdx].date) {
|
||||||
|
mergedPosts.emplace_back(std::move(vkPosts[vkIdx--]));
|
||||||
|
} else {
|
||||||
|
mergedPosts.emplace_back(std::move(tgPosts[tgIdx--]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
spdlog::info("sorted {} posts", totalSize);
|
||||||
|
repost_all(mergedPosts);
|
||||||
|
};
|
||||||
|
|
||||||
|
f->onError = [f](){
|
||||||
|
delete f;
|
||||||
|
};
|
||||||
|
|
||||||
|
f->fetch();
|
||||||
}
|
}
|
||||||
|
|
||||||
void RepostManager::collect_all_vk_posts(std::function<void(std::vector<vk::Post>)> callback) {
|
void RepostManager::collect_all_vk_posts(std::function<void(std::vector<vk::Post>)> callback) {
|
||||||
@ -120,9 +179,27 @@ void RepostManager::collect_last_tg_posts(int count, std::function<void(std::vec
|
|||||||
}
|
}
|
||||||
|
|
||||||
void RepostManager::collect_vk_posts_from(int offset, int count, std::function<void(std::vector<vk::Post>)> callback) {
|
void RepostManager::collect_vk_posts_from(int offset, int count, std::function<void(std::vector<vk::Post>)> callback) {
|
||||||
m_vk.get_posts(m_appConfig->vkSource, offset, count, [=](std::optional<vk::WallChunk> chunk, int err){
|
spdlog::debug("COLLECT VK POSTS FROM {} {}", offset, count);
|
||||||
|
auto result = std::make_shared<std::vector<vk::Post>>();
|
||||||
|
collect_vk_posts_from__intermediate(offset, count, result, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
void RepostManager::collect_vk_posts_from__intermediate(int offset, int count, std::shared_ptr<std::vector<vk::Post>> intermediateResult, std::function<void(std::vector<vk::Post>)> callback) {
|
||||||
|
m_vk.get_posts(m_appConfig->vkSource, offset, count, [=, this](std::optional<vk::WallChunk> chunk, int err){
|
||||||
if (err == 0) {
|
if (err == 0) {
|
||||||
callback(chunk->posts);
|
if (chunk->posts.size() == 0) {
|
||||||
|
spdlog::debug("got all posts");
|
||||||
|
callback(std::move(*intermediateResult.get()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int chunkSize = chunk->posts.size();
|
||||||
|
int oldSize = intermediateResult->size();
|
||||||
|
intermediateResult->reserve(oldSize + chunkSize);
|
||||||
|
for (auto i = chunk->posts.begin(), end = chunk->posts.end(); i != end; ++i) {
|
||||||
|
intermediateResult->emplace_back(std::move(*i));
|
||||||
|
}
|
||||||
|
if (count > chunkSize)
|
||||||
|
collect_vk_posts_from__intermediate(offset + chunkSize, count - chunkSize, intermediateResult, callback);
|
||||||
} else {
|
} else {
|
||||||
spdlog::error("failed to get {} VK posts at offset {}: error {}", count, offset, err);
|
spdlog::error("failed to get {} VK posts at offset {}: error {}", count, offset, err);
|
||||||
}
|
}
|
||||||
@ -152,10 +229,10 @@ void RepostManager::collect_tg_posts_from__intermediate(long from, int count, st
|
|||||||
long oldestId;
|
long oldestId;
|
||||||
intermediateResult->reserve(oldSize + chunkSize);
|
intermediateResult->reserve(oldSize + chunkSize);
|
||||||
for (auto i = msgs.messages_.begin(), end = msgs.messages_.end(); i != end; ++i) {
|
for (auto i = msgs.messages_.begin(), end = msgs.messages_.end(); i != end; ++i) {
|
||||||
//spdlog::debug("moving message {}", (*i)->id_);
|
|
||||||
oldestId = (*i)->id_;
|
oldestId = (*i)->id_;
|
||||||
intermediateResult->emplace_back(std::move(*i));
|
intermediateResult->emplace_back(std::move(*i));
|
||||||
}
|
}
|
||||||
|
if (count > chunkSize)
|
||||||
collect_tg_posts_from__intermediate(oldestId, count - chunkSize, intermediateResult, callback);
|
collect_tg_posts_from__intermediate(oldestId, count - chunkSize, intermediateResult, callback);
|
||||||
} else {
|
} else {
|
||||||
auto &err = (td_api::error&)*obj;
|
auto &err = (td_api::error&)*obj;
|
||||||
@ -164,6 +241,16 @@ void RepostManager::collect_tg_posts_from__intermediate(long from, int count, st
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool RepostManager::drop_posts_older_than(std::vector<AbstractPost> &posts, long lastPostId) {
|
||||||
|
auto idx = std::find_if(posts.begin(), posts.end(), [lastPostId](auto &post){ return post.id == lastPostId; });
|
||||||
|
if (idx == posts.end()) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
posts.erase(idx, posts.end());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<vk::Post> &posts) {
|
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<vk::Post> &posts) {
|
||||||
std::vector<AbstractPost> result;
|
std::vector<AbstractPost> result;
|
||||||
result.reserve(posts.size());
|
result.reserve(posts.size());
|
||||||
|
@ -33,8 +33,11 @@ namespace manager {
|
|||||||
void collect_vk_posts_from(int offset, int count, std::function<void(std::vector<vk::Post>)> callback);
|
void collect_vk_posts_from(int offset, int count, std::function<void(std::vector<vk::Post>)> callback);
|
||||||
void collect_tg_posts_from(long from, int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
|
void collect_tg_posts_from(long from, int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
|
||||||
|
|
||||||
|
void collect_vk_posts_from__intermediate(int offset, int count, std::shared_ptr<std::vector<vk::Post>> intermediateResult, std::function<void(std::vector<vk::Post>)> callback);
|
||||||
void collect_tg_posts_from__intermediate(long from, int count, std::shared_ptr<std::vector<td::tl::unique_ptr<td_api::message>>> intermediateResult, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
|
void collect_tg_posts_from__intermediate(long from, int count, std::shared_ptr<std::vector<td::tl::unique_ptr<td_api::message>>> intermediateResult, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
|
||||||
|
|
||||||
|
bool drop_posts_older_than(std::vector<AbstractPost> &posts, long lastPostId);
|
||||||
|
|
||||||
std::vector<AbstractPost> to_abstract_posts(std::vector<vk::Post> &posts);
|
std::vector<AbstractPost> to_abstract_posts(std::vector<vk::Post> &posts);
|
||||||
std::vector<AbstractPost> to_abstract_posts(std::vector<td::tl::unique_ptr<td_api::message>> &posts);
|
std::vector<AbstractPost> to_abstract_posts(std::vector<td::tl::unique_ptr<td_api::message>> &posts);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user