Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/GTID_Server_Data.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@ class GTID_Server_Data {
void dump();
};

bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end);
bool addGtidInterval(const std::string& uuid, const gtid_interval_t &iv, gtid_set_t& gtid_executed);

#endif // CLASS_GTID_Server_Data_H
31 changes: 28 additions & 3 deletions include/proxysql_gtid.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,38 @@
#define PROXYSQL_GTID
// highly inspired by libslave
// https://github.com/vozbu/libslave/
#include <list>
#include <string>
#include <unordered_map>
#include <list>
#include <utility>

typedef std::pair<std::string, int64_t> gtid_t;
typedef std::pair<int64_t, int64_t> gtid_interval_t;

class Gtid_Interval {
public:
int64_t start;
int64_t end;

public:
explicit Gtid_Interval(const int64_t gtid);
explicit Gtid_Interval(const int64_t _start, const int64_t _end);
explicit Gtid_Interval(const char* s);
explicit Gtid_Interval(const std::string& s);

const std::string to_string(void);
const bool contains(const Gtid_Interval& other);
const bool contains(int64_t gtid);
const bool append(const Gtid_Interval& other);
const bool merge(const Gtid_Interval& other);

const int cmp(const Gtid_Interval& other);
const bool operator<(const Gtid_Interval& other);
const bool operator==(const Gtid_Interval& other);
const bool operator!=(const Gtid_Interval& other);
};
typedef Gtid_Interval gtid_interval_t;

// TODO: make me a proper class.
typedef std::unordered_map<std::string, std::list<gtid_interval_t>> gtid_set_t;

/*
Expand All @@ -31,4 +56,4 @@ class Gtid_Server_Info {
};
*/

#endif /* PROXYSQL_GTID */
#endif /* PROXYSQL_GTID */
146 changes: 40 additions & 106 deletions lib/GTID_Server_Data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ bool GTID_Server_Data::gtid_exists(char *gtid_uuid, uint64_t gtid_trxid) {
return false;
}
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) {
if ((int64_t)gtid_trxid >= itr->first && (int64_t)gtid_trxid <= itr->second) {
if (itr->contains((int64_t)gtid_trxid)) {
// fprintf(stderr,"YES\n");
return true;
}
Expand Down Expand Up @@ -358,10 +358,9 @@ bool GTID_Server_Data::read_next_gtid() {
}
}
} else { // we are reading the trxids
uint64_t trx_from;
uint64_t trx_to;
sscanf(subtoken,"%lu-%lu",&trx_from,&trx_to);
updated = addGtidInterval(gtid_executed, uuid_server, trx_from, trx_to) || updated;
std::string s = uuid_server;
gtid_interval_t iv = Gtid_Interval(subtoken);
updated = addGtidInterval(s, iv, gtid_executed) || updated;
}
}
}
Expand Down Expand Up @@ -394,8 +393,8 @@ bool GTID_Server_Data::read_next_gtid() {
break;
}
std::string s = uuid_server;
gtid_t new_gtid = std::make_pair(s,rec_trxid);
addGtid(new_gtid,gtid_executed);
gtid_interval_t iv = Gtid_Interval(rec_trxid);
addGtidInterval(s, iv, gtid_executed);
events_read++;
}
}
Expand All @@ -412,12 +411,7 @@ std::string gtid_executed_to_string(gtid_set_t& gtid_executed) {
s.insert(23,"-");
s = s + ":";
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr) {
std::string s2 = s;
s2 = s2 + std::to_string(itr->first);
s2 = s2 + "-";
s2 = s2 + std::to_string(itr->second);
s2 = s2 + ",";
gtid_set = gtid_set + s2;
gtid_set += s + itr->to_string() + ",";
}
}
// Extract latest comma only in case 'gtid_executed' isn't empty
Expand All @@ -428,112 +422,52 @@ std::string gtid_executed_to_string(gtid_set_t& gtid_executed) {
}



