fix: prepare for multirow join
This commit is contained in:
parent
633542914b
commit
70067fa939
|
@ -116,7 +116,9 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
|
||||||
|
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow) {
|
|
||||||
|
static void mergeJoinJoinLeftRight(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow,
|
||||||
|
SSDataBlock* pLeftBlock, int32_t leftPos, SSDataBlock* pRightBlock, int32_t rightPos) {
|
||||||
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
|
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
|
||||||
|
@ -130,11 +132,11 @@ static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int
|
||||||
|
|
||||||
SColumnInfoData* pSrc = NULL;
|
SColumnInfoData* pSrc = NULL;
|
||||||
if (pJoinInfo->pLeft->info.blockId == blockId) {
|
if (pJoinInfo->pLeft->info.blockId == blockId) {
|
||||||
pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId);
|
pSrc = taosArrayGet(pLeftBlock->pDataBlock, slotId);
|
||||||
rowIndex = pJoinInfo->leftPos;
|
rowIndex = leftPos;
|
||||||
} else {
|
} else {
|
||||||
pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId);
|
pSrc = taosArrayGet(pRightBlock->pDataBlock, slotId);
|
||||||
rowIndex = pJoinInfo->rightPos;
|
rowIndex = rightPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (colDataIsNull_s(pSrc, rowIndex)) {
|
if (colDataIsNull_s(pSrc, rowIndex)) {
|
||||||
|
@ -146,15 +148,10 @@ static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
|
|
||||||
|
static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs, int64_t* pRightTs) {
|
||||||
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
|
|
||||||
int32_t nrows = pRes->info.rows;
|
|
||||||
|
|
||||||
bool asc = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC) ? true : false;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
// todo extract method
|
|
||||||
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
||||||
SOperatorInfo* ds1 = pOperator->pDownstream[0];
|
SOperatorInfo* ds1 = pOperator->pDownstream[0];
|
||||||
pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);
|
pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1);
|
||||||
|
@ -162,7 +159,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
|
||||||
pJoinInfo->leftPos = 0;
|
pJoinInfo->leftPos = 0;
|
||||||
if (pJoinInfo->pLeft == NULL) {
|
if (pJoinInfo->pLeft == NULL) {
|
||||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||||
break;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -173,33 +170,54 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
|
||||||
pJoinInfo->rightPos = 0;
|
pJoinInfo->rightPos = 0;
|
||||||
if (pJoinInfo->pRight == NULL) {
|
if (pJoinInfo->pRight == NULL) {
|
||||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||||
break;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// only the timestamp match support for ordinary table
|
||||||
SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
|
SColumnInfoData* pLeftCol = taosArrayGet(pJoinInfo->pLeft->pDataBlock, pJoinInfo->leftCol.slotId);
|
||||||
char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
|
char* pLeftVal = colDataGetData(pLeftCol, pJoinInfo->leftPos);
|
||||||
|
*pLeftTs = *(int64_t*)pLeftVal;
|
||||||
|
|
||||||
SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
|
SColumnInfoData* pRightCol = taosArrayGet(pJoinInfo->pRight->pDataBlock, pJoinInfo->rightCol.slotId);
|
||||||
char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
|
char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos);
|
||||||
|
*pRightTs = *(int64_t*)pRightVal;
|
||||||
|
|
||||||
// only the timestamp match support for ordinary table
|
|
||||||
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) {
|
ASSERT(pRightCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||||
doJoinOneRow(pOperator, pRes, nrows);
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
|
||||||
|
SJoinOperatorInfo* pJoinInfo = pOperator->info;
|
||||||
|
|
||||||
|
int32_t nrows = pRes->info.rows;
|
||||||
|
|
||||||
|
bool asc = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC) ? true : false;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
int64_t leftTs = 0;
|
||||||
|
int64_t rightTs = 0;
|
||||||
|
bool hasNextTs = mergeJoinGetNextTimestamp(pOperator, &leftTs, &rightTs);
|
||||||
|
if (!hasNextTs) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (leftTs == rightTs) {
|
||||||
|
mergeJoinJoinLeftRight(pOperator, pRes, nrows,
|
||||||
|
pJoinInfo->pLeft, pJoinInfo->leftPos, pJoinInfo->pRight, pJoinInfo->rightPos);
|
||||||
pJoinInfo->leftPos += 1;
|
pJoinInfo->leftPos += 1;
|
||||||
pJoinInfo->rightPos += 1;
|
pJoinInfo->rightPos += 1;
|
||||||
|
|
||||||
nrows += 1;
|
nrows += 1;
|
||||||
} else if (asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal ||
|
} else if (asc && leftTs < rightTs ||
|
||||||
!asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal) {
|
!asc && leftTs > rightTs) {
|
||||||
pJoinInfo->leftPos += 1;
|
pJoinInfo->leftPos += 1;
|
||||||
|
|
||||||
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else if (asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal ||
|
} else if (asc && leftTs > rightTs ||
|
||||||
!asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal) {
|
!asc && leftTs < rightTs) {
|
||||||
pJoinInfo->rightPos += 1;
|
pJoinInfo->rightPos += 1;
|
||||||
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
|
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
|
||||||
continue;
|
continue;
|
||||||
|
|
Loading…
Reference in New Issue