This commit is contained in:
2024-10-10 19:05:48 +00:00
commit cffdcba6af
1880 changed files with 813614 additions and 0 deletions

61
td/tddb/CMakeLists.txt Normal file
View File

@@ -0,0 +1,61 @@
if ((CMAKE_MAJOR_VERSION LESS 3) OR (CMAKE_VERSION VERSION_LESS "3.0.2"))
message(FATAL_ERROR "CMake >= 3.0.2 is required")
endif()
if (NOT DEFINED CMAKE_INSTALL_LIBDIR)
set(CMAKE_INSTALL_LIBDIR "lib")
endif()
set(TDDB_SOURCE
td/db/binlog/Binlog.cpp
td/db/binlog/BinlogEvent.cpp
td/db/binlog/ConcurrentBinlog.cpp
td/db/binlog/detail/BinlogEventsBuffer.cpp
td/db/binlog/detail/BinlogEventsProcessor.cpp
td/db/detail/RawSqliteDb.cpp
td/db/SqliteConnectionSafe.cpp
td/db/SqliteDb.cpp
td/db/SqliteKeyValue.cpp
td/db/SqliteKeyValueAsync.cpp
td/db/SqliteStatement.cpp
td/db/TQueue.cpp
td/db/binlog/Binlog.h
td/db/binlog/BinlogEvent.h
td/db/binlog/BinlogHelper.h
td/db/binlog/BinlogInterface.h
td/db/binlog/ConcurrentBinlog.h
td/db/binlog/detail/BinlogEventsBuffer.h
td/db/binlog/detail/BinlogEventsProcessor.h
td/db/BinlogKeyValue.h
td/db/DbKey.h
td/db/KeyValueSyncInterface.h
td/db/SeqKeyValue.h
td/db/SqliteConnectionSafe.h
td/db/SqliteDb.h
td/db/SqliteKeyValue.h
td/db/SqliteKeyValueAsync.h
td/db/SqliteKeyValueSafe.h
td/db/SqliteStatement.h
td/db/TQueue.h
td/db/TsSeqKeyValue.h
td/db/detail/RawSqliteDb.h
)
add_library(tddb STATIC ${TDDB_SOURCE})
target_include_directories(tddb PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>)
target_link_libraries(tddb PUBLIC tdactor tdutils PRIVATE tdsqlite)
if (NOT CMAKE_CROSSCOMPILING)
add_executable(binlog_dump td/db/binlog/binlog_dump.cpp)
target_link_libraries(binlog_dump PRIVATE tddb)
endif()
install(TARGETS tddb EXPORT TdStaticTargets
LIBRARY DESTINATION "${CMAKE_INSTALL_LIBDIR}"
ARCHIVE DESTINATION "${CMAKE_INSTALL_LIBDIR}"
)

View File

@@ -0,0 +1,301 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/binlog/Binlog.h"
#include "td/db/binlog/BinlogEvent.h"
#include "td/db/DbKey.h"
#include "td/db/KeyValueSyncInterface.h"
#include "td/utils/algorithm.h"
#include "td/utils/buffer.h"
#include "td/utils/common.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/HashTableUtils.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/RwMutex.h"
#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/StorerBase.h"
#include "td/utils/tl_parsers.h"
#include "td/utils/tl_storers.h"
#include <functional>
#include <memory>
#include <unordered_map>
#include <utility>
namespace td {
template <class BinlogT>
class BinlogKeyValue final : public KeyValueSyncInterface {
public:
static constexpr int32 MAGIC = 0x2a280000;
struct Event final : public Storer {
Event() = default;
Event(Slice key, Slice value) : key(key), value(value) {
}
Slice key;
Slice value;
template <class StorerT>
void store(StorerT &&storer) const {
storer.store_string(key);
storer.store_string(value);
}
template <class ParserT>
void parse(ParserT &&parser) {
key = parser.template fetch_string<Slice>();
value = parser.template fetch_string<Slice>();
}
size_t size() const final {
TlStorerCalcLength storer;
store(storer);
return storer.get_length();
}
size_t store(uint8 *ptr) const final {
TlStorerUnsafe storer(ptr);
store(storer);
return static_cast<size_t>(storer.get_buf() - ptr);
}
};
int32 get_magic() const {
return magic_;
}
Status init(string name, DbKey db_key = DbKey::empty(), int scheduler_id = -1,
int32 override_magic = 0) TD_WARN_UNUSED_RESULT {
close();
if (override_magic) {
magic_ = override_magic;
}
binlog_ = std::make_shared<BinlogT>();
TRY_STATUS(binlog_->init(
name,
[&](const BinlogEvent &binlog_event) {
Event event;
event.parse(TlParser(binlog_event.get_data()));
if (event.key.empty()) {
LOG(ERROR) << "Have event with empty key";
return;
}
map_.emplace(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_));
},
std::move(db_key), DbKey::empty(), scheduler_id));
return Status::OK();
}
void external_init_begin(int32 override_magic = 0) {
close();
if (override_magic) {
magic_ = override_magic;
}
}
template <class OtherBinlogT>
void external_init_handle(BinlogKeyValue<OtherBinlogT> &&other) {
map_ = std::move(other.map_);
}
void external_init_handle(const BinlogEvent &binlog_event) {
Event event;
event.parse(TlParser(binlog_event.get_data()));
if (event.key.empty()) {
LOG(ERROR) << "Have external event with empty key";
return;
}
map_.emplace(event.key.str(), std::make_pair(event.value.str(), binlog_event.id_));
}
void external_init_finish(std::shared_ptr<BinlogT> binlog) {
binlog_ = std::move(binlog);
}
void close() {
*this = BinlogKeyValue();
}
void close(Promise<> promise) final {
binlog_->close(std::move(promise));
}
SeqNo set(string key, string value) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
uint64 old_event_id = 0;
CHECK(!key.empty());
auto it_ok = map_.emplace(key, std::make_pair(value, 0));
if (!it_ok.second) {
if (it_ok.first->second.first == value) {
return 0;
}
VLOG(binlog) << "Change value of key " << key << " from " << hex_encode(it_ok.first->second.first) << " to "
<< hex_encode(value);
old_event_id = it_ok.first->second.second;
it_ok.first->second.first = value;
} else {
VLOG(binlog) << "Set value of key " << key << " to " << hex_encode(value);
}
bool rewrite = false;
uint64 event_id;
auto seq_no = binlog_->next_event_id();
if (old_event_id != 0) {
rewrite = true;
event_id = old_event_id;
} else {
event_id = seq_no;
it_ok.first->second.second = event_id;
}
lock.reset();
add_event(seq_no,
BinlogEvent::create_raw(event_id, magic_, rewrite ? BinlogEvent::Flags::Rewrite : 0, Event{key, value}));
return seq_no;
}
SeqNo erase(const string &key) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
auto it = map_.find(key);
if (it == map_.end()) {
return 0;
}
VLOG(binlog) << "Remove value of key " << key << ", which is " << hex_encode(it->second.first);
uint64 event_id = it->second.second;
map_.erase(it);
auto seq_no = binlog_->next_event_id();
lock.reset();
add_event(seq_no, BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite,
EmptyStorer()));
return seq_no;
}
SeqNo erase_batch(vector<string> keys) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
vector<uint64> log_event_ids;
for (auto &key : keys) {
auto it = map_.find(key);
if (it != map_.end()) {
log_event_ids.push_back(it->second.second);
map_.erase(it);
}
}
if (log_event_ids.empty()) {
return 0;
}
VLOG(binlog) << "Remove value of keys " << keys;
return binlog_->erase_batch(std::move(log_event_ids));
}
void add_event(uint64 seq_no, BufferSlice &&event) {
binlog_->add_raw_event(BinlogDebugInfo{__FILE__, __LINE__}, seq_no, std::move(event));
}
bool isset(const string &key) final {
auto lock = rw_mutex_.lock_read().move_as_ok();
return map_.count(key) > 0;
}
string get(const string &key) final {
auto lock = rw_mutex_.lock_read().move_as_ok();
auto it = map_.find(key);
if (it == map_.end()) {
return string();
}
VLOG(binlog) << "Get value of key " << key << ", which is " << hex_encode(it->second.first);
return it->second.first;
}
void force_sync(Promise<> &&promise, const char *source) final {
binlog_->force_sync(std::move(promise), source);
}
void lazy_sync(Promise<> &&promise) {
binlog_->lazy_sync(std::move(promise));
}
void for_each(std::function<void(Slice, Slice)> func) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
for (const auto &kv : map_) {
func(kv.first, kv.second.first);
}
}
std::unordered_map<string, string, Hash<string>> prefix_get(Slice prefix) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
std::unordered_map<string, string, Hash<string>> res;
for (const auto &kv : map_) {
if (begins_with(kv.first, prefix)) {
res.emplace(kv.first.substr(prefix.size()), kv.second.first);
}
}
return res;
}
FlatHashMap<string, string> get_all() final {
auto lock = rw_mutex_.lock_write().move_as_ok();
FlatHashMap<string, string> res;
res.reserve(map_.size());
for (const auto &kv : map_) {
res.emplace(kv.first, kv.second.first);
}
return res;
}
void erase_by_prefix(Slice prefix) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
vector<uint64> event_ids;
table_remove_if(map_, [&](const auto &it) {
if (begins_with(it.first, prefix)) {
event_ids.push_back(it.second.second);
return true;
}
return false;
});
auto seq_no = binlog_->next_event_id(narrow_cast<int32>(event_ids.size()));
lock.reset();
for (auto event_id : event_ids) {
add_event(seq_no, BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite,
EmptyStorer()));
seq_no++;
}
}
template <class T>
friend class BinlogKeyValue;
static Status destroy(Slice name) {
return Binlog::destroy(name);
}
private:
FlatHashMap<string, std::pair<string, uint64>> map_;
std::shared_ptr<BinlogT> binlog_;
RwMutex rw_mutex_;
int32 magic_ = MAGIC;
};
template <>
inline void BinlogKeyValue<Binlog>::add_event(uint64 seq_no, BufferSlice &&event) {
binlog_->add_raw_event(std::move(event), BinlogDebugInfo{__FILE__, __LINE__});
}
template <>
inline void BinlogKeyValue<Binlog>::force_sync(Promise<> &&promise, const char *source) {
binlog_->sync(source);
promise.set_value(Unit());
}
template <>
inline void BinlogKeyValue<Binlog>::lazy_sync(Promise<> &&promise) {
force_sync(std::move(promise), "lazy_sync");
}
} // namespace td

61
td/tddb/td/db/DbKey.h Normal file
View File

@@ -0,0 +1,61 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/utils/common.h"
#include "td/utils/Slice.h"
namespace td {
class DbKey {
enum class Type { Empty, RawKey, Password };
Type type() const {
return type_;
}
public:
bool is_empty() const {
return type_ == Type::Empty;
}
bool is_raw_key() const {
return type_ == Type::RawKey;
}
bool is_password() const {
return type_ == Type::Password;
}
CSlice data() const {
return data_;
}
static DbKey raw_key(string raw_key) {
DbKey res;
res.type_ = Type::RawKey;
res.data_ = std::move(raw_key);
return res;
}
static DbKey password(string password) {
DbKey res;
res.type_ = Type::Password;
res.data_ = std::move(password);
return res;
}
static DbKey empty() {
return DbKey();
}
private:
Type type_{Type::Empty};
string data_;
};
} // namespace td

View File

@@ -0,0 +1,56 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/utils/common.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/HashTableUtils.h"
#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include <functional>
#include <unordered_map>
namespace td {
class KeyValueSyncInterface {
public:
// SeqNo is used to restore total order on all write queries.
// Some implementations may return 0 as SeqNo.
using SeqNo = uint64;
KeyValueSyncInterface() = default;
KeyValueSyncInterface(const KeyValueSyncInterface &) = delete;
KeyValueSyncInterface &operator=(const KeyValueSyncInterface &) = delete;
KeyValueSyncInterface(KeyValueSyncInterface &&) = default;
KeyValueSyncInterface &operator=(KeyValueSyncInterface &&) = default;
virtual ~KeyValueSyncInterface() = default;
virtual SeqNo set(string key, string value) = 0;
virtual bool isset(const string &key) = 0;
virtual string get(const string &key) = 0;
virtual void for_each(std::function<void(Slice, Slice)> func) = 0;
virtual std::unordered_map<string, string, Hash<string>> prefix_get(Slice prefix) = 0;
virtual FlatHashMap<string, string> get_all() = 0;
virtual SeqNo erase(const string &key) = 0;
virtual SeqNo erase_batch(vector<string> keys) = 0;
virtual void erase_by_prefix(Slice prefix) = 0;
virtual void force_sync(Promise<> &&promise, const char *source) = 0;
virtual void close(Promise<> promise) = 0;
};
} // namespace td

105
td/tddb/td/db/SeqKeyValue.h Normal file
View File

@@ -0,0 +1,105 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/utils/common.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/Slice.h"
namespace td {
class SeqKeyValue {
public:
using SeqNo = uint64;
SeqKeyValue() = default;
SeqKeyValue(SeqKeyValue &&) = default;
SeqKeyValue &operator=(SeqKeyValue &&) = default;
SeqKeyValue(const SeqKeyValue &) = delete;
SeqKeyValue &operator=(const SeqKeyValue &) = delete;
~SeqKeyValue() = default;
SeqNo set(Slice key, Slice value) {
CHECK(!key.empty());
auto it_ok = map_.emplace(key.str(), value.str());
if (!it_ok.second) {
if (it_ok.first->second == value) {
return 0;
}
it_ok.first->second = value.str();
}
return next_seq_no();
}
SeqNo erase(const string &key) {
auto it = map_.find(key);
if (it == map_.end()) {
return 0;
}
map_.erase(it);
return next_seq_no();
}
SeqNo erase_batch(vector<string> keys) {
size_t count = 0;
for (auto &key : keys) {
auto it = map_.find(key);
if (it != map_.end()) {
map_.erase(it);
count++;
}
}
if (count == 0) {
return 0;
}
SeqNo result = current_id_ + 1;
current_id_ += count;
return result;
}
SeqNo seq_no() const {
return current_id_ + 1;
}
string get(const string &key) const {
auto it = map_.find(key);
if (it == map_.end()) {
return string();
}
return it->second;
}
bool isset(const string &key) const {
auto it = map_.find(key);
if (it == map_.end()) {
return false;
}
return true;
}
size_t size() const {
return map_.size();
}
FlatHashMap<string, string> get_all() const {
FlatHashMap<string, string> result;
result.reserve(map_.size());
for (auto &it : map_) {
result.emplace(it.first, it.second);
}
return result;
}
private:
FlatHashMap<string, string> map_;
SeqNo current_id_ = 0;
SeqNo next_seq_no() {
return ++current_id_;
}
};
} // namespace td

View File

@@ -0,0 +1,51 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/SqliteConnectionSafe.h"
#include "td/utils/common.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
namespace td {
SqliteConnectionSafe::SqliteConnectionSafe(string path, DbKey key, optional<int32> cipher_version)
: path_(std::move(path))
, lsls_connection_([path = path_, close_state_ptr = &close_state_, key = std::move(key),
cipher_version = std::move(cipher_version)] {
auto r_db = SqliteDb::open_with_key(path, false, key, cipher_version.copy());
if (r_db.is_error()) {
LOG(FATAL) << "Can't open database in state " << close_state_ptr->load() << ": " << r_db.error().message();
}
auto db = r_db.move_as_ok();
db.exec("PRAGMA journal_mode=WAL").ensure();
db.exec("PRAGMA secure_delete=1").ensure();
return db;
}) {
}
void SqliteConnectionSafe::set(SqliteDb &&db) {
lsls_connection_.set(std::move(db));
}
SqliteDb &SqliteConnectionSafe::get() {
return lsls_connection_.get();
}
void SqliteConnectionSafe::close() {
LOG(INFO) << "Close SQLite database " << tag("path", path_);
close_state_++;
lsls_connection_.clear_values();
}
void SqliteConnectionSafe::close_and_destroy() {
close();
LOG(INFO) << "Destroy SQLite database " << tag("path", path_);
close_state_ += 65536;
SqliteDb::destroy(path_).ignore();
}
} // namespace td

View File

@@ -0,0 +1,39 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/DbKey.h"
#include "td/db/SqliteDb.h"
#include "td/actor/SchedulerLocalStorage.h"
#include "td/utils/common.h"
#include "td/utils/optional.h"
#include <atomic>
namespace td {
class SqliteConnectionSafe {
public:
SqliteConnectionSafe() = default;
SqliteConnectionSafe(string path, DbKey key, optional<int32> cipher_version = {});
SqliteDb &get();
void set(SqliteDb &&db);
void close();
void close_and_destroy();
private:
string path_;
std::atomic<uint32> close_state_{0};
LazySchedulerLocalStorage<SqliteDb> lsls_connection_;
};
} // namespace td

323
td/tddb/td/db/SqliteDb.cpp Normal file
View File

