diff --git a/docs/advancedQueries.md b/docs/advancedQueries.md index ed6d4dd1..9b8f236c 100644 --- a/docs/advancedQueries.md +++ b/docs/advancedQueries.md @@ -124,6 +124,16 @@ int32_t selVal = 200; embedDBOperator* selectOp2 = createSelectionOperator(scanOp, 3, SELECT_GTE, &selVal); ``` +### Sorting + +Performs an `ORDER BY col comparator` on the output of an operator. Can support any comparison function that the user creates and wants to use, and will use an adaptive sort that will utilize one of flash minsort, flash minsort sublist, or no output buffer heap sort. + +The following will sort on column 1 (zero-indexed), and the limit for debugging is set to off (-1). + +```c +embedDBOperator* orderByOp1 = createOrderByOperator(state, prevOp, 1, -1, int32comparator); +``` + ### Aggregate Functions This operator allows you to run a `GROUP BY` and perform an aggregate function on each group. In order to use this operator, you will need another type of object: `embedDBAggregateFunc`. The output of an aggregate operator is dictated by the list of `embedDBAggregateFunc` provided to `createAggregateOperator()`. diff --git a/src/query-interface/advancedQueries.c b/src/query-interface/advancedQueries.c index 0b085f05..db544dcd 100644 --- a/src/query-interface/advancedQueries.c +++ b/src/query-interface/advancedQueries.c @@ -530,15 +530,6 @@ void closeOrderBy(embedDBOperator* op) { op->recordBuffer = NULL; } -/** - * @brief Create an operator that will reorder records based on a given direction - * - * @param dbState The database state - * @param input The operator that this operator can pull records from - * @param colNum The column that is being sorted on - * @param limit The first values to be read and sorted - not like a true limit at the moment - * @param compareFn The function being used to make comparisons between row data - */ embedDBOperator* createOrderByOperator(embedDBState* dbState, embedDBOperator* input, int8_t colNum, int32_t limit, int8_t (*compareFn)(void* a, void* b)) { if (input == NULL || dbState == NULL || compareFn == NULL || colNum < 0) { #ifdef PRINT_ERRORS diff --git a/src/query-interface/advancedQueries.h b/src/query-interface/advancedQueries.h index c1d864cb..50f0785f 100644 --- a/src/query-interface/advancedQueries.h +++ b/src/query-interface/advancedQueries.h @@ -185,6 +185,7 @@ embedDBOperator* createKeyJoinOperator(embedDBOperator* input1, embedDBOperator* * @param dbState The database state * @param input The operator that this operator can pull records from * @param colNum The column that is being sorted on + * @param limit The first values to be read and sorted - not like a true limit at the moment * @param compareFn The function being used to make comparisons between row data */ embedDBOperator* createOrderByOperator(embedDBState* dbState, embedDBOperator* input, int8_t colNum, int32_t limit, int8_t (*compareFn)(void* a, void* b)); diff --git a/test/test_group_by/test_group_by.cpp b/test/test_group_by/test_group_by.cpp new file mode 100644 index 00000000..4bf235a4 --- /dev/null +++ b/test/test_group_by/test_group_by.cpp @@ -0,0 +1,326 @@ +/*****************************************************************************/ +/** + * @file Test_group_by.cpp + * @author EmbedDB Team (See Authors.md) + * @brief Test for EmbedDB advanced queries + * @copyright Copyright 2024 + * EmbedDB Team + * @par Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * @par 1.Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * @par 2.Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * @par 3.Neither the name of the copyright holder nor the names of its contributors + * may be used to endorse or promote products derived from this software without + * specific prior written permission. + * + * @par THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +/******************************************************************************/ + +#include + +#ifdef ARDUINO +#include +#endif + +#ifdef DIST +#include "embedDB.h" +#else +#include "embedDB/embedDB.h" +#include "embedDBUtility.h" +#include "query-interface/advancedQueries.h" +#endif + +#if defined(MEMBOARD) +#include "memboardTestSetup.h" +#endif + +#if defined(MEGA) +#include "megaTestSetup.h" +#endif + +#if defined(DUE) +#include "dueTestSetup.h" +#endif + +#include "unity.h" + +#ifdef ARDUINO +#include "SDFileInterface.h" +#define FILE_TYPE SD_FILE +#define getFileInterface getSDInterface +#define setupFile setupSDFile +#define tearDownFile tearDownSDFile +#define JOIN_FILE "expected_join_output.bin" +#define DATA_PATH_UWA "dataFileUWA.bin" +#define INDEX_PATH_UWA "indexFileUWA.bin" +#define DATA_PATH_SEA "dataFileSEA.bin" +#define INDEX_PATH_SEA "indexFileSEA.bin" +#else +#include + +#include "desktopFileInterface.h" +#define FILE_TYPE FILE +#define JOIN_FILE "data/expected_join_output.bin" +#define DATA_PATH_UWA "build/artifacts/dataFileUWA.bin" +#define INDEX_PATH_UWA "build/artifacts/indexFileUWA.bin" +#define DATA_PATH_SEA "build/artifacts/dataFileSEA.bin" +#define INDEX_PATH_SEA "build/artifacts/indexFileSEA.bin" +#endif + +typedef struct DataSource { + FILE_TYPE* fp; + int8_t* pageBuffer; + int32_t pageRecord; +} DataSource; + +void insertData(embedDBState* state, const char* filename); +void* nextRecord(DataSource* source); +int8_t sameTempGroup(const void* lastRecord, const void* record); +void writeTempGroup(embedDBAggregateFunc* aggFunc, embedDBSchema* schema, void* recordBuffer, const void* lastRecord); + +embedDBState* stateSEA; +embedDBSchema* baseSchema; +DataSource* seaData; + +void setUpEmbedDB() { + // Init state for SEA dataset + stateSEA = (embedDBState*)calloc(1, sizeof(embedDBState)); + stateSEA->keySize = 4; + stateSEA->dataSize = 12; + stateSEA->compareKey = int32Comparator; + stateSEA->compareData = int32Comparator; + stateSEA->pageSize = 512; + stateSEA->eraseSizeInPages = 4; + stateSEA->numDataPages = 20000; + stateSEA->numIndexPages = 1000; + stateSEA->numSplinePoints = 120; + + /* Setup file IO for SEA data*/ + char dataPath2[] = DATA_PATH_SEA, indexPath2[] = INDEX_PATH_SEA; + stateSEA->fileInterface = getFileInterface(); + stateSEA->dataFile = setupFile(dataPath2); + stateSEA->indexFile = setupFile(indexPath2); + + stateSEA->bufferSizeInBlocks = 4; + stateSEA->buffer = malloc(stateSEA->bufferSizeInBlocks * stateSEA->pageSize); + stateSEA->parameters = EMBEDDB_USE_BMAP | EMBEDDB_USE_INDEX | EMBEDDB_RESET_DATA; + stateSEA->bitmapSize = 2; + stateSEA->inBitmap = inBitmapInt16; + stateSEA->updateBitmap = updateBitmapInt16; + stateSEA->buildBitmapFromRange = buildBitmapInt16FromRange; + embedDBInit(stateSEA, 1); + + /* insert SEA data */ + const char seaDatafileName[] = "data/sea100K.bin"; + insertData(stateSEA, seaDatafileName); + + // Init base schema + int8_t colSizes[] = {4, 4, 4, 4}; + int8_t colSignedness[] = {embedDB_COLUMN_UNSIGNED, embedDB_COLUMN_SIGNED, embedDB_COLUMN_SIGNED, embedDB_COLUMN_SIGNED}; + ColumnType colTypes[] = {embedDB_COLUMN_UINT32, embedDB_COLUMN_INT32, embedDB_COLUMN_INT32, embedDB_COLUMN_INT32}; + baseSchema = embedDBCreateSchema(4, colSizes, colSignedness, colTypes); + + seaData = (DataSource*)malloc(sizeof(DataSource)); + seaData->fp = fopen("data/sea100K.bin", "rb"); + seaData->pageBuffer = (int8_t*)calloc(1, 512); + seaData->pageRecord = 0; +} + +void test_aggregate() { + embedDBIterator it; + it.minKey = NULL; + it.maxKey = NULL; + it.minData = NULL; + it.maxData = NULL; + embedDBInitIterator(stateSEA, &it); + + embedDBOperator* scanOp = createTableScanOperator(stateSEA, &it, baseSchema); + int32_t selVal = 100; + embedDBOperator* selectOp = createSelectionOperator(scanOp, 3, SELECT_GTE, &selVal); + + embedDBOperator* sortOp = createOrderByOperator(stateSEA, selectOp, 1, -1, int32Comparator); + + embedDBAggregateFunc groupName = {NULL, NULL, writeTempGroup, NULL, 4}; + embedDBAggregateFunc* counter = createCountAggregate(); + embedDBAggregateFunc* maxWind = createMaxAggregate(3, -4); + embedDBAggregateFunc* avgWind = createAvgAggregate(3, 4); + embedDBAggregateFunc* sum = createSumAggregate(2); + embedDBAggregateFunc aggFunctions[] = {groupName, *counter, *maxWind, *avgWind, *sum}; + uint32_t functionsLength = 5; + + typedef struct { + uint32_t count; + int32_t maxWind; + int32_t sum; + int32_t minTemp; + int32_t avgSum; + } Truth; + + Truth* answers = (Truth*)calloc(1000, sizeof(Truth)); + uint32_t expectedGroups = 0; + + fseek(seaData->fp, 0, SEEK_SET); + seaData->pageRecord = 0; + + void* rawRec; + while ((rawRec = nextRecord(seaData)) != NULL) { + int32_t* r = (int32_t*)rawRec; + if (r[3] < selVal) continue; + + int32_t temp = r[1]; + if (answers[temp].count == 0) expectedGroups++; + answers[temp].count++; + if (r[3] > answers[temp].maxWind || answers[temp].count == 1) answers[temp].maxWind = r[3]; + if (r[1] < answers[temp].minTemp || answers[temp].count == 1) answers[temp].minTemp = r[1]; + answers[temp].sum += r[2]; + answers[temp].avgSum += r[3]; + } + + embedDBOperator* aggOp = createAggregateOperator(sortOp, sameTempGroup, aggFunctions, functionsLength); + aggOp->init(aggOp); + + int32_t groupsFound = 0; + while (exec(aggOp)) { + groupsFound++; + int32_t* res = (int32_t*)aggOp->recordBuffer; +#ifdef DEBUG + printf("RAW RECORD: "); + for (int i = 0; i < 8; i++) { + printf("[%d]: %ld ", i, (long)res[i]); + } + printf("\n"); + fflush(stdout); +#endif + int32_t label = res[0]; + + TEST_ASSERT_TRUE_MESSAGE(label >= 0 && label < 1200, "DB returned a label out of range"); + TEST_ASSERT_TRUE_MESSAGE(answers[label].count > 0, "DB returned a group that shouldn't exist"); + + TEST_ASSERT_EQUAL_UINT32_MESSAGE(answers[label].count, res[1], "Count is wrong"); + + TEST_ASSERT_EQUAL_INT32_MESSAGE(answers[label].maxWind, res[2], "Max Wind is wrong"); + + float expectedAvg = (float)answers[label].avgSum / answers[label].count; + + float actualAvg = ((float*)res)[3]; + + TEST_ASSERT_EQUAL_FLOAT_MESSAGE(expectedAvg, actualAvg, "Average mismatch"); + + TEST_ASSERT_EQUAL_INT32_MESSAGE((int32_t)answers[label].sum, res[4], "Sum is wrong"); + } + + TEST_ASSERT_EQUAL_INT32_MESSAGE(expectedGroups, groupsFound, "Number of unique groups mismatch"); + + free(answers); + + // Free states + for (uint32_t i = 0; i < functionsLength; i++) { + if (aggFunctions[i].state != NULL) { + free(aggFunctions[i].state); + } + } + + aggOp->close(aggOp); + embedDBFreeOperatorRecursive(&aggOp); +} + +void tearDown(void) { + embedDBClose(stateSEA); + tearDownFile(stateSEA->dataFile); + tearDownFile(stateSEA->indexFile); + free(stateSEA->fileInterface); + free(stateSEA->buffer); + free(stateSEA); + + embedDBFreeSchema(&baseSchema); + + fclose(seaData->fp); + free(seaData->pageBuffer); + free(seaData); +} + +int runUnityTests(void) { + UNITY_BEGIN(); + + RUN_TEST(test_aggregate); + + return UNITY_END(); +} + +void insertData(embedDBState* state, const char* filename) { + FILE_TYPE* fp = fopen(filename, "rb"); + char fileBuffer[512]; + int numRecords = 0; + while (fread(fileBuffer, state->pageSize, 1, fp)) { + uint16_t count = EMBEDDB_GET_COUNT(fileBuffer); + for (int i = 1; i <= count; i++) { + embedDBPut(state, fileBuffer + i * state->recordSize, fileBuffer + i * state->recordSize + state->keySize); + numRecords++; + } + } + fclose(fp); + embedDBFlush(state); +} + +void* nextRecord(DataSource* source) { + uint16_t count = EMBEDDB_GET_COUNT(source->pageBuffer); + if (count <= source->pageRecord) { + // Read next page + if (!fread(source->pageBuffer, 512, 1, source->fp)) { + return NULL; + } + source->pageRecord = 0; + } + return source->pageBuffer + ++source->pageRecord * 16; +} + +int8_t sameTempGroup(const void* lastRecord, const void* record) { + int32_t t1 = *(int32_t*)((char*)lastRecord + 4); + int32_t t2 = *(int32_t*)((char*)record + 4); + return t1 == t2; +} + +void writeTempGroup(embedDBAggregateFunc* aggFunc, embedDBSchema* schema, void* recordBuffer, const void* lastRecord) { + int32_t temp = *(int32_t*)((char*)lastRecord + 4); + uint16_t offset = getColOffsetFromSchema(schema, aggFunc->colNum); + memcpy((int8_t*)recordBuffer + offset, &temp, sizeof(int32_t)); +} + +void setUp() { + setUpEmbedDB(); +} + +#ifdef ARDUINO + +void setup() { + delay(2000); + setupBoard(); + runUnityTests(); +} + +void loop() {} + +#else + +int main() { + return runUnityTests(); +} + +#endif diff --git a/test/test_sort/test_sort_query_interface.cpp b/test/test_sort/test_sort_query_interface.cpp index 3f43f220..6636b553 100644 --- a/test/test_sort/test_sort_query_interface.cpp +++ b/test/test_sort/test_sort_query_interface.cpp @@ -115,27 +115,29 @@ void insertData(embedDBState* state, const char* filename) { printf("\nInserted %d Records\n", numRecords); } -void insertNValues(embedDBState* state, int n, int mode) { - int key, value; +void insertNValues(embedDBState* state, int32_t n, int8_t mode) { + int32_t key, value; switch (mode) { case 0: - for (int i = 0; i <= n; i++) { + for (int32_t i = 0; i <= n; i++) { key = i; value = i; embedDBPut(state, &key, &value); } break; case 1: - key = 1; - for (int i = n; i >= 0; i--) { + key = 0; + for (int32_t i = n; i >= 0; i--) { value = i; embedDBPut(state, &key, &value); key++; } - for (int i = 0, data = n; i <= n; i++) { - key = i + 1; - embedDBGet(state, (void*)&key, (void*)&value); + int8_t valueBuffer[12]; + for (int32_t i = 0, data = n; i <= n; i++) { + key = i; + embedDBGet(state, (void*)&key, (void*)&valueBuffer); + value = *(int32_t*)valueBuffer; TEST_ASSERT_MESSAGE(value == data, "value isn't equal to extracted data"); data--; } @@ -206,9 +208,11 @@ void runTestUsingSEA100k() { // debugBinData(scanOpOrderBy, 200, 0); uint8_t projColsOB[] = {0, 1}; embedDBOperator* projColsOrderBy = createProjectionOperator(scanOpOrderBy, 2, projColsOB); + // int32_t selVal = 100; + // embedDBOperator* selectOp = createSelectionOperator(scanOpOrderBy, 3, SELECT_GTE, &selVal); // debugBinData(projColsOrderBy, 300, 1); embedDBOperator* orderByOp = createOrderByOperator(state, projColsOrderBy, 1, -1, int32Comparator); - // debugBinData(orderByOp, 100000, 1); + // debugBinData(orderByOp, 100, 1); orderByOp->init(orderByOp); @@ -248,4 +252,4 @@ int main() { return runUnityTests(); } -#endif \ No newline at end of file +#endif