Skip to content

Commit 24c8cec

Browse files
committed
wip(postgres): add support for PK type conversion to pg-specific types like uuid
Introduce database-specific PK decode callbacks to support UUID and other non-TEXT primary key types in PostgreSQL. This allows proper type conversion during sync payload application. Work in progress - core implementation complete but needs debugging.
1 parent 58aeafa commit 24c8cec

File tree

7 files changed

+394
-6
lines changed

7 files changed

+394
-6
lines changed

src/cloudsync.c

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,10 @@ bool table_algo_isgos (cloudsync_table_context *table) {
11021102
return (table->algo == table_algo_crdt_gos);
11031103
}
11041104

1105+
const char *table_name (cloudsync_table_context *table) {
1106+
return table->name;
1107+
}
1108+
11051109
const char *table_schema (cloudsync_table_context *table) {
11061110
return table->schema;
11071111
}
@@ -1199,9 +1203,12 @@ int merge_insert_col (cloudsync_context *data, cloudsync_table_context *table, c
11991203
if (vm == NULL) return cloudsync_set_error(data, "Unable to retrieve column merge precompiled statement in merge_insert_col", DBRES_MISUSE);
12001204

12011205
// INSERT INTO table (pk1, pk2, col_name) VALUES (?, ?, ?) ON CONFLICT DO UPDATE SET col_name=?;"
1202-
1206+
12031207
// bind primary key(s)
1204-
int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1208+
pk_decode_callback callback = database_get_pk_decode_callback();
1209+
void *callback_context = database_get_pk_decode_context(table, vm);
1210+
int rc = pk_decode_prikey((char *)pk, (size_t)pklen, callback, callback_context);
1211+
database_free_pk_decode_context(callback_context);
12051212
if (rc < 0) {
12061213
cloudsync_set_dberror(data);
12071214
dbvm_reset(vm);
@@ -1250,7 +1257,10 @@ int merge_delete (cloudsync_context *data, cloudsync_table_context *table, const
12501257

12511258
// bind pk
12521259
dbvm_t *vm = table->real_merge_delete_stmt;
1253-
rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1260+
pk_decode_callback callback = database_get_pk_decode_callback();
1261+
void *callback_context = database_get_pk_decode_context(table, vm);
1262+
rc = pk_decode_prikey((char *)pk, (size_t)pklen, callback, callback_context);
1263+
database_free_pk_decode_context(callback_context);
12541264
if (rc < 0) {
12551265
rc = cloudsync_set_dberror(data);
12561266
dbvm_reset(vm);
@@ -1329,7 +1339,10 @@ int merge_did_cid_win (cloudsync_context *data, cloudsync_table_context *table,
13291339
if (!vm) return cloudsync_set_error(data, "Unable to retrieve column value precompiled statement in merge_did_cid_win", DBRES_ERROR);
13301340

13311341
// bind primary key values
1332-
rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm);
1342+
pk_decode_callback callback = database_get_pk_decode_callback();
1343+
void *callback_context = database_get_pk_decode_context(table, vm);
1344+
rc = pk_decode_prikey((char *)pk, (size_t)pklen, callback, callback_context);
1345+
database_free_pk_decode_context(callback_context);
13331346
if (rc < 0) {
13341347
rc = cloudsync_set_dberror(data);
13351348
dbvm_reset(vm);
@@ -1398,7 +1411,10 @@ int merge_sentinel_only_insert (cloudsync_context *data, cloudsync_table_context
13981411

13991412
// bind pk
14001413
dbvm_t *vm = table->real_merge_sentinel_stmt;
1401-
int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, vm);
1414+
pk_decode_callback callback = database_get_pk_decode_callback();
1415+
void *callback_context = database_get_pk_decode_context(table, vm);
1416+
int rc = pk_decode_prikey((char *)pk, (size_t)pklen, callback, callback_context);
1417+
database_free_pk_decode_context(callback_context);
14021418
if (rc < 0) {
14031419
rc = cloudsync_set_dberror(data);
14041420
dbvm_reset(vm);

src/cloudsync.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ const char *table_colname (cloudsync_table_context *table, int index);
105105
char **table_pknames (cloudsync_table_context *table);
106106
void table_set_pknames (cloudsync_table_context *table, char **pknames);
107107
bool table_algo_isgos (cloudsync_table_context *table);
108+
const char *table_name (cloudsync_table_context *table);
108109
const char *table_schema (cloudsync_table_context *table);
109110
int table_remove (cloudsync_context *data, cloudsync_table_context *table);
110111
void table_free (cloudsync_table_context *table);

src/database.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,4 +160,12 @@ typedef bool (*cloudsync_payload_apply_callback_t)(void **xdata, cloudsync_pk_de
160160
void cloudsync_set_payload_apply_callback(void *db, cloudsync_payload_apply_callback_t callback);
161161
cloudsync_payload_apply_callback_t cloudsync_get_payload_apply_callback(void *db);
162162

163+
// PK Decode Callback Abstraction
164+
typedef struct cloudsync_table_context cloudsync_table_context;
165+
typedef int (*pk_decode_callback) (void *xdata, int index, int type, int64_t ival, double dval, char *pval);
166+
167+
pk_decode_callback database_get_pk_decode_callback(void);
168+
void* database_get_pk_decode_context(cloudsync_table_context *table, dbvm_t *vm);
169+
void database_free_pk_decode_context(void *context);
170+
163171
#endif

src/postgresql/cloudsync_postgresql.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2092,7 +2092,7 @@ static char * build_union_sql (void) {
20922092
return result;
20932093
}
20942094

2095-
static Oid lookup_column_type_oid (const char *tbl, const char *col_name, const char *schema) {
2095+
Oid lookup_column_type_oid (const char *tbl, const char *col_name, const char *schema) {
20962096
// SPI_connect not needed here
20972097
if (strcmp(col_name, CLOUDSYNC_TOMBSTONE_VALUE) == 0) return BYTEAOID;
20982098

src/postgresql/database_postgresql.c

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
#include "../dbutils.h"
1919
#include "../sql.h"
2020
#include "../utils.h"
21+
#include "../pk.h"
2122

2223
// PostgreSQL SPI and other headers
2324
#include "access/xact.h"
25+
#include "catalog/namespace.h"
2426
#include "catalog/pg_type.h"
2527
#include "executor/spi.h"
2628
#include "funcapi.h"
@@ -33,6 +35,9 @@
3335

3436
#include "pgvalue.h"
3537

38+
// Shared PostgreSQL utilities from cloudsync_postgresql.c
39+
extern Oid lookup_column_type_oid(const char *tbl, const char *col_name, const char *schema);
40+
3641
// ============================================================================
3742
// SPI CONNECTION REQUIREMENTS
3843
// ============================================================================
@@ -82,6 +87,11 @@ typedef struct {
8287
cloudsync_context *data;
8388
} pg_stmt_t;
8489

90+
typedef struct {
91+
dbvm_t *vm;
92+
cloudsync_table_context *table;
93+
} postgresql_pk_bind_context;
94+
8595
static int database_refresh_snapshot (void);
8696

8797
// MARK: - SQL -
@@ -2769,3 +2779,109 @@ void cloudsync_set_payload_apply_callback(void *db, cloudsync_payload_apply_call
27692779
cloudsync_payload_apply_callback_t cloudsync_get_payload_apply_callback(void *db) {
27702780
return payload_apply_callback;
27712781
}
2782+
2783+
// MARK: - PK Decode Callback Abstraction -
2784+
2785+
static int postgresql_pk_decode_bind_callback(void *xdata, int index, int type,
2786+
int64_t ival, double dval, char *pval) {
2787+
postgresql_pk_bind_context *ctx = (postgresql_pk_bind_context *)xdata;
2788+
pg_stmt_t *vm = ctx->vm;
2789+
int bind_index = index + 1;
2790+
2791+
// Handle non-text types normally
2792+
if (type != DBTYPE_TEXT) {
2793+
switch (type) {
2794+
case DBTYPE_INTEGER:
2795+
return databasevm_bind_int(vm, bind_index, ival);
2796+
case DBTYPE_FLOAT:
2797+
return databasevm_bind_double(vm, bind_index, dval);
2798+
case DBTYPE_BLOB:
2799+
return databasevm_bind_blob(vm, bind_index, pval, (uint64_t)ival);
2800+
case DBTYPE_NULL:
2801+
return databasevm_bind_null(vm, bind_index);
2802+
default:
2803+
return DBRES_ERROR;
2804+
}
2805+
}
2806+
2807+
// For TEXT types, look up target column type and convert
2808+
cloudsync_table_context *table = ctx->table;
2809+
const char *tbl_name = table_name(table);
2810+
2811+
// Get PK column names from table context
2812+
char **pk_names = table_pknames(table);
2813+
if (!pk_names) {
2814+
// retrieve primary key(s)
2815+
int nrows = 0;
2816+
database_pk_names(vm->data, tbl_name, &pk_names, &nrows);
2817+
table_set_pknames(table, pk_names);
2818+
}
2819+
int pk_count = table_count_pks(table);
2820+
const char *schema = table_schema(table);
2821+
2822+
if (!pk_names || index >= pk_count) {
2823+
return databasevm_bind_text(vm, bind_index, pval, (int)ival);
2824+
}
2825+
2826+
const char *col_name = pk_names[index];
2827+
2828+
// Look up column type OID
2829+
Oid target_typeoid = lookup_column_type_oid(tbl_name, col_name, schema);
2830+
2831+
if (!OidIsValid(target_typeoid)) {
2832+
return databasevm_bind_text(vm, bind_index, pval, (int)ival);
2833+
}
2834+
2835+
// Get type name for error messages
2836+
char *target_typename = format_type_be(target_typeoid);
2837+
2838+
// Convert using PostgreSQL type system
2839+
Datum converted_datum;
2840+
PG_TRY();
2841+
{
2842+
converted_datum = OidInputFunctionCall(
2843+
target_typeoid,
2844+
pval,
2845+
InvalidOid,
2846+
-1
2847+
);
2848+
}
2849+
PG_CATCH();
2850+
{
2851+
pfree(target_typename);
2852+
PG_RE_THROW();
2853+
}
2854+
PG_END_TRY();
2855+
2856+
// Bind with correct type
2857+
pg_stmt_t *stmt = (pg_stmt_t *)vm;
2858+
int idx = bind_index - 1;
2859+
2860+
MemoryContext old = MemoryContextSwitchTo(stmt->bind_mcxt);
2861+
stmt->values[idx] = converted_datum;
2862+
stmt->types[idx] = target_typeoid;
2863+
stmt->nulls[idx] = ' ';
2864+
MemoryContextSwitchTo(old);
2865+
2866+
if (stmt->nparams < bind_index) stmt->nparams = bind_index;
2867+
2868+
pfree(target_typename);
2869+
return DBRES_OK;
2870+
}
2871+
2872+
pk_decode_callback database_get_pk_decode_callback(void) {
2873+
return postgresql_pk_decode_bind_callback;
2874+
}
2875+
2876+
void* database_get_pk_decode_context(cloudsync_table_context *table, dbvm_t *vm) {
2877+
postgresql_pk_bind_context *ctx = dbmem_alloc(sizeof(postgresql_pk_bind_context));
2878+
ctx->vm = vm;
2879+
ctx->table = table;
2880+
return ctx;
2881+
}
2882+
2883+
void database_free_pk_decode_context(void *context) {
2884+
if (context) {
2885+
dbmem_free(context);
2886+
}
2887+
}

src/sqlite/database_sqlite.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "../dbutils.h"
1111
#include "../utils.h"
1212
#include "../sql.h"
13+
#include "../pk.h"
1314

1415
#include <inttypes.h>
1516
#include <string.h>
@@ -1100,3 +1101,19 @@ void cloudsync_set_payload_apply_callback(void *db, cloudsync_payload_apply_call
11001101
sqlite3_set_clientdata((sqlite3 *)db, CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY, (void*)callback, NULL);
11011102
}
11021103
}
1104+
1105+
// MARK: - PK Decode Callback Abstraction -
1106+
1107+
pk_decode_callback database_get_pk_decode_callback(void) {
1108+
return pk_decode_bind_callback;
1109+
}
1110+
1111+
void* database_get_pk_decode_context(cloudsync_table_context *table, dbvm_t *vm) {
1112+
UNUSED_PARAMETER(table);
1113+
return vm;
1114+
}
1115+
1116+
void database_free_pk_decode_context(void *context) {
1117+
UNUSED_PARAMETER(context);
1118+
// Nothing to free for SQLite
1119+
}

0 commit comments

Comments
 (0)