@@ -0,0 +1,323 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/SqliteDb.h"
#include "td/utils/common.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/port/path.h"
#include "td/utils/port/Stat.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include "td/utils/StringBuilder.h"
#include "td/utils/Timer.h"
#include "sqlite/sqlite3.h"
namespace td {
namespace {
string quote_string(Slice str) {
size_t cnt = 0;
for (auto &c : str) {
if (c == '\'') {
cnt++;
}
}
if (cnt == 0) {
return str.str();
}
string result;
result.reserve(str.size() + cnt);
for (auto &c : str) {
if (c == '\'') {
result += '\'';
}
result += c;
}
return result;
}
string db_key_to_sqlcipher_key(const DbKey &db_key) {
if (db_key.is_empty()) {
return "''";
}
if (db_key.is_password()) {
return PSTRING() << "'" << quote_string(db_key.data()) << "'";
}
CHECK(db_key.is_raw_key());
Slice raw_key = db_key.data();
CHECK(raw_key.size() == 32);
size_t expected_size = 64 + 5;
string res(expected_size + 50, ' ');
StringBuilder sb(res);
sb << '"';
sb << 'x';
sb << '\'';
sb << format::as_hex_dump<0>(raw_key);
sb << '\'';
sb << '"';
CHECK(!sb.is_error());
CHECK(sb.as_cslice().size() == expected_size);
res.resize(expected_size);
return res;
}
} // namespace
SqliteDb::~SqliteDb() = default;
Status SqliteDb::init(CSlice path, bool allow_creation) {
// if database does not exist, delete all other files which could have been left from the old database
auto database_stat = stat(path);
if (database_stat.is_error()) {
if (!allow_creation) {
bool was_destroyed = detail::RawSqliteDb::was_any_database_destroyed();
auto reason = was_destroyed ? Slice("was corrupted and deleted") : Slice("disappeared");
return Status::Error(PSLICE() << "Database " << reason
<< " during execution and can't be recreated: " << database_stat.error());
}
TRY_STATUS(destroy(path));
}
tdsqlite3 *db;
CHECK(tdsqlite3_threadsafe() != 0);
int rc =
tdsqlite3_open_v2(path.c_str(), &db, SQLITE_OPEN_READWRITE | (allow_creation ? SQLITE_OPEN_CREATE : 0), nullptr);
if (rc != SQLITE_OK) {
auto res = detail::RawSqliteDb::last_error(db, path);
tdsqlite3_close(db);
return res;
}
tdsqlite3_busy_timeout(db, 1000 * 5 /* 5 seconds */);
raw_ = std::make_shared<detail::RawSqliteDb>(db, path.str());
return Status::OK();
}
static void trace_callback(void *ptr, const char *query) {
LOG(ERROR) << query;
}
static int trace_v2_callback(unsigned code, void *ctx, void *p_raw, void *x_raw) {
CHECK(code == SQLITE_TRACE_STMT);
auto x = static_cast<const char *>(x_raw);
if (x[0] == '-' && x[1] == '-') {
trace_callback(ctx, x);
} else {
trace_callback(ctx, tdsqlite3_expanded_sql(static_cast<tdsqlite3_stmt *>(p_raw)));
}
return 0;
}
void SqliteDb::trace(bool flag) {
tdsqlite3_trace_v2(raw_->db(), SQLITE_TRACE_STMT, flag ? trace_v2_callback : nullptr, nullptr);
}
Status SqliteDb::exec(CSlice cmd) {
CHECK(!empty());
char *msg;
if (enable_logging_) {
VLOG(sqlite) << "Start exec " << tag("query", cmd) << tag("database", raw_->db());
}
auto rc = tdsqlite3_exec(raw_->db(), cmd.c_str(), nullptr, nullptr, &msg);
if (rc != SQLITE_OK) {
CHECK(msg != nullptr);
if (enable_logging_) {
VLOG(sqlite) << "Finish exec with error " << msg;
}
return Status::Error(PSLICE() << tag("query", cmd) << " to database \"" << raw_->path() << "\" failed: " << msg);
}
CHECK(msg == nullptr);
if (enable_logging_) {
VLOG(sqlite) << "Finish exec";
}
return Status::OK();
}
Result<bool> SqliteDb::has_table(Slice table) {
TRY_RESULT(stmt, get_statement(PSLICE() << "SELECT count(*) FROM sqlite_master WHERE type='table' AND name='" << table
<< "'"));
TRY_STATUS(stmt.step());
CHECK(stmt.has_row());
auto cnt = stmt.view_int32(0);
return cnt == 1;
}
Result<string> SqliteDb::get_pragma(Slice name) {
TRY_RESULT(stmt, get_statement(PSLICE() << "PRAGMA " << name));
TRY_STATUS(stmt.step());
CHECK(stmt.has_row());
auto res = stmt.view_blob(0).str();
TRY_STATUS(stmt.step());
CHECK(!stmt.can_step());
return std::move(res);
}
Result<string> SqliteDb::get_pragma_string(Slice name) {
TRY_RESULT(stmt, get_statement(PSLICE() << "PRAGMA " << name));
TRY_STATUS(stmt.step());
CHECK(stmt.has_row());
auto res = stmt.view_string(0).str();
TRY_STATUS(stmt.step());
CHECK(!stmt.can_step());
return std::move(res);
}
Result<int32> SqliteDb::user_version() {
TRY_RESULT(get_version_stmt, get_statement("PRAGMA user_version"));
TRY_STATUS(get_version_stmt.step());
if (!get_version_stmt.has_row()) {
return Status::Error(PSLICE() << "PRAGMA user_version failed for database \"" << raw_->path() << '"');
}
return get_version_stmt.view_int32(0);
}
Status SqliteDb::set_user_version(int32 version) {
return exec(PSLICE() << "PRAGMA user_version = " << version);
}
Status SqliteDb::begin_read_transaction() {
if (raw_->on_begin()) {
return exec("BEGIN");
}
return Status::OK();
}
Status SqliteDb::begin_write_transaction() {
if (raw_->on_begin()) {
return exec("BEGIN IMMEDIATE");
}
return Status::OK();
}
Status SqliteDb::commit_transaction() {
TRY_RESULT(need_commit, raw_->on_commit());
if (need_commit) {
return exec("COMMIT");
}
return Status::OK();
}
Status SqliteDb::check_encryption() {
auto status = exec("SELECT count(*) FROM sqlite_master");
if (status.is_ok()) {
enable_logging_ = true;
}
return status;
}
Result<SqliteDb> SqliteDb::open_with_key(CSlice path, bool allow_creation, const DbKey &db_key,
optional<int32> cipher_version) {
auto res = do_open_with_key(path, allow_creation, db_key, cipher_version ? cipher_version.value() : 0);
if (res.is_error() && !cipher_version && !db_key.is_empty()) {
return do_open_with_key(path, false, db_key, 3);
}
return res;
}
Result<SqliteDb> SqliteDb::do_open_with_key(CSlice path, bool allow_creation, const DbKey &db_key,
int32 cipher_version) {
SqliteDb db;
TRY_STATUS(db.init(path, allow_creation));
if (!db_key.is_empty()) {
if (db.check_encryption().is_ok()) {
return Status::Error(PSLICE() << "No key is needed for database \"" << path << '"');
}
auto key = db_key_to_sqlcipher_key(db_key);
TRY_STATUS(db.exec(PSLICE() << "PRAGMA key = " << key));
if (cipher_version != 0) {
LOG(INFO) << "Trying SQLCipher compatibility mode with version = " << cipher_version;
TRY_STATUS(db.exec(PSLICE() << "PRAGMA cipher_compatibility = " << cipher_version));
}
db.set_cipher_version(cipher_version);
}
TRY_STATUS_PREFIX(db.check_encryption(), "Can't check database: ");
return std::move(db);
}
void SqliteDb::set_cipher_version(int32 cipher_version) {
raw_->set_cipher_version(cipher_version);
}
optional<int32> SqliteDb::get_cipher_version() const {
return raw_->get_cipher_version();
}
Result<SqliteDb> SqliteDb::change_key(CSlice path, bool allow_creation, const DbKey &new_db_key,
const DbKey &old_db_key) {
// fast path
{
PerfWarningTimer perf("open database", 0.05);
auto r_db = open_with_key(path, allow_creation, new_db_key);
if (r_db.is_ok()) {
return r_db;
}
}
PerfWarningTimer perf("change database key", 0.5);
auto create_database = [](CSlice tmp_path) -> Status {
TRY_STATUS(destroy(tmp_path));
SqliteDb db;
return db.init(tmp_path, true);
};
TRY_RESULT(db, open_with_key(path, false, old_db_key));
TRY_RESULT(user_version, db.user_version());
auto new_key = db_key_to_sqlcipher_key(new_db_key);
if (old_db_key.is_empty() && !new_db_key.is_empty()) {
LOG(DEBUG) << "ENCRYPT";
PerfWarningTimer timer("Encrypt SQLite database", 0.1);
auto tmp_path = path.str() + ".encrypted";
TRY_STATUS(create_database(tmp_path));
// make sure that database is not empty
TRY_STATUS(db.exec("CREATE TABLE IF NOT EXISTS encryption_dummy_table(id INT PRIMARY KEY)"));
TRY_STATUS(db.exec(PSLICE() << "ATTACH DATABASE '" << quote_string(tmp_path) << "' AS encrypted KEY " << new_key));
TRY_STATUS(db.exec("SELECT sqlcipher_export('encrypted')"));
TRY_STATUS(db.exec(PSLICE() << "PRAGMA encrypted.user_version = " << user_version));
TRY_STATUS(db.exec("DETACH DATABASE encrypted"));
db.close();
TRY_STATUS(rename(tmp_path, path));
} else if (!old_db_key.is_empty() && new_db_key.is_empty()) {
LOG(DEBUG) << "DECRYPT";
PerfWarningTimer timer("Decrypt SQLite database", 0.1);
auto tmp_path = path.str() + ".encrypted";
TRY_STATUS(create_database(tmp_path));
TRY_STATUS(db.exec(PSLICE() << "ATTACH DATABASE '" << quote_string(tmp_path) << "' AS decrypted KEY ''"));
TRY_STATUS(db.exec("SELECT sqlcipher_export('decrypted')"));
TRY_STATUS(db.exec(PSLICE() << "PRAGMA decrypted.user_version = " << user_version));
TRY_STATUS(db.exec("DETACH DATABASE decrypted"));
db.close();
TRY_STATUS(rename(tmp_path, path));
} else {
LOG(DEBUG) << "REKEY";
PerfWarningTimer timer("Rekey SQLite database", 0.1);
TRY_STATUS(db.exec(PSLICE() << "PRAGMA rekey = " << new_key));
}
TRY_RESULT(new_db, open_with_key(path, false, new_db_key));
CHECK(new_db.user_version().ok() == user_version);
return std::move(new_db);
}
Status SqliteDb::destroy(Slice path) {
return detail::RawSqliteDb::destroy(path);
}
Result<SqliteStatement> SqliteDb::get_statement(CSlice statement) {
tdsqlite3_stmt *stmt = nullptr;
auto rc =
tdsqlite3_prepare_v2(get_native(), statement.c_str(), static_cast<int>(statement.size()) + 1, &stmt, nullptr);
if (rc != SQLITE_OK) {
return Status::Error(PSLICE() << "Failed to prepare SQLite " << tag("statement", statement) << raw_->last_error());
}
LOG_CHECK(stmt != nullptr) << statement;
return SqliteStatement(stmt, raw_);
}
} // namespace td

94
td/tddb/td/db/SqliteDb.h Normal file
View File

@@ -0,0 +1,94 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/DbKey.h"
#include "td/db/SqliteStatement.h"
#include "td/db/detail/RawSqliteDb.h"
#include "td/utils/common.h"
#include "td/utils/optional.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include <memory>
struct tdsqlite3;
namespace td {
class SqliteDb {
public:
SqliteDb() = default;
SqliteDb(SqliteDb &&) = default;
SqliteDb &operator=(SqliteDb &&) = default;
SqliteDb(const SqliteDb &) = delete;
SqliteDb &operator=(const SqliteDb &) = delete;
~SqliteDb();
// dangerous
SqliteDb clone() const {
return SqliteDb(raw_, enable_logging_);
}
bool empty() const {
return !raw_;
}
void close() {
*this = SqliteDb();
}
Status exec(CSlice cmd) TD_WARN_UNUSED_RESULT;
Result<bool> has_table(Slice table);
Result<string> get_pragma(Slice name);
Result<string> get_pragma_string(Slice name);
Status begin_read_transaction() TD_WARN_UNUSED_RESULT;
Status begin_write_transaction() TD_WARN_UNUSED_RESULT;
Status commit_transaction() TD_WARN_UNUSED_RESULT;
Result<int32> user_version();
Status set_user_version(int32 version) TD_WARN_UNUSED_RESULT;
void trace(bool flag);
static Status destroy(Slice path) TD_WARN_UNUSED_RESULT;
// we can't change the key on the fly, so static functions are more than enough
static Result<SqliteDb> open_with_key(CSlice path, bool allow_creation, const DbKey &db_key,
optional<int32> cipher_version = {});
static Result<SqliteDb> change_key(CSlice path, bool allow_creation, const DbKey &new_db_key,
const DbKey &old_db_key);
tdsqlite3 *get_native() const {
return raw_->db();
}
Result<SqliteStatement> get_statement(CSlice statement) TD_WARN_UNUSED_RESULT;
template <class F>
static void with_db_path(Slice main_path, F &&f) {
detail::RawSqliteDb::with_db_path(main_path, f);
}
optional<int32> get_cipher_version() const;
private:
SqliteDb(std::shared_ptr<detail::RawSqliteDb> raw, bool enable_logging)
: raw_(std::move(raw)), enable_logging_(enable_logging) {
}
std::shared_ptr<detail::RawSqliteDb> raw_;
bool enable_logging_ = false;
Status init(CSlice path, bool allow_creation) TD_WARN_UNUSED_RESULT;
Status check_encryption();
static Result<SqliteDb> do_open_with_key(CSlice path, bool allow_creation, const DbKey &db_key, int32 cipher_version);
void set_cipher_version(int32 cipher_version);
};
} // namespace td

View File

@@ -0,0 +1,130 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/SqliteKeyValue.h"
#include "td/utils/base64.h"
#include "td/utils/logging.h"
#include "td/utils/ScopeGuard.h"
namespace td {
Status SqliteKeyValue::init_with_connection(SqliteDb connection, string table_name) {
auto init_guard = ScopeExit() + [&] {
close();
};
db_ = std::move(connection);
table_name_ = std::move(table_name);
TRY_STATUS(init(db_, table_name_));
TRY_RESULT_ASSIGN(set_stmt_,
db_.get_statement(PSLICE() << "REPLACE INTO " << table_name_ << " (k, v) VALUES (?1, ?2)"));
TRY_RESULT_ASSIGN(get_stmt_, db_.get_statement(PSLICE() << "SELECT v FROM " << table_name_ << " WHERE k = ?1"));
TRY_RESULT_ASSIGN(erase_stmt_, db_.get_statement(PSLICE() << "DELETE FROM " << table_name_ << " WHERE k = ?1"));
TRY_RESULT_ASSIGN(get_all_stmt_, db_.get_statement(PSLICE() << "SELECT k, v FROM " << table_name_));
TRY_RESULT_ASSIGN(erase_by_prefix_stmt_,
db_.get_statement(PSLICE() << "DELETE FROM " << table_name_ << " WHERE ?1 <= k AND k < ?2"));
TRY_RESULT_ASSIGN(erase_by_prefix_rare_stmt_,
db_.get_statement(PSLICE() << "DELETE FROM " << table_name_ << " WHERE ?1 <= k"));
TRY_RESULT_ASSIGN(get_by_prefix_stmt_,
db_.get_statement(PSLICE() << "SELECT k, v FROM " << table_name_ << " WHERE ?1 <= k AND k < ?2"));
TRY_RESULT_ASSIGN(get_by_prefix_rare_stmt_,
db_.get_statement(PSLICE() << "SELECT k, v FROM " << table_name_ << " WHERE ?1 <= k"));
init_guard.dismiss();
return Status::OK();
}
Status SqliteKeyValue::drop() {
if (empty()) {
return Status::OK();
}
auto result = drop(db_, table_name_);
close();
return result;
}
void SqliteKeyValue::set(Slice key, Slice value) {
set_stmt_.bind_blob(1, key).ensure();
set_stmt_.bind_blob(2, value).ensure();
auto status = set_stmt_.step();
if (status.is_error()) {
LOG(FATAL) << "Failed to set \"" << base64_encode(key) << "\": " << status;
}
set_stmt_.reset();
}
void SqliteKeyValue::set_all(const FlatHashMap<string, string> &key_values) {
begin_write_transaction().ensure();
for (auto &key_value : key_values) {
set(key_value.first, key_value.second);
}
commit_transaction().ensure();
}
string SqliteKeyValue::get(Slice key) {
SCOPE_EXIT {
get_stmt_.reset();
};
get_stmt_.bind_blob(1, key).ensure();
get_stmt_.step().ensure();
if (!get_stmt_.has_row()) {
return string();
}
auto data = get_stmt_.view_blob(0).str();
get_stmt_.step().ignore();
return data;
}
void SqliteKeyValue::erase(Slice key) {
erase_stmt_.bind_blob(1, key).ensure();
erase_stmt_.step().ensure();
erase_stmt_.reset();
}
void SqliteKeyValue::erase_batch(vector<string> keys) {
for (auto &key : keys) {
erase(key);
}
}
void SqliteKeyValue::erase_by_prefix(Slice prefix) {
auto next = next_prefix(prefix);
if (next.empty()) {
SCOPE_EXIT {
erase_by_prefix_rare_stmt_.reset();
};
erase_by_prefix_rare_stmt_.bind_blob(1, prefix).ensure();
erase_by_prefix_rare_stmt_.step().ensure();
} else {
SCOPE_EXIT {
erase_by_prefix_stmt_.reset();
};
erase_by_prefix_stmt_.bind_blob(1, prefix).ensure();
erase_by_prefix_stmt_.bind_blob(2, next).ensure();
erase_by_prefix_stmt_.step().ensure();
}
}
string SqliteKeyValue::next_prefix(Slice prefix) {
string next = prefix.str();
size_t pos = next.size();
while (pos) {
pos--;
auto value = static_cast<uint8>(next[pos]);
value++;
next[pos] = static_cast<char>(value);
if (value != 0) {
return next;
}
}
return string{};
}
} // namespace td

View File

@@ -0,0 +1,134 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/SqliteDb.h"
#include "td/db/SqliteStatement.h"
#include "td/utils/common.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
namespace td {
class SqliteKeyValue {
public:
static Status drop(SqliteDb &connection, Slice table_name) TD_WARN_UNUSED_RESULT {
return connection.exec(PSLICE() << "DROP TABLE IF EXISTS " << table_name);
}
static Status init(SqliteDb &connection, Slice table_name) TD_WARN_UNUSED_RESULT {
return connection.exec(PSLICE() << "CREATE TABLE IF NOT EXISTS " << table_name << " (k BLOB PRIMARY KEY, v BLOB)");
}
bool empty() const {
return db_.empty();
}
Status init_with_connection(SqliteDb connection, string table_name) TD_WARN_UNUSED_RESULT;
void close() {
*this = SqliteKeyValue();
}
Status drop();
void set(Slice key, Slice value);
void set_all(const FlatHashMap<string, string> &key_values);
string get(Slice key);
void erase(Slice key);
void erase_batch(vector<string> keys);
Status begin_read_transaction() TD_WARN_UNUSED_RESULT {
return db_.begin_read_transaction();
}
Status begin_write_transaction() TD_WARN_UNUSED_RESULT {
return db_.begin_write_transaction();
}
Status commit_transaction() TD_WARN_UNUSED_RESULT {
return db_.commit_transaction();
}
void erase_by_prefix(Slice prefix);
FlatHashMap<string, string> get_all() {
FlatHashMap<string, string> res;
get_by_prefix("", [&](Slice key, Slice value) {
CHECK(!key.empty());
res.emplace(key.str(), value.str());
return true;
});
return res;
}
template <class CallbackT>
void get_by_prefix(Slice prefix, CallbackT &&callback) {
string next;
if (!prefix.empty()) {
next = next_prefix(prefix);
}
get_by_range_impl(prefix, next, true, callback);
}
template <class CallbackT>
void get_by_range(Slice from, Slice till, CallbackT &&callback) {
get_by_range_impl(from, till, false, std::move(callback));
}
private:
template <class CallbackT>
void get_by_range_impl(Slice from, Slice till, bool strip_key_prefix, CallbackT &&callback) {
SqliteStatement *stmt = nullptr;
if (from.empty()) {
stmt = &get_all_stmt_;
} else {
if (till.empty()) {
stmt = &get_by_prefix_rare_stmt_;
stmt->bind_blob(1, till).ensure();
} else {
stmt = &get_by_prefix_stmt_;
stmt->bind_blob(1, from).ensure();
stmt->bind_blob(2, till).ensure();
}
}
auto guard = stmt->guard();
stmt->step().ensure();
while (stmt->has_row()) {
auto key = stmt->view_blob(0);
if (strip_key_prefix) {
key.remove_prefix(from.size());
}
if (!callback(key, stmt->view_blob(1))) {
return;
}
stmt->step().ensure();
}
}
string table_name_;
SqliteDb db_;
SqliteStatement get_stmt_;
SqliteStatement set_stmt_;
SqliteStatement erase_stmt_;
SqliteStatement get_all_stmt_;
SqliteStatement erase_by_prefix_stmt_;
SqliteStatement erase_by_prefix_rare_stmt_;
SqliteStatement get_by_prefix_stmt_;
SqliteStatement get_by_prefix_rare_stmt_;
static string next_prefix(Slice prefix);
};
} // namespace td

View File

@@ -0,0 +1,166 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/SqliteKeyValueAsync.h"
#include "td/db/SqliteKeyValue.h"
#include "td/actor/actor.h"
#include "td/utils/common.h"
#include "td/utils/optional.h"
#include "td/utils/Time.h"
namespace td {
class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
public:
explicit SqliteKeyValueAsync(std::shared_ptr<SqliteKeyValueSafe> kv_safe, int32 scheduler_id = -1) {
impl_ = create_actor_on_scheduler<Impl>("KV", scheduler_id, std::move(kv_safe));
}
void set(string key, string value, Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::set, std::move(key), std::move(value), std::move(promise));
}
void set_all(FlatHashMap<string, string> key_values, Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::set_all, std::move(key_values), std::move(promise));
}
void erase(string key, Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::erase, std::move(key), std::move(promise));
}
void erase_by_prefix(string key_prefix, Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::erase_by_prefix, std::move(key_prefix), std::move(promise));
}
void get(string key, Promise<string> promise) final {
send_closure_later(impl_, &Impl::get, std::move(key), std::move(promise));
}
void close(Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::close, std::move(promise));
}
private:
class Impl final : public Actor {
public:
explicit Impl(std::shared_ptr<SqliteKeyValueSafe> kv_safe) : kv_safe_(std::move(kv_safe)) {
}
void set(string key, string value, Promise<Unit> promise) {
auto it = buffer_.find(key);
if (it != buffer_.end()) {
it->second = std::move(value);
} else {
CHECK(!key.empty());
buffer_.emplace(std::move(key), std::move(value));
}
if (promise) {
buffer_promises_.push_back(std::move(promise));
}
cnt_++;
do_flush(false /*force*/);
}
void set_all(FlatHashMap<string, string> key_values, Promise<Unit> promise) {
do_flush(true /*force*/);
kv_->set_all(key_values);
promise.set_value(Unit());
}
void erase(string key, Promise<Unit> promise) {
auto it = buffer_.find(key);
if (it != buffer_.end()) {
it->second = optional<string>();
} else {
CHECK(!key.empty());
buffer_.emplace(std::move(key), optional<string>());
}
if (promise) {
buffer_promises_.push_back(std::move(promise));
}
cnt_++;
do_flush(false /*force*/);
}
void erase_by_prefix(string key_prefix, Promise<Unit> promise) {
do_flush(true /*force*/);
kv_->erase_by_prefix(key_prefix);
promise.set_value(Unit());
}
void get(const string &key, Promise<string> promise) {
auto it = buffer_.find(key);
if (it != buffer_.end()) {
return promise.set_value(it->second ? it->second.value() : "");
}
promise.set_value(kv_->get(key));
}
void close(Promise<Unit> promise) {
do_flush(true /*force*/);
kv_safe_.reset();
kv_ = nullptr;
stop();
promise.set_value(Unit());
}
private:
std::shared_ptr<SqliteKeyValueSafe> kv_safe_;
SqliteKeyValue *kv_ = nullptr;
static constexpr double MAX_PENDING_QUERIES_DELAY = 0.01;
static constexpr size_t MAX_PENDING_QUERIES_COUNT = 100;
FlatHashMap<string, optional<string>> buffer_;
vector<Promise<Unit>> buffer_promises_;
size_t cnt_ = 0;
double wakeup_at_ = 0;
void do_flush(bool force) {
if (buffer_.empty()) {
return;
}
if (!force) {
auto now = Time::now_cached();
if (wakeup_at_ == 0) {
wakeup_at_ = now + MAX_PENDING_QUERIES_DELAY;
}
if (now < wakeup_at_ && cnt_ < MAX_PENDING_QUERIES_COUNT) {
set_timeout_at(wakeup_at_);
return;
}
}
wakeup_at_ = 0;
cnt_ = 0;
kv_->begin_write_transaction().ensure();
for (auto &it : buffer_) {
if (it.second) {
kv_->set(it.first, it.second.value());
} else {
kv_->erase(it.first);
}
}
kv_->commit_transaction().ensure();
buffer_.clear();
set_promises(buffer_promises_);
}
void timeout_expired() final {
do_flush(false /*force*/);
}
void start_up() final {
kv_ = &kv_safe_->get();
}
};
ActorOwn<Impl> impl_;
};
unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv,
int32 scheduler_id) {
return td::make_unique<SqliteKeyValueAsync>(std::move(kv), scheduler_id);
}
} // namespace td

View File

@@ -0,0 +1,38 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/SqliteKeyValueSafe.h"
#include "td/utils/common.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/Promise.h"
#include <memory>
namespace td {
class SqliteKeyValueAsyncInterface {
public:
virtual ~SqliteKeyValueAsyncInterface() = default;
virtual void set(string key, string value, Promise<Unit> promise) = 0;
virtual void set_all(FlatHashMap<string, string> key_values, Promise<Unit> promise) = 0;
virtual void erase(string key, Promise<Unit> promise) = 0;
virtual void erase_by_prefix(string key_prefix, Promise<Unit> promise) = 0;
virtual void get(string key, Promise<string> promise) = 0;
virtual void close(Promise<Unit> promise) = 0;
};
unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv,
int32 scheduler_id = 1);
} // namespace td

View File

@@ -0,0 +1,40 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/SqliteConnectionSafe.h"
#include "td/db/SqliteKeyValue.h"
#include "td/actor/SchedulerLocalStorage.h"
#include "td/utils/common.h"
#include <memory>
namespace td {
class SqliteKeyValueSafe {
public:
SqliteKeyValueSafe(string name, std::shared_ptr<SqliteConnectionSafe> safe_connection)
: lsls_kv_([name = std::move(name), safe_connection = std::move(safe_connection)] {
SqliteKeyValue kv;
kv.init_with_connection(safe_connection->get().clone(), name).ensure();
return kv;
}) {
}
SqliteKeyValue &get() {
return lsls_kv_.get();
}
void close() {
lsls_kv_.clear_values();
}
private:
LazySchedulerLocalStorage<SqliteKeyValue> lsls_kv_;
};
} // namespace td

View File

@@ -0,0 +1,205 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/SqliteStatement.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/StackAllocator.h"
#include "td/utils/StringBuilder.h"
#include "sqlite/sqlite3.h"
namespace td {
int VERBOSITY_NAME(sqlite) = VERBOSITY_NAME(DEBUG) + 10;
namespace {
int printExplainQueryPlan(StringBuilder &sb, tdsqlite3_stmt *pStmt) {
const char *zSql = tdsqlite3_sql(pStmt);
if (zSql == nullptr) {
return SQLITE_ERROR;
}
sb << "Explain query " << zSql;
char *zExplain = tdsqlite3_mprintf("EXPLAIN QUERY PLAN %s", zSql);
if (zExplain == nullptr) {
return SQLITE_NOMEM;
}
tdsqlite3_stmt *pExplain; /* Compiled EXPLAIN QUERY PLAN command */
int rc = tdsqlite3_prepare_v2(tdsqlite3_db_handle(pStmt), zExplain, -1, &pExplain, nullptr);
tdsqlite3_free(zExplain);
if (rc != SQLITE_OK) {
return rc;
}
while (SQLITE_ROW == tdsqlite3_step(pExplain)) {
int iSelectid = tdsqlite3_column_int(pExplain, 0);
int iOrder = tdsqlite3_column_int(pExplain, 1);
int iFrom = tdsqlite3_column_int(pExplain, 2);
const char *zDetail = reinterpret_cast<const char *>(tdsqlite3_column_text(pExplain, 3));
sb << '\n' << iSelectid << ' ' << iOrder << ' ' << iFrom << ' ' << zDetail;
}
return tdsqlite3_finalize(pExplain);
}
} // namespace
SqliteStatement::SqliteStatement(tdsqlite3_stmt *stmt, std::shared_ptr<detail::RawSqliteDb> db)
: stmt_(stmt), db_(std::move(db)) {
CHECK(stmt != nullptr);
}
SqliteStatement::~SqliteStatement() = default;
Result<string> SqliteStatement::explain() {
if (empty()) {
return Status::Error("No statement");
}
auto tmp = StackAllocator::alloc(10000);
StringBuilder sb(tmp.as_slice());
auto code = printExplainQueryPlan(sb, stmt_.get());
if (code != SQLITE_OK) {
return last_error();
}
if (sb.is_error()) {
return Status::Error("StringBuilder buffer overflow");
}
return sb.as_cslice().str();
}
Status SqliteStatement::bind_blob(int id, Slice blob) {
auto rc = tdsqlite3_bind_blob(stmt_.get(), id, blob.data(), static_cast<int>(blob.size()), nullptr);
if (rc != SQLITE_OK) {
return last_error();
}
return Status::OK();
}
Status SqliteStatement::bind_string(int id, Slice str) {
auto rc = tdsqlite3_bind_text(stmt_.get(), id, str.data(), static_cast<int>(str.size()), nullptr);
if (rc != SQLITE_OK) {
return last_error();
}
return Status::OK();
}
Status SqliteStatement::bind_int32(int id, int32 value) {
auto rc = tdsqlite3_bind_int(stmt_.get(), id, value);
if (rc != SQLITE_OK) {
return last_error();
}
return Status::OK();
}
Status SqliteStatement::bind_int64(int id, int64 value) {
auto rc = tdsqlite3_bind_int64(stmt_.get(), id, value);
if (rc != SQLITE_OK) {
return last_error();
}
return Status::OK();
}
Status SqliteStatement::bind_null(int id) {
auto rc = tdsqlite3_bind_null(stmt_.get(), id);
if (rc != SQLITE_OK) {
return last_error();
}
return Status::OK();
}
StringBuilder &operator<<(StringBuilder &sb, SqliteStatement::Datatype type) {
using Datatype = SqliteStatement::Datatype;
switch (type) {
case Datatype::Integer:
return sb << "Integer";
case Datatype::Float:
return sb << "Float";
case Datatype::Blob:
return sb << "Blob";
case Datatype::Null:
return sb << "Null";
case Datatype::Text:
return sb << "Text";
}
UNREACHABLE();
return sb;
}
Slice SqliteStatement::view_blob(int id) {
LOG_IF(ERROR, view_datatype(id) != Datatype::Blob) << view_datatype(id);
auto *data = tdsqlite3_column_blob(stmt_.get(), id);
auto size = tdsqlite3_column_bytes(stmt_.get(), id);
if (data == nullptr) {
return Slice();
}
return Slice(static_cast<const char *>(data), size);
}
Slice SqliteStatement::view_string(int id) {
LOG_IF(ERROR, view_datatype(id) != Datatype::Text) << view_datatype(id);
auto *data = tdsqlite3_column_text(stmt_.get(), id);
auto size = tdsqlite3_column_bytes(stmt_.get(), id);
if (data == nullptr) {
return Slice();
}
return Slice(data, size);
}
int32 SqliteStatement::view_int32(int id) {
LOG_IF(ERROR, view_datatype(id) != Datatype::Integer) << view_datatype(id);
return tdsqlite3_column_int(stmt_.get(), id);
}
int64 SqliteStatement::view_int64(int id) {
LOG_IF(ERROR, view_datatype(id) != Datatype::Integer) << view_datatype(id);
return tdsqlite3_column_int64(stmt_.get(), id);
}
SqliteStatement::Datatype SqliteStatement::view_datatype(int id) {
auto type = tdsqlite3_column_type(stmt_.get(), id);
switch (type) {
case SQLITE_INTEGER:
return Datatype::Integer;
case SQLITE_FLOAT:
return Datatype::Float;
case SQLITE_BLOB:
return Datatype::Blob;
case SQLITE_NULL:
return Datatype::Null;
case SQLITE3_TEXT:
return Datatype::Text;
default:
UNREACHABLE();
}
}
void SqliteStatement::reset() {
tdsqlite3_reset(stmt_.get());
state_ = State::Start;
}
Status SqliteStatement::step() {
if (state_ == State::Finish) {
return Status::Error("One has to reset statement");
}
VLOG(sqlite) << "Start step " << tag("query", tdsqlite3_sql(stmt_.get())) << tag("statement", stmt_.get())
<< tag("database", db_.get());
auto rc = tdsqlite3_step(stmt_.get());
VLOG(sqlite) << "Finish step with response " << (rc == SQLITE_ROW ? "ROW" : (rc == SQLITE_DONE ? "DONE" : "ERROR"));
if (rc == SQLITE_ROW) {
state_ = State::HaveRow;
return Status::OK();
}
state_ = State::Finish;
if (rc == SQLITE_DONE) {
return Status::OK();
}
return last_error();
}
void SqliteStatement::StmtDeleter::operator()(tdsqlite3_stmt *stmt) {
tdsqlite3_finalize(stmt);
}
Status SqliteStatement::last_error() {
return db_->last_error();
}
} // namespace td

View File

@@ -0,0 +1,88 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/detail/RawSqliteDb.h"
#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include <memory>
struct tdsqlite3;
struct tdsqlite3_stmt;
namespace td {
extern int VERBOSITY_NAME(sqlite);
class SqliteStatement {
public:
SqliteStatement() = default;
SqliteStatement(const SqliteStatement &) = delete;
SqliteStatement &operator=(const SqliteStatement &) = delete;
SqliteStatement(SqliteStatement &&) = default;
SqliteStatement &operator=(SqliteStatement &&) = default;
~SqliteStatement();
Status bind_blob(int id, Slice blob) TD_WARN_UNUSED_RESULT;
Status bind_string(int id, Slice str) TD_WARN_UNUSED_RESULT;
Status bind_int32(int id, int32 value) TD_WARN_UNUSED_RESULT;
Status bind_int64(int id, int64 value) TD_WARN_UNUSED_RESULT;
Status bind_null(int id) TD_WARN_UNUSED_RESULT;
Status step() TD_WARN_UNUSED_RESULT;
Slice view_string(int id) TD_WARN_UNUSED_RESULT;
Slice view_blob(int id) TD_WARN_UNUSED_RESULT;
int32 view_int32(int id) TD_WARN_UNUSED_RESULT;
int64 view_int64(int id) TD_WARN_UNUSED_RESULT;
enum class Datatype { Integer, Float, Blob, Null, Text };
Datatype view_datatype(int id);
Result<string> explain();
bool can_step() const {
return state_ != State::Finish;
}
bool has_row() const {
return state_ == State::HaveRow;
}
bool empty() const {
return !stmt_;
}
void reset();
auto guard() {
return ScopeExit{} + [this] {
this->reset();
};
}
// TODO get row
private:
friend class SqliteDb;
SqliteStatement(tdsqlite3_stmt *stmt, std::shared_ptr<detail::RawSqliteDb> db);
class StmtDeleter {
public:
void operator()(tdsqlite3_stmt *stmt);
};
enum class State { Start, HaveRow, Finish };
State state_ = State::Start;
std::unique_ptr<tdsqlite3_stmt, StmtDeleter> stmt_;
std::shared_ptr<detail::RawSqliteDb> db_;
Status last_error();
};
} // namespace td

