diff --git a/2.0/src/query/inc/qExecutor.h b/2.0/src/query/inc/qExecutor.h index 0c0e3363c8..13512d2e10 100644 --- a/2.0/src/query/inc/qExecutor.h +++ b/2.0/src/query/inc/qExecutor.h @@ -622,7 +622,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); -void* destroyOutputBuf(SSDataBlock* pBlock); +void* destroySDataBlock(SSDataBlock* pBlock); void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols); void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); diff --git a/2.0/src/query/src/qExecutor.c b/2.0/src/query/src/qExecutor.c index fdadf39d4d..6a066338d6 100644 --- a/2.0/src/query/src/qExecutor.c +++ b/2.0/src/query/src/qExecutor.c @@ -336,7 +336,7 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO return res; } -void* destroyOutputBuf(SSDataBlock* pBlock) { +void* destroySDataBlock(SSDataBlock* pBlock) { if (pBlock == NULL) { return NULL; } @@ -5373,7 +5373,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; taosArrayDestroy(pInfo->orderColumnList); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + pInfo->pRes = destroySDataBlock(pInfo->pRes); tfree(pInfo->prevRow); } @@ -6566,7 +6566,7 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { tfree(pInfo->rowCellInfoOffset); cleanupResultRowInfo(&pInfo->resultRowInfo); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + pInfo->pRes = destroySDataBlock(pInfo->pRes); } static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { @@ -6590,7 +6590,7 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + pInfo->pRes = destroySDataBlock(pInfo->pRes); tfree(pInfo->p); } @@ -6607,12 +6607,12 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { STagScanInfo* pInfo = (STagScanInfo*) param; - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + pInfo->pRes = destroySDataBlock(pInfo->pRes); } static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; - pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock); + pInfo->pDataBlock = destroySDataBlock(pInfo->pDataBlock); } static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { @@ -6625,7 +6625,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { taosHashCleanup(pInfo->pSet); tfree(pInfo->buf); taosArrayDestroy(pInfo->pDistinctDataInfo); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + pInfo->pRes = destroySDataBlock(pInfo->pRes); } SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { diff --git a/include/common/common.h b/include/common/common.h index 132d5cc92b..c33689d66f 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -16,31 +16,19 @@ #ifndef TDENGINE_COMMON_H #define TDENGINE_COMMON_H + +#ifdef __cplusplus +extern "C" { +#endif + #include "taosdef.h" #include "tmsg.h" #include "tarray.h" #include "tvariant.h" -//typedef struct STimeWindow { -// TSKEY skey; -// TSKEY ekey; -//} STimeWindow; - -//typedef struct { -// int32_t dataLen; -// char name[TSDB_TABLE_FNAME_LEN]; -// char *data; -//} STagData; - -//typedef struct SSchema { -// uint8_t type; -// char name[TSDB_COL_NAME_LEN]; -// int16_t colId; -// int16_t bytes; -//} SSchema; typedef struct { uint32_t numOfTables; - SArray *pGroupList; + SArray * pGroupList; SHashObj *map; // speedup acquire the tableQueryInfo by table uid } STableGroupInfo; @@ -63,7 +51,7 @@ typedef struct SDataBlockInfo { typedef struct SConstantItem { SColumnInfo info; - int32_t startRow; // run-length-encoding to save the space for multiple rows + int32_t startRow; // run-length-encoding to save the space for multiple rows int32_t endRow; SVariant value; } SConstantItem; @@ -71,58 +59,67 @@ typedef struct SConstantItem { // info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList); typedef struct SSDataBlock { SColumnDataAgg *pBlockAgg; - SArray *pDataBlock; // SArray - SArray *pConstantList; // SArray, it is a constant/tags value of the corresponding result value. - SDataBlockInfo info; + SArray * pDataBlock; // SArray + SArray *pConstantList; // SArray, it is a constant/tags value of the corresponding result value. + SDataBlockInfo info; } SSDataBlock; +typedef struct SVarColAttr { + int32_t *offset; // start position for each entry in the list + uint32_t length; // used buffer size that contain the valid data + uint32_t allocLen; // allocated buffer size +} SVarColAttr; + // pBlockAgg->numOfNull == info.rows, all data are null // pBlockAgg->numOfNull == 0, no data are null. typedef struct SColumnInfoData { - SColumnInfo info; // TODO filter info needs to be removed - char *nullbitmap;// - char *pData; // the corresponding block data in memory + SColumnInfo info; // TODO filter info needs to be removed + char *pData; // the corresponding block data in memory + union { + char *nullbitmap; // bitmap, one bit for each item in the list + SVarColAttr varmeta; + }; } SColumnInfoData; //====================================================================================================================== // the following structure shared by parser and executor typedef struct SColumn { - uint64_t uid; - char name[TSDB_COL_NAME_LEN]; - int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string) - SColumnInfo info; + uint64_t uid; + char name[TSDB_COL_NAME_LEN]; + int8_t flag; // column type: normal column, tag, or user-input column (integer/float/string) + SColumnInfo info; } SColumn; typedef struct SLimit { - int64_t limit; - int64_t offset; + int64_t limit; + int64_t offset; } SLimit; typedef struct SOrder { - uint32_t order; - SColumn col; + uint32_t order; + SColumn col; } SOrder; typedef struct SGroupbyExpr { - SArray* columnInfo; // SArray, group by columns information - bool groupbyTag; // group by tag or column + SArray *columnInfo; // SArray, group by columns information + bool groupbyTag; // group by tag or column } SGroupbyExpr; // the structure for sql function in select clause typedef struct SSqlExpr { - char token[TSDB_COL_NAME_LEN]; // original token - SSchema resSchema; + char token[TSDB_COL_NAME_LEN]; // original token + SSchema resSchema; - int32_t numOfCols; - SColumn* pColumns; // data columns that are required by query - int32_t interBytes; // inter result buffer size - int16_t numOfParams; // argument value of each function - SVariant param[3]; // parameters are not more than 3 + int32_t numOfCols; + SColumn *pColumns; // data columns that are required by query + int32_t interBytes; // inter result buffer size + int16_t numOfParams; // argument value of each function + SVariant param[3]; // parameters are not more than 3 } SSqlExpr; typedef struct SExprInfo { - struct SSqlExpr base; - struct tExprNode *pExpr; + struct SSqlExpr base; + struct tExprNode *pExpr; } SExprInfo; typedef struct SStateWindow { @@ -130,13 +127,19 @@ typedef struct SStateWindow { } SStateWindow; typedef struct SSessionWindow { - int64_t gap; // gap between two session window(in microseconds) + int64_t gap; // gap between two session window(in microseconds) SColumn col; } SSessionWindow; -#define QUERY_ASC_FORWARD_STEP 1 +#define QUERY_ASC_FORWARD_STEP 1 #define QUERY_DESC_FORWARD_STEP -1 #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) +void *destroySDataBlock(SSDataBlock *pBlock); + +#ifdef __cplusplus +} +#endif + #endif // TDENGINE_COMMON_H diff --git a/include/common/tep.h b/include/common/tep.h index 69dd385a37..7e90b46c3c 100644 --- a/include/common/tep.h +++ b/include/common/tep.h @@ -7,12 +7,18 @@ extern "C" { #include "os.h" #include "tmsg.h" +#include "common.h" typedef struct SCorEpSet { int32_t version; SEpSet epSet; } SCorEpSet; +typedef struct SBlockOrderInfo { + int32_t order; + int32_t colIndex; +} SBlockOrderInfo; + int taosGetFqdnPortFromEp(const char *ep, SEp *pEp); void addEpIntoEpSet(SEpSet *pEpSet, const char *fqdn, uint16_t port); @@ -21,6 +27,30 @@ bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2); void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet); SEpSet getEpSet_s(SCorEpSet *pEpSet); +bool colDataIsNull_f(const char* bitmap, uint32_t row); +void colDataSetNull_f(char* bitmap, uint32_t row); + +bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row, SColumnDataAgg* pColAgg); + +char* colDataGet(SColumnInfoData* pColumnInfoData, uint32_t row); +int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); +int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2); +int32_t colDataUpdateTsWindow(SSDataBlock* pDataBlock); + +int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); +void colDataTrim(SColumnInfoData* pColumnInfoData); + +size_t colDataGetNumOfCols(const SSDataBlock* pBlock); +size_t colDataGetNumOfRows(const SSDataBlock* pBlock); + +int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc); +int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize); +int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); + +size_t blockDataGetSize(const SSDataBlock* pBlock); +size_t blockDataGetRowSize(const SSDataBlock* pBlock); +int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); + #ifdef __cplusplus } #endif diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 3d57a5de1d..63067b9bd9 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -53,516 +53,516 @@ TEST(testCase, driverInit_Test) { // taos_init(); } -TEST(testCase, connect_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - if (pConn == NULL) { - printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); - } - taos_close(pConn); -} - -TEST(testCase, create_user_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, create_account_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, drop_account_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, show_user_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "show users"); - TAOS_ROW pRow = NULL; - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, drop_user_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "drop user abc"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, show_db_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "show databases"); - TAOS_ROW pRow = NULL; - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_close(pConn); -} - -TEST(testCase, create_db_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); - if (taos_errno(pRes) != 0) { - printf("error in create db, reason:%s\n", taos_errstr(pRes)); - } - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - ASSERT_TRUE(pFields == NULL); - - int32_t numOfFields = taos_num_fields(pRes); - ASSERT_EQ(numOfFields, 0); - - taos_free_result(pRes); - - pRes = taos_query(pConn, "create database abc1 vgroups 4"); - if (taos_errno(pRes) != 0) { - printf("error in create db, reason:%s\n", taos_errstr(pRes)); - } - taos_close(pConn); -} - -TEST(testCase, create_dnode_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "create dnode abc1 port 7000"); - if (taos_errno(pRes) != 0) { - printf("error in create dnode, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create dnode 1.1.1.1 port 9000"); - if (taos_errno(pRes) != 0) { - printf("failed to create dnode, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - taos_close(pConn); -} - -TEST(testCase, drop_dnode_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "drop dnode 3"); - if (taos_errno(pRes) != 0) { - printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); - } - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - ASSERT_TRUE(pFields == NULL); - - int32_t numOfFields = taos_num_fields(pRes); - ASSERT_EQ(numOfFields, 0); - - pRes = taos_query(pConn, "drop dnode 4"); - if (taos_errno(pRes) != 0) { - printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, use_db_test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - ASSERT_TRUE(pFields == NULL); - - int32_t numOfFields = taos_num_fields(pRes); - ASSERT_EQ(numOfFields, 0); - - taos_close(pConn); -} - -// TEST(testCase, drop_db_test) { +//TEST(testCase, connect_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// if (pConn == NULL) { +// printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); +// } +// taos_close(pConn); +//} +// +//TEST(testCase, create_user_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); // -// showDB(pConn); -// -// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); -// if (taos_errno(pRes) != 0) { -// printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); +// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); +// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { +// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); // } -// taos_free_result(pRes); // -// showDB(pConn); -// -// pRes = taos_query(pConn, "create database abc1"); -// if (taos_errno(pRes) != 0) { -// printf("create to drop db, reason:%s\n", taos_errstr(pRes)); -// } // taos_free_result(pRes); // taos_close(pConn); //} - -TEST(testCase, create_stable_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); - if (taos_errno(pRes) != 0) { - printf("error in create db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists abc1.st1(ts timestamp, k int) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("error in create stable, reason:%s\n", taos_errstr(pRes)); - } - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - ASSERT_TRUE(pFields == NULL); - - int32_t numOfFields = taos_num_fields(pRes); - ASSERT_EQ(numOfFields, 0); - taos_free_result(pRes); - - pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); - } - - pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - pRes = taos_query(pConn, "drop stable `123_$^)`"); - if (taos_errno(pRes) != 0) { - printf("failed to drop super table 123_$^), reason:%s\n", taos_errstr(pRes)); - } - - taos_close(pConn); -} - -TEST(testCase, create_table_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)"); - ASSERT_EQ(taos_errno(pRes), 0); - - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)"); - ASSERT_NE(taos_errno(pRes), 0); - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, create_ctable_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int ) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("failed to create stable, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, show_stable_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != nullptr); - - TAOS_RES* pRes = taos_query(pConn, "show abc1.stables"); - if (taos_errno(pRes) != 0) { - printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, show_vgroup_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "show vgroups"); - if (taos_errno(pRes) != 0) { - printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, create_multiple_tables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to create db, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - taos_close(pConn); - return; - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("failed to use db, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - taos_close(pConn); - return; - } - - taos_free_result(pRes); - - pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("failed to create stable tables, reason:%s\n", taos_errstr(pRes)); - } - - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - taos_free_result(pRes); - pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); - if (taos_errno(pRes) != 0) { - printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - - for (int32_t i = 0; i < 20; ++i) { - char sql[512] = {0}; - snprintf(sql, tListLen(sql), - "create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i, - (i + 1) * 30, (i + 2) * 40); - TAOS_RES* pres = taos_query(pConn, sql); - if (taos_errno(pres) != 0) { - printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); - } - taos_free_result(pres); - } - - taos_close(pConn); -} - -TEST(testCase, show_table_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "show tables"); - if (taos_errno(pRes) != 0) { - printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - } - - taos_free_result(pRes); - - pRes = taos_query(pConn, "show abc1.tables"); - if (taos_errno(pRes) != 0) { - printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - int32_t count = 0; - char str[512] = {0}; - - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%d: %s\n", ++count, str); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -//TEST(testCase, drop_stable_Test) { +// +//TEST(testCase, create_account_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); +// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { +// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, drop_account_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); +// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { +// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, show_user_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "show users"); +// TAOS_ROW pRow = NULL; +// +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, drop_user_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "drop user abc"); +// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { +// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, show_db_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "show databases"); +// TAOS_ROW pRow = NULL; +// +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// +// taos_close(pConn); +//} +// +//TEST(testCase, create_db_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); +// if (taos_errno(pRes) != 0) { +// printf("error in create db, reason:%s\n", taos_errstr(pRes)); +// } +// +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// ASSERT_TRUE(pFields == NULL); +// +// int32_t numOfFields = taos_num_fields(pRes); +// ASSERT_EQ(numOfFields, 0); +// +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create database abc1 vgroups 4"); +// if (taos_errno(pRes) != 0) { +// printf("error in create db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_close(pConn); +//} +// +//TEST(testCase, create_dnode_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "create dnode abc1 port 7000"); +// if (taos_errno(pRes) != 0) { +// printf("error in create dnode, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create dnode 1.1.1.1 port 9000"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create dnode, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// taos_close(pConn); +//} +// +//TEST(testCase, drop_dnode_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "drop dnode 3"); +// if (taos_errno(pRes) != 0) { +// printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); +// } +// +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// ASSERT_TRUE(pFields == NULL); +// +// int32_t numOfFields = taos_num_fields(pRes); +// ASSERT_EQ(numOfFields, 0); +// +// pRes = taos_query(pConn, "drop dnode 4"); +// if (taos_errno(pRes) != 0) { +// printf("error in drop dnode, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, use_db_test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// if (taos_errno(pRes) != 0) { +// printf("error in use db, reason:%s\n", taos_errstr(pRes)); +// } +// +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// ASSERT_TRUE(pFields == NULL); +// +// int32_t numOfFields = taos_num_fields(pRes); +// ASSERT_EQ(numOfFields, 0); +// +// taos_close(pConn); +//} +// +//// TEST(testCase, drop_db_test) { +//// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +//// assert(pConn != NULL); +//// +//// showDB(pConn); +//// +//// TAOS_RES* pRes = taos_query(pConn, "drop database abc1"); +//// if (taos_errno(pRes) != 0) { +//// printf("failed to drop db, reason:%s\n", taos_errstr(pRes)); +//// } +//// taos_free_result(pRes); +//// +//// showDB(pConn); +//// +//// pRes = taos_query(pConn, "create database abc1"); +//// if (taos_errno(pRes) != 0) { +//// printf("create to drop db, reason:%s\n", taos_errstr(pRes)); +//// } +//// taos_free_result(pRes); +//// taos_close(pConn); +////} +// +//TEST(testCase, create_stable_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); +// if (taos_errno(pRes) != 0) { +// printf("error in create db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table if not exists abc1.st1(ts timestamp, k int) tags(a int)"); +// if (taos_errno(pRes) != 0) { +// printf("error in create stable, reason:%s\n", taos_errstr(pRes)); +// } +// +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// ASSERT_TRUE(pFields == NULL); +// +// int32_t numOfFields = taos_num_fields(pRes); +// ASSERT_EQ(numOfFields, 0); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); +// } +// +// pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// pRes = taos_query(pConn, "drop stable `123_$^)`"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop super table 123_$^), reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_close(pConn); +//} +// +//TEST(testCase, create_table_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)"); +// ASSERT_EQ(taos_errno(pRes), 0); +// +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)"); +// ASSERT_NE(taos_errno(pRes), 0); +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, create_ctable_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int ) tags(a int)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create stable, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, show_stable_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != nullptr); // +// TAOS_RES* pRes = taos_query(pConn, "show abc1.stables"); +// if (taos_errno(pRes) != 0) { +// printf("failed to show stables, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, show_vgroup_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "show vgroups"); +// if (taos_errno(pRes) != 0) { +// printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// TAOS_ROW pRow = NULL; +// +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, create_multiple_tables) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); +// // TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1"); // if (taos_errno(pRes) != 0) { -// printf("error in creating db, reason:%s\n", taos_errstr(pRes)); +// printf("failed to create db, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// taos_close(pConn); +// return; // } // taos_free_result(pRes); // // pRes = taos_query(pConn, "use abc1"); // if (taos_errno(pRes) != 0) { -// printf("error in using db, reason:%s\n", taos_errstr(pRes)); +// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// taos_close(pConn); +// return; // } +// // taos_free_result(pRes); // -// pRes = taos_query(pConn, "drop stable st1"); +// pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)"); // if (taos_errno(pRes) != 0) { -// printf("failed to drop stable, reason:%s\n", taos_errstr(pRes)); +// printf("failed to create stable tables, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// taos_free_result(pRes); +// pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// +// taos_free_result(pRes); +// +// for (int32_t i = 0; i < 20; ++i) { +// char sql[512] = {0}; +// snprintf(sql, tListLen(sql), +// "create table t_x_%d using st1 tags(2) t_x_%d using st1 tags(5) t_x_%d using st1 tags(911)", i, +// (i + 1) * 30, (i + 2) * 40); +// TAOS_RES* pres = taos_query(pConn, sql); +// if (taos_errno(pres) != 0) { +// printf("failed to create table %d\n, reason:%s", i, taos_errstr(pres)); +// } +// taos_free_result(pres); +// } +// +// taos_close(pConn); +//} +// +//TEST(testCase, show_table_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "show tables"); +// if (taos_errno(pRes) != 0) { +// printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// } +// +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "show abc1.tables"); +// if (taos_errno(pRes) != 0) { +// printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// int32_t count = 0; +// char str[512] = {0}; +// +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%d: %s\n", ++count, str); // } // // taos_free_result(pRes); // taos_close(pConn); //} - -TEST(testCase, generated_request_id_test) { - SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - - for (int32_t i = 0; i < 50000; ++i) { - uint64_t v = generateRequestId(); - void* result = taosHashGet(phash, &v, sizeof(v)); - if (result != nullptr) { - printf("0x%lx, index:%d\n", v, i); - } - assert(result == nullptr); - taosHashPut(phash, &v, sizeof(v), NULL, 0); - } - - taosHashCleanup(phash); -} - -TEST(testCase, insert_test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create into table t_2, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -#if 0 +// +////TEST(testCase, drop_stable_Test) { +//// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +//// assert(pConn != nullptr); +//// +//// TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1"); +//// if (taos_errno(pRes) != 0) { +//// printf("error in creating db, reason:%s\n", taos_errstr(pRes)); +//// } +//// taos_free_result(pRes); +//// +//// pRes = taos_query(pConn, "use abc1"); +//// if (taos_errno(pRes) != 0) { +//// printf("error in using db, reason:%s\n", taos_errstr(pRes)); +//// } +//// taos_free_result(pRes); +//// +//// pRes = taos_query(pConn, "drop stable st1"); +//// if (taos_errno(pRes) != 0) { +//// printf("failed to drop stable, reason:%s\n", taos_errstr(pRes)); +//// } +//// +//// taos_free_result(pRes); +//// taos_close(pConn); +////} +// +//TEST(testCase, generated_request_id_test) { +// SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); +// +// for (int32_t i = 0; i < 50000; ++i) { +// uint64_t v = generateRequestId(); +// void* result = taosHashGet(phash, &v, sizeof(v)); +// if (result != nullptr) { +// printf("0x%lx, index:%d\n", v, i); +// } +// assert(result == nullptr); +// taosHashPut(phash, &v, sizeof(v), NULL, 0); +// } +// +// taosHashCleanup(phash); +//} +// +//TEST(testCase, insert_test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create into table t_2, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//#if 0 TEST(testCase, create_topic_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -581,156 +581,156 @@ TEST(testCase, create_topic_Test) { taos_free_result(pRes); - char* sql = "select * from tu"; + char* sql = "select * from st1"; pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); taos_free_result(pRes); taos_close(pConn); } -TEST(testCase, tmq_subscribe_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - tmq_conf_t* conf = tmq_conf_new(); - tmq_conf_set(conf, "group.id", "tg1"); - tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); - - tmq_list_t* topic_list = tmq_list_new(); - tmq_list_append(topic_list, "test_topic_1"); - tmq_subscribe(tmq, topic_list); - - while (1) { - tmq_message_t* msg = tmq_consume_poll(tmq, 0); - printf("get msg\n"); - //if (msg == NULL) break; - } -} -#endif - -TEST(testCase, tmq_consume_Test) { -} - -TEST(testCase, tmq_commit_TEST) { -} - -TEST(testCase, projection_query_tables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); - if (taos_errno(pRes) != 0) { - printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table tu using st1 tags(1)"); - if (taos_errno(pRes) != 0) { - printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - for(int32_t i = 0; i < 100000; ++i) { - char sql[512] = {0}; - sprintf(sql, "insert into tu values(now+%da, %d)", i, i); - TAOS_RES* p = taos_query(pConn, sql); - if (taos_errno(p) != 0) { - printf("failed to insert data, reason:%s\n", taos_errstr(p)); - } - - taos_free_result(p); - } - - pRes = taos_query(pConn, "select * from tu"); - if (taos_errno(pRes) != 0) { - printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, projection_query_stables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "select ts from st1"); - if (taos_errno(pRes) != 0) { - printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, agg_query_tables) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - ASSERT_NE(pConn, nullptr); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - taos_free_result(pRes); - - pRes = taos_query(pConn, "create table tx using st1 tags(111111111111111)"); - if (taos_errno(pRes) != 0) { - printf("failed to create table, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - pRes = taos_query(pConn, "select count(*) from t_x_19"); - if (taos_errno(pRes) != 0) { - printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - ASSERT_TRUE(false); - } - - TAOS_ROW pRow = NULL; - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - int32_t numOfFields = taos_num_fields(pRes); - - char str[512] = {0}; - while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - } - - taos_free_result(pRes); - taos_close(pConn); -} +//TEST(testCase, tmq_subscribe_Test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// assert(pConn != NULL); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// if (taos_errno(pRes) != 0) { +// printf("error in use db, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// tmq_conf_t* conf = tmq_conf_new(); +// tmq_conf_set(conf, "group.id", "tg1"); +// tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); +// +// tmq_list_t* topic_list = tmq_list_new(); +// tmq_list_append(topic_list, "test_topic_1"); +// tmq_subscribe(tmq, topic_list); +// +// while (1) { +// tmq_message_t* msg = tmq_consume_poll(tmq, 0); +// printf("get msg\n"); +// //if (msg == NULL) break; +// } +//} +//#endif +// +////TEST(testCase, tmq_consume_Test) { +////} +// +////TEST(testCase, tmq_commit_TEST) { +////} +// +//TEST(testCase, projection_query_tables) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table tu using st1 tags(1)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// for(int32_t i = 0; i < 100000; ++i) { +// char sql[512] = {0}; +// sprintf(sql, "insert into tu values(now+%da, %d)", i, i); +// TAOS_RES* p = taos_query(pConn, sql); +// if (taos_errno(p) != 0) { +// printf("failed to insert data, reason:%s\n", taos_errstr(p)); +// } +// +// taos_free_result(p); +// } +// +// pRes = taos_query(pConn, "select * from tu"); +// if (taos_errno(pRes) != 0) { +// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, projection_query_stables) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "select ts from st1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} +// +//TEST(testCase, agg_query_tables) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "create table tx using st1 tags(111111111111111)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create table, reason:%s\n", taos_errstr(pRes)); +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "select count(*) from t_x_19"); +// if (taos_errno(pRes) != 0) { +// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// TAOS_ROW pRow = NULL; +// TAOS_FIELD* pFields = taos_fetch_fields(pRes); +// int32_t numOfFields = taos_num_fields(pRes); +// +// char str[512] = {0}; +// while ((pRow = taos_fetch_row(pRes)) != NULL) { +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} #pragma GCC diagnostic pop diff --git a/source/common/src/tep.c b/source/common/src/tep.c index 45587a8856..ce088bd173 100644 --- a/source/common/src/tep.c +++ b/source/common/src/tep.c @@ -71,57 +71,215 @@ bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, u } } - if (pColumnInfoData->nullbitmap == NULL) { - return false; - } + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + return pColumnInfoData->varmeta.offset[row] == -1; + } else { + if (pColumnInfoData->nullbitmap == NULL) { + return false; + } - uint8_t v = (pColumnInfoData->nullbitmap[row>>3] & (1<<(8 - (row&0x07)))); - return (v == 1); + return colDataIsNull_f(pColumnInfoData->nullbitmap, row); + } } +#define NBIT (3u) +#define BitmapLen(_n) (((_n) + ((1<> NBIT) +#define BitPos(_n) ((_n) & ((1<>3] & (1<<(8 - (row&0x07)))); + return (bitmap[row>>3u] & (1u<<(7u - BitPos(row)))) == (1u<<(7u - BitPos(row))); } -void colDataSetNull_f(char* bitmap, uint32_t row) { // TODO - return; +void colDataSetNull_f(char* bitmap, uint32_t row) { + bitmap[row>>3u] |= (1u << (7u - BitPos(row))); } -void* colDataGet(const SColumnInfoData* pColumnInfoData, uint32_t row) { +char* colDataGet(SColumnInfoData* pColumnInfoData, uint32_t row) { + char* p = pColumnInfoData->pData; if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { - uint32_t offset = ((uint32_t*)pColumnInfoData->pData)[row]; - return (char*)(pColumnInfoData->pData) + offset; // the first part is the pointer to the true binary data + return p + pColumnInfoData->varmeta.offset[row]; } else { - return (char*)(pColumnInfoData->pData) + (row * pColumnInfoData->info.bytes); + return p + (row * pColumnInfoData->info.bytes); } } +static int32_t ensureBitmapSize(SColumnInfoData* pColumnInfoData, uint32_t size) { +#if 0 + ASSERT(pColumnInfoData != NULL); + if (pColumnInfoData->bitmapLen * 8 < size) { + int32_t inc = pColumnInfoData->bitmapLen * 1.25; + if (inc < 8) { + inc = 8; + } + + char* tmp = realloc(pColumnInfoData->nullbitmap, inc + pColumnInfoData->bitmapLen); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pColumnInfoData->nullbitmap = tmp; + memset(pColumnInfoData->nullbitmap + pColumnInfoData->bitmapLen, 0, inc); + } +#endif + return TSDB_CODE_SUCCESS; +} + +int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { + ASSERT(pColumnInfoData != NULL); + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + return pColumnInfoData->varmeta.length; + } else { + return pColumnInfoData->info.bytes * numOfRows; + } +} + +void colDataTrim(SColumnInfoData* pColumnInfoData) { + // TODO +} + int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull) { ASSERT(pColumnInfoData != NULL); if (isNull) { - // TODO set null value in the nullbitmap + // There is a placehold for each NULL value of binary or nchar type. + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + pColumnInfoData->varmeta.offset[currentRow] = -1; // it is a null value of VAR type. + } else { + colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow); + } + return 0; } int32_t type = pColumnInfoData->info.type; if (IS_VAR_DATA_TYPE(type)) { - // TODO continue append var_type + SVarColAttr* pAttr = &pColumnInfoData->varmeta; + if (pAttr->allocLen < pAttr->length + varDataTLen(pData)) { + uint32_t newSize = pAttr->allocLen; + if (newSize == 0) { + newSize = 8; + } + + while(newSize < pAttr->length + varDataTLen(pData)) { + newSize = newSize * 1.5; + } + + char* buf = realloc(pColumnInfoData->pData, newSize); + if (buf == NULL) { + // TODO handle the malloc failure. + } + + pColumnInfoData->pData = buf; + pAttr->allocLen = newSize; + } + + uint32_t len = pColumnInfoData->varmeta.length; + pColumnInfoData->varmeta.offset[currentRow] = len; + + memcpy(pColumnInfoData->pData + len, pData, varDataTLen(pData)); + pColumnInfoData->varmeta.length += varDataTLen(pData); } else { char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow; switch(type) { case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_UTINYINT: {*(int8_t*) p = *(int8_t*) pData;break;} + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_USMALLINT: {*(int16_t*) p = *(int16_t*) pData;break;} + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: {*(int32_t*) p = *(int32_t*) pData;break;} + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: {*(int64_t*) p = *(int64_t*) pData;break;} default: assert(0); } - } return 0; } -size_t colDataGetCols(const SSDataBlock* pBlock) { +static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource, int32_t numOfRow2) { + uint32_t total = numOfRow1 + numOfRow2; + + if (BitmapLen(numOfRow1) < BitmapLen(total)) { + char* tmp = realloc(pColumnInfoData->nullbitmap, BitmapLen(total)); + uint32_t extend = BitmapLen(total) - BitmapLen(numOfRow1); + memset(tmp + BitmapLen(numOfRow1), 0, extend); + pColumnInfoData->nullbitmap = tmp; + } + + uint32_t remindBits = BitPos(numOfRow1); + uint32_t shiftBits = 8 - remindBits; + + if (remindBits == 0) { // no need to shift bits of bitmap + memcpy(pColumnInfoData->nullbitmap + BitmapLen(numOfRow1), pSource->nullbitmap, BitmapLen(numOfRow2)); + } else { + int32_t len = BitmapLen(numOfRow2); + int32_t i = 0; + + uint8_t* p = (uint8_t*)pSource->nullbitmap; + pColumnInfoData->nullbitmap[BitmapLen(numOfRow1) - 1] |= (p[0] >> remindBits); + + uint8_t* start = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)]; + while (i < len) { + start[i] |= (p[i] << shiftBits); + i += 1; + + if (i > 1) { + start[i - 1] |= (p[i] >> remindBits); + } + } + } +} + +int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2) { + ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type); + + if (numOfRow2 == 0) { + return numOfRow1; + } + + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { + // Handle the bitmap + char* p = realloc(pColumnInfoData->varmeta.offset, sizeof(int32_t) * (numOfRow1 + numOfRow2)); + if (p == NULL) { + // TODO + } + + pColumnInfoData->varmeta.offset = (int32_t*) p; + memcpy(pColumnInfoData->varmeta.offset + sizeof(int32_t) * numOfRow1, pSource->varmeta.offset, sizeof(int32_t) * numOfRow2); + + // copy the + uint32_t len = pSource->varmeta.length; + uint32_t oldLen = pColumnInfoData->varmeta.length; + if (pColumnInfoData->varmeta.allocLen < len + oldLen) { + char* tmp = realloc(pColumnInfoData->pData, len + oldLen); + if (tmp == NULL) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + pColumnInfoData->pData = tmp; + pColumnInfoData->varmeta.allocLen = len + oldLen; + } + + memcpy(pColumnInfoData->pData + oldLen, pSource->pData + sizeof(int32_t), len); + } else { + doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2); + + int32_t newSize = (numOfRow1 + numOfRow2) * pColumnInfoData->info.bytes; + char* tmp = realloc(pColumnInfoData->pData, newSize); + if (tmp == NULL) { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + + pColumnInfoData->pData = tmp; + int32_t offset = pColumnInfoData->info.bytes * numOfRow1; + memcpy(pColumnInfoData->pData + offset, pSource->pData, pSource->info.bytes * numOfRow2); + } + + return numOfRow1 + numOfRow2; +} + +size_t colDataGetNumOfCols(const SSDataBlock* pBlock) { ASSERT(pBlock); size_t constantCols = (pBlock->pConstantList != NULL)? taosArrayGetSize(pBlock->pConstantList):0; @@ -129,7 +287,7 @@ size_t colDataGetCols(const SSDataBlock* pBlock) { return pBlock->info.numOfCols; } -size_t colDataGetRows(const SSDataBlock* pBlock) { +size_t colDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; } @@ -153,6 +311,349 @@ int32_t colDataUpdateTsWindow(SSDataBlock* pDataBlock) { return 0; } +int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { + assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols); + int32_t numOfCols = pSrc->info.numOfCols; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); + SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); + uint32_t oldLen = colDataGetSize(pCol2, pDest->info.rows); + uint32_t newLen = colDataGetSize(pCol1, pSrc->info.rows); + int32_t newSize = oldLen + newLen; + char* tmp = realloc(pCol2->pData, newSize); + if (tmp != NULL) { + pCol2->pData = tmp; + colDataMergeCol(pCol2, pDest->info.rows, pCol1, pSrc->info.rows); + } else { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + } + + pDest->info.rows += pSrc->info.rows; + return TSDB_CODE_SUCCESS; +} + +size_t blockDataGetSize(const SSDataBlock* pBlock) { + assert(pBlock != NULL); + + size_t total = 0; + int32_t numOfCols = pBlock->info.numOfCols; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + total += colDataGetSize(pColInfoData, pBlock->info.rows); + } + + // bitmap for each column + total += BitmapLen(pBlock->info.rows) * numOfCols; + return total; +} + +// the number of tuples can be fit in one page. +// Actual data rows pluses the corresponding meta data must fit in one memory buffer of the given page size. +int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize) { + ASSERT(pBlock != NULL && stopIndex != NULL); + + int32_t size = 0; + int32_t numOfCols = pBlock->info.numOfCols; + int32_t numOfRows = pBlock->info.rows; + + size_t headerSize = sizeof(int32_t); + + // TODO speedup by checking if the whole page can fit in firstly. + + if (!hasVarCol) { + size_t rowSize = blockDataGetRowSize(pBlock); + int32_t capacity = ((pageSize - headerSize) / (rowSize * 8 + 1)) * 8; + *stopIndex = startIndex + capacity; + + if (*stopIndex >= numOfRows) { + *stopIndex = numOfRows - 1; + } + + return TSDB_CODE_SUCCESS; + } else { + // iterate the rows that can be fit in this buffer page + size += headerSize; + + for(int32_t j = startIndex; j < numOfRows; ++j) { + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + bool isNull = colDataIsNull(pColInfoData, numOfRows, j, NULL); + if (isNull) { + // do nothing + } else { + char* p = colDataGet(pColInfoData, j); + size += varDataTLen(p); + } + + size += sizeof(pColInfoData->varmeta.offset[0]); + } else { + size += pColInfoData->info.bytes; + + if (((j - startIndex) % 8) == 0) { + size += 1; // the space for null bitmap + } + } + } + + if (size > pageSize) { + *stopIndex = j - 1; + ASSERT(*stopIndex > startIndex); + + return TSDB_CODE_SUCCESS; + } + } + + // all fit in + *stopIndex = numOfRows - 1; + return TSDB_CODE_SUCCESS; + } +} + +/** + * + * +---------------------------+---------------------+ + * |the number of rows(4 bytes)| column #1 | + * |---------------------+ + * | | null bitmap| values | + * +---------------------------+---------------------+ + * @param buf + * @param pBlock + * @return + */ +int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { + ASSERT(pBlock != NULL); + + // write the number of rows + *(uint32_t*) buf = pBlock->info.rows; + + int32_t numOfCols = pBlock->info.numOfCols; + int32_t numOfRows = pBlock->info.rows; + + char* pStart = buf + sizeof(uint32_t); + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t)); + pStart += numOfRows * sizeof(int32_t); + } else { + memcpy(pStart, pCol->nullbitmap, BitmapLen(numOfRows)); + pStart += BitmapLen(pBlock->info.rows); + } + + uint32_t dataSize = colDataGetSize(pCol, numOfRows); + memcpy(pStart, pCol->pData, dataSize); + pStart += dataSize; + } + + return 0; +} + +size_t blockDataGetRowSize(const SSDataBlock* pBlock) { + ASSERT(pBlock != NULL); + size_t rowSize = 0; + + size_t numOfCols = pBlock->info.numOfCols; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + rowSize += pColInfo->info.bytes; + } + + return rowSize; +} + +typedef struct SSDataBlockSortHelper { + SArray *orderInfo; // SArray + SSDataBlock *pDataBlock; + bool nullFirst; +} SSDataBlockSortHelper; + +int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { + const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*) param; + + SSDataBlock* pDataBlock = pHelper->pDataBlock; + + int32_t* left = (int32_t*) p1; + int32_t* right = (int32_t*) p2; + + SArray* pInfo = pHelper->orderInfo; + size_t num = taosArrayGetSize(pInfo); + for(int32_t i = 0; i < num; ++i) { + SBlockOrderInfo* pOrder = taosArrayGet(pInfo, i); + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pOrder->colIndex); + + bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, *left, pDataBlock->pBlockAgg); + bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, *right, pDataBlock->pBlockAgg); + if (leftNull && rightNull) { + continue; // continue to next slot + } + + if (rightNull) { + return pHelper->nullFirst? 1:-1; + } + + if (leftNull) { + return pHelper->nullFirst? -1:1; + } + + void* left1 = colDataGet(pColInfoData, *left); + void* right1 = colDataGet(pColInfoData, *right); + + switch(pColInfoData->info.type) { + case TSDB_DATA_TYPE_INT: { + if (*(int32_t*) left1 == *(int32_t*) right1) { + continue;// TODO continue + } else { + if (pOrder->order == TSDB_ORDER_ASC) { + return (*(int32_t*) left1 < *(int32_t*) right1)? -1:1; + } else { + return (*(int32_t*) left1 < *(int32_t*) right1)? 1:-1; + } + } + } + default: + assert(0); + } + } + + return 0; +} + +static void doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock, int32_t tupleIndex) { + int32_t numOfCols = pSrcBlock->info.numOfCols; + + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pDst = &pDstCols[i]; + SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, i); + + bool isNull = colDataIsNull(pSrc, pSrcBlock->info.rows, tupleIndex, NULL); + if (isNull) { + colDataAppend(pDst, numOfRows, NULL, true); + } else { + char* p = colDataGet((SColumnInfoData*)pSrc, tupleIndex); + colDataAppend(pDst, numOfRows, p, false); + } + } +} + +static void blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataBlock, int32_t* index) { + for (int32_t i = 0; i < pDataBlock->info.rows; ++i) { + doAssignOneTuple(pCols, i, pDataBlock, index[i]); + } +} + +static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { + int32_t rows = pDataBlock->info.rows; + int32_t numOfCols = pDataBlock->info.numOfCols; + + SColumnInfoData* pCols = calloc(numOfCols, sizeof(SColumnInfoData)); + if (pCols == NULL) { + return NULL; + } + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i); + pCols[i].info = pColInfoData->info; + + if (IS_VAR_DATA_TYPE(pCols[i].info.type)) { + pCols[i].varmeta.offset = calloc(rows, sizeof(int32_t)); + } else { + pCols[i].nullbitmap = calloc(1, BitmapLen(rows)); + pCols[i].pData = calloc(rows, pCols[i].info.bytes); + } + } + + return pCols; +} + +static int32_t copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) { + int32_t numOfCols = pDataBlock->info.numOfCols; + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i); + pColInfoData->info = pCols[i].info; + + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + tfree(pColInfoData->varmeta.offset); + pColInfoData->varmeta = pCols[i].varmeta; + } else { + tfree(pColInfoData->nullbitmap); + pColInfoData->nullbitmap = pCols[i].nullbitmap; + } + + tfree(pColInfoData->pData); + pColInfoData->pData = pCols[i].pData; + } + + tfree(pCols); +} + +static int32_t* createTupleIndex(size_t rows) { + int32_t* index = calloc(rows, sizeof(int32_t)); + if (index == NULL) { + return NULL; + } + + for(int32_t i = 0; i < rows; ++i) { + index[i] = i; + } + + return index; +} + +static void destroyTupleIndex(int32_t* index) { + tfree(index); +} + +int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) { + ASSERT(pDataBlock != NULL && pOrderInfo != NULL); + if (pDataBlock->info.rows <= 1) { + return TSDB_CODE_SUCCESS; + } + + // Allocate the additional buffer. + uint32_t rows = pDataBlock->info.rows; + int32_t* index = createTupleIndex(rows); + if (index == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + + SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; + taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar); + + int32_t numOfCols = pDataBlock->info.numOfCols; + SColumnInfoData* pCols = createHelpColInfoData(pDataBlock); + if (pCols == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return terrno; + } + +#if 0 + SColumnInfoData* px = taosArrayGet(pDataBlock->pDataBlock, 0); + for(int32_t i = 0; i < pDataBlock->info.rows; ++i) { + printf("%d, %d, %d\n", index[i], ((int32_t*)px->pData)[i], ((int32_t*)px->pData)[index[i]]); + } +#endif + blockDataAssign(pCols, pDataBlock, index); + +#if 0 + for(int32_t i = 0; i < pDataBlock->info.rows; ++i) { + if (colDataIsNull(&pCols[0], rows, i, NULL)) { + printf("0\t"); + } else { + printf("%d\t", ((int32_t*)pCols[0].pData)[i]); + } + } + + printf("end\n"); +#endif + + copyBackToBlock(pDataBlock, pCols); + destroyTupleIndex(index); +} diff --git a/source/common/src/tname.c b/source/common/src/tname.c index f8ef9f0979..4d606b0bdf 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -1,3 +1,4 @@ +#include #include "os.h" #include "tutil.h" @@ -268,4 +269,27 @@ SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* nam tstrncpy(s.name, name, tListLen(s.name)); return s; -} \ No newline at end of file +} + +void* destroySDataBlock(SSDataBlock* pBlock) { + if (pBlock == NULL) { + return NULL; + } + + int32_t numOfOutput = pBlock->info.numOfCols; + for(int32_t i = 0; i < numOfOutput; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + tfree(pColInfoData->varmeta.offset); + } else { + tfree(pColInfoData->nullbitmap); + } + + tfree(pColInfoData->pData); + } + + taosArrayDestroy(pBlock->pDataBlock); + tfree(pBlock->pBlockAgg); + tfree(pBlock); + return NULL; +} diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp index b91b6b06f2..13e053b034 100644 --- a/source/common/test/commonTests.cpp +++ b/source/common/test/commonTests.cpp @@ -1,11 +1,12 @@ +#include #include +#include #include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wwrite-strings" #pragma GCC diagnostic ignored "-Wunused-function" #pragma GCC diagnostic ignored "-Wunused-variable" -#pragma GCC diagnostic ignored "-Wunused-but-set-variable" #pragma GCC diagnostic ignored "-Wsign-compare" #include "os.h" @@ -96,4 +97,199 @@ TEST(testCase, toInteger_test) { ASSERT_EQ(ret, -1); } +TEST(testCase, Datablock_test) { + SSDataBlock* b = static_cast(calloc(1, sizeof(SSDataBlock))); + b->info.numOfCols = 2; + b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); + + SColumnInfoData infoData = {0}; + infoData.info.bytes = 4; + infoData.info.type = TSDB_DATA_TYPE_INT; + infoData.info.colId = 1; + + infoData.pData = (char*) calloc(40, infoData.info.bytes); + infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (40/8)); + taosArrayPush(b->pDataBlock, &infoData); + + SColumnInfoData infoData1 = {0}; + infoData1.info.bytes = 40; + infoData1.info.type = TSDB_DATA_TYPE_BINARY; + infoData1.info.colId = 2; + + infoData1.varmeta.offset = (int32_t*) calloc(40, sizeof(uint32_t)); + taosArrayPush(b->pDataBlock, &infoData1); + + char* str = "the value of: %d"; + char buf[128] = {0}; + char varbuf[128] = {0}; + + for(int32_t i = 0; i < 40; ++i) { + SColumnInfoData* p0 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 0); + SColumnInfoData* p1 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 1); + + if (i&0x01) { + int32_t len = sprintf(buf, str, i); + STR_TO_VARSTR(varbuf, buf) + colDataAppend(p0, i, (const char*) &i, false); + colDataAppend(p1, i, (const char*) varbuf, false); + + memset(varbuf, 0, sizeof(varbuf)); + memset(buf, 0, sizeof(buf)); + } else { + colDataAppend(p0, i, (const char*) &i, true); + colDataAppend(p1, i, (const char*) varbuf, true); + } + + b->info.rows++; + } + + SColumnInfoData* p0 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 0); + SColumnInfoData* p1 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 1); + for(int32_t i = 0; i < 40; ++i) { + if (i & 0x01) { + ASSERT_EQ(colDataIsNull_f(p0->nullbitmap, i), false); + ASSERT_EQ(colDataIsNull(p1, b->info.rows, i, nullptr), false); + } else { + ASSERT_EQ(colDataIsNull_f(p0->nullbitmap, i), true); + + ASSERT_EQ(colDataIsNull(p0, b->info.rows, i, nullptr), true); + ASSERT_EQ(colDataIsNull(p1, b->info.rows, i, nullptr), true); + } + } + + printf("binary column length:%d\n", *(int32_t*) p1->pData); + + ASSERT_EQ(colDataGetNumOfCols(b), 2); + ASSERT_EQ(colDataGetNumOfRows(b), 40); + + char* pData = colDataGet(p1, 3); + printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData)); + + SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo)); + SBlockOrderInfo order = {.order = TSDB_ORDER_ASC, .colIndex = 0}; + taosArrayPush(pOrderInfo, &order); + + blockDataSort(b, pOrderInfo, true); + destroySDataBlock(b); + + taosArrayDestroy(pOrderInfo); +} + +#if 0 +TEST(testCase, non_var_dataBlock_split_test) { + SSDataBlock* b = static_cast(calloc(1, sizeof(SSDataBlock))); + b->info.numOfCols = 2; + b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); + + SColumnInfoData infoData = {0}; + infoData.info.bytes = 4; + infoData.info.type = TSDB_DATA_TYPE_INT; + infoData.info.colId = 1; + + int32_t numOfRows = 1000000; + + infoData.pData = (char*) calloc(numOfRows, infoData.info.bytes); + infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8)); + taosArrayPush(b->pDataBlock, &infoData); + + SColumnInfoData infoData1 = {0}; + infoData1.info.bytes = 1; + infoData1.info.type = TSDB_DATA_TYPE_TINYINT; + infoData1.info.colId = 2; + + infoData1.pData = (char*) calloc(numOfRows, infoData.info.bytes); + infoData1.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8)); + taosArrayPush(b->pDataBlock, &infoData1); + + for(int32_t i = 0; i < numOfRows; ++i) { + SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 0); + SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 1); + + int8_t v = i; + colDataAppend(p0, i, (const char*)&i, false); + colDataAppend(p1, i, (const char*)&v, false); + b->info.rows++; + } + + int32_t pageSize = 64 * 1024; + + int32_t startIndex= 0; + int32_t stopIndex = 0; + int32_t count = 1; + while(1) { + blockDataSplitRows(b, false, startIndex, &stopIndex, pageSize); + printf("the %d split, from: %d to %d\n", count++, startIndex, stopIndex); + + if (stopIndex == numOfRows - 1) { + break; + } + + startIndex = stopIndex + 1; + } + +} + +#endif + +TEST(testCase, var_dataBlock_split_test) { + SSDataBlock* b = static_cast(calloc(1, sizeof(SSDataBlock))); + b->info.numOfCols = 2; + b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); + + int32_t numOfRows = 1000000; + + SColumnInfoData infoData = {0}; + infoData.info.bytes = 4; + infoData.info.type = TSDB_DATA_TYPE_INT; + infoData.info.colId = 1; + + infoData.pData = (char*) calloc(numOfRows, infoData.info.bytes); + infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8)); + taosArrayPush(b->pDataBlock, &infoData); + + SColumnInfoData infoData1 = {0}; + infoData1.info.bytes = 40; + infoData1.info.type = TSDB_DATA_TYPE_BINARY; + infoData1.info.colId = 2; + + infoData1.varmeta.offset = (int32_t*) calloc(numOfRows, sizeof(uint32_t)); + taosArrayPush(b->pDataBlock, &infoData1); + + char buf[41] = {0}; + char buf1[100] = {0}; + + for(int32_t i = 0; i < numOfRows; ++i) { + SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 0); + SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 1); + + int8_t v = i; + colDataAppend(p0, i, (const char*)&i, false); + + sprintf(buf, "the number of row:%d", i); + int32_t len = sprintf(buf1, buf, i); + STR_TO_VARSTR(buf1, buf) + colDataAppend(p1, i, buf1, false); + b->info.rows++; + + memset(buf, 0, sizeof(buf)); + memset(buf1, 0, sizeof(buf1)); + } + + int32_t pageSize = 64 * 1024; + + int32_t startIndex= 0; + int32_t stopIndex = 0; + int32_t count = 1; + while(1) { + blockDataSplitRows(b, true, startIndex, &stopIndex, pageSize); + printf("the %d split, from: %d to %d\n", count++, startIndex, stopIndex); + + if (stopIndex == numOfRows - 1) { + break; + } + + startIndex = stopIndex + 1; + } +} + #pragma GCC diagnostic pop \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1351b9627b..0dcc7db33d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3640,13 +3640,13 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) { STbCfg* pTbCfg = metaGetTbInfoByUid(pMeta, uid); if (pTbCfg == NULL) { -// tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId); + tsdbError("%p failed to get stable, uid:%"PRIu64", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId); terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; goto _error; } if (pTbCfg->type != META_SUPER_TABLE) { -// tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, uid, taskId, reqId); + tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, uid, taskId, reqId); terrno = TSDB_CODE_OPS_NOT_SUPPORT; //basically, this error is caused by invalid sql issued by client goto _error; } @@ -3665,8 +3665,8 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); -// tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, tsdb, -// pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId); + tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%"PRIx64" QID:0x%"PRIx64, pMeta, + pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId); taosArrayDestroy(res); return ret; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 41ed1739b9..f2a068869c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -557,17 +557,15 @@ typedef struct SMultiwayMergeInfo { // todo support the disk-based sort typedef struct SOrderOperatorInfo { - int32_t colIndex; - int32_t order; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray *orderInfo; // SArray SSDataBlock *pDataBlock; + bool nullFirst; // null value is put in the front } SOrderOperatorInfo; SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); - SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); -SOperatorInfo* createSubmitBlockScanOperatorInfo(void *pSubmitBlockReadHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo); - SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); @@ -591,7 +589,8 @@ SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); -SOperatorInfo* createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal); +SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SOrder* pOrderVal); +SOperatorInfo* createMergeSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal); //SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); //SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); @@ -604,7 +603,6 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); -void* destroyOutputBuf(SSDataBlock* pBlock); void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); @@ -613,7 +611,6 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); -void freeParam(STaskParam *param); int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e84fe3d45a..f0d16f3733 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -12,12 +12,13 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "parser.h" -#include "tq.h" +#include #include "exception.h" #include "os.h" +#include "parser.h" #include "tglobal.h" #include "tmsg.h" +#include "tq.h" #include "ttime.h" #include "executorimpl.h" @@ -337,23 +338,6 @@ SSDataBlock* createOutputBuf_rv(SArray* pExprInfo, int32_t numOfRows) { return res; } -void* destroyOutputBuf(SSDataBlock* pBlock) { - if (pBlock == NULL) { - return NULL; - } - - int32_t numOfOutput = pBlock->info.numOfCols; - for(int32_t i = 0; i < numOfOutput; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - tfree(pColInfoData->pData); - } - - taosArrayDestroy(pBlock->pDataBlock); - tfree(pBlock->pBlockAgg); - tfree(pBlock); - return NULL; -} - static bool isSelectivityWithTagsQuery(SqlFunctionCtx *pCtx, int32_t numOfOutput) { return true; // bool hasTags = false; @@ -2188,174 +2172,6 @@ static int32_t setupQueryRuntimeEnv(STaskRuntimeEnv *pRuntimeEnv, int32_t numOfT // group by normal column, sliding window query, interval query are handled by interval query processor // interval (down sampling operation) - int32_t numOfOperator = (int32_t) taosArrayGetSize(pOperator); - for(int32_t i = 0; i < numOfOperator; ++i) { - int32_t* op = taosArrayGet(pOperator, i); - - switch (*op) { -// case OP_TagScan: { -// pRuntimeEnv->proot = createTagScanOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// break; -// } -// case OP_MultiTableTimeInterval: { -// pRuntimeEnv->proot = -// createMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); -// break; -// } -// case OP_AllMultiTableTimeInterval: { -// pRuntimeEnv->proot = -// createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); -// break; -// } -// case OP_TimeWindow: { -// pRuntimeEnv->proot = -// createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType; -// if (opType != OP_DummyInput && opType != OP_Join) { -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); -// } -// break; -// } -// case OP_AllTimeWindow: { -// pRuntimeEnv->proot = -// createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType; -// if (opType != OP_DummyInput && opType != OP_Join) { -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); -// } -// break; -// } -// case OP_Groupby: { -// pRuntimeEnv->proot = -// createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// -// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType; -// if (opType != OP_DummyInput) { -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); -// } -// break; -// } -// case OP_SessionWindow: { -// pRuntimeEnv->proot = -// createSWindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType; -// if (opType != OP_DummyInput) { -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); -// } -// break; -// } -// case OP_MultiTableAggregate: { -// pRuntimeEnv->proot = -// createMultiTableAggOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); -// break; -// } -// case OP_Aggregate: { -// pRuntimeEnv->proot = -// createAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// -// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType; -// if (opType != OP_DummyInput && opType != OP_Join) { -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); -// } -// break; -// } -// -// case OP_Project: { // TODO refactor to remove arith operator. -// SOperatorInfo* prev = pRuntimeEnv->proot; -// if (i == 0) { -// pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// if (pRuntimeEnv->proot != NULL && prev->operatorType != OP_DummyInput && prev->operatorType != OP_Join) { // TODO refactor -// setTableScanFilterOperatorInfo(prev->info, pRuntimeEnv->proot); -// } -// } else { -// prev = pRuntimeEnv->proot; -// assert(pQueryAttr->pExpr2 != NULL); -// pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); -// } -// break; -// } -// -// case OP_StateWindow: { -// pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// int32_t opType = pRuntimeEnv->proot->downstream[0]->operatorType; -// if (opType != OP_DummyInput) { -// setTableScanFilterOperatorInfo(pRuntimeEnv->proot->downstream[0]->info, pRuntimeEnv->proot); -// } -// break; -// } -// -// case OP_Limit: { -// pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot); -// break; -// } -// -// case OP_Filter: { // todo refactor -// int32_t numOfFilterCols = 0; -// if (pQueryAttr->stableQuery) { -// SColumnInfo* pColInfo = -// extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols); -// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, -// pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols); -// freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3); -// } else { -// SColumnInfo* pColInfo = -// extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols); -// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, -// pQueryAttr->numOfOutput, pColInfo, numOfFilterCols); -// freeColumnInfo(pColInfo, pQueryAttr->numOfOutput); -// } -// -// break; -// } -// -// case OP_Fill: { -// SOperatorInfo* pInfo = pRuntimeEnv->proot; -// pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult); -// break; -// } -// -// case OP_MultiwayMergeSort: { -// pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger); -// break; -// } -// -// case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock. -// bool multigroupResult = pQueryAttr->multigroupResult; -// if (pQueryAttr->multigroupResult) { -// multigroupResult = (pQueryAttr->fillType == TSDB_FILL_NONE); -// } -// -// pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, -// pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult); -// break; -// } -// -// case OP_SLimit: { -// int32_t num = pRuntimeEnv->proot->numOfOutput; -// SExprInfo* pExpr = pRuntimeEnv->proot->pExpr; -// pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult); -// break; -// } -// -// case OP_Distinct: { -// pRuntimeEnv->proot = createDistinctOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); -// break; -// } -// -// case OP_Order: { -// pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); -// break; -// } - - default: { - assert(0); - } - } - } - return TSDB_CODE_SUCCESS; _clean: @@ -5071,6 +4887,8 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) { SStreamBlockScanInfo* pInfo = pOperator->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; + pBlockInfo->rows = 0; + while (tqNextDataBlock(pInfo->readerHandle)) { pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { @@ -5605,7 +5423,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; taosArrayDestroy(pInfo->orderColumnList); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + pInfo->pRes = destroySDataBlock(pInfo->pRes); tfree(pInfo->prevRow); } @@ -5717,30 +5535,6 @@ SOperatorInfo *createMultiwaySortOperatorInfo(STaskRuntimeEnv *pRuntimeEnv, SExp return pOperator; } -static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) { - assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols); - - int32_t numOfCols = pSrc->info.numOfCols; - for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); - SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); - - int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes; - char* tmp = realloc(pCol2->pData, newSize); - if (tmp != NULL) { - pCol2->pData = tmp; - int32_t offset = pCol2->info.bytes * pDest->info.rows; - memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes); - } else { - return TSDB_CODE_VND_OUT_OF_MEMORY; - } - } - - pDest->info.rows += pSrc->info.rows; - - return TSDB_CODE_SUCCESS; -} - static SSDataBlock* doSort(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -5761,64 +5555,56 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { break; } - int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock); + int32_t code = blockDataMerge(pInfo->pDataBlock, pBlock); if (code != TSDB_CODE_SUCCESS) { // todo handle error } + + size_t size = blockDataGetSize(pInfo->pDataBlock); + if (size > pInfo->sortBufSize) { + // Perform the in-memory sort and then flush data in the buffer into disk. + blockDataSort(pInfo->pDataBlock, pInfo->orderInfo, pInfo->nullFirst); + + // flush to disk + } } - int32_t numOfCols = pInfo->pDataBlock->info.numOfCols; - void** pCols = calloc(numOfCols, POINTER_BYTES); - SSchema* pSchema = calloc(numOfCols, sizeof(SSchema)); +// int32_t numOfCols = pInfo->pDataBlock->info.numOfCols; +// void** pCols = calloc(numOfCols, POINTER_BYTES); +// SSchema* pSchema = calloc(numOfCols, sizeof(SSchema)); +// +// for(int32_t i = 0; i < numOfCols; ++i) { +// SColumnInfoData* p1 = taosArrayGet(pInfo->pDataBlock->pDataBlock, i); +// pCols[i] = p1->pData; +// pSchema[i].colId = p1->info.colId; +// pSchema[i].bytes = p1->info.bytes; +// pSchema[i].type = (uint8_t) p1->info.type; +// } - for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* p1 = taosArrayGet(pInfo->pDataBlock->pDataBlock, i); - pCols[i] = p1->pData; - pSchema[i].colId = p1->info.colId; - pSchema[i].bytes = p1->info.bytes; - pSchema[i].type = (uint8_t) p1->info.type; - } +// __compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order); +// taoscQSort(pCols, pInfo->pDataBlock->info.rows, sizeof(int32_t), pInfo, comp); - __compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order); -// taosqsort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp); - - tfree(pCols); - tfree(pSchema); +// tfree(pCols); +// tfree(pSchema); return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; } -SOperatorInfo *createOrderOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal) { +SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SOrder* pOrderVal) { SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); - { - SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock)); - pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); - for(int32_t i = 0; i < numOfOutput; ++i) { - SColumnInfoData col = {{0}}; - col.info.colId = pExpr[i].base.pColumns->info.colId; -// col.info.bytes = pExpr[i].base.colBytes; -// col.info.type = pExpr[i].base.colType; - taosArrayPush(pDataBlock->pDataBlock, &col); - -// if (col.info.colId == pOrderVal->orderColId) { -// pInfo->colIndex = i; -// } - } - - pDataBlock->info.numOfCols = numOfOutput; -// pInfo->order = pOrderVal->order; - pInfo->pDataBlock = pDataBlock; - } + pInfo->sortBufSize = 1024 * 1024; // 1MB + pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, 4096); + pInfo->orderInfo = taosArrayInit(1, sizeof(SOrder)); + taosArrayPush(pInfo->orderInfo, pOrderVal); // todo more than one order column SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "InMemoryOrder"; -// pOperator->operatorType = OP_Order; + pOperator->name = "Order"; + pOperator->operatorType = OP_Order; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->exec = doSort; - pOperator->cleanupFn = destroyOrderOperatorInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->cleanupFn = destroyOrderOperatorInfo; appendDownstream(pOperator, downstream); return pOperator; @@ -6794,7 +6580,7 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) { tfree(pInfo->rowCellInfoOffset); cleanupResultRowInfo(&pInfo->resultRowInfo); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + pInfo->pRes = destroySDataBlock(pInfo->pRes); } static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { @@ -6818,7 +6604,7 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + pInfo->pRes = destroySDataBlock(pInfo->pRes); tfree(pInfo->p); } @@ -6835,12 +6621,12 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { STagScanInfo* pInfo = (STagScanInfo*) param; - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + pInfo->pRes = destroySDataBlock(pInfo->pRes); } static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; - pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock); + pInfo->pDataBlock = destroySDataBlock(pInfo->pDataBlock); } static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { @@ -6853,7 +6639,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { taosHashCleanup(pInfo->pSet); tfree(pInfo->buf); taosArrayDestroy(pInfo->pDistinctDataInfo); - pInfo->pRes = destroyOutputBuf(pInfo->pRes); + pInfo->pRes = destroySDataBlock(pInfo->pRes); } SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { @@ -7717,6 +7503,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId) { } static tsdbReaderT doCreateDataReader(STableScanPhyNode* pTableScanNode, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId); + static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo, uint64_t queryId, uint64_t taskId); SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId) { diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 5d0600c820..a8833dd8e9 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -216,7 +216,9 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable } else if (needSeqScan(pPlanNode)) { return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan); } - return createUserTableScanNode(pPlanNode, pTable, OP_DataBlocksOptScan); + + int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_DataBlocksOptScan:OP_StreamScan; + return createUserTableScanNode(pPlanNode, pTable, type); } static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {