diff --git a/src/cloudsync.c b/src/cloudsync.c index fbf20d7..e577f48 100644 --- a/src/cloudsync.c +++ b/src/cloudsync.c @@ -64,15 +64,14 @@ SQLITE_EXTENSION_INIT1 #define APIEXPORT #endif -#define CLOUDSYNC_DEFAULT_ALGO "cls" -#define CLOUDSYNC_INIT_NTABLES 128 -#define CLOUDSYNC_VALUE_NOTSET -1 -#define CLOUDSYNC_MIN_DB_VERSION 0 -#define CLOUDSYNC_PAYLOAD_MINBUF_SIZE 512*1024 -#define CLOUDSYNC_PAYLOAD_VERSION 1 -#define CLOUDSYNC_PAYLOAD_SIGNATURE 'CLSY' -#define CLOUDSYNC_PK_INDEX_DBVERSION 5 -#define CLOUDSYNC_PK_INDEX_SEQ 8 +#define CLOUDSYNC_DEFAULT_ALGO "cls" +#define CLOUDSYNC_INIT_NTABLES 128 +#define CLOUDSYNC_VALUE_NOTSET -1 +#define CLOUDSYNC_MIN_DB_VERSION 0 +#define CLOUDSYNC_PAYLOAD_MINBUF_SIZE 512*1024 +#define CLOUDSYNC_PAYLOAD_VERSION 1 +#define CLOUDSYNC_PAYLOAD_SIGNATURE 'CLSY' +#define CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY "cloudsync_payload_apply_callback" #ifndef MAX #define MAX(a, b) (((a)>(b))?(a):(b)) @@ -93,12 +92,17 @@ typedef enum { CLOUDSYNC_STMT_VALUE_CHANGED = 1, } CLOUDSYNC_STMT_VALUE; -typedef struct { - sqlite3_stmt *vm; - int64_t dbversion; - int64_t seq; - int64_t tmp_dbversion; -} cloudsync_pk_decode_bind_context; +typedef enum { + CLOUDSYNC_PK_INDEX_TBL = 0, + CLOUDSYNC_PK_INDEX_PK = 1, + CLOUDSYNC_PK_INDEX_COLNAME = 2, + CLOUDSYNC_PK_INDEX_COLVALUE = 3, + CLOUDSYNC_PK_INDEX_COLVERSION = 4, + CLOUDSYNC_PK_INDEX_DBVERSION = 5, + CLOUDSYNC_PK_INDEX_SITEID = 6, + CLOUDSYNC_PK_INDEX_CL = 7, + CLOUDSYNC_PK_INDEX_SEQ = 8 +} CLOUDSYNC_PK_INDEX; typedef struct { sqlite3_context *context; @@ -1923,29 +1927,58 @@ void cloudsync_network_encode_final (sqlite3_context *context) { if (!use_uncompressed_buffer) cloudsync_memory_free(buffer); } +cloudsync_payload_apply_callback_t cloudsync_get_payload_apply_callback(sqlite3 *db) { + return sqlite3_get_clientdata(db, CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY); +} + +void cloudsync_set_payload_apply_callback(sqlite3 *db, cloudsync_payload_apply_callback_t callback) { + sqlite3_set_clientdata(db, CLOUDSYNC_PAYLOAD_APPLY_CALLBACK_KEY, (void*)callback, NULL); +} + int cloudsync_pk_decode_bind_callback (void *xdata, int index, int type, int64_t ival, double dval, char *pval) { cloudsync_pk_decode_bind_context *decode_context = (cloudsync_pk_decode_bind_context*)xdata; int rc = pk_decode_bind_callback(decode_context->vm, index, type, ival, dval, pval); - if (rc == SQLITE_OK && type == SQLITE_INTEGER) { + if (rc == SQLITE_OK) { // the dbversion index is smaller than seq index, so it is processed first // when processing the dbversion column: save the value to the tmp_dbversion field // when processing the seq column: update the dbversion and seq fields only if the current dbversion is greater than the last max value switch (index) { + case CLOUDSYNC_PK_INDEX_TBL: + if (type == SQLITE_TEXT) { + decode_context->tbl = pval; + decode_context->tbl_len = ival; + } + break; + case CLOUDSYNC_PK_INDEX_PK: + if (type == SQLITE_BLOB) { + decode_context->pk = pval; + decode_context->pk_len = ival; + } + break; + case CLOUDSYNC_PK_INDEX_COLNAME: + if (type == SQLITE_TEXT) { + decode_context->col_name = pval; + decode_context->col_name_len = ival; + } + break; + case CLOUDSYNC_PK_INDEX_COLVERSION: + if (type == SQLITE_INTEGER) decode_context->col_version = ival; + break; case CLOUDSYNC_PK_INDEX_DBVERSION: - decode_context->tmp_dbversion = ival; + if (type == SQLITE_INTEGER) decode_context->db_version = ival; break; - case CLOUDSYNC_PK_INDEX_SEQ: - // when the dbversion field is incremented the seq val must be updated too - // because the current decode_context->seq field refers to the previous dbversion - if (decode_context->tmp_dbversion > decode_context->dbversion) { - decode_context->dbversion = decode_context->tmp_dbversion; - decode_context->seq = ival; - } else if (decode_context->tmp_dbversion == decode_context->dbversion) { - decode_context->seq = MAX(decode_context->seq, ival); + case CLOUDSYNC_PK_INDEX_SITEID: + if (type == SQLITE_BLOB) { + decode_context->site_id = pval; + decode_context->site_id_len = ival; } - // reset the tmp_dbversion value before processing the next row - decode_context->tmp_dbversion = 0; + break; + case CLOUDSYNC_PK_INDEX_CL: + if (type == SQLITE_INTEGER) decode_context->cl = ival; + break; + case CLOUDSYNC_PK_INDEX_SEQ: + if (type == SQLITE_INTEGER) decode_context->seq = ival; break; } } @@ -1953,6 +1986,8 @@ int cloudsync_pk_decode_bind_callback (void *xdata, int index, int type, int64_t return rc; } +// #ifndef CLOUDSYNC_OMIT_RLS_VALIDATION + int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int blen) { // decode header cloudsync_network_header header; @@ -2016,31 +2051,48 @@ int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int uint32_t nrows = header.nrows; int dbversion = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_CHECK_DBVERSION); int seq = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_CHECK_SEQ); - cloudsync_pk_decode_bind_context xdata = {.vm = vm, .dbversion = dbversion, .seq = seq, .tmp_dbversion = 0}; + cloudsync_pk_decode_bind_context decoded_context = {.vm = vm}; + void *payload_apply_xdata = NULL; + cloudsync_payload_apply_callback_t payload_apply_callback = cloudsync_get_payload_apply_callback(db); + for (uint32_t i=0; i= dbversion) { + snprintf(buf, sizeof(buf), "%lld", decoded_context.db_version); dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_CHECK_DBVERSION, buf); - } - if (xdata.seq != seq) { - snprintf(buf, sizeof(buf), "%lld", xdata.seq); - dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_CHECK_SEQ, buf); + + if (decoded_context.seq != seq) { + snprintf(buf, sizeof(buf), "%lld", decoded_context.seq); + dbutils_settings_set_key_value(db, context, CLOUDSYNC_KEY_CHECK_SEQ, buf); + } } } @@ -2061,6 +2113,26 @@ int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int return nrows; } +sqlite3_stmt *cloudsync_col_value_stmt (sqlite3 *db, cloudsync_context *data, const char *tbl_name, bool *persistent) { + sqlite3_stmt *vm; + + cloudsync_table_context *table = table_lookup(data, tbl_name, false); + char *col_name = NULL; + if (table->ncols > 0) { + col_name = table->col_name[0]; + // retrieve col_value precompiled statement + vm = table_column_lookup(table, col_name, false, NULL); + *persistent = true; + } else { + char *sql = table_build_value_sql(db, table, "*"); + sqlite3_prepare_v2(db, sql, -1, &vm, NULL); + cloudsync_memory_free(sql); + *persistent = false; + } + + return vm; +} + void cloudsync_network_decode (sqlite3_context *context, int argc, sqlite3_value **argv) { DEBUG_FUNCTION("cloudsync_network_decode"); //debug_values(argc, argv); diff --git a/src/cloudsync.h b/src/cloudsync.h index d374dd3..b5a578f 100644 --- a/src/cloudsync.h +++ b/src/cloudsync.h @@ -21,7 +21,30 @@ #define CLOUDSYNC_RLS_RESTRICTED_VALUE "__[RLS]__" #define CLOUDSYNC_DISABLE_ROWIDONLY_TABLES 1 +typedef enum { + CLOUDSYNC_PAYLOAD_APPLY_WILL_APPLY = 1, + CLOUDSYNC_PAYLOAD_APPLY_DID_APPLY = 2, + CLOUDSYNC_PAYLOAD_APPLY_CLEANUP = 3 +} CLOUDSYNC_PAYLOAD_APPLY_STEPS; + +typedef struct { + sqlite3_stmt *vm; + char *tbl; + int64_t tbl_len; + const void *pk; + int64_t pk_len; + char *col_name; + int64_t col_name_len; + int64_t col_version; + int64_t db_version; + const void *site_id; + int64_t site_id_len; + int64_t cl; + int64_t seq; +} cloudsync_pk_decode_bind_context; + typedef struct cloudsync_context cloudsync_context; +typedef bool (*cloudsync_payload_apply_callback_t)(void **xdata, cloudsync_pk_decode_bind_context *decoded_change, sqlite3 *db, cloudsync_context *data, int step, int rc); int sqlite3_cloudsync_init (sqlite3 *db, char **pzErrMsg, const sqlite3_api_routines *pApi); bool cloudsync_config_exists (sqlite3 *db); @@ -32,5 +55,7 @@ void cloudsync_sync_table_key (cloudsync_context *data, const char *table, const void *cloudsync_get_auxdata (sqlite3_context *context); void cloudsync_set_auxdata (sqlite3_context *context, void *xdata); int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int blen); +void cloudsync_set_payload_apply_callback(sqlite3 *db, cloudsync_payload_apply_callback_t callback); +sqlite3_stmt *cloudsync_col_value_stmt (sqlite3 *db, cloudsync_context *data, const char *tbl_name, bool *persistent); #endif diff --git a/src/dbutils.c b/src/dbutils.c index 263d55d..d5c2f00 100644 --- a/src/dbutils.c +++ b/src/dbutils.c @@ -416,6 +416,15 @@ bool dbutils_table_sanity_check (sqlite3 *db, sqlite3_context *context, const ch } } + // check for columns declared as NOT NULL without a DEFAULT value. + // Otherwise, col_merge_stmt would fail if changes to other columns are inserted first. + sql = sqlite3_snprintf((int)blen, buffer, "SELECT count(*) FROM pragma_table_info('%w') WHERE pk=0 AND \"notnull\"=1 AND \"dflt_value\" IS NULL;", name); + sqlite3_int64 count3 = dbutils_int_select(db, sql); + if (count3 > 0) { + dbutils_context_result_error(context, "All non-primary key columns declared as NOT NULL must have a DEFAULT value. (table %s)", name); + return false; + } + return true; } diff --git a/src/utils.c b/src/utils.c index def305c..49bd7a9 100644 --- a/src/utils.c +++ b/src/utils.c @@ -126,10 +126,9 @@ void *cloudsync_memory_zeroalloc (uint64_t size) { return ptr; } -char *cloudsync_string_dup (const char *str, bool lowercase) { +char *cloudsync_string_ndup (const char *str, size_t len, bool lowercase) { if (str == NULL) return NULL; - size_t len = strlen(str); char *s = (char *)cloudsync_memory_alloc((sqlite3_uint64)(len + 1)); if (!s) return NULL; @@ -148,6 +147,20 @@ char *cloudsync_string_dup (const char *str, bool lowercase) { return s; } +char *cloudsync_string_dup (const char *str, bool lowercase) { + if (str == NULL) return NULL; + + size_t len = strlen(str); + return cloudsync_string_ndup(str, len, lowercase); +} + +int cloudsync_blob_compare(const char *blob1, size_t size1, const char *blob2, size_t size2) { + if (size1 != size2) { + return (int)(size1 - size2); // Blobs are different if sizes are different + } + return memcmp(blob1, blob2, size1); // Use memcmp for byte-by-byte comparison +} + void cloudsync_rowid_decode (sqlite3_int64 rowid, sqlite3_int64 *db_version, sqlite3_int64 *seq) { // use unsigned 64-bit integer for intermediate calculations // when db_version is large enough, it can cause overflow, leading to negative values diff --git a/src/utils.h b/src/utils.h index ac0a1ed..41a2ac6 100644 --- a/src/utils.h +++ b/src/utils.h @@ -129,7 +129,10 @@ char *cloudsync_string_replace_prefix(const char *input, char *prefix, char *rep uint64_t fnv1a_hash(const char *data, size_t len); void *cloudsync_memory_zeroalloc (uint64_t size); +char *cloudsync_string_ndup (const char *str, size_t len, bool lowercase); char *cloudsync_string_dup (const char *str, bool lowercase); +int cloudsync_blob_compare(const char *blob1, size_t size1, const char *blob2, size_t size2); + void cloudsync_rowid_decode (sqlite3_int64 rowid, sqlite3_int64 *db_version, sqlite3_int64 *seq); #endif diff --git a/test/unit.c b/test/unit.c index 2d3a691..4512bb2 100644 --- a/test/unit.c +++ b/test/unit.c @@ -252,6 +252,16 @@ sqlite3 *close_db (sqlite3 *db) { return NULL; } +int close_db_v2 (sqlite3 *db) { + int counter = 0; + if (db) { + sqlite3_exec(db, "SELECT cloudsync_terminate();", NULL, NULL, NULL); + counter = dbutils_debug_stmt(db, true); + sqlite3_close(db); + } + return counter; +} + bool file_delete (const char *path) { #ifdef _WIN32 if (DeleteFile(path) == 0) return false; @@ -264,6 +274,142 @@ bool file_delete (const char *path) { // MARK: - +#ifndef UNITTEST_OMIT_RLS_VALIDATION +typedef struct { + bool in_savepoint; + bool is_approved; + bool last_is_delete; + char *last_tbl; + void *last_pk; + int64_t last_pk_len; + int64_t last_db_version; +} unittest_payload_apply_rls_status; + +bool unittest_validate_changed_row(sqlite3 *db, cloudsync_context *data, char *tbl_name, void *pk, int64_t pklen) { + // verify row + bool ret = false; + bool vm_persistent; + sqlite3_stmt *vm = cloudsync_col_value_stmt(db, data, tbl_name, &vm_persistent); + if (!vm) goto cleanup; + + // bind primary key values (the return code is the pk count) + int rc = pk_decode_prikey((char *)pk, (size_t)pklen, pk_decode_bind_callback, (void *)vm); + if (rc < 0) goto cleanup; + + // execute vm + rc = sqlite3_step(vm); + if (rc == SQLITE_DONE) { + rc = SQLITE_OK; + } else if (rc == SQLITE_ROW) { + rc = SQLITE_OK; + ret = true; + } + +cleanup: + if (vm_persistent) sqlite3_reset(vm); + else sqlite3_finalize(vm); + + return ret; +} + +int unittest_payload_apply_reset_transaction(sqlite3 *db, unittest_payload_apply_rls_status *s, bool create_new) { + int rc = SQLITE_OK; + + if (s->in_savepoint == true) { + if (s->is_approved) rc = sqlite3_exec(db, "RELEASE unittest_payload_apply_transaction", NULL, NULL, NULL); + else rc = sqlite3_exec(db, "ROLLBACK TO unittest_payload_apply_transaction; RELEASE unittest_payload_apply_transaction", NULL, NULL, NULL); + if (rc == SQLITE_OK) s->in_savepoint = false; + } + if (create_new) { + rc = sqlite3_exec(db, "SAVEPOINT unittest_payload_apply_transaction", NULL, NULL, NULL); + if (rc == SQLITE_OK) s->in_savepoint = true; + } + return rc; +} + +bool unittest_payload_apply_rls_callback(void **xdata, cloudsync_pk_decode_bind_context *d, sqlite3 *db, cloudsync_context *data, int step, int rc) { + unittest_payload_apply_rls_status *s; + if (*xdata) { + s = (unittest_payload_apply_rls_status *)*xdata; + } else { + s = cloudsync_memory_zeroalloc(sizeof(unittest_payload_apply_rls_status)); + s->is_approved = true; + *xdata = s; + } + + switch (step) { + case CLOUDSYNC_PAYLOAD_APPLY_WILL_APPLY: { + // if the tbl name or the prikey has changed, then verify if the row is valid + // must use strncmp because strings in xdata are not zero-terminated + bool tbl_changed = (s->last_tbl && (strlen(s->last_tbl) != (size_t)d->tbl_len || strncmp(s->last_tbl, d->tbl, (size_t)d->tbl_len) != 0)); + bool pk_changed = (s->last_pk && d->pk && cloudsync_blob_compare(s->last_pk, s->last_pk_len, d->pk, d->pk_len) != 0); + if (s->is_approved + && !s->last_is_delete + && (tbl_changed || pk_changed)) { + s->is_approved = unittest_validate_changed_row(db, data, s->last_tbl, s->last_pk, s->last_pk_len); + } + + s->last_is_delete = ((size_t)d->col_name_len == strlen(CLOUDSYNC_TOMBSTONE_VALUE) && + strncmp(d->col_name, CLOUDSYNC_TOMBSTONE_VALUE, (size_t)d->col_name_len) == 0 + ) && d->cl % 2 == 0; + + // update the last_tbl value, if needed + if (!s->last_tbl || + !d->tbl || + (strlen(s->last_tbl) != (size_t)d->tbl_len) || + strncmp(s->last_tbl, d->tbl, (size_t)d->tbl_len) != 0) { + if (s->last_tbl) cloudsync_memory_free(s->last_tbl); + if (d->tbl && d->tbl_len > 0) s->last_tbl = cloudsync_string_ndup(d->tbl, d->tbl_len, false); + else s->last_tbl = NULL; + } + + // update the last_prikey and len values, if needed + if (!s->last_pk || !d->pk || cloudsync_blob_compare(s->last_pk, s->last_pk_len, d->pk, d->pk_len) != 0) { + if (s->last_pk) cloudsync_memory_free(s->last_pk); + if (d->pk && d->pk_len > 0) { + s->last_pk = cloudsync_memory_alloc(d->pk_len); + memcpy(s->last_pk, d->pk, d->pk_len); + s->last_pk_len = d->pk_len; + } else { + s->last_pk = NULL; + s->last_pk_len = 0; + } + } + + // commit the previous transaction, if any + // begin new transacion, if needed + if (s->last_db_version != d->db_version) { + rc = unittest_payload_apply_reset_transaction(db, s, true); + if (rc != SQLITE_OK) printf("unittest_payload_apply error in reset_transaction: (%d) %s\n", rc, sqlite3_errmsg(db)); + + // reset local variables + s->last_db_version = d->db_version; + s->is_approved = true; + } + break; + } + case CLOUDSYNC_PAYLOAD_APPLY_DID_APPLY: + break; + case CLOUDSYNC_PAYLOAD_APPLY_CLEANUP: + if (s->is_approved && !s->last_is_delete) s->is_approved = unittest_validate_changed_row(db, data, s->last_tbl, s->last_pk, s->last_pk_len); + rc = unittest_payload_apply_reset_transaction(db, s, false); + if (s->last_tbl) cloudsync_memory_free(s->last_tbl); + if (s->last_pk) { + cloudsync_memory_free(s->last_pk); + s->last_pk_len = 0; + } + + cloudsync_memory_free(s); + *xdata = NULL; + break; + } + + return s->is_approved; +} +#endif + +// MARK: - + #ifndef CLOUDSYNC_OMIT_PRINT_RESULT int do_query_cb (void *type, int argc, char **argv, char **azColName) { int query_type = 0; @@ -1523,11 +1669,14 @@ bool do_test_dbutils (void) { // manually load extension sqlite3_cloudsync_init(db, NULL, NULL); + cloudsync_set_payload_apply_callback(db, unittest_payload_apply_rls_callback); const char *sql = "CREATE TABLE IF NOT EXISTS foo (name TEXT PRIMARY KEY NOT NULL, age INTEGER, note TEXT, stamp TEXT DEFAULT CURRENT_TIME);" "CREATE TABLE IF NOT EXISTS bar (name TEXT PRIMARY KEY NOT NULL, age INTEGER, note TEXT, stamp TEXT DEFAULT CURRENT_TIME);" "CREATE TABLE IF NOT EXISTS rowid_table (name TEXT, age INTEGER);" - "CREATE TABLE IF NOT EXISTS nonnull_prikey_table (name TEXT PRIMARY KEY, age INTEGER);"; + "CREATE TABLE IF NOT EXISTS nonnull_prikey_table (name TEXT PRIMARY KEY, age INTEGER);" + "CREATE TABLE IF NOT EXISTS nonnull_nodefault_table (name TEXT PRIMARY KEY NOT NULL, stamp TEXT NOT NULL);" + "CREATE TABLE IF NOT EXISTS nonnull_default_table (name TEXT PRIMARY KEY NOT NULL, stamp TEXT NOT NULL DEFAULT CURRENT_TIME);"; rc = sqlite3_exec(db, sql, NULL, NULL, NULL); if (rc != SQLITE_OK) goto finalize; @@ -1584,6 +1733,10 @@ bool do_test_dbutils (void) { if (b == true) goto finalize; b = dbutils_table_sanity_check(db, NULL, "nonnull_prikey_table"); if (b == true) goto finalize; + b = dbutils_table_sanity_check(db, NULL, "nonnull_nodefault_table"); + if (b == true) goto finalize; + b = dbutils_table_sanity_check(db, NULL, "nonnull_default_table"); + if (b == false) goto finalize; // create huge dummy_table table rc = sqlite3_exec(db, build_huge_table(), NULL, NULL, NULL); @@ -2003,7 +2156,8 @@ sqlite3 *do_create_database (void) { // manually load extension sqlite3_cloudsync_init(db, NULL, NULL); - + cloudsync_set_payload_apply_callback(db, unittest_payload_apply_rls_callback); + return db; } @@ -2028,9 +2182,12 @@ sqlite3 *do_create_database_file (int i, time_t timestamp, int ntest) { return NULL; } + sqlite3_exec(db, "PRAGMA journal_mode=WAL;", NULL, NULL, NULL); + // manually load extension sqlite3_cloudsync_init(db, NULL, NULL); - + cloudsync_set_payload_apply_callback(db, unittest_payload_apply_rls_callback); + return db; } @@ -2093,8 +2250,24 @@ bool do_test_merge (int nclients, bool print_result, bool cleanup_databases) { finalize: for (int i=0; i 0) { + result = false; + printf("do_test_merge error: db %d has %d unterminated statements\n", i, counter); + } + } + if (cleanup_databases) { char buf[256]; do_build_database_path(buf, i, timestamp, saved_counter++); @@ -2171,12 +2344,12 @@ bool do_test_merge_2 (int nclients, int table_mask, bool print_result, bool clea // deleta data in the first customer do_delete(db[0], table_mask, print_result); - + // merge all changes if (do_merge(db, nclients, false) == false) { goto finalize; } - + // compare results for (int i=1; i 0) { + result = false; + printf("do_test_merge error: db %d has %d unterminated statements\n", i, counter); + } + } if (cleanup_databases) { char buf[256]; do_build_database_path(buf, i, timestamp, saved_counter++); @@ -2939,7 +3123,20 @@ bool do_test_fill_initial_data(int nclients, bool print_result, bool cleanup_dat finalize: for (int i=0; i 0) { + result = false; + printf("do_test_merge error: db %d has %d unterminated statements\n", i, counter); + } + } + if (cleanup_databases) { char buf[256]; do_build_database_path(buf, i, timestamp, saved_counter++); @@ -3081,7 +3278,8 @@ int main(int argc, const char * argv[]) { // manually load extension sqlite3_cloudsync_init(db, NULL, NULL); - + cloudsync_set_payload_apply_callback(db, unittest_payload_apply_rls_callback); + printf("Testing CloudSync version %s\n", CLOUDSYNC_VERSION); printf("===============================\n");