613
td/tddb/td/db/TQueue.cpp Normal file
View File

@@ -0,0 +1,613 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/TQueue.h"
#include "td/db/binlog/Binlog.h"
#include "td/db/binlog/BinlogEvent.h"
#include "td/db/binlog/BinlogHelper.h"
#include "td/db/binlog/BinlogInterface.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/Random.h"
#include "td/utils/StorerBase.h"
#include "td/utils/Time.h"
#include "td/utils/tl_helpers.h"
#include "td/utils/tl_parsers.h"
#include "td/utils/tl_storers.h"
#include <set>
namespace td {
using EventId = TQueue::EventId;
EventId::EventId() {
}
Result<EventId> EventId::from_int32(int32 id) {
if (!is_valid_id(id)) {
return Status::Error("Invalid ID");
}
return EventId(id);
}
bool EventId::is_valid() const {
return !empty() && is_valid_id(id_);
}
int32 EventId::value() const {
return id_;
}
Result<EventId> EventId::next() const {
return from_int32(id_ + 1);
}
Result<EventId> EventId::advance(size_t offset) const {
TRY_RESULT(new_id, narrow_cast_safe<int32>(id_ + offset));
return from_int32(new_id);
}
bool EventId::empty() const {
return id_ == 0;
}
bool EventId::operator==(const EventId &other) const {
return id_ == other.id_;
}
bool EventId::operator!=(const EventId &other) const {
return !(*this == other);
}
bool EventId::operator<(const EventId &other) const {
return id_ < other.id_;
}
StringBuilder &operator<<(StringBuilder &string_builder, EventId id) {
return string_builder << "EventId{" << id.value() << "}";
}
EventId::EventId(int32 id) : id_(id) {
CHECK(is_valid_id(id));
}
bool EventId::is_valid_id(int32 id) {
return 0 <= id && id < MAX_ID;
}
class TQueueImpl final : public TQueue {
static constexpr size_t MAX_EVENT_LENGTH = 65536 * 8;
static constexpr size_t MAX_QUEUE_EVENTS = 100000;
static constexpr size_t MAX_TOTAL_EVENT_LENGTH = 1 << 27;
public:
void set_callback(unique_ptr<StorageCallback> callback) final {
callback_ = std::move(callback);
}
unique_ptr<StorageCallback> extract_callback() final {
return std::move(callback_);
}
bool do_push(QueueId queue_id, RawEvent &&raw_event) final {
CHECK(raw_event.event_id.is_valid());
// raw_event.data can be empty when replaying binlog
if (raw_event.data.size() > MAX_EVENT_LENGTH || queue_id == 0) {
return false;
}
auto &q = queues_[queue_id];
if (q.events.size() >= MAX_QUEUE_EVENTS || q.total_event_length > MAX_TOTAL_EVENT_LENGTH - raw_event.data.size() ||
raw_event.expires_at <= 0) {
return false;
}
auto event_id = raw_event.event_id;
if (event_id < q.tail_id) {
return false;
}
if (!q.events.empty()) {
auto it = q.events.end();
--it;
if (it->second.data.empty()) {
if (callback_ != nullptr && it->second.log_event_id != 0) {
callback_->pop(it->second.log_event_id);
}
q.events.erase(it);
}
}
if (q.events.empty() && !raw_event.data.empty()) {
schedule_queue_gc(queue_id, q, raw_event.expires_at);
}
if (raw_event.log_event_id == 0 && callback_ != nullptr) {
raw_event.log_event_id = callback_->push(queue_id, raw_event);
}
q.tail_id = event_id.next().move_as_ok();
q.total_event_length += raw_event.data.size();
q.events.emplace(event_id, std::move(raw_event));
return true;
}
Result<EventId> push(QueueId queue_id, string data, int32 expires_at, int64 extra, EventId hint_new_id) final {
if (data.empty()) {
return Status::Error("Data is empty");
}
if (data.size() > MAX_EVENT_LENGTH) {
return Status::Error("Data is too big");
}
if (queue_id == 0) {
return Status::Error("Queue identifier is invalid");
}
auto &q = queues_[queue_id];
if (q.events.size() >= MAX_QUEUE_EVENTS) {
return Status::Error("Queue is full");
}
if (q.total_event_length > MAX_TOTAL_EVENT_LENGTH - data.size()) {
return Status::Error("Queue size is too big");
}
if (expires_at <= 0) {
return Status::Error("Failed to add already expired event");
}
EventId event_id;
while (true) {
if (q.tail_id.empty()) {
if (hint_new_id.empty()) {
q.tail_id = EventId::from_int32(
Random::fast(2 * max(static_cast<int>(MAX_QUEUE_EVENTS), 1000000) + 1, EventId::MAX_ID / 2))
.move_as_ok();
} else {
q.tail_id = hint_new_id;
}
}
event_id = q.tail_id;
CHECK(event_id.is_valid());
if (event_id.next().is_ok()) {
break;
}
for (auto it = q.events.begin(); it != q.events.end();) {
pop(q, queue_id, it, {});
}
q.tail_id = EventId();
CHECK(hint_new_id.next().is_ok());
}
RawEvent raw_event;
raw_event.event_id = event_id;
raw_event.data = std::move(data);
raw_event.expires_at = expires_at;
raw_event.extra = extra;
bool is_added = do_push(queue_id, std::move(raw_event));
CHECK(is_added);
return event_id;
}
EventId get_head(QueueId queue_id) const final {
auto it = queues_.find(queue_id);
if (it == queues_.end()) {
return EventId();
}
return get_queue_head(it->second);
}
EventId get_tail(QueueId queue_id) const final {
auto it = queues_.find(queue_id);
if (it == queues_.end()) {
return EventId();
}
auto &q = it->second;
return q.tail_id;
}
void forget(QueueId queue_id, EventId event_id) final {
auto q_it = queues_.find(queue_id);
if (q_it == queues_.end()) {
return;
}
auto &q = q_it->second;
auto it = q.events.find(event_id);
if (it == q.events.end()) {
return;
}
pop(q, queue_id, it, q.tail_id);
}
std::map<EventId, RawEvent> clear(QueueId queue_id, size_t keep_count) final {
auto queue_it = queues_.find(queue_id);
if (queue_it == queues_.end()) {
return {};
}
auto &q = queue_it->second;
auto size = get_size(q);
if (size <= keep_count) {
return {};
}
auto start_time = Time::now();
auto total_event_length = q.total_event_length;
auto end_it = q.events.end();
for (size_t i = 0; i < keep_count; i++) {
--end_it;
}
if (keep_count == 0) {
--end_it;
auto &event = end_it->second;
if (callback_ == nullptr || event.log_event_id == 0) {
++end_it;
} else if (!event.data.empty()) {
clear_event_data(q, event);
callback_->push(queue_id, event);
}
}
auto collect_deleted_event_ids_time = 0.0;
if (callback_ != nullptr) {
vector<uint64> deleted_log_event_ids;
deleted_log_event_ids.reserve(size - keep_count);
for (auto it = q.events.begin(); it != end_it; ++it) {
auto &event = it->second;
if (event.log_event_id != 0) {
deleted_log_event_ids.push_back(event.log_event_id);
}
}
collect_deleted_event_ids_time = Time::now() - start_time;
callback_->pop_batch(std::move(deleted_log_event_ids));
}
auto callback_clear_time = Time::now() - start_time;
std::map<EventId, RawEvent> deleted_events;
if (keep_count > size / 2) {
for (auto it = q.events.begin(); it != end_it;) {
q.total_event_length -= it->second.data.size();
bool is_inserted = deleted_events.emplace(it->first, std::move(it->second)).second;
CHECK(is_inserted);
it = q.events.erase(it);
}
} else {
q.total_event_length = 0;
for (auto it = end_it; it != q.events.end();) {
q.total_event_length += it->second.data.size();
bool is_inserted = deleted_events.emplace(it->first, std::move(it->second)).second;
CHECK(is_inserted);
it = q.events.erase(it);
}
std::swap(deleted_events, q.events);
}
auto clear_time = Time::now() - start_time;
if (clear_time > 0.02) {
LOG(WARNING) << "Cleared " << (size - keep_count) << " TQueue events with total size "
<< (total_event_length - q.total_event_length) << " in " << clear_time - callback_clear_time
<< " seconds, collected their identifiers in " << collect_deleted_event_ids_time
<< " seconds, and deleted them from callback in "
<< callback_clear_time - collect_deleted_event_ids_time << " seconds";
}
return deleted_events;
}
Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now,
MutableSpan<Event> &result_events) final {
auto it = queues_.find(queue_id);
if (it == queues_.end()) {
result_events.truncate(0);
return 0;
}
auto &q = it->second;
// Some sanity checks
if (from_id.value() > q.tail_id.value() + 10) {
return Status::Error("Specified from_id is in the future");
}
if (from_id.value() < get_queue_head(q).value() - static_cast<int32>(MAX_QUEUE_EVENTS)) {
return Status::Error("Specified from_id is in the past");
}
do_get(queue_id, q, from_id, forget_previous, unix_time_now, result_events);
return get_size(q);
}
std::pair<int64, bool> run_gc(int32 unix_time_now) final {
int64 deleted_events = 0;
auto max_finish_time = Time::now() + 0.05;
int64 counter = 0;
while (!queue_gc_at_.empty()) {
auto it = queue_gc_at_.begin();
if (it->first >= unix_time_now) {
break;
}
auto queue_id = it->second;
auto &q = queues_[queue_id];
CHECK(q.gc_at == it->first);
int32 new_gc_at = 0;
if (!q.events.empty()) {
size_t size_before = get_size(q);
for (auto event_it = q.events.begin(); event_it != q.events.end();) {
auto &event = event_it->second;
if ((++counter & 128) == 0 && Time::now() >= max_finish_time) {
if (new_gc_at == 0) {
new_gc_at = event.expires_at;
}
break;
}
if (event.expires_at < unix_time_now || event.data.empty()) {
pop(q, queue_id, event_it, q.tail_id);
} else {
if (new_gc_at != 0) {
break;
}
new_gc_at = event.expires_at;
++event_it;
}
}
size_t size_after = get_size(q);
CHECK(size_after <= size_before);
deleted_events += size_before - size_after;
}
schedule_queue_gc(queue_id, q, new_gc_at);
if (Time::now() >= max_finish_time) {
return {deleted_events, false};
}
}
return {deleted_events, true};
}
size_t get_size(QueueId queue_id) const final {
auto it = queues_.find(queue_id);
if (it == queues_.end()) {
return 0;
}
return get_size(it->second);
}
void close(Promise<> promise) final {
if (callback_ != nullptr) {
callback_->close(std::move(promise));
callback_ = nullptr;
}
}
private:
struct Queue {
EventId tail_id;
std::map<EventId, RawEvent> events;
size_t total_event_length = 0;
int32 gc_at = 0;
};
FlatHashMap<QueueId, Queue> queues_;
std::set<std::pair<int32, QueueId>> queue_gc_at_;
unique_ptr<StorageCallback> callback_;
static EventId get_queue_head(const Queue &q) {
if (q.events.empty()) {
return q.tail_id;
}
return q.events.begin()->first;
}
static size_t get_size(const Queue &q) {
if (q.events.empty()) {
return 0;
}
return q.events.size() - (q.events.rbegin()->second.data.empty() ? 1 : 0);
}
void pop(Queue &q, QueueId queue_id, std::map<EventId, RawEvent>::iterator &it, EventId tail_id) {
auto &event = it->second;
if (callback_ == nullptr || event.log_event_id == 0) {
remove_event(q, it);
return;
}
if (event.event_id.next().ok() == tail_id) {
if (!event.data.empty()) {
clear_event_data(q, event);
callback_->push(queue_id, event);
}
++it;
} else {
callback_->pop(event.log_event_id);
remove_event(q, it);
}
}
static void remove_event(Queue &q, std::map<EventId, RawEvent>::iterator &it) {
q.total_event_length -= it->second.data.size();
it = q.events.erase(it);
}
static void clear_event_data(Queue &q, RawEvent &event) {
q.total_event_length -= event.data.size();
event.data = {};
}
void do_get(QueueId queue_id, Queue &q, EventId from_id, bool forget_previous, int32 unix_time_now,
MutableSpan<Event> &result_events) {
if (forget_previous) {
for (auto it = q.events.begin(); it != q.events.end() && it->first < from_id;) {
pop(q, queue_id, it, q.tail_id);
}
}
size_t ready_n = 0;
for (auto it = q.events.lower_bound(from_id); it != q.events.end();) {
auto &event = it->second;
if (event.expires_at < unix_time_now || event.data.empty()) {
pop(q, queue_id, it, q.tail_id);
} else {
CHECK(!(event.event_id < from_id));
if (ready_n == result_events.size()) {
break;
}
auto &to = result_events[ready_n];
to.data = event.data;
to.id = event.event_id;
to.expires_at = event.expires_at;
to.extra = event.extra;
ready_n++;
++it;
}
}
result_events.truncate(ready_n);
}
void schedule_queue_gc(QueueId queue_id, Queue &q, int32 gc_at) {
if (q.gc_at != 0) {
bool is_deleted = queue_gc_at_.erase({q.gc_at, queue_id}) > 0;
CHECK(is_deleted);
}
q.gc_at = gc_at;
if (q.gc_at != 0) {
bool is_inserted = queue_gc_at_.emplace(gc_at, queue_id).second;
CHECK(is_inserted);
}
}
};
unique_ptr<TQueue> TQueue::create() {
return make_unique<TQueueImpl>();
}
struct TQueueLogEvent final : public Storer {
int64 queue_id;
int32 event_id;
int32 expires_at;
Slice data;
int64 extra;
template <class StorerT>
void store(StorerT &&storer) const {
using td::store;
store(queue_id, storer);
store(event_id, storer);
store(expires_at, storer);
store(data, storer);
if (extra != 0) {
store(extra, storer);
}
}
template <class ParserT>
void parse(ParserT &&parser, int32 has_extra) {
using td::parse;
parse(queue_id, parser);
parse(event_id, parser);
parse(expires_at, parser);
data = parser.template fetch_string<Slice>();
if (has_extra == 0) {
extra = 0;
} else {
parse(extra, parser);
}
}
size_t size() const final {
TlStorerCalcLength storer;
store(storer);
return storer.get_length();
}
size_t store(uint8 *ptr) const final {
TlStorerUnsafe storer(ptr);
store(storer);
return static_cast<size_t>(storer.get_buf() - ptr);
}
};
template <class BinlogT>
uint64 TQueueBinlog<BinlogT>::push(QueueId queue_id, const RawEvent &event) {
TQueueLogEvent log_event;
log_event.queue_id = queue_id;
log_event.event_id = event.event_id.value();
log_event.expires_at = event.expires_at;
log_event.data = event.data;
log_event.extra = event.extra;
auto magic = BINLOG_EVENT_TYPE + (log_event.extra != 0);
if (event.log_event_id == 0) {
return binlog_->add(magic, log_event);
}
binlog_->rewrite(event.log_event_id, magic, log_event);
return event.log_event_id;
}
template <class BinlogT>
void TQueueBinlog<BinlogT>::pop(uint64 log_event_id) {
binlog_->erase(log_event_id);
}
template <class BinlogT>
void TQueueBinlog<BinlogT>::pop_batch(std::vector<uint64> log_event_ids) {
binlog_->erase_batch(std::move(log_event_ids));
}
template <class BinlogT>
Status TQueueBinlog<BinlogT>::replay(const BinlogEvent &binlog_event, TQueue &q) const {
TQueueLogEvent event;
TlParser parser(binlog_event.get_data());
int32 has_extra = binlog_event.type_ - BINLOG_EVENT_TYPE;
if (has_extra != 0 && has_extra != 1) {
return Status::Error("Wrong magic");
}
event.parse(parser, has_extra);
parser.fetch_end();
TRY_STATUS(parser.get_status());
TRY_RESULT(event_id, EventId::from_int32(event.event_id));
RawEvent raw_event;
raw_event.log_event_id = binlog_event.id_;
raw_event.event_id = event_id;
raw_event.expires_at = event.expires_at;
raw_event.data = event.data.str();
raw_event.extra = event.extra;
if (!q.do_push(event.queue_id, std::move(raw_event))) {
return Status::Error("Failed to add event");
}
return Status::OK();
}
template <class BinlogT>
void TQueueBinlog<BinlogT>::close(Promise<> promise) {
binlog_->close(std::move(promise));
}
template class TQueueBinlog<BinlogInterface>;
template class TQueueBinlog<Binlog>;
uint64 TQueueMemoryStorage::push(QueueId queue_id, const RawEvent &event) {
auto log_event_id = event.log_event_id == 0 ? next_log_event_id_++ : event.log_event_id;
events_[log_event_id] = std::make_pair(queue_id, event);
return log_event_id;
}
void TQueueMemoryStorage::pop(uint64 log_event_id) {
events_.erase(log_event_id);
}
void TQueueMemoryStorage::replay(TQueue &q) const {
for (auto &e : events_) {
auto x = e.second;
x.second.log_event_id = e.first;
bool is_added = q.do_push(x.first, std::move(x.second));
CHECK(is_added);
}
}
void TQueueMemoryStorage::close(Promise<> promise) {
events_.clear();
promise.set_value({});
}
void TQueue::StorageCallback::pop_batch(std::vector<uint64> log_event_ids) {
for (auto id : log_event_ids) {
pop(id);
}
}
} // namespace td

