From 3a4ba5ba681680edbcee285f09630f7c28bd699e Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Thu, 21 Aug 2025 15:33:07 +0200 Subject: [PATCH 1/3] Added support for tables with more than 64 columns (previously limited to 64). --- src/cloudsync.c | 192 ++++++++++++++++++++++++++++++++---------------- src/dbutils.c | 58 +++++++++++---- 2 files changed, 174 insertions(+), 76 deletions(-) diff --git a/src/cloudsync.c b/src/cloudsync.c index 44bd2d2..aa63675 100644 --- a/src/cloudsync.c +++ b/src/cloudsync.c @@ -226,6 +226,14 @@ typedef struct PACKED { uint8_t unused[6]; // padding to ensure the struct is exactly 32 bytes } cloudsync_network_header; +typedef struct { + sqlite3_value *table_name; + sqlite3_value **new_values; + sqlite3_value **old_values; + int count; + int capacity; +} cloudsync_update_payload; + #ifdef _MSC_VER #pragma pack(pop) #endif @@ -2545,16 +2553,10 @@ void cloudsync_insert (sqlite3_context *context, int argc, sqlite3_value **argv) if (pk != buffer) cloudsync_memory_free(pk); } -void cloudsync_update (sqlite3_context *context, int argc, sqlite3_value **argv) { - DEBUG_FUNCTION("cloudsync_update %s", sqlite3_value_text(argv[0])); +void cloudsync_delete (sqlite3_context *context, int argc, sqlite3_value **argv) { + DEBUG_FUNCTION("cloudsync_delete %s", sqlite3_value_text(argv[0])); // debug_values(argc-1, &argv[1]); - // arguments are: - // [0] table name - // [1..table->npks] NEW.prikeys - // [1+table->npks ..] OLD.prikeys - // then NEW.value,OLD.value - // retrieve context sqlite3 *db = sqlite3_context_db_handle(context); cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context); @@ -2563,7 +2565,7 @@ void cloudsync_update (sqlite3_context *context, int argc, sqlite3_value **argv) const char *table_name = (const char *)sqlite3_value_text(argv[0]); cloudsync_table_context *table = table_lookup(data, table_name); if (!table) { - dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_update.", table_name); + dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_delete.", table_name); return; } @@ -2571,15 +2573,123 @@ void cloudsync_update (sqlite3_context *context, int argc, sqlite3_value **argv) sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET); int rc = SQLITE_OK; - // check if the primary key(s) have changed by comparing the NEW and OLD primary key values + // encode the primary key values into a buffer + char buffer[1024]; + size_t pklen = sizeof(buffer); + char *pk = pk_encode_prikey(&argv[1], table->npks, buffer, &pklen); + if (!pk) { + sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1); + return; + } + + // mark the row as deleted by inserting a delete sentinel into the metadata + rc = local_mark_delete_meta(db, table, pk, pklen, db_version, BUMP_SEQ(data)); + if (rc != SQLITE_OK) goto cleanup; + + // remove any metadata related to the old rows associated with this primary key + rc = local_drop_meta(db, table, pk, pklen); + if (rc != SQLITE_OK) goto cleanup; + +cleanup: + if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1); + // free memory if the primary key was dynamically allocated + if (pk != buffer) cloudsync_memory_free(pk); +} + +// MARK: - + +void cloudsync_update_payload_free (cloudsync_update_payload *payload) { + for (int i=0; icount; i++) { + sqlite3_value_free(payload->new_values[i]); + sqlite3_value_free(payload->old_values[i]); + } + cloudsync_memory_free(payload->new_values); + cloudsync_memory_free(payload->old_values); + sqlite3_value_free(payload->table_name); + payload->new_values = NULL; + payload->old_values = NULL; + payload->table_name = NULL; + payload->count = 0; + payload->capacity = 0; +} + +int cloudsync_update_payload_append (cloudsync_update_payload *payload, sqlite3_value *v1, sqlite3_value *v2, sqlite3_value *v3) { + if (payload->count >= payload->capacity) { + int newcap = payload->capacity ? payload->capacity * 2 : 128; + + sqlite3_value **new_values_2 = (sqlite3_value **)cloudsync_memory_realloc(payload->new_values, newcap * sizeof(*new_values_2)); + if (!new_values_2) return SQLITE_NOMEM; + payload->new_values = new_values_2; + + sqlite3_value **old_values_2 = (sqlite3_value **)cloudsync_memory_realloc(payload->old_values, newcap * sizeof(*old_values_2)); + if (!old_values_2) return SQLITE_NOMEM; + payload->old_values = old_values_2; + + payload->capacity = newcap; + } + + int index = payload->count; + if (payload->table_name == NULL) payload->table_name = sqlite3_value_dup(v1); + else if (dbutils_value_compare(payload->table_name, v1) != 0) return SQLITE_NOMEM; + payload->new_values[index] = sqlite3_value_dup(v2); + payload->old_values[index] = sqlite3_value_dup(v3); + payload->count++; + + // sanity check memory allocations + bool v1_can_be_null = (sqlite3_value_type(v1) == SQLITE_NULL); + bool v2_can_be_null = (sqlite3_value_type(v2) == SQLITE_NULL); + bool v3_can_be_null = (sqlite3_value_type(v3) == SQLITE_NULL); + + if ((payload->table_name == NULL) && (!v1_can_be_null)) return SQLITE_NOMEM; + if ((payload->old_values[index] == NULL) && (!v2_can_be_null)) return SQLITE_NOMEM; + if ((payload->new_values[index] == NULL) && (!v3_can_be_null)) return SQLITE_NOMEM; + + return SQLITE_OK; +} + +void cloudsync_update_step (sqlite3_context *context, int argc, sqlite3_value **argv) { + // argv[0] => table_name + // argv[1] => new_column_value + // argv[2] => old_column_value + + // allocate/get the update payload + cloudsync_update_payload *payload = (cloudsync_update_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_update_payload)); + if (!payload) {sqlite3_result_error_nomem(context); return;} + + if (cloudsync_update_payload_append(payload, argv[0], argv[1], argv[2]) != SQLITE_OK) { + sqlite3_result_error_nomem(context); + } +} + +void cloudsync_update_final (sqlite3_context *context) { + cloudsync_update_payload *payload = (cloudsync_update_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_update_payload)); + if (!payload || payload->count == 0) return; + + // retrieve context + sqlite3 *db = sqlite3_context_db_handle(context); + cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context); + + // lookup table + const char *table_name = (const char *)sqlite3_value_text(payload->table_name); + cloudsync_table_context *table = table_lookup(data, table_name); + if (!table) { + dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_update.", table_name); + return; + } + + // compute the next database version for tracking changes + sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET); + int rc = SQLITE_OK; + + // Check if the primary key(s) have changed bool prikey_changed = false; - for (int i=1; i<=table->npks; ++i) { - if (dbutils_value_compare(argv[i], argv[i+table->npks]) != 0) { + for (int i=0; inpks; ++i) { + if (dbutils_value_compare(payload->old_values[i], payload->new_values[i]) != 0) { prikey_changed = true; break; } } - + // encode the NEW primary key values into a buffer (used later for indexing) char buffer[1024]; char buffer2[1024]; @@ -2587,7 +2697,7 @@ void cloudsync_update (sqlite3_context *context, int argc, sqlite3_value **argv) size_t oldpklen = sizeof(buffer2); char *oldpk = NULL; - char *pk = pk_encode_prikey(&argv[1], table->npks, buffer, &pklen); + char *pk = pk_encode_prikey(payload->new_values, table->npks, buffer, &pklen); if (!pk) { sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1); return; @@ -2599,7 +2709,7 @@ void cloudsync_update (sqlite3_context *context, int argc, sqlite3_value **argv) // 2. create a new row (NEW primary key) // encode the OLD primary key into a buffer - oldpk = pk_encode_prikey(&argv[1+table->npks], table->npks, buffer2, &oldpklen); + oldpk = pk_encode_prikey(payload->old_values, table->npks, buffer2, &oldpklen); if (!oldpk) { if (pk != buffer) cloudsync_memory_free(pk); sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1); @@ -2626,65 +2736,23 @@ void cloudsync_update (sqlite3_context *context, int argc, sqlite3_value **argv) } // compare NEW and OLD values (excluding primary keys) to handle column updates - // starting index for column values - int index = 1 + (table->npks * 2); for (int i=0; incols; i++) { - if (dbutils_value_compare(argv[i+index], argv[i+index+1]) != 0) { + int col_index = table->npks + i; // Regular columns start after primary keys + + if (dbutils_value_compare(payload->old_values[col_index], payload->new_values[col_index]) != 0) { // if a column value has changed, mark it as updated in the metadata // columns are in cid order rc = local_mark_insert_or_update_meta(db, table, pk, pklen, table->col_name[i], db_version, BUMP_SEQ(data)); if (rc != SQLITE_OK) goto cleanup; } - ++index; } cleanup: if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1); if (pk != buffer) cloudsync_memory_free(pk); if (oldpk && (oldpk != buffer2)) cloudsync_memory_free(oldpk); -} - -void cloudsync_delete (sqlite3_context *context, int argc, sqlite3_value **argv) { - DEBUG_FUNCTION("cloudsync_delete %s", sqlite3_value_text(argv[0])); - // debug_values(argc-1, &argv[1]); - - // retrieve context - sqlite3 *db = sqlite3_context_db_handle(context); - cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context); - - // lookup table - const char *table_name = (const char *)sqlite3_value_text(argv[0]); - cloudsync_table_context *table = table_lookup(data, table_name); - if (!table) { - dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_delete.", table_name); - return; - } - - // compute the next database version for tracking changes - sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET); - int rc = SQLITE_OK; - - // encode the primary key values into a buffer - char buffer[1024]; - size_t pklen = sizeof(buffer); - char *pk = pk_encode_prikey(&argv[1], table->npks, buffer, &pklen); - if (!pk) { - sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1); - return; - } - - // mark the row as deleted by inserting a delete sentinel into the metadata - rc = local_mark_delete_meta(db, table, pk, pklen, db_version, BUMP_SEQ(data)); - if (rc != SQLITE_OK) goto cleanup; - - // remove any metadata related to the old rows associated with this primary key - rc = local_drop_meta(db, table, pk, pklen); - if (rc != SQLITE_OK) goto cleanup; -cleanup: - if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1); - // free memory if the primary key was dynamically allocated - if (pk != buffer) cloudsync_memory_free(pk); + cloudsync_update_payload_free(payload); } // MARK: - @@ -3274,7 +3342,7 @@ int cloudsync_register (sqlite3 *db, char **pzErrMsg) { rc = dbutils_register_function(db, "cloudsync_insert", cloudsync_insert, -1, pzErrMsg, ctx, NULL); if (rc != SQLITE_OK) return rc; - rc = dbutils_register_function(db, "cloudsync_update", cloudsync_update, -1, pzErrMsg, ctx, NULL); + rc = dbutils_register_aggregate(db, "cloudsync_update", cloudsync_update_step, cloudsync_update_final, 3, pzErrMsg, ctx, NULL); if (rc != SQLITE_OK) return rc; rc = dbutils_register_function(db, "cloudsync_delete", cloudsync_delete, -1, pzErrMsg, ctx, NULL); diff --git a/src/dbutils.c b/src/dbutils.c index 9edd10f..eaa8bfb 100644 --- a/src/dbutils.c +++ b/src/dbutils.c @@ -541,25 +541,55 @@ int dbutils_check_triggers (sqlite3 *db, const char *table, table_algo algo) { if (!trigger_name) goto finalize; if (!dbutils_trigger_exists(db, trigger_name)) { - char *sql = cloudsync_memory_mprintf("SELECT group_concat('NEW.\"' || format('%%w', name) || '\"', ',') || ',' || group_concat('OLD.\"' || format('%%w', name) || '\"', ',') FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table); - if (!sql) goto finalize; + // Generate VALUES clause for all columns using a CTE to avoid compound SELECT limits + // First, get all primary key columns in order + char *pk_values_sql = cloudsync_memory_mprintf( + "SELECT group_concat('('||quote('%q')||', NEW.\"' || format('%%w', name) || '\", OLD.\"' || format('%%w', name) || '\")', ', ') " + "FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", + table, table); + if (!pk_values_sql) goto finalize; - char *pkclause = dbutils_text_select(db, sql); - char *pkvalues = (pkclause) ? pkclause : "NEW.rowid,OLD.rowid"; - cloudsync_memory_free(sql); + char *pk_values_list = dbutils_text_select(db, pk_values_sql); + cloudsync_memory_free(pk_values_sql); - sql = cloudsync_memory_mprintf("SELECT group_concat('NEW.\"' || format('%%w', name) || '\"' || ', OLD.\"' || format('%%w', name) || '\"', ',') FROM pragma_table_info('%q') WHERE pk=0 ORDER BY cid;", table); - if (!sql) goto finalize; - char *colvalues = dbutils_text_select(db, sql); - cloudsync_memory_free(sql); + // Then get all regular columns in order + char *col_values_sql = cloudsync_memory_mprintf( + "SELECT group_concat('('||quote('%q')||', NEW.\"' || format('%%w', name) || '\", OLD.\"' || format('%%w', name) || '\")', ', ') " + "FROM pragma_table_info('%q') WHERE pk=0 ORDER BY cid;", + table, table); + if (!col_values_sql) goto finalize; - if (colvalues == NULL) { - sql = cloudsync_memory_mprintf("CREATE TRIGGER \"%w\" AFTER UPDATE ON \"%w\" %s BEGIN SELECT cloudsync_update('%q',%s); END", trigger_name, table, trigger_when, table, pkvalues); + char *col_values_list = dbutils_text_select(db, col_values_sql); + cloudsync_memory_free(col_values_sql); + + // Build the complete VALUES query + char *values_query; + if (col_values_list && strlen(col_values_list) > 0) { + // Table has both primary keys and regular columns + values_query = cloudsync_memory_mprintf( + "WITH column_data(table_name, new_value, old_value) AS (VALUES %s, %s) " + "SELECT table_name, new_value, old_value FROM column_data", + pk_values_list, col_values_list); + cloudsync_memory_free(col_values_list); } else { - sql = cloudsync_memory_mprintf("CREATE TRIGGER \"%w\" AFTER UPDATE ON \"%w\" %s BEGIN SELECT cloudsync_update('%q',%s,%s); END", trigger_name, table, trigger_when, table, pkvalues, colvalues); - cloudsync_memory_free(colvalues); + // Table has only primary keys + values_query = cloudsync_memory_mprintf( + "WITH column_data(table_name, new_value, old_value) AS (VALUES %s) " + "SELECT table_name, new_value, old_value FROM column_data", + pk_values_list); } - if (pkclause) cloudsync_memory_free(pkclause); + + if (pk_values_list) cloudsync_memory_free(pk_values_list); + if (!values_query) goto finalize; + + // Create the trigger with aggregate function + char *sql = cloudsync_memory_mprintf( + "CREATE TRIGGER \"%w\" AFTER UPDATE ON \"%w\" %s BEGIN " + "SELECT cloudsync_update(table_name, new_value, old_value) FROM (%s); " + "END", + trigger_name, table, trigger_when, values_query); + + cloudsync_memory_free(values_query); if (!sql) goto finalize; rc = sqlite3_exec(db, sql, NULL, NULL, NULL); From 09a2c67cf4099f2c275d10b19f1dcc6572343452 Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Thu, 21 Aug 2025 15:33:48 +0200 Subject: [PATCH 2/3] test: add test for tables with hundreads of columns --- test/unit.c | 155 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 153 insertions(+), 2 deletions(-) diff --git a/test/unit.c b/test/unit.c index d0b80c0..63c4080 100644 --- a/test/unit.c +++ b/test/unit.c @@ -1740,7 +1740,6 @@ bool do_test_dbutils (void) { "CREATE TABLE IF NOT EXISTS integer_pk (id INTEGER PRIMARY KEY NOT NULL, value);" "CREATE TABLE IF NOT EXISTS int_pk (id INT PRIMARY KEY NOT NULL, value);" "CREATE TABLE IF NOT EXISTS \"quoted table name 🚀\" (\"pk quoted col 1\" TEXT NOT NULL, \"pk quoted col 2\" TEXT NOT NULL, \"non pk quoted col 1\", \"non pk quoted col 2\", PRIMARY KEY (\"pk quoted col 1\", \"pk quoted col 2\"));"; -; rc = sqlite3_exec(db, sql, NULL, NULL, NULL); if (rc != SQLITE_OK) goto finalize; @@ -2031,6 +2030,35 @@ bool do_test_string_replace_prefix(void) { return true; } +bool do_test_many_columns (int ncols, sqlite3 *db) { + char sql_create[10000]; + int pos = 0; + pos += snprintf(sql_create+pos, sizeof(sql_create)-pos, "CREATE TABLE IF NOT EXISTS test_many_columns (id TEXT PRIMARY KEY NOT NULL"); + for (int i=1; i= MAX_SIMULATED_CLIENTS) { + nclients = MAX_SIMULATED_CLIENTS; + printf("Number of test merge reduced to %d clients\n", MAX_SIMULATED_CLIENTS); + } else if (nclients < 2) { + nclients = 2; + printf("Number of test merge increased to %d clients\n", 2); + } + + // create databases and tables + time_t timestamp = time(NULL); + int saved_counter = test_counter; + for (int i=0; i " CUSTOMERS_TABLE "\n"); + sql = sqlite3_mprintf("SELECT * FROM \"%w\" ORDER BY first_name, \"" CUSTOMERS_TABLE_COLUMN_LASTNAME "\";", CUSTOMERS_TABLE); + do_query(db[0], sql, query_table); + sqlite3_free(sql); + + printf("\n-> \"" CUSTOMERS_NOCOLS_TABLE "\"\n"); + do_query(db[0], "SELECT * FROM \"" CUSTOMERS_NOCOLS_TABLE "\" ORDER BY first_name, \"" CUSTOMERS_TABLE_COLUMN_LASTNAME "\";", query_table); + } + + result = true; + rc = SQLITE_OK; + +finalize: + for (int i=0; i 0) { + result = false; + printf("do_test_merge_two_tables error: db %d has %d unterminated statements\n", i, counter); + } + } + if (cleanup_databases) { + char buf[256]; + do_build_database_path(buf, i, timestamp, saved_counter++); + file_delete(buf); + } + } + return result; +} + bool do_test_prikey (int nclients, bool print_result, bool cleanup_databases) { sqlite3 *db[MAX_SIMULATED_CLIENTS] = {NULL}; bool result = false; @@ -3455,7 +3604,8 @@ int main(int argc, const char * argv[]) { result += test_report("Functions Test:", do_test_functions(db, print_result)); result += test_report("Functions Test (Int):", do_test_internal_functions()); result += test_report("String Func Test:", do_test_string_replace_prefix()); - + result += test_report("Test Many Columns:", do_test_many_columns(600, db)); + // close local database db = close_db(db); @@ -3467,6 +3617,7 @@ int main(int argc, const char * argv[]) { result += test_report("Merge Test 5:", do_test_merge_5(2, print_result, cleanup_databases, false)); result += test_report("Merge Alter Schema 1:", do_test_merge_alter_schema_1(2, print_result, cleanup_databases, false)); result += test_report("Merge Alter Schema 2:", do_test_merge_alter_schema_2(2, print_result, cleanup_databases, false)); + result += test_report("Merge Two Tables Test:", do_test_merge_two_tables(2, print_result, cleanup_databases)); result += test_report("PriKey NULL Test:", do_test_prikey(2, print_result, cleanup_databases)); result += test_report("Test Double Init:", do_test_double_init(2, cleanup_databases)); From 1db3ee24262fd8131928048f308cd2fd9c9681b4 Mon Sep 17 00:00:00 2001 From: Andrea Donetti Date: Thu, 21 Aug 2025 15:41:00 +0200 Subject: [PATCH 3/3] Raise version --- src/cloudsync.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cloudsync.h b/src/cloudsync.h index 1029d4b..a9aab5f 100644 --- a/src/cloudsync.h +++ b/src/cloudsync.h @@ -20,7 +20,7 @@ extern "C" { #endif -#define CLOUDSYNC_VERSION "0.8.25" +#define CLOUDSYNC_VERSION "0.8.26" int sqlite3_cloudsync_init (sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi); int cloudsync_autoinit (void);