implement reposting from multiple sources
This commit is contained in:
parent
86eb568c25
commit
b95b500873
108
config.cpp
108
config.cpp
@ -11,21 +11,24 @@ const char *JSON_KEY_VK_SERVICE_KEY = "vk_service_key";
|
||||
const char *JSON_KEY_TG_API_ID = "tg_api_id";
|
||||
const char *JSON_KEY_TG_API_HASH = "tg_api_hash";
|
||||
const char *JSON_KEY_TG_PHONE_NUMBER = "tg_phone_number";
|
||||
const char *JSON_KEY_VK_SOURCE = "vk_source";
|
||||
const char *JSON_KEY_TG_SOURCE_ID = "tg_source_id";
|
||||
const char *JSON_KEY_TG_DESTINATION_ID = "tg_destination_id";
|
||||
const char *JSON_KEY_VK_SOURCE_LINK = "vk_source_link";
|
||||
const char *JSON_KEY_TG_SOURCE_LINK = "tg_source_link";
|
||||
|
||||
const char *JSON_KEY_VK_SOURCES = "vk_sources";
|
||||
const char *JSON_KEY_TG_SOURCES = "tg_sources";
|
||||
const char *JSON_KEY_TG_DESTINAION = "tg_destination_id";
|
||||
const char *JSON_KEY_ID = "id";
|
||||
const char *JSON_KEY_LINK = "link";
|
||||
|
||||
const char *ERR_INVALID_TYPE_VK_SERVICE_KEY = "vk_service_key must be a string";
|
||||
const char *ERR_INVALID_TYPE_TG_API_ID = "tg_api_id must be an integer or a string";
|
||||
const char *ERR_INVALID_TYPE_TG_API_HASH = "tg_api_hash must be a string";
|
||||
const char *ERR_INVALID_TYPE_TG_PHONE_NUMBER = "tg_phone_number must be a string";
|
||||
const char *ERR_INVALID_TYPE_VK_SOURCE = "vk_source must be an integer or a string";
|
||||
const char *ERR_INVALID_TYPE_TG_SOURCE_ID = "tg_source_id must be an integer";
|
||||
const char *ERR_INVALID_TYPE_TG_DESTINATION_ID = "tg_destination_id must be an integer";
|
||||
const char *ERR_INVALID_TYPE_VK_SOURCE_LINK = "vk_source_link must be a string";
|
||||
const char *ERR_INVALID_TYPE_TG_SOURCE_LINK = "tg_source_link must be a string";
|
||||
const char *ERR_INVALID_TYPE_VK_SOURCES = "vk_sources must be an array of objects";
|
||||
const char *ERR_INVALID_TYPE_VK_SOURCE_ID = "vk_sources[].id must be an integer or a string";
|
||||
const char *ERR_INVALID_TYPE_VK_SOURCE_LINK = "vk_sources[].link must be a string";
|
||||
const char *ERR_INVALID_TYPE_TG_SOURCES = "tg_sources must be an array of objects";
|
||||
const char *ERR_INVALID_TYPE_TG_SOURCE_ID = "tg_sources[].id must be an integer";
|
||||
const char *ERR_INVALID_TYPE_TG_SOURCE_LINK = "tg_sources[].link must be a string";
|
||||
const char *ERR_INVALID_TYPE_TG_DESTINATION = "tg_destination_id must be an integer";
|
||||
|
||||
AppConfig::AppConfig(const std::string &filename) {
|
||||
std::ifstream f(filename);
|
||||
@ -74,51 +77,62 @@ AppConfig::AppConfig(const std::string &filename) {
|
||||
}
|
||||
}
|
||||
|
||||
if (config.contains(JSON_KEY_VK_SOURCE)) {
|
||||
json vkSource_ = config[JSON_KEY_VK_SOURCE];
|
||||
json::value_t t = vkSource_.type();
|
||||
if (t == json::value_t::string) {
|
||||
vkSource = vkSource_.get<std::string>();
|
||||
} else if (t == json::value_t::number_integer || t == json::value_t::number_unsigned) {
|
||||
vkSource = vkSource_.get<long>();
|
||||
} else {
|
||||
throw InvalidConfigException(ERR_INVALID_TYPE_VK_SOURCE);
|
||||
}
|
||||
}
|
||||
|
||||
if (config.contains(JSON_KEY_TG_SOURCE_ID)) {
|
||||
json tgSourceId_ = config[JSON_KEY_TG_SOURCE_ID];
|
||||
if (tgSourceId_.type() == json::value_t::number_integer || tgSourceId_.type() == json::value_t::number_unsigned) {
|
||||
tgSourceId = tgSourceId_;
|
||||
} else {
|
||||
throw InvalidConfigException(ERR_INVALID_TYPE_TG_SOURCE_ID);
|
||||
}
|
||||
}
|
||||
|
||||
if (config.contains(JSON_KEY_TG_DESTINATION_ID)) {
|
||||
json tgDestinationId_ = config[JSON_KEY_TG_DESTINATION_ID];
|
||||
if (tgDestinationId_.type() == json::value_t::number_integer || tgDestinationId_.type() == json::value_t::number_unsigned) {
|
||||
if (config.contains(JSON_KEY_TG_DESTINAION)) {
|
||||
json tgDestinationId_ = config[JSON_KEY_TG_DESTINAION];
|
||||
if (tgDestinationId_.type() == json::value_t::number_integer ||
|
||||
tgDestinationId_.type() == json::value_t::number_unsigned) {
|
||||
tgDestinationId = tgDestinationId_;
|
||||
} else {
|
||||
throw InvalidConfigException(ERR_INVALID_TYPE_TG_DESTINATION_ID);
|
||||
throw InvalidConfigException(ERR_INVALID_TYPE_TG_DESTINATION);
|
||||
}
|
||||
}
|
||||
|
||||
if (config.contains(JSON_KEY_VK_SOURCE_LINK)) {
|
||||
json vkSourceLink_ = config[JSON_KEY_VK_SOURCE_LINK];
|
||||
if (vkSourceLink_.type() == json::value_t::string) {
|
||||
vkSourceLink = vkSourceLink_;
|
||||
} else {
|
||||
spdlog::warn("config: {}", ERR_INVALID_TYPE_VK_SOURCE_LINK);
|
||||
json vkSources_ = config[JSON_KEY_VK_SOURCES];
|
||||
if (vkSources_.type() == json::value_t::array) {
|
||||
for (auto &src_ : vkSources_) {
|
||||
if (src_.type() != json::value_t::object) {
|
||||
throw InvalidConfigException(ERR_INVALID_TYPE_VK_SOURCES);
|
||||
}
|
||||
VkSourceConfig &&src { };
|
||||
if (src_.contains(JSON_KEY_LINK)) {
|
||||
src.link = src_[JSON_KEY_LINK];
|
||||
}
|
||||
if (!src_.contains(JSON_KEY_ID) ||
|
||||
src_[JSON_KEY_ID].type() != json::value_t::string &&
|
||||
src_[JSON_KEY_ID].type() != json::value_t::number_integer &&
|
||||
src_[JSON_KEY_ID].type() != json::value_t::number_unsigned) {
|
||||
throw InvalidConfigException(ERR_INVALID_TYPE_VK_SOURCE_ID);
|
||||
}
|
||||
json id_ = src_[JSON_KEY_ID];
|
||||
if (id_.type() == json::value_t::string) {
|
||||
src.id = id_.get<std::string>();
|
||||
} else {
|
||||
src.id = id_.get<long>();
|
||||
}
|
||||
vkSources.emplace_back(src);
|
||||
}
|
||||
} else {
|
||||
throw InvalidConfigException(ERR_INVALID_TYPE_VK_SOURCES);
|
||||
}
|
||||
|
||||
if (config.contains(JSON_KEY_TG_SOURCE_LINK)) {
|
||||
json tgSourceLink_ = config[JSON_KEY_TG_SOURCE_LINK];
|
||||
if (tgSourceLink_.type() == json::value_t::string) {
|
||||
tgSourceLink = tgSourceLink_;
|
||||
} else {
|
||||
spdlog::warn("config: {}", ERR_INVALID_TYPE_TG_SOURCE_LINK);
|
||||
json tgSources_ = config[JSON_KEY_TG_SOURCES];
|
||||
if (tgSources_.type() == json::value_t::array) {
|
||||
for (auto &src_ : tgSources_) {
|
||||
if (src_.type() != json::value_t::object) {
|
||||
throw InvalidConfigException(ERR_INVALID_TYPE_VK_SOURCES);
|
||||
}
|
||||
if (!src_.contains(JSON_KEY_ID) ||
|
||||
src_[JSON_KEY_ID].type() != json::value_t::number_integer &&
|
||||
src_[JSON_KEY_ID].type() != json::value_t::number_unsigned) {
|
||||
throw InvalidConfigException(ERR_INVALID_TYPE_VK_SOURCE_ID);
|
||||
}
|
||||
TgSourceConfig &&src { .id = src_[JSON_KEY_ID]};
|
||||
if (src_.contains(JSON_KEY_LINK)) {
|
||||
src.link = src_[JSON_KEY_LINK];
|
||||
}
|
||||
tgSources.emplace_back(src);
|
||||
}
|
||||
} else {
|
||||
throw InvalidConfigException(ERR_INVALID_TYPE_TG_SOURCES);
|
||||
}
|
||||
}
|
19
config.h
19
config.h
@ -3,6 +3,7 @@
|
||||
#include <exception>
|
||||
#include <string>
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
|
||||
namespace config {
|
||||
struct InvalidConfigException : std::exception {
|
||||
@ -10,6 +11,16 @@ namespace config {
|
||||
const char *message;
|
||||
};
|
||||
|
||||
struct VkSourceConfig {
|
||||
std::variant<long, std::string> id;
|
||||
std::string link;
|
||||
};
|
||||
|
||||
struct TgSourceConfig {
|
||||
long id;
|
||||
std::string link;
|
||||
};
|
||||
|
||||
struct AppConfig {
|
||||
AppConfig(const std::string &filename);
|
||||
|
||||
@ -17,8 +28,10 @@ namespace config {
|
||||
long tgApiId;
|
||||
std::string tgApiHash;
|
||||
std::string tgPhoneNumber;
|
||||
std::variant<long, std::string> vkSource;
|
||||
std::string vkSourceLink, tgSourceLink;
|
||||
long tgSourceId, tgDestinationId;
|
||||
|
||||
std::vector<VkSourceConfig> vkSources;
|
||||
std::vector<TgSourceConfig> tgSources;
|
||||
|
||||
long tgDestinationId;
|
||||
};
|
||||
}
|
17
main.cpp
17
main.cpp
@ -25,24 +25,23 @@ int main() {
|
||||
|
||||
uv_signal_t signalHandles[2] = {};
|
||||
create_signal_handles(loop, signalHandles);
|
||||
|
||||
config::AppConfig config("bridge_config.json");
|
||||
if (config.tgApiId == 0 || config.tgApiHash.empty() || config.tgPhoneNumber.empty()
|
||||
|| config.vkServiceKey.empty() || config.tgDestinationId == 0) {
|
||||
spdlog::error("incomplete config file");
|
||||
return 2;
|
||||
}
|
||||
|
||||
state::AppState state;
|
||||
try {
|
||||
state = state::AppState("bridge_state.json");
|
||||
state = state::AppState("bridge_state.json", &config);
|
||||
spdlog::info("state: {}", state.to_string());
|
||||
} catch (state::InvalidSavedStateException &e) {
|
||||
spdlog::error("invalid saved state: {}", e.message);
|
||||
return 1;
|
||||
}
|
||||
|
||||
config::AppConfig config("bridge_config.json");
|
||||
if (config.tgApiId == 0 || config.tgApiHash.empty() || config.tgPhoneNumber.empty()
|
||||
|| config.vkServiceKey.empty() || (config.vkSource.index() == 0 && std::get<long>(config.vkSource) == 0)
|
||||
|| config.tgSourceId == 0 || config.tgDestinationId == 0) {
|
||||
spdlog::error("incomplete config file");
|
||||
return 2;
|
||||
}
|
||||
|
||||
std::function<void(std::function<void(std::string)>)> tgAuthCodeProvider = [](auto set_code){
|
||||
spdlog::warn("/!\\ hanging event loop to request code");
|
||||
std::string code;
|
||||
|
387
manager.cpp
387
manager.cpp
@ -8,6 +8,7 @@
|
||||
#include <algorithm>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <numeric>
|
||||
#include <optional>
|
||||
|
||||
using namespace manager;
|
||||
@ -44,7 +45,7 @@ RepostManager::~RepostManager() {
|
||||
}
|
||||
|
||||
void RepostManager::load_more_telegram_chats() {
|
||||
m_tg.send_query(td_api::make_object<td_api::loadChats>(td_api::make_object<td_api::chatListMain>(), 1000), [this](auto result){
|
||||
m_tg.send_query(td_api::make_object<td_api::loadChats>(td_api::make_object<td_api::chatListMain>(), 100000), [this](auto result){
|
||||
if (result->get_id() == td_api::ok::ID) {
|
||||
spdlog::info("loading more Telegram chats...");
|
||||
load_more_telegram_chats();
|
||||
@ -56,6 +57,7 @@ void RepostManager::load_more_telegram_chats() {
|
||||
}
|
||||
|
||||
void RepostManager::start() {
|
||||
m_nRequiredChats = 100500;//m_appConfig->tgSources.size() + 1;
|
||||
m_tg.add_update_handler([this](void*, td_api::Object &obj){
|
||||
if (obj.get_id() == td_api::updateAuthorizationState::ID) {
|
||||
auto &authState = (td_api::updateAuthorizationState&)obj;
|
||||
@ -65,14 +67,26 @@ void RepostManager::start() {
|
||||
}
|
||||
} else if (obj.get_id() == td_api::updateNewChat::ID) {
|
||||
auto &update = (td_api::updateNewChat&)obj;
|
||||
if (update.chat_->id_ == m_appConfig->tgDestinationId)
|
||||
spdlog::info("destination chat loaded");
|
||||
if (update.chat_->id_ == m_appConfig->tgSourceId)
|
||||
spdlog::info("source chat loaded");
|
||||
if (update.chat_->id_ == m_appConfig->tgDestinationId) {
|
||||
++m_nLoadedRequiredChats;
|
||||
spdlog::info("destination chat {} loaded (loaded {}/{} chats)", m_appConfig->tgDestinationId, m_nLoadedRequiredChats, m_nRequiredChats);
|
||||
}
|
||||
for (auto &src : m_appConfig->tgSources) {
|
||||
if (update.chat_->id_ == src.id) {
|
||||
++m_nLoadedRequiredChats;
|
||||
spdlog::info("source chat {} loaded (loaded {}/{} chats)", src.id, m_nLoadedRequiredChats, m_nRequiredChats);
|
||||
}
|
||||
}
|
||||
if (m_nLoadedRequiredChats >= m_nRequiredChats) {
|
||||
spdlog::info("all chats loaded");
|
||||
//on_clients_ready();
|
||||
}
|
||||
} else if (obj.get_id() == td_api::updateNewMessage::ID) {
|
||||
auto &update = (td_api::updateNewMessage&)obj;
|
||||
if (update.message_->chat_id_ == m_appConfig->tgSourceId) {
|
||||
on_tg_message(update);
|
||||
for (auto &src : m_appConfig->tgSources) {
|
||||
if (update.message_->chat_id_ == src.id) {
|
||||
on_tg_message(update);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@ -81,135 +95,212 @@ void RepostManager::start() {
|
||||
}
|
||||
|
||||
NewPostFetcher::NewPostFetcher(RepostManager *m, bool fetchVk, bool fetchTg) : mgr(m) {
|
||||
if (!fetchVk) vkState.ready = true;
|
||||
if (!fetchTg) tgState.ready = true;
|
||||
if (fetchVk) {
|
||||
for (int i = 0; i < m->m_appConfig->vkSources.size(); ++i) {
|
||||
fetcher_state &&state {};
|
||||
state.sourceIndex = i;
|
||||
vkState.emplace_back(state);
|
||||
}
|
||||
}
|
||||
if (fetchTg) {
|
||||
for (int i = 0; i < m->m_appConfig->tgSources.size(); ++i) {
|
||||
fetcher_state &&state {};
|
||||
state.sourceIndex = i;
|
||||
tgState.emplace_back(state);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void NewPostFetcher::fetch() {
|
||||
if (vkState.ready && tgState.ready) {
|
||||
onDone(std::move(vkState.posts), std::move(tgState.posts));
|
||||
return;
|
||||
}
|
||||
|
||||
if (!vkState.ready && vkState.needRequest) {
|
||||
if (mgr->m_appState->vkLastLoadedPostId != 0) {
|
||||
spdlog::info("fetching {} VK posts at offset {}", vkState.count, vkState.offset);
|
||||
vkState.needRequest = false;
|
||||
mgr->collect_vk_posts_from(vkState.offset, vkState.count, [this](auto posts){
|
||||
vkState.offset += posts.size();
|
||||
vkState.count = vkState.count * 3 / 2;
|
||||
check_vk_posts(posts);
|
||||
});
|
||||
} else {
|
||||
spdlog::info("fetching all VK posts");
|
||||
vkState.needRequest = false;
|
||||
mgr->collect_all_vk_posts([this](auto posts){
|
||||
spdlog::info("fetched all {} VK posts", posts.size());
|
||||
if (posts.size() > 0) {
|
||||
spdlog::info("last vk post id is now {}", posts[0].id);
|
||||
mgr->m_appState->vkLastLoadedPostId = posts[0].id;
|
||||
mgr->m_appState->save();
|
||||
}
|
||||
vkState.ready = true;
|
||||
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
|
||||
vkState.posts.reserve(vkState.posts.size() + aposts.size());
|
||||
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
||||
vkState.posts.emplace_back(std::move(*i));
|
||||
}
|
||||
fetch();
|
||||
});
|
||||
bool vkReady = true;
|
||||
spdlog::info("fetch called");
|
||||
for (int i = 0; i < vkState.size(); ++i) {
|
||||
auto &state = vkState[i];
|
||||
if (state.ready) continue;
|
||||
vkReady = false;
|
||||
if (state.needRequest) {
|
||||
auto &wall = mgr->m_appConfig->vkSources[state.sourceIndex].id;
|
||||
if (mgr->m_appState->vkRepostState[state.sourceIndex].lastLoadedPostId != 0) {
|
||||
spdlog::info("[vk:{}] fetching {} VK posts at offset {}", i, state.count, state.offset);
|
||||
state.needRequest = false;
|
||||
mgr->collect_vk_posts_from(wall, state.offset, state.count, [&, i](auto posts) mutable {
|
||||
state.offset += posts.size();
|
||||
state.count = state.count * 3 / 2;
|
||||
check_vk_posts(i, posts);
|
||||
});
|
||||
} else {
|
||||
spdlog::info("fetching all VK posts");
|
||||
state.needRequest = false;
|
||||
mgr->collect_all_vk_posts(wall, [&, i](auto posts) mutable {
|
||||
spdlog::info("[vk:{}] fetched all {} posts", i, posts.size());
|
||||
if (posts.size() > 0) {
|
||||
spdlog::info("[vk:{}] last loaded post id is now {}", i, posts[0].id);
|
||||
mgr->m_appState->vkRepostState[state.sourceIndex].lastLoadedPostId = posts[0].id;
|
||||
}
|
||||
state.ready = true;
|
||||
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts, state.sourceIndex);
|
||||
state.posts.reserve(state.posts.size() + aposts.size());
|
||||
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
||||
state.posts.emplace_back(std::move(*i));
|
||||
}
|
||||
fetch();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!tgState.ready && tgState.needRequest) {
|
||||
if (mgr->m_appState->tgLastLoadedPostId != 0) {
|
||||
spdlog::info("fetching {} TG posts starting from #{}", tgState.count, tgState.offset);
|
||||
tgState.needRequest = false;
|
||||
mgr->collect_tg_posts_from(tgState.offset, tgState.count, [this](auto posts){
|
||||
if (posts.empty()) return;
|
||||
tgState.offset += posts[posts.size() - 1]->id_;
|
||||
check_tg_posts(std::move(posts));
|
||||
});
|
||||
tgState.count = tgState.count * 3 / 2;
|
||||
} else {
|
||||
tgState.needRequest = false;
|
||||
mgr->collect_all_tg_posts([this](auto posts){
|
||||
spdlog::info("fetched all {} TG posts", posts.size());
|
||||
if (posts.size() > 0) {
|
||||
spdlog::info("last telegram post id is now {}", posts[0]->id_);
|
||||
mgr->m_appState->tgLastLoadedPostId = posts[0]->id_;
|
||||
mgr->m_appState->save();
|
||||
}
|
||||
tgState.ready = true;
|
||||
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
|
||||
tgState.posts.reserve(tgState.posts.size() + aposts.size());
|
||||
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
||||
tgState.posts.emplace_back(std::move(*i));
|
||||
}
|
||||
fetch();
|
||||
});
|
||||
bool tgReady = true;
|
||||
for (int i = 0; i < tgState.size(); ++i) {
|
||||
auto &state = tgState[i];
|
||||
if (state.ready) continue;
|
||||
tgReady = false;
|
||||
if (state.needRequest) {
|
||||
long channel = mgr->m_appConfig->tgSources[state.sourceIndex].id;
|
||||
if (mgr->m_appState->tgRepostState[state.sourceIndex].lastLoadedPostId != 0) {
|
||||
spdlog::info("[tg:{}] fetching {} posts starting from #{}", i, state.count, state.offset);
|
||||
state.needRequest = false;
|
||||
mgr->collect_tg_posts_from(channel, state.offset, state.count, [this, i, state](auto posts) mutable {
|
||||
state.offset = posts[posts.size() - 1]->id_;
|
||||
check_tg_posts(i, std::move(posts));
|
||||
});
|
||||
state.count = state.count * 3 / 2;
|
||||
} else {
|
||||
state.needRequest = false;
|
||||
mgr->collect_all_tg_posts(channel, [this, i, &state](auto posts) mutable {
|
||||
spdlog::info("[tg:{}] fetched all {} posts", i, posts.size());
|
||||
if (posts.size() > 0) {
|
||||
spdlog::info("[tg:{}] last loaded post id is now {}", i, posts[0]->id_);
|
||||
mgr->m_appState->tgRepostState[state.sourceIndex].lastLoadedPostId = posts[0]->id_;
|
||||
}
|
||||
state.ready = true;
|
||||
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts, state.sourceIndex);
|
||||
state.posts.reserve(state.posts.size() + aposts.size());
|
||||
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
||||
state.posts.emplace_back(std::move(*i));
|
||||
}
|
||||
fetch();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (tgReady && vkReady) {
|
||||
std::vector<AbstractPost> merged;
|
||||
int nVkLists = vkState.size();
|
||||
int nTgLists = tgState.size();
|
||||
int nLists = nVkLists + nTgLists;
|
||||
auto addPostCount = [](int n, fetcher_state &p){
|
||||
return n + p.posts.size();
|
||||
};
|
||||
int total =
|
||||
std::accumulate(vkState.begin(), vkState.end(), 0, addPostCount)
|
||||
+ std::accumulate(tgState.begin(), tgState.end(), 0, addPostCount);
|
||||
merged.reserve(total);
|
||||
|
||||
std::vector<int> indexes;
|
||||
indexes.reserve(nLists);
|
||||
for (int i = 0; i < nVkLists; ++i) {
|
||||
indexes.push_back(vkState[i].posts.size() - 1);
|
||||
}
|
||||
for (int i = 0; i < nTgLists; ++i) {
|
||||
indexes.push_back(tgState[i].posts.size() - 1);
|
||||
}
|
||||
int k = 0;
|
||||
while (k < total) {
|
||||
long minPostDate = std::numeric_limits<long>::max();
|
||||
int minPostListIdx;
|
||||
for (int i = 0; i < nVkLists; ++i) {
|
||||
if (indexes[i] < 0) continue;
|
||||
long d = vkState[i].posts[indexes[i]].date;
|
||||
if (d < minPostDate) {
|
||||
minPostDate = d;
|
||||
minPostListIdx = i;
|
||||
}
|
||||
}
|
||||
for (int i = 0; i < nTgLists; ++i) {
|
||||
if (indexes[nVkLists + i] < 0) continue;
|
||||
long d = tgState[i].posts[indexes[nVkLists + i]].date;
|
||||
if (d < minPostDate) {
|
||||
minPostDate = d;
|
||||
minPostListIdx = nVkLists + i;
|
||||
}
|
||||
}
|
||||
if (minPostListIdx < nVkLists) {
|
||||
merged.emplace_back(std::move(vkState[minPostListIdx].posts[indexes[minPostListIdx]]));
|
||||
} else {
|
||||
merged.emplace_back(std::move(tgState[minPostListIdx - nVkLists].posts[indexes[minPostListIdx]]));
|
||||
}
|
||||
--indexes[minPostListIdx];
|
||||
++k;
|
||||
}
|
||||
onDone(std::move(merged));
|
||||
}
|
||||
}
|
||||
|
||||
void NewPostFetcher::check_vk_posts(std::vector<vk::Post> posts) {
|
||||
spdlog::info("fetched {} VK posts", posts.size());
|
||||
long oldLastPostId = mgr->m_appState->vkLastLoadedPostId;
|
||||
void NewPostFetcher::check_vk_posts(int index, std::vector<vk::Post> posts) {
|
||||
spdlog::info("[vk:{}] fetched {} posts", index, posts.size());
|
||||
auto &state = vkState[index];
|
||||
auto &appState = mgr->m_appState->vkRepostState[state.sourceIndex];
|
||||
long oldLastPostId = appState.lastLoadedPostId;
|
||||
if (posts.size() > 0) {
|
||||
spdlog::info("last vk post id is now {}", posts[0].id);
|
||||
spdlog::info("[vk:{}] last post id is now {}", index, posts[0].id);
|
||||
}
|
||||
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
|
||||
spdlog::info("looking for id {}, have {} - {}", oldLastPostId, aposts[0].id, aposts[aposts.size() - 1].id);
|
||||
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts, state.sourceIndex);
|
||||
spdlog::info("[vk:{}] looking for id {}, have {} - {}", index, oldLastPostId, aposts[0].id, aposts[aposts.size() - 1].id);
|
||||
if (mgr->drop_posts_older_than(aposts, oldLastPostId)) {
|
||||
spdlog::info("found last remembered VK post");
|
||||
vkState.ready = true;
|
||||
spdlog::info("[vk:{}] found last remembered post", index);
|
||||
state.ready = true;
|
||||
}
|
||||
vkState.posts.reserve(vkState.posts.size() + aposts.size());
|
||||
state.posts.reserve(state.posts.size() + aposts.size());
|
||||
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
||||
vkState.posts.emplace_back(std::move(*i));
|
||||
state.posts.emplace_back(std::move(*i));
|
||||
}
|
||||
vkState.needRequest = true;
|
||||
if (vkState.ready && !vkState.posts.empty()) {
|
||||
spdlog::debug("last loaded vk post id is now {}", vkState.posts[0].id);
|
||||
mgr->m_appState->vkLastLoadedPostId = vkState.posts[0].id;
|
||||
mgr->m_appState->save();
|
||||
state.needRequest = true;
|
||||
if (state.ready && !state.posts.empty()) {
|
||||
spdlog::debug("[vk:{}] last loaded post id is now {}", index, state.posts[0].id);
|
||||
appState.lastLoadedPostId = state.posts[0].id;
|
||||
}
|
||||
fetch();
|
||||
}
|
||||
|
||||
void NewPostFetcher::check_tg_posts(std::vector<td::tl::unique_ptr<td_api::message>> posts) {
|
||||
spdlog::info("fetched {} TG posts", posts.size());
|
||||
long oldLastPostId = mgr->m_appState->tgLastLoadedPostId;
|
||||
void NewPostFetcher::check_tg_posts(int index, std::vector<td::tl::unique_ptr<td_api::message>> posts) {
|
||||
spdlog::info("[tg:{}] fetched {} posts", index, posts.size());
|
||||
auto &state = tgState[index];
|
||||
auto &appState = mgr->m_appState->tgRepostState[state.sourceIndex];
|
||||
long oldLastPostId = appState.lastLoadedPostId;
|
||||
if (posts.size() > 0) {
|
||||
spdlog::info("last telegram post id is now {}", posts[0]->id_);
|
||||
spdlog::info("[tg:{}] last post id is now {}", index, posts[0]->id_);
|
||||
}
|
||||
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts);
|
||||
std::vector<AbstractPost> aposts = mgr->to_abstract_posts(posts, state.sourceIndex);
|
||||
if (mgr->drop_posts_older_than(aposts, oldLastPostId)) {
|
||||
spdlog::info("found last remembered TG post");
|
||||
tgState.ready = true;
|
||||
spdlog::info("[tg:{}] found last remembered post", index);
|
||||
state.ready = true;
|
||||
}
|
||||
tgState.posts.reserve(tgState.posts.size() + aposts.size());
|
||||
state.posts.reserve(state.posts.size() + aposts.size());
|
||||
for (auto i = aposts.begin(), e = aposts.end(); i != e; ++i) {
|
||||
tgState.posts.emplace_back(std::move(*i));
|
||||
state.posts.emplace_back(std::move(*i));
|
||||
}
|
||||
tgState.needRequest = true;
|
||||
if (tgState.ready && !tgState.posts.empty()) {
|
||||
spdlog::debug("last loaded tg post id is now {}", tgState.posts[0].id);
|
||||
mgr->m_appState->tgLastLoadedPostId = tgState.posts[0].id;
|
||||
mgr->m_appState->save();
|
||||
state.needRequest = true;
|
||||
if (state.ready && !state.posts.empty()) {
|
||||
spdlog::debug("[tg:{}] last loaded post id is now {}", index, state.posts[0].id);
|
||||
appState.lastLoadedPostId = state.posts[0].id;
|
||||
}
|
||||
fetch();
|
||||
}
|
||||
|
||||
void RepostManager::on_clients_ready() {
|
||||
m_appState->vkLastLoadedPostId = m_appState->vkLastPostId;
|
||||
m_appState->tgLastLoadedPostId = m_appState->tgLastPostId;
|
||||
for (auto &appState : m_appState->vkRepostState) {
|
||||
appState.lastLoadedPostId = appState.lastForwardedPostId;
|
||||
}
|
||||
for (auto &appState : m_appState->tgRepostState) {
|
||||
appState.lastLoadedPostId = appState.lastForwardedPostId;
|
||||
}
|
||||
|
||||
NewPostFetcher *f = new NewPostFetcher(this, true, true);
|
||||
|
||||
f->onDone = [this, f](auto vkPosts, auto tgPosts){
|
||||
f->onDone = [this, f](auto posts){
|
||||
delete f;
|
||||
on_new_posts(vkPosts, tgPosts);
|
||||
on_new_posts(posts);
|
||||
};
|
||||
|
||||
f->onError = [f](){
|
||||
@ -219,51 +310,38 @@ void RepostManager::on_clients_ready() {
|
||||
f->fetch();
|
||||
}
|
||||
|
||||
void RepostManager::on_new_posts(std::vector<AbstractPost> vkPosts, std::vector<AbstractPost> tgPosts) {
|
||||
spdlog::info("collected {} new vk posts and {} new tg posts", vkPosts.size(), tgPosts.size());
|
||||
std::vector<AbstractPost> mergedPosts;
|
||||
int totalSize = vkPosts.size() + tgPosts.size();
|
||||
mergedPosts.reserve(totalSize);
|
||||
int vkIdx = vkPosts.size() - 1;
|
||||
int tgIdx = tgPosts.size() - 1;
|
||||
for (int i = 0; i < totalSize; ++i) {
|
||||
if (tgIdx < 0 || vkIdx >= 0 && vkPosts[vkIdx].date < tgPosts[tgIdx].date) {
|
||||
mergedPosts.emplace_back(std::move(vkPosts[vkIdx--]));
|
||||
} else {
|
||||
mergedPosts.emplace_back(std::move(tgPosts[tgIdx--]));
|
||||
}
|
||||
}
|
||||
spdlog::info("sorted {} posts", totalSize);
|
||||
enqueue_for_repost(mergedPosts);
|
||||
void RepostManager::on_new_posts(std::vector<AbstractPost> posts) {
|
||||
spdlog::info("collected {} new posts", posts.size());
|
||||
enqueue_for_repost(posts);
|
||||
|
||||
spdlog::info("scheduling next check");
|
||||
uv_timer_start(m_checkTimer, &RepostManager::check_timer_callback, VK_CHECK_INTERVAL, 0);
|
||||
}
|
||||
|
||||
void RepostManager::collect_all_vk_posts(std::function<void(std::vector<vk::Post>)> callback) {
|
||||
collect_last_vk_posts(std::numeric_limits<int>::max(), callback);
|
||||
void RepostManager::collect_all_vk_posts(const std::variant<long, std::string> wall, std::function<void(std::vector<vk::Post>)> callback) {
|
||||
collect_last_vk_posts(wall, std::numeric_limits<int>::max(), callback);
|
||||
}
|
||||
|
||||
void RepostManager::collect_last_vk_posts(int count, std::function<void(std::vector<vk::Post>)> callback) {
|
||||
collect_vk_posts_from(0, count, callback);
|
||||
void RepostManager::collect_last_vk_posts(const std::variant<long, std::string> wall, int count, std::function<void(std::vector<vk::Post>)> callback) {
|
||||
collect_vk_posts_from(wall, 0, count, callback);
|
||||
}
|
||||
|
||||
void RepostManager::collect_all_tg_posts(std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
|
||||
collect_last_tg_posts(std::numeric_limits<int>::max(), callback);
|
||||
void RepostManager::collect_all_tg_posts(long channel, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
|
||||
collect_last_tg_posts(channel, std::numeric_limits<int>::max(), callback);
|
||||
}
|
||||
|
||||
void RepostManager::collect_last_tg_posts(int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
|
||||
collect_tg_posts_from(0, count, callback);
|
||||
void RepostManager::collect_last_tg_posts(long channel, int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
|
||||
collect_tg_posts_from(channel, 0, count, callback);
|
||||
}
|
||||
|
||||
void RepostManager::collect_vk_posts_from(int offset, int count, std::function<void(std::vector<vk::Post>)> callback) {
|
||||
void RepostManager::collect_vk_posts_from(const std::variant<long, std::string> wall, int offset, int count, std::function<void(std::vector<vk::Post>)> callback) {
|
||||
spdlog::debug("COLLECT VK POSTS FROM {} {}", offset, count);
|
||||
auto result = std::make_shared<std::vector<vk::Post>>();
|
||||
collect_vk_posts_from__intermediate(offset, count, result, callback);
|
||||
collect_vk_posts_from__intermediate(wall, offset, count, result, callback);
|
||||
}
|
||||
|
||||
void RepostManager::collect_vk_posts_from__intermediate(int offset, int count, std::shared_ptr<std::vector<vk::Post>> intermediateResult, std::function<void(std::vector<vk::Post>)> callback) {
|
||||
m_vk.get_posts(m_appConfig->vkSource, offset, count, [=, this](std::optional<vk::WallChunk> chunk, int err){
|
||||
void RepostManager::collect_vk_posts_from__intermediate(const std::variant<long, std::string> wall, int offset, int count, std::shared_ptr<std::vector<vk::Post>> intermediateResult, std::function<void(std::vector<vk::Post>)> callback) {
|
||||
m_vk.get_posts(wall, offset, count, [=, this](std::optional<vk::WallChunk> chunk, int err){
|
||||
if (err == 0) {
|
||||
if (chunk->posts.size() == 0) {
|
||||
spdlog::debug("got all posts");
|
||||
@ -277,7 +355,7 @@ void RepostManager::collect_vk_posts_from__intermediate(int offset, int count, s
|
||||
intermediateResult->emplace_back(std::move(*i));
|
||||
}
|
||||
if (count > chunkSize) {
|
||||
collect_vk_posts_from__intermediate(offset + chunkSize, count - chunkSize, intermediateResult, callback);
|
||||
collect_vk_posts_from__intermediate(wall, offset + chunkSize, count - chunkSize, intermediateResult, callback);
|
||||
} else {
|
||||
callback(std::move(*intermediateResult.get()));
|
||||
}
|
||||
@ -287,15 +365,15 @@ void RepostManager::collect_vk_posts_from__intermediate(int offset, int count, s
|
||||
});
|
||||
}
|
||||
|
||||
void RepostManager::collect_tg_posts_from(long from, int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
|
||||
void RepostManager::collect_tg_posts_from(long channel, long from, int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
|
||||
spdlog::debug("collecting {} telegram posts starting from {}", count, from);
|
||||
auto result = std::make_shared<std::vector<td::tl::unique_ptr<td_api::message>>>();
|
||||
collect_tg_posts_from__intermediate(from, count, result, callback);
|
||||
collect_tg_posts_from__intermediate(channel, from, count, result, callback);
|
||||
}
|
||||
|
||||
void RepostManager::collect_tg_posts_from__intermediate(long from, int count, std::shared_ptr<std::vector<td::tl::unique_ptr<td_api::message>>> intermediateResult, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
|
||||
void RepostManager::collect_tg_posts_from__intermediate(long channel, long from, int count, std::shared_ptr<std::vector<td::tl::unique_ptr<td_api::message>>> intermediateResult, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback) {
|
||||
spdlog::debug("getting some messages ({})", count);
|
||||
m_tg.send_query(td_api::make_object<td_api::getChatHistory>(m_appConfig->tgSourceId, from, 0, count, false), [=, this](auto obj){
|
||||
m_tg.send_query(td_api::make_object<td_api::getChatHistory>(channel, from, 0, count, false), [=, this](auto obj){
|
||||
if (obj->get_id() == td_api::messages::ID) {
|
||||
td_api::messages &msgs = (td_api::messages&)*obj;
|
||||
if (msgs.messages_.size() == 0) {
|
||||
@ -314,7 +392,7 @@ void RepostManager::collect_tg_posts_from__intermediate(long from, int count, st
|
||||
intermediateResult->emplace_back(std::move(*i));
|
||||
}
|
||||
if (count > chunkSize) {
|
||||
collect_tg_posts_from__intermediate(oldestId, count - chunkSize, intermediateResult, callback);
|
||||
collect_tg_posts_from__intermediate(channel, oldestId, count - chunkSize, intermediateResult, callback);
|
||||
} else {
|
||||
callback(std::move(*intermediateResult.get()));
|
||||
}
|
||||
@ -335,23 +413,23 @@ bool RepostManager::drop_posts_older_than(std::vector<AbstractPost> &posts, long
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<vk::Post> &posts) {
|
||||
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<vk::Post> &posts, int sourceIndex) {
|
||||
std::vector<AbstractPost> result;
|
||||
result.reserve(posts.size());
|
||||
for (auto &post : posts) {
|
||||
result.emplace_back(posts::SRC_VK, post.id, post.date, post.text);
|
||||
result.emplace_back(posts::SRC_VK, sourceIndex, post.id, post.date, post.text);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<td::tl::unique_ptr<td_api::message>> &posts) {
|
||||
std::vector<AbstractPost> RepostManager::to_abstract_posts(std::vector<td::tl::unique_ptr<td_api::message>> &posts, int sourceIndex) {
|
||||
std::vector<AbstractPost> result;
|
||||
result.reserve(posts.size());
|
||||
for (auto &post : posts) {
|
||||
// we don't want any posts other than plain text (yet)
|
||||
if (post->content_->get_id() == td_api::messageText::ID) {
|
||||
auto &content = (td_api::messageText&) *post->content_;
|
||||
result.emplace_back(posts::SRC_TELEGRAM, post->id_, post->date_, content.text_->text_);
|
||||
result.emplace_back(posts::SRC_TELEGRAM, sourceIndex, post->id_, post->date_, content.text_->text_);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
@ -391,9 +469,9 @@ void RepostManager::check_timer_callback(uv_timer_t *h) {
|
||||
void RepostManager::recheck_vk_posts(std::function<void()> onDone) {
|
||||
spdlog::info("checking VK posts");
|
||||
NewPostFetcher *f = new NewPostFetcher(this, true, false);
|
||||
f->onDone = [this, f, onDone](std::vector<AbstractPost> &&vkPosts, std::vector<AbstractPost> &&tgPosts){
|
||||
f->onDone = [this, f, onDone](std::vector<AbstractPost> &&posts){
|
||||
spdlog::info("checked VK posts");
|
||||
this->on_new_posts(vkPosts, tgPosts);
|
||||
this->on_new_posts(posts);
|
||||
if (onDone)
|
||||
onDone();
|
||||
delete f;
|
||||
@ -414,27 +492,40 @@ void RepostManager::repost(AbstractPost &post) {
|
||||
std::vector<td::tl::unique_ptr<td_api::textEntity>> entities;
|
||||
//entities.push_back(std::move(td_api::make_object<td_api::textEntity>(signatureStart, signatureLength, td_api::make_object<td_api::textEntityTypeSpoiler>())));
|
||||
auto content = td_api::make_object<td_api::inputMessageText>(td_api::make_object<td_api::formattedText>(post.text, std::move(entities)), td_api::make_object<td_api::linkPreviewOptions>(true, std::string(""), false, false, false), false);
|
||||
m_tg.send_query(td_api::make_object<td_api::sendMessage>(m_appConfig->tgDestinationId, 0, nullptr, nullptr, nullptr, std::move(content)), [this, postId = post.id, src = post.source](auto result){
|
||||
spdlog::info("reposting to {}", m_appConfig->tgDestinationId);
|
||||
m_tg.send_query(td_api::make_object<td_api::sendMessage>(m_appConfig->tgDestinationId, 0, nullptr, nullptr, nullptr, std::move(content)), [this, postId = post.id, srcType = post.sourceType, src = post.source](auto result){
|
||||
if (result->get_id() == td_api::error::ID) {
|
||||
auto &err = (td_api::error&)*result;
|
||||
spdlog::error("sendMessage error: {} {}", err.code_, err.message_);
|
||||
uv_timer_stop(m_repostTimer);
|
||||
} else {
|
||||
if (src == posts::SRC_VK)
|
||||
m_appState->vkLastPostId = postId;
|
||||
if (srcType == posts::SRC_VK)
|
||||
m_appState->vkRepostState[src].lastForwardedPostId = postId;
|
||||
else
|
||||
m_appState->tgLastPostId = postId;
|
||||
m_appState->save();
|
||||
m_appState->tgRepostState[src].lastForwardedPostId = postId;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void RepostManager::on_tg_message(td_api::updateNewMessage &update) {
|
||||
spdlog::info("received message from Telegram");
|
||||
|
||||
int sourceIndex = std::distance(
|
||||
m_appConfig->tgSources.begin(),
|
||||
std::find_if(m_appConfig->tgSources.begin(), m_appConfig->tgSources.end(),
|
||||
[chatId = update.message_->chat_id_](auto &src) {
|
||||
return src.id == chatId;
|
||||
}
|
||||
)
|
||||
);
|
||||
if (sourceIndex == m_appConfig->tgSources.size()) {
|
||||
return;
|
||||
}
|
||||
|
||||
uv_timer_stop(m_checkTimer);
|
||||
std::vector<td::tl::unique_ptr<td_api::message>> v;
|
||||
v.push_back(std::move(update.message_));
|
||||
recheck_vk_posts([this, post = to_abstract_posts(v)](){
|
||||
on_new_posts({}, post);
|
||||
recheck_vk_posts([this, post = to_abstract_posts(v, sourceIndex)](){
|
||||
on_new_posts(post);
|
||||
});
|
||||
}
|
33
manager.h
33
manager.h
@ -18,6 +18,7 @@ namespace manager {
|
||||
|
||||
struct NewPostFetcher {
|
||||
struct fetcher_state {
|
||||
int sourceIndex;
|
||||
bool ready = false;
|
||||
bool needRequest = true;
|
||||
long offset = 0, count = 3;
|
||||
@ -25,14 +26,14 @@ namespace manager {
|
||||
};
|
||||
|
||||
RepostManager *mgr;
|
||||
fetcher_state vkState, tgState;
|
||||
std::function<void(std::vector<AbstractPost> &&vkPosts, std::vector<AbstractPost> &&tgPosts)> onDone;
|
||||
std::vector<fetcher_state> vkState, tgState;
|
||||
std::function<void(std::vector<AbstractPost>&&)> onDone;
|
||||
std::function<void()> onError;
|
||||
|
||||
NewPostFetcher(RepostManager *m, bool fetchVk, bool fetchTg);
|
||||
void fetch();
|
||||
void check_vk_posts(std::vector<vk::Post> posts);
|
||||
void check_tg_posts(std::vector<td::tl::unique_ptr<td_api::message>> posts);
|
||||
void check_vk_posts(int index, std::vector<vk::Post> posts);
|
||||
void check_tg_posts(int index, std::vector<td::tl::unique_ptr<td_api::message>> posts);
|
||||
};
|
||||
|
||||
class RepostManager {
|
||||
@ -45,24 +46,24 @@ namespace manager {
|
||||
private:
|
||||
void on_clients_ready();
|
||||
void load_more_telegram_chats();
|
||||
void on_new_posts(std::vector<AbstractPost> vkPosts, std::vector<AbstractPost> tgPosts);
|
||||
void on_new_posts(std::vector<AbstractPost> posts);
|
||||
void on_tg_message(td_api::updateNewMessage &update);
|
||||
void recheck_vk_posts(std::function<void()> onDone);
|
||||
|
||||
void collect_all_vk_posts(std::function<void(std::vector<vk::Post>)> callback);
|
||||
void collect_all_tg_posts(std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
|
||||
void collect_last_vk_posts(int count, std::function<void(std::vector<vk::Post>)> callback);
|
||||
void collect_last_tg_posts(int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
|
||||
void collect_vk_posts_from(int offset, int count, std::function<void(std::vector<vk::Post>)> callback);
|
||||
void collect_tg_posts_from(long from, int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
|
||||
void collect_all_vk_posts(const std::variant<long, std::string> wall, std::function<void(std::vector<vk::Post>)> callback);
|
||||
void collect_all_tg_posts(long channel, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
|
||||
void collect_last_vk_posts(const std::variant<long, std::string> wall, int count, std::function<void(std::vector<vk::Post>)> callback);
|
||||
void collect_last_tg_posts(long channel, int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
|
||||
void collect_vk_posts_from(const std::variant<long, std::string> wall, int offset, int count, std::function<void(std::vector<vk::Post>)> callback);
|
||||
void collect_tg_posts_from(long channel, long from, int count, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
|
||||
|
||||
void collect_vk_posts_from__intermediate(int offset, int count, std::shared_ptr<std::vector<vk::Post>> intermediateResult, std::function<void(std::vector<vk::Post>)> callback);
|
||||
void collect_tg_posts_from__intermediate(long from, int count, std::shared_ptr<std::vector<td::tl::unique_ptr<td_api::message>>> intermediateResult, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
|
||||
void collect_vk_posts_from__intermediate(const std::variant<long, std::string> wall, int offset, int count, std::shared_ptr<std::vector<vk::Post>> intermediateResult, std::function<void(std::vector<vk::Post>)> callback);
|
||||
void collect_tg_posts_from__intermediate(long channel, long from, int count, std::shared_ptr<std::vector<td::tl::unique_ptr<td_api::message>>> intermediateResult, std::function<void(std::vector<td::tl::unique_ptr<td_api::message>>)> callback);
|
||||
|
||||
bool drop_posts_older_than(std::vector<AbstractPost> &posts, long lastPostId);
|
||||
|
||||
std::vector<AbstractPost> to_abstract_posts(std::vector<vk::Post> &posts);
|
||||
std::vector<AbstractPost> to_abstract_posts(std::vector<td::tl::unique_ptr<td_api::message>> &posts);
|
||||
std::vector<AbstractPost> to_abstract_posts(std::vector<vk::Post> &posts, int sourceIndex);
|
||||
std::vector<AbstractPost> to_abstract_posts(std::vector<td::tl::unique_ptr<td_api::message>> &posts, int sourceIndex);
|
||||
|
||||
void enqueue_for_repost(std::vector<AbstractPost> posts);
|
||||
static void repost_timer_callback(uv_timer_t *h);
|
||||
@ -76,5 +77,7 @@ namespace manager {
|
||||
std::queue<AbstractPost> m_repostQueue;
|
||||
uv_timer_t *m_repostTimer = nullptr;
|
||||
uv_timer_t *m_checkTimer = nullptr;
|
||||
int m_nRequiredChats;
|
||||
int m_nLoadedRequiredChats = 0;
|
||||
};
|
||||
}
|
||||
|
46
posts.cpp
46
posts.cpp
@ -12,7 +12,7 @@ static std::regex tgRegex1("#цитат(а|ы)");
|
||||
static std::regex tgRegex2("- ");
|
||||
|
||||
bool posts::filter_and_transform(AbstractPost &post) {
|
||||
if (post.source == SRC_VK) {
|
||||
if (post.sourceType == SRC_VK) {
|
||||
bool hasHashtag = std::regex_search(post.text, vkRegex);
|
||||
if (!hasHashtag) return false;
|
||||
return true;
|
||||
@ -30,33 +30,23 @@ bool posts::filter_and_transform(AbstractPost &post) {
|
||||
}
|
||||
|
||||
std::string_view posts::add_signature(AbstractPost &post, config::AppConfig *cfg) {
|
||||
if (post.source == SRC_VK) {
|
||||
return add_vk_signature(post.text, cfg);
|
||||
int oldPostLength = post.text.length() + 1;
|
||||
std::string signature = "\nРепостнуто автоматически из ";
|
||||
if (post.sourceType == SRC_VK) {
|
||||
auto &wallId = cfg->vkSources[post.source].id;
|
||||
std::string shortname = std::holds_alternative<std::string>(wallId) ? std::get<std::string>(wallId) : cfg->vkSources[post.source].link;
|
||||
if (!shortname.empty())
|
||||
signature += "vk.com/" + shortname;
|
||||
else
|
||||
signature += "VK";
|
||||
post.text += signature;
|
||||
} else {
|
||||
return add_tg_signature(post.text, cfg);
|
||||
std::string shortname = cfg->tgSources[post.source].link;
|
||||
if (!shortname.empty())
|
||||
signature += "t.me/" + shortname;
|
||||
else
|
||||
signature += "Telegram";
|
||||
post.text += signature;
|
||||
}
|
||||
}
|
||||
|
||||
std::string_view posts::add_vk_signature(std::string &text, config::AppConfig *cfg) {
|
||||
std::string shortname = std::holds_alternative<std::string>(cfg->vkSource) ? std::get<std::string>(cfg->vkSource) : cfg->vkSourceLink;
|
||||
std::string signature = "\nРепостнуто автоматически из ";
|
||||
if (!shortname.empty())
|
||||
signature += "vk.com/" + shortname;
|
||||
else
|
||||
signature += "VK";
|
||||
int oldPostLength = text.length() + 1;
|
||||
text += signature;
|
||||
return std::string_view(text.c_str() + oldPostLength, signature.length());
|
||||
}
|
||||
|
||||
std::string_view posts::add_tg_signature(std::string &text, config::AppConfig *cfg) {
|
||||
std::string shortname = cfg->tgSourceLink;
|
||||
std::string signature = "\nРепостнуто автоматически из ";
|
||||
if (!shortname.empty())
|
||||
signature += "t.me/" + shortname;
|
||||
else
|
||||
signature += "Telegram";
|
||||
int oldPostLength = text.length() + 1;
|
||||
text += signature;
|
||||
return std::string_view(text.c_str() + oldPostLength, signature.length());
|
||||
return std::string_view(post.text.c_str() + oldPostLength, signature.length());
|
||||
}
|
7
posts.h
7
posts.h
@ -10,16 +10,15 @@ namespace posts {
|
||||
};
|
||||
|
||||
struct AbstractPost {
|
||||
inline AbstractPost(PostSource src, long id, long date, std::string text) : source(src), id(id), date(date), text(text) {}
|
||||
inline AbstractPost(PostSource srcType, int src, long id, long date, std::string text) : sourceType(srcType), source(src), id(id), date(date), text(text) {}
|
||||
long id;
|
||||
long date;
|
||||
std::string text;
|
||||
PostSource source;
|
||||
PostSource sourceType;
|
||||
int source;
|
||||
};
|
||||
|
||||
bool filter_and_transform(AbstractPost &post);
|
||||
|
||||
std::string_view add_signature(AbstractPost &post, config::AppConfig *cfg);
|
||||
std::string_view add_vk_signature(std::string &text, config::AppConfig *cfg);
|
||||
std::string_view add_tg_signature(std::string &text, config::AppConfig *cfg);
|
||||
}
|
115
state.cpp
115
state.cpp
@ -1,32 +1,71 @@
|
||||
#include "state.h"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include <exception>
|
||||
#include <fstream>
|
||||
#include <nlohmann/json.hpp>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
using namespace state;
|
||||
using namespace nlohmann;
|
||||
|
||||
AppState::AppState(std::string filename) : m_saveFilename(filename) {
|
||||
AppState::AppState(std::string filename, config::AppConfig *cfg) : m_saveFilename(filename), m_config(cfg) {
|
||||
for (int i = 0; i < cfg->vkSources.size(); ++i) {
|
||||
vkRepostState.emplace_back();
|
||||
}
|
||||
for (int i = 0; i < cfg->tgSources.size(); ++i) {
|
||||
tgRepostState.emplace_back();
|
||||
}
|
||||
|
||||
std::ifstream f(filename);
|
||||
if (f.fail()) {
|
||||
spdlog::info("no state file");
|
||||
spdlog::error("no state file");
|
||||
return;
|
||||
}
|
||||
json state = json::parse(f);
|
||||
if (state.type() != json::value_t::object) {
|
||||
throw InvalidSavedStateException("JSON root must be an object");
|
||||
}
|
||||
|
||||
if (state.contains("vk")) {
|
||||
json vkState = state["vk"];
|
||||
if (vkState.type() == json::value_t::object) {
|
||||
if (vkState.contains("last_post_id")) {
|
||||
json lastPostId = vkState["last_post_id"];
|
||||
if (lastPostId.type() == json::value_t::number_integer || lastPostId.type() == json::value_t::number_unsigned) {
|
||||
vkLastPostId = lastPostId;
|
||||
json vkStates_ = state["vk"];
|
||||
if (vkStates_.type() == json::value_t::object) {
|
||||
for (auto& [key, vkState_] : vkStates_.items()) {
|
||||
if (key.empty()) continue;
|
||||
int sourceIndex;
|
||||
RepostState &&s {};
|
||||
if (key[0] >= '0' && key[0] <= '9') {
|
||||
long keyNum;
|
||||
try {
|
||||
keyNum = std::stol(key);
|
||||
} catch (std::exception &e) {
|
||||
throw InvalidSavedStateException("invalid number in vk source id");
|
||||
}
|
||||
sourceIndex = std::distance(
|
||||
m_config->vkSources.begin(),
|
||||
std::find_if(m_config->vkSources.begin(), m_config->vkSources.end(),
|
||||
[=](auto &src){ return src.id.index() == 0 && std::get<long>(src.id) == std::stoi(key); }
|
||||
)
|
||||
);
|
||||
} else {
|
||||
throw InvalidSavedStateException("key vk.last_post_id must be an integer");
|
||||
sourceIndex = std::distance(
|
||||
m_config->vkSources.begin(),
|
||||
std::find_if(m_config->vkSources.begin(), m_config->vkSources.end(),
|
||||
[=](auto &src){ return src.id.index() == 1 && std::get<std::string>(src.id) == key; }
|
||||
)
|
||||
);
|
||||
}
|
||||
if (vkState_.contains("last_post_id")) {
|
||||
json lastPostId_ = vkState_["last_post_id"];
|
||||
if (lastPostId_.type() == json::value_t::number_integer ||
|
||||
lastPostId_.type() == json::value_t::number_unsigned) {
|
||||
s.lastForwardedPostId = vkState_["last_post_id"];
|
||||
} else {
|
||||
throw InvalidSavedStateException("key vk.last_post_id must be an integer");
|
||||
}
|
||||
}
|
||||
if (sourceIndex < m_config->vkSources.size())
|
||||
vkRepostState[sourceIndex] = s;
|
||||
}
|
||||
} else {
|
||||
throw InvalidSavedStateException("key vk must be an object");
|
||||
@ -34,15 +73,35 @@ AppState::AppState(std::string filename) : m_saveFilename(filename) {
|
||||
}
|
||||
|
||||
if (state.contains("tg")) {
|
||||
json tgState = state["tg"];
|
||||
if (tgState.type() == json::value_t::object) {
|
||||
if (tgState.contains("last_post_id")) {
|
||||
json lastPostId = tgState["last_post_id"];
|
||||
if (lastPostId.type() == json::value_t::number_integer || lastPostId.type() == json::value_t::number_unsigned) {
|
||||
tgLastPostId = lastPostId;
|
||||
} else {
|
||||
throw InvalidSavedStateException("key tg.last_post_id must be an integer");
|
||||
json tgStates_ = state["tg"];
|
||||
if (tgStates_.type() == json::value_t::object) {
|
||||
for (auto& [key, tgState_] : tgStates_.items()) {
|
||||
if (key.empty()) continue;
|
||||
int sourceIndex;
|
||||
RepostState &&s {};
|
||||
long keyNum;
|
||||
try {
|
||||
keyNum = std::stol(key);
|
||||
} catch (std::exception &e) {
|
||||
throw InvalidSavedStateException("invalid number in tg source id");
|
||||
}
|
||||
sourceIndex = std::distance(
|
||||
m_config->tgSources.begin(),
|
||||
std::find_if(m_config->tgSources.begin(), m_config->tgSources.end(),
|
||||
[=](auto &src){ return src.id == keyNum; }
|
||||
)
|
||||
);
|
||||
if (tgState_.contains("last_post_id")) {
|
||||
json lastPostId_ = tgState_["last_post_id"];
|
||||
if (lastPostId_.type() == json::value_t::number_integer ||
|
||||
lastPostId_.type() == json::value_t::number_unsigned) {
|
||||
s.lastForwardedPostId = tgState_["last_post_id"];
|
||||
} else {
|
||||
throw InvalidSavedStateException("key tg.last_post_id must be an integer");
|
||||
}
|
||||
}
|
||||
if (sourceIndex < m_config->tgSources.size())
|
||||
tgRepostState[sourceIndex] = s;
|
||||
}
|
||||
} else {
|
||||
throw InvalidSavedStateException("key tg must be an object");
|
||||
@ -53,15 +112,31 @@ AppState::AppState(std::string filename) : m_saveFilename(filename) {
|
||||
void AppState::save() {
|
||||
if (m_saveFilename.empty()) return;
|
||||
spdlog::info("saving state");
|
||||
json state = {{"vk", {{"last_post_id", vkLastPostId}}}, {"tg", {{"last_post_id", tgLastPostId}}}};
|
||||
json state = {{"vk", json::object()}, {"tg", json::object()}};
|
||||
for (int i = 0; i < m_config->vkSources.size(); ++i) {
|
||||
auto &s = vkRepostState[i];
|
||||
json st = {{"last_post_id", s.lastForwardedPostId}};
|
||||
auto &vkId = m_config->vkSources[i].id;
|
||||
std::string vkIdStr;
|
||||
if (vkId.index() == 0) vkIdStr = std::to_string(std::get<long>(vkId));
|
||||
else vkIdStr = std::get<std::string>(vkId);
|
||||
state["vk"].emplace(vkIdStr, st);
|
||||
}
|
||||
for (int i = 0; i < m_config->tgSources.size(); ++i) {
|
||||
auto &s = tgRepostState[i];
|
||||
json st = {{"last_post_id", s.lastForwardedPostId}};
|
||||
state["tg"].emplace(std::to_string(m_config->tgSources[i].id), st);
|
||||
}
|
||||
std::ofstream f(m_saveFilename.c_str());
|
||||
if (f.fail()) {
|
||||
spdlog::error("failed to open state file '{}'", m_saveFilename);
|
||||
return;
|
||||
}
|
||||
f << state << std::endl;
|
||||
f << state;
|
||||
}
|
||||
|
||||
std::string AppState::to_string() {
|
||||
return std::format("AppState {{ tgLastPostId: {}, vkLastPostId: {} }}", tgLastPostId, vkLastPostId);
|
||||
std::stringstream output;
|
||||
output << "<not implemented>";
|
||||
return output.str();
|
||||
}
|
19
state.h
19
state.h
@ -1,8 +1,9 @@
|
||||
#pragma once
|
||||
|
||||
#include <cstdint>
|
||||
#include "config.h"
|
||||
#include <exception>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace state {
|
||||
class InvalidSavedStateException : std::exception {
|
||||
@ -11,19 +12,21 @@ namespace state {
|
||||
const char *message;
|
||||
};
|
||||
|
||||
struct RepostState {
|
||||
long lastLoadedPostId = 0;
|
||||
long lastForwardedPostId = 0;
|
||||
};
|
||||
|
||||
class AppState {
|
||||
public:
|
||||
inline AppState() {};
|
||||
AppState(std::string filename);
|
||||
AppState(std::string filename, config::AppConfig *cfg);
|
||||
std::string to_string();
|
||||
void save();
|
||||
|
||||
int64_t tgLastPostId = 0;
|
||||
int64_t vkLastPostId = 0;
|
||||
|
||||
int64_t tgLastLoadedPostId = 0;
|
||||
int64_t vkLastLoadedPostId = 0;
|
||||
std::vector<RepostState> vkRepostState;
|
||||
std::vector<RepostState> tgRepostState;
|
||||
private:
|
||||
std::string m_saveFilename;
|
||||
config::AppConfig *m_config;
|
||||
};
|
||||
}
|
Loading…
Reference in New Issue
Block a user