157
td/tddb/td/db/TQueue.h Normal file
View File

@@ -0,0 +1,157 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/utils/common.h"
#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Span.h"
#include "td/utils/Status.h"
#include "td/utils/StringBuilder.h"
#include <map>
#include <memory>
#include <utility>
namespace td {
class TQueue {
public:
class EventId {
public:
static constexpr int32 MAX_ID = 2000000000;
EventId();
static Result<EventId> from_int32(int32 id);
bool is_valid() const;
int32 value() const;
Result<EventId> next() const;
Result<EventId> advance(size_t offset) const;
bool empty() const;
bool operator==(const EventId &other) const;
bool operator!=(const EventId &other) const;
bool operator<(const EventId &other) const;
private:
int32 id_{0};
explicit EventId(int32 id);
static bool is_valid_id(int32 id);
};
struct Event {
EventId id;
int32 expires_at{0};
Slice data;
int64 extra{0};
};
struct RawEvent {
uint64 log_event_id{0};
EventId event_id;
int32 expires_at{0};
string data;
int64 extra{0};
};
using QueueId = int64;
class StorageCallback {
public:
using QueueId = TQueue::QueueId;
using RawEvent = TQueue::RawEvent;
StorageCallback() = default;
StorageCallback(const StorageCallback &) = delete;
StorageCallback &operator=(const StorageCallback &) = delete;
StorageCallback(StorageCallback &&) = delete;
StorageCallback &operator=(StorageCallback &&) = delete;
virtual ~StorageCallback() = default;
virtual uint64 push(QueueId queue_id, const RawEvent &event) = 0;
virtual void pop(uint64 log_event_id) = 0;
virtual void close(Promise<> promise) = 0;
virtual void pop_batch(std::vector<uint64> log_event_ids);
};
static unique_ptr<TQueue> create();
TQueue() = default;
TQueue(const TQueue &) = delete;
TQueue &operator=(const TQueue &) = delete;
TQueue(TQueue &&) = delete;
TQueue &operator=(TQueue &&) = delete;
virtual ~TQueue() = default;
virtual void set_callback(unique_ptr<StorageCallback> callback) = 0;
virtual unique_ptr<StorageCallback> extract_callback() = 0;
virtual bool do_push(QueueId queue_id, RawEvent &&raw_event) = 0;
virtual Result<EventId> push(QueueId queue_id, string data, int32 expires_at, int64 extra, EventId hint_new_id) = 0;
virtual void forget(QueueId queue_id, EventId event_id) = 0;
virtual std::map<EventId, RawEvent> clear(QueueId queue_id, size_t keep_count) = 0;
virtual EventId get_head(QueueId queue_id) const = 0;
virtual EventId get_tail(QueueId queue_id) const = 0;
virtual Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now,
MutableSpan<Event> &result_events) = 0;
virtual size_t get_size(QueueId queue_id) const = 0;
// returns number of deleted events and whether garbage collection was completed
virtual std::pair<int64, bool> run_gc(int32 unix_time_now) = 0;
virtual void close(Promise<> promise) = 0;
};
StringBuilder &operator<<(StringBuilder &string_builder, TQueue::EventId id);
struct BinlogEvent;
template <class BinlogT>
class TQueueBinlog final : public TQueue::StorageCallback {
public:
uint64 push(QueueId queue_id, const RawEvent &event) final;
void pop(uint64 log_event_id) final;
void pop_batch(std::vector<uint64> log_event_ids) final;
Status replay(const BinlogEvent &binlog_event, TQueue &q) const TD_WARN_UNUSED_RESULT;
void set_binlog(std::shared_ptr<BinlogT> binlog) {
binlog_ = std::move(binlog);
}
void close(Promise<> promise) final;
private:
std::shared_ptr<BinlogT> binlog_;
static constexpr int32 BINLOG_EVENT_TYPE = 2314;
};
class TQueueMemoryStorage final : public TQueue::StorageCallback {
public:
uint64 push(QueueId queue_id, const RawEvent &event) final;
void pop(uint64 log_event_id) final;
void replay(TQueue &q) const;
void close(Promise<> promise) final;
private:
uint64 next_log_event_id_{1};
std::map<uint64, std::pair<QueueId, RawEvent>> events_;
};
} // namespace td

View File

@@ -0,0 +1,91 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/SeqKeyValue.h"
#include "td/utils/common.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/port/RwMutex.h"
#include "td/utils/Slice.h"
#include <utility>
namespace td {
class TsSeqKeyValue {
public:
using SeqNo = SeqKeyValue::SeqNo;
TsSeqKeyValue() = default;
explicit TsSeqKeyValue(SeqKeyValue kv) : kv_(std::move(kv)) {
}
TsSeqKeyValue(TsSeqKeyValue &&) = default;
TsSeqKeyValue &operator=(TsSeqKeyValue &&) = default;
TsSeqKeyValue(const TsSeqKeyValue &) = delete;
TsSeqKeyValue &operator=(const TsSeqKeyValue &) = delete;
~TsSeqKeyValue() = default;
SeqNo set(Slice key, Slice value) {
auto lock = rw_mutex_.lock_write().move_as_ok();
return kv_.set(key, value);
}
std::pair<SeqNo, RwMutex::WriteLock> set_and_lock(Slice key, Slice value) {
auto lock = rw_mutex_.lock_write().move_as_ok();
return std::make_pair(kv_.set(key, value), std::move(lock));
}
SeqNo erase(const string &key) {
auto lock = rw_mutex_.lock_write().move_as_ok();
return kv_.erase(key);
}
SeqNo erase_batch(vector<string> keys) {
auto lock = rw_mutex_.lock_write().move_as_ok();
return kv_.erase_batch(std::move(keys));
}
std::pair<SeqNo, RwMutex::WriteLock> erase_and_lock(const string &key) {
auto lock = rw_mutex_.lock_write().move_as_ok();
return std::make_pair(kv_.erase(key), std::move(lock));
}
string get(const string &key) const {
auto lock = rw_mutex_.lock_read().move_as_ok();
return kv_.get(key);
}
bool isset(const string &key) const {
auto lock = rw_mutex_.lock_read().move_as_ok();
return kv_.isset(key);
}
size_t size() const {
return kv_.size();
}
FlatHashMap<string, string> get_all() const {
auto lock = rw_mutex_.lock_write().move_as_ok();
return kv_.get_all();
}
// non-thread-safe method
SeqKeyValue &inner() {
return kv_;
}
auto lock() {
return rw_mutex_.lock_write().move_as_ok();
}
private:
mutable RwMutex rw_mutex_;
SeqKeyValue kv_;
};
} // namespace td

View File

@@ -0,0 +1,775 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/binlog/Binlog.h"
#include "td/db/binlog/detail/BinlogEventsBuffer.h"
#include "td/db/binlog/detail/BinlogEventsProcessor.h"
#include "td/utils/buffer.h"
#include "td/utils/format.h"
#include "td/utils/misc.h"
#include "td/utils/port/Clocks.h"
#include "td/utils/port/FileFd.h"
#include "td/utils/port/path.h"
#include "td/utils/port/PollFlags.h"
#include "td/utils/port/sleep.h"
#include "td/utils/port/Stat.h"
#include "td/utils/Random.h"
#include "td/utils/ScopeGuard.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include "td/utils/Time.h"
#include "td/utils/tl_helpers.h"
#include "td/utils/tl_parsers.h"
namespace td {
namespace detail {
struct AesCtrEncryptionEvent {
static constexpr size_t min_salt_size() {
return 16; // 256 bits
}
static constexpr size_t default_salt_size() {
return 32; // 256 bits
}
static constexpr size_t key_size() {
return 32; // 256 bits
}
static constexpr size_t iv_size() {
return 16; // 128 bits
}
static constexpr size_t hash_size() {
return 32; // 256 bits
}
static constexpr size_t kdf_iteration_count() {
return 60002;
}
static constexpr size_t kdf_fast_iteration_count() {
return 2;
}
string key_salt_;
string iv_;
string key_hash_;
string generate_key(const DbKey &db_key) const {
CHECK(!db_key.is_empty());
string key(key_size(), '\0');
size_t iteration_count = kdf_iteration_count();
if (db_key.is_raw_key()) {
iteration_count = kdf_fast_iteration_count();
}
pbkdf2_sha256(db_key.data(), key_salt_, narrow_cast<int>(iteration_count), key);
return key;
}
static string generate_hash(Slice key) {
string hash(hash_size(), '\0');
hmac_sha256(key, "cucumbers everywhere", hash);
return hash;
}
template <class StorerT>
void store(StorerT &storer) const {
using td::store;
BEGIN_STORE_FLAGS();
END_STORE_FLAGS();
store(key_salt_, storer);
store(iv_, storer);
store(key_hash_, storer);
}
template <class ParserT>
void parse(ParserT &&parser) {
using td::parse;
BEGIN_PARSE_FLAGS();
END_PARSE_FLAGS();
parse(key_salt_, parser);
parse(iv_, parser);
parse(key_hash_, parser);
}
};
class BinlogReader {
public:
explicit BinlogReader(ChainBufferReader *input) : input_(input) {
}
void set_input(ChainBufferReader *input, bool is_encrypted, int64 expected_size) {
input_ = input;
is_encrypted_ = is_encrypted;
expected_size_ = expected_size;
}
ChainBufferReader *input() {
return input_;
}
int64 offset() const {
return offset_;
}
Result<size_t> read_next(BinlogEvent *event) {
if (state_ == State::ReadLength) {
if (input_->size() < 4) {
return 4;
}
auto it = input_->clone();
char buf[4];
it.advance(4, MutableSlice(buf, 4));
size_ = static_cast<size_t>(TlParser(Slice(buf, 4)).fetch_int());
if (size_ > BinlogEvent::MAX_SIZE) {
return Status::Error(PSLICE() << "Too big event " << tag("size", size_));
}
if (size_ < BinlogEvent::MIN_SIZE) {
return Status::Error(PSLICE() << "Too small event " << tag("size", size_));
}
if (size_ % 4 != 0) {
return Status::Error(-2, PSLICE() << "Event of size " << size_ << " at offset " << offset() << " out of "
<< expected_size_ << ' ' << tag("is_encrypted", is_encrypted_)
<< format::as_hex_dump<4>(Slice(input_->prepare_read().truncate(28))));
}
state_ = State::ReadEvent;
}
if (input_->size() < size_) {
return size_;
}
event->debug_info_ = BinlogDebugInfo{__FILE__, __LINE__};
auto buffer_slice = input_->cut_head(size_).move_as_buffer_slice();
event->init(buffer_slice.as_slice().str());
TRY_STATUS(event->validate());
offset_ += size_;
event->offset_ = offset_;
state_ = State::ReadLength;
return 0;
}
private:
ChainBufferReader *input_;
enum class State { ReadLength, ReadEvent };
State state_ = State::ReadLength;
size_t size_{0};
int64 offset_{0};
int64 expected_size_{0};
bool is_encrypted_{false};
};
static int64 file_size(CSlice path) {
auto r_stat = stat(path);
if (r_stat.is_error()) {
return 0;
}
return r_stat.ok().size_;
}
} // namespace detail
int32 VERBOSITY_NAME(binlog) = VERBOSITY_NAME(DEBUG) + 8;
Binlog::Binlog() = default;
Binlog::~Binlog() {
close().ignore();
}
Result<FileFd> Binlog::open_binlog(const string &path, int32 flags) {
TRY_RESULT(fd, FileFd::open(path, flags));
TRY_STATUS(fd.lock(FileFd::LockFlags::Write, path, 100));
return std::move(fd);
}
Status Binlog::init(string path, const Callback &callback, DbKey db_key, DbKey old_db_key, int32 dummy,
const Callback &debug_callback) {
close().ignore();
db_key_ = std::move(db_key);
old_db_key_ = std::move(old_db_key);
processor_ = make_unique<detail::BinlogEventsProcessor>();
// Turn off BinlogEventsBuffer
// events_buffer_ = make_unique<detail::BinlogEventsBuffer>();
// try to restore binlog from regenerated version
if (stat(path).is_error()) {
rename(PSLICE() << path << ".new", path).ignore();
}
info_ = BinlogInfo();
info_.was_created = stat(path).is_error();
TRY_RESULT(fd, open_binlog(path, FileFd::Flags::Read | FileFd::Flags::Write | FileFd::Flags::Create));
fd_ = BufferedFdBase<FileFd>(std::move(fd));
fd_size_ = 0;
path_ = std::move(path);
auto status = load_binlog(callback, debug_callback);
if (status.is_error()) {
close().ignore();
return status;
}
info_.last_event_id = processor_->last_event_id();
last_event_id_ = processor_->last_event_id();
if (info_.wrong_password) {
close().ignore();
return Status::Error(static_cast<int>(Error::WrongPassword), "Wrong password");
}
if ((!db_key_.is_empty() && !db_key_used_) || (db_key_.is_empty() && encryption_type_ != EncryptionType::None)) {
aes_ctr_key_salt_ = string();
do_reindex();
}
info_.is_opened = true;
return Status::OK();
}
void Binlog::add_event(BinlogEvent &&event) {
if (event.size_ % 4 != 0) {
LOG(FATAL) << "Trying to add event with bad size " << event.public_to_string();
}
if (!events_buffer_) {
do_add_event(std::move(event));
} else {
events_buffer_->add_event(std::move(event));
}
lazy_flush();
if (state_ == State::Run) {
auto fd_size = fd_size_;
if (events_buffer_) {
fd_size += events_buffer_->size();
}
auto need_reindex = [&](int64 min_size, int rate) {
return fd_size > min_size && fd_size / rate > processor_->total_raw_events_size();
};
if (need_reindex(50000, 5) || need_reindex(100000, 4) || need_reindex(300000, 3) || need_reindex(500000, 2)) {
LOG(INFO) << tag("fd_size", format::as_size(fd_size))
<< tag("total events size", format::as_size(processor_->total_raw_events_size()));
do_reindex();
}
}
}
size_t Binlog::flush_events_buffer(bool force) {
if (!events_buffer_) {
return 0;
}
if (!force && !events_buffer_->need_flush()) {
return events_buffer_->size();
}
CHECK(!in_flush_events_buffer_);
in_flush_events_buffer_ = true;
events_buffer_->flush([&](BinlogEvent &&event) { this->do_add_event(std::move(event)); });
in_flush_events_buffer_ = false;
return 0;
}
void Binlog::do_add_event(BinlogEvent &&event) {
if (event.flags_ & BinlogEvent::Flags::Partial) {
event.flags_ &= ~BinlogEvent::Flags::Partial;
pending_events_.emplace_back(std::move(event));
} else {
for (auto &pending_event : pending_events_) {
do_event(std::move(pending_event));
}
pending_events_.clear();
do_event(std::move(event));
}
}
Status Binlog::close(bool need_sync) {
if (fd_.empty()) {
return Status::OK();
}
if (need_sync) {
sync("close");
} else {
flush("close");
}
fd_.lock(FileFd::LockFlags::Unlock, path_, 1).ensure();
fd_.close();
path_.clear();
info_.is_opened = false;
need_sync_ = false;
return Status::OK();
}
void Binlog::close(Promise<> promise) {
TRY_STATUS_PROMISE(promise, close());
promise.set_value({});
}
void Binlog::change_key(DbKey new_db_key) {
db_key_ = std::move(new_db_key);
aes_ctr_key_salt_ = string();
do_reindex();
}
Status Binlog::close_and_destroy() {
auto path = path_;
auto close_status = close(false);
destroy(path).ignore();
return close_status;
}
Status Binlog::destroy(Slice path) {
unlink(PSLICE() << path << ".new").ignore(); // delete regenerated version first to avoid it becoming main version
unlink(PSLICE() << path).ignore();
return Status::OK();
}
void Binlog::do_event(BinlogEvent &&event) {
auto event_size = event.raw_event_.size();
if (state_ == State::Run || state_ == State::Reindex) {
auto validate_status = event.validate();
if (validate_status.is_error()) {
LOG(FATAL) << "Failed to validate binlog event " << validate_status << " "
<< format::as_hex_dump<4>(as_slice(event.raw_event_).truncate(28));
}
VLOG(binlog) << "Write binlog event: " << format::cond(state_ == State::Reindex, "[reindex] ")
<< event.public_to_string();
buffer_writer_.append(as_slice(event.raw_event_));
}
if (event.type_ < 0) {
if (event.type_ == BinlogEvent::ServiceTypes::AesCtrEncryption) {
detail::AesCtrEncryptionEvent encryption_event;
encryption_event.parse(TlParser(event.get_data()));
string key;
if (aes_ctr_key_salt_ == encryption_event.key_salt_) {
key = as_slice(aes_ctr_key_).str();
} else if (!db_key_.is_empty()) {
key = encryption_event.generate_key(db_key_);
}
if (detail::AesCtrEncryptionEvent::generate_hash(key) != encryption_event.key_hash_) {
CHECK(state_ == State::Load);
if (!old_db_key_.is_empty()) {
key = encryption_event.generate_key(old_db_key_);
if (detail::AesCtrEncryptionEvent::generate_hash(key) != encryption_event.key_hash_) {
info_.wrong_password = true;
}
} else {
info_.wrong_password = true;
}
} else {
db_key_used_ = true;
}
encryption_type_ = EncryptionType::AesCtr;
aes_ctr_key_salt_ = encryption_event.key_salt_;
update_encryption(key, encryption_event.iv_);
if (state_ == State::Load) {
update_read_encryption();
LOG(INFO) << "Load: init encryption";
} else {
CHECK(state_ == State::Reindex);
flush("do_event");
update_write_encryption();
//LOG(INFO) << format::cond(state_ == State::Run, "Run", "Reindex") << ": init encryption";
}
}
}
if (state_ != State::Reindex) {
auto status = processor_->add_event(std::move(event));
if (status.is_error()) {
auto old_size = detail::file_size(path_);
auto data = debug_get_binlog_data(fd_size_, old_size);
if (state_ == State::Load) {
fd_.seek(fd_size_).ensure();
fd_.truncate_to_current_position(fd_size_).ensure();
if (data.empty()) {
return;
}
}
LOG(FATAL) << "Truncate binlog \"" << path_ << "\" from size " << old_size << " to size " << fd_size_
<< " in state " << static_cast<int32>(state_) << " due to error: " << status << " after reading "
<< data;
}
}
fd_events_++;
fd_size_ += event_size;
}
void Binlog::sync(const char *source) {
flush(source);
if (need_sync_) {
LOG(INFO) << "Sync binlog from " << source;
auto status = fd_.sync();
LOG_IF(FATAL, status.is_error()) << "Failed to sync binlog: " << status;
need_sync_ = false;
}
}
void Binlog::flush(const char *source) {
if (state_ == State::Load) {
return;
}
LOG(DEBUG) << "Flush binlog from " << source;
flush_events_buffer(true);
// NB: encryption happens during flush
if (byte_flow_flag_) {
byte_flow_source_.wakeup();
}
auto r_written = fd_.flush_write();
r_written.ensure();
auto written = r_written.ok();
if (written > 0) {
need_sync_ = true;
}
need_flush_since_ = 0;
LOG_IF(FATAL, fd_.need_flush_write()) << "Failed to flush binlog";
if (state_ == State::Run && Time::now() > next_buffer_flush_time_) {
VLOG(binlog) << "Flush write buffer";
buffer_writer_ = ChainBufferWriter();
buffer_reader_ = buffer_writer_.extract_reader();
if (encryption_type_ == EncryptionType::AesCtr) {
aes_ctr_state_ = aes_xcode_byte_flow_.move_aes_ctr_state();
}
update_write_encryption();
next_buffer_flush_time_ = Time::now() + 1.0;
}
}
void Binlog::lazy_flush() {
size_t events_buffer_size = flush_events_buffer(false /*force*/);
buffer_reader_.sync_with_writer();
auto size = buffer_reader_.size() + events_buffer_size;
if (size > (1 << 14)) {
flush("lazy_flush");
} else if (size > 0 && need_flush_since_ == 0) {
need_flush_since_ = Time::now_cached();
}
}
void Binlog::update_read_encryption() {
CHECK(binlog_reader_ptr_);
switch (encryption_type_) {
case EncryptionType::None: {
auto r_file_size = fd_.get_size();
r_file_size.ensure();
binlog_reader_ptr_->set_input(&buffer_reader_, false, r_file_size.ok());
byte_flow_flag_ = false;
break;
}
case EncryptionType::AesCtr: {
byte_flow_source_ = ByteFlowSource(&buffer_reader_);
aes_xcode_byte_flow_ = AesCtrByteFlow();
aes_xcode_byte_flow_.init(std::move(aes_ctr_state_));
byte_flow_sink_ = ByteFlowSink();
byte_flow_source_ >> aes_xcode_byte_flow_ >> byte_flow_sink_;
byte_flow_flag_ = true;
auto r_file_size = fd_.get_size();
r_file_size.ensure();
binlog_reader_ptr_->set_input(byte_flow_sink_.get_output(), true, r_file_size.ok());
break;
}
}
}
void Binlog::update_write_encryption() {
switch (encryption_type_) {
case EncryptionType::None: {
fd_.set_output_reader(&buffer_reader_);
byte_flow_flag_ = false;
break;
}
case EncryptionType::AesCtr: {
byte_flow_source_ = ByteFlowSource(&buffer_reader_);
aes_xcode_byte_flow_ = AesCtrByteFlow();
aes_xcode_byte_flow_.init(std::move(aes_ctr_state_));
byte_flow_sink_ = ByteFlowSink();
byte_flow_source_ >> aes_xcode_byte_flow_ >> byte_flow_sink_;
byte_flow_flag_ = true;
fd_.set_output_reader(byte_flow_sink_.get_output());
break;
}
}
}
Status Binlog::load_binlog(const Callback &callback, const Callback &debug_callback) {
state_ = State::Load;
buffer_writer_ = ChainBufferWriter();
buffer_reader_ = buffer_writer_.extract_reader();
fd_.set_input_writer(&buffer_writer_);
detail::BinlogReader reader{nullptr};
binlog_reader_ptr_ = &reader;
update_read_encryption();
fd_.get_poll_info().add_flags(PollFlags::Read());
info_.wrong_password = false;
while (true) {
BinlogEvent event;
auto r_need_size = reader.read_next(&event);
if (r_need_size.is_error()) {
if (r_need_size.error().code() == -2) {
auto old_size = detail::file_size(path_);
auto offset = reader.offset();
auto data = debug_get_binlog_data(offset, old_size);
fd_.seek(offset).ensure();
fd_.truncate_to_current_position(offset).ensure();
if (data.empty()) {
break;
}
LOG(FATAL) << "Truncate binlog \"" << path_ << "\" from size " << old_size << " to size " << offset
<< " due to error: " << r_need_size.error() << " after reading " << data;
}
LOG(ERROR) << r_need_size.error();
break;
}
auto need_size = r_need_size.move_as_ok();
// LOG(ERROR) << "Need size = " << need_size;
if (need_size == 0) {
if (debug_callback) {
debug_callback(event);
}
do_add_event(std::move(event));
if (info_.wrong_password) {
return Status::OK();
}
} else {
TRY_STATUS(fd_.flush_read(max(need_size, static_cast<size_t>(4096))));
buffer_reader_.sync_with_writer();
if (byte_flow_flag_) {
byte_flow_source_.wakeup();
}
if (reader.input()->size() < need_size) {
break;
}
}
}
auto offset = processor_->offset();
CHECK(offset >= 0);
processor_->for_each([&](BinlogEvent &event) {
VLOG(binlog) << "Replay binlog event: " << event.public_to_string();
if (callback) {
callback(event);
}
});
TRY_RESULT(fd_size, fd_.get_size());
if (offset != fd_size) {
LOG(ERROR) << "Truncate " << tag("path", path_) << tag("old_size", fd_size) << tag("new_size", offset);
fd_.seek(offset).ensure();
fd_.truncate_to_current_position(offset).ensure();
db_key_used_ = false; // force reindex
}
LOG_CHECK(fd_size_ == offset) << fd_size << " " << fd_size_ << " " << offset;
binlog_reader_ptr_ = nullptr;
state_ = State::Run;
buffer_writer_ = ChainBufferWriter();
buffer_reader_ = buffer_writer_.extract_reader();
// reuse aes_ctr_state_
if (encryption_type_ == EncryptionType::AesCtr) {
aes_ctr_state_ = aes_xcode_byte_flow_.move_aes_ctr_state();
}
update_write_encryption();
return Status::OK();
}
void Binlog::update_encryption(Slice key, Slice iv) {
as_mutable_slice(aes_ctr_key_).copy_from(key);
UInt128 aes_ctr_iv;
as_mutable_slice(aes_ctr_iv).copy_from(iv);
aes_ctr_state_.init(as_slice(aes_ctr_key_), as_slice(aes_ctr_iv));
}
void Binlog::reset_encryption() {
if (db_key_.is_empty()) {
encryption_type_ = EncryptionType::None;
return;
}
using EncryptionEvent = detail::AesCtrEncryptionEvent;
EncryptionEvent event;
if (aes_ctr_key_salt_.empty()) {
event.key_salt_.resize(EncryptionEvent::default_salt_size());
Random::secure_bytes(event.key_salt_);
} else {
event.key_salt_ = aes_ctr_key_salt_;
}
event.iv_.resize(EncryptionEvent::iv_size());
Random::secure_bytes(event.iv_);
string key;
if (aes_ctr_key_salt_ == event.key_salt_) {
key = as_slice(aes_ctr_key_).str();
} else {
key = event.generate_key(db_key_);
}
event.key_hash_ = EncryptionEvent::generate_hash(key);
do_event(BinlogEvent(
BinlogEvent::create_raw(0, BinlogEvent::ServiceTypes::AesCtrEncryption, 0, create_default_storer(event)),
BinlogDebugInfo{__FILE__, __LINE__}));
}
void Binlog::do_reindex() {
flush_events_buffer(true);
// start reindex
CHECK(state_ == State::Run);
state_ = State::Reindex;
SCOPE_EXIT {
state_ = State::Run;
};
auto start_time = Clocks::monotonic();
auto start_size = detail::file_size(path_);
auto start_events = fd_events_;
string new_path = path_ + ".new";
auto r_opened_file = open_binlog(new_path, FileFd::Flags::Write | FileFd::Flags::Create | FileFd::Truncate);
if (r_opened_file.is_error()) {
LOG(ERROR) << "Can't open new binlog for regenerate: " << r_opened_file.error();
return;
}
auto old_fd = std::move(fd_); // can't close fd_ now, because it will release file lock
fd_ = BufferedFdBase<FileFd>(r_opened_file.move_as_ok());
buffer_writer_ = ChainBufferWriter();
buffer_reader_ = buffer_writer_.extract_reader();
encryption_type_ = EncryptionType::None;
update_write_encryption();
// reindex
fd_size_ = 0;
fd_events_ = 0;
reset_encryption();
processor_->for_each([&](BinlogEvent &event) {
do_event(std::move(event)); // NB: no move is actually happens
});
{
flush("do_reindex");
if (start_size != 0) { // must sync creation of the file if it is non-empty
auto status = fd_.sync_barrier();
LOG_IF(FATAL, status.is_error()) << "Failed to sync binlog: " << status;
}
need_sync_ = false;
}
// finish_reindex
auto status = unlink(path_);
LOG_IF(FATAL, status.is_error()) << "Failed to unlink old binlog: " << status;
old_fd.close(); // now we can close old file and release the system lock
status = rename(new_path, path_);
FileFd::remove_local_lock(new_path); // now we can release local lock for temporary file
LOG_IF(FATAL, status.is_error()) << "Failed to rename binlog: " << status;
auto finish_time = Clocks::monotonic();
auto finish_size = fd_size_;
auto finish_events = fd_events_;
for (int left_tries = 10; left_tries > 0; left_tries--) {
auto r_stat = stat(path_);
if (r_stat.is_error()) {
if (left_tries != 1) {
usleep_for(200000 / left_tries);
continue;
}
LOG(FATAL) << "Failed to rename binlog of size " << fd_size_ << " to " << path_ << ": " << r_stat.error()
<< ". Temp file size is " << detail::file_size(new_path) << ", new size " << detail::file_size(path_);
}
LOG_CHECK(fd_size_ == r_stat.ok().size_) << fd_size_ << ' ' << r_stat.ok().size_ << ' '
<< detail::file_size(new_path) << ' ' << fd_events_ << ' ' << path_;
break;
}
auto ratio = static_cast<double>(start_size) / static_cast<double>(finish_size + 1);
[&](Slice msg) {
if (start_size > (10 << 20) || finish_time - start_time > 1) {
LOG(WARNING) << "Slow " << msg;
} else {
LOG(INFO) << msg;
}
}(PSLICE() << "Regenerate index " << tag("name", path_) << tag("time", format::as_time(finish_time - start_time))
<< tag("before_size", format::as_size(start_size)) << tag("after_size", format::as_size(finish_size))
<< tag("ratio", ratio) << tag("before_events", start_events) << tag("after_events", finish_events));
buffer_writer_ = ChainBufferWriter();
buffer_reader_ = buffer_writer_.extract_reader();
// reuse aes_ctr_state_
if (encryption_type_ == EncryptionType::AesCtr) {
aes_ctr_state_ = aes_xcode_byte_flow_.move_aes_ctr_state();
}
update_write_encryption();
}
string Binlog::debug_get_binlog_data(int64 begin_offset, int64 end_offset) {
if (begin_offset > end_offset) {
return "Begin offset is bigger than end_offset";
}
if (begin_offset == end_offset) {
return string();
}
static int64 MAX_DATA_LENGTH = 512;
if (end_offset - begin_offset > MAX_DATA_LENGTH) {
end_offset = begin_offset + MAX_DATA_LENGTH;
}
auto r_fd = FileFd::open(path_, FileFd::Flags::Read);
if (r_fd.is_error()) {
return PSTRING() << "Failed to open binlog: " << r_fd.error();
}
auto fd = r_fd.move_as_ok();
fd_.lock(FileFd::LockFlags::Unlock, path_, 1).ignore();
SCOPE_EXIT {
fd_.lock(FileFd::LockFlags::Write, path_, 1).ensure();
};
auto expected_data_length = narrow_cast<size_t>(end_offset - begin_offset);
string data(expected_data_length, '\0');
auto r_data_size = fd.pread(data, begin_offset);
if (r_data_size.is_error()) {
return PSTRING() << "Failed to read binlog: " << r_data_size.error();
}
if (r_data_size.ok() < expected_data_length) {
data.resize(r_data_size.ok());
data = PSTRING() << format::as_hex_dump<4>(Slice(data)) << " | with " << expected_data_length - r_data_size.ok()
<< " missed bytes";
} else {
if (encryption_type_ == EncryptionType::AesCtr) {
bool is_zero = true;
for (auto &c : data) {
if (c != '\0') {
is_zero = false;
}
}
// very often we have '\0' bytes written to disk instead of a real log event
// this is clearly impossible content for a real encrypted log event, so just ignore it
if (is_zero) {
return string();
}
}
data = PSTRING() << format::as_hex_dump<4>(Slice(data));
}
return data;
}
} // namespace td

View File

@@ -0,0 +1,183 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/binlog/BinlogEvent.h"
#include "td/db/DbKey.h"
#include "td/utils/AesCtrByteFlow.h"
#include "td/utils/buffer.h"
#include "td/utils/BufferedFd.h"
#include "td/utils/ByteFlow.h"
#include "td/utils/common.h"
#include "td/utils/crypto.h"
#include "td/utils/logging.h"
#include "td/utils/port/FileFd.h"
#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include "td/utils/StorerBase.h"
#include "td/utils/UInt.h"
#include <functional>
namespace td {
extern int32 VERBOSITY_NAME(binlog);
struct BinlogInfo {
bool was_created{false};
uint64 last_event_id{0};
bool is_encrypted{false};
bool wrong_password{false};
bool is_opened{false};
};
namespace detail {
class BinlogReader;
class BinlogEventsProcessor;
class BinlogEventsBuffer;
} // namespace detail
class Binlog {
public:
enum class Error : int { WrongPassword = -1037284 };
Binlog();
Binlog(const Binlog &) = delete;
Binlog &operator=(const Binlog &) = delete;
Binlog(Binlog &&) = delete;
Binlog &operator=(Binlog &&) = delete;
~Binlog();
using Callback = std::function<void(const BinlogEvent &)>;
Status init(string path, const Callback &callback, DbKey db_key = DbKey::empty(), DbKey old_db_key = DbKey::empty(),
int32 dummy = -1, const Callback &debug_callback = Callback()) TD_WARN_UNUSED_RESULT;
uint64 next_event_id() {
return ++last_event_id_;
}
uint64 next_event_id(int32 shift) {
auto res = last_event_id_ + 1;
last_event_id_ += shift;
return res;
}
uint64 peek_next_event_id() const {
return last_event_id_ + 1;
}
bool empty() const {
return fd_.empty();
}
uint64 add(int32 type, const Storer &storer) {
auto event_id = next_event_id();
add_raw_event(BinlogEvent::create_raw(event_id, type, 0, storer), {});
return event_id;
}
uint64 rewrite(uint64 event_id, int32 type, const Storer &storer) {
auto seq_no = next_event_id();
add_raw_event(BinlogEvent::create_raw(event_id, type, BinlogEvent::Flags::Rewrite, storer), {});
return seq_no;
}
uint64 erase(uint64 event_id) {
auto seq_no = next_event_id();
add_raw_event(
BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, EmptyStorer()),
{});
return seq_no;
}
uint64 erase_batch(vector<uint64> event_ids) {
if (event_ids.empty()) {
return 0;
}
auto seq_no = next_event_id(0);
for (auto event_id : event_ids) {
erase(event_id);
}
return seq_no;
}
void add_raw_event(BufferSlice &&raw_event, BinlogDebugInfo info) {
add_event(BinlogEvent(std::move(raw_event), info));
}
void add_event(BinlogEvent &&event);
void sync(const char *source);
void flush(const char *source);
void lazy_flush();
double need_flush_since() const {
return need_flush_since_;
}
void change_key(DbKey new_db_key);
Status close(bool need_sync = true) TD_WARN_UNUSED_RESULT;
void close(Promise<> promise);
Status close_and_destroy() TD_WARN_UNUSED_RESULT;
static Status destroy(Slice path) TD_WARN_UNUSED_RESULT;
CSlice get_path() const {
return path_;
}
BinlogInfo get_info() const { // works even after binlog was closed
return info_;
}
private:
BufferedFdBase<FileFd> fd_;
ChainBufferWriter buffer_writer_;
ChainBufferReader buffer_reader_;
detail::BinlogReader *binlog_reader_ptr_ = nullptr;
BinlogInfo info_;
DbKey db_key_;
bool db_key_used_ = false;
DbKey old_db_key_;
enum class EncryptionType { None, AesCtr } encryption_type_ = EncryptionType::None;
// AesCtrEncryption
string aes_ctr_key_salt_;
UInt256 aes_ctr_key_;
AesCtrState aes_ctr_state_;
bool byte_flow_flag_ = false;
ByteFlowSource byte_flow_source_;
ByteFlowSink byte_flow_sink_;
AesCtrByteFlow aes_xcode_byte_flow_;
int64 fd_size_{0};
uint64 fd_events_{0};
string path_;
vector<BinlogEvent> pending_events_;
unique_ptr<detail::BinlogEventsProcessor> processor_;
unique_ptr<detail::BinlogEventsBuffer> events_buffer_;
bool in_flush_events_buffer_{false};
uint64 last_event_id_{0};
double need_flush_since_ = 0;
double next_buffer_flush_time_ = 0;
bool need_sync_{false};
enum class State { Empty, Load, Reindex, Run } state_{State::Empty};
static Result<FileFd> open_binlog(const string &path, int32 flags);
size_t flush_events_buffer(bool force);
void do_add_event(BinlogEvent &&event);
void do_event(BinlogEvent &&event);
Status load_binlog(const Callback &callback, const Callback &debug_callback = Callback()) TD_WARN_UNUSED_RESULT;
void do_reindex();
void update_encryption(Slice key, Slice iv);
void reset_encryption();
void update_read_encryption();
void update_write_encryption();
string debug_get_binlog_data(int64 begin_offset, int64 end_offset);
};
} // namespace td

