fix: support multi-rows with same ts for join operator

This commit is contained in:
slzhou@taodata.com 2022-07-27 17:09:24 +08:00
parent e1d5971e39
commit 6cb92ef6ee
1 changed files with 17 additions and 13 deletions

View File

@ -116,7 +116,8 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
} }
static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow, static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, int32_t rightPos) { SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock,
int32_t rightPos) {
SJoinOperatorInfo* pJoinInfo = pOperator->info; SJoinOperatorInfo* pJoinInfo = pOperator->info;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) { for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
@ -144,24 +145,22 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock*
colDataAppend(pDst, currRow, p, false); colDataAppend(pDst, currRow, p, false);
} }
} }
} }
typedef struct SRowLocation { typedef struct SRowLocation {
SSDataBlock* pDataBlock; SSDataBlock* pDataBlock;
int32_t pos; int32_t pos;
} SRowLocation; } SRowLocation;
static int32_t mergeJoinGetBlockRowsEqualStart(SSDataBlock* pBlock, int16_t slotId, int32_t startPos, static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t slotId, int32_t startPos, int64_t timestamp,
SArray* pPosArray) { SArray* pPosArray) {
int32_t numRows = pBlock->info.rows; int32_t numRows = pBlock->info.rows;
ASSERT(startPos < numRows); ASSERT(startPos < numRows);
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
int32_t i = startPos; int32_t i = startPos;
char* pVal = colDataGetData(pCol, i); for (; i < numRows; ++i) {
for (i = startPos + 1; i < numRows; ++i) {
char* pNextVal = colDataGetData(pCol, i); char* pNextVal = colDataGetData(pCol, i);
if (*(int64_t*)pVal != *(int64_t*)pNextVal) { if (timestamp != *(int64_t*)pNextVal) {
break; break;
} }
} }
@ -171,7 +170,7 @@ static int32_t mergeJoinGetBlockRowsEqualStart(SSDataBlock* pBlock, int16_t slot
if (endPos - startPos > 1) { if (endPos - startPos > 1) {
block = blockDataExtractBlock(pBlock, startPos, endPos - startPos); block = blockDataExtractBlock(pBlock, startPos, endPos - startPos);
} }
SRowLocation location = {0}; SRowLocation location = {0};
for (int32_t j = startPos; j < endPos; ++j) { for (int32_t j = startPos; j < endPos; ++j) {
location.pDataBlock = block; location.pDataBlock = block;
location.pos = j; location.pos = j;
@ -180,6 +179,11 @@ static int32_t mergeJoinGetBlockRowsEqualStart(SSDataBlock* pBlock, int16_t slot
return 0; return 0;
} }
static int32_t mergeJoinGetRowsEqualTimeStamp(SJoinOperatorInfo* pJoinInfo, SArray* pPosArray) {
return 0;
}
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) { static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
SJoinOperatorInfo* pJoinInfo = pOperator->info; SJoinOperatorInfo* pJoinInfo = pOperator->info;
@ -228,14 +232,14 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
while (1) { while (1) {
int64_t leftTs = 0; int64_t leftTs = 0;
int64_t rightTs = 0; int64_t rightTs = 0;
bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs); bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
if (!hasNextTs) { if (!hasNextTs) {
break; break;
} }
if (leftTs == rightTs) { if (leftTs == rightTs) {
mergeJoinJoinLeftRight(pOperator, pRes, nrows, mergeJoinJoinLeftRight(pOperator, pRes, nrows, pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight,
pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, pJoinInfo->rightPos); pJoinInfo->rightPos);
pJoinInfo->leftPos += 1; pJoinInfo->leftPos += 1;
pJoinInfo->rightPos += 1; pJoinInfo->rightPos += 1;