diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index cac8f73042..a65f30f023 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1035,6 +1035,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { return TSDB_CODE_SUCCESS; } +#if 0 typedef struct SHelper { int32_t index; union { @@ -1083,59 +1084,20 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) { const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param; - // SSDataBlock* pDataBlock = pHelper->pDataBlock; - SHelper* left = (SHelper*)p1; SHelper* right = (SHelper*)p2; SArray* pInfo = pHelper->orderInfo; int32_t offset = 0; - // for(int32_t i = 0; i < pInfo->size; ++i) { - // SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, 0); - // SColumnInfoData* pColInfoData = pOrder->pColData;//TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex); - - // if (pColInfoData->hasNull) { - // bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, pDataBlock->pBlockAgg); - // bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, pDataBlock->pBlockAgg); - // if (leftNull && rightNull) { - // continue; // continue to next slot - // } - // - // if (rightNull) { - // return pHelper->nullFirst? 1:-1; - // } - // - // if (leftNull) { - // return pHelper->nullFirst? -1:1; - // } - // } - - // void* left1 = colDataGetData(pColInfoData, left); - // void* right1 = colDataGetData(pColInfoData, right); - - // switch(pColInfoData->info.type) { - // case TSDB_DATA_TYPE_INT: { int32_t leftx = *(int32_t*)left->pData; //*(int32_t*)(left->pData + offset); int32_t rightx = *(int32_t*)right->pData; //*(int32_t*)(right->pData + offset); - // offset += pColInfoData->info.bytes; if (leftx == rightx) { - // break; return 0; } else { - // if (pOrder->order == TSDB_ORDER_ASC) { return (leftx < rightx) ? -1 : 1; - // } else { - // return (leftx < rightx)? 1:-1; - // } } - // } - // default: - // assert(0); - // } - // } - return 0; } @@ -1179,6 +1141,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF // destroyTupleIndex(index); return 0; } +#endif void blockDataCleanup(SSDataBlock* pDataBlock) { blockDataEmpty(pDataBlock); @@ -1887,6 +1850,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { return buf; } +#if 0 void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) { SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock*)); taosArrayPush(dataBlocks, &pBlock); @@ -1979,6 +1943,8 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { } } +#endif + // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) { int32_t size = 2048; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4eceefcd51..a6353f722a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -175,7 +175,6 @@ struct SExecTaskInfo { int64_t version; // used for stream to record wal version, why not move to sschemainfo SStreamTaskInfo streamInfo; SSchemaInfo schemaInfo; - STableListInfo* pTableInfoList; // this is a table list const char* sql; // query sql string jmp_buf env; // jump to this position when error happens. EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] @@ -323,6 +322,8 @@ typedef struct STableScanBase { int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; SLimitInfo limitInfo; + // there are more than one table list exists in one task, if only one vnode exists. + STableListInfo* pTableListInfo; } STableScanBase; typedef struct STableScanInfo { @@ -363,11 +364,12 @@ typedef struct STableMergeScanInfo { } STableMergeScanInfo; typedef struct STagScanInfo { - SColumnInfo* pCols; - SSDataBlock* pRes; - SColMatchInfo matchInfo; - int32_t curPos; - SReadHandle readHandle; + SColumnInfo* pCols; + SSDataBlock* pRes; + SColMatchInfo matchInfo; + int32_t curPos; + SReadHandle readHandle; + STableListInfo* pTableListInfo; } STagScanInfo; typedef enum EStreamScanMode { @@ -483,10 +485,11 @@ typedef struct SStreamScanInfo { } SStreamScanInfo; typedef struct { - SVnode* vnode; - SSDataBlock pRes; // result SSDataBlock - STsdbReader* dataReader; - SSnapContext* sContext; + SVnode* vnode; + SSDataBlock pRes; // result SSDataBlock + STsdbReader* dataReader; + SSnapContext* sContext; + STableListInfo* pTableListInfo; } SStreamRawScanInfo; typedef struct STableCountScanSupp { @@ -515,6 +518,16 @@ typedef struct SOptrBasicInfo { bool mergeResultBlock; } SOptrBasicInfo; +typedef struct SAggOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + STableQueryInfo* current; + uint64_t groupId; + SGroupResInfo groupResInfo; + SExprSupp scalarExprSup; + bool groupKeyOptimized; +} SAggOperatorInfo; + typedef struct SIntervalAggOperatorInfo { SOptrBasicInfo binfo; // basic info SAggSupporter aggSup; // aggregate supporter @@ -731,6 +744,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int3 STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); +SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); @@ -753,11 +767,11 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // clang-format off SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableList, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); @@ -773,7 +787,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); @@ -787,9 +801,9 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); @@ -834,9 +848,11 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); char* buildTaskId(uint64_t taskId, uint64_t queryId); +SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo); + int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t vgId, char* sql, EOPTR_EXEC_MODEL model); -int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); +int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, STableListInfo* pTableListInfo, SReadHandle* readHandle); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList); STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, @@ -850,7 +866,6 @@ bool isInTimeWindow(STimeWindow* pWin, TSKEY ts, int64_t gap); bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); -bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup); void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid, uint64_t* pGp, void* pTbName); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c new file mode 100644 index 0000000000..a26e3ace7b --- /dev/null +++ b/source/libs/executor/src/aggregateoperator.c @@ -0,0 +1,563 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "filter.h" +#include "function.h" +#include "functionMgt.h" +#include "os.h" +#include "querynodes.h" +#include "tfill.h" +#include "tname.h" + +#include "tdatablock.h" +#include "tglobal.h" +#include "tmsg.h" +#include "ttime.h" + +#include "executorimpl.h" +#include "index.h" +#include "query.h" +#include "tcompare.h" +#include "thash.h" +#include "ttypes.h" +#include "vnode.h" + +typedef struct { + bool hasAgg; + int32_t numOfRows; + int32_t startOffset; +} SFunctionCtxStatus; + +static void destroyAggOperatorInfo(void* param); +static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); + +static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock); +static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock); + +static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator); +static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx); +static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator); + +static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, + const char* pKey); + +static int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size); + +static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); + +static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); +static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus); + +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, + SExecTaskInfo* pTaskInfo) { + SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } + + SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); + initBasicInfo(&pInfo->binfo, pResBlock); + + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + initResultSizeInfo(&pOperator->resultInfo, 4096); + + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, + pTaskInfo->streamInfo.pState); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = NULL; + if (pAggNode->pExprs != NULL) { + pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); + } + + code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; + pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized; + pInfo->groupId = UINT64_MAX; + + setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, + pTaskInfo); + pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, + optrDefaultBufFn, NULL); + + if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + STableScanInfo* pTableScanInfo = downstream->info; + pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp; + pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup; + } + + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + + _error: + if (pInfo != NULL) { + destroyAggOperatorInfo(pInfo); + } + + if (pOperator != NULL) { + cleanupExprSupp(&pOperator->exprSupp); + } + + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; +} + +void destroyAggOperatorInfo(void* param) { + SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; + cleanupBasicInfo(&pInfo->binfo); + + cleanupAggSup(&pInfo->aggSup); + cleanupExprSupp(&pInfo->scalarExprSup); + cleanupGroupResInfo(&pInfo->groupResInfo); + taosMemoryFreeClear(param); +} + +// this is a blocking operator +int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SAggOperatorInfo* pAggInfo = pOperator->info; + + SExprSupp* pSup = &pOperator->exprSupp; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + + int64_t st = taosGetTimestampUs(); + + int32_t order = TSDB_ORDER_ASC; + int32_t scanFlag = MAIN_SCAN; + + bool hasValidBlock = false; + bool blockAllocated = false; + + while (1) { + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + if (pBlock == NULL) { + if (!hasValidBlock) { + createDataBlockForEmptyInput(pOperator, &pBlock); + if (pBlock == NULL) { + break; + } + blockAllocated = true; + } else { + break; + } + } + hasValidBlock = true; + + int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false); + if (code != TSDB_CODE_SUCCESS) { + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); + T_LONG_JMP(pTaskInfo->env, code); + } + + // there is an scalar expression that needs to be calculated before apply the group aggregation. + if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) { + SExprSupp* pSup1 = &pAggInfo->scalarExprSup; + code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL); + if (code != TSDB_CODE_SUCCESS) { + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); + T_LONG_JMP(pTaskInfo->env, code); + } + } + + // the pDataBlock are always the same one, no need to call this again + setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); + setInputDataBlock(pSup, pBlock, order, scanFlag, true); + code = doAggregateImpl(pOperator, pSup->pCtx); + if (code != 0) { + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); + T_LONG_JMP(pTaskInfo->env, code); + } + + destroyDataBlockForEmptyInput(blockAllocated, &pBlock); + } + + // the downstream operator may return with error code, so let's check the code before generating results. + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); + } + + initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); + OPTR_SET_OPENED(pOperator); + + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + return pTaskInfo->code; +} + +SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { + SAggOperatorInfo* pAggInfo = pOperator->info; + SOptrBasicInfo* pInfo = &pAggInfo->binfo; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + pTaskInfo->code = pOperator->fpSet._openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + setOperatorCompleted(pOperator); + return NULL; + } + + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + while (1) { + doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); + doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + + if (!hasRemainResults(&pAggInfo->groupResInfo)) { + setOperatorCompleted(pOperator); + break; + } + + if (pInfo->pRes->info.rows > 0) { + break; + } + } + + size_t rows = blockDataGetNumOfRows(pInfo->pRes); + pOperator->resultInfo.totalRows += rows; + + return (rows == 0) ? NULL : pInfo->pRes; +} + +int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { + for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) { + if (functionNeedToExecute(&pCtx[k])) { + // todo add a dummy funtion to avoid process check + if (pCtx[k].fpSet.process == NULL) { + continue; + } + + int32_t code = pCtx[k].fpSet.process(&pCtx[k]); + if (code != TSDB_CODE_SUCCESS) { + qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); + return code; + } + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) { + if (!tsCountAlwaysReturnValue) { + return TSDB_CODE_SUCCESS; + } + + SAggOperatorInfo* pAggInfo = pOperator->info; + if (pAggInfo->groupKeyOptimized) { + return TSDB_CODE_SUCCESS; + } + + SOperatorInfo* downstream = pOperator->pDownstream[0]; + if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION || + (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN && + ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) { + return TSDB_CODE_SUCCESS; + } + + SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; + bool hasCountFunc = false; + + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { + const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; + if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) || + (strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) { + hasCountFunc = true; + break; + } + } + + if (!hasCountFunc) { + return TSDB_CODE_SUCCESS; + } + + SSDataBlock* pBlock = createDataBlock(); + pBlock->info.rows = 1; + pBlock->info.capacity = 0; + + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { + SColumnInfoData colInfo = {0}; + colInfo.hasNull = true; + colInfo.info.type = TSDB_DATA_TYPE_NULL; + colInfo.info.bytes = 1; + + SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i]; + for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { + SFunctParam* pFuncParam = &pOneExpr->base.pParam[j]; + if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { + int32_t slotId = pFuncParam->pCol->slotId; + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + if (slotId >= numOfCols) { + taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1); + for (int32_t k = numOfCols; k < slotId + 1; ++k) { + taosArrayPush(pBlock->pDataBlock, &colInfo); + } + } + } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { + // do nothing + } + } + } + + blockDataEnsureCapacity(pBlock, pBlock->info.rows); + *ppBlock = pBlock; + + return TSDB_CODE_SUCCESS; +} + +void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) { + if (!blockAllocated) { + return; + } + + blockDataDestroy(*ppBlock); + *ppBlock = NULL; +} + +void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { + SAggOperatorInfo* pAggInfo = pOperator->info; + if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) { + return; + } + + doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); + + // record the current active group id + pAggInfo->groupId = groupId; +} + +void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { + // for simple group by query without interval, all the tables belong to one group result. + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SAggOperatorInfo* pAggInfo = pOperator->info; + + SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; + SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; + int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset; + + SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, + sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup, true); + /* + * not assign result buffer yet, add new result buffer + * all group belong to one result set, and each group result has different group id so set the id to be one + */ + if (pResultRow->pageId == -1) { + int32_t ret = addNewResultRowBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); + if (ret != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, terrno); + } + } + + setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); +} + +// a new buffer page for each table. Needs to opt this design +int32_t addNewResultRowBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) { + if (pWindowRes->pageId != -1) { + return 0; + } + + SFilePage* pData = NULL; + + // in the first scan, new space needed for results + int32_t pageId = -1; + SArray* list = getDataBufPagesIdList(pResultBuf); + + if (taosArrayGetSize(list) == 0) { + pData = getNewBufPage(pResultBuf, &pageId); + pData->num = sizeof(SFilePage); + } else { + SPageInfo* pi = getLastPageInfo(list); + pData = getBufPage(pResultBuf, getPageId(pi)); + if (pData == NULL) { + qError("failed to get buffer, code:%s", tstrerror(terrno)); + return terrno; + } + + pageId = getPageId(pi); + + if (pData->num + size > getBufPageSize(pResultBuf)) { + // release current page first, and prepare the next one + releaseBufPageInfo(pResultBuf, pi); + + pData = getNewBufPage(pResultBuf, &pageId); + if (pData != NULL) { + pData->num = sizeof(SFilePage); + } + } + } + + if (pData == NULL) { + return -1; + } + + // set the number of rows in current disk page + if (pWindowRes->pageId == -1) { // not allocated yet, allocate new buffer + pWindowRes->pageId = pageId; + pWindowRes->offset = (int32_t)pData->num; + + pData->num += size; + } + + return 0; +} + +int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, + const char* pKey) { + int32_t code = 0; + // _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + + pAggSup->currentPageId = -1; + pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); + pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); + pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash); + + if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + uint32_t defaultPgsz = 0; + uint32_t defaultBufsz = 0; + getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); + + if (!osTempSpaceAvailable()) { + code = TSDB_CODE_NO_AVAIL_DISK; + qError("Init stream agg supporter failed since %s, %s", terrstr(code), pKey); + return code; + } + + code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir); + if (code != TSDB_CODE_SUCCESS) { + qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey); + return code; + } + + return code; +} + +void cleanupAggSup(SAggSupporter* pAggSup) { + taosMemoryFreeClear(pAggSup->keyBuf); + tSimpleHashCleanup(pAggSup->pResultRowHashTable); + destroyDiskbasedBuf(pAggSup->pResultBuf); +} + +int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, + const char* pkey, void* pState) { + int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + if (pState) { + pSup->pCtx[i].saveHandle.pBuf = NULL; + pSup->pCtx[i].saveHandle.pState = pState; + pSup->pCtx[i].exprIdx = i; + } else { + pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf; + } + } + + return TSDB_CODE_SUCCESS; +} + +void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, + int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { + for (int32_t k = 0; k < numOfOutput; ++k) { + // keep it temporarily + SFunctionCtxStatus status = {0}; + functionCtxSave(&pCtx[k], &status); + + pCtx[k].input.startRowIndex = offset; + pCtx[k].input.numOfRows = forwardStep; + + // not a whole block involved in query processing, statistics data can not be used + // NOTE: the original value of isSet have been changed here + if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) { + pCtx[k].input.colDataSMAIsSet = false; + } + + if (pCtx[k].isPseudoFunc) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); + + char* p = GET_ROWCELL_INTERBUF(pEntryInfo); + + SColumnInfoData idata = {0}; + idata.info.type = TSDB_DATA_TYPE_BIGINT; + idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; + idata.pData = p; + + SScalarParam out = {.columnData = &idata}; + SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; + pCtx[k].sfp.process(&tw, 1, &out); + pEntryInfo->numOfRes = 1; + } else { + int32_t code = TSDB_CODE_SUCCESS; + if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { + code = pCtx[k].fpSet.process(&pCtx[k]); + + if (code != TSDB_CODE_SUCCESS) { + qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); + taskInfo->code = code; + T_LONG_JMP(taskInfo->env, code); + } + } + + // restore it + functionCtxRestore(&pCtx[k], &status); + } + } +} + +void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { + pStatus->hasAgg = pCtx->input.colDataSMAIsSet; + pStatus->numOfRows = pCtx->input.numOfRows; + pStatus->startOffset = pCtx->input.startRowIndex; +} + +void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { + pCtx->input.colDataSMAIsSet = pStatus->hasAgg; + pCtx->input.numOfRows = pStatus->numOfRows; + pCtx->input.startRowIndex = pStatus->startOffset; +} \ No newline at end of file diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index d42b348fd8..f6fc332b37 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -37,6 +37,7 @@ typedef struct SCacheRowsScanInfo { SSDataBlock* pBufferredRes; SArray* pUidList; int32_t indexOfBufferedRes; + STableListInfo* pTableList; } SCacheRowsScanInfo; static SSDataBlock* doScanCache(SOperatorInfo* pOperator); @@ -47,15 +48,17 @@ static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColM #define SCAN_ROW_TYPE(_t) ((_t)? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW) SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, - SExecTaskInfo* pTaskInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; + tableListDestroy(pTableListInfo); goto _error; } + pInfo->pTableList = pTableListInfo; pInfo->readHandle = *readHandle; SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc; @@ -75,20 +78,18 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe goto _error; } - STableListInfo* pTableList = pTaskInfo->pTableInfoList; - - int32_t totalTables = tableListGetSize(pTableList); + int32_t totalTables = tableListGetSize(pTableListInfo); int32_t capacity = 0; pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); // partition by tbname - if (oneTableForEachGroup(pTableList) || (totalTables == 1)) { + if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) { pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull); - STableKeyInfo* pList = tableListGetInfo(pTableList, 0); + STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); - uint64_t suid = tableListGetSuid(pTableList); + uint64_t suid = tableListGetSuid(pTableListInfo); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables, taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { @@ -136,7 +137,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SCacheRowsScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableListInfo* pTableList = pTaskInfo->pTableInfoList; + STableListInfo* pTableList = pInfo->pTableList; uint64_t suid = tableListGetSuid(pTableList); int32_t size = tableListGetSize(pTableList); @@ -281,6 +282,7 @@ void destroyCacheScanOperator(void* param) { taosMemoryFree(pInfo->pSlotIds); taosArrayDestroy(pInfo->pUidList); taosArrayDestroy(pInfo->matchInfo.pList); + tableListDestroy(pInfo->pTableList); if (pInfo->pLastrowReader != NULL) { pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 8caebd2a34..c855a104b2 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -70,7 +70,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { - longjmp(pTaskInfo->env, pTaskInfo->code); + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } for (int32_t i = 0; i < totalSources; ++i) { @@ -576,7 +576,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; pOperator->cost.openCost = taosGetTimestampUs() - startTs; if (isTaskKilled(pTaskInfo)) { - longjmp(pTaskInfo->env, pTaskInfo->code); + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } return TSDB_CODE_SUCCESS; @@ -629,7 +629,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { - longjmp(pTaskInfo->env, pTaskInfo->code); + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ff11dadf22..e1cd509cba 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -404,7 +404,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } } - STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + STableListInfo* pTableListInfo = ((STableScanInfo*)pScanInfo->pTableScanOp->info)->base.pTableListInfo; taosWLockLatch(&pTaskInfo->lock); for (int32_t i = 0; i < numOfQualifiedTables; ++i) { @@ -485,9 +485,7 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, char* sql, EOPTR_EXEC_MODEL model) { - assert(pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; - taosThreadOnce(&initPoolOnce, initRefPool); qDebug("start to create task, TID:0x%" PRIx64 " QID:0x%" PRIx64 ", vgId:%d", taskId, pSubplan->id.queryId, vgId); @@ -507,7 +505,12 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, if (handle) { void* pSinkParam = NULL; - code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle); + + SArray* pInfoList = getTableListInfo(*pTask); + STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0); + taosArrayDestroy(pInfoList); + + code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTableListInfo, readHandle); if (code != TSDB_CODE_SUCCESS) { qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str); goto _error; @@ -1083,7 +1086,6 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; - STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; const char* id = GET_TASKID(pTaskInfo); pTaskInfo->streamInfo.prepareStatus = *pOffset; @@ -1095,19 +1097,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT if (subType == TOPIC_SUB_TYPE__COLUMN) { pOperator->status = OP_OPENED; - - // TODO add more check - if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - if (pOperator->numOfDownstream != 1) { - qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id); - return -1; - } - pOperator = pOperator->pDownstream[0]; - } + pOperator = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id); SStreamScanInfo* pInfo = pOperator->info; STableScanInfo* pScanInfo = pInfo->pTableScanOp->info; STableScanBase* pScanBaseInfo = &pScanInfo->base; + STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo; if (pOffset->type == TMQ_OFFSET__LOG) { tsdbReaderClose(pScanBaseInfo->dataReader); @@ -1191,9 +1186,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT } } else { // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB + if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { SStreamRawScanInfo* pInfo = pOperator->info; SSnapContext* sContext = pInfo->sContext; + + SOperatorInfo* p = extractOperatorInTree(pOperator, QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, id); + STableListInfo* pTableListInfo = ((SStreamRawScanInfo*)(p->info))->pTableListInfo; + if (setForSnapShot(sContext, pOffset->uid) != 0) { qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id); return -1; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8068590dad..c120c6055d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -34,9 +34,7 @@ #include "ttypes.h" #include "vnode.h" -#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) - #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) #if 0 @@ -74,34 +72,21 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #endif #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) -#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) - -typedef struct SAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - STableQueryInfo* current; - uint64_t groupId; - SGroupResInfo groupResInfo; - SExprSupp scalarExprSup; - bool groupKeyOptimized; -} SAggOperatorInfo; static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock); static void releaseQueryBuf(size_t numOfTables); -static void destroyAggOperatorInfo(void* param); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); -static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); -static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, - const char* pKey); + static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status); static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo); +static SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); void setOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; @@ -266,57 +251,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR return pResult; } -// a new buffer page for each table. Needs to opt this design -static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pResultBuf, uint32_t size) { - if (pWindowRes->pageId != -1) { - return 0; - } - - SFilePage* pData = NULL; - - // in the first scan, new space needed for results - int32_t pageId = -1; - SArray* list = getDataBufPagesIdList(pResultBuf); - - if (taosArrayGetSize(list) == 0) { - pData = getNewBufPage(pResultBuf, &pageId); - pData->num = sizeof(SFilePage); - } else { - SPageInfo* pi = getLastPageInfo(list); - pData = getBufPage(pResultBuf, getPageId(pi)); - if (pData == NULL) { - qError("failed to get buffer, code:%s", tstrerror(terrno)); - return terrno; - } - - pageId = getPageId(pi); - - if (pData->num + size > getBufPageSize(pResultBuf)) { - // release current page first, and prepare the next one - releaseBufPageInfo(pResultBuf, pi); - - pData = getNewBufPage(pResultBuf, &pageId); - if (pData != NULL) { - pData->num = sizeof(SFilePage); - } - } - } - - if (pData == NULL) { - return -1; - } - - // set the number of rows in current disk page - if (pWindowRes->pageId == -1) { // not allocated yet, allocate new buffer - pWindowRes->pageId = pageId; - pWindowRes->offset = (int32_t)pData->num; - - pData->num += size; - } - - return 0; -} - // query_range_start, query_range_end, window_duration, window_start, window_end void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) { pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP; @@ -332,72 +266,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow colDataSetInt64(pColData, 4, &pQueryWindow->ekey); } -typedef struct { - bool hasAgg; - int32_t numOfRows; - int32_t startOffset; -} SFunctionCtxStatus; - -static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { - pStatus->hasAgg = pCtx->input.colDataSMAIsSet; - pStatus->numOfRows = pCtx->input.numOfRows; - pStatus->startOffset = pCtx->input.startRowIndex; -} - -static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { - pCtx->input.colDataSMAIsSet = pStatus->hasAgg; - pCtx->input.numOfRows = pStatus->numOfRows; - pCtx->input.startRowIndex = pStatus->startOffset; -} - -void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, - int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { - for (int32_t k = 0; k < numOfOutput; ++k) { - // keep it temporarily - SFunctionCtxStatus status = {0}; - functionCtxSave(&pCtx[k], &status); - - pCtx[k].input.startRowIndex = offset; - pCtx[k].input.numOfRows = forwardStep; - - // not a whole block involved in query processing, statistics data can not be used - // NOTE: the original value of isSet have been changed here - if (pCtx[k].input.colDataSMAIsSet && forwardStep < numOfTotal) { - pCtx[k].input.colDataSMAIsSet = false; - } - - if (pCtx[k].isPseudoFunc) { - SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); - - char* p = GET_ROWCELL_INTERBUF(pEntryInfo); - - SColumnInfoData idata = {0}; - idata.info.type = TSDB_DATA_TYPE_BIGINT; - idata.info.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes; - idata.pData = p; - - SScalarParam out = {.columnData = &idata}; - SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; - pCtx[k].sfp.process(&tw, 1, &out); - pEntryInfo->numOfRes = 1; - } else { - int32_t code = TSDB_CODE_SUCCESS; - if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { - code = pCtx[k].fpSet.process(&pCtx[k]); - - if (code != TSDB_CODE_SUCCESS) { - qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); - taskInfo->code = code; - T_LONG_JMP(taskInfo->env, code); - } - } - - // restore it - functionCtxRestore(&pCtx[k], &status); - } - } -} - static void doSetInputDataBlockInfo(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) { SqlFunctionCtx* pCtx = pExprSup->pCtx; for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) { @@ -512,25 +380,6 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int return code; } -static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) { - for (int32_t k = 0; k < pOperator->exprSupp.numOfExprs; ++k) { - if (functionNeedToExecute(&pCtx[k])) { - // todo add a dummy funtion to avoid process check - if (pCtx[k].fpSet.process == NULL) { - continue; - } - - int32_t code = pCtx[k].fpSet.process(&pCtx[k]); - if (code != TSDB_CODE_SUCCESS) { - qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); - return code; - } - } - } - - return TSDB_CODE_SUCCESS; -} - bool functionNeedToExecute(SqlFunctionCtx* pCtx) { struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); @@ -654,149 +503,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int return win; } -int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, - uint32_t* status) { - *status = BLK_DATA_NOT_LOAD; - - pBlock->pDataBlock = NULL; - pBlock->pBlockAgg = NULL; - - // int64_t groupId = pRuntimeEnv->current->groupIndex; - // bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); - - STaskCostInfo* pCost = &pTaskInfo->cost; - -// pCost->totalBlocks += 1; -// pCost->totalRows += pBlock->info.rows; -#if 0 - // Calculate all time windows that are overlapping or contain current data block. - // If current data block is contained by all possible time window, do not load current data block. - if (/*pQueryAttr->pFilters || */pQueryAttr->groupbyColumn || pQueryAttr->sw.gap > 0 || - (QUERY_IS_INTERVAL_QUERY(pQueryAttr) && overlapWithTimeWindow(pTaskInfo, &pBlock->info))) { - (*status) = BLK_DATA_DATA_LOAD; - } - - // check if this data block is required to load - if ((*status) != BLK_DATA_DATA_LOAD) { - bool needFilter = true; - - // the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet, - // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer - if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { - SResultRow* pResult = NULL; - - bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); - TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; - - STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); - if (pQueryAttr->pointInterpQuery) { - needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId, - pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, - pTableScanInfo->rowEntryInfoOffset); - } else { - if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.id.uid, &win, masterScan, &pResult, groupId, - pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, - pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_OUT_OF_MEMORY); - } - } - } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate - doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, - pTableScanInfo->rowEntryInfoOffset, pTableScanInfo->numOfOutput, - pRuntimeEnv->current->groupIndex); - } - - if (needFilter) { - (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock); - } else { - (*status) = BLK_DATA_DATA_LOAD; - } - } - - SDataBlockInfo* pBlockInfo = &pBlock->info; -// *status = updateBlockLoadStatus(pRuntimeEnv->pQueryAttr, *status); - - if ((*status) == BLK_DATA_NOT_LOAD || (*status) == BLK_DATA_FILTEROUT) { - //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, -// pBlockInfo->window.ekey, pBlockInfo->rows); - pCost->skipBlocks += 1; - } else if ((*status) == BLK_DATA_SMA_LOAD) { - // this function never returns error? - pCost->loadBlockStatis += 1; -// tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg); - - if (pBlock->pBlockAgg == NULL) { // data block statistics does not exist, load data block -// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); - pCost->totalCheckedRows += pBlock->info.rows; - } - } else { - assert((*status) == BLK_DATA_DATA_LOAD); - - // load the data block statistics to perform further filter - pCost->loadBlockStatis += 1; -// tsdbRetrieveDatablockSMA(pTableScanInfo->pTsdbReadHandle, &pBlock->pBlockAgg); - - if (pQueryAttr->topBotQuery && pBlock->pBlockAgg != NULL) { - { // set previous window - if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { - SResultRow* pResult = NULL; - - bool masterScan = IS_MAIN_SCAN(pRuntimeEnv); - TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; - - STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); - if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.id.uid, &win, masterScan, &pResult, groupId, - pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, - pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_OUT_OF_MEMORY); - } - } - } - bool load = false; - for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { - int32_t functionId = pTableScanInfo->pCtx[i].functionId; - if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM) { -// load = topbot_datablock_filter(&pTableScanInfo->pCtx[i], (char*)&(pBlock->pBlockAgg[i].min), -// (char*)&(pBlock->pBlockAgg[i].max)); - if (!load) { // current block has been discard due to filter applied - pCost->skipBlocks += 1; - //qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, -// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - (*status) = BLK_DATA_FILTEROUT; - return TSDB_CODE_SUCCESS; - } - } - } - } - - // current block has been discard due to filter applied -// if (!doFilterByBlockSMA(pRuntimeEnv, pBlock->pBlockAgg, pTableScanInfo->pCtx, pBlockInfo->rows)) { -// pCost->skipBlocks += 1; -// qDebug("QInfo:0x%"PRIx64" data block discard, brange:%" PRId64 "-%" PRId64 ", rows:%d", pQInfo->qId, pBlockInfo->window.skey, -// pBlockInfo->window.ekey, pBlockInfo->rows); -// (*status) = BLK_DATA_FILTEROUT; -// return TSDB_CODE_SUCCESS; -// } - - pCost->totalCheckedRows += pBlockInfo->rows; - pCost->loadBlocks += 1; -// pBlock->pDataBlock = tsdbRetrieveDataBlock(pTableScanInfo->pTsdbReadHandle, NULL); -// if (pBlock->pDataBlock == NULL) { -// return terrno; -// } - -// if (pQueryAttr->pFilters != NULL) { -// filterSetColFieldData(pQueryAttr->pFilters, taosArrayGetSize(pBlock->pDataBlock), pBlock->pDataBlock); -// } - -// if (pQueryAttr->pFilters != NULL || pRuntimeEnv->pTsBuf != NULL) { -// filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery); -// } - } -#endif - return TSDB_CODE_SUCCESS; -} - void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { if (status == TASK_NOT_COMPLETED) { pTaskInfo->status = status; @@ -1027,43 +733,6 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD } } -void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { - // for simple group by query without interval, all the tables belong to one group result. - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SAggOperatorInfo* pAggInfo = pOperator->info; - - SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; - SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; - int32_t* rowEntryInfoOffset = pOperator->exprSupp.rowEntryInfoOffset; - - SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, (char*)&groupId, - sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup, true); - /* - * not assign result buffer yet, add new result buffer - * all group belong to one result set, and each group result has different group id so set the id to be one - */ - if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); - if (ret != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, terrno); - } - } - - setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset); -} - -static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) { - SAggOperatorInfo* pAggInfo = pOperator->info; - if (pAggInfo->groupId != UINT64_MAX && pAggInfo->groupId == groupId) { - return; - } - - doSetTableGroupOutputBuf(pOperator, numOfOutput, groupId); - - // record the current active group id - pAggInfo->groupId = groupId; -} - void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset) { bool returnNotNull = false; for (int32_t j = 0; j < numOfExprs; ++j) { @@ -1281,161 +950,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG } } -// static TSKEY doSkipIntervalProcess(STaskRuntimeEnv* pRuntimeEnv, STimeWindow* win, SDataBlockInfo* pBlockInfo, -// STableQueryInfo* pTableQueryInfo) { -// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; -// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; -// -// STimeWindow tw = *win; -// getNextTimeWindow(pQueryAttr, &tw); -// -// if ((tw.skey <= pBlockInfo->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) || -// (tw.ekey >= pBlockInfo->window.skey && !QUERY_IS_ASC_QUERY(pQueryAttr))) { -// -// // load the data block and check data remaining in current data block -// // TODO optimize performance -// SArray * pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL); -// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); -// -// tw = *win; -// int32_t startPos = -// getNextQualifiedWindow(pQueryAttr, &tw, pBlockInfo, pColInfoData->pData, binarySearchForKey, -1); -// -// // set the abort info -// pQueryAttr->pos = startPos; -// -// // reset the query start timestamp -// pTableQueryInfo->win.skey = ((TSKEY *)pColInfoData->pData)[startPos]; -// pQueryAttr->window.skey = pTableQueryInfo->win.skey; -// TSKEY key = pTableQueryInfo->win.skey; -// -// pWindowResInfo->prevSKey = tw.skey; -// int32_t index = pRuntimeEnv->resultRowInfo.curIndex; -// -// int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); -// pRuntimeEnv->resultRowInfo.curIndex = index; // restore the window index -// -// //qDebug("QInfo:0x%"PRIx64" check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, -// lastKey:%" PRId64, -// GET_TASKID(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, -// pQueryAttr->current->lastKey); -// -// return key; -// } else { // do nothing -// pQueryAttr->window.skey = tw.skey; -// pWindowResInfo->prevSKey = tw.skey; -// pTableQueryInfo->lastKey = tw.skey; -// -// return tw.skey; -// } -// -// return true; -// } - -// static bool skipTimeInterval(STaskRuntimeEnv *pRuntimeEnv, TSKEY* start) { -// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; -// -// // if queried with value filter, do NOT forward query start position -// if (pQueryAttr->limit.offset <= 0 || pQueryAttr->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf != NULL || -// pRuntimeEnv->pFillInfo != NULL) { -// return true; -// } -// -// /* -// * 1. for interval without interpolation query we forward pQueryAttr->interval.interval at a time for -// * pQueryAttr->limit.offset times. Since hole exists, pQueryAttr->interval.interval*pQueryAttr->limit.offset -// value is -// * not valid. otherwise, we only forward pQueryAttr->limit.offset number of points -// */ -// STimeWindow w = TSWINDOW_INITIALIZER; -// bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); -// -// SResultRowInfo *pWindowResInfo = &pRuntimeEnv->resultRowInfo; -// STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current; -// -// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; -// while (tsdbNextDataBlock(pRuntimeEnv->pTsdbReadHandle)) { -// tsdbRetrieveDataBlockInfo(pRuntimeEnv->pTsdbReadHandle, &blockInfo); -// -// if (QUERY_IS_ASC_QUERY(pQueryAttr)) { -// if (pWindowResInfo->prevSKey == TSKEY_INITIAL_VAL) { -// getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.skey, blockInfo.window.skey, pQueryAttr->window.ekey, -// &w); pWindowResInfo->prevSKey = w.skey; -// } -// } else { -// getAlignQueryTimeWindow(pQueryAttr, blockInfo.window.ekey, pQueryAttr->window.ekey, blockInfo.window.ekey, &w); -// pWindowResInfo->prevSKey = w.skey; -// } -// -// // the first time window -// STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQueryAttr); -// -// while (pQueryAttr->limit.offset > 0) { -// STimeWindow tw = win; -// -// if ((win.ekey <= blockInfo.window.ekey && ascQuery) || (win.ekey >= blockInfo.window.skey && !ascQuery)) { -// pQueryAttr->limit.offset -= 1; -// pWindowResInfo->prevSKey = win.skey; -// -// // current time window is aligned with blockInfo.window.ekey -// // restart it from next data block by set prevSKey to be TSKEY_INITIAL_VAL; -// if ((win.ekey == blockInfo.window.ekey && ascQuery) || (win.ekey == blockInfo.window.skey && !ascQuery)) { -// pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; -// } -// } -// -// if (pQueryAttr->limit.offset == 0) { -// *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo); -// return true; -// } -// -// // current window does not ended in current data block, try next data block -// getNextTimeWindow(pQueryAttr, &tw); -// -// /* -// * If the next time window still starts from current data block, -// * load the primary timestamp column first, and then find the start position for the next queried time window. -// * Note that only the primary timestamp column is required. -// * TODO: Optimize for this cases. All data blocks are not needed to be loaded, only if the first actually -// required -// * time window resides in current data block. -// */ -// if ((tw.skey <= blockInfo.window.ekey && ascQuery) || (tw.ekey >= blockInfo.window.skey && !ascQuery)) { -// -// SArray *pDataBlock = tsdbRetrieveDataBlock(pRuntimeEnv->pTsdbReadHandle, NULL); -// SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock, 0); -// -// if ((win.ekey > blockInfo.window.ekey && ascQuery) || (win.ekey < blockInfo.window.skey && !ascQuery)) { -// pQueryAttr->limit.offset -= 1; -// } -// -// if (pQueryAttr->limit.offset == 0) { -// *start = doSkipIntervalProcess(pRuntimeEnv, &win, &blockInfo, pTableQueryInfo); -// return true; -// } else { -// tw = win; -// int32_t startPos = -// getNextQualifiedWindow(pQueryAttr, &tw, &blockInfo, pColInfoData->pData, binarySearchForKey, -1); -// // set the abort info -// pQueryAttr->pos = startPos; -// pTableQueryInfo->lastKey = ((TSKEY *)pColInfoData->pData)[startPos]; -// pWindowResInfo->prevSKey = tw.skey; -// win = tw; -// } -// } else { -// break; // offset is not 0, and next time window begins or ends in the next block. -// } -// } -// } -// -// // check for error -// if (terrno != TSDB_CODE_SUCCESS) { -// T_LONG_JMP(pRuntimeEnv->env, terrno); -// } -// -// return true; -// } - int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num) { p->pDownstream = taosMemoryCalloc(1, num * POINTER_BYTES); if (p->pDownstream == NULL) { @@ -1481,191 +995,23 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan } } -static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBlock** ppBlock) { - if (!tsCountAlwaysReturnValue) { - return TSDB_CODE_SUCCESS; - } - - SAggOperatorInfo* pAggInfo = pOperator->info; - if (pAggInfo->groupKeyOptimized) { - return TSDB_CODE_SUCCESS; - } - - SOperatorInfo* downstream = pOperator->pDownstream[0]; - if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION || - (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN && - ((STableScanInfo*)downstream->info)->hasGroupByTag == true)) { - return TSDB_CODE_SUCCESS; - } - - SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; - bool hasCountFunc = false; - - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; - if ((strcmp(pName, "count") == 0) || (strcmp(pName, "hyperloglog") == 0) || - (strcmp(pName, "_hyperloglog_partial") == 0) || (strcmp(pName, "_hyperloglog_merge") == 0)) { - hasCountFunc = true; - break; - } - } - - if (!hasCountFunc) { - return TSDB_CODE_SUCCESS; - } - - SSDataBlock* pBlock = createDataBlock(); - pBlock->info.rows = 1; - pBlock->info.capacity = 0; - - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - SColumnInfoData colInfo = {0}; - colInfo.hasNull = true; - colInfo.info.type = TSDB_DATA_TYPE_NULL; - colInfo.info.bytes = 1; - - SExprInfo* pOneExpr = &pOperator->exprSupp.pExprInfo[i]; - for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { - SFunctParam* pFuncParam = &pOneExpr->base.pParam[j]; - if (pFuncParam->type == FUNC_PARAM_TYPE_COLUMN) { - int32_t slotId = pFuncParam->pCol->slotId; - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - if (slotId >= numOfCols) { - taosArrayEnsureCap(pBlock->pDataBlock, slotId + 1); - for (int32_t k = numOfCols; k < slotId + 1; ++k) { - taosArrayPush(pBlock->pDataBlock, &colInfo); - } - } - } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { - // do nothing - } - } - } - - blockDataEnsureCapacity(pBlock, pBlock->info.rows); - *ppBlock = pBlock; - - return TSDB_CODE_SUCCESS; -} - -static void destroyDataBlockForEmptyInput(bool blockAllocated, SSDataBlock** ppBlock) { - if (!blockAllocated) { - return; - } - - blockDataDestroy(*ppBlock); - *ppBlock = NULL; -} - -// this is a blocking operator -static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { - if (OPTR_IS_OPENED(pOperator)) { - return TSDB_CODE_SUCCESS; - } - - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SAggOperatorInfo* pAggInfo = pOperator->info; - - SExprSupp* pSup = &pOperator->exprSupp; - SOperatorInfo* downstream = pOperator->pDownstream[0]; - - int64_t st = taosGetTimestampUs(); - - int32_t order = TSDB_ORDER_ASC; - int32_t scanFlag = MAIN_SCAN; - - bool hasValidBlock = false; - bool blockAllocated = false; - - while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - if (pBlock == NULL) { - if (!hasValidBlock) { - createDataBlockForEmptyInput(pOperator, &pBlock); - if (pBlock == NULL) { - break; - } - blockAllocated = true; - } else { - break; - } - } - hasValidBlock = true; - - int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false); - if (code != TSDB_CODE_SUCCESS) { - destroyDataBlockForEmptyInput(blockAllocated, &pBlock); - T_LONG_JMP(pTaskInfo->env, code); - } - - // there is an scalar expression that needs to be calculated before apply the group aggregation. - if (pAggInfo->scalarExprSup.pExprInfo != NULL && !blockAllocated) { - SExprSupp* pSup1 = &pAggInfo->scalarExprSup; - code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL); - if (code != TSDB_CODE_SUCCESS) { - destroyDataBlockForEmptyInput(blockAllocated, &pBlock); - T_LONG_JMP(pTaskInfo->env, code); - } - } - - // the pDataBlock are always the same one, no need to call this again - setExecutionContext(pOperator, pOperator->exprSupp.numOfExprs, pBlock->info.id.groupId); - setInputDataBlock(pSup, pBlock, order, scanFlag, true); - code = doAggregateImpl(pOperator, pSup->pCtx); - if (code != 0) { - destroyDataBlockForEmptyInput(blockAllocated, &pBlock); - T_LONG_JMP(pTaskInfo->env, code); - } - - destroyDataBlockForEmptyInput(blockAllocated, &pBlock); - } - - // the downstream operator may return with error code, so let's check the code before generating results. - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); - } - - initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); - OPTR_SET_OPENED(pOperator); - - pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; - return pTaskInfo->code; -} - -static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { - SAggOperatorInfo* pAggInfo = pOperator->info; - SOptrBasicInfo* pInfo = &pAggInfo->binfo; - - if (pOperator->status == OP_EXEC_DONE) { +//QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN +SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id) { + if (pOperator == NULL) { + qError("invalid operator, failed to find tableScanOperator %s", id); return NULL; } - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - pTaskInfo->code = pOperator->fpSet._openFn(pOperator); - if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - setOperatorCompleted(pOperator); - return NULL; - } - - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - while (1) { - doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); - doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); - - if (!hasRemainResults(&pAggInfo->groupResInfo)) { - setOperatorCompleted(pOperator); - break; + if (pOperator->operatorType == type) { + return pOperator; + } else { + if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { + qError("invalid operator, failed to find tableScanOperator %s", id); + return NULL; } - if (pInfo->pRes->info.rows > 0) { - break; - } + return extractOperatorInTree(pOperator->pDownstream[0], type, id); } - - size_t rows = blockDataGetNumOfRows(pInfo->pRes); - pOperator->resultInfo.totalRows += rows; - - return (rows == 0) ? NULL : pInfo->pRes; } void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { @@ -1732,70 +1078,6 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul return 0; } -int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, - const char* pKey) { - int32_t code = 0; - // _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - - pAggSup->currentPageId = -1; - pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); - pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); - pAggSup->pResultRowHashTable = tSimpleHashInit(100, taosFastHash); - - if (pAggSup->keyBuf == NULL || pAggSup->pResultRowHashTable == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - uint32_t defaultPgsz = 0; - uint32_t defaultBufsz = 0; - getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); - - if (!osTempSpaceAvailable()) { - code = TSDB_CODE_NO_AVAIL_DISK; - qError("Init stream agg supporter failed since %s, %s", terrstr(code), pKey); - return code; - } - - code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir); - if (code != TSDB_CODE_SUCCESS) { - qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey); - return code; - } - - return code; -} - -void cleanupAggSup(SAggSupporter* pAggSup) { - taosMemoryFreeClear(pAggSup->keyBuf); - tSimpleHashCleanup(pAggSup->pResultRowHashTable); - destroyDiskbasedBuf(pAggSup->pResultBuf); -} - -int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, - const char* pkey, void* pState) { - int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - code = doInitAggInfoSup(pAggSup, pSup->pCtx, numOfCols, keyBufSize, pkey); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - for (int32_t i = 0; i < numOfCols; ++i) { - if (pState) { - pSup->pCtx[i].saveHandle.pBuf = NULL; - pSup->pCtx[i].saveHandle.pState = pState; - pSup->pCtx[i].exprIdx = i; - } else { - pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf; - } - } - - return TSDB_CODE_SUCCESS; -} - void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows) { if (numOfRows == 0) { numOfRows = 4096; @@ -1814,7 +1096,7 @@ void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock) { initResultRowInfo(&pInfo->resultRowInfo); } -void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { +static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { if (pCtx == NULL) { return NULL; } @@ -1866,99 +1148,8 @@ void cleanupExprSupp(SExprSupp* pSupp) { taosMemoryFree(pSupp->rowEntryInfoOffset); } -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, - SExecTaskInfo* pTaskInfo) { - SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pInfo == NULL || pOperator == NULL) { - goto _error; - } - - SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); - initBasicInfo(&pInfo->binfo, pResBlock); - - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - initResultSizeInfo(&pOperator->resultInfo, 4096); - - int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = NULL; - if (pAggNode->pExprs != NULL) { - pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); - } - - code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - code = filterInitFromNode((SNode*)pAggNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; - pInfo->groupKeyOptimized = pAggNode->groupKeyOptimized; - pInfo->groupId = UINT64_MAX; - - setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, - pTaskInfo); - pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, - optrDefaultBufFn, NULL); - - if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { - STableScanInfo* pTableScanInfo = downstream->info; - pTableScanInfo->base.pdInfo.pExprSup = &pOperator->exprSupp; - pTableScanInfo->base.pdInfo.pAggSup = &pInfo->aggSup; - } - - code = appendDownstream(pOperator, &downstream, 1); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - return pOperator; - -_error: - if (pInfo != NULL) { - destroyAggOperatorInfo(pInfo); - } - - if (pOperator != NULL) { - cleanupExprSupp(&pOperator->exprSupp); - } - - taosMemoryFreeClear(pOperator); - pTaskInfo->code = code; - return NULL; -} - void cleanupBasicInfo(SOptrBasicInfo* pInfo) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } -static void freeItem(void* pItem) { - void** p = pItem; - if (*p != NULL) { - taosMemoryFreeClear(*p); - } -} - -void destroyAggOperatorInfo(void* param) { - SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; - cleanupBasicInfo(&pInfo->binfo); - - cleanupAggSup(&pInfo->aggSup); - cleanupExprSupp(&pInfo->scalarExprSup); - cleanupGroupResInfo(&pInfo->groupResInfo); - taosMemoryFreeClear(param); -} - char* buildTaskId(uint64_t taskId, uint64_t queryId) { char* p = taosMemoryMalloc(64); @@ -1986,7 +1177,6 @@ SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t v pTaskInfo->schemaInfo.dbname = taosStrdup(dbFName); pTaskInfo->execModel = model; - pTaskInfo->pTableInfoList = tableListCreate(); pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES); @@ -1997,8 +1187,6 @@ SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t v return pTaskInfo; } -SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); - int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) { SMetaReader mr = {0}; metaReaderInit(&mr, pHandle->meta, 0); @@ -2101,7 +1289,6 @@ bool groupbyTbname(SNodeList* pGroupList) { SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser) { int32_t type = nodeType(pPhyNode); - STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; const char* idstr = GET_TASKID(pTaskInfo); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { @@ -2115,11 +1302,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTableScanNode->groupSort = true; } + STableListInfo* pTableListInfo = tableListCreate(); int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); if (code) { pTaskInfo->code = code; + tableListDestroy(pTableListInfo); qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr); return NULL; } @@ -2127,10 +1316,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo); if (code) { pTaskInfo->code = terrno; + tableListDestroy(pTableListInfo); return NULL; } - pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); + pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo); if (NULL == pOperator) { pTaskInfo->code = terrno; return NULL; @@ -2140,11 +1330,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTaskInfo->cost.pRecoder = &pScanInfo->base.readRecorder; } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); if (code) { pTaskInfo->code = code; + tableListDestroy(pTableListInfo); qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); return NULL; } @@ -2152,12 +1344,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo code = extractTableSchemaInfo(pHandle, &pTableScanNode->scan, pTaskInfo); if (code) { pTaskInfo->code = terrno; + tableListDestroy(pTableListInfo); return NULL; } - pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); + pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo); if (NULL == pOperator) { pTaskInfo->code = terrno; + tableListDestroy(pTableListInfo); return NULL; } @@ -2168,29 +1362,22 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); + if (pHandle->vnode) { int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); if (code) { pTaskInfo->code = code; + tableListDestroy(pTableListInfo); qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); return NULL; } - -#ifndef NDEBUG - int32_t sz = tableListGetSize(pTableListInfo); - qDebug("vgId:%d create stream task, total qualified tables:%d, %s", pTaskInfo->id.vgId, sz, idstr); - - for (int32_t i = 0; i < sz; i++) { - STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); - qDebug("add table uid:%" PRIu64 ", gid:%" PRIu64, pKeyInfo->uid, pKeyInfo->groupId); - } -#endif } pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); - pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); + pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo); @@ -2199,7 +1386,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; - + STableListInfo* pTableListInfo = tableListCreate(); int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); if (code != TSDB_CODE_SUCCESS) { @@ -2208,9 +1395,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTaskInfo); + pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); if (pBlockNode->tableType == TSDB_SUPER_TABLE) { SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo)); @@ -2231,9 +1419,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0); } - pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo); + pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTableListInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; + STableListInfo* pTableListInfo = tableListCreate(); int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo); @@ -2248,7 +1437,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTaskInfo); + pOperator = createCacherowsScanOperator(pScanNode, pHandle, pTableListInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); } else { @@ -2256,7 +1445,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - if (pOperator != NULL) { + if (pOperator != NULL) { // todo moved away pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; } @@ -2397,9 +1586,7 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) { return -1; } -int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) { - SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo; - +int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, STableListInfo* pTableListInfo, SReadHandle* readHandle) { switch (pNode->type) { case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: { SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam)); @@ -2416,8 +1603,9 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT if (NULL == pDeleterParam) { return TSDB_CODE_OUT_OF_MEMORY; } - int32_t tbNum = tableListGetSize(pTask->pTableInfoList); - pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList); + + int32_t tbNum = tableListGetSize(pTableListInfo); + pDeleterParam->suid = tableListGetSuid(pTableListInfo); // TODO extract uid list pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t)); @@ -2427,11 +1615,12 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT } for (int32_t i = 0; i < tbNum; ++i) { - STableKeyInfo* pTable = tableListGetInfo(pTask->pTableInfoList, i); + STableKeyInfo* pTable = tableListGetInfo(pTableListInfo, i); taosArrayPush(pDeleterParam->pUidList, &pTable->uid); } *pParam = pDeleterParam; + break; } default: @@ -2481,8 +1670,6 @@ static void freeBlock(void* pParam) { void doDestroyTask(SExecTaskInfo* pTaskInfo) { qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); - - pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList); destroyOperatorInfo(pTaskInfo->pRoot); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); cleanupStreamInfo(&pTaskInfo->streamInfo); @@ -2809,3 +1996,21 @@ void qStreamCloseTsdbReader(void* task) { } } } + +static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) { + if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + STableScanInfo* pScanInfo = pOperator->info; + taosArrayPush(pList, &pScanInfo->base.pTableListInfo); + } else { + if (pOperator->pDownstream != NULL && pOperator->pDownstream[0] != NULL) { + extractTableList(pList, pOperator->pDownstream[0]); + } + } +} + +SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) { + SArray* pArray = taosArrayInit(0, POINTER_BYTES); + SOperatorInfo* pOperator = pTaskInfo->pRoot; + extractTableList(pArray, pOperator); + return pArray; +} \ No newline at end of file diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2907c8d056..1bfa8ccfc4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -676,7 +676,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { } if (pBlock->info.id.uid) { - pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); + pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); } uint32_t status = 0; @@ -771,7 +771,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { // scan table one by one sequentially if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { - int32_t numOfTables = 0;//tableListGetSize(pTaskInfo->pTableInfoList); + int32_t numOfTables = 0;//tableListGetSize(pTaskInfo->pTableListInfo); STableKeyInfo tInfo = {0}; while (1) { @@ -784,7 +784,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { pInfo->currentTable++; taosRLockLatch(&pTaskInfo->lock); - numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); + numOfTables = tableListGetSize(pInfo->base.pTableListInfo); if (pInfo->currentTable >= numOfTables) { qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo)); @@ -792,7 +792,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return NULL; } - tInfo = *(STableKeyInfo*) tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); + tInfo = *(STableKeyInfo*) tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); taosRUnLockLatch(&pTaskInfo->lock); tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1); @@ -804,14 +804,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } } else { // scan table group by group sequentially if (pInfo->currentGroupId == -1) { - if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { setOperatorCompleted(pOperator); return NULL; } int32_t num = 0; STableKeyInfo* pList = NULL; - tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); + tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); ASSERT(pInfo->base.dataReader == NULL); int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, @@ -830,7 +830,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return result; } - if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { setOperatorCompleted(pOperator); return NULL; } @@ -841,7 +841,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { int32_t num = 0; STableKeyInfo* pList = NULL; - tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); + tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); tsdbSetTableList(pInfo->base.dataReader, pList, num); tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); @@ -866,25 +866,30 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr return 0; } +static void destroyTableScanBase(STableScanBase* pBase) { + cleanupQueryTableDataCond(&pBase->cond); + + tsdbReaderClose(pBase->dataReader); + pBase->dataReader = NULL; + + if (pBase->matchInfo.pList != NULL) { + taosArrayDestroy(pBase->matchInfo.pList); + } + + tableListDestroy(pBase->pTableListInfo); + taosLRUCacheCleanup(pBase->metaCache.pTableMetaEntryCache); + cleanupExprSupp(&pBase->pseudoSup); +} + static void destroyTableScanOperatorInfo(void* param) { STableScanInfo* pTableScanInfo = (STableScanInfo*)param; blockDataDestroy(pTableScanInfo->pResBlock); - cleanupQueryTableDataCond(&pTableScanInfo->base.cond); - - tsdbReaderClose(pTableScanInfo->base.dataReader); - pTableScanInfo->base.dataReader = NULL; - - if (pTableScanInfo->base.matchInfo.pList != NULL) { - taosArrayDestroy(pTableScanInfo->base.matchInfo.pList); - } - - taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache); - cleanupExprSupp(&pTableScanInfo->base.pseudoSup); + destroyTableScanBase(&pTableScanInfo->base); taosMemoryFreeClear(param); } SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, - SExecTaskInfo* pTaskInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -940,6 +945,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; + pInfo->base.pTableListInfo = pTableListInfo; pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); if (pInfo->base.metaCache.pTableMetaEntryCache == NULL) { code = terrno; @@ -1059,7 +1065,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU if (hasNext) { /*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL); doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); - pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); + pBlock->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); } tsdbReaderClose(pReader); @@ -1080,7 +1086,8 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, } static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) { - return getTableGroupId(pInfo->pTableScanOp->pTaskInfo->pTableInfoList, uid); + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + return getTableGroupId(pTableScanInfo->base.pTableListInfo, uid); } static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { @@ -1554,7 +1561,8 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.version = pBlock->info.version; - pInfo->pRes->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + pInfo->pRes->info.id.groupId = getTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { @@ -2164,19 +2172,19 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { code = tsdbNextDataBlock(pInfo->dataReader, &hasNext); if (code) { tsdbReleaseDataBlock(pInfo->dataReader); - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } } if (pInfo->dataReader && hasNext) { if (isTaskKilled(pTaskInfo)) { tsdbReleaseDataBlock(pInfo->dataReader); - longjmp(pTaskInfo->env, pTaskInfo->code); + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } SSDataBlock* pBlock = tsdbRetrieveDataBlock(pInfo->dataReader, NULL); if (pBlock == NULL) { - longjmp(pTaskInfo->env, terrno); + T_LONG_JMP(pTaskInfo->env, terrno); } qDebug("tmqsnap doRawScan get data uid:%" PRId64 "", pBlock->info.id.uid); @@ -2275,6 +2283,7 @@ static void destroyRawScanOperatorInfo(void* param) { SStreamRawScanInfo* pRawScan = (SStreamRawScanInfo*)param; tsdbReaderClose(pRawScan->dataReader); destroySnapContext(pRawScan->sContext); + tableListDestroy(pRawScan->pTableListInfo); taosMemoryFree(pRawScan); } @@ -2296,6 +2305,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT goto _end; } + pInfo->pTableListInfo = tableListCreate(); pInfo->vnode = pHandle->vnode; pInfo->sContext = pHandle->sContext; @@ -2314,9 +2324,11 @@ _end: static void destroyStreamScanOperatorInfo(void* param) { SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; + if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { destroyOperatorInfo(pStreamScan->pTableScanOp); } + if (pStreamScan->tqReader) { tqCloseReader(pStreamScan->tqReader); } @@ -2343,13 +2355,14 @@ static void destroyStreamScanOperatorInfo(void* param) { } SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, - SExecTaskInfo* pTaskInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { SArray* pColIds = NULL; SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tableListDestroy(pTableListInfo); goto _error; } @@ -2363,6 +2376,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys int32_t code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) { + tableListDestroy(pTableListInfo); goto _error; } @@ -2382,11 +2396,14 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys SExprInfo* pSubTableExpr = taosMemoryCalloc(1, sizeof(SExprInfo)); if (pSubTableExpr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tableListDestroy(pTableListInfo); goto _error; } + pInfo->tbnameCalSup.pExprInfo = pSubTableExpr; createExprFromOneNode(pSubTableExpr, pTableScanNode->pSubtable, 0); if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) { + tableListDestroy(pTableListInfo); goto _error; } } @@ -2396,10 +2413,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys SExprInfo* pTagExpr = createExpr(pTableScanNode->pTags, &numOfTags); if (pTagExpr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tableListDestroy(pTableListInfo); goto _error; } if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tableListDestroy(pTableListInfo); goto _error; } } @@ -2407,11 +2426,12 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData)); if (pInfo->pBlockLists == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; + tableListDestroy(pTableListInfo); goto _error; } if (pHandle->vnode) { - SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); + SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo); STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info; if (pHandle->version > 0) { pTSInfo->base.cond.endVersion = pHandle->version; @@ -2419,7 +2439,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys STableKeyInfo* pList = NULL; int32_t num = 0; - tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num); + tableListGetGroupList(pTableListInfo, 0, &pList, &num); if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; @@ -2450,16 +2470,18 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys // set the extract column id to streamHandle tqReaderSetColIdList(pInfo->tqReader, pColIds); - SArray* tableIdList = extractTableIdList(pTaskInfo->pTableInfoList); + SArray* tableIdList = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo); code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList); if (code != 0) { taosArrayDestroy(tableIdList); goto _error; } + taosArrayDestroy(tableIdList); memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->base.cond, sizeof(SQueryTableDataCond)); } else { taosArrayDestroy(pColIds); + tableListDestroy(pTableListInfo); pColIds = NULL; } @@ -2494,7 +2516,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; + __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL); @@ -2525,7 +2547,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); - int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); + int32_t size = tableListGetSize(pInfo->pTableListInfo); if (size == 0) { setTaskStatus(pTaskInfo, TASK_COMPLETED); return NULL; @@ -2537,7 +2559,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { metaReaderInit(&mr, pInfo->readHandle.meta, 0); while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { - STableKeyInfo* item = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->curPos); + STableKeyInfo* item = tableListGetInfo(pInfo->pTableListInfo, pInfo->curPos); int32_t code = metaGetTableEntryByUid(&mr, item->uid); tDecoderClear(&mr.coder); if (code != TSDB_CODE_SUCCESS) { @@ -2598,10 +2620,11 @@ static void destroyTagScanOperatorInfo(void* param) { STagScanInfo* pInfo = (STagScanInfo*)param; pInfo->pRes = blockDataDestroy(pInfo->pRes); taosArrayDestroy(pInfo->matchInfo.pList); + pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo); taosMemoryFreeClear(param); } -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -2624,6 +2647,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi goto _error; } + pInfo->pTableListInfo = pTableListInfo; pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; @@ -2657,7 +2681,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { SQueryTableDataCond* pQueryCond = taosArrayGet(pInfo->queryConds, readIdx); int64_t st = taosGetTimestampUs(); - void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex); + void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->base.readHandle; if (NULL == source->dataReader || !source->multiReader) { @@ -2714,7 +2738,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { continue; } - pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid); + pBlock->info.id.groupId = getTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -2770,10 +2794,10 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; { - size_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); + size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); int32_t i = pInfo->tableStartIndex + 1; for (; i < numOfTables; ++i) { - STableKeyInfo* tableKeyInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i); + STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); if (tableKeyInfo->groupId != pInfo->groupId) { break; } @@ -2907,7 +2931,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - size_t tableListSize = tableListGetSize(pTaskInfo->pTableInfoList); + size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo); if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; @@ -2916,7 +2940,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { return NULL; } pInfo->tableStartIndex = 0; - pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex))->groupId; + pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId; startGroupTableMergeScan(pOperator); } @@ -2941,7 +2965,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->tableStartIndex)->groupId; + pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId; startGroupTableMergeScan(pOperator); resetLimitInfoForNextGroup(&pInfo->limitInfo); } @@ -2963,9 +2987,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { p->dataReader = NULL; } - tsdbReaderClose(pTableScanInfo->base.dataReader); - pTableScanInfo->base.dataReader = NULL; - taosArrayDestroy(pTableScanInfo->sortSourceParams); tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL; @@ -2974,20 +2995,14 @@ void destroyTableMergeScanOperatorInfo(void* param) { SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i); taosMemoryFree(pCond->colList); } - taosArrayDestroy(pTableScanInfo->queryConds); - if (pTableScanInfo->base.matchInfo.pList != NULL) { - taosArrayDestroy(pTableScanInfo->base.matchInfo.pList); - } + taosArrayDestroy(pTableScanInfo->queryConds); + destroyTableScanBase(&pTableScanInfo->base); pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock); pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock); taosArrayDestroy(pTableScanInfo->pSortInfo); - cleanupExprSupp(&pTableScanInfo->base.pseudoSup); - - taosLRUCacheCleanup(pTableScanInfo->base.metaCache.pTableMetaEntryCache); - taosMemoryFreeClear(param); } @@ -3006,7 +3021,7 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla } SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, - SExecTaskInfo* pTaskInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -3048,6 +3063,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->base.limitInfo.limit.limit = -1; pInfo->base.limitInfo.slimit.limit = -1; + pInfo->base.pTableListInfo = pTableListInfo; pInfo->sample.sampleRatio = pTableScanNode->ratio; pInfo->sample.seed = taosGetTimestampSec(); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 68e5cf6ef8..1abe678ac6 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -83,10 +83,11 @@ typedef struct MergeIndex { } MergeIndex; typedef struct SBlockDistInfo { - SSDataBlock* pResBlock; - STsdbReader* pHandle; - SReadHandle readHandle; - uint64_t uid; // table uid + SSDataBlock* pResBlock; + STsdbReader* pHandle; + SReadHandle readHandle; + STableListInfo* pTableListInfo; + uint64_t uid; // table uid } SBlockDistInfo; static int32_t sysChkFilter__Comm(SNode* pNode); @@ -2214,6 +2215,7 @@ static void destroyBlockDistScanOperatorInfo(void* param) { SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; blockDataDestroy(pDistInfo->pResBlock); tsdbReaderClose(pDistInfo->pHandle); + tableListDestroy(pDistInfo->pTableListInfo); taosMemoryFreeClear(param); } @@ -2245,7 +2247,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC } SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, - SExecTaskInfo* pTaskInfo) { + STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -2263,9 +2265,9 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi goto _error; } - STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; - size_t num = tableListGetSize(pTableListInfo); - void* pList = tableListGetInfo(pTableListInfo, 0); + pInfo->pTableListInfo = pTableListInfo; + size_t num = tableListGetSize(pTableListInfo); + void* pList = tableListGetInfo(pTableListInfo, 0); code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str, false); cleanupQueryTableDataCond(&cond); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 70febcaf6a..29acc1fea2 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -938,6 +938,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } + TSKEY ekey = ascScan ? win.ekey : win.skey; int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); @@ -2175,13 +2176,6 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S } } -bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup) { - SET_RES_WINDOW_KEY(pSup->keyBuf, &pWin->skey, sizeof(int64_t), groupId); - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(pSup->pResultRowHashTable, pSup->keyBuf, - GET_RES_WINDOW_KEY_LEN(sizeof(int64_t))); - return p1 == NULL; -} - bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) { if (pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { SWinKey key = {.ts = pWin->skey, .groupId = groupId};