View File

@@ -0,0 +1,75 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/binlog/BinlogEvent.h"
#include "td/utils/crypto.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/tl_parsers.h"
#include "td/utils/tl_storers.h"
namespace td {
void BinlogEvent::init(string raw_event) {
TlParser parser(as_slice(raw_event));
size_ = static_cast<uint32>(parser.fetch_int());
LOG_CHECK(size_ == raw_event.size()) << size_ << ' ' << raw_event.size() << debug_info_;
id_ = static_cast<uint64>(parser.fetch_long());
type_ = parser.fetch_int();
flags_ = parser.fetch_int();
extra_ = static_cast<uint64>(parser.fetch_long());
CHECK(size_ >= MIN_SIZE);
parser.template fetch_string_raw<Slice>(size_ - MIN_SIZE); // skip data
crc32_ = static_cast<uint32>(parser.fetch_int());
raw_event_ = std::move(raw_event);
}
Slice BinlogEvent::get_data() const {
CHECK(raw_event_.size() >= MIN_SIZE);
return Slice(as_slice(raw_event_).data() + HEADER_SIZE, raw_event_.size() - MIN_SIZE);
}
Status BinlogEvent::validate() const {
if (raw_event_.size() < MIN_SIZE) {
return Status::Error("Too small event");
}
TlParser parser(as_slice(raw_event_));
auto size = static_cast<uint32>(parser.fetch_int());
if (size_ != size || size_ != raw_event_.size()) {
return Status::Error(PSLICE() << "Size of event changed: " << tag("was", size_) << tag("now", size)
<< tag("real size", raw_event_.size()));
}
parser.template fetch_string_raw<Slice>(size_ - TAIL_SIZE - sizeof(int)); // skip
auto stored_crc32 = static_cast<uint32>(parser.fetch_int());
auto calculated_crc = crc32(Slice(as_slice(raw_event_).data(), size_ - TAIL_SIZE));
if (calculated_crc != crc32_ || calculated_crc != stored_crc32) {
return Status::Error(PSLICE() << "CRC mismatch " << tag("actual", format::as_hex(calculated_crc))
<< tag("expected", format::as_hex(crc32_)) << public_to_string());
}
return Status::OK();
}
BufferSlice BinlogEvent::create_raw(uint64 id, int32 type, int32 flags, const Storer &storer) {
auto raw_event = BufferSlice{storer.size() + MIN_SIZE};
TlStorerUnsafe tl_storer(raw_event.as_mutable_slice().ubegin());
tl_storer.store_int(narrow_cast<int32>(raw_event.size()));
tl_storer.store_long(id);
tl_storer.store_int(type);
tl_storer.store_int(flags);
tl_storer.store_long(0);
CHECK(tl_storer.get_buf() == raw_event.as_slice().ubegin() + HEADER_SIZE);
tl_storer.store_storer(storer);
CHECK(tl_storer.get_buf() == raw_event.as_slice().uend() - TAIL_SIZE);
tl_storer.store_int(crc32(raw_event.as_slice().truncate(raw_event.size() - TAIL_SIZE)));
return raw_event;
}
} // namespace td

View File

@@ -0,0 +1,115 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/utils/buffer.h"
#include "td/utils/common.h"
#include "td/utils/format.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include "td/utils/Storer.h"
#include "td/utils/StorerBase.h"
#include "td/utils/StringBuilder.h"
namespace td {
struct EmptyStorerImpl {
EmptyStorerImpl() {
}
template <class StorerT>
void store(StorerT &storer) const {
}
};
inline auto EmptyStorer() {
static const EmptyStorerImpl impl;
return create_default_storer(impl);
}
struct BinlogDebugInfo {
BinlogDebugInfo() = default;
BinlogDebugInfo(const char *file, int line) : file(file), line(line) {
}
const char *file{""};
int line{0};
};
inline StringBuilder &operator<<(StringBuilder &sb, const BinlogDebugInfo &info) {
if (info.line == 0) {
return sb;
}
return sb << "[" << info.file << ":" << info.line << "]";
}
struct BinlogEvent {
static constexpr size_t MAX_SIZE = 1 << 24;
static constexpr size_t HEADER_SIZE = 4 + 8 + 4 + 4 + 8;
static constexpr size_t TAIL_SIZE = 4;
static constexpr size_t MIN_SIZE = HEADER_SIZE + TAIL_SIZE;
int64 offset_ = -1;
uint32 size_ = 0;
uint64 id_ = 0;
int32 type_ = 0; // type can be merged with flags
int32 flags_ = 0;
uint64 extra_ = 0;
uint32 crc32_ = 0;
string raw_event_;
BinlogDebugInfo debug_info_;
enum ServiceTypes { Header = -1, Empty = -2, AesCtrEncryption = -3, NoEncryption = -4 };
enum Flags { Rewrite = 1, Partial = 2 };
Slice get_data() const;
bool is_empty() const {
return raw_event_.empty();
}
BinlogEvent clone() const {
BinlogEvent result;
result.debug_info_ = BinlogDebugInfo{__FILE__, __LINE__};
result.init(raw_event_);
result.validate().ensure();
return result;
}
BufferSlice data_as_buffer_slice() const {
return BufferSlice(get_data());
}
BinlogEvent() = default;
BinlogEvent(BufferSlice &&raw_event, BinlogDebugInfo info) {
debug_info_ = info;
init(raw_event.as_slice().str());
}
static BufferSlice create_raw(uint64 id, int32 type, int32 flags, const Storer &storer);
string public_to_string() const {
return PSTRING() << "LogEvent[" << tag("id", format::as_hex(id_)) << tag("type", type_) << tag("flags", flags_)
<< tag("data", get_data().size()) << "]" << debug_info_;
}
void init(string raw_event);
Status validate() const TD_WARN_UNUSED_RESULT;
};
inline StringBuilder &operator<<(StringBuilder &sb, const BinlogEvent &event) {
return sb << "LogEvent[" << tag("id", format::as_hex(event.id_)) << tag("type", event.type_)
<< tag("flags", event.flags_) << tag("data", format::as_hex_dump<4>(event.get_data())) << "]"
<< event.debug_info_;
}
} // namespace td

View File

@@ -0,0 +1,32 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/binlog/BinlogEvent.h"
#include "td/db/binlog/BinlogInterface.h"
#include "td/utils/common.h"
#include "td/utils/Promise.h"
#include "td/utils/StorerBase.h"
namespace td {
inline uint64 binlog_add(BinlogInterface *binlog_ptr, int32 type, const Storer &storer,
Promise<> promise = Promise<>()) {
return binlog_ptr->add(type, storer, std::move(promise));
}
inline uint64 binlog_rewrite(BinlogInterface *binlog_ptr, uint64 log_event_id, int32 type, const Storer &storer,
Promise<> promise = Promise<>()) {
return binlog_ptr->rewrite(log_event_id, type, storer, std::move(promise));
}
inline uint64 binlog_erase(BinlogInterface *binlog_ptr, uint64 log_event_id, Promise<> promise = Promise<>()) {
return binlog_ptr->erase(log_event_id, std::move(promise));
}
} // namespace td

View File

@@ -0,0 +1,90 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/binlog/BinlogEvent.h"
#include "td/db/DbKey.h"
#include "td/utils/buffer.h"
#include "td/utils/common.h"
#include "td/utils/Promise.h"
#include "td/utils/StorerBase.h"
namespace td {
class BinlogInterface {
public:
BinlogInterface() = default;
BinlogInterface(const BinlogInterface &) = delete;
BinlogInterface &operator=(const BinlogInterface &) = delete;
BinlogInterface(BinlogInterface &&) = delete;
BinlogInterface &operator=(BinlogInterface &&) = delete;
virtual ~BinlogInterface() = default;
void close(Promise<> promise = {}) {
close_impl(std::move(promise));
}
void close_and_destroy(Promise<> promise = {}) {
close_and_destroy_impl(std::move(promise));
}
void add_raw_event(BinlogDebugInfo info, uint64 event_id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) {
add_raw_event_impl(event_id, std::move(raw_event), std::move(promise), info);
}
void add_raw_event(uint64 event_id, BufferSlice &&raw_event, Promise<> promise = Promise<>()) {
add_raw_event_impl(event_id, std::move(raw_event), std::move(promise), {});
}
void lazy_sync(Promise<> promise = Promise<>()) {
add_raw_event_impl(next_event_id(), BufferSlice(), std::move(promise), {});
}
uint64 add(int32 type, const Storer &storer, Promise<> promise = Promise<>()) {
auto event_id = next_event_id();
add_raw_event_impl(event_id, BinlogEvent::create_raw(event_id, type, 0, storer), std::move(promise), {});
return event_id;
}
uint64 rewrite(uint64 event_id, int32 type, const Storer &storer, Promise<> promise = Promise<>()) {
auto seq_no = next_event_id();
add_raw_event_impl(seq_no, BinlogEvent::create_raw(event_id, type, BinlogEvent::Flags::Rewrite, storer),
std::move(promise), {});
return seq_no;
}
uint64 erase(uint64 event_id, Promise<> promise = Promise<>()) {
auto seq_no = next_event_id();
add_raw_event_impl(
seq_no,
BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, EmptyStorer()),
std::move(promise), {});
return seq_no;
}
virtual uint64 erase_batch(vector<uint64> event_ids) {
if (event_ids.empty()) {
return 0;
}
uint64 seq_no = next_event_id(0);
for (auto event_id : event_ids) {
erase(event_id);
}
return seq_no;
}
virtual void force_sync(Promise<> promise, const char *source) = 0;
virtual void force_flush() = 0;
virtual void change_key(DbKey db_key, Promise<> promise) = 0;
virtual uint64 next_event_id() = 0;
virtual uint64 next_event_id(int32 shift) = 0;
protected:
virtual void close_impl(Promise<> promise) = 0;
virtual void close_and_destroy_impl(Promise<> promise) = 0;
virtual void add_raw_event_impl(uint64 seq_no, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) = 0;
};
} // namespace td

View File

@@ -0,0 +1,231 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/binlog/ConcurrentBinlog.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/OrderedEventsProcessor.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Time.h"
#include <map>
namespace td {
namespace detail {
class BinlogActor final : public Actor {
public:
BinlogActor(unique_ptr<Binlog> binlog, uint64 seq_no) : binlog_(std::move(binlog)), processor_(seq_no) {
}
void close(Promise<> promise) {
binlog_->close().ensure();
LOG(INFO) << "Finished to close binlog";
stop();
promise.set_value(Unit()); // setting promise can complete closing and destroy the current actor context
}
void close_and_destroy(Promise<> promise) {
binlog_->close_and_destroy().ensure();
LOG(INFO) << "Finished to destroy binlog";
stop();
promise.set_value(Unit()); // setting promise can complete closing and destroy the current actor context
}
struct Event {
BufferSlice raw_event;
Promise<> sync_promise;
BinlogDebugInfo debug_info;
};
void erase_batch(uint64 seq_no, std::vector<uint64> event_ids) {
for (auto event_id : event_ids) {
auto event = BinlogEvent::create_raw(event_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite,
EmptyStorer());
add_raw_event(seq_no, std::move(event), Promise<Unit>(), BinlogDebugInfo{__FILE__, __LINE__});
seq_no++;
}
}
void add_raw_event(uint64 seq_no, BufferSlice &&raw_event, Promise<> &&promise, BinlogDebugInfo info) {
processor_.add(seq_no, Event{std::move(raw_event), std::move(promise), info}, [&](uint64 event_id, Event &&event) {
if (!event.raw_event.empty()) {
do_add_raw_event(std::move(event.raw_event), event.debug_info);
}
do_lazy_sync(std::move(event.sync_promise));
});
flush_immediate_sync();
try_flush();
}
void force_sync(Promise<> &&promise, const char *source) {
LOG(INFO) << "Force binlog sync from " << source;
auto seq_no = processor_.max_unfinished_seq_no();
if (processor_.max_finished_seq_no() == seq_no) {
do_immediate_sync(std::move(promise));
} else {
immediate_sync_promises_.emplace(seq_no, std::move(promise));
}
}
void force_flush() {
// TODO: use same logic as in force_sync
binlog_->flush("force_flush");
flush_flag_ = false;
}
void change_key(DbKey db_key, Promise<> promise) {
binlog_->change_key(std::move(db_key));
promise.set_value(Unit());
}
private:
unique_ptr<Binlog> binlog_;
OrderedEventsProcessor<Event> processor_;
std::multimap<uint64, Promise<>> immediate_sync_promises_;
std::vector<Promise<>> sync_promises_;
bool force_sync_flag_ = false;
bool lazy_sync_flag_ = false;
bool flush_flag_ = false;
double wakeup_at_ = 0;
static constexpr double FLUSH_TIMEOUT = 0.001; // 1ms
void wakeup_after(double after) {
auto now = Time::now_cached();
wakeup_at(now + after);
}
void wakeup_at(double at) {
if (wakeup_at_ == 0 || wakeup_at_ > at) {
wakeup_at_ = at;
set_timeout_at(wakeup_at_);
}
}
void do_add_raw_event(BufferSlice &&raw_event, BinlogDebugInfo info) {
binlog_->add_raw_event(std::move(raw_event), info);
}
void try_flush() {
auto need_flush_since = binlog_->need_flush_since();
auto now = Time::now_cached();
if (now > need_flush_since + FLUSH_TIMEOUT - 1e-9) {
binlog_->flush("try_flush");
} else {
if (!force_sync_flag_) {
flush_flag_ = true;
wakeup_at(need_flush_since + FLUSH_TIMEOUT);
}
}
}
void flush_immediate_sync() {
auto seq_no = processor_.max_finished_seq_no();
for (auto it = immediate_sync_promises_.begin(), end = immediate_sync_promises_.end();
it != end && it->first <= seq_no; it = immediate_sync_promises_.erase(it)) {
do_immediate_sync(std::move(it->second));
}
}
void do_immediate_sync(Promise<> &&promise) {
if (promise) {
sync_promises_.emplace_back(std::move(promise));
}
if (!force_sync_flag_) {
force_sync_flag_ = true;
wakeup_after(0.003);
}
}
void do_lazy_sync(Promise<> &&promise) {
if (!promise) {
return;
}
sync_promises_.emplace_back(std::move(promise));
if (!lazy_sync_flag_ && !force_sync_flag_) {
wakeup_after(30);
lazy_sync_flag_ = true;
}
}
void timeout_expired() final {
bool need_sync = lazy_sync_flag_ || force_sync_flag_;
lazy_sync_flag_ = false;
force_sync_flag_ = false;
bool need_flush = flush_flag_;
flush_flag_ = false;
wakeup_at_ = 0;
if (need_sync) {
binlog_->sync("timeout_expired");
// LOG(ERROR) << "BINLOG SYNC";
set_promises(sync_promises_);
} else if (need_flush) {
try_flush();
// LOG(ERROR) << "BINLOG FLUSH";
}
}
};
} // namespace detail
ConcurrentBinlog::ConcurrentBinlog() = default;
ConcurrentBinlog::~ConcurrentBinlog() = default;
ConcurrentBinlog::ConcurrentBinlog(unique_ptr<Binlog> binlog, int scheduler_id) {
init_impl(std::move(binlog), scheduler_id);
}
Result<BinlogInfo> ConcurrentBinlog::init(string path, const Callback &callback, DbKey db_key, DbKey old_db_key,
int scheduler_id) {
auto binlog = make_unique<Binlog>();
TRY_STATUS(binlog->init(std::move(path), callback, std::move(db_key), std::move(old_db_key)));
auto info = binlog->get_info();
init_impl(std::move(binlog), scheduler_id);
return info;
}
void ConcurrentBinlog::init_impl(unique_ptr<Binlog> binlog, int32 scheduler_id) {
path_ = binlog->get_path().str();
last_event_id_ = binlog->peek_next_event_id();
binlog_actor_ = create_actor_on_scheduler<detail::BinlogActor>(PSLICE() << "Binlog " << path_, scheduler_id,
std::move(binlog), last_event_id_);
}
void ConcurrentBinlog::close_impl(Promise<> promise) {
send_closure(std::move(binlog_actor_), &detail::BinlogActor::close, std::move(promise));
}
void ConcurrentBinlog::close_and_destroy_impl(Promise<> promise) {
send_closure(std::move(binlog_actor_), &detail::BinlogActor::close_and_destroy, std::move(promise));
}
void ConcurrentBinlog::add_raw_event_impl(uint64 event_id, BufferSlice &&raw_event, Promise<> promise,
BinlogDebugInfo info) {
send_closure(binlog_actor_, &detail::BinlogActor::add_raw_event, event_id, std::move(raw_event), std::move(promise),
info);
}
void ConcurrentBinlog::force_sync(Promise<> promise, const char *source) {
send_closure(binlog_actor_, &detail::BinlogActor::force_sync, std::move(promise), source);
}
void ConcurrentBinlog::force_flush() {
send_closure(binlog_actor_, &detail::BinlogActor::force_flush);
}
void ConcurrentBinlog::change_key(DbKey db_key, Promise<> promise) {
send_closure(binlog_actor_, &detail::BinlogActor::change_key, std::move(db_key), std::move(promise));
}
uint64 ConcurrentBinlog::erase_batch(vector<uint64> event_ids) {
auto shift = narrow_cast<int32>(event_ids.size());
if (shift == 0) {
return 0;
}
auto seq_no = next_event_id(shift);
send_closure(binlog_actor_, &detail::BinlogActor::erase_batch, seq_no, std::move(event_ids));
return seq_no;
}
} // namespace td

