From cdcb1a368d249a72739d67b2025921a7f9633990 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 26 Jul 2022 16:05:42 +0800 Subject: [PATCH 1/5] feat: add input ts order for join operator --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/joinoperator.c | 75 +++++++++++++++---------- 2 files changed, 47 insertions(+), 29 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4a57819eba..f1a1011ff0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -804,6 +804,7 @@ typedef struct STagFilterOperatorInfo { typedef struct SJoinOperatorInfo { SSDataBlock *pRes; int32_t joinType; + int32_t inputTsOrder; SSDataBlock *pLeft; int32_t leftPos; diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index f26b2f4f0a..11e0059017 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -77,6 +77,15 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->pCondAfterMerge = NULL; } + pInfo->inputTsOrder = TSDB_ORDER_ASC; + if (pJoinNode->inputTsOrder == ORDER_ASC) { + pInfo->inputTsOrder = TSDB_ORDER_ASC; + } else if (pJoinNode->inputTsOrder == ORDER_DESC) { + pInfo->inputTsOrder = TSDB_ORDER_DESC; + } + //TODO: remove this when JoinNode inputTsOrder is ready + pInfo->inputTsOrder = TSDB_ORDER_ASC; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL); int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); @@ -107,11 +116,42 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } +static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow) { + SJoinOperatorInfo* pJoinInfo = pOperator->info; + for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); + + SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i]; + + int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId; + int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId; + int32_t rowIndex = -1; + + SColumnInfoData* pSrc = NULL; + if (pJoinInfo->pLeft->info.blockId == blockId) { + pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId); + rowIndex = pJoinInfo->leftPos; + } else { + pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId); + rowIndex = pJoinInfo->rightPos; + } + + if (colDataIsNull_s(pSrc, rowIndex)) { + colDataAppendNULL(pDst, currRow); + } else { + char* p = colDataGetData(pSrc, rowIndex); + colDataAppend(pDst, currRow, p, false); + } + } + +} static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) { SJoinOperatorInfo* pJoinInfo = pOperator->info; - int32_t nrows = 0; + int32_t nrows = pRes->info.rows; + + bool asc = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC) ? true : false; while (1) { // todo extract method @@ -146,43 +186,20 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) // only the timestamp match support for ordinary table ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) { - for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { - SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); - - SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i]; - - int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId; - int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId; - int32_t rowIndex = -1; - - SColumnInfoData* pSrc = NULL; - if (pJoinInfo->pLeft->info.blockId == blockId) { - pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId); - rowIndex = pJoinInfo->leftPos; - } else { - pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId); - rowIndex = pJoinInfo->rightPos; - } - - if (colDataIsNull_s(pSrc, rowIndex)) { - colDataAppendNULL(pDst, nrows); - } else { - char* p = colDataGetData(pSrc, rowIndex); - colDataAppend(pDst, nrows, p, false); - } - } - + doJoinOneRow(pOperator, pRes, nrows); pJoinInfo->leftPos += 1; pJoinInfo->rightPos += 1; nrows += 1; - } else if (*(int64_t*)pLeftVal < *(int64_t*)pRightVal) { + } else if (asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal || + !asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal) { pJoinInfo->leftPos += 1; if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { continue; } - } else if (*(int64_t*)pLeftVal > *(int64_t*)pRightVal) { + } else if (asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal || + !asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal) { pJoinInfo->rightPos += 1; if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { continue; From dab90f47bc2c1cff05ef9e1896965d1af84f4957 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 26 Jul 2022 17:10:43 +0800 Subject: [PATCH 2/5] fix: fix memory leak --- source/libs/executor/src/executorimpl.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 36ae1d19ec..93964b8e0b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3548,6 +3548,7 @@ void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { cleanupBasicInfo(&pInfo->binfo); cleanupAggSup(&pInfo->aggSup); + cleanupExprSupp(&pInfo->scalarExprSup); cleanupGroupResInfo(&pInfo->groupResInfo); taosMemoryFreeClear(param); } From a17015138c46a7ac8775a73cf5c89dca91f3c140 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 26 Jul 2022 17:18:08 +0800 Subject: [PATCH 3/5] fix: fix fill operator memory leak --- source/libs/executor/src/executorimpl.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 93964b8e0b..8be5466182 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -92,7 +92,7 @@ static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlo static void releaseQueryBuf(size_t numOfTables); -static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); +static void destroyFillOperatorInfo(void* param, int32_t numOfOutput); static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput); static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); @@ -3553,12 +3553,12 @@ void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } -void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { +void destroyFillOperatorInfo(void* param, int32_t numOfOutput) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param; pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pRes = blockDataDestroy(pInfo->pRes); taosMemoryFreeClear(pInfo->p); - + taosArrayDestroy(pInfo->pColMatchColInfo); taosMemoryFreeClear(param); } @@ -3649,7 +3649,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL); + createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroyFillOperatorInfo, NULL, NULL, NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; From 4ab28c4da2bc6eea94fc6c461d2190866b8e3dc2 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 26 Jul 2022 17:22:23 +0800 Subject: [PATCH 4/5] fix: fix mem leak of fill operator --- source/libs/executor/src/tfill.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 90ffff5faf..98eb0d1d90 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -514,7 +514,7 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { if (pFillInfo == NULL) { return NULL; } - + taosMemoryFree(pFillInfo->prev->pData); taosArrayDestroy(pFillInfo->prev); taosArrayDestroy(pFillInfo->next); From 633542914baa388a69be6acdc607fe02ad45c079 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 26 Jul 2022 19:19:24 +0800 Subject: [PATCH 5/5] fix: fix memory leak of fill operator --- source/libs/executor/src/tfill.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 98eb0d1d90..ff856a48b6 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -514,8 +514,15 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { if (pFillInfo == NULL) { return NULL; } - taosMemoryFree(pFillInfo->prev->pData); + for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->prev); ++i) { + SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i); + taosMemoryFree(pKey->pData); + } taosArrayDestroy(pFillInfo->prev); + for (int32_t i = 0; i < taosArrayGetSize(pFillInfo->next); ++i) { + SGroupKeys* pKey = taosArrayGet(pFillInfo->next, i); + taosMemoryFree(pKey->pData); + } taosArrayDestroy(pFillInfo->next); for (int32_t i = 0; i < pFillInfo->numOfTags; ++i) {