diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 83246289f1..8488dfb981 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -27,10 +27,6 @@ else () cat("${TD_SUPPORT_DIR}/taosadapter_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) endif() -if(TD_LINUX_64 AND JEMALLOC_ENABLED) - cat("${TD_SUPPORT_DIR}/jemalloc_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) -endif() - # pthread if(${BUILD_PTHREAD}) cat("${TD_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) @@ -418,18 +414,6 @@ if(${BUILD_ADDR2LINE}) endif(NOT ${TD_WINDOWS}) endif(${BUILD_ADDR2LINE}) -# jemalloc -IF (TD_LINUX_64 AND JEMALLOC_ENABLED) - include(ExternalProject) - ExternalProject_Add(jemalloc - PREFIX "jemalloc" - SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc - BUILD_IN_SOURCE 1 - CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/ - BUILD_COMMAND ${MAKE} - ) - INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/build/include) -ENDIF () # ================================================================================================ # Build test diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5d6d9178ed..7eafc4c3d8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1404,7 +1404,7 @@ typedef struct STableScanAnalyzeInfo { uint32_t skipBlocks; uint32_t filterOutBlocks; double elapsedTime; - uint64_t filterTime; + double filterTime; } STableScanAnalyzeInfo; int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index e7ae3917f9..08b0f3abb2 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -123,7 +123,7 @@ void createNewTable(TAOS* pConn, int32_t index) { } taos_free_result(pRes); - for(int32_t i = 0; i < 1000; i += 20) { + for(int32_t i = 0; i < 100000; i += 20) { char sql[1024] = {0}; sprintf(sql, "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" @@ -154,7 +154,7 @@ TEST(testCase, driverInit_Test) { } TEST(testCase, connect_Test) { -// taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg"); + taos_options(TSDB_OPTION_CONFIGDIR, "/home/lisa/Documents/workspace/tdengine/sim/dnode1/cfg"); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { @@ -501,7 +501,6 @@ TEST(testCase, show_vgroup_Test) { taos_close(pConn); } - TEST(testCase, create_multiple_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -665,6 +664,7 @@ TEST(testCase, insert_test) { taos_free_result(pRes); taos_close(pConn); } +#endif TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -697,7 +697,7 @@ TEST(testCase, projection_query_tables) { } taos_free_result(pRes); - for(int32_t i = 0; i < 100; ++i) { + for(int32_t i = 0; i < 1; ++i) { printf("create table :%d\n", i); createNewTable(pConn, i); } @@ -723,6 +723,7 @@ TEST(testCase, projection_query_tables) { taos_close(pConn); } +#if 0 TEST(testCase, projection_query_stables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -820,21 +821,8 @@ TEST(testCase, async_api_test) { getchar(); taos_close(pConn); } -#endif - TEST(testCase, update_test) { - - SInterval interval = {0}; - interval.offset = 8000; - interval.interval = 10000; - interval.sliding = 4000; - interval.intervalUnit = 's'; - interval.offsetUnit = 's'; - interval.slidingUnit = 's'; -// STimeWindow w = getAlignQueryTimeWindow(&interval, 0, 1630000000000); - STimeWindow w = getAlignQueryTimeWindow(&interval, 0, 1629999999999); - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -869,4 +857,8 @@ TEST(testCase, update_test) { taos_free_result(pRes); } } + +#endif + + #pragma GCC diagnostic pop diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 03985654f8..072d15d715 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -145,7 +145,8 @@ static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanI SRowMerger* pMerger); static int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); -static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); +static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); +static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); @@ -691,16 +692,13 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SBlock* pBlock = getCurrentBlock(pBlockIter); SSDataBlock* pResBlock = pReader->pResBlock; - int32_t numOfCols = blockDataGetNumOfCols(pResBlock); + int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock); SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - int64_t st = taosGetTimestampUs(); - SColVal cv = {0}; - int32_t colIndex = 0; - + int64_t st = taosGetTimestampUs(); bool asc = ASCENDING_TRAVERSE(pReader->order); int32_t step = asc ? 1 : -1; @@ -724,7 +722,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn i += 1; } - while (i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aIdx)) { + int32_t colIndex = 0; + int32_t num = taosArrayGetSize(pBlockData->aIdx); + while (i < numOfOutputCols && colIndex < num) { rowIndex = 0; pColData = taosArrayGet(pResBlock->pDataBlock, i); @@ -744,7 +744,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn i += 1; } - while (i < numOfCols) { + while (i < numOfOutputCols) { pColData = taosArrayGet(pResBlock->pDataBlock, i); colDataAppendNNULL(pColData, 0, remain); i += 1; @@ -1256,7 +1256,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } tRowMergerClear(&merge); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); taosMemoryFree(pTSRow); return TSDB_CODE_SUCCESS; @@ -1300,7 +1300,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* } tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } else { // key > ik.ts || key > k.ts ASSERT(key != ik.ts); @@ -1309,7 +1309,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [4] ik.ts < k.ts <= key if (ik.ts < k.ts) { doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1317,7 +1317,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [6] k.ts < ik.ts <= key if (k.ts < ik.ts) { doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1326,7 +1326,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ASSERT(key > ik.ts && key > k.ts); doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } } @@ -1350,7 +1350,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* } tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } else { ASSERT(ik.ts != k.ts); // this case has been included in the previous if branch @@ -1359,7 +1359,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [4] ik.ts > key >= k.ts if (ik.ts > key) { doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1371,7 +1371,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1383,7 +1383,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* tRowMerge(&merge, &fRow); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } } @@ -1438,6 +1438,21 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI } // imem & mem are all empty, only file exist + + // opt version + // 1. it is not a border point + // 2. the direct next point is not an duplicated timestamp + if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) || + (pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) { + int32_t step = pReader->order == TSDB_ORDER_ASC? 1:-1; + int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; + if (nextKey != key) { // merge is not needed + doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); + pDumpInfo->rowIndex += step; + return TSDB_CODE_SUCCESS; + } + } + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); STSRow* pTSRow = NULL; @@ -1446,7 +1461,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); taosMemoryFree(pTSRow); tRowMergerClear(&merge); @@ -2201,7 +2216,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc int32_t step = asc ? 1 : -1; pDumpInfo->rowIndex += step; - if (pDumpInfo->rowIndex <= pBlockData->nRow - 1) { + if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) ||(pDumpInfo->rowIndex >= 0 && !asc)) { pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); } @@ -2325,7 +2340,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR return TSDB_CODE_SUCCESS; } -int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) { +int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) { int32_t numOfRows = pBlock->info.rows; int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); @@ -2369,6 +2384,47 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow return TSDB_CODE_SUCCESS; } +int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) { + int32_t i = 0, j = 0; + int32_t outputRowIndex = pResBlock->info.rows; + + SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; + + SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i); + if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + colDataAppendInt64(pColData, outputRowIndex, &pBlockData->aTSKEY[rowIndex]); + i += 1; + } + + SColVal cv = {0}; + int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx); + int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock); + + while(i < numOfOutputCols && j < numOfInputCols) { + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); + SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j); + + if (pData->cid == pCol->info.colId) { + tColDataGetValue(pData, rowIndex, &cv); + doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo); + j += 1; + } else { // the specified column does not exist in file block, fill with null data + colDataAppendNULL(pCol, outputRowIndex); + } + + i += 1; + } + + while (i < numOfOutputCols) { + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); + colDataAppendNULL(pCol, outputRowIndex); + i += 1; + } + + pResBlock->info.rows += 1; + return TSDB_CODE_SUCCESS; +} + int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader) { SSDataBlock* pBlock = pReader->pResBlock; @@ -2380,7 +2436,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e break; } - doAppendOneRow(pBlock, pReader, pTSRow); + doAppendRowFromTSRow(pBlock, pReader, pTSRow); taosMemoryFree(pTSRow); // no data in buffer, return immediately diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index be97b20455..c3dad1ed7c 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -103,7 +103,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); -bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo); +bool hasRemainResults(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 37818dd91d..10b84ba7b1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -297,6 +297,20 @@ enum { TABLE_SCAN__BLOCK_ORDER = 2, }; +typedef struct SAggSupporter { + SHashObj* pResultRowHashTable; // quick locate the window object for each result + char* keyBuf; // window key buffer + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row +} SAggSupporter; + +typedef struct { + // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. + SInterval interval; + SAggSupporter *pAggSup; + SExprSupp *pExprSup; // expr supporter of aggregate operator +} SAggOptrPushDownInfo; + typedef struct STableScanInfo { STsdbReader* dataReader; SReadHandle readHandle; @@ -312,12 +326,13 @@ typedef struct STableScanInfo { SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; - SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. +// SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. SSampleExecInfo sample; // sample execution info int32_t currentGroupId; int32_t currentTable; int8_t scanMode; int8_t noTable; + SAggOptrPushDownInfo pdInfo; int8_t assignBlockUid; } STableScanInfo; @@ -505,13 +520,6 @@ typedef struct SOptrBasicInfo { SSDataBlock* pRes; } SOptrBasicInfo; -typedef struct SAggSupporter { - SHashObj* pResultRowHashTable; // quick locate the window object for each result - char* keyBuf; // window key buffer - SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row -} SAggSupporter; - typedef struct SIntervalAggOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; // basic info @@ -1019,6 +1027,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); +bool groupbyTbname(SNodeList* pGroupList); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); SSDataBlock* createSpecialDataBlock(EStreamType type); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 65df7140f7..96c20d6136 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -137,7 +137,7 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } -bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo) { +bool hasRemainResults(SGroupResInfo* pGroupResInfo) { if (pGroupResInfo->pRows == NULL) { return false; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 171f39db1b..a7d13da52b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -242,25 +242,17 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } // todo refactor STableList + bool assignUid = false; size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0; char* keyBuf = NULL; if (bufLen > 0) { + assignUid = groupbyTbname(pScanInfo->pGroupTags); keyBuf = taosMemoryMalloc(bufLen); if (keyBuf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } } - bool assignUid = false; - - if (LIST_LENGTH(pScanInfo->pGroupTags) > 0) { - SNode* p = nodesListGetNode(pScanInfo->pGroupTags, 0); - if (p->type == QUERY_NODE_FUNCTION) { - // partition by tbname/group by tbname - assignUid = (strcmp(((struct SFunctionNode*)p)->functionName, "tbname") == 0); - } - } - for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) { uint64_t* uid = taosArrayGet(qa, i); STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0}; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7359a4ea90..022089e06a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -141,8 +141,7 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx, int32_t numOfExprs); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); -static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, - uint64_t groupId); +static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); // setup the output buffer for each operator static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) { @@ -1393,10 +1392,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR } } -void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, - uint64_t groupId) { +void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { // for simple group by query without interval, all the tables belong to one group result. - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SAggOperatorInfo* pAggInfo = pOperator->info; + SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset; @@ -1420,14 +1420,13 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggIn setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); } -void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId, SAggOperatorInfo* pAggInfo) { - if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) { +static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { + SAggOperatorInfo* pAggInfo = pOperator->info; + if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) { return; } -#ifdef BUF_PAGE_DEBUG - qDebug("page_setbuf, groupId:%" PRIu64, groupId); -#endif - doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId); + + doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); // record the current active group id pAggInfo->groupId = groupId; @@ -1594,7 +1593,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG pBlock->info.version = pTaskInfo->version; blockDataCleanup(pBlock); - if (!hasDataInGroupInfo(pGroupResInfo)) { + if (!hasRemainResults(pGroupResInfo)) { return; } @@ -2931,7 +2930,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo); + setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId); setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true); code = doAggregateImpl(pOperator, pSup->pCtx); if (code != 0) { @@ -2966,7 +2965,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); doFilter(pAggInfo->pCondition, pInfo->pRes, NULL); - if (!hasDataInGroupInfo(&pAggInfo->groupResInfo)) { + if (!hasRemainResults(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); break; } @@ -3501,7 +3500,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - pInfo->groupId = INT32_MIN; + pInfo->groupId = UINT64_MAX; pInfo->pCondition = pCondition; pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; @@ -3513,6 +3512,12 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); + if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + STableScanInfo* pTableScanInfo = downstream->info; + pTableScanInfo->pdInfo.pExprSup = &pOperator->exprSupp; + pTableScanInfo->pdInfo.pAggSup = &pInfo->aggSup; + } + code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -3825,6 +3830,19 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) return TDB_CODE_SUCCESS; } +bool groupbyTbname(SNodeList* pGroupList) { + bool bytbname = false; + if (LIST_LENGTH(pGroupList) > 0) { + SNode* p = nodesListGetNode(pGroupList, 0); + if (p->type == QUERY_NODE_FUNCTION) { + // partition by tbname/group by tbname + bytbname = (strcmp(((struct SFunctionNode*)p)->functionName, "tbname") == 0); + } + } + + return bytbname; +} + int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) { if (group == NULL) { return TDB_CODE_SUCCESS; @@ -3851,12 +3869,21 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, return TSDB_CODE_OUT_OF_MEMORY; } + bool assignUid = groupbyTbname(group); + int32_t groupNum = 0; - for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { + size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); + + for (int32_t i = 0; i < numOfTables; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId); - if (code != TSDB_CODE_SUCCESS) { - return code; + + if (assignUid) { + info->groupId = info->uid; + } else { + int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index c83206b730..5c37cf01e8 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -301,8 +301,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); - if (!hasRemain) { + if (!hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); break; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4a2f57d628..fc67f3da6c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -166,6 +166,67 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn return false; } +// this function is for table scanner to extract temporary results of upstream aggregate results. +static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) { + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + return NULL; + } + + int64_t buf[2] = {0}; + SET_RES_WINDOW_KEY((char*)buf, &groupId, sizeof(groupId), groupId); + + STableScanInfo* pTableScanInfo = pOperator->info; + + SResultRowPosition* p1 = + (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); + + if (p1 == NULL) { + return NULL; + } + + *pPage = getBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, p1->pageId); + return (SResultRow*)((char*)(*pPage) + p1->offset); +} + +static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) { + STableScanInfo* pTableScanInfo = pOperator->info; + + if (pTableScanInfo->pdInfo.pExprSup == NULL) { + return TSDB_CODE_SUCCESS; + } + + SExprSupp* pSup1 = pTableScanInfo->pdInfo.pExprSup; + + SFilePage* pPage = NULL; + SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->groupId, &pPage); + + if (pRow == NULL) { + return TSDB_CODE_SUCCESS; + } + + bool notLoadBlock = true; + for (int32_t i = 0; i < pSup1->numOfExprs; ++i) { + int32_t functionId = pSup1->pCtx[i].functionId; + + SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->pdInfo.pExprSup->rowEntryInfoOffset); + + int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window); + if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) { + notLoadBlock = false; + break; + } + } + + // release buffer pages + releaseBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, pPage); + + if (notLoadBlock) { + *status = FUNC_DATA_REQUIRED_NOT_LOAD; + } + + return TSDB_CODE_SUCCESS; +} + static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -178,7 +239,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca *status = pInfo->dataBlockLoadFlag; if (pTableScanInfo->pFilterNode != NULL || - overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { + overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) { (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; } @@ -232,6 +293,16 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); // todo filter data block according to the block sma data firstly + + doDynamicPruneDataBlock(pOperator, pBlockInfo, status); + if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { + qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->skipBlocks += 1; + + return TSDB_CODE_SUCCESS; + } + #if 0 if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { pCost->filterOutBlocks += 1; @@ -263,18 +334,20 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca } } - int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); + if (pTableScanInfo->pFilterNode != NULL) { + int64_t st = taosGetTimestampUs(); + doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); - int64_t et = taosGetTimestampMs(); - pTableScanInfo->readRecorder.filterTime += (et - st); + double el = (taosGetTimestampUs() - st) / 1000.0; + pTableScanInfo->readRecorder.filterTime += el; - if (pBlock->info.rows == 0) { - pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - } else { - qDebug("%s data block filter out, elapsed time:%" PRId64, GET_TASKID(pTaskInfo), (et - st)); + if (pBlock->info.rows == 0) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms", + GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el); + } else { + qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); + } } return TSDB_CODE_SUCCESS; @@ -607,10 +680,11 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, } pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - // pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose +// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose +// pInfo->cond.order = TSDB_ORDER_DESC; + pInfo->pdInfo.interval = extractIntervalInfo(pTableScanNode); pInfo->readHandle = *readHandle; - pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); @@ -1489,14 +1563,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->tqReader = pHandle->tqReader; } - if (pTSInfo->interval.interval > 0) { - pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pInfo->twAggSup.waterMark); + if (pTSInfo->pdInfo.interval.interval > 0) { + pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->pdInfo.interval, pInfo->twAggSup.waterMark); } else { pInfo->pUpdateInfo = NULL; } pInfo->pTableScanOp = pTableScanOp; - pInfo->interval = pTSInfo->interval; + pInfo->interval = pTSInfo->pdInfo.interval; pInfo->readHandle = *pHandle; pInfo->tableUid = pScanPhyNode->uid; @@ -2672,16 +2746,20 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc } } - int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); + if (pTableScanInfo->pFilterNode != NULL) { + int64_t st = taosGetTimestampMs(); + doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); - int64_t et = taosGetTimestampMs(); - pTableScanInfo->readRecorder.filterTime += (et - st); + double el = (taosGetTimestampUs() - st) / 1000.0; + pTableScanInfo->readRecorder.filterTime += el; - if (pBlock->info.rows == 0) { - pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + if (pBlock->info.rows == 0) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms", + GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el); + } else { + qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); + } } return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 822c5c9502..fb1cb8dff5 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1218,7 +1218,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -1257,7 +1257,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -1294,7 +1294,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBlock, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -1563,7 +1563,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pInfo->binfo.pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; qDebug("===stream===single interval is done"); freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); @@ -2010,7 +2010,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -2053,7 +2053,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -2219,7 +2219,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // if (pOperator->status == OP_RES_TO_RETURN) { // // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - // if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) { + // if (pResBlock->info.rows == 0 || !hasRemainResults(&pSliceInfo->groupResInfo)) { // doSetOperatorCompleted(pOperator); // } // @@ -3860,7 +3860,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { + if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); @@ -4420,7 +4420,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { + if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } printDataBlock(pBInfo->pRes, "single state"); diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 35669b3e42..0880f2f5c7 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -118,6 +118,7 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getFirstLastInfoSize(int32_t resBytes); +EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow); int32_t lastRowFunction(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index d5ffe39b08..aee1ef5c3c 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2328,6 +2328,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_LAST, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLast, + .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, .processFunc = lastFunction, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 200df6bc80..0767c2e5a2 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2700,6 +2700,22 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) return TSDB_CODE_SUCCESS; } +EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) { + SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*) pRes; + + // not initialized yet, data is required + if (pEntry == NULL) { + return FUNC_DATA_REQUIRED_DATA_LOAD; + } + + SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry); + if (pResult->hasResult && pResult->ts >= pTimeWindow->ekey) { + return FUNC_DATA_REQUIRED_NOT_LOAD; + } else { + return FUNC_DATA_REQUIRED_DATA_LOAD; + } +} + int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes; } bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 2cd06be4b3..5fc4e7882c 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -115,7 +115,12 @@ EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow* if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) { return TSDB_CODE_FAILED; } - return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pTimeWindow); + + if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) { + return FUNC_DATA_REQUIRED_DATA_LOAD; + } else { + return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pTimeWindow); + } } int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { diff --git a/tests/script/tsim/insert/basic0.sim b/tests/script/tsim/insert/basic0.sim index 1f3c93a4bf..7d91a77a83 100644 --- a/tests/script/tsim/insert/basic0.sim +++ b/tests/script/tsim/insert/basic0.sim @@ -54,7 +54,8 @@ print $data30 $data31 $data32 $data33 if $rows != 4 then return -1 endi -if $data01 != 10 then +if $data01 != 10 then + print expect 10, actual: $data01 return -1 endi if $data02 != 2.00000 then diff --git a/tests/script/tsim/parser/function.sim b/tests/script/tsim/parser/function.sim index cbfb59bcab..110901a6e1 100644 --- a/tests/script/tsim/parser/function.sim +++ b/tests/script/tsim/parser/function.sim @@ -500,11 +500,12 @@ if $rows != 2 then return -1 endi -sql select stddev(k), stddev(b), stddev(c),tbname, a from m1 group by tbname,a +sql select stddev(k), stddev(b), stddev(c),tbname, a from m1 group by tbname,a order by a asc if $rows != 2 then return -1 endi if $data00 != 1.414213562 then + print expect 1.414213562, actual: $data00 return -1 endi if $data01 != 14.142135624 then @@ -732,6 +733,7 @@ if $rows != 1 then return -1 endi if $data00 != 0.005633334 then + print expect 0.005633334, actual: $data00 return -1 endi diff --git a/tests/script/tsim/parser/groupby.sim b/tests/script/tsim/parser/groupby.sim index bf2c7cc7bf..c4c19ca211 100644 --- a/tests/script/tsim/parser/groupby.sim +++ b/tests/script/tsim/parser/groupby.sim @@ -681,12 +681,13 @@ if $data14 != 1 then return -1 endi -sql select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname, t1, t2 interval(1m) sliding(15s) order by tbname desc limit 1; +sql select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname, t1, t2 interval(1m) sliding(15s) order by tbname desc,_wstart asc limit 1; if $rows != 1 then return -1 endi if $data01 != 1.000000000 then + print expect 1.000000000, actual: $data01 return -1 endi if $data02 != t2 then diff --git a/tests/script/tsim/parser/limit_stb.sim b/tests/script/tsim/parser/limit_stb.sim index 0d0e4a8ea3..a0aff953cf 100644 --- a/tests/script/tsim/parser/limit_stb.sim +++ b/tests/script/tsim/parser/limit_stb.sim @@ -360,6 +360,7 @@ endi #sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from lm_stb0 where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 10:30:00.000' and c1 > 1 and c2 < 9 and c3 > 2 and c4 < 8 and c5 > 3 and c6 < 7 and c7 > 0 and c8 like '%5' and t1 > 3 and t1 < 6 limit 1 offset 0; sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from lm_stb0 where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 10:30:00.000' and c1 > 1 and c2 < 9 and c3 > 2 and c4 < 8 and c5 > 3 and c6 < 7 and c7 = true and c8 like '%5' and t1 > 3 and t1 < 6 limit 1 offset 0; if $rows != 1 then + print expect 1, actual: $rows return -1 endi if $data00 != 5 then