From dec6e3fffe73e383dac3a29848e994c450456a23 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 12 Jan 2024 18:29:27 +0800 Subject: [PATCH] enh: support asof join --- include/common/tdatablock.h | 2 + include/libs/nodes/plannodes.h | 4 +- include/libs/qcom/query.h | 1 + source/common/src/tdatablock.c | 6 +- source/libs/executor/inc/mergejoin.h | 31 +++- source/libs/executor/src/mergejoin.c | 175 ++++++++++++++++++- source/libs/executor/src/mergejoinoperator.c | 24 ++- source/libs/executor/test/joinTests.cpp | 6 +- source/libs/planner/src/planPhysiCreater.c | 107 ++++++++++-- source/libs/qcom/src/queryUtil.c | 19 ++ 10 files changed, 331 insertions(+), 44 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 2378a2c5b8..c6343b7433 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -237,6 +237,8 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); void blockDataCleanup(SSDataBlock* pDataBlock); void blockDataReset(SSDataBlock* pDataBlock); void blockDataEmpty(SSDataBlock* pDataBlock); +int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, + bool clearPayload); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize); diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index bac0ba5642..e1244d4c74 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -477,7 +477,9 @@ typedef struct SSortMergeJoinPhysiNode { EJoinSubType subType; SNode* pWindowOffset; SNode* pJLimit; - int32_t asofOp; + int32_t asofOpType; + SNode* leftPrimExpr; + SNode* rightPrimExpr; int32_t leftPrimSlotId; int32_t rightPrimSlotId; SNodeList* pEqLeft; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 8b5bf1abb1..e65577dfd2 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -268,6 +268,7 @@ void initQueryModuleMsgHandle(); const SSchema* tGetTbnameColumnSchema(); bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); +int32_t getAsofJoinReverseOp(EOperatorType op); int32_t queryCreateCTableMetaFromMsg(STableMetaRsp* msg, SCTableMeta* pMeta); int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b71a067692..cbb90b8db1 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1353,13 +1353,13 @@ void blockDataReset(SSDataBlock* pDataBlock) { * the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to * any users. The length of TSDB_DATA_TYPE_NULL is 0, and it is an special case. */ -static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, +int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, bool clearPayload) { - if (numOfRows <= 0 || numOfRows <= pBlockInfo->capacity) { + if (numOfRows <= 0 || pBlockInfo && numOfRows <= pBlockInfo->capacity) { return TSDB_CODE_SUCCESS; } - int32_t existedRows = pBlockInfo->rows; + int32_t existedRows = pBlockInfo ? pBlockInfo->rows : 0; if (IS_VAR_DATA_TYPE(pColumn->info.type)) { char* tmp = taosMemoryRealloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows); diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 6853f0df6c..6107ea2c40 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -52,12 +52,13 @@ typedef struct SMJoinRowPos { typedef struct SMJoinColMap { int32_t srcSlot; int32_t dstSlot; + bool vardata; + int32_t bytes; } SMJoinColMap; typedef struct SMJoinColInfo { int32_t srcSlot; int32_t dstSlot; - bool keyCol; bool vardata; int32_t* offset; int32_t bytes; @@ -171,16 +172,32 @@ typedef struct SMJoinMergeCtx { joinCartFp mergeCartFp; } SMJoinMergeCtx; +typedef struct SMJoinWinCache { + int32_t pageLimit; + + int64_t rowsNum; + int32_t rowOffset; + int32_t outBlkIdx; + int32_t outRowOffset; + + int32_t colNum; + SSDataBlock* blk; +} SMJoinWinCache; + typedef struct SMJoinWindowCtx { struct SMJoinOperatorInfo* pJoin; - int32_t asofOp; + bool ascTs; + + int32_t asofOpType; + bool asofLowerRow; bool asofEqRow; + bool asofGreaterRow; int64_t jLimit; + int32_t blkThreshold; SSDataBlock* finBlk; - int64_t resRowsNum; - int32_t resRowOffset; - SArray* resArray; + bool grpRemains; + SMJoinWinCache cache; } SMJoinWindowCtx; @@ -250,6 +267,10 @@ typedef struct SMJoinOperatorInfo { #define MJOIN_ROW_BITMAP_SET(_b, _base, _idx) (!colDataIsNull_f((_b + _base), _idx)) #define MJOIN_SET_ROW_BITMAP(_b, _base, _idx) colDataClearNull_f((_b + _base), _idx) +#define ASOF_EQ_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_EQUAL == (_op)) +#define ASOF_LOWER_ROW_INCLUDED(_op) (OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_LOWER_THAN == (_op)) +#define ASOF_GREATER_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_GREATER_THAN == (_op)) + #define MJOIN_GET_TB_COL_TS(_col, _ts, _tb) \ do { \ diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index 6e5007bea2..4b75e7e8e8 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -381,7 +381,7 @@ static int32_t mLeftJoinMergeCart(SMJoinMergeCtx* pCtx) { -static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { +static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; @@ -450,7 +450,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) { + if (!mLeftJoinRetrieve(pOperator, pJoin)) { break; } @@ -634,7 +634,7 @@ static FORCE_INLINE int32_t mInnerJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { } -static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { +static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = false; @@ -671,7 +671,7 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mInnerJoinRetrieve(pOperator, pJoin, pCtx)) { + if (!mInnerJoinRetrieve(pOperator, pJoin)) { break; } @@ -745,7 +745,7 @@ static FORCE_INLINE int32_t mFullJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { return pCtx->lastProbeGrp ? mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true) : mJoinNonEqCart(pCtx, &pCtx->buildNEqGrp, false); } -static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { +static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); @@ -1018,7 +1018,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mFullJoinRetrieve(pOperator, pJoin, pCtx)) { + if (!mFullJoinRetrieve(pOperator, pJoin)) { if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { @@ -1370,7 +1370,7 @@ SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mInnerJoinRetrieve(pOperator, pJoin, pCtx)) { + if (!mInnerJoinRetrieve(pOperator, pJoin)) { break; } @@ -1641,7 +1641,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) { } do { - if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) { + if (!mLeftJoinRetrieve(pOperator, pJoin)) { break; } @@ -1713,11 +1713,170 @@ _return: return pCtx->finBlk; } + +static bool mAsofJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { + bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); + bool buildGot = false; + + do { + if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { + buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); + } + + if (!probeGot) { + mJoinSetDone(pOperator); + return false; + } + + if (buildGot) { + SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot); + SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); + if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { + pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; + continue; + } + } + + break; + } while (true); + + return true; +} + + +SSDataBlock* mAsofJoinDo(struct SOperatorInfo* pOperator) { + SMJoinOperatorInfo* pJoin = pOperator->info; + SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; + int32_t code = TSDB_CODE_SUCCESS; + int64_t probeTs = 0; + int64_t buildTs = 0; + SColumnInfoData* pBuildCol = NULL; + SColumnInfoData* pProbeCol = NULL; + + blockDataCleanup(pCtx->finBlk); + + if (pCtx->grpRemains) { + MJ_ERR_JRET(mAsofJoinHandleGrpRemains(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + pCtx->grpRemains = false; + } + + do { + if (!mAsofJoinRetrieve(pOperator, pJoin, pCtx)) { + break; + } + + MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); + MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); + + if (probeTs == pCtx->lastEqTs) { + MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + + if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) { + continue; + } else { + MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); + } + } + + while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) { + if (probeTs == buildTs) { + pCtx->lastEqTs = probeTs; + MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + + MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build); + MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); + } else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) { + MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + } else { + while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) { + MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build); + if (PROBE_TS_OVER(pCtx->ascTs, probeTs, buildTs)) { + continue; + } + + break; + } + } + } + + if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) { + pCtx->probeNEqGrp.blk = pJoin->probe->blk; + pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx; + pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx; + pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1; + + pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; + + MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + return pCtx->finBlk; + } + } + } while (true); + +_return: + + if (code) { + pJoin->errCode = code; + return NULL; + } + + return pCtx->finBlk; +} + + +int32_t mJoinInitWinCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { + pCache->pageLimit = MJOIN_BLK_SIZE_LIMIT; + + pCache->colNum = pJoin->build->finNum; + pCache->blk = createOneDataBlock(pCtx->finBlk, false); + if (NULL == pCache->blk) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pCache->blk->info.capacity = pCtx->jLimit; + + SMJoinTableCtx* build = pJoin->build; + for (int32_t i = 0; i < pCache->colNum; ++i) { + SColumnInfoData* pCol = taosArrayGet(pCache->blk->pDataBlock, build->finCols[i].dstSlot); + doEnsureCapacity(pCol, NULL, pCtx->jLimit, false); + } + + return TSDB_CODE_SUCCESS; +} + int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; pCtx->pJoin = pJoin; + pCtx->asofOpType = pJoinNode->asofOpType; + pCtx->asofEqRow = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType); + pCtx->asofLowerRow = ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType); + pCtx->asofGreaterRow = ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType); + pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; + if (pJoinNode->node.inputTsOrder != ORDER_DESC) { + pCtx->ascTs = true; + } + + pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); + blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize)); + + pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.9; + + MJ_ERR_RET(mJoinInitWinCache(&pCtx->cache, pJoin, pCtx)); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 5a67018a74..1a5fcf44ab 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -707,9 +707,9 @@ static int32_t mJoinInitKeyColsInfo(SMJoinTableCtx* pTable, SNodeList* pList, bo } -static int32_t mJoinInitColsMap(int32_t* colNum, SMJoinColMap** pCols, int32_t blkId, SNodeList* pList) { - *pCols = taosMemoryMalloc(LIST_LENGTH(pList) * sizeof(SMJoinColMap)); - if (NULL == *pCols) { +static int32_t mJoinInitFinColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) { + pTable->finCols = taosMemoryMalloc(LIST_LENGTH(pList) * sizeof(SMJoinColMap)); + if (NULL == pTable->finCols) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -718,14 +718,16 @@ static int32_t mJoinInitColsMap(int32_t* colNum, SMJoinColMap** pCols, int32_t b FOREACH(pNode, pList) { STargetNode* pTarget = (STargetNode*)pNode; SColumnNode* pColumn = (SColumnNode*)pTarget->pExpr; - if (pColumn->dataBlockId == blkId) { - (*pCols)[i].srcSlot = pColumn->slotId; - (*pCols)[i].dstSlot = pTarget->slotId; + if (pColumn->dataBlockId == pTable->blkId) { + pTable->finCols[i].srcSlot = pColumn->slotId; + pTable->finCols[i].dstSlot = pTarget->slotId; + pTable->finCols[i].bytes = pColumn->node.resType.bytes; + pTable->finCols[i].vardata = IS_VAR_DATA_TYPE(pColumn->node.resType.type); ++i; } } - *colNum = i; + pTable->finNum = i; return TSDB_CODE_SUCCESS; } @@ -737,7 +739,7 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi MJ_ERR_RET(mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId)); MJ_ERR_RET(mJoinInitKeyColsInfo(pTable, (0 == idx) ? pJoinNode->pEqLeft : pJoinNode->pEqRight, JOIN_TYPE_FULL == pJoin->joinType)); - MJ_ERR_RET(mJoinInitColsMap(&pTable->finNum, &pTable->finCols, pTable->blkId, pJoinNode->pTargets)); + MJ_ERR_RET(mJoinInitFinColsInfo(pTable, pJoinNode->pTargets)); memcpy(&pTable->inputStat, pStat, sizeof(*pStat)); @@ -1330,6 +1332,12 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) { case JOIN_STYPE_ANTI: pJoin->joinFp = mAntiJoinDo; break; + case JOIN_STYPE_ASOF: + pJoin->joinFp = mAsofJoinDo; + break; + case JOIN_STYPE_WIN: + pJoin->joinFp = mWinJoinDo; + break; default: break; } diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index 3f240f6645..4eb5c41116 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -2198,7 +2198,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) { bool contLoop = true; SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param->joinType, param->subType, param->cond, param->filter, param->asc); - createDummyBlkList(50, 50, 50, 50, 10); + createDummyBlkList(200, 200, 200, 200, 20); while (contLoop) { rerunBlockedHere(); @@ -2230,7 +2230,7 @@ void handleCaseEnd() { } // namespace -#if 0 +#if 1 #if 1 TEST(innerJoin, noCondTest) { SJoinTestParam param; @@ -2333,7 +2333,7 @@ TEST(innerJoin, fullCondTest) { #endif -#if 0 +#if 1 #if 1 TEST(leftOuterJoin, noCondTest) { SJoinTestParam param; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index ae9b829fe4..7299e03a2d 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -755,25 +755,100 @@ static int32_t setColEqList(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkI return TSDB_CODE_SUCCESS; } -static int32_t setColEqCond(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, int32_t* pLeftId, int32_t* pRightId) { - if (QUERY_NODE_OPERATOR == nodeType(pEqCond) && ((SOperatorNode*)pEqCond)->opType == OP_TYPE_EQUAL) { +static int32_t setColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, int16_t rightBlkId, SSortMergeJoinPhysiNode* pJoin) { + if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) { SOperatorNode* pOp = (SOperatorNode*)pEqCond; - if (leftBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) { - *pLeftId = ((SColumnNode*)pOp->pLeft)->slotId; - } else if (rightBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) { - *pRightId = ((SColumnNode*)pOp->pLeft)->slotId; - } else { - planError("invalid primary key col equal cond, leftBlockId:%d", ((SColumnNode*)pOp->pLeft)->dataBlockId); + if (pOp->opType != OP_TYPE_EQUAL && JOIN_STYPE_ASOF != subType) { + planError("invalid primary cond opType, opType:%d", pOp->opType); return TSDB_CODE_PLAN_INTERNAL_ERROR; } - if (leftBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) { - *pLeftId = ((SColumnNode*)pOp->pRight)->slotId; - } else if (rightBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) { - *pRightId = ((SColumnNode*)pOp->pRight)->slotId; - } else { - planError("invalid primary key col equal cond, rightBlockId:%d", ((SColumnNode*)pOp->pRight)->dataBlockId); - return TSDB_CODE_PLAN_INTERNAL_ERROR; + switch (nodeType(pOp->pLeft)) { + case QUERY_NODE_COLUMN: { + SColumnNode* pCol = (SColumnNode*)pOp->pLeft; + if (leftBlkId == pCol->dataBlockId) { + pJoin->leftPrimSlotId = pCol->slotId; + pJoin->asofOpType = pOp->opType; + } else if (rightBlkId == pCol->dataBlockId) { + pJoin->rightPrimSlotId = pCol->slotId; + } else { + planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + break; + } + case QUERY_NODE_FUNCTION: { + SFunctionNode* pFunc = (SFunctionNode*)pOp->pLeft; + if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) { + planError("invalid primary cond left function type, leftFuncType:%d", pFunc->funcType); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN == nodeType(pParam)) { + planError("invalid primary cond left timetruncate param type, leftParamType:%d", nodeType(pParam)); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + SColumnNode* pCol = (SColumnNode*)pParam; + if (leftBlkId == pCol->dataBlockId) { + pJoin->leftPrimSlotId = pCol->slotId; + pJoin->asofOpType = pOp->opType; + pJoin->leftPrimExpr = nodesCloneNode(pFunc); + } else if (rightBlkId == pCol->dataBlockId) { + pJoin->rightPrimSlotId = pCol->slotId; + pJoin->rightPrimExpr = nodesCloneNode(pFunc); + } else { + planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + break; + } + default: + planError("invalid primary cond left node type, leftNodeType:%d", nodeType(pOp->pLeft)); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + switch (nodeType(pOp->pRight)) { + case QUERY_NODE_COLUMN: { + SColumnNode* pCol = (SColumnNode*)pOp->pRight; + if (leftBlkId == pCol->dataBlockId) { + pJoin->leftPrimSlotId = pCol->slotId; + pJoin->asofOpType = getAsofJoinReverseOp(pOp->opType); + } else if (rightBlkId == pCol->dataBlockId) { + pJoin->rightPrimSlotId = pCol->slotId; + } else { + planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + break; + } + case QUERY_NODE_FUNCTION: { + SFunctionNode* pFunc = (SFunctionNode*)pOp->pRight; + if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) { + planError("invalid primary cond right function type, rightFuncType:%d", pFunc->funcType); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN == nodeType(pParam)) { + planError("invalid primary cond right timetruncate param type, rightParamType:%d", nodeType(pParam)); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + SColumnNode* pCol = (SColumnNode*)pParam; + if (leftBlkId == pCol->dataBlockId) { + pJoin->leftPrimSlotId = pCol->slotId; + pJoin->asofOpType = getAsofJoinReverseOp(pOp->opType); + pJoin->leftPrimExpr = nodesCloneNode(pFunc); + } else if (rightBlkId == pCol->dataBlockId) { + pJoin->rightPrimSlotId = pCol->slotId; + pJoin->rightPrimExpr = nodesCloneNode(pFunc); + } else { + planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId); + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + break; + } + default: + planError("invalid primary cond right node type, rightNodeType:%d", nodeType(pOp->pRight)); + return TSDB_CODE_PLAN_INTERNAL_ERROR; } } else { planError("invalid primary key col equal cond, type:%d", nodeType(pEqCond)); @@ -808,7 +883,7 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pPrimKeyCond); if (TSDB_CODE_SUCCESS == code) { - code = setColEqCond(pJoin->pPrimKeyCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, &pJoin->leftPrimSlotId, &pJoin->rightPrimSlotId); + code = setColEqCond(pJoin->pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin); } } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index f2d6668863..e25d0ece2d 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -23,6 +23,25 @@ #include "cJSON.h" #include "queryInt.h" +int32_t getAsofJoinReverseOp(EOperatorType op) { + switch (op) { + case OP_TYPE_GREATER_THAN: + return OP_TYPE_LOWER_THAN; + case OP_TYPE_GREATER_EQUAL: + return OP_TYPE_LOWER_EQUAL; + case OP_TYPE_LOWER_THAN: + return OP_TYPE_GREATER_THAN; + case OP_TYPE_LOWER_EQUAL: + return OP_TYPE_GREATER_EQUAL; + case OP_TYPE_EQUAL: + return OP_TYPE_EQUAL; + default: + break; + } + + return -1; +} + const SSchema* tGetTbnameColumnSchema() { static struct SSchema _s = { .colId = TSDB_TBNAME_COLUMN_INDEX,