diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index eb37316402..01b8d5c51e 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -229,6 +229,7 @@ typedef struct STableScanPhysiNode { double ratio; int32_t dataRequired; SNodeList* pDynamicScanFuncs; + SNodeList* pPartitionKeys; int64_t interval; int64_t offset; int64_t sliding; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index f2f0bc2055..88f308710e 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -27,6 +27,10 @@ typedef struct { int32_t bytes; } SGroupKeys, SStateKeys; +int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList); +uint64_t calcGroupId(char* pData, int32_t len); +void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex); +int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e7a3390cf3..60be28167d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -333,6 +333,12 @@ typedef struct STableScanInfo { double sampleRatio; // data block sample ratio, 1 by default SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. + SArray* pGroupCols; + SArray* pGroupColVals; // current group column values, SArray + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SHashObj* pGroupSet; // quick locate the window object for each result + int32_t curTWinIdx; } STableScanInfo; @@ -727,7 +733,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup); -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3c46f46e19..8388253975 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4387,9 +4387,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pDataReader == NULL && terrno != 0) { return NULL; } - + SArray* groupKyes = extractPartitionColInfo(pTableScanNode->pPartitionKeys); extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); - SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, groupKyes, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 8c3a0c0e6e..3691024096 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -24,10 +24,10 @@ #include "tcompare.h" #include "thash.h" #include "ttypes.h" +#include "executorInt.h" static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity); static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len); -static uint64_t calcGroupId(char* pData, int32_t len); static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; @@ -37,7 +37,7 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pGroupColVals); } -static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { +int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); if ((*pGroupColVals) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -110,7 +110,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo return true; } -static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) { +void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex) { SColumnDataAgg* pColAgg = NULL; size_t numOfGroupCols = taosArrayGetSize(pGroupCols); @@ -139,7 +139,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData } } -static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { +int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { ASSERT(pKey != NULL); size_t numOfGroupCols = taosArrayGetSize(pGroupColVals); @@ -409,7 +409,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { SPartitionOperatorInfo* pInfo = pOperator->info; - int32_t numOfGroupCols = taosArrayGetSize(pInfo->pGroupCols); for (int32_t j = 0; j < pBlock->info.rows; ++j) { recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j); int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); @@ -622,8 +621,13 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); taosArrayDestroy(pInfo->pGroupCols); + for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){ + SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i); + taosMemoryFree(key.pData); + } taosArrayDestroy(pInfo->pGroupColVals); taosMemoryFree(pInfo->keyBuf); + taosHashCleanup(pInfo->pGroupSet); taosMemoryFree(pInfo->columnOffset); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 348d85943e..0774ccec1c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -33,6 +33,8 @@ #include "ttypes.h" #include "vnode.h" +#include "executorInt.h" + #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) @@ -373,6 +375,17 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { longjmp(pOperator->pTaskInfo->env, code); } + recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, 0); + int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals); + + uint64_t *groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len); + if (groupId) { + pBlock->info.groupId = *groupId; + }else if(len != 0){ + pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len); + taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t)); + } + // current block is filter out according to filter condition, continue load the next block if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { continue; @@ -497,21 +510,25 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFree(pTableScanInfo->pResBlock); tsdbCleanupReadHandle(pTableScanInfo->dataReader); + taosArrayDestroy(pTableScanInfo->pGroupCols); + for(int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++){ + SGroupKeys key = *(SGroupKeys*)taosArrayGet(pTableScanInfo->pGroupColVals, i); + taosMemoryFree(key.pData); + } + taosArrayDestroy(pTableScanInfo->pGroupColVals); + taosMemoryFree(pTableScanInfo->keyBuf); + taosHashCleanup(pTableScanInfo->pGroupSet); if (pTableScanInfo->pColMatchInfo != NULL) { taosArrayDestroy(pTableScanInfo->pColMatchInfo); } } SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, - SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { + SReadHandle* readHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + goto _error; } SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; @@ -522,7 +539,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { - return NULL; + goto _error; } if (pTableScanNode->scan.pScanPseudoCols != NULL) { @@ -552,6 +569,18 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; + // for table group + pInfo->pGroupCols = groupKyes; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); + if (pInfo->pGroupSet == NULL) { + goto _error; + } + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, groupKyes); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, NULL, NULL, getTableScannerExecInfo); @@ -559,6 +588,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->cost.openCost = 0; return pOperator; + +_error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; } SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) { @@ -901,7 +936,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan SScanPhysiNode* pScanPhyNode = &pTableScanNode->scan; SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, NULL, pTaskInfo); STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;