View File

@@ -0,0 +1,72 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/binlog/Binlog.h"
#include "td/db/binlog/BinlogInterface.h"
#include "td/db/DbKey.h"
#include "td/actor/actor.h"
#include "td/utils/buffer.h"
#include "td/utils/common.h"
#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include <atomic>
#include <functional>
namespace td {
namespace detail {
class BinlogActor;
} // namespace detail
class ConcurrentBinlog final : public BinlogInterface {
public:
using Callback = std::function<void(const BinlogEvent &)>;
Result<BinlogInfo> init(string path, const Callback &callback, DbKey db_key = DbKey::empty(),
DbKey old_db_key = DbKey::empty(), int scheduler_id = -1) TD_WARN_UNUSED_RESULT;
ConcurrentBinlog();
explicit ConcurrentBinlog(unique_ptr<Binlog> binlog, int scheduler_id = -1);
ConcurrentBinlog(const ConcurrentBinlog &) = delete;
ConcurrentBinlog &operator=(const ConcurrentBinlog &) = delete;
ConcurrentBinlog(ConcurrentBinlog &&) = delete;
ConcurrentBinlog &operator=(ConcurrentBinlog &&) = delete;
~ConcurrentBinlog() final;
void force_sync(Promise<> promise, const char *source) final;
void force_flush() final;
void change_key(DbKey db_key, Promise<> promise) final;
uint64 next_event_id() final {
return last_event_id_.fetch_add(1, std::memory_order_relaxed);
}
uint64 next_event_id(int32 shift) final {
return last_event_id_.fetch_add(shift, std::memory_order_relaxed);
}
CSlice get_path() const {
return path_;
}
uint64 erase_batch(vector<uint64> event_ids) final;
private:
void init_impl(unique_ptr<Binlog> binlog, int scheduler_id);
void close_impl(Promise<> promise) final;
void close_and_destroy_impl(Promise<> promise) final;
void add_raw_event_impl(uint64 event_id, BufferSlice &&raw_event, Promise<> promise, BinlogDebugInfo info) final;
ActorOwn<detail::BinlogActor> binlog_actor_;
string path_;
std::atomic<uint64> last_event_id_{0};
};
} // namespace td

View File

@@ -0,0 +1,158 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/binlog/Binlog.h"
#include "td/db/DbKey.h"
#include "td/utils/common.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/Stat.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/StringBuilder.h"
#include "td/utils/tl_parsers.h"
#include <map>
struct Trie {
Trie() {
nodes_.resize(1);
}
void add(td::Slice value) {
do_add(0, PSLICE() << value << '\0');
}
void dump() {
if (nodes_[0].sum == 0) { // division by zero
return;
}
LOG(PLAIN) << "TOTAL: " << nodes_[0].sum;
do_dump("", 0);
}
private:
struct FullNode {
int next[256] = {};
int sum = 0;
};
td::vector<FullNode> nodes_;
void do_add(int event_id, td::Slice value) {
nodes_[event_id].sum++;
if (value.empty()) {
return;
}
auto c = static_cast<td::uint8>(value[0]);
auto next_event_id = nodes_[event_id].next[c];
if (next_event_id == 0) {
next_event_id = static_cast<int>(nodes_.size());
nodes_.emplace_back();
nodes_[event_id].next[c] = next_event_id;
}
do_add(next_event_id, value.substr(1));
}
void do_dump(td::string path, int v) {
bool is_word_end = !path.empty() && path.back() == '\0';
bool need_stop = false;
int next_count = 0;
for (int c = 0; c < 256; c++) {
if (nodes_[v].next[c] != 0) {
need_stop |= c >= 128 || !(td::is_alpha(static_cast<char>(c)) || c == '.' || c == '_');
next_count++;
}
}
need_stop |= next_count == 0 || (next_count >= 2 && nodes_[v].sum <= nodes_[0].sum / 100);
if (is_word_end || need_stop) {
if (is_word_end) {
path.pop_back();
} else if (next_count != 1 || nodes_[v].next[0] == 0) {
path.push_back('*');
}
LOG(PLAIN) << nodes_[v].sum << " " << td::StringBuilder::FixedDouble(nodes_[v].sum * 100.0 / nodes_[0].sum, 2)
<< "% [" << td::format::escaped(path) << "]";
return;
}
for (int c = 0; c < 256; c++) {
auto next_event_id = nodes_[v].next[c];
if (next_event_id == 0) {
continue;
}
do_dump(path + static_cast<char>(c), next_event_id);
}
}
};
enum Magic { ConfigPmcMagic = 0x1f18, BinlogPmcMagic = 0x4327 };
int main(int argc, char *argv[]) {
if (argc < 2) {
LOG(PLAIN) << "Usage: binlog_dump <binlog_file_name>";
return 1;
}
td::string binlog_file_name = argv[1];
auto r_stat = td::stat(binlog_file_name);
if (r_stat.is_error() || r_stat.ok().size_ == 0 || !r_stat.ok().is_reg_) {
LOG(PLAIN) << "Wrong binlog file name specified";
LOG(PLAIN) << "Usage: binlog_dump <binlog_file_name>";
return 1;
}
struct Info {
std::size_t full_size = 0;
std::size_t compressed_size = 0;
Trie trie;
Trie compressed_trie;
};
std::map<td::uint64, Info> info;
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
td::Binlog binlog;
binlog
.init(
binlog_file_name,
[&](auto &event) {
info[0].compressed_size += event.raw_event_.size();
info[event.type_].compressed_size += event.raw_event_.size();
if (event.type_ == ConfigPmcMagic || event.type_ == BinlogPmcMagic) {
auto key = td::TlParser(event.get_data()).template fetch_string<td::Slice>();
info[event.type_].compressed_trie.add(key);
}
},
td::DbKey::raw_key("cucumber"), td::DbKey::empty(), -1,
[&](auto &event) mutable {
info[0].full_size += event.raw_event_.size();
info[event.type_].full_size += event.raw_event_.size();
if (event.type_ == ConfigPmcMagic || event.type_ == BinlogPmcMagic) {
auto key = td::TlParser(event.get_data()).template fetch_string<td::Slice>();
info[event.type_].trie.add(key);
}
LOG(PLAIN) << "LogEvent[" << td::tag("event_id", td::format::as_hex(event.id_))
<< td::tag("type", event.type_) << td::tag("flags", event.flags_)
<< td::tag("size", event.get_data().size())
<< td::tag("data", td::format::escaped(event.get_data())) << "]\n";
})
.ensure();
for (auto &it : info) {
LOG(PLAIN) << td::tag("handler", td::format::as_hex(it.first))
<< td::tag("full_size", td::format::as_size(it.second.full_size))
<< td::tag("compressed_size", td::format::as_size(it.second.compressed_size));
it.second.trie.dump();
if (it.second.full_size != it.second.compressed_size) {
it.second.compressed_trie.dump();
}
}
return 0;
}

View File

@@ -0,0 +1,43 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/binlog/detail/BinlogEventsBuffer.h"
#include <algorithm>
namespace td {
namespace detail {
void BinlogEventsBuffer::add_event(BinlogEvent &&event) {
total_events_++;
if ((event.flags_ & BinlogEvent::Flags::Partial) == 0) {
auto it = std::find(ids_.begin(), ids_.end(), event.id_);
if (it != ids_.end()) {
auto &to_event = events_[it - ids_.begin()];
size_ -= to_event.size_;
to_event = std::move(event);
size_ += to_event.size_;
return;
}
}
ids_.push_back(event.id_);
size_ += event.size_;
events_.push_back(std::move(event));
}
bool BinlogEventsBuffer::need_flush() const {
return total_events_ > 5000 || ids_.size() > 100;
}
void BinlogEventsBuffer::clear() {
ids_.clear();
events_.clear();
total_events_ = 0;
size_ = 0;
}
} // namespace detail
} // namespace td

View File

@@ -0,0 +1,51 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/binlog/BinlogEvent.h"
#include "td/utils/common.h"
namespace td {
namespace detail {
class BinlogEventsBuffer {
public:
void add_event(BinlogEvent &&event);
bool need_flush() const;
template <class CallbackT>
void flush(CallbackT &&callback) {
for (size_t i = 0; i < ids_.size(); i++) {
auto &event = events_[i];
if (i + 1 != ids_.size() && (event.flags_ & BinlogEvent::Flags::Partial) == 0) {
callback(BinlogEvent(BinlogEvent::create_raw(event.id_, event.type_, event.flags_ | BinlogEvent::Flags::Partial,
create_storer(event.get_data())),
BinlogDebugInfo{__FILE__, __LINE__}));
} else {
callback(std::move(event));
}
}
clear();
}
size_t size() const {
return size_;
}
private:
vector<uint64> ids_;
vector<BinlogEvent> events_;
size_t total_events_{0};
size_t size_{0};
void do_event(BinlogEvent &&event);
void clear();
};
} // namespace detail
} // namespace td

View File

@@ -0,0 +1,80 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/binlog/detail/BinlogEventsProcessor.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
#include <algorithm>
namespace td {
namespace detail {
Status BinlogEventsProcessor::do_event(BinlogEvent &&event) {
offset_ = event.offset_;
auto fixed_event_id = event.id_ * 2;
if ((event.flags_ & BinlogEvent::Flags::Rewrite) && !event_ids_.empty() && event_ids_.back() >= fixed_event_id) {
auto it = std::lower_bound(event_ids_.begin(), event_ids_.end(), fixed_event_id);
if (it == event_ids_.end() || *it != fixed_event_id) {
return Status::Error(PSLICE() << "Ignore rewrite log event " << event.public_to_string());
}
auto pos = it - event_ids_.begin();
total_raw_events_size_ -= static_cast<int64>(events_[pos].raw_event_.size());
if (event.type_ == BinlogEvent::ServiceTypes::Empty) {
*it += 1;
empty_events_++;
events_[pos] = {};
} else {
event.flags_ &= ~BinlogEvent::Flags::Rewrite;
total_raw_events_size_ += static_cast<int64>(event.raw_event_.size());
events_[pos] = std::move(event);
}
} else if (event.type_ < 0) {
// just skip service events
} else {
if (!(event_ids_.empty() || event_ids_.back() < fixed_event_id)) {
return Status::Error(PSLICE() << offset_ << ' ' << event_ids_.size() << ' ' << event_ids_.back() << ' '
<< fixed_event_id << ' ' << event.public_to_string() << ' ' << total_events_ << ' '
<< total_raw_events_size_);
}
last_event_id_ = event.id_;
total_raw_events_size_ += static_cast<int64>(event.raw_event_.size());
total_events_++;
event_ids_.push_back(fixed_event_id);
events_.emplace_back(std::move(event));
}
if (total_events_ > 10 && empty_events_ * 4 > total_events_ * 3) {
compactify();
}
return Status::OK();
}
void BinlogEventsProcessor::compactify() {
CHECK(event_ids_.size() == events_.size());
auto event_ids_from = event_ids_.begin();
auto event_ids_to = event_ids_from;
auto events_from = events_.begin();
auto events_to = events_from;
for (; event_ids_from != event_ids_.end(); event_ids_from++, events_from++) {
if ((*event_ids_from & 1) == 0) {
*event_ids_to++ = *event_ids_from;
if (events_to != events_from) {
*events_to = std::move(*events_from);
}
events_to++;
}
}
event_ids_.erase(event_ids_to, event_ids_.end());
events_.erase(events_to, events_.end());
total_events_ = event_ids_.size();
empty_events_ = 0;
CHECK(event_ids_.size() == events_.size());
}
} // namespace detail
} // namespace td

View File

@@ -0,0 +1,61 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/db/binlog/BinlogEvent.h"
#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/Status.h"
namespace td {
namespace detail {
class BinlogEventsProcessor {
public:
Status add_event(BinlogEvent &&event) TD_WARN_UNUSED_RESULT {
return do_event(std::move(event));
}
template <class CallbackT>
void for_each(CallbackT &&callback) {
for (size_t i = 0; i < event_ids_.size(); i++) {
LOG_CHECK(i == 0 || event_ids_[i - 1] < event_ids_[i])
<< event_ids_[i - 1] << " " << events_[i - 1].public_to_string() << " " << event_ids_[i] << " "
<< events_[i].public_to_string();
if ((event_ids_[i] & 1) == 0) {
callback(events_[i]);
}
}
}
uint64 last_event_id() const {
return last_event_id_;
}
int64 offset() const {
return offset_;
}
int64 total_raw_events_size() const {
return total_raw_events_size_;
}
private:
// holds (event_id * 2 + was_deleted)
std::vector<uint64> event_ids_;
std::vector<BinlogEvent> events_;
size_t total_events_{0};
size_t empty_events_{0};
uint64 last_event_id_{0};
int64 offset_{0};
int64 total_raw_events_size_{0};
Status do_event(BinlogEvent &&event);
void compactify();
};
} // namespace detail
} // namespace td

View File

@@ -0,0 +1,60 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/db/detail/RawSqliteDb.h"
#include "sqlite/sqlite3.h"
#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/path.h"
#include "td/utils/port/Stat.h"
#include <atomic>
namespace td {
namespace detail {
static std::atomic<bool> was_database_destroyed{false};
Status RawSqliteDb::last_error(tdsqlite3 *db, CSlice path) {
return Status::Error(PSLICE() << Slice(tdsqlite3_errmsg(db)) << " for database \"" << path << '"');
}
Status RawSqliteDb::destroy(Slice path) {
Status error;
with_db_path(path, [&](auto path) {
unlink(path).ignore();
if (!ends_with(path, "-shm") && !stat(path).is_error()) {
error = Status::Error(PSLICE() << "Failed to delete file \"" << path << '"');
}
});
return error;
}
Status RawSqliteDb::last_error() {
//If database was corrupted, try to delete it.
auto code = tdsqlite3_errcode(db_);
if (code == SQLITE_CORRUPT) {
was_database_destroyed.store(true, std::memory_order_relaxed);
destroy(path_).ignore();
}
return last_error(db_, path());
}
bool RawSqliteDb::was_any_database_destroyed() {
return was_database_destroyed.load(std::memory_order_relaxed);
}
RawSqliteDb::~RawSqliteDb() {
auto rc = tdsqlite3_close(db_);
LOG_IF(FATAL, rc != SQLITE_OK) << last_error(db_, path());
}
} // namespace detail
} // namespace td

View File

@@ -0,0 +1,78 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/utils/optional.h"
#include "td/utils/Slice.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Status.h"
struct tdsqlite3;
namespace td {
namespace detail {
class RawSqliteDb {
public:
RawSqliteDb(tdsqlite3 *db, std::string path) : db_(db), path_(std::move(path)) {
}
RawSqliteDb(const RawSqliteDb &) = delete;
RawSqliteDb(RawSqliteDb &&) = delete;
RawSqliteDb &operator=(const RawSqliteDb &) = delete;
RawSqliteDb &operator=(RawSqliteDb &&) = delete;
~RawSqliteDb();
template <class F>
static void with_db_path(Slice main_path, F &&f) {
f(PSLICE() << main_path);
f(PSLICE() << main_path << "-journal");
f(PSLICE() << main_path << "-wal");
f(PSLICE() << main_path << "-shm");
}
static Status destroy(Slice path) TD_WARN_UNUSED_RESULT;
tdsqlite3 *db() {
return db_;
}
CSlice path() const {
return path_;
}
Status last_error();
static Status last_error(tdsqlite3 *db, CSlice path);
static bool was_any_database_destroyed();
bool on_begin() {
begin_cnt_++;
return begin_cnt_ == 1;
}
Result<bool> on_commit() {
if (begin_cnt_ == 0) {
return Status::Error("No matching begin for commit");
}
begin_cnt_--;
return begin_cnt_ == 0;
}
void set_cipher_version(int32 cipher_version) {
cipher_version_ = cipher_version;
}
optional<int32> get_cipher_version() const {
return cipher_version_.copy();
}
private:
tdsqlite3 *db_;
std::string path_;
size_t begin_cnt_{0};
optional<int32> cipher_version_;
};
} // namespace detail
} // namespace td