Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 130 additions & 62 deletions src/cloudsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,14 @@ typedef struct PACKED {
uint8_t unused[6]; // padding to ensure the struct is exactly 32 bytes
} cloudsync_network_header;

// Accumulator state for the cloudsync_update aggregate SQL function.
// Each step of the aggregate receives one (table_name, NEW.value, OLD.value)
// row; the values are duplicated (sqlite3_value_dup) into the two parallel
// arrays below and processed together in the aggregate's final callback.
typedef struct {
    sqlite3_value *table_name;      // duplicated table-name value shared by every row of the aggregate
    sqlite3_value **new_values;     // NEW.* column values, one entry per appended row (parallel to old_values)
    sqlite3_value **old_values;     // OLD.* column values, one entry per appended row (parallel to new_values)
    int count;                      // number of entries currently stored in new_values/old_values
    int capacity;                   // allocated slots in each array (grown geometrically on append)
} cloudsync_update_payload;

#ifdef _MSC_VER
#pragma pack(pop)
#endif
Expand Down Expand Up @@ -2545,16 +2553,10 @@ void cloudsync_insert (sqlite3_context *context, int argc, sqlite3_value **argv)
if (pk != buffer) cloudsync_memory_free(pk);
}

void cloudsync_update (sqlite3_context *context, int argc, sqlite3_value **argv) {
DEBUG_FUNCTION("cloudsync_update %s", sqlite3_value_text(argv[0]));
void cloudsync_delete (sqlite3_context *context, int argc, sqlite3_value **argv) {
DEBUG_FUNCTION("cloudsync_delete %s", sqlite3_value_text(argv[0]));
// debug_values(argc-1, &argv[1]);

// arguments are:
// [0] table name
// [1..table->npks] NEW.prikeys
// [1+table->npks ..] OLD.prikeys
// then NEW.value,OLD.value

// retrieve context
sqlite3 *db = sqlite3_context_db_handle(context);
cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);
Expand All @@ -2563,31 +2565,139 @@ void cloudsync_update (sqlite3_context *context, int argc, sqlite3_value **argv)
const char *table_name = (const char *)sqlite3_value_text(argv[0]);
cloudsync_table_context *table = table_lookup(data, table_name);
if (!table) {
dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_update.", table_name);
dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_delete.", table_name);
return;
}

// compute the next database version for tracking changes
sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
int rc = SQLITE_OK;

// check if the primary key(s) have changed by comparing the NEW and OLD primary key values
// encode the primary key values into a buffer
char buffer[1024];
size_t pklen = sizeof(buffer);
char *pk = pk_encode_prikey(&argv[1], table->npks, buffer, &pklen);
if (!pk) {
sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
return;
}

// mark the row as deleted by inserting a delete sentinel into the metadata
rc = local_mark_delete_meta(db, table, pk, pklen, db_version, BUMP_SEQ(data));
if (rc != SQLITE_OK) goto cleanup;

// remove any metadata related to the old rows associated with this primary key
rc = local_drop_meta(db, table, pk, pklen);
if (rc != SQLITE_OK) goto cleanup;

cleanup:
if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1);
// free memory if the primary key was dynamically allocated
if (pk != buffer) cloudsync_memory_free(pk);
}

// MARK: -

// Release every sqlite3_value duplicated into the aggregate payload, free the
// parallel arrays, and reset all fields so the payload is back to an empty,
// reusable state (a second call on the same payload is then a no-op).
void cloudsync_update_payload_free (cloudsync_update_payload *payload) {
    const int n = payload->count;
    for (int idx = 0; idx < n; ++idx) {
        sqlite3_value_free(payload->new_values[idx]);
        sqlite3_value_free(payload->old_values[idx]);
    }

    // release the containers and the shared table-name value
    cloudsync_memory_free(payload->new_values);
    cloudsync_memory_free(payload->old_values);
    sqlite3_value_free(payload->table_name);

    // leave the payload in a well-defined empty state
    payload->table_name = NULL;
    payload->new_values = NULL;
    payload->old_values = NULL;
    payload->count = 0;
    payload->capacity = 0;
}

// Append one (table_name, NEW.value, OLD.value) triple to the aggregate
// payload, growing the two parallel arrays geometrically when full.
//
// Parameters:
//   payload - aggregate accumulator (zero-initialized by sqlite3_aggregate_context)
//   v1      - table name for this row (must match across all appended rows)
//   v2      - NEW.* column value, duplicated into payload->new_values
//   v3      - OLD.* column value, duplicated into payload->old_values
//
// Returns SQLITE_OK on success, or SQLITE_NOMEM on allocation failure or if
// v1 differs from the table name already stored (which should never happen
// for a trigger-generated aggregate over a single table).
int cloudsync_update_payload_append (cloudsync_update_payload *payload, sqlite3_value *v1, sqlite3_value *v2, sqlite3_value *v3) {
    // grow both parallel arrays before storing the new entry
    if (payload->count >= payload->capacity) {
        int newcap = payload->capacity ? payload->capacity * 2 : 128;

        sqlite3_value **new_values_2 = (sqlite3_value **)cloudsync_memory_realloc(payload->new_values, newcap * sizeof(*new_values_2));
        if (!new_values_2) return SQLITE_NOMEM;
        payload->new_values = new_values_2;

        sqlite3_value **old_values_2 = (sqlite3_value **)cloudsync_memory_realloc(payload->old_values, newcap * sizeof(*old_values_2));
        if (!old_values_2) return SQLITE_NOMEM;
        payload->old_values = old_values_2;

        payload->capacity = newcap;
    }

    int index = payload->count;
    if (payload->table_name == NULL) payload->table_name = sqlite3_value_dup(v1);
    else if (dbutils_value_compare(payload->table_name, v1) != 0) return SQLITE_NOMEM;
    payload->new_values[index] = sqlite3_value_dup(v2);
    payload->old_values[index] = sqlite3_value_dup(v3);
    payload->count++;

    // sanity check memory allocations: sqlite3_value_dup returns NULL on OOM
    // or when passed a NULL pointer, so a NULL result is only an error when
    // the source value was not SQLITE_NULL
    bool v1_can_be_null = (sqlite3_value_type(v1) == SQLITE_NULL);
    bool v2_can_be_null = (sqlite3_value_type(v2) == SQLITE_NULL);
    bool v3_can_be_null = (sqlite3_value_type(v3) == SQLITE_NULL);

    if ((payload->table_name == NULL) && (!v1_can_be_null)) return SQLITE_NOMEM;
    // BUGFIX: new_values[index] holds the dup of v2 and old_values[index] the
    // dup of v3 — the previous checks paired each slot with the wrong source
    // value, so a failed dup could go undetected (and a NULL source could be
    // misreported as OOM)
    if ((payload->new_values[index] == NULL) && (!v2_can_be_null)) return SQLITE_NOMEM;
    if ((payload->old_values[index] == NULL) && (!v3_can_be_null)) return SQLITE_NOMEM;

    return SQLITE_OK;
}

// Step callback for the cloudsync_update aggregate SQL function.
// Arguments per row: argv[0] = table_name, argv[1] = NEW column value,
// argv[2] = OLD column value. Each triple is buffered into the aggregate
// payload; all real work is deferred to cloudsync_update_final.
void cloudsync_update_step (sqlite3_context *context, int argc, sqlite3_value **argv) {
    // allocate (first call) or retrieve (subsequent calls) the aggregate state;
    // SQLite zero-initializes it on first allocation
    cloudsync_update_payload *payload;
    payload = (cloudsync_update_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_update_payload));
    if (payload == NULL) {
        sqlite3_result_error_nomem(context);
        return;
    }

    // buffer this (table_name, NEW.value, OLD.value) triple for the final step
    int rc = cloudsync_update_payload_append(payload, argv[0], argv[1], argv[2]);
    if (rc != SQLITE_OK) sqlite3_result_error_nomem(context);
}

void cloudsync_update_final (sqlite3_context *context) {
cloudsync_update_payload *payload = (cloudsync_update_payload *)sqlite3_aggregate_context(context, sizeof(cloudsync_update_payload));
if (!payload || payload->count == 0) return;

// retrieve context
sqlite3 *db = sqlite3_context_db_handle(context);
cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);

// lookup table
const char *table_name = (const char *)sqlite3_value_text(payload->table_name);
cloudsync_table_context *table = table_lookup(data, table_name);
if (!table) {
dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_update.", table_name);
return;
}

// compute the next database version for tracking changes
sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
int rc = SQLITE_OK;

// Check if the primary key(s) have changed
bool prikey_changed = false;
for (int i=1; i<=table->npks; ++i) {
if (dbutils_value_compare(argv[i], argv[i+table->npks]) != 0) {
for (int i=0; i<table->npks; ++i) {
if (dbutils_value_compare(payload->old_values[i], payload->new_values[i]) != 0) {
prikey_changed = true;
break;
}
}

// encode the NEW primary key values into a buffer (used later for indexing)
char buffer[1024];
char buffer2[1024];
size_t pklen = sizeof(buffer);
size_t oldpklen = sizeof(buffer2);
char *oldpk = NULL;

char *pk = pk_encode_prikey(&argv[1], table->npks, buffer, &pklen);
char *pk = pk_encode_prikey(payload->new_values, table->npks, buffer, &pklen);
if (!pk) {
sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
return;
Expand All @@ -2599,7 +2709,7 @@ void cloudsync_update (sqlite3_context *context, int argc, sqlite3_value **argv)
// 2. create a new row (NEW primary key)

// encode the OLD primary key into a buffer
oldpk = pk_encode_prikey(&argv[1+table->npks], table->npks, buffer2, &oldpklen);
oldpk = pk_encode_prikey(payload->old_values, table->npks, buffer2, &oldpklen);
if (!oldpk) {
if (pk != buffer) cloudsync_memory_free(pk);
sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
Expand All @@ -2626,65 +2736,23 @@ void cloudsync_update (sqlite3_context *context, int argc, sqlite3_value **argv)
}

// compare NEW and OLD values (excluding primary keys) to handle column updates
// starting index for column values
int index = 1 + (table->npks * 2);
for (int i=0; i<table->ncols; i++) {
if (dbutils_value_compare(argv[i+index], argv[i+index+1]) != 0) {
int col_index = table->npks + i; // Regular columns start after primary keys

if (dbutils_value_compare(payload->old_values[col_index], payload->new_values[col_index]) != 0) {
// if a column value has changed, mark it as updated in the metadata
// columns are in cid order
rc = local_mark_insert_or_update_meta(db, table, pk, pklen, table->col_name[i], db_version, BUMP_SEQ(data));
if (rc != SQLITE_OK) goto cleanup;
}
++index;
}

cleanup:
if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1);
if (pk != buffer) cloudsync_memory_free(pk);
if (oldpk && (oldpk != buffer2)) cloudsync_memory_free(oldpk);
}

void cloudsync_delete (sqlite3_context *context, int argc, sqlite3_value **argv) {
DEBUG_FUNCTION("cloudsync_delete %s", sqlite3_value_text(argv[0]));
// debug_values(argc-1, &argv[1]);

// retrieve context
sqlite3 *db = sqlite3_context_db_handle(context);
cloudsync_context *data = (cloudsync_context *)sqlite3_user_data(context);

// lookup table
const char *table_name = (const char *)sqlite3_value_text(argv[0]);
cloudsync_table_context *table = table_lookup(data, table_name);
if (!table) {
dbutils_context_result_error(context, "Unable to retrieve table name %s in cloudsync_delete.", table_name);
return;
}

// compute the next database version for tracking changes
sqlite3_int64 db_version = db_version_next(db, data, CLOUDSYNC_VALUE_NOTSET);
int rc = SQLITE_OK;

// encode the primary key values into a buffer
char buffer[1024];
size_t pklen = sizeof(buffer);
char *pk = pk_encode_prikey(&argv[1], table->npks, buffer, &pklen);
if (!pk) {
sqlite3_result_error(context, "Not enough memory to encode the primary key(s).", -1);
return;
}

// mark the row as deleted by inserting a delete sentinel into the metadata
rc = local_mark_delete_meta(db, table, pk, pklen, db_version, BUMP_SEQ(data));
if (rc != SQLITE_OK) goto cleanup;

// remove any metadata related to the old rows associated with this primary key
rc = local_drop_meta(db, table, pk, pklen);
if (rc != SQLITE_OK) goto cleanup;

cleanup:
if (rc != SQLITE_OK) sqlite3_result_error(context, sqlite3_errmsg(db), -1);
// free memory if the primary key was dynamically allocated
if (pk != buffer) cloudsync_memory_free(pk);
cloudsync_update_payload_free(payload);
}

// MARK: -
Expand Down Expand Up @@ -3274,7 +3342,7 @@ int cloudsync_register (sqlite3 *db, char **pzErrMsg) {
rc = dbutils_register_function(db, "cloudsync_insert", cloudsync_insert, -1, pzErrMsg, ctx, NULL);
if (rc != SQLITE_OK) return rc;

rc = dbutils_register_function(db, "cloudsync_update", cloudsync_update, -1, pzErrMsg, ctx, NULL);
rc = dbutils_register_aggregate(db, "cloudsync_update", cloudsync_update_step, cloudsync_update_final, 3, pzErrMsg, ctx, NULL);
if (rc != SQLITE_OK) return rc;

rc = dbutils_register_function(db, "cloudsync_delete", cloudsync_delete, -1, pzErrMsg, ctx, NULL);
Expand Down
2 changes: 1 addition & 1 deletion src/cloudsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
extern "C" {
#endif

#define CLOUDSYNC_VERSION "0.8.25"
#define CLOUDSYNC_VERSION "0.8.26"

int sqlite3_cloudsync_init (sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi);
int cloudsync_autoinit (void);
Expand Down
58 changes: 44 additions & 14 deletions src/dbutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -541,25 +541,55 @@ int dbutils_check_triggers (sqlite3 *db, const char *table, table_algo algo) {
if (!trigger_name) goto finalize;

if (!dbutils_trigger_exists(db, trigger_name)) {
char *sql = cloudsync_memory_mprintf("SELECT group_concat('NEW.\"' || format('%%w', name) || '\"', ',') || ',' || group_concat('OLD.\"' || format('%%w', name) || '\"', ',') FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;", table);
if (!sql) goto finalize;
// Generate VALUES clause for all columns using a CTE to avoid compound SELECT limits
// First, get all primary key columns in order
char *pk_values_sql = cloudsync_memory_mprintf(
"SELECT group_concat('('||quote('%q')||', NEW.\"' || format('%%w', name) || '\", OLD.\"' || format('%%w', name) || '\")', ', ') "
"FROM pragma_table_info('%q') WHERE pk>0 ORDER BY pk;",
table, table);
if (!pk_values_sql) goto finalize;

char *pkclause = dbutils_text_select(db, sql);
char *pkvalues = (pkclause) ? pkclause : "NEW.rowid,OLD.rowid";
cloudsync_memory_free(sql);
char *pk_values_list = dbutils_text_select(db, pk_values_sql);
cloudsync_memory_free(pk_values_sql);

sql = cloudsync_memory_mprintf("SELECT group_concat('NEW.\"' || format('%%w', name) || '\"' || ', OLD.\"' || format('%%w', name) || '\"', ',') FROM pragma_table_info('%q') WHERE pk=0 ORDER BY cid;", table);
if (!sql) goto finalize;
char *colvalues = dbutils_text_select(db, sql);
cloudsync_memory_free(sql);
// Then get all regular columns in order
char *col_values_sql = cloudsync_memory_mprintf(
"SELECT group_concat('('||quote('%q')||', NEW.\"' || format('%%w', name) || '\", OLD.\"' || format('%%w', name) || '\")', ', ') "
"FROM pragma_table_info('%q') WHERE pk=0 ORDER BY cid;",
table, table);
if (!col_values_sql) goto finalize;

if (colvalues == NULL) {
sql = cloudsync_memory_mprintf("CREATE TRIGGER \"%w\" AFTER UPDATE ON \"%w\" %s BEGIN SELECT cloudsync_update('%q',%s); END", trigger_name, table, trigger_when, table, pkvalues);
char *col_values_list = dbutils_text_select(db, col_values_sql);
cloudsync_memory_free(col_values_sql);

// Build the complete VALUES query
char *values_query;
if (col_values_list && strlen(col_values_list) > 0) {
// Table has both primary keys and regular columns
values_query = cloudsync_memory_mprintf(
"WITH column_data(table_name, new_value, old_value) AS (VALUES %s, %s) "
"SELECT table_name, new_value, old_value FROM column_data",
pk_values_list, col_values_list);
cloudsync_memory_free(col_values_list);
} else {
sql = cloudsync_memory_mprintf("CREATE TRIGGER \"%w\" AFTER UPDATE ON \"%w\" %s BEGIN SELECT cloudsync_update('%q',%s,%s); END", trigger_name, table, trigger_when, table, pkvalues, colvalues);
cloudsync_memory_free(colvalues);
// Table has only primary keys
values_query = cloudsync_memory_mprintf(
"WITH column_data(table_name, new_value, old_value) AS (VALUES %s) "
"SELECT table_name, new_value, old_value FROM column_data",
pk_values_list);
}
if (pkclause) cloudsync_memory_free(pkclause);

if (pk_values_list) cloudsync_memory_free(pk_values_list);
if (!values_query) goto finalize;

// Create the trigger with aggregate function
char *sql = cloudsync_memory_mprintf(
"CREATE TRIGGER \"%w\" AFTER UPDATE ON \"%w\" %s BEGIN "
"SELECT cloudsync_update(table_name, new_value, old_value) FROM (%s); "
"END",
trigger_name, table, trigger_when, values_query);

cloudsync_memory_free(values_query);
if (!sql) goto finalize;

rc = sqlite3_exec(db, sql, NULL, NULL, NULL);
Expand Down
Loading