void addGtid(const gtid_t& gtid, gtid_set_t& gtid_executed) {
auto it = gtid_executed.find(gtid.first);
if (it == gtid_executed.end())
{
gtid_executed[gtid.first].emplace_back(gtid.second, gtid.second);
return;
// Merges a GTID interval into a gitd_executed instance. Returns true if gtid_executed was updated, false otherwise.
bool addGtidInterval(const std::string& uuid, const gtid_interval_t &iv, gtid_set_t& gtid_executed) {
auto it = gtid_executed.find(uuid);
if (it == gtid_executed.end()) {
// new UUID entry
gtid_executed[uuid].emplace_back(iv);
return true;
}

bool flag = true;
for (auto itr = it->second.begin(); itr != it->second.end(); ++itr)
{
if (gtid.second >= itr->first && gtid.second <= itr->second)
return;
if (gtid.second + 1 == itr->first)
{
--itr->first;
flag = false;
break;
}
else if (gtid.second == itr->second + 1)
{
++itr->second;
flag = false;
break;
}
else if (gtid.second < itr->first)
{
it->second.emplace(itr, gtid.second, gtid.second);
return;
if (!it->second.empty()) {
if (it->second.back().append(iv)) {
// if appending to the last GTID range succeded, gtid_executed was modified, but remains optimized - nothing else to do
return true;
}
}

if (flag)
it->second.emplace_back(gtid.second, gtid.second);

for (auto itr = it->second.begin(); itr != it->second.end(); ++itr)
{
auto next_itr = std::next(itr);
if (next_itr != it->second.end() && itr->second + 1 == next_itr->first)
{
itr->second = next_itr->second;
it->second.erase(next_itr);
break;
// insert/merge GTID interval...
auto pos = it->second.begin();
for (; pos != it->second.end(); ++pos) {
if (pos->contains(iv)) {
// GTID interval is already present, nothing to do
return false;
}
if (pos->merge(iv))
break;
}
}

/**
* @brief Adds or updates a GTID interval in the executed set
*
* This function intelligently merges GTID intervals to prevent events_count reset
* when a binlog reader reconnects and provides updated GTID sets. It handles
* reconnection scenarios where the server provides updated transaction ID ranges.
*
* For example, during reconnection:
* - Before disconnection: server_UUID:1-10
* - After reconnection: server_UUID:1-19
*
* This function will update the existing interval rather than replacing it,
* preserving the events_count metric accuracy.
*
* @param gtid_executed Reference to the GTID set to update
* @param server_uuid The server UUID string
* @param txid_start Starting transaction ID of the interval
* @param txid_end Ending transaction ID of the interval
* @return bool True if the GTID set was updated, false if interval already existed
*
* @note This function is critical for maintaining accurate GTID metrics across
* binlog reader reconnections and preventing events_count resets.
*/
bool addGtidInterval(gtid_set_t& gtid_executed, std::string server_uuid, int64_t txid_start, int64_t txid_end) {
bool updated = true;

auto it = gtid_executed.find(server_uuid);
if (it == gtid_executed.end()) {
gtid_executed[server_uuid].emplace_back(txid_start, txid_end);
return updated;
if (pos == it->second.end()) {
it->second.emplace_back(iv);
}

bool insert = true;

// When ProxySQL reconnects with binlog reader, it might
// receive updated txid intervals in the bootstrap message.
// For example,
// before disconnection -> server_UUID:1-10
// after reconnection -> server_UUID:1-19
auto &txid_intervals = it->second;
for (auto &interval : txid_intervals) {
if (interval.first == txid_start) {
if(interval.second == txid_end) {
updated = false;
} else {
interval.second = txid_end;
}
insert = false;
// ...and merge overlapping GTID ranges, if any
it->second.sort();
auto a = it->second.begin();
while (a != it->second.end()) {
auto b = std::next(a);
if (b == it->second.end()) {
break;
}
if (a->merge(*b)) {
it->second.erase(b);
continue;
}
a++;
}

if (insert) {
txid_intervals.emplace_back(txid_start, txid_end);

}

return updated;
return true;
}

void * GTID_syncer_run() {
Expand Down
1 change: 1 addition & 0 deletions lib/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ _OBJ_CXX := ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo
ProxySQL_Admin_Tests.oo ProxySQL_Admin_Tests2.oo ProxySQL_Admin_Scheduler.oo ProxySQL_Admin_Disk_Upgrade.oo ProxySQL_Admin_Stats.oo \
Admin_Handler.oo Admin_FlushVariables.oo Admin_Bootstrap.oo \
Base_Session.oo Base_Thread.oo \
proxysql_gtid.oo \
proxy_protocol_info.oo \
proxysql_find_charset.oo ProxySQL_Poll.oo \
PgSQL_Protocol.oo PgSQL_Thread.oo PgSQL_Data_Stream.oo PgSQL_Session.oo PgSQL_Variables.oo PgSQL_HostGroups_Manager.oo PgSQL_Connection.oo PgSQL_Backend.oo PgSQL_Logger.oo PgSQL_Authentication.oo PgSQL_Error_Helper.oo \
Expand Down
127 changes: 127 additions & 0 deletions lib/proxysql_gtid.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <string>

#include "proxysql_gtid.h"


// Initializes a GTID interval from a single GTID.
Gtid_Interval::Gtid_Interval(const int64_t gtid) {
start = gtid;
end = gtid;
}

// Initializes a GTID interval from a range.
Gtid_Interval::Gtid_Interval(const int64_t _start, const int64_t _end) {
start = _start;
end = _end;

if (start > end) {
std::swap(start, end);
}
}

// Initializes a GTID interval from a string buffer, in [gtid]{-[gtid]} format.
Gtid_Interval::Gtid_Interval(const char *s) {
uint64_t _start = 0, _end = 0;

if (sscanf(s, "%lu-%lu", &_start, &_end) == 2) {
start = _start;
end = _end;
} else if (sscanf(s, "%lu", &_start) == 1) {
start = _start;
end = _start;
}

if (start > end) {
std::swap(start, end);
}
}

Gtid_Interval::Gtid_Interval(const std::string& s) : Gtid_Interval(s.c_str()) {
}

// Checks if another GTID interval is contained in this one,
const bool Gtid_Interval::contains(const Gtid_Interval& other) {
return (other.start >= start && other.end <= end);
}

// Checks if a given GTID is contained in this interval.
const bool Gtid_Interval::contains(int64_t gtid) {
return (gtid >= start && gtid <= end);
}

// Yields a string representation for a GTID interval.
const std::string Gtid_Interval::to_string(void) {
if (start == end) {
return std::to_string(start);
}
return std::to_string(start) + "-" + std::to_string(end);
}

// Attempts to append a new interval to this interval's end. Returns true if the append succeded, false otherwise.
const bool Gtid_Interval::append(const Gtid_Interval& other) {
if (other.end >= end && other.start <= (end+1)) {
// other overlaps interval at end
end = other.end;
return true;
}

return false;
}

// Attempts to merge two GTID intervals. Returns true if the intervals were merged (and potentially modified), false otherwise.
const bool Gtid_Interval::merge(const Gtid_Interval& other) {
if (other.start >= start && other.end <= end) {
// other is contained by interval
return true;
}
if (other.start <= start && other.end >= end) {
// other contains whole of existing interval
start = other.start;
end = other.end;
return true;
}
if (other.start <= start && other.end >= (start-1)) {
// other overlaps interval at start
start = other.start;
return true;
}
if (other.end >= end && other.start <= (end+1)) {
// other overlaps interval at end
end = other.end;
return true;
}

return false;
}

// Compares two GTID intervals, by strict weak ordering.
const int Gtid_Interval::cmp(const Gtid_Interval& other) {
if (start < other.start) {
return -1;
}
if (start > other.start) {
return 1;
}
if (end < other.end) {
return -1;
}
if (end > other.end) {
return 1;
}
return 0;
}

const bool Gtid_Interval::operator<(const Gtid_Interval& other) {
return cmp(other) == -1;
}

const bool Gtid_Interval::operator==(const Gtid_Interval& other) {
return cmp(other) == 0;
}

const bool Gtid_Interval::operator!=(const Gtid_Interval& other) {
return cmp(other) != 0;
}
Loading