diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 68f6951c26..c90c18e6f5 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -35,7 +35,7 @@ typedef struct SJoinRowCtx { int32_t leftRowIdx; int32_t rightRowIdx; - SSHashObj* buildTableTSRange; + bool rightUseBuildTable; SArray* rightRowLocations; } SJoinRowCtx; @@ -62,6 +62,7 @@ typedef struct SJoinOperatorInfo { char* rightTagKeyBuf; int32_t rightTagKeyLen; + SSHashObj* rightBuildTable; SJoinRowCtx rowCtx; } SJoinOperatorInfo; @@ -265,6 +266,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t extractTagEqualCondCols(pInfo, pDownstream, pInfo->pTagEqualConditions, pInfo->leftTagCols, pInfo->rightTagCols); initTagColskeyBuf(&pInfo->leftTagKeyLen, &pInfo->leftTagKeyBuf, pInfo->leftTagCols); initTagColskeyBuf(&pInfo->rightTagKeyLen, &pInfo->rightTagKeyBuf, pInfo->rightTagCols); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->rightBuildTable = tSimpleHashInit(256, hashFn); } pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL); code = appendDownstream(pOperator, pDownstream, numOfDownstream); @@ -292,9 +295,22 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { pColumn->scale = pColumnNode->node.resType.scale; } +static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { + void* p = NULL; + int32_t iter = 0; + + while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) { + SArray* rows = (*(SArray**)p); + taosArrayDestroy(rows); + } + + tSimpleHashCleanup(pBuildTable); +} + void destroyMergeJoinOperator(void* param) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; if (pJoinOperator->pTagEqualConditions != NULL) { + mergeJoinDestoryBuildTable(pJoinOperator->rightBuildTable); taosMemoryFreeClear(pJoinOperator->rightTagKeyBuf); taosArrayDestroy(pJoinOperator->rightTagCols); @@ -420,10 +436,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator return 0; } -static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations,SSHashObj** ppHashObj) { - int32_t buildTableCap = TMIN(taosArrayGetSize(rightRowLocations), 256); - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); - SSHashObj* buildTable = tSimpleHashInit(buildTableCap, hashFn); +static int32_t mergeJoinFillBuildTable(SJoinOperatorInfo* pInfo, SArray* rightRowLocations,SSHashObj* buildTable) { for (int32_t i = 0; i < taosArrayGetSize(rightRowLocations); ++i) { SRowLocation* rightRow = taosArrayGet(rightRowLocations, i); int32_t keyLen = fillKeyBufFromTagCols(pInfo->rightTagCols, rightRow->pDataBlock, rightRow->pos, pInfo->rightTagKeyBuf); @@ -436,13 +449,12 @@ static int32_t mergeJoinCreateBuildTable(SJoinOperatorInfo* pInfo, SArray* right taosArrayPush(*ppRows, rightRow); } } - *ppHashObj = buildTable; return TSDB_CODE_SUCCESS; } static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t* nRows, const SArray* leftRowLocations, int32_t leftRowIdx, - int32_t rightRowIdx, SSHashObj* rightTableHash, SArray* rightRowLocations, bool* pReachThreshold) { + int32_t rightRowIdx, bool useBuildTableTSRange, SArray* rightRowLocations, bool* pReachThreshold) { *pReachThreshold = false; uint32_t limitRowNum = pOperator->resultInfo.threshold; SJoinOperatorInfo* pJoinInfo = pOperator->info; @@ -453,9 +465,9 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* for (i = leftRowIdx; i < leftNumJoin; ++i, rightRowIdx = 0) { SRowLocation* leftRow = taosArrayGet(leftRowLocations, i); SArray* pRightRows = NULL; - if (rightTableHash != NULL) { + if (useBuildTableTSRange) { int32_t keyLen = fillKeyBufFromTagCols(pJoinInfo->leftTagCols, leftRow->pDataBlock, leftRow->pos, pJoinInfo->leftTagKeyBuf); - SArray** ppRightRows = tSimpleHashGet(rightTableHash, pJoinInfo->leftTagKeyBuf, keyLen); + SArray** ppRightRows = tSimpleHashGet(pJoinInfo->rightBuildTable, pJoinInfo->leftTagKeyBuf, keyLen); if (!ppRightRows) { continue; } @@ -488,20 +500,8 @@ static int32_t mergeJoinLeftRowsRightRows(SOperatorInfo* pOperator, SSDataBlock* return TSDB_CODE_SUCCESS; } -static void mergeJoinDestoryBuildTable(SSHashObj* pBuildTable) { - void* p = NULL; - int32_t iter = 0; - - while ((p = tSimpleHashIterate(pBuildTable, p, &iter)) != NULL) { - SArray* rows = (*(SArray**)p); - taosArrayDestroy(rows); - } - - tSimpleHashCleanup(pBuildTable); -} - static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* leftRowLocations, SArray* leftCreatedBlocks, - SArray* rightCreatedBlocks, SSHashObj* rightTableHash, SArray* rightRowLocations) { + SArray* rightCreatedBlocks, bool rightUseBuildTable, SArray* rightRowLocations) { for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) { SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i); blockDataDestroy(pBlock); @@ -514,8 +514,8 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef if (rightRowLocations != NULL) { taosArrayDestroy(rightRowLocations); } - if (rightTableHash != NULL) { - mergeJoinDestoryBuildTable(rightTableHash); + if (rightUseBuildTable) { + tSimpleHashClear(pJoinInfo->rightBuildTable); } taosArrayDestroy(leftCreatedBlocks); @@ -525,7 +525,7 @@ static void mergeJoinDestroyTSRangeCtx(SJoinOperatorInfo* pJoinInfo, SArray* lef pJoinInfo->rowCtx.leftRowLocations = NULL; pJoinInfo->rowCtx.leftCreatedBlocks = NULL; pJoinInfo->rowCtx.rightCreatedBlocks = NULL; - pJoinInfo->rowCtx.buildTableTSRange = NULL; + pJoinInfo->rowCtx.rightUseBuildTable = false; pJoinInfo->rowCtx.rightRowLocations = NULL; } @@ -540,11 +540,12 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t int32_t leftRowIdx = 0; int32_t rightRowIdx = 0; SSHashObj* rightTableHash = NULL; + bool rightUseBuildTable = false; if (pJoinInfo->rowCtx.rowRemains) { leftRowLocations = pJoinInfo->rowCtx.leftRowLocations; leftCreatedBlocks = pJoinInfo->rowCtx.leftCreatedBlocks; - rightTableHash = pJoinInfo->rowCtx.buildTableTSRange; + rightUseBuildTable = pJoinInfo->rowCtx.rightUseBuildTable; rightRowLocations = pJoinInfo->rowCtx.rightRowLocations; rightCreatedBlocks = pJoinInfo->rowCtx.rightCreatedBlocks; leftRowIdx = pJoinInfo->rowCtx.leftRowIdx; @@ -561,7 +562,8 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight, pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks); if (pJoinInfo->pTagEqualConditions != NULL && taosArrayGetSize(rightRowLocations) > 16) { - mergeJoinCreateBuildTable(pJoinInfo, rightRowLocations, &rightTableHash); + mergeJoinFillBuildTable(pJoinInfo, rightRowLocations, pJoinInfo->rightBuildTable); + rightUseBuildTable = true; taosArrayDestroy(rightRowLocations); rightRowLocations = NULL; } @@ -578,12 +580,12 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t if (code == TSDB_CODE_SUCCESS) { mergeJoinLeftRowsRightRows(pOperator, pRes, nRows, leftRowLocations, leftRowIdx, - rightRowIdx, rightTableHash, rightRowLocations, &reachThreshold); + rightRowIdx, rightUseBuildTable, rightRowLocations, &reachThreshold); } if (!reachThreshold) { mergeJoinDestroyTSRangeCtx(pJoinInfo, leftRowLocations, leftCreatedBlocks, rightCreatedBlocks, - rightTableHash, rightRowLocations); + rightUseBuildTable, rightRowLocations); } else { pJoinInfo->rowCtx.rowRemains = true; @@ -591,7 +593,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t pJoinInfo->rowCtx.leftRowLocations = leftRowLocations; pJoinInfo->rowCtx.leftCreatedBlocks = leftCreatedBlocks; pJoinInfo->rowCtx.rightCreatedBlocks = rightCreatedBlocks; - pJoinInfo->rowCtx.buildTableTSRange = rightTableHash; + pJoinInfo->rowCtx.rightUseBuildTable = true; pJoinInfo->rowCtx.rightRowLocations = rightRowLocations; } return TSDB_CODE_SUCCESS;