diff --git a/include/common/tmsg.h b/include/common/tmsg.h index dc83015a89..fbaf2c3e3f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1404,7 +1404,7 @@ typedef struct STableScanAnalyzeInfo { uint32_t skipBlocks; uint32_t filterOutBlocks; double elapsedTime; - uint64_t filterTime; + double filterTime; } STableScanAnalyzeInfo; int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index e7ae3917f9..08b0f3abb2 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -123,7 +123,7 @@ void createNewTable(TAOS* pConn, int32_t index) { } taos_free_result(pRes); - for(int32_t i = 0; i < 1000; i += 20) { + for(int32_t i = 0; i < 100000; i += 20) { char sql[1024] = {0}; sprintf(sql, "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" @@ -154,7 +154,7 @@ TEST(testCase, driverInit_Test) { } TEST(testCase, connect_Test) { -// taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg"); + taos_options(TSDB_OPTION_CONFIGDIR, "/home/lisa/Documents/workspace/tdengine/sim/dnode1/cfg"); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { @@ -501,7 +501,6 @@ TEST(testCase, show_vgroup_Test) { taos_close(pConn); } - TEST(testCase, create_multiple_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -665,6 +664,7 @@ TEST(testCase, insert_test) { taos_free_result(pRes); taos_close(pConn); } +#endif TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -697,7 +697,7 @@ TEST(testCase, projection_query_tables) { } taos_free_result(pRes); - for(int32_t i = 0; i < 100; ++i) { + for(int32_t i = 0; i < 1; ++i) { printf("create table :%d\n", i); createNewTable(pConn, i); } @@ -723,6 +723,7 @@ TEST(testCase, projection_query_tables) { taos_close(pConn); } +#if 0 TEST(testCase, projection_query_stables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -820,21 +821,8 @@ TEST(testCase, async_api_test) { getchar(); taos_close(pConn); } -#endif - TEST(testCase, update_test) { - - SInterval interval = {0}; - interval.offset = 8000; - interval.interval = 10000; - interval.sliding = 4000; - interval.intervalUnit = 's'; - interval.offsetUnit = 's'; - interval.slidingUnit = 's'; -// STimeWindow w = getAlignQueryTimeWindow(&interval, 0, 1630000000000); - STimeWindow w = getAlignQueryTimeWindow(&interval, 0, 1629999999999); - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -869,4 +857,8 @@ TEST(testCase, update_test) { taos_free_result(pRes); } } + +#endif + + #pragma GCC diagnostic pop diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index be97b20455..c3dad1ed7c 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -103,7 +103,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); -bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo); +bool hasRemainResults(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d5486d62b1..05dd3806b9 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -297,6 +297,20 @@ enum { TABLE_SCAN__BLOCK_ORDER = 2, }; +typedef struct SAggSupporter { + SHashObj* pResultRowHashTable; // quick locate the window object for each result + char* keyBuf; // window key buffer + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row +} SAggSupporter; + +typedef struct { + // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. + SInterval interval; + SAggSupporter *pAggSup; + SExprSupp *pExprSup; // expr supporter of aggregate operator +} SAggOptrPushDownInfo; + typedef struct STableScanInfo { STsdbReader* dataReader; SReadHandle readHandle; @@ -312,12 +326,13 @@ typedef struct STableScanInfo { SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; - SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. +// SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. SSampleExecInfo sample; // sample execution info int32_t currentGroupId; int32_t currentTable; int8_t scanMode; int8_t noTable; + SAggOptrPushDownInfo pdInfo; } STableScanInfo; typedef struct STableMergeScanInfo { @@ -504,13 +519,6 @@ typedef struct SOptrBasicInfo { SSDataBlock* pRes; } SOptrBasicInfo; -typedef struct SAggSupporter { - SHashObj* pResultRowHashTable; // quick locate the window object for each result - char* keyBuf; // window key buffer - SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row -} SAggSupporter; - typedef struct SIntervalAggOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; // basic info diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 65df7140f7..96c20d6136 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -137,7 +137,7 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } -bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo) { +bool hasRemainResults(SGroupResInfo* pGroupResInfo) { if (pGroupResInfo->pRows == NULL) { return false; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 525d7bf336..12501f9f7a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -141,8 +141,7 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx, int32_t numOfExprs); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); -static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, - uint64_t groupId); +static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); // setup the output buffer for each operator static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) { @@ -1393,10 +1392,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR } } -void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, - uint64_t groupId) { +void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { // for simple group by query without interval, all the tables belong to one group result. - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SAggOperatorInfo* pAggInfo = pOperator->info; + SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset; @@ -1420,14 +1420,13 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggIn setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); } -void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId, SAggOperatorInfo* pAggInfo) { - if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) { +static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { + SAggOperatorInfo* pAggInfo = pOperator->info; + if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) { return; } -#ifdef BUF_PAGE_DEBUG - qDebug("page_setbuf, groupId:%" PRIu64, groupId); -#endif - doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId); + + doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); // record the current active group id pAggInfo->groupId = groupId; @@ -1594,7 +1593,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG pBlock->info.version = pTaskInfo->version; blockDataCleanup(pBlock); - if (!hasDataInGroupInfo(pGroupResInfo)) { + if (!hasRemainResults(pGroupResInfo)) { return; } @@ -2931,7 +2930,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo); + setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId); setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true); code = doAggregateImpl(pOperator, pSup->pCtx); if (code != 0) { @@ -2966,7 +2965,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); doFilter(pAggInfo->pCondition, pInfo->pRes, NULL); - if (!hasDataInGroupInfo(&pAggInfo->groupResInfo)) { + if (!hasRemainResults(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); break; } @@ -3356,7 +3355,6 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { pOperator->numOfDownstream = 0; } - cleanupExprSupp(&pOperator->exprSupp); taosMemoryFreeClear(pOperator); } @@ -3501,7 +3499,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - pInfo->groupId = INT32_MIN; + pInfo->groupId = UINT64_MAX; pInfo->pCondition = pCondition; pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; @@ -3513,6 +3511,12 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); + if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + STableScanInfo* pTableScanInfo = downstream->info; + pTableScanInfo->pdInfo.pExprSup = &pOperator->exprSupp; + pTableScanInfo->pdInfo.pAggSup = &pInfo->aggSup; + } + code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index c83206b730..5c37cf01e8 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -301,8 +301,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); - if (!hasRemain) { + if (!hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); break; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f07256e88e..34fd9a07ee 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -166,6 +166,67 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn return false; } +// this function is for table scanner to extract temporary results of upstream aggregate results. +static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) { + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + return NULL; + } + + int64_t buf[2] = {0}; + SET_RES_WINDOW_KEY((char*)buf, &groupId, sizeof(groupId), groupId); + + STableScanInfo* pTableScanInfo = pOperator->info; + + SResultRowPosition* p1 = + (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); + + if (p1 == NULL) { + return NULL; + } + + *pPage = getBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, p1->pageId); + return (SResultRow*)((char*)(*pPage) + p1->offset); +} + +static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) { + STableScanInfo* pTableScanInfo = pOperator->info; + + if (pTableScanInfo->pdInfo.pExprSup == NULL) { + return TSDB_CODE_SUCCESS; + } + + SExprSupp* pSup1 = pTableScanInfo->pdInfo.pExprSup; + + SFilePage* pPage = NULL; + SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->groupId, &pPage); + + if (pRow == NULL) { + return TSDB_CODE_SUCCESS; + } + + bool notLoadBlock = true; + for (int32_t i = 0; i < pSup1->numOfExprs; ++i) { + int32_t functionId = pSup1->pCtx[i].functionId; + + SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->pdInfo.pExprSup->rowEntryInfoOffset); + + int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window); + if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) { + notLoadBlock = false; + break; + } + } + + // release buffer pages + releaseBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, pPage); + + if (notLoadBlock) { + *status = FUNC_DATA_REQUIRED_NOT_LOAD; + } + + return TSDB_CODE_SUCCESS; +} + static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -178,7 +239,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca *status = pInfo->dataBlockLoadFlag; if (pTableScanInfo->pFilterNode != NULL || - overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { + overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) { (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; } @@ -232,6 +293,16 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); // todo filter data block according to the block sma data firstly + + doDynamicPruneDataBlock(pOperator, pBlockInfo, status); + if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { + qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->skipBlocks += 1; + + return TSDB_CODE_SUCCESS; + } + #if 0 if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { pCost->filterOutBlocks += 1; @@ -263,18 +334,20 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca } } - int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); + if (pTableScanInfo->pFilterNode != NULL) { + int64_t st = taosGetTimestampUs(); + doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); - int64_t et = taosGetTimestampMs(); - pTableScanInfo->readRecorder.filterTime += (et - st); + double el = (taosGetTimestampUs() - st) / 1000.0; + pTableScanInfo->readRecorder.filterTime += el; - if (pBlock->info.rows == 0) { - pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - } else { - qDebug("%s data block filter out, elapsed time:%" PRId64, GET_TASKID(pTaskInfo), (et - st)); + if (pBlock->info.rows == 0) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms", + GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el); + } else { + qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); + } } return TSDB_CODE_SUCCESS; @@ -602,11 +675,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); } - pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - // pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose +// pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; + pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose + pInfo->cond.order = TSDB_ORDER_DESC; + pInfo->pdInfo.interval = extractIntervalInfo(pTableScanNode); pInfo->readHandle = *readHandle; - pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); @@ -1484,14 +1558,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->tqReader = pHandle->tqReader; } - if (pTSInfo->interval.interval > 0) { - pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pInfo->twAggSup.waterMark); + if (pTSInfo->pdInfo.interval.interval > 0) { + pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->pdInfo.interval, pInfo->twAggSup.waterMark); } else { pInfo->pUpdateInfo = NULL; } pInfo->pTableScanOp = pTableScanOp; - pInfo->interval = pTSInfo->interval; + pInfo->interval = pTSInfo->pdInfo.interval; pInfo->readHandle = *pHandle; pInfo->tableUid = pScanPhyNode->uid; @@ -2667,16 +2741,20 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc } } - int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); + if (pTableScanInfo->pFilterNode != NULL) { + int64_t st = taosGetTimestampMs(); + doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); - int64_t et = taosGetTimestampMs(); - pTableScanInfo->readRecorder.filterTime += (et - st); + double el = (taosGetTimestampUs() - st) / 1000.0; + pTableScanInfo->readRecorder.filterTime += el; - if (pBlock->info.rows == 0) { - pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + if (pBlock->info.rows == 0) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms", + GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el); + } else { + qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); + } } return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9a82b194a9..8dedffce3a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1216,7 +1216,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -1256,7 +1256,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -1293,7 +1293,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBlock, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -1562,7 +1562,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pInfo->binfo.pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; qDebug("===stream===single interval is done"); freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); @@ -2009,7 +2009,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -2052,7 +2052,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -2210,7 +2210,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // if (pOperator->status == OP_RES_TO_RETURN) { // // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - // if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) { + // if (pResBlock->info.rows == 0 || !hasRemainResults(&pSliceInfo->groupResInfo)) { // doSetOperatorCompleted(pOperator); // } // @@ -3823,7 +3823,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { + if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); @@ -4379,7 +4379,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { + if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } printDataBlock(pBInfo->pRes, "single state"); diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 35669b3e42..0880f2f5c7 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -118,6 +118,7 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getFirstLastInfoSize(int32_t resBytes); +EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow); int32_t lastRowFunction(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 01a5e7997e..8b87a85a8d 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2312,6 +2312,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_LAST, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLast, + .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, .processFunc = lastFunction, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fb82ab206c..5569cc3887 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2702,6 +2702,22 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) return TSDB_CODE_SUCCESS; } +EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) { + SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*) pRes; + + // not initialized yet, data is required + if (pEntry == NULL) { + return FUNC_DATA_REQUIRED_DATA_LOAD; + } + + SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry); + if (pResult->hasResult && pResult->ts >= pTimeWindow->ekey) { + return FUNC_DATA_REQUIRED_NOT_LOAD; + } else { + return FUNC_DATA_REQUIRED_DATA_LOAD; + } +} + int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes; } bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 020fd648e1..cdf089385e 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -107,7 +107,12 @@ EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow* if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) { return TSDB_CODE_FAILED; } - return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pTimeWindow); + + if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) { + return FUNC_DATA_REQUIRED_DATA_LOAD; + } else { + return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pTimeWindow); + } } int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {