enh: support asof join

This commit is contained in:
dapan1121 2024-01-12 18:29:27 +08:00
parent 8d1f5ff479
commit dec6e3fffe
10 changed files with 331 additions and 44 deletions

View File

@ -237,6 +237,8 @@ void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
void blockDataCleanup(SSDataBlock* pDataBlock); void blockDataCleanup(SSDataBlock* pDataBlock);
void blockDataReset(SSDataBlock* pDataBlock); void blockDataReset(SSDataBlock* pDataBlock);
void blockDataEmpty(SSDataBlock* pDataBlock); void blockDataEmpty(SSDataBlock* pDataBlock);
/*
 * Grow the buffers of a single column so that it can hold numOfRows rows.
 *
 * pBlockInfo may be NULL: the definition then skips the "capacity already large enough"
 * shortcut and treats the column as holding 0 existing rows. With a non-NULL pBlockInfo the
 * call returns TSDB_CODE_SUCCESS immediately when numOfRows <= pBlockInfo->capacity.
 * A numOfRows of 0 is always a no-op that returns TSDB_CODE_SUCCESS.
 */
int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows,
bool clearPayload);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int32_t extraSize);

View File

@ -477,7 +477,9 @@ typedef struct SSortMergeJoinPhysiNode {
EJoinSubType subType; EJoinSubType subType;
SNode* pWindowOffset; SNode* pWindowOffset;
SNode* pJLimit; SNode* pJLimit;
int32_t asofOp; int32_t asofOpType;
SNode* leftPrimExpr;
SNode* rightPrimExpr;
int32_t leftPrimSlotId; int32_t leftPrimSlotId;
int32_t rightPrimSlotId; int32_t rightPrimSlotId;
SNodeList* pEqLeft; SNodeList* pEqLeft;

View File

@ -268,6 +268,7 @@ void initQueryModuleMsgHandle();
const SSchema* tGetTbnameColumnSchema(); const SSchema* tGetTbnameColumnSchema();
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t getAsofJoinReverseOp(EOperatorType op);
int32_t queryCreateCTableMetaFromMsg(STableMetaRsp* msg, SCTableMeta* pMeta); int32_t queryCreateCTableMetaFromMsg(STableMetaRsp* msg, SCTableMeta* pMeta);
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta); int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);

View File

@ -1353,13 +1353,13 @@ void blockDataReset(SSDataBlock* pDataBlock) {
* the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to * the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to
* any users. The length of TSDB_DATA_TYPE_NULL is 0, and it is an special case. * any users. The length of TSDB_DATA_TYPE_NULL is 0, and it is an special case.
*/ */
static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows,
bool clearPayload) { bool clearPayload) {
if (numOfRows <= 0 || numOfRows <= pBlockInfo->capacity) { if (numOfRows <= 0 || pBlockInfo && numOfRows <= pBlockInfo->capacity) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t existedRows = pBlockInfo->rows; int32_t existedRows = pBlockInfo ? pBlockInfo->rows : 0;
if (IS_VAR_DATA_TYPE(pColumn->info.type)) { if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
char* tmp = taosMemoryRealloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows); char* tmp = taosMemoryRealloc(pColumn->varmeta.offset, sizeof(int32_t) * numOfRows);

View File

@ -52,12 +52,13 @@ typedef struct SMJoinRowPos {
typedef struct SMJoinColMap { typedef struct SMJoinColMap {
int32_t srcSlot; int32_t srcSlot;
int32_t dstSlot; int32_t dstSlot;
bool vardata;
int32_t bytes;
} SMJoinColMap; } SMJoinColMap;
typedef struct SMJoinColInfo { typedef struct SMJoinColInfo {
int32_t srcSlot; int32_t srcSlot;
int32_t dstSlot; int32_t dstSlot;
bool keyCol;
bool vardata; bool vardata;
int32_t* offset; int32_t* offset;
int32_t bytes; int32_t bytes;
@ -171,16 +172,32 @@ typedef struct SMJoinMergeCtx {
joinCartFp mergeCartFp; joinCartFp mergeCartFp;
} SMJoinMergeCtx; } SMJoinMergeCtx;
// Build-side row cache owned by SMJoinWindowCtx (see its `cache` member).
// Only pageLimit, colNum and blk are initialized by mJoinInitWinCache; the remaining
// fields are not written by any code visible alongside this definition.
typedef struct SMJoinWinCache {
int32_t pageLimit;  // set to MJOIN_BLK_SIZE_LIMIT by mJoinInitWinCache
int64_t rowsNum;  // presumably the number of rows currently cached -- TODO confirm
int32_t rowOffset;  // presumably the offset of the first valid cached row -- TODO confirm
int32_t outBlkIdx;  // NOTE(review): output cursor, usage not visible here -- confirm
int32_t outRowOffset;  // NOTE(review): output cursor, usage not visible here -- confirm
int32_t colNum;  // set to pJoin->build->finNum by mJoinInitWinCache
SSDataBlock* blk;  // created from the join's finBlk via createOneDataBlock(); capacity is set to jLimit
} SMJoinWinCache;
typedef struct SMJoinWindowCtx { typedef struct SMJoinWindowCtx {
struct SMJoinOperatorInfo* pJoin; struct SMJoinOperatorInfo* pJoin;
int32_t asofOp; bool ascTs;
int32_t asofOpType;
bool asofLowerRow;
bool asofEqRow; bool asofEqRow;
bool asofGreaterRow;
int64_t jLimit; int64_t jLimit;
int32_t blkThreshold;
SSDataBlock* finBlk; SSDataBlock* finBlk;
int64_t resRowsNum; bool grpRemains;
int32_t resRowOffset; SMJoinWinCache cache;
SArray* resArray;
} SMJoinWindowCtx; } SMJoinWindowCtx;
@ -250,6 +267,10 @@ typedef struct SMJoinOperatorInfo {
#define MJOIN_ROW_BITMAP_SET(_b, _base, _idx) (!colDataIsNull_f((_b + _base), _idx)) #define MJOIN_ROW_BITMAP_SET(_b, _base, _idx) (!colDataIsNull_f((_b + _base), _idx))
#define MJOIN_SET_ROW_BITMAP(_b, _base, _idx) colDataClearNull_f((_b + _base), _idx) #define MJOIN_SET_ROW_BITMAP(_b, _base, _idx) colDataClearNull_f((_b + _base), _idx)
// Classify an ASOF join operator type (_op is an OP_TYPE_* value):
//   ASOF_EQ_ROW_INCLUDED      - true for >=, <= and ==
//   ASOF_LOWER_ROW_INCLUDED   - true for <= and <
//   ASOF_GREATER_ROW_INCLUDED - true for >= and >
#define ASOF_EQ_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_EQUAL == (_op))
#define ASOF_LOWER_ROW_INCLUDED(_op) (OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_LOWER_THAN == (_op))
#define ASOF_GREATER_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_GREATER_THAN == (_op))
#define MJOIN_GET_TB_COL_TS(_col, _ts, _tb) \ #define MJOIN_GET_TB_COL_TS(_col, _ts, _tb) \
do { \ do { \

View File

@ -381,7 +381,7 @@ static int32_t mLeftJoinMergeCart(SMJoinMergeCtx* pCtx) {
static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = false; bool buildGot = false;
@ -450,7 +450,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
} }
do { do {
if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) { if (!mLeftJoinRetrieve(pOperator, pJoin)) {
break; break;
} }
@ -634,7 +634,7 @@ static FORCE_INLINE int32_t mInnerJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
} }
static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { static bool mInnerJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = false; bool buildGot = false;
@ -671,7 +671,7 @@ SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator) {
} }
do { do {
if (!mInnerJoinRetrieve(pOperator, pJoin, pCtx)) { if (!mInnerJoinRetrieve(pOperator, pJoin)) {
break; break;
} }
@ -745,7 +745,7 @@ static FORCE_INLINE int32_t mFullJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
return pCtx->lastProbeGrp ? mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true) : mJoinNonEqCart(pCtx, &pCtx->buildNEqGrp, false); return pCtx->lastProbeGrp ? mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true) : mJoinNonEqCart(pCtx, &pCtx->buildNEqGrp, false);
} }
static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinMergeCtx* pCtx) { static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); bool buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
@ -1018,7 +1018,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
} }
do { do {
if (!mFullJoinRetrieve(pOperator, pJoin, pCtx)) { if (!mFullJoinRetrieve(pOperator, pJoin)) {
if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) { if (pCtx->lastEqGrp && pJoin->build->rowBitmapSize > 0) {
MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx)); MJ_ERR_JRET(mFullJoinHandleBuildTableRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
@ -1370,7 +1370,7 @@ SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator) {
} }
do { do {
if (!mInnerJoinRetrieve(pOperator, pJoin, pCtx)) { if (!mInnerJoinRetrieve(pOperator, pJoin)) {
break; break;
} }
@ -1641,7 +1641,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) {
} }
do { do {
if (!mLeftJoinRetrieve(pOperator, pJoin, pCtx)) { if (!mLeftJoinRetrieve(pOperator, pJoin)) {
break; break;
} }
@ -1713,11 +1713,170 @@ _return:
return pCtx->finBlk; return pCtx->finBlk;
} }
/*
 * Fetch the next probe/build input blocks for the ASOF join.
 *
 * The probe side is retrieved once. The build side is retrieved whenever a probe block is
 * available or the build data source still needs initialization. A build block whose last
 * primary-key value is lower than the current probe row's primary-key value is marked as
 * fully consumed and the next build block is fetched.
 *
 * Returns false (after calling mJoinSetDone) when no probe block is available, true otherwise.
 * pCtx is currently unused.
 */
static bool mAsofJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
  const bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
  bool       buildGot = false;

  for (;;) {
    if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
      buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
    }

    if (!probeGot) {
      mJoinSetDone(pOperator);
      return false;
    }

    if (!buildGot) {
      return true;
    }

    SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
    SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);

    int64_t curProbeTs = ((int64_t*)pProbeCol->pData)[pJoin->probe->blkRowIdx];
    int64_t lastBuildTs = ((int64_t*)pBuildCol->pData)[pJoin->build->blk->info.rows - 1];
    if (curProbeTs <= lastBuildTs) {
      return true;
    }

    // the whole build block lies before the current probe row: drop it and fetch the next one
    pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
  }
}
// Join driver for the ASOF sub-type (installed as pJoin->joinFp for JOIN_STYPE_ASOF).
//
// Fills pCtx->finBlk from the probe/build inputs and returns it either when it holds at
// least pCtx->blkThreshold rows or when mAsofJoinRetrieve() reports that no more input is
// available. When `code` is non-zero at the _return label it is stored in pJoin->errCode
// and NULL is returned.
//
// NOTE(review): pCtx is an SMJoinWindowCtx, but this body reads pCtx->lastEqTs and
// pCtx->probeNEqGrp and passes pCtx to mJoinProcessEqualGrp()/mJoinNonEqCart(), which the
// other join drivers call with an SMJoinMergeCtx. The SMJoinWindowCtx definition does not
// declare lastEqTs or probeNEqGrp -- confirm this builds as intended.
SSDataBlock* mAsofJoinDo(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
int32_t code = TSDB_CODE_SUCCESS;
int64_t probeTs = 0;
int64_t buildTs = 0;
SColumnInfoData* pBuildCol = NULL;
SColumnInfoData* pProbeCol = NULL;
// start every call with an empty output block
blockDataCleanup(pCtx->finBlk);
// finish the group left over from the previous call before reading new input
if (pCtx->grpRemains) {
MJ_ERR_JRET(mAsofJoinHandleGrpRemains(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
pCtx->grpRemains = false;
}
do {
// false means the probe side is exhausted (the operator has been marked done)
if (!mAsofJoinRetrieve(pOperator, pJoin, pCtx)) {
break;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
// the probe row continues the equal-timestamp group recorded in lastEqTs
if (probeTs == pCtx->lastEqTs) {
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
if (MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
continue;
} else {
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
}
}
// merge loop: runs while both current blocks still have unread rows
while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && !MJOIN_BUILD_TB_ROWS_DONE(pJoin->build)) {
if (probeTs == buildTs) {
pCtx->lastEqTs = probeTs;
MJ_ERR_JRET(mJoinProcessEqualGrp(pCtx, probeTs, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
MJOIN_GET_TB_COL_TS(pBuildCol, buildTs, pJoin->build);
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
} else if (PROBE_TS_UNREACH(pCtx->ascTs, probeTs, buildTs)) {
MJ_ERR_JRET(mJoinProcessUnreachGrp(pCtx, pJoin->probe, pProbeCol, &probeTs, &buildTs));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
} else {
// advance the build cursor until PROBE_TS_OVER no longer holds or the block ends
while (++pJoin->build->blkRowIdx < pJoin->build->blk->info.rows) {
MJOIN_GET_TB_CUR_TS(pBuildCol, buildTs, pJoin->build);
if (PROBE_TS_OVER(pCtx->ascTs, probeTs, buildTs)) {
continue;
}
break;
}
}
}
// build input fully fetched: hand the remaining probe rows [blkRowIdx, rows - 1] to
// mJoinNonEqCart() as one group and mark the probe block as consumed
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && pJoin->build->dsFetchDone) {
pCtx->probeNEqGrp.blk = pJoin->probe->blk;
pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
pCtx->probeNEqGrp.endIdx = pJoin->probe->blk->info.rows - 1;
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart(pCtx, &pCtx->probeNEqGrp, true));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
}
} while (true);
// reached by the `break` above; presumably also the jump target of MJ_ERR_JRET on failure
_return:
if (code) {
pJoin->errCode = code;
return NULL;
}
return pCtx->finBlk;
}
/*
 * Initialize the build-side row cache of an ASOF/window join context.
 *
 * The cache block is cloned (structure only) from pCtx->finBlk and every build-side output
 * column gets room for pCtx->jLimit rows.
 *
 * pCache  cache to initialize
 * pJoin   join operator; its build table supplies the output column mapping
 * pCtx    window context; finBlk and jLimit must already be set
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the cache block cannot be created,
 * or the error code reported by doEnsureCapacity().
 * NOTE(review): on failure pCache->blk stays allocated; presumably it is released together
 * with the join context -- confirm.
 */
int32_t mJoinInitWinCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
  pCache->pageLimit = MJOIN_BLK_SIZE_LIMIT;
  pCache->colNum = pJoin->build->finNum;

  pCache->blk = createOneDataBlock(pCtx->finBlk, false);
  if (NULL == pCache->blk) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pCache->blk->info.capacity = pCtx->jLimit;

  SMJoinTableCtx* build = pJoin->build;
  for (int32_t i = 0; i < pCache->colNum; ++i) {
    SColumnInfoData* pCol = taosArrayGet(pCache->blk->pDataBlock, build->finCols[i].dstSlot);

    // NULL block info: the column is grown from zero existing rows, independent of info.capacity
    int32_t code = doEnsureCapacity(pCol, NULL, pCtx->jLimit, false);
    if (TSDB_CODE_SUCCESS != code) {
      // do not report success with a column that is smaller than the declared capacity
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
/*
 * Initialize the window (ASOF) join context embedded in the join operator.
 *
 * Copies the ASOF operator type from the physical plan node and derives the eq/lower/greater
 * row flags from it, resolves the join limit (1 when the plan carries no JLIMIT node), records
 * the timestamp order, allocates the output block and its flush threshold (90% of capacity),
 * and finally sets up the row cache.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the output block cannot be created,
 * or the error code of blockDataEnsureCapacity()/mJoinInitWinCache().
 */
int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
  SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;

  pCtx->pJoin = pJoin;
  pCtx->asofOpType = pJoinNode->asofOpType;
  pCtx->asofEqRow = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType);
  pCtx->asofLowerRow = ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType);
  pCtx->asofGreaterRow = ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType);
  pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;

  // assign both outcomes so the flag does not depend on the context having been zeroed
  pCtx->ascTs = (ORDER_DESC != pJoinNode->node.inputTsOrder);

  pCtx->finBlk = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc);
  if (NULL == pCtx->finBlk) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  MJ_ERR_RET(blockDataEnsureCapacity(pCtx->finBlk, TMAX(MJOIN_DEFAULT_BLK_ROWS_NUM, MJOIN_BLK_SIZE_LIMIT/pJoinNode->node.pOutputDataBlockDesc->totalRowSize)));

  pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.9;

  MJ_ERR_RET(mJoinInitWinCache(&pCtx->cache, pJoin, pCtx));

  return TSDB_CODE_SUCCESS;
}

View File

@ -707,9 +707,9 @@ static int32_t mJoinInitKeyColsInfo(SMJoinTableCtx* pTable, SNodeList* pList, bo
} }
static int32_t mJoinInitColsMap(int32_t* colNum, SMJoinColMap** pCols, int32_t blkId, SNodeList* pList) { static int32_t mJoinInitFinColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) {
*pCols = taosMemoryMalloc(LIST_LENGTH(pList) * sizeof(SMJoinColMap)); pTable->finCols = taosMemoryMalloc(LIST_LENGTH(pList) * sizeof(SMJoinColMap));
if (NULL == *pCols) { if (NULL == pTable->finCols) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -718,14 +718,16 @@ static int32_t mJoinInitColsMap(int32_t* colNum, SMJoinColMap** pCols, int32_t b
FOREACH(pNode, pList) { FOREACH(pNode, pList) {
STargetNode* pTarget = (STargetNode*)pNode; STargetNode* pTarget = (STargetNode*)pNode;
SColumnNode* pColumn = (SColumnNode*)pTarget->pExpr; SColumnNode* pColumn = (SColumnNode*)pTarget->pExpr;
if (pColumn->dataBlockId == blkId) { if (pColumn->dataBlockId == pTable->blkId) {
(*pCols)[i].srcSlot = pColumn->slotId; pTable->finCols[i].srcSlot = pColumn->slotId;
(*pCols)[i].dstSlot = pTarget->slotId; pTable->finCols[i].dstSlot = pTarget->slotId;
pTable->finCols[i].bytes = pColumn->node.resType.bytes;
pTable->finCols[i].vardata = IS_VAR_DATA_TYPE(pColumn->node.resType.type);
++i; ++i;
} }
} }
*colNum = i; pTable->finNum = i;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -737,7 +739,7 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi
MJ_ERR_RET(mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId)); MJ_ERR_RET(mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId));
MJ_ERR_RET(mJoinInitKeyColsInfo(pTable, (0 == idx) ? pJoinNode->pEqLeft : pJoinNode->pEqRight, JOIN_TYPE_FULL == pJoin->joinType)); MJ_ERR_RET(mJoinInitKeyColsInfo(pTable, (0 == idx) ? pJoinNode->pEqLeft : pJoinNode->pEqRight, JOIN_TYPE_FULL == pJoin->joinType));
MJ_ERR_RET(mJoinInitColsMap(&pTable->finNum, &pTable->finCols, pTable->blkId, pJoinNode->pTargets)); MJ_ERR_RET(mJoinInitFinColsInfo(pTable, pJoinNode->pTargets));
memcpy(&pTable->inputStat, pStat, sizeof(*pStat)); memcpy(&pTable->inputStat, pStat, sizeof(*pStat));
@ -1330,6 +1332,12 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
case JOIN_STYPE_ANTI: case JOIN_STYPE_ANTI:
pJoin->joinFp = mAntiJoinDo; pJoin->joinFp = mAntiJoinDo;
break; break;
case JOIN_STYPE_ASOF:
pJoin->joinFp = mAsofJoinDo;
break;
case JOIN_STYPE_WIN:
pJoin->joinFp = mWinJoinDo;
break;
default: default:
break; break;
} }

View File

@ -2198,7 +2198,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) {
bool contLoop = true; bool contLoop = true;
SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param->joinType, param->subType, param->cond, param->filter, param->asc); SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param->joinType, param->subType, param->cond, param->filter, param->asc);
createDummyBlkList(50, 50, 50, 50, 10); createDummyBlkList(200, 200, 200, 200, 20);
while (contLoop) { while (contLoop) {
rerunBlockedHere(); rerunBlockedHere();
@ -2230,7 +2230,7 @@ void handleCaseEnd() {
} // namespace } // namespace
#if 0 #if 1
#if 1 #if 1
TEST(innerJoin, noCondTest) { TEST(innerJoin, noCondTest) {
SJoinTestParam param; SJoinTestParam param;
@ -2333,7 +2333,7 @@ TEST(innerJoin, fullCondTest) {
#endif #endif
#if 0 #if 1
#if 1 #if 1
TEST(leftOuterJoin, noCondTest) { TEST(leftOuterJoin, noCondTest) {
SJoinTestParam param; SJoinTestParam param;

View File

@ -755,25 +755,100 @@ static int32_t setColEqList(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t setColEqCond(SNode* pEqCond, int16_t leftBlkId, int16_t rightBlkId, int32_t* pLeftId, int32_t* pRightId) { static int32_t setColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, int16_t rightBlkId, SSortMergeJoinPhysiNode* pJoin) {
if (QUERY_NODE_OPERATOR == nodeType(pEqCond) && ((SOperatorNode*)pEqCond)->opType == OP_TYPE_EQUAL) { if (QUERY_NODE_OPERATOR == nodeType(pEqCond)) {
SOperatorNode* pOp = (SOperatorNode*)pEqCond; SOperatorNode* pOp = (SOperatorNode*)pEqCond;
if (leftBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) { if (pOp->opType != OP_TYPE_EQUAL && JOIN_STYPE_ASOF != subType) {
*pLeftId = ((SColumnNode*)pOp->pLeft)->slotId; planError("invalid primary cond opType, opType:%d", pOp->opType);
} else if (rightBlkId == ((SColumnNode*)pOp->pLeft)->dataBlockId) {
*pRightId = ((SColumnNode*)pOp->pLeft)->slotId;
} else {
planError("invalid primary key col equal cond, leftBlockId:%d", ((SColumnNode*)pOp->pLeft)->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR; return TSDB_CODE_PLAN_INTERNAL_ERROR;
} }
if (leftBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) { switch (nodeType(pOp->pLeft)) {
*pLeftId = ((SColumnNode*)pOp->pRight)->slotId; case QUERY_NODE_COLUMN: {
} else if (rightBlkId == ((SColumnNode*)pOp->pRight)->dataBlockId) { SColumnNode* pCol = (SColumnNode*)pOp->pLeft;
*pRightId = ((SColumnNode*)pOp->pRight)->slotId; if (leftBlkId == pCol->dataBlockId) {
} else { pJoin->leftPrimSlotId = pCol->slotId;
planError("invalid primary key col equal cond, rightBlockId:%d", ((SColumnNode*)pOp->pRight)->dataBlockId); pJoin->asofOpType = pOp->opType;
return TSDB_CODE_PLAN_INTERNAL_ERROR; } else if (rightBlkId == pCol->dataBlockId) {
pJoin->rightPrimSlotId = pCol->slotId;
} else {
planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
break;
}
case QUERY_NODE_FUNCTION: {
SFunctionNode* pFunc = (SFunctionNode*)pOp->pLeft;
if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
planError("invalid primary cond left function type, leftFuncType:%d", pFunc->funcType);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN == nodeType(pParam)) {
planError("invalid primary cond left timetruncate param type, leftParamType:%d", nodeType(pParam));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
SColumnNode* pCol = (SColumnNode*)pParam;
if (leftBlkId == pCol->dataBlockId) {
pJoin->leftPrimSlotId = pCol->slotId;
pJoin->asofOpType = pOp->opType;
pJoin->leftPrimExpr = nodesCloneNode(pFunc);
} else if (rightBlkId == pCol->dataBlockId) {
pJoin->rightPrimSlotId = pCol->slotId;
pJoin->rightPrimExpr = nodesCloneNode(pFunc);
} else {
planError("invalid primary key col equal cond, leftBlockId:%d", pCol->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
break;
}
default:
planError("invalid primary cond left node type, leftNodeType:%d", nodeType(pOp->pLeft));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
switch (nodeType(pOp->pRight)) {
case QUERY_NODE_COLUMN: {
SColumnNode* pCol = (SColumnNode*)pOp->pRight;
if (leftBlkId == pCol->dataBlockId) {
pJoin->leftPrimSlotId = pCol->slotId;
pJoin->asofOpType = getAsofJoinReverseOp(pOp->opType);
} else if (rightBlkId == pCol->dataBlockId) {
pJoin->rightPrimSlotId = pCol->slotId;
} else {
planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
break;
}
case QUERY_NODE_FUNCTION: {
SFunctionNode* pFunc = (SFunctionNode*)pOp->pRight;
if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
planError("invalid primary cond right function type, rightFuncType:%d", pFunc->funcType);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN == nodeType(pParam)) {
planError("invalid primary cond right timetruncate param type, rightParamType:%d", nodeType(pParam));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
SColumnNode* pCol = (SColumnNode*)pParam;
if (leftBlkId == pCol->dataBlockId) {
pJoin->leftPrimSlotId = pCol->slotId;
pJoin->asofOpType = getAsofJoinReverseOp(pOp->opType);
pJoin->leftPrimExpr = nodesCloneNode(pFunc);
} else if (rightBlkId == pCol->dataBlockId) {
pJoin->rightPrimSlotId = pCol->slotId;
pJoin->rightPrimExpr = nodesCloneNode(pFunc);
} else {
planError("invalid primary key col equal cond, rightBlockId:%d", pCol->dataBlockId);
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
break;
}
default:
planError("invalid primary cond right node type, rightNodeType:%d", nodeType(pOp->pRight));
return TSDB_CODE_PLAN_INTERNAL_ERROR;
} }
} else { } else {
planError("invalid primary key col equal cond, type:%d", nodeType(pEqCond)); planError("invalid primary key col equal cond, type:%d", nodeType(pEqCond));
@ -808,7 +883,7 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
&pJoin->pPrimKeyCond); &pJoin->pPrimKeyCond);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setColEqCond(pJoin->pPrimKeyCond, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, &pJoin->leftPrimSlotId, &pJoin->rightPrimSlotId); code = setColEqCond(pJoin->pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
} }
} }

View File

@ -23,6 +23,25 @@
#include "cJSON.h" #include "cJSON.h"
#include "queryInt.h" #include "queryInt.h"
/*
 * Mirror a comparison operator, i.e. return the operator that expresses the same condition
 * once the two operands are swapped (`a > b` becomes `b < a`).
 *
 * Returns the mirrored OP_TYPE_* value for >, >=, <, <= and ==, or -1 for any other operator.
 */
int32_t getAsofJoinReverseOp(EOperatorType op) {
  int32_t reversed = -1;

  switch (op) {
    case OP_TYPE_EQUAL:
      reversed = OP_TYPE_EQUAL;
      break;
    case OP_TYPE_LOWER_THAN:
      reversed = OP_TYPE_GREATER_THAN;
      break;
    case OP_TYPE_LOWER_EQUAL:
      reversed = OP_TYPE_GREATER_EQUAL;
      break;
    case OP_TYPE_GREATER_THAN:
      reversed = OP_TYPE_LOWER_THAN;
      break;
    case OP_TYPE_GREATER_EQUAL:
      reversed = OP_TYPE_LOWER_EQUAL;
      break;
    default:
      break;
  }

  return reversed;
}
const SSchema* tGetTbnameColumnSchema() { const SSchema* tGetTbnameColumnSchema() {
static struct SSchema _s = { static struct SSchema _s = {
.colId = TSDB_TBNAME_COLUMN_INDEX, .colId = TSDB_TBNAME_COLUMN_INDEX,