feat: add input ts order for join operator

This commit is contained in:
shenglian zhou 2022-07-26 16:05:42 +08:00
parent b870b6381f
commit cdcb1a368d
2 changed files with 47 additions and 29 deletions

View File

@ -804,6 +804,7 @@ typedef struct STagFilterOperatorInfo {
typedef struct SJoinOperatorInfo {
SSDataBlock *pRes;
int32_t joinType;
int32_t inputTsOrder;
SSDataBlock *pLeft;
int32_t leftPos;

View File

@ -77,6 +77,15 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
pInfo->pCondAfterMerge = NULL;
}
pInfo->inputTsOrder = TSDB_ORDER_ASC;
if (pJoinNode->inputTsOrder == ORDER_ASC) {
pInfo->inputTsOrder = TSDB_ORDER_ASC;
} else if (pJoinNode->inputTsOrder == ORDER_DESC) {
pInfo->inputTsOrder = TSDB_ORDER_DESC;
}
//TODO: remove this when JoinNode inputTsOrder is ready
pInfo->inputTsOrder = TSDB_ORDER_ASC;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
@ -107,11 +116,42 @@ void destroyMergeJoinOperator(void* param, int32_t numOfOutput) {
taosMemoryFreeClear(param);
}
static void doJoinOneRow(struct SOperatorInfo* pOperator, SSDataBlock* pRes, int32_t currRow) {
SJoinOperatorInfo* pJoinInfo = pOperator->info;
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i];
int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
int32_t rowIndex = -1;
SColumnInfoData* pSrc = NULL;
if (pJoinInfo->pLeft->info.blockId == blockId) {
pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId);
rowIndex = pJoinInfo->leftPos;
} else {
pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId);
rowIndex = pJoinInfo->rightPos;
}
if (colDataIsNull_s(pSrc, rowIndex)) {
colDataAppendNULL(pDst, currRow);
} else {
char* p = colDataGetData(pSrc, rowIndex);
colDataAppend(pDst, currRow, p, false);
}
}
}
static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) {
SJoinOperatorInfo* pJoinInfo = pOperator->info;
int32_t nrows = 0;
int32_t nrows = pRes->info.rows;
bool asc = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC) ? true : false;
while (1) {
// todo extract method
@ -146,43 +186,20 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
// only the timestamp match support for ordinary table
ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP);
if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) {
for (int32_t i = 0; i < pOperator->exprSupp.numOfExprs; ++i) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i);
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[i];
int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId;
int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId;
int32_t rowIndex = -1;
SColumnInfoData* pSrc = NULL;
if (pJoinInfo->pLeft->info.blockId == blockId) {
pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId);
rowIndex = pJoinInfo->leftPos;
} else {
pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId);
rowIndex = pJoinInfo->rightPos;
}
if (colDataIsNull_s(pSrc, rowIndex)) {
colDataAppendNULL(pDst, nrows);
} else {
char* p = colDataGetData(pSrc, rowIndex);
colDataAppend(pDst, nrows, p, false);
}
}
doJoinOneRow(pOperator, pRes, nrows);
pJoinInfo->leftPos += 1;
pJoinInfo->rightPos += 1;
nrows += 1;
} else if (*(int64_t*)pLeftVal < *(int64_t*)pRightVal) {
} else if (asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal ||
!asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal) {
pJoinInfo->leftPos += 1;
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
continue;
}
} else if (*(int64_t*)pLeftVal > *(int64_t*)pRightVal) {
} else if (asc && *(int64_t*)pLeftVal > *(int64_t*)pRightVal ||
!asc && *(int64_t*)pLeftVal < *(int64_t*)pRightVal) {
pJoinInfo->rightPos += 1;
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
continue;