From 78bbfcb4bcd34fe4eeb86c0eb7b6c0aa01918899 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Jul 2022 09:56:03 +0800 Subject: [PATCH 1/6] refactor: do some internal refactor and opt query performance. --- include/common/tmsg.h | 2 +- source/client/test/clientTests.cpp | 26 ++-- source/libs/executor/inc/executil.h | 2 +- source/libs/executor/inc/executorimpl.h | 24 ++-- source/libs/executor/src/executil.c | 2 +- source/libs/executor/src/executorimpl.c | 36 ++--- source/libs/executor/src/groupoperator.c | 3 +- source/libs/executor/src/scanoperator.c | 128 ++++++++++++++---- source/libs/executor/src/timewindowoperator.c | 18 +-- source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 1 + source/libs/function/src/builtinsimpl.c | 16 +++ source/libs/function/src/functionMgt.c | 7 +- 13 files changed, 185 insertions(+), 81 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index dc83015a89..fbaf2c3e3f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1404,7 +1404,7 @@ typedef struct STableScanAnalyzeInfo { uint32_t skipBlocks; uint32_t filterOutBlocks; double elapsedTime; - uint64_t filterTime; + double filterTime; } STableScanAnalyzeInfo; int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index e7ae3917f9..08b0f3abb2 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -123,7 +123,7 @@ void createNewTable(TAOS* pConn, int32_t index) { } taos_free_result(pRes); - for(int32_t i = 0; i < 1000; i += 20) { + for(int32_t i = 0; i < 100000; i += 20) { char sql[1024] = {0}; sprintf(sql, "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" @@ -154,7 +154,7 @@ TEST(testCase, driverInit_Test) { } TEST(testCase, connect_Test) { -// taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg"); + taos_options(TSDB_OPTION_CONFIGDIR, "/home/lisa/Documents/workspace/tdengine/sim/dnode1/cfg"); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { @@ -501,7 +501,6 @@ TEST(testCase, show_vgroup_Test) { taos_close(pConn); } - TEST(testCase, create_multiple_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -665,6 +664,7 @@ TEST(testCase, insert_test) { taos_free_result(pRes); taos_close(pConn); } +#endif TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -697,7 +697,7 @@ TEST(testCase, projection_query_tables) { } taos_free_result(pRes); - for(int32_t i = 0; i < 100; ++i) { + for(int32_t i = 0; i < 1; ++i) { printf("create table :%d\n", i); createNewTable(pConn, i); } @@ -723,6 +723,7 @@ TEST(testCase, projection_query_tables) { taos_close(pConn); } +#if 0 TEST(testCase, projection_query_stables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -820,21 +821,8 @@ TEST(testCase, async_api_test) { getchar(); taos_close(pConn); } -#endif - TEST(testCase, update_test) { - - SInterval interval = {0}; - interval.offset = 8000; - interval.interval = 10000; - interval.sliding = 4000; - interval.intervalUnit = 's'; - interval.offsetUnit = 's'; - interval.slidingUnit = 's'; -// STimeWindow w = getAlignQueryTimeWindow(&interval, 0, 1630000000000); - STimeWindow w = getAlignQueryTimeWindow(&interval, 0, 1629999999999); - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -869,4 +857,8 @@ TEST(testCase, update_test) { taos_free_result(pRes); } } + +#endif + + #pragma GCC diagnostic pop diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index be97b20455..c3dad1ed7c 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -103,7 +103,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); -bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo); +bool hasRemainResults(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d5486d62b1..05dd3806b9 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -297,6 +297,20 @@ enum { TABLE_SCAN__BLOCK_ORDER = 2, }; +typedef struct SAggSupporter { + SHashObj* pResultRowHashTable; // quick locate the window object for each result + char* keyBuf; // window key buffer + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row +} SAggSupporter; + +typedef struct { + // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. + SInterval interval; + SAggSupporter *pAggSup; + SExprSupp *pExprSup; // expr supporter of aggregate operator +} SAggOptrPushDownInfo; + typedef struct STableScanInfo { STsdbReader* dataReader; SReadHandle readHandle; @@ -312,12 +326,13 @@ typedef struct STableScanInfo { SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; - SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. +// SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. SSampleExecInfo sample; // sample execution info int32_t currentGroupId; int32_t currentTable; int8_t scanMode; int8_t noTable; + SAggOptrPushDownInfo pdInfo; } STableScanInfo; typedef struct STableMergeScanInfo { @@ -504,13 +519,6 @@ typedef struct SOptrBasicInfo { SSDataBlock* pRes; } SOptrBasicInfo; -typedef struct SAggSupporter { - SHashObj* pResultRowHashTable; // quick locate the window object for each result - char* keyBuf; // window key buffer - SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row -} SAggSupporter; - typedef struct SIntervalAggOperatorInfo { // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; // basic info diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 65df7140f7..96c20d6136 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -137,7 +137,7 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } -bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo) { +bool hasRemainResults(SGroupResInfo* pGroupResInfo) { if (pGroupResInfo->pRows == NULL) { return false; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 525d7bf336..12501f9f7a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -141,8 +141,7 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SqlFunctionCtx* pCtx, int32_t numOfExprs); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); -static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, - uint64_t groupId); +static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); // setup the output buffer for each operator static bool hasNull(SColumn* pColumn, SColumnDataAgg* pStatis) { @@ -1393,10 +1392,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR } } -void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggInfo, int32_t numOfOutput, - uint64_t groupId) { +void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { // for simple group by query without interval, all the tables belong to one group result. - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SAggOperatorInfo* pAggInfo = pOperator->info; + SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset; @@ -1420,14 +1420,13 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, SAggOperatorInfo* pAggIn setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); } -void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId, SAggOperatorInfo* pAggInfo) { - if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == groupId) { +static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { + SAggOperatorInfo* pAggInfo = pOperator->info; + if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) { return; } -#ifdef BUF_PAGE_DEBUG - qDebug("page_setbuf, groupId:%" PRIu64, groupId); -#endif - doSetTableGroupOutputBuf(pOperator, pAggInfo, numOfOutput, groupId); + + doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); // record the current active group id pAggInfo->groupId = groupId; @@ -1594,7 +1593,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG pBlock->info.version = pTaskInfo->version; blockDataCleanup(pBlock); - if (!hasDataInGroupInfo(pGroupResInfo)) { + if (!hasRemainResults(pGroupResInfo)) { return; } @@ -2931,7 +2930,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId, pAggInfo); + setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.groupId); setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true); code = doAggregateImpl(pOperator, pSup->pCtx); if (code != 0) { @@ -2966,7 +2965,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); doFilter(pAggInfo->pCondition, pInfo->pRes, NULL); - if (!hasDataInGroupInfo(&pAggInfo->groupResInfo)) { + if (!hasRemainResults(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); break; } @@ -3356,7 +3355,6 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { pOperator->numOfDownstream = 0; } - cleanupExprSupp(&pOperator->exprSupp); taosMemoryFreeClear(pOperator); } @@ -3501,7 +3499,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - pInfo->groupId = INT32_MIN; + pInfo->groupId = UINT64_MAX; pInfo->pCondition = pCondition; pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; @@ -3513,6 +3511,12 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); + if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + STableScanInfo* pTableScanInfo = downstream->info; + pTableScanInfo->pdInfo.pExprSup = &pOperator->exprSupp; + pTableScanInfo->pdInfo.pAggSup = &pInfo->aggSup; + } + code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index c83206b730..5c37cf01e8 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -301,8 +301,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); - if (!hasRemain) { + if (!hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); break; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f07256e88e..34fd9a07ee 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -166,6 +166,67 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn return false; } +// this function is for table scanner to extract temporary results of upstream aggregate results. +static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t groupId, SFilePage** pPage) { + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + return NULL; + } + + int64_t buf[2] = {0}; + SET_RES_WINDOW_KEY((char*)buf, &groupId, sizeof(groupId), groupId); + + STableScanInfo* pTableScanInfo = pOperator->info; + + SResultRowPosition* p1 = + (SResultRowPosition*)taosHashGet(pTableScanInfo->pdInfo.pAggSup->pResultRowHashTable, buf, GET_RES_WINDOW_KEY_LEN(sizeof(groupId))); + + if (p1 == NULL) { + return NULL; + } + + *pPage = getBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, p1->pageId); + return (SResultRow*)((char*)(*pPage) + p1->offset); +} + +static int32_t doDynamicPruneDataBlock(SOperatorInfo* pOperator, SDataBlockInfo* pBlockInfo, uint32_t* status) { + STableScanInfo* pTableScanInfo = pOperator->info; + + if (pTableScanInfo->pdInfo.pExprSup == NULL) { + return TSDB_CODE_SUCCESS; + } + + SExprSupp* pSup1 = pTableScanInfo->pdInfo.pExprSup; + + SFilePage* pPage = NULL; + SResultRow* pRow = getTableGroupOutputBuf(pOperator, pBlockInfo->groupId, &pPage); + + if (pRow == NULL) { + return TSDB_CODE_SUCCESS; + } + + bool notLoadBlock = true; + for (int32_t i = 0; i < pSup1->numOfExprs; ++i) { + int32_t functionId = pSup1->pCtx[i].functionId; + + SResultRowEntryInfo* pEntry = getResultEntryInfo(pRow, i, pTableScanInfo->pdInfo.pExprSup->rowEntryInfoOffset); + + int32_t reqStatus = fmFuncDynDataRequired(functionId, pEntry, &pBlockInfo->window); + if (reqStatus != FUNC_DATA_REQUIRED_NOT_LOAD) { + notLoadBlock = false; + break; + } + } + + // release buffer pages + releaseBufPage(pTableScanInfo->pdInfo.pAggSup->pResultBuf, pPage); + + if (notLoadBlock) { + *status = FUNC_DATA_REQUIRED_NOT_LOAD; + } + + return TSDB_CODE_SUCCESS; +} + static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -178,7 +239,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca *status = pInfo->dataBlockLoadFlag; if (pTableScanInfo->pFilterNode != NULL || - overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { + overlapWithTimeWindow(&pTableScanInfo->pdInfo.interval, &pBlock->info, pTableScanInfo->cond.order)) { (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; } @@ -232,6 +293,16 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); // todo filter data block according to the block sma data firstly + + doDynamicPruneDataBlock(pOperator, pBlockInfo, status); + if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { + qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->skipBlocks += 1; + + return TSDB_CODE_SUCCESS; + } + #if 0 if (!doFilterByBlockStatistics(pBlock->pBlockStatis, pTableScanInfo->pCtx, pBlockInfo->rows)) { pCost->filterOutBlocks += 1; @@ -263,18 +334,20 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca } } - int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); + if (pTableScanInfo->pFilterNode != NULL) { + int64_t st = taosGetTimestampUs(); + doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); - int64_t et = taosGetTimestampMs(); - pTableScanInfo->readRecorder.filterTime += (et - st); + double el = (taosGetTimestampUs() - st) / 1000.0; + pTableScanInfo->readRecorder.filterTime += el; - if (pBlock->info.rows == 0) { - pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - } else { - qDebug("%s data block filter out, elapsed time:%" PRId64, GET_TASKID(pTaskInfo), (et - st)); + if (pBlock->info.rows == 0) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms", + GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el); + } else { + qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); + } } return TSDB_CODE_SUCCESS; @@ -602,11 +675,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); } - pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - // pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose +// pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; + pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose + pInfo->cond.order = TSDB_ORDER_DESC; + pInfo->pdInfo.interval = extractIntervalInfo(pTableScanNode); pInfo->readHandle = *readHandle; - pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); @@ -1484,14 +1558,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->tqReader = pHandle->tqReader; } - if (pTSInfo->interval.interval > 0) { - pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->interval, pInfo->twAggSup.waterMark); + if (pTSInfo->pdInfo.interval.interval > 0) { + pInfo->pUpdateInfo = updateInfoInitP(&pTSInfo->pdInfo.interval, pInfo->twAggSup.waterMark); } else { pInfo->pUpdateInfo = NULL; } pInfo->pTableScanOp = pTableScanOp; - pInfo->interval = pTSInfo->interval; + pInfo->interval = pTSInfo->pdInfo.interval; pInfo->readHandle = *pHandle; pInfo->tableUid = pScanPhyNode->uid; @@ -2667,16 +2741,20 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc } } - int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); + if (pTableScanInfo->pFilterNode != NULL) { + int64_t st = taosGetTimestampMs(); + doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); - int64_t et = taosGetTimestampMs(); - pTableScanInfo->readRecorder.filterTime += (et - st); + double el = (taosGetTimestampUs() - st) / 1000.0; + pTableScanInfo->readRecorder.filterTime += el; - if (pBlock->info.rows == 0) { - pCost->filterOutBlocks += 1; - qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), - pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + if (pBlock->info.rows == 0) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms", + GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el); + } else { + qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); + } } return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 9a82b194a9..8dedffce3a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1216,7 +1216,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -1256,7 +1256,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -1293,7 +1293,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBlock, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -1562,7 +1562,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pInfo->binfo.pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; qDebug("===stream===single interval is done"); freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); @@ -2009,7 +2009,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -2052,7 +2052,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pBInfo->pRes, NULL); - bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { doSetOperatorCompleted(pOperator); break; @@ -2210,7 +2210,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // if (pOperator->status == OP_RES_TO_RETURN) { // // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - // if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) { + // if (pResBlock->info.rows == 0 || !hasRemainResults(&pSliceInfo->groupResInfo)) { // doSetOperatorCompleted(pOperator); // } // @@ -3823,7 +3823,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { + if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); @@ -4379,7 +4379,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { + if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } printDataBlock(pBInfo->pRes, "single state"); diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 35669b3e42..0880f2f5c7 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -118,6 +118,7 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getFirstLastInfoSize(int32_t resBytes); +EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow); int32_t lastRowFunction(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 01a5e7997e..8b87a85a8d 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2312,6 +2312,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_LAST, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLast, + .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, .processFunc = lastFunction, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fb82ab206c..5569cc3887 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2702,6 +2702,22 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) return TSDB_CODE_SUCCESS; } +EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow) { + SResultRowEntryInfo* pEntry = (SResultRowEntryInfo*) pRes; + + // not initialized yet, data is required + if (pEntry == NULL) { + return FUNC_DATA_REQUIRED_DATA_LOAD; + } + + SFirstLastRes* pResult = GET_ROWCELL_INTERBUF(pEntry); + if (pResult->hasResult && pResult->ts >= pTimeWindow->ekey) { + return FUNC_DATA_REQUIRED_NOT_LOAD; + } else { + return FUNC_DATA_REQUIRED_DATA_LOAD; + } +} + int32_t getFirstLastInfoSize(int32_t resBytes) { return sizeof(SFirstLastRes) + resBytes; } bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 020fd648e1..cdf089385e 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -107,7 +107,12 @@ EFuncDataRequired fmFuncDynDataRequired(int32_t funcId, void* pRes, STimeWindow* if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) { return TSDB_CODE_FAILED; } - return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pTimeWindow); + + if (funcMgtBuiltins[funcId].dynDataRequiredFunc == NULL) { + return FUNC_DATA_REQUIRED_DATA_LOAD; + } else { + return funcMgtBuiltins[funcId].dynDataRequiredFunc(pRes, pTimeWindow); + } } int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) { From 26e5a7c796a09f9dd3d375d58d308d8af8fa03e8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Jul 2022 13:54:14 +0800 Subject: [PATCH 2/6] refactor: do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executor.c | 15 +++++++++---- source/libs/executor/src/executorimpl.c | 30 +++++++++++++++++++++---- 3 files changed, 38 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 05dd3806b9..7aa85279b1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -1026,6 +1026,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); +bool groupbyTbname(SNodeList* pGroupList); int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); SSDataBlock* createSpecialDataBlock(EStreamType type); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 8b1cbb5ae8..0abc6b0624 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -248,9 +248,11 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } // todo refactor STableList + bool assignUid = false; size_t bufLen = (pScanInfo->pGroupTags != NULL) ? getTableTagsBufLen(pScanInfo->pGroupTags) : 0; char* keyBuf = NULL; if (bufLen > 0) { + assignUid = groupbyTbname(pScanInfo->pGroupTags); keyBuf = taosMemoryMalloc(bufLen); if (keyBuf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -262,14 +264,19 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0}; if (bufLen > 0) { - code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf, - &keyInfo.groupId); - if (code != TSDB_CODE_SUCCESS) { - return code; + if (assignUid) { + keyInfo.groupId = keyInfo.uid; + } else { + code = getGroupIdFromTagsVal(pScanInfo->readHandle.meta, keyInfo.uid, pScanInfo->pGroupTags, keyBuf, + &keyInfo.groupId); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); + taosHashPut(pTaskInfo->tableqinfoList.map, &keyInfo.uid, sizeof(keyInfo.uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); } if (keyBuf != NULL) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 12501f9f7a..fa27dacafa 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3829,6 +3829,19 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) return TDB_CODE_SUCCESS; } +bool groupbyTbname(SNodeList* pGroupList) { + bool bytbname = false; + if (LIST_LENGTH(pGroupList) == 0) { + SNode* p = nodesListGetNode(pGroupList, 0); + if (p->type == QUERY_NODE_FUNCTION) { + // partition by tbname/group by tbname + bytbname = (strcmp(((struct SFunctionNode*)p)->functionName, "tbname") == 0); + } + } + + return bytbname; +} + int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) { if (group == NULL) { return TDB_CODE_SUCCESS; @@ -3855,12 +3868,21 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, return TSDB_CODE_OUT_OF_MEMORY; } + bool assignUid = groupbyTbname(group); + int32_t groupNum = 0; - for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { + size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); + + for (int32_t i = 0; i < numOfTables; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId); - if (code != TSDB_CODE_SUCCESS) { - return code; + + if (assignUid) { + info->groupId = info->uid; + } else { + int32_t code = getGroupIdFromTagsVal(pHandle->meta, info->uid, group, keyBuf, &info->groupId); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); From 60c5b2beac42efaec8fa5fcde11a3501ad8f10c1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Jul 2022 17:53:30 +0800 Subject: [PATCH 3/6] fix(query): optimize the row merge procedure for files. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 97 ++++++++++++++++++++------ 1 file changed, 76 insertions(+), 21 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 03985654f8..c259a7cfe0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -145,7 +145,8 @@ static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanI SRowMerger* pMerger); static int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); -static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); +static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); +static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); @@ -691,16 +692,13 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SBlock* pBlock = getCurrentBlock(pBlockIter); SSDataBlock* pResBlock = pReader->pResBlock; - int32_t numOfCols = blockDataGetNumOfCols(pResBlock); + int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock); SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - int64_t st = taosGetTimestampUs(); - SColVal cv = {0}; - int32_t colIndex = 0; - + int64_t st = taosGetTimestampUs(); bool asc = ASCENDING_TRAVERSE(pReader->order); int32_t step = asc ? 1 : -1; @@ -724,7 +722,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn i += 1; } - while (i < numOfCols && colIndex < taosArrayGetSize(pBlockData->aIdx)) { + int32_t colIndex = 0; + int32_t num = taosArrayGetSize(pBlockData->aIdx); + while (i < numOfOutputCols && colIndex < num) { rowIndex = 0; pColData = taosArrayGet(pResBlock->pDataBlock, i); @@ -744,7 +744,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn i += 1; } - while (i < numOfCols) { + while (i < numOfOutputCols) { pColData = taosArrayGet(pResBlock->pDataBlock, i); colDataAppendNNULL(pColData, 0, remain); i += 1; @@ -1256,7 +1256,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } tRowMergerClear(&merge); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); taosMemoryFree(pTSRow); return TSDB_CODE_SUCCESS; @@ -1300,7 +1300,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* } tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } else { // key > ik.ts || key > k.ts ASSERT(key != ik.ts); @@ -1309,7 +1309,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [4] ik.ts < k.ts <= key if (ik.ts < k.ts) { doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1317,7 +1317,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [6] k.ts < ik.ts <= key if (k.ts < ik.ts) { doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1326,7 +1326,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ASSERT(key > ik.ts && key > k.ts); doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } } @@ -1350,7 +1350,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* } tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } else { ASSERT(ik.ts != k.ts); // this case has been included in the previous if branch @@ -1359,7 +1359,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [4] ik.ts > key >= k.ts if (ik.ts > key) { doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1371,7 +1371,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } @@ -1383,7 +1383,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* tRowMerge(&merge, &fRow); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); return TSDB_CODE_SUCCESS; } } @@ -1438,6 +1438,21 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI } // imem & mem are all empty, only file exist + + // opt version + // 1. it is not a border point + // 2. the direct next point is not an duplicated timestamp + if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) || + (pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) { + int32_t step = pReader->order == TSDB_ORDER_ASC? 1:-1; + int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; + if (nextKey != key) { + // merge is not needed + doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); + return TSDB_CODE_SUCCESS; + } + } + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); STSRow* pTSRow = NULL; @@ -1446,7 +1461,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); - doAppendOneRow(pReader->pResBlock, pReader, pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); taosMemoryFree(pTSRow); tRowMergerClear(&merge); @@ -2201,7 +2216,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc int32_t step = asc ? 1 : -1; pDumpInfo->rowIndex += step; - if (pDumpInfo->rowIndex <= pBlockData->nRow - 1) { + if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) ||(pDumpInfo->rowIndex >= 0 && !asc)) { pDumpInfo->rowIndex = doMergeRowsInFileBlockImpl(pBlockData, pDumpInfo->rowIndex, key, pMerger, &pReader->verRange, step); } @@ -2325,7 +2340,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR return TSDB_CODE_SUCCESS; } -int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) { +int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow) { int32_t numOfRows = pBlock->info.rows; int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); @@ -2369,6 +2384,46 @@ int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow return TSDB_CODE_SUCCESS; } +int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) { + int32_t i = 0, j = 0; + int32_t outputRowIndex = pResBlock->info.rows; + + SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; + + SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i); + if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + colDataAppendInt64(pColData, outputRowIndex, &pBlockData->aTSKEY[rowIndex]); + } + + SColVal cv = {0}; + int32_t numOfInputCols = taosArrayGetSize(pBlockData->aIdx); + int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock); + + while(i < numOfOutputCols && j < numOfInputCols) { + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); + SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j); + + if (pData->cid == pCol->info.colId) { + tColDataGetValue(pData, j, &cv); + doCopyColVal(pCol, outputRowIndex++, i, &cv, pSupInfo); + j += 1; + } else { // the specified column does not exist in file block, fill with null data + colDataAppendNULL(pCol, outputRowIndex); + } + + i += 1; + } + + while (i < numOfOutputCols) { + SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); + colDataAppendNULL(pCol, rowIndex); + i += 1; + } + + pResBlock->info.rows += 1; + return TSDB_CODE_SUCCESS; +} + int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader) { SSDataBlock* pBlock = pReader->pResBlock; @@ -2380,7 +2435,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e break; } - doAppendOneRow(pBlock, pReader, pTSRow); + doAppendRowFromTSRow(pBlock, pReader, pTSRow); taosMemoryFree(pTSRow); // no data in buffer, return immediately From cd5491fea11ad24322ceb16f2440292e208d1be6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Jul 2022 19:00:12 +0800 Subject: [PATCH 4/6] fix(query): remove invalid code. --- .gitignore | 2 ++ source/libs/executor/src/scanoperator.c | 6 +++--- tests/script/tsim/insert/basic0.sim | 3 ++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index d7fcb019ae..80fd850cd4 100644 --- a/.gitignore +++ b/.gitignore @@ -48,6 +48,8 @@ pysim/ *.out *DS_Store tests/script/api/batchprepare +taosadapter +taosadapter-debug # Doxygen Generated files html/ diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 30a9b6d0e5..fc67f3da6c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -679,9 +679,9 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pSup->pCtx = createSqlFunctionCtx(pSup->pExprInfo, pSup->numOfExprs, &pSup->rowEntryInfoOffset); } -// pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; - pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose - pInfo->cond.order = TSDB_ORDER_DESC; + pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; +// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose +// pInfo->cond.order = TSDB_ORDER_DESC; pInfo->pdInfo.interval = extractIntervalInfo(pTableScanNode); pInfo->readHandle = *readHandle; diff --git a/tests/script/tsim/insert/basic0.sim b/tests/script/tsim/insert/basic0.sim index 1f3c93a4bf..7d91a77a83 100644 --- a/tests/script/tsim/insert/basic0.sim +++ b/tests/script/tsim/insert/basic0.sim @@ -54,7 +54,8 @@ print $data30 $data31 $data32 $data33 if $rows != 4 then return -1 endi -if $data01 != 10 then +if $data01 != 10 then + print expect 10, actual: $data01 return -1 endi if $data02 != 2.00000 then From 0c1e30dc56cd03aac2b3c600fb42aa35df05f268 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Jul 2022 19:34:13 +0800 Subject: [PATCH 5/6] fix(query): step forward index --- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c259a7cfe0..c5667cce41 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1445,10 +1445,10 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) || (pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) { int32_t step = pReader->order == TSDB_ORDER_ASC? 1:-1; - int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; - if (nextKey != key) { - // merge is not needed + int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; + if (nextKey != key) { // merge is not needed doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); + pDumpInfo->rowIndex += step; return TSDB_CODE_SUCCESS; } } From 2261d689e4155515540beaa6866eae7ef93a4de6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Jul 2022 21:09:33 +0800 Subject: [PATCH 6/6] fix(query): opt read data from file block. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 7 ++++--- source/libs/executor/src/executorimpl.c | 1 + tests/script/tsim/parser/function.sim | 4 +++- tests/script/tsim/parser/groupby.sim | 3 ++- tests/script/tsim/parser/limit_stb.sim | 1 + 5 files changed, 11 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c5667cce41..072d15d715 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2393,6 +2393,7 @@ int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBloc SColumnInfoData* pColData = taosArrayGet(pResBlock->pDataBlock, i); if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { colDataAppendInt64(pColData, outputRowIndex, &pBlockData->aTSKEY[rowIndex]); + i += 1; } SColVal cv = {0}; @@ -2404,8 +2405,8 @@ int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBloc SColData* pData = tBlockDataGetColDataByIdx(pBlockData, j); if (pData->cid == pCol->info.colId) { - tColDataGetValue(pData, j, &cv); - doCopyColVal(pCol, outputRowIndex++, i, &cv, pSupInfo); + tColDataGetValue(pData, rowIndex, &cv); + doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo); j += 1; } else { // the specified column does not exist in file block, fill with null data colDataAppendNULL(pCol, outputRowIndex); @@ -2416,7 +2417,7 @@ int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBloc while (i < numOfOutputCols) { SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); - colDataAppendNULL(pCol, rowIndex); + colDataAppendNULL(pCol, outputRowIndex); i += 1; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 46e1b198b6..064c747c88 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3355,6 +3355,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { pOperator->numOfDownstream = 0; } + cleanupExprSupp(&pOperator->exprSupp); taosMemoryFreeClear(pOperator); } diff --git a/tests/script/tsim/parser/function.sim b/tests/script/tsim/parser/function.sim index cbfb59bcab..110901a6e1 100644 --- a/tests/script/tsim/parser/function.sim +++ b/tests/script/tsim/parser/function.sim @@ -500,11 +500,12 @@ if $rows != 2 then return -1 endi -sql select stddev(k), stddev(b), stddev(c),tbname, a from m1 group by tbname,a +sql select stddev(k), stddev(b), stddev(c),tbname, a from m1 group by tbname,a order by a asc if $rows != 2 then return -1 endi if $data00 != 1.414213562 then + print expect 1.414213562, actual: $data00 return -1 endi if $data01 != 14.142135624 then @@ -732,6 +733,7 @@ if $rows != 1 then return -1 endi if $data00 != 0.005633334 then + print expect 0.005633334, actual: $data00 return -1 endi diff --git a/tests/script/tsim/parser/groupby.sim b/tests/script/tsim/parser/groupby.sim index bf2c7cc7bf..c4c19ca211 100644 --- a/tests/script/tsim/parser/groupby.sim +++ b/tests/script/tsim/parser/groupby.sim @@ -681,12 +681,13 @@ if $data14 != 1 then return -1 endi -sql select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname, t1, t2 interval(1m) sliding(15s) order by tbname desc limit 1; +sql select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname, t1, t2 interval(1m) sliding(15s) order by tbname desc,_wstart asc limit 1; if $rows != 1 then return -1 endi if $data01 != 1.000000000 then + print expect 1.000000000, actual: $data01 return -1 endi if $data02 != t2 then diff --git a/tests/script/tsim/parser/limit_stb.sim b/tests/script/tsim/parser/limit_stb.sim index 0d0e4a8ea3..a0aff953cf 100644 --- a/tests/script/tsim/parser/limit_stb.sim +++ b/tests/script/tsim/parser/limit_stb.sim @@ -360,6 +360,7 @@ endi #sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from lm_stb0 where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 10:30:00.000' and c1 > 1 and c2 < 9 and c3 > 2 and c4 < 8 and c5 > 3 and c6 < 7 and c7 > 0 and c8 like '%5' and t1 > 3 and t1 < 6 limit 1 offset 0; sql select max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9) from lm_stb0 where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 10:30:00.000' and c1 > 1 and c2 < 9 and c3 > 2 and c4 < 8 and c5 > 3 and c6 < 7 and c7 = true and c8 like '%5' and t1 > 3 and t1 < 6 limit 1 offset 0; if $rows != 1 then + print expect 1, actual: $rows return -1 endi if $data00 != 5 then