Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 110 additions & 38 deletions src/cloudsync.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,14 @@ SQLITE_EXTENSION_INIT1
#define APIEXPORT
#endif

#define CLOUDSYNC_DEFAULT_ALGO "cls"
#define CLOUDSYNC_INIT_NTABLES 128
#define CLOUDSYNC_VALUE_NOTSET -1
#define CLOUDSYNC_MIN_DB_VERSION 0
#define CLOUDSYNC_PAYLOAD_MINBUF_SIZE 512*1024
#define CLOUDSYNC_PAYLOAD_VERSION 1
#define CLOUDSYNC_PAYLOAD_SIGNATURE 'CLSY'
#define CLOUDSYNC_PK_INDEX_DBVERSION 5
#define CLOUDSYNC_PK_INDEX_SEQ 8
#define CLOUDSYNC_DEFAULT_ALGO "cls"
#define CLOUDSYNC_INIT_NTABLES 128
#define CLOUDSYNC_VALUE_NOTSET -1
#define CLOUDSYNC_MIN_DB_VERSION 0
#define CLOUDSYNC_PAYLOAD_MINBUF_SIZE 512*1024
#define CLOUDSYNC_PAYLOAD_VERSION 1
#define CLOUDSYNC_PAYLOAD_SIGNATURE 'CLSY'
#define CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY "cloudsync_payload_apply_callback"

#ifndef MAX
#define MAX(a, b) (((a)>(b))?(a):(b))
Expand All @@ -93,12 +92,17 @@ typedef enum {
CLOUDSYNC_STMT_VALUE_CHANGED = 1,
} CLOUDSYNC_STMT_VALUE;

typedef struct {
sqlite3_stmt *vm;
int64_t dbversion;
int64_t seq;
int64_t tmp_dbversion;
} cloudsync_pk_decode_bind_context;
typedef enum {
CLOUDSYNC_PK_INDEX_TBL = 0,
CLOUDSYNC_PK_INDEX_PK = 1,
CLOUDSYNC_PK_INDEX_COLNAME = 2,
CLOUDSYNC_PK_INDEX_COLVALUE = 3,
CLOUDSYNC_PK_INDEX_COLVERSION = 4,
CLOUDSYNC_PK_INDEX_DBVERSION = 5,
CLOUDSYNC_PK_INDEX_SITEID = 6,
CLOUDSYNC_PK_INDEX_CL = 7,
CLOUDSYNC_PK_INDEX_SEQ = 8
} CLOUDSYNC_PK_INDEX;

typedef struct {
sqlite3_context *context;
Expand Down Expand Up @@ -1923,36 +1927,67 @@ void cloudsync_network_encode_final (sqlite3_context *context) {
if (!use_uncompressed_buffer) cloudsync_memory_free(buffer);
}

cloudsync_payload_apply_callback_t cloudsync_get_payload_apply_callback(sqlite3 *db) {
return sqlite3_get_clientdata(db, CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY);
}

void cloudsync_set_payload_apply_callback(sqlite3 *db, cloudsync_payload_apply_callback_t callback) {
sqlite3_set_clientdata(db, CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY, (void*)callback, NULL);
}

int cloudsync_pk_decode_bind_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) {
cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata;
int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval);

if (rc == SQLITE_OK && type == SQLITE_INTEGER) {
if (rc == SQLITE_OK) {
// the dbversion index is smaller than seq index, so it is processed first
// when processing the dbversion column: save the value to the tmp_dbversion field
// when processing the seq column: update the dbversion and seq fields only if the current dbversion is greater than the last max value
switch (index) {
case CLOUDSYNC_PK_INDEX_TBL:
if (type == SQLITE_TEXT) {
decode_context->tbl = pval;
decode_context->tbl_len = ival;
}
break;
case CLOUDSYNC_PK_INDEX_PK:
if (type == SQLITE_BLOB) {
decode_context->pk = pval;
decode_context->pk_len = ival;
}
break;
case CLOUDSYNC_PK_INDEX_COLNAME:
if (type == SQLITE_TEXT) {
decode_context->col_name = pval;
decode_context->col_name_len = ival;
}
break;
case CLOUDSYNC_PK_INDEX_COLVERSION:
if (type == SQLITE_INTEGER) decode_context->col_version = ival;
break;
case CLOUDSYNC_PK_INDEX_DBVERSION:
decode_context->tmp_dbversion = ival;
if (type == SQLITE_INTEGER) decode_context->db_version = ival;
break;
case CLOUDSYNC_PK_INDEX_SEQ:
// when the dbversion field is incremented the seq val must be updated too
// because the current decode_context->seq field refers to the previous dbversion
if (decode_context->tmp_dbversion > decode_context->dbversion) {
decode_context->dbversion = decode_context->tmp_dbversion;
decode_context->seq = ival;
} else if (decode_context->tmp_dbversion == decode_context->dbversion) {
decode_context->seq = MAX(decode_context->seq, ival);
case CLOUDSYNC_PK_INDEX_SITEID:
if (type == SQLITE_BLOB) {
decode_context->site_id = pval;
decode_context->site_id_len = ival;
}
// reset the tmp_dbversion value before processing the next row
decode_context->tmp_dbversion = 0;
break;
case CLOUDSYNC_PK_INDEX_CL:
if (type == SQLITE_INTEGER) decode_context->cl = ival;
break;
case CLOUDSYNC_PK_INDEX_SEQ:
if (type == SQLITE_INTEGER) decode_context->seq = ival;
break;
}
}

return rc;
}

// #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION

int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int blen) {
// decode header
cloudsync_network_header header;
Expand Down Expand Up @@ -2016,31 +2051,48 @@ int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int
uint32_t nrows = header.nrows;
int dbversion = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_CHECK_DBVERSION);
int seq = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_CHECK_SEQ);
cloudsync_pk_decode_bind_context xdata = {.vm = vm, .dbversion = dbversion, .seq = seq, .tmp_dbversion = 0};
cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
void *payload_apply_xdata = NULL;
cloudsync_payload_apply_callback_t payload_apply_callback = cloudsync_get_payload_apply_callback(db);

for (uint32_t i=0; i<nrows; ++i) {
size_t seek = 0;
pk_decode((char *)buffer, blen, ncols, &seek, cloudsync_pk_decode_bind_callback, &xdata);
pk_decode((char *)buffer, blen, ncols, &seek, cloudsync_pk_decode_bind_callback, &decoded_context);
// n is the pk_decode return value, I don't think I should assert here because in any case the next sqlite3_step would fail
// assert(n == ncols);

rc = sqlite3_step(vm);
if (rc != SQLITE_DONE) break;
bool approved = true;
if (payload_apply_callback) approved = payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_WILL_APPLY, SQLITE_OK);

if (approved) {
rc = sqlite3_step(vm);
if (rc != SQLITE_DONE) {
// don't "break;", the error can be due to a RLS policy.
// in case of error we try to apply the following changes
printf("cloudsync_payload_apply error in step: (%d) %s\n", rc, sqlite3_errmsg(db));
}
}

if (payload_apply_callback) payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_DID_APPLY, rc);

buffer += seek;
blen -= seek;
stmt_reset(vm);
}


if (payload_apply_callback) payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_CLEANUP, rc);

if (rc == SQLITE_DONE) rc = SQLITE_OK;
if (rc == SQLITE_OK) {
char buf[256];
if (xdata.dbversion != dbversion) {
snprintf(buf, sizeof(buf), "%lld", xdata.dbversion);
if (decoded_context.db_version >= dbversion) {
snprintf(buf, sizeof(buf), "%lld", decoded_context.db_version);
dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_CHECK_DBVERSION, buf);
}
if (xdata.seq != seq) {
snprintf(buf, sizeof(buf), "%lld", xdata.seq);
dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_CHECK_SEQ, buf);

if (decoded_context.seq != seq) {
snprintf(buf, sizeof(buf), "%lld", decoded_context.seq);
dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_CHECK_SEQ, buf);
}
}
}

Expand All @@ -2061,6 +2113,26 @@ int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int
return nrows;
}

sqlite3_stmt *cloudsync_col_value_stmt (sqlite3 *db, cloudsync_context *data, const char *tbl_name, bool *persistent) {
sqlite3_stmt *vm;

cloudsync_table_context *table = table_lookup(data, tbl_name, false);
char *col_name = NULL;
if (table->ncols > 0) {
col_name = table->col_name[0];
// retrieve col_value precompiled statement
vm = table_column_lookup(table, col_name, false, NULL);
*persistent = true;
} else {
char *sql = table_build_value_sql(db, table, "*");
sqlite3_prepare_v2(db, sql, -1, &vm, NULL);
cloudsync_memory_free(sql);
*persistent = false;
}

return vm;
}

void cloudsync_network_decode (sqlite3_context *context, int argc, sqlite3_value **argv) {
DEBUG_FUNCTION("cloudsync_network_decode");
//debug_values(argc, argv);
Expand Down
25 changes: 25 additions & 0 deletions src/cloudsync.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,30 @@
#define CLOUDSYNC_RLS_RESTRICTED_VALUE "__[RLS]__"
#define CLOUDSYNC_DISABLE_ROWIDONLY_TABLES 1

typedef enum {
CLOUDSYNC_PAYLOAD_APPLY_WILL_APPLY = 1,
CLOUDSYNC_PAYLOAD_APPLY_DID_APPLY = 2,
CLOUDSYNC_PAYLOAD_APPLY_CLEANUP = 3
} CLOUDSYNC_PAYLOAD_APPLY_STEPS;

typedef struct {
sqlite3_stmt *vm;
char *tbl;
int64_t tbl_len;
const void *pk;
int64_t pk_len;
char *col_name;
int64_t col_name_len;
int64_t col_version;
int64_t db_version;
const void *site_id;
int64_t site_id_len;
int64_t cl;
int64_t seq;
} cloudsync_pk_decode_bind_context;

typedef struct cloudsync_context cloudsync_context;
typedef bool (*cloudsync_payload_apply_callback_t)(void **xdata, cloudsync_pk_decode_bind_context *decoded_change, sqlite3 *db, cloudsync_context *data, int step, int rc);

int sqlite3_cloudsync_init (sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi);
bool cloudsync_config_exists (sqlite3 *db);
Expand All @@ -32,5 +55,7 @@ void cloudsync_sync_table_key (cloudsync_context *data, const char *table, const
void *cloudsync_get_auxdata (sqlite3_context *context);
void cloudsync_set_auxdata (sqlite3_context *context, void *xdata);
int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int blen);
void cloudsync_set_payload_apply_callback(sqlite3 *db, cloudsync_payload_apply_callback_t callback);
sqlite3_stmt *cloudsync_col_value_stmt (sqlite3 *db, cloudsync_context *data, const char *tbl_name, bool *persistent);

#endif
9 changes: 9 additions & 0 deletions src/dbutils.c
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,15 @@ bool dbutils_table_sanity_check (sqlite3 *db, sqlite3_context *context, const ch
}
}

// check for columns declared as NOT NULL without a DEFAULT value.
// Otherwise, col_merge_stmt would fail if changes to other columns are inserted first.
sql = sqlite3_snprintf((int)blen, buffer, "SELECT count(*) FROM pragma_table_info('%w') WHERE pk=0 AND \"notnull\"=1 AND \"dflt_value\" IS NULL;", name);
sqlite3_int64 count3 = dbutils_int_select(db, sql);
if (count3 > 0) {
dbutils_context_result_error(context, "All non-primary key columns declared as NOT NULL must have a DEFAULT value. (table %s)", name);
return false;
}

return true;
}

Expand Down
17 changes: 15 additions & 2 deletions src/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,9 @@ void *cloudsync_memory_zeroalloc (uint64_t size) {
return ptr;
}

char *cloudsync_string_dup (const char *str, bool lowercase) {
char *cloudsync_string_ndup (const char *str, size_t len, bool lowercase) {
if (str == NULL) return NULL;

size_t len = strlen(str);
char *s = (char *)cloudsync_memory_alloc((sqlite3_uint64)(len + 1));
if (!s) return NULL;

Expand All @@ -148,6 +147,20 @@ char *cloudsync_string_dup (const char *str, bool lowercase) {
return s;
}

char *cloudsync_string_dup (const char *str, bool lowercase) {
if (str == NULL) return NULL;

size_t len = strlen(str);
return cloudsync_string_ndup(str, len, lowercase);
}

int cloudsync_blob_compare(const char *blob1, size_t size1, const char *blob2, size_t size2) {
if (size1 != size2) {
return (int)(size1 - size2); // Blobs are different if sizes are different
}
return memcmp(blob1, blob2, size1); // Use memcmp for byte-by-byte comparison
}

void cloudsync_rowid_decode (sqlite3_int64 rowid, sqlite3_int64 *db_version, sqlite3_int64 *seq) {
// use unsigned 64-bit integer for intermediate calculations
// when db_version is large enough, it can cause overflow, leading to negative values
Expand Down
3 changes: 3 additions & 0 deletions src/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ char *cloudsync_string_replace_prefix(const char *input, char *prefix, char *rep
uint64_t fnv1a_hash(const char *data, size_t len);

void *cloudsync_memory_zeroalloc (uint64_t size);
char *cloudsync_string_ndup (const char *str, size_t len, bool lowercase);
char *cloudsync_string_dup (const char *str, bool lowercase);
int cloudsync_blob_compare(const char *blob1, size_t size1, const char *blob2, size_t size2);

void cloudsync_rowid_decode (sqlite3_int64 rowid, sqlite3_int64 *db_version, sqlite3_int64 *seq);

#endif
Loading