diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 7f7bd4a3ef..ec96a21f47 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -116,7 +116,8 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { } static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow, - SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, int32_t rightPos) { + SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, + int32_t rightPos) { SJoinOperatorInfo* pJoinInfo = pOperator->info; for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { @@ -144,24 +145,22 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* colDataAppend(pDst, currRow, p, false); } } - } typedef struct SRowLocation { - SSDataBlock* pDataBlock; - int32_t pos; + SSDataBlock* pDataBlock; + int32_t pos; } SRowLocation; -static int32_t mergeJoinGetBlockRowsEqualStart(SSDataBlock* pBlock, int16_t slotId, int32_t startPos, - SArray* pPosArray) { +static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t slotId, int32_t startPos, int64_t timestamp, + SArray* pPosArray) { int32_t numRows = pBlock->info.rows; ASSERT(startPos < numRows); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); int32_t i = startPos; - char* pVal = colDataGetData(pCol, i); - for (i = startPos + 1; i < numRows; ++i) { + for (; i < numRows; ++i) { char* pNextVal = colDataGetData(pCol, i); - if (*(int64_t*)pVal != *(int64_t*)pNextVal) { + if (timestamp != *(int64_t*)pNextVal) { break; } } @@ -171,7 +170,7 @@ static int32_t mergeJoinGetBlockRowsEqualStart(SSDataBlock* pBlock, int16_t slot if (endPos - startPos > 1) { block = blockDataExtractBlock(pBlock, startPos, endPos - startPos); } - SRowLocation location = {0}; + SRowLocation location = {0}; for (int32_t j = startPos; j < endPos; ++j) { location.pDataBlock = block; location.pos = j; @@ -180,6 +179,11 @@ static int32_t mergeJoinGetBlockRowsEqualStart(SSDataBlock* pBlock, int16_t slot return 0; } +static int32_t mergeJoinGetRowsEqualTimeStamp(SJoinOperatorInfo* pJoinInfo, SArray* pPosArray) { + + return 0; +} + static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { SJoinOperatorInfo* pJoinInfo = pOperator->info; @@ -228,14 +232,14 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) while (1) { int64_t leftTs = 0; int64_t rightTs = 0; - bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); + bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); if (!hasNextTs) { break; } if (leftTs == rightTs) { - mergeJoinJoinLeftRight(pOperator, pRes, nrows, - pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, pJoinInfo->rightPos); + mergeJoinJoinLeftRight(pOperator, pRes, nrows, pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, + pJoinInfo->rightPos); pJoinInfo->leftPos += 1; pJoinInfo->rightPos += 1;