feat: add support for join operator when multiple rows with same ts
This commit is contained in:
parent
c061cd2fe2
commit
c962e9b8fd
|
@ -130,7 +130,7 @@ static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock*
|
|||
int32_t rowIndex = -1;
|
||||
|
||||
SColumnInfoData* pSrc = NULL;
|
||||
if (pJoinInfo->pLeft->info.blockId == blockId) {
|
||||
if (pLeftBlock->info.blockId == blockId) {
|
||||
pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId);
|
||||
rowIndex = leftPos;
|
||||
} else {
|
||||
|
@ -151,8 +151,9 @@ typedef struct SRowLocation {
|
|||
int32_t pos;
|
||||
} SRowLocation;
|
||||
|
||||
// pBlock[tsSlotId][startPos, endPos) == timestamp,
|
||||
static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotId, int32_t startPos, int64_t timestamp,
|
||||
int32_t* pEndPos, SArray* pRowLocations, SArray* createdBlocks) {
|
||||
int32_t* pEndPos, SArray* rowLocations, SArray* createdBlocks) {
|
||||
int32_t numRows = pBlock->info.rows;
|
||||
ASSERT(startPos < numRows);
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsSlotId);
|
||||
|
@ -167,25 +168,107 @@ static int32_t mergeJoinGetBlockRowsEqualTs(SSDataBlock* pBlock, int16_t tsSlotI
|
|||
int32_t endPos = i;
|
||||
*pEndPos = endPos;
|
||||
|
||||
if (endPos - startPos == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSDataBlock* block = pBlock;
|
||||
bool createdNewBlock = false;
|
||||
if (endPos == numRows) {
|
||||
block = blockDataExtractBlock(pBlock, startPos, endPos - startPos);
|
||||
block = blockDataExtractBlock(pBlock, startPos, endPos-startPos);
|
||||
taosArrayPush(createdBlocks, &block);
|
||||
createdNewBlock = true;
|
||||
}
|
||||
SRowLocation location = {0};
|
||||
for (int32_t j = startPos; j < endPos; ++j) {
|
||||
location.pDataBlock = block;
|
||||
location.pos = j;
|
||||
taosArrayPush(pRowLocations, &location);
|
||||
location.pos = ( createdNewBlock ? j - startPos : j);
|
||||
taosArrayPush(rowLocations, &location);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mergeJoinGetRowsEqualTimeStamp(SJoinOperatorInfo* pJoinInfo, SArray* pPosArray) {
|
||||
// whichChild == 0, left child of join; whichChild ==1, right child of join
|
||||
static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator, int32_t whichChild, int16_t tsSlotId,
|
||||
SSDataBlock* startDataBlock, int32_t startPos,
|
||||
int64_t timestamp, SArray* rowLocations,
|
||||
SArray* createdBlocks) {
|
||||
ASSERT(whichChild == 0 || whichChild == 1);
|
||||
|
||||
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||
int32_t endPos = -1;
|
||||
SSDataBlock* dataBlock = startDataBlock;
|
||||
mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks);
|
||||
while (endPos == dataBlock->info.rows) {
|
||||
SOperatorInfo* ds = pOperator->pDownstream[whichChild];
|
||||
dataBlock = ds->fpSet.getNextFn(ds);
|
||||
if (whichChild == 0) {
|
||||
pJoinInfo->leftPos = 0;
|
||||
pJoinInfo->pLeft = dataBlock;
|
||||
} else if (whichChild == 1) {
|
||||
pJoinInfo->rightPos = 0;
|
||||
pJoinInfo->pRight = dataBlock;
|
||||
}
|
||||
|
||||
if (dataBlock == NULL) {
|
||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||
endPos = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, 0, timestamp, &endPos, rowLocations, createdBlocks);
|
||||
}
|
||||
if (endPos != -1) {
|
||||
if (whichChild == 0) {
|
||||
pJoinInfo->leftPos = endPos;
|
||||
} else if (whichChild == 1) {
|
||||
pJoinInfo->rightPos = endPos;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t timestamp, SSDataBlock* pRes,
|
||||
int32_t* nRows) {
|
||||
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||
SArray* leftRowLocations = taosArrayInit(8, sizeof(SRowLocation));
|
||||
SArray* leftCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
|
||||
|
||||
SArray* rightRowLocations = taosArrayInit(8, sizeof(SRowLocation));
|
||||
SArray* rightCreatedBlocks = taosArrayInit(8, POINTER_BYTES);
|
||||
|
||||
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 0, pJoinInfo->leftCol.slotId, pJoinInfo->pLeft,
|
||||
pJoinInfo->leftPos, timestamp, leftRowLocations, leftCreatedBlocks);
|
||||
mergeJoinGetDownStreamRowsEqualTimeStamp(pOperator, 1, pJoinInfo->rightCol.slotId, pJoinInfo->pRight,
|
||||
pJoinInfo->rightPos, timestamp, rightRowLocations, rightCreatedBlocks);
|
||||
|
||||
size_t leftNumJoin = taosArrayGetSize(leftRowLocations);
|
||||
size_t rightNumJoin = taosArrayGetSize(rightRowLocations);
|
||||
for (int32_t i = 0; i < leftNumJoin; ++i) {
|
||||
for (int32_t j = 0; j < rightNumJoin; ++j) {
|
||||
SRowLocation* leftRow = taosArrayGet(leftRowLocations, i);
|
||||
SRowLocation* rightRow = taosArrayGet(rightRowLocations, j);
|
||||
mergeJoinJoinLeftRight(pOperator, pRes, *nRows, leftRow->pDataBlock, leftRow->pos, rightRow->pDataBlock,
|
||||
rightRow->pos);
|
||||
++*nRows;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(rightCreatedBlocks); ++i) {
|
||||
SSDataBlock* pBlock = taosArrayGetP(rightCreatedBlocks, i);
|
||||
blockDataDestroy(pBlock);
|
||||
}
|
||||
taosArrayDestroy(rightCreatedBlocks);
|
||||
taosArrayDestroy(rightRowLocations);
|
||||
for (int i = 0; i < taosArrayGetSize(leftCreatedBlocks); ++i) {
|
||||
SSDataBlock* pBlock = taosArrayGetP(leftCreatedBlocks, i);
|
||||
blockDataDestroy(pBlock);
|
||||
}
|
||||
taosArrayDestroy(leftCreatedBlocks);
|
||||
taosArrayDestroy(leftRowLocations);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
|
||||
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||
|
||||
|
@ -242,10 +325,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
|
|||
if (leftTs == rightTs) {
|
||||
mergeJoinJoinLeftRight(pOperator, pRes, nrows, pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight,
|
||||
pJoinInfo->rightPos);
|
||||
pJoinInfo->leftPos += 1;
|
||||
pJoinInfo->rightPos += 1;
|
||||
|
||||
nrows += 1;
|
||||
mergeJoinJoinDownstreamTsRanges(pOperator, leftTs, pRes, &nrows);
|
||||
} else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) {
|
||||
pJoinInfo->leftPos += 1;
|
||||
|
||||
|
|
Loading…
Reference in New Issue