From 85cca872a5fbbf1d5af20774c393e2be2d8e8a2a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 29 Feb 2024 17:52:53 +0800 Subject: [PATCH] enh: support timetruncate function --- include/libs/nodes/plannodes.h | 2 +- include/libs/nodes/querynodes.h | 2 +- include/libs/scalar/scalar.h | 1 + source/libs/executor/inc/mergejoin.h | 82 ++++++++------ source/libs/executor/src/mergejoin.c | 30 ++--- source/libs/executor/src/mergejoinoperator.c | 97 ++++++++++++++-- source/libs/executor/test/joinTests.cpp | 110 ++++++++++++++++--- source/libs/nodes/src/nodesCloneFuncs.c | 2 +- source/libs/nodes/src/nodesUtilFuncs.c | 5 +- source/libs/parser/src/parTranslater.c | 20 ++-- source/libs/planner/src/planLogicCreater.c | 2 +- source/libs/planner/src/planOptimizer.c | 53 +++++++-- source/libs/planner/src/planPhysiCreater.c | 14 ++- source/libs/planner/src/planUtil.c | 2 +- source/libs/scalar/src/sclfunc.c | 2 +- tests/script/tsim/join/join.sim | 2 + tests/script/tsim/join/join_scalar.sim | 81 ++++++++++++++ tests/script/tsim/join/left_asof_join.sim | 21 ++++ tests/script/tsim/join/left_semi_join.sim | 3 + tests/script/tsim/join/left_win_join.sim | 4 + 20 files changed, 427 insertions(+), 108 deletions(-) create mode 100644 tests/script/tsim/join/join_scalar.sim diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index ec5571f11c..558af8532f 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -131,7 +131,7 @@ typedef struct SJoinLogicNode { SNode* pWindowOffset; SNode* pJLimit; EJoinAlgorithm joinAlgo; - SNode* winPrimEqCond; + SNode* addPrimEqCond; SNode* pPrimKeyEqCond; SNode* pColEqCond; SNode* pColOnCond; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 6a0566eaeb..5d5b26d236 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -245,7 +245,7 @@ typedef struct SJoinTableNode { EJoinSubType subType; SNode* pWindowOffset; SNode* pJLimit; - SNode* winPrimCond; + SNode* addPrimCond; bool hasSubQuery; bool isLowLevelJoin; SNode* pParent; diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index 5e946357db..ac7f3e5c20 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -82,6 +82,7 @@ int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarP int32_t toJsonFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t toTimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t toCharFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); +int64_t offsetFromTz(char *timezone, int64_t factor); int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index e1f82ddeca..a206fbeb7f 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -82,51 +82,59 @@ typedef struct SMJoinNMatchCtx { int32_t grpIdx; } SMJoinNMatchCtx; +// for now timetruncate only +typedef struct SMJoinPrimExprCtx { + int64_t truncateUnit; + int64_t timezoneUnit; + int32_t targetSlotId; +} SMJoinPrimExprCtx; typedef struct SMJoinTableCtx { - EJoinTableType type; - int32_t downStreamIdx; - SOperatorInfo* downStream; - bool dsInitDone; - bool dsFetchDone; + EJoinTableType type; + int32_t downStreamIdx; + SOperatorInfo* downStream; + bool dsInitDone; + bool dsFetchDone; + SNode* primExpr; + SMJoinPrimExprCtx primCtx; - int32_t blkId; - SQueryStat inputStat; + int32_t blkId; + SQueryStat inputStat; - uint64_t lastInGid; - SSDataBlock* remainInBlk; + uint64_t lastInGid; + SSDataBlock* remainInBlk; - SMJoinColMap* primCol; - char* primData; + SMJoinColMap* primCol; + char* primData; - int32_t finNum; - SMJoinColMap* finCols; + int32_t finNum; + SMJoinColMap* finCols; - int32_t keyNum; - int32_t keyNullSize; - SMJoinColInfo* keyCols; - char* keyBuf; - char* keyData; + int32_t keyNum; + int32_t keyNullSize; + SMJoinColInfo* keyCols; + char* keyBuf; + char* keyData; - bool newBlk; - SSDataBlock* blk; - int32_t blkRowIdx; + bool newBlk; + SSDataBlock* blk; + int32_t blkRowIdx; // merge join - int64_t grpTotalRows; - int32_t grpIdx; - bool noKeepEqGrpRows; - bool multiEqGrpRows; - int64_t eqRowLimit; - int64_t eqRowNum; - SArray* eqGrps; - SArray* createdBlks; + int64_t grpTotalRows; + int32_t grpIdx; + bool noKeepEqGrpRows; + bool multiEqGrpRows; + int64_t eqRowLimit; + int64_t eqRowNum; + SArray* eqGrps; + SArray* createdBlks; // hash join - int32_t grpArrayIdx; - SArray* pGrpArrays; + int32_t grpArrayIdx; + SArray* pGrpArrays; bool multiRowsGrp; int32_t grpRowIdx; @@ -134,11 +142,11 @@ typedef struct SMJoinTableCtx { SMJoinHashGrpRows* pHashGrpRows; SSHashObj* pGrpHash; - int64_t rowBitmapSize; - int64_t rowBitmapOffset; - char* pRowBitmap; + int64_t rowBitmapSize; + int64_t rowBitmapOffset; + char* pRowBitmap; - SMJoinNMatchCtx nMatchCtx; + SMJoinNMatchCtx nMatchCtx; } SMJoinTableCtx; typedef struct SMJoinMatchInfo { @@ -321,6 +329,7 @@ typedef struct SMJoinOperatorInfo { #define MJOIN_PROBE_TB_ROWS_DONE(_tb) ((_tb)->blkRowIdx >= (_tb)->blk->info.rows) #define FJOIN_PROBE_TB_ROWS_DONE(_tb) ((NULL == (_tb)->blk) || ((_tb)->blkRowIdx >= (_tb)->blk->info.rows)) #define MJOIN_BUILD_TB_ROWS_DONE(_tb) ((NULL == (_tb)->blk) || ((_tb)->blkRowIdx >= (_tb)->blk->info.rows)) +#define MJOIN_TB_GRP_ROWS_DONE(_tb, _grpJoin) ((_tb)->dsFetchDone || ((_grpJoin) && NULL == (_tb)->blk && NULL != (_tb)->remainInBlk)) #define BLK_IS_FULL(_blk) ((_blk)->info.rows == (_blk)->info.capacity) @@ -378,7 +387,7 @@ typedef struct SMJoinOperatorInfo { #define MJOIN_GET_TB_COL_TS(_col, _ts, _tb) \ do { \ if (NULL != (_tb)->blk && (_tb)->blkRowIdx < (_tb)->blk->info.rows) { \ - (_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCol->srcSlot); \ + (_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCtx.targetSlotId); \ (_ts) = *((int64_t*)(_col)->pData + (_tb)->blkRowIdx); \ } else { \ (_ts) = INT64_MAX; \ @@ -423,6 +432,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator); SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator); void mJoinResetGroupTableCtx(SMJoinTableCtx* pCtx); void mJoinResetTableCtx(SMJoinTableCtx* pCtx); +void mLeftJoinGroupReset(SMJoinOperatorInfo* pJoin); void mWinJoinGroupReset(SMJoinOperatorInfo* pJoin); bool mJoinRetrieveBlk(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb); void mJoinSetDone(SOperatorInfo* pOperator); diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index c17abdf160..eb55a8d016 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -480,8 +480,8 @@ static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi } if (buildGot) { - SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot); - SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); + SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId); + SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId); if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; continue; @@ -585,7 +585,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { } } - if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && (pJoin->build->dsFetchDone || (pCtx->groupJoin && NULL == pJoin->build->blk))) { + if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && MJOIN_TB_GRP_ROWS_DONE(pJoin->build, pCtx->groupJoin)) { pCtx->probeNEqGrp.blk = pJoin->probe->blk; pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx; pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx; @@ -1856,7 +1856,7 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow do { grp.blk = pTable->blk; - SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); + SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId); if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { return TSDB_CODE_SUCCESS; @@ -2176,7 +2176,7 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) { } } - if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && (pJoin->build->dsFetchDone || (pCtx->groupJoin && NULL == pJoin->build->blk))) { + if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && MJOIN_TB_GRP_ROWS_DONE(pJoin->build, pCtx->groupJoin)) { pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx; pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1; @@ -2312,7 +2312,7 @@ int32_t mAsofGreaterFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) { } int32_t mAsofGreaterSkipEqRows(SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) { - SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); + SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId); if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { *wholeBlk = false; @@ -2468,8 +2468,8 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p } if (buildGot) { - SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot); - SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); + SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId); + SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId); if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; MJOIN_POP_TB_BLK(&pCtx->cache); @@ -2564,7 +2564,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { } } - if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && (pJoin->build->dsFetchDone || (pCtx->groupJoin && NULL == pJoin->build->blk))) { + if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && MJOIN_TB_GRP_ROWS_DONE(pJoin->build, pCtx->groupJoin)) { pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx; pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1; @@ -2663,8 +2663,8 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin } if (buildGot && !pCtx->lowerRowsAcq) { - SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot); - SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot); + SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId); + SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId); if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; continue; @@ -2681,7 +2681,7 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) { SSDataBlock* pBlk = build->blk; - SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCol->srcSlot); + SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId); if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) >= pCtx->winBeginTs) { for (; build->blkRowIdx < pBlk->info.rows; build->blkRowIdx++) { if (*((int64_t*)pCol->pData + build->blkRowIdx) < pCtx->winBeginTs) { @@ -2715,7 +2715,7 @@ int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SM int32_t mWinJoinAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) { SSDataBlock* pBlk = build->blk; - SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCol->srcSlot); + SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId); SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx}; if (*((int64_t*)pCol->pData + build->blkRowIdx) > pCtx->winEndTs) { @@ -2771,7 +2771,7 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) { int32_t grpNum = taosArrayGetSize(pCache->grps); for (int32_t i = 0; i < grpNum; ++i) { SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i); - SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCol->srcSlot); + SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId); if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) < pCtx->winBeginTs) { pCache->rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx); if (pGrp->blk == pCache->outBlk) { @@ -2861,7 +2861,7 @@ int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) { } SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps); - SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCol->srcSlot); + SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId); if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) <= pCtx->winEndTs) { pCache->rowNum += pGrp->blk->info.rows - pGrp->endIdx - 1; if (pCache->rowNum >= pCtx->jLimit) { diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 4d0da62463..c7e8b80007 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -25,11 +25,12 @@ #include "thash.h" #include "tmsg.h" #include "ttypes.h" +#include "functionMgt.h" #include "mergejoin.h" int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) { - SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); + SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId); pGrp->beginIdx = pTable->blkRowIdx; pGrp->readIdx = pTable->blkRowIdx; @@ -768,6 +769,44 @@ static int32_t mJoinInitFinColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) { return TSDB_CODE_SUCCESS; } +static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoinTableCtx* pTable) { + if (NULL == pNode) { + pCtx->targetSlotId = pTable->primCol->srcSlot; + return TSDB_CODE_SUCCESS; + } + + if (QUERY_NODE_TARGET != nodeType(pNode)) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + STargetNode* pTarget = (STargetNode*)pNode; + if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr)) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + SFunctionNode* pFunc = (SFunctionNode*)pTarget->pExpr; + if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + if (4 != pFunc->pParameterList->length && 5 != pFunc->pParameterList->length) { + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + + SValueNode* pUnit = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); + SValueNode* pCurrTz = (5 == pFunc->pParameterList->length) ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2) : NULL; + SValueNode* pTimeZone = (5 == pFunc->pParameterList->length) ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 4) : (SValueNode*)nodesListGetNode(pFunc->pParameterList, 3); + + pCtx->truncateUnit = pUnit->typeData; + if (NULL != pCurrTz && 0 == pCurrTz->typeData) { + pCtx->timezoneUnit = offsetFromTz(pTimeZone->datum.p, TSDB_TICK_PER_SECOND(pFunc->node.resType.precision)); + } + + pCtx->targetSlotId = pTarget->slotId; + + return TSDB_CODE_SUCCESS; +} + static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) { SMJoinTableCtx* pTable = &pJoin->tbs[idx]; pTable->downStream = pDownstream[idx]; @@ -807,6 +846,8 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi } else { pTable->multiEqGrpRows = true; } + + MJ_ERR_RET(mJoinInitPrimExprCtx(pTable->primExpr, &pTable->primCtx, pTable)); return TSDB_CODE_SUCCESS; } @@ -842,26 +883,56 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin pInfo->build->downStreamIdx = buildIdx; pInfo->probe->downStreamIdx = probeIdx; + if (0 == buildIdx) { + pInfo->build->primExpr = pJoinNode->leftPrimExpr; + pInfo->probe->primExpr = pJoinNode->rightPrimExpr; + } else { + pInfo->build->primExpr = pJoinNode->rightPrimExpr; + pInfo->probe->primExpr = pJoinNode->leftPrimExpr; + } + pInfo->build->type = E_JOIN_TB_BUILD; pInfo->probe->type = E_JOIN_TB_PROBE; } +int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) { + if (NULL == pTable->primExpr) { + return TSDB_CODE_SUCCESS; + } + + SMJoinPrimExprCtx* pCtx = &pTable->primCtx; + SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot); + SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId); + if (0 != pCtx->timezoneUnit) { + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + ((int64_t*)pPrimOut->pData)[i] = (((int64_t*)pPrimIn->pData)[i] - pCtx->timezoneUnit) / pCtx->truncateUnit * pCtx->truncateUnit; + } + } else { + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit; + } + } + + return TSDB_CODE_SUCCESS; +} + SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) { + SSDataBlock* pTmp = NULL; int32_t dsIdx = pTable->downStreamIdx; if (E_JOIN_TB_PROBE == pTable->type) { if (pTable->remainInBlk) { - SSDataBlock* pTmp = pTable->remainInBlk; + pTmp = pTable->remainInBlk; pTable->remainInBlk = NULL; (*pJoin->grpResetFp)(pJoin); pTable->lastInGid = pTmp->info.id.groupId; - return pTmp; + goto _return; } if (pTable->dsFetchDone) { return NULL; } - SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx); + pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx); if (NULL == pTmp) { pTable->dsFetchDone = true; return NULL; @@ -869,11 +940,11 @@ SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTa if (0 == pTable->lastInGid) { pTable->lastInGid = pTmp->info.id.groupId; - return pTmp; + goto _return; } if (pTable->lastInGid == pTmp->info.id.groupId) { - return pTmp; + goto _return; } pTable->remainInBlk = pTmp; @@ -886,10 +957,10 @@ SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTa while (true) { if (pTable->remainInBlk) { if (pTable->remainInBlk->info.id.groupId == pProbe->lastInGid) { - SSDataBlock* pTmp = pTable->remainInBlk; + pTmp = pTable->remainInBlk; pTable->remainInBlk = NULL; pTable->lastInGid = pTmp->info.id.groupId; - return pTmp; + goto _return; } if (pTable->remainInBlk->info.id.groupId > pProbe->lastInGid) { @@ -912,7 +983,10 @@ SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTa pTable->remainInBlk = pTmp; } - return NULL; +_return: + + mJoinLaunchPrimExpr(pTmp, pTable); + return pTmp; } static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) { @@ -923,6 +997,8 @@ static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SM SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTable->downStreamIdx); if (NULL == pTmp) { pTable->dsFetchDone = true; + } else { + mJoinLaunchPrimExpr(pTmp, pTable); } return pTmp; @@ -1022,7 +1098,7 @@ void mJoinResetForBuildTable(SMJoinTableCtx* pTable) { } int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) { - SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); + SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId); SMJoinGrpRows* pGrp = NULL; if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { @@ -1496,6 +1572,7 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) { switch (pJoin->subType) { case JOIN_STYPE_OUTER: pJoin->joinFp = mLeftJoinDo; + pJoin->grpResetFp = mLeftJoinGroupReset; break; case JOIN_STYPE_SEMI: pJoin->joinFp = mSemiJoinDo; diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index 62077033bc..7f8b257b9f 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -66,7 +66,7 @@ enum { }; #define COL_DISPLAY_WIDTH 18 -#define JT_MAX_LOOP 10000 +#define JT_MAX_LOOP 20000 #define LEFT_BLK_ID 0 #define RIGHT_BLK_ID 1 @@ -107,6 +107,7 @@ typedef struct { typedef struct { bool filter; bool asc; + bool grpJoin; int32_t leftMaxRows; int32_t leftMaxGrpRows; int32_t rightMaxRows; @@ -119,6 +120,7 @@ typedef struct { int64_t jLimit; int64_t winStartOffset; int64_t winEndOffset; + int64_t inGrpId; int32_t leftTotalRows; int32_t rightTotalRows; @@ -194,12 +196,13 @@ typedef struct { int32_t cond; bool filter; bool asc; + bool grpJoin; SExecTaskInfo* pTask; } SJoinTestParam; SJoinTestCtx jtCtx = {0}; -SJoinTestCtrl jtCtrl = {0, 0, 0, 0}; +SJoinTestCtrl jtCtrl = {1, 1, 1, 0}; SJoinTestStat jtStat = {0}; SJoinTestResInfo jtRes = {0}; @@ -794,6 +797,7 @@ SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param p->joinType = param->joinType; p->subType = param->subType; p->asofOpType = param->asofOp; + p->grpJoin = param->grpJoin; if (p->subType == JOIN_STYPE_WIN || param->jLimit > 1 || taosRand() % 2) { SLimitNode* limitNode = (SLimitNode*)nodesMakeNode(QUERY_NODE_LIMIT); limitNode->limit = param->jLimit; @@ -824,6 +828,7 @@ SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param jtCtx.winEndOffset = pEnd->datum.i; } + jtCtx.grpJoin = param->grpJoin; jtCtx.joinType = param->joinType; jtCtx.subType = param->subType; jtCtx.asc = param->asc; @@ -831,6 +836,7 @@ SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param jtCtx.asofOpType = param->asofOp; jtCtx.leftColOnly = (JOIN_TYPE_LEFT == param->joinType && JOIN_STYPE_SEMI == param->subType); jtCtx.rightColOnly = (JOIN_TYPE_RIGHT == param->joinType && JOIN_STYPE_SEMI == param->subType); + jtCtx.inGrpId = 1; createColCond(p, param->cond); createFilterStart(p, param->filter); @@ -1137,6 +1143,10 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { taosArrayPush((blkId == LEFT_BLK_ID) ? jtCtx.leftBlkList : jtCtx.rightBlkList, ppBlk); } + if (jtCtx.grpJoin) { + (*ppBlk)->info.id.groupId = jtCtx.inGrpId; + } + jtCtx.inputStat |= (1 << blkId); SArray* pTableRows = NULL; @@ -1184,6 +1194,9 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) { *ppBlk = createDummyBlock((blkId == LEFT_BLK_ID) ? LEFT_BLK_ID : RIGHT_BLK_ID); blockDataEnsureCapacity(*ppBlk, jtCtx.blkRows); taosArrayPush((blkId == LEFT_BLK_ID) ? jtCtx.leftBlkList : jtCtx.rightBlkList, ppBlk); + if (jtCtx.grpJoin) { + (*ppBlk)->info.id.groupId = jtCtx.inGrpId; + } } filterOut = (peerFilterNum > 0 && (jtCtx.subType != JOIN_STYPE_ASOF && jtCtx.subType != JOIN_STYPE_WIN)) ? true : false; @@ -1350,6 +1363,9 @@ void makeAppendBlkData(SSDataBlock** ppLeft, SSDataBlock** ppRight, int32_t left *ppLeft = createDummyBlock(LEFT_BLK_ID); blockDataEnsureCapacity(*ppLeft, jtCtx.blkRows); taosArrayPush(jtCtx.leftBlkList, ppLeft); + if (jtCtx.grpJoin) { + (*ppLeft)->info.id.groupId = jtCtx.inGrpId; + } } createRowData(*ppLeft, 0, i, vRange); @@ -1361,6 +1377,9 @@ void makeAppendBlkData(SSDataBlock** ppLeft, SSDataBlock** ppRight, int32_t left *ppRight = createDummyBlock(RIGHT_BLK_ID); blockDataEnsureCapacity(*ppRight, jtCtx.blkRows); taosArrayPush(jtCtx.rightBlkList, ppRight); + if (jtCtx.grpJoin) { + (*ppRight)->info.id.groupId = jtCtx.inGrpId; + } } createRowData(*ppRight, rightOffset, i, vRange); @@ -2215,18 +2234,39 @@ void createTsEqGrpRows(SSDataBlock** ppLeft, SSDataBlock** ppRight, int32_t left taosArrayPush(jtCtx.leftBlkList, ppLeft); } + if (jtCtx.grpJoin) { + (*ppLeft)->info.id.groupId = jtCtx.inGrpId; + } + if (NULL == *ppRight && rightGrpRows > 0) { *ppRight = createDummyBlock(RIGHT_BLK_ID); blockDataEnsureCapacity(*ppRight, jtCtx.blkRows); taosArrayPush(jtCtx.rightBlkList, ppRight); } + if (jtCtx.grpJoin) { + (*ppRight)->info.id.groupId = jtCtx.inGrpId; + } + makeAppendBlkData(ppLeft, ppRight, leftGrpRows, rightGrpRows); appendEqGrpRes(leftGrpRows, rightGrpRows); } +void forceFlushResRows() { + if (JOIN_STYPE_ASOF == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) { + ASSERT(OP_TYPE_LOWER_EQUAL == jtCtx.asofOpType || OP_TYPE_LOWER_THAN == jtCtx.asofOpType); + chkAppendAsofGreaterResRows(true); + } else if (JOIN_STYPE_WIN == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) { + chkAppendWinResRows(true); + } + + taosArrayClear(jtCtx.rightRowsList); + taosArrayClear(jtCtx.rightFilterOut); + taosArrayClear(jtCtx.leftRowsList); + +} void createBothBlkRowsData(void) { SSDataBlock* pLeft = NULL; @@ -2269,6 +2309,13 @@ void createBothBlkRowsData(void) { } } + if (jtCtx.grpJoin && (0 == taosRand() % 3)) { + forceFlushResRows(); + jtCtx.inGrpId++; + pLeft = NULL; + pRight = NULL; + } + switch (grpType) { case 0: createGrpRows(&pLeft, LEFT_BLK_ID, leftGrpRows); @@ -2286,12 +2333,7 @@ void createBothBlkRowsData(void) { } } - if (JOIN_STYPE_ASOF == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) { - ASSERT(OP_TYPE_LOWER_EQUAL == jtCtx.asofOpType || OP_TYPE_LOWER_THAN == jtCtx.asofOpType); - chkAppendAsofGreaterResRows(true); - } else if (JOIN_STYPE_WIN == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) { - chkAppendWinResRows(true); - } + forceFlushResRows(); } void createDummyBlkList(int32_t leftMaxRows, int32_t leftMaxGrpRows, int32_t rightMaxRows, int32_t rightMaxGrpRows, int32_t blkRows) { @@ -2382,6 +2424,9 @@ void printColList(char* title, bool left, int32_t* colList, bool filter, char* o } void printInputRowData(SSDataBlock* pBlk, int32_t* rowIdx) { + if (jtCtx.grpJoin) { + printf("%5" PRIu64, pBlk->info.id.groupId); + } for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) { SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pBlk->pDataBlock, c); ASSERT(pCol->info.type == jtInputColType[c]); @@ -2418,13 +2463,13 @@ void printInputData() { break; } - printf("\t--------------------------blk end-------------------------------"); + printf("\t--------------------------blk end------------------------------- "); jtCtx.leftBlkReadIdx++; leftRowIdx = 0; break; } } else { - printf("%72s", " "); + printf("%*s", jtCtx.grpJoin ? 77 : 72, " "); } if (jtCtx.rightBlkReadIdx < taosArrayGetSize(jtCtx.rightBlkList)) { @@ -2435,7 +2480,7 @@ void printInputData() { break; } - printf("\t--------------------------blk end----------------------------\t"); + printf("\t%*s--------------------------blk end----------------------------\t", jtCtx.grpJoin ? 5 : 0, " "); jtCtx.rightBlkReadIdx++; rightRowIdx = 0; break; @@ -2486,9 +2531,9 @@ void printBasicInfo(char* caseName) { char inputStat[4] = {0}; printf("\n%dth TEST [%s] START\nBasic Info:\n\t asc:%d\n\t filter:%d\n\t maxRows:left-%d right-%d\n\t " "maxGrpRows:left-%d right-%d\n\t blkRows:%d\n\t colCond:%s\n\t joinType:%s\n\t " - "subType:%s\n\t inputStat:%s\n", jtCtx.loopIdx, caseName, jtCtx.asc, jtCtx.filter, jtCtx.leftMaxRows, jtCtx.rightMaxRows, + "subType:%s\n\t inputStat:%s\n\t groupJoin:%s\n", jtCtx.loopIdx, caseName, jtCtx.asc, jtCtx.filter, jtCtx.leftMaxRows, jtCtx.rightMaxRows, jtCtx.leftMaxGrpRows, jtCtx.rightMaxGrpRows, jtCtx.blkRows, jtColCondStr[jtCtx.colCond], jtJoinTypeStr[jtCtx.joinType], - jtSubTypeStr[jtCtx.subType], getInputStatStr(inputStat)); + jtSubTypeStr[jtCtx.subType], getInputStatStr(inputStat), jtCtx.grpJoin ? "true" : "false"); if (JOIN_STYPE_ASOF == jtCtx.subType) { printf("\t asofOp:%s\n\t JLimit:%" PRId64 "\n", getAsofOpStr(), jtCtx.jLimit); @@ -2755,7 +2800,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) { bool contLoop = true; SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param); - createDummyBlkList(200, 200, 200, 200, 20); + createDummyBlkList(12, 3, 12, 3, 3); while (contLoop) { rerunBlockedHere(); @@ -2799,6 +2844,7 @@ TEST(innerJoin, noCondTest) { param.subType = JOIN_STYPE_NONE; param.cond = TEST_NO_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -2824,6 +2870,7 @@ TEST(innerJoin, eqCondTest) { param.subType = JOIN_STYPE_NONE; param.cond = TEST_EQ_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -2849,6 +2896,7 @@ TEST(innerJoin, onCondTest) { param.subType = JOIN_STYPE_NONE; param.cond = TEST_ON_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -2874,6 +2922,7 @@ TEST(innerJoin, fullCondTest) { param.subType = JOIN_STYPE_NONE; param.cond = TEST_FULL_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -2904,9 +2953,11 @@ TEST(leftOuterJoin, noCondTest) { param.asc = true; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { + param.grpJoin = taosRand() % 2 ? true : false; param.filter = false; runSingleTest(caseName, ¶m); + param.grpJoin = taosRand() % 2 ? true : false; param.filter = true; runSingleTest(caseName, ¶m); } @@ -2927,6 +2978,7 @@ TEST(leftOuterJoin, eqCondTest) { param.subType = JOIN_STYPE_OUTER; param.cond = TEST_EQ_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -2952,6 +3004,7 @@ TEST(leftOuterJoin, onCondTest) { param.subType = JOIN_STYPE_OUTER; param.cond = TEST_ON_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -2977,6 +3030,7 @@ TEST(leftOuterJoin, fullCondTest) { param.subType = JOIN_STYPE_OUTER; param.cond = TEST_FULL_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -3004,6 +3058,7 @@ TEST(fullOuterJoin, noCondTest) { param.subType = JOIN_STYPE_OUTER; param.cond = TEST_NO_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -3029,6 +3084,7 @@ TEST(fullOuterJoin, eqCondTest) { param.subType = JOIN_STYPE_OUTER; param.cond = TEST_EQ_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -3055,6 +3111,7 @@ TEST(fullOuterJoin, onCondTest) { param.subType = JOIN_STYPE_OUTER; param.cond = TEST_ON_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -3080,6 +3137,7 @@ TEST(fullOuterJoin, fullCondTest) { param.subType = JOIN_STYPE_OUTER; param.cond = TEST_FULL_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -3107,6 +3165,7 @@ TEST(leftSemiJoin, noCondTest) { param.subType = JOIN_STYPE_SEMI; param.cond = TEST_NO_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -3132,6 +3191,7 @@ TEST(leftSemiJoin, eqCondTest) { param.subType = JOIN_STYPE_SEMI; param.cond = TEST_EQ_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -3158,6 +3218,7 @@ TEST(leftSemiJoin, onCondTest) { param.subType = JOIN_STYPE_SEMI; param.cond = TEST_ON_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -3183,6 +3244,7 @@ TEST(leftSemiJoin, fullCondTest) { param.subType = JOIN_STYPE_SEMI; param.cond = TEST_FULL_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -3210,6 +3272,7 @@ TEST(leftAntiJoin, noCondTest) { param.subType = JOIN_STYPE_ANTI; param.cond = TEST_NO_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -3235,6 +3298,7 @@ TEST(leftAntiJoin, eqCondTest) { param.subType = JOIN_STYPE_ANTI; param.cond = TEST_EQ_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -3261,6 +3325,7 @@ TEST(leftAntiJoin, onCondTest) { param.subType = JOIN_STYPE_ANTI; param.cond = TEST_ON_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -3286,6 +3351,7 @@ TEST(leftAntiJoin, fullCondTest) { param.subType = JOIN_STYPE_ANTI; param.cond = TEST_FULL_COND; param.asc = true; + param.grpJoin = false; for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.filter = false; @@ -3317,10 +3383,12 @@ TEST(leftAsofJoin, noCondGreaterThanTest) { for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; - + + param.grpJoin = taosRand() % 2 ? true : false; param.filter = false; runSingleTest(caseName, ¶m); + param.grpJoin = taosRand() % 2 ? true : false; param.filter = true; runSingleTest(caseName, ¶m); } @@ -3345,10 +3413,12 @@ TEST(leftAsofJoin, noCondGreaterEqTest) { for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; - + + param.grpJoin = taosRand() % 2 ? true : false; param.filter = false; runSingleTest(caseName, ¶m); + param.grpJoin = taosRand() % 2 ? true : false; param.filter = true; runSingleTest(caseName, ¶m); } @@ -3374,9 +3444,11 @@ TEST(leftAsofJoin, noCondEqTest) { for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; + param.grpJoin = taosRand() % 2 ? true : false; param.filter = false; runSingleTest(caseName, ¶m); + param.grpJoin = taosRand() % 2 ? true : false; param.filter = true; runSingleTest(caseName, ¶m); } @@ -3402,9 +3474,11 @@ TEST(leftAsofJoin, noCondLowerThanTest) { for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; + param.grpJoin = taosRand() % 2 ? true : false; param.filter = false; runSingleTest(caseName, ¶m); + param.grpJoin = taosRand() % 2 ? true : false; param.filter = true; runSingleTest(caseName, ¶m); } @@ -3431,9 +3505,11 @@ TEST(leftAsofJoin, noCondLowerEqTest) { for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; + param.grpJoin = taosRand() % 2 ? true : false; param.filter = false; runSingleTest(caseName, ¶m); + param.grpJoin = taosRand() % 2 ? true : false; param.filter = true; runSingleTest(caseName, ¶m); } @@ -3462,9 +3538,11 @@ TEST(leftWinJoin, noCondProjectionTest) { for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) { param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1; + param.grpJoin = taosRand() % 2 ? true : false; param.filter = false; runSingleTest(caseName, ¶m); + param.grpJoin = taosRand() % 2 ? true : false; param.filter = true; runSingleTest(caseName, ¶m); } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 8d435806eb..1705b9da48 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -476,7 +476,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_SCALAR_FIELD(joinAlgo); CLONE_NODE_FIELD(pWindowOffset); CLONE_NODE_FIELD(pJLimit); - CLONE_NODE_FIELD(winPrimEqCond); + CLONE_NODE_FIELD(addPrimEqCond); CLONE_NODE_FIELD(pPrimKeyEqCond); CLONE_NODE_FIELD(pColEqCond); CLONE_NODE_FIELD(pColOnCond); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 470c6c4b9f..9c0c22aac3 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -56,7 +56,6 @@ char* getJoinSTypeString(EJoinSubType type) { char* getFullJoinTypeString(EJoinType type, EJoinSubType stype) { static char* joinFullType[][8] = { - {}, {"INNER", "INNER", "INNER", "INNER", "INNER", "INNER ANY", "INNER", "INNER"}, {"LEFT", "LEFT", "LEFT OUTER", "LEFT SEMI", "LEFT ANTI", "LEFT ANY", "LEFT ASOF", "LEFT WINDOW"}, {"RIGHT", "RIGHT", "RIGHT OUTER", "RIGHT SEMI", "RIGHT ANTI", "RIGHT ANY", "RIGHT ASOF", "RIGHT WINDOW"}, @@ -816,7 +815,7 @@ void nodesDestroyNode(SNode* pNode) { SJoinTableNode* pJoin = (SJoinTableNode*)pNode; nodesDestroyNode(pJoin->pWindowOffset); nodesDestroyNode(pJoin->pJLimit); - nodesDestroyNode(pJoin->winPrimCond); + nodesDestroyNode(pJoin->addPrimCond); nodesDestroyNode(pJoin->pLeft); nodesDestroyNode(pJoin->pRight); nodesDestroyNode(pJoin->pOnCond); @@ -1286,7 +1285,7 @@ void nodesDestroyNode(SNode* pNode) { destroyLogicNode((SLogicNode*)pLogicNode); nodesDestroyNode(pLogicNode->pWindowOffset); nodesDestroyNode(pLogicNode->pJLimit); - nodesDestroyNode(pLogicNode->winPrimEqCond); + nodesDestroyNode(pLogicNode->addPrimEqCond); nodesDestroyNode(pLogicNode->pPrimKeyEqCond); nodesDestroyNode(pLogicNode->pColEqCond); nodesDestroyNode(pLogicNode->pColOnCond); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8a518bef41..c236974976 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3244,7 +3244,7 @@ static int32_t replaceTbName(STranslateContext* pCxt, SSelectStmt* pSelect) { return pRewriteCxt.errCode; } -static int32_t addPrimEqCond(SNode** pCond, SRealTableNode* leftTable, SRealTableNode* rightTable) { +static int32_t addPrimJoinEqCond(SNode** pCond, SRealTableNode* leftTable, SRealTableNode* rightTable, EJoinType joinType, EJoinSubType subType) { struct STableMeta* pLMeta = leftTable->pMeta; struct STableMeta* pRMeta = rightTable->pMeta; @@ -3256,7 +3256,13 @@ static int32_t addPrimEqCond(SNode** pCond, SRealTableNode* leftTable, SRealTabl SOperatorNode* pOp = (SOperatorNode*)*pCond; pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; - pOp->opType = OP_TYPE_EQUAL; + if (IS_WINDOW_JOIN(subType)) { + pOp->opType = OP_TYPE_EQUAL; + } else if (JOIN_TYPE_LEFT == joinType) { + pOp->opType = OP_TYPE_GREATER_EQUAL; + } else { + pOp->opType = OP_TYPE_LOWER_EQUAL; + } SColumnNode* pLeft = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pLeft) { @@ -3302,25 +3308,25 @@ static int32_t checkJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoinTabl "Join requires valid time series input"); } - if (JOIN_STYPE_WIN == pJoinTable->subType) { + if (IS_ASOF_JOIN(pJoinTable->subType) || IS_WINDOW_JOIN(pJoinTable->subType)) { if (QUERY_NODE_REAL_TABLE != nodeType(pJoinTable->pLeft) || QUERY_NODE_REAL_TABLE != nodeType(pJoinTable->pRight)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN, - "Only support WINDOW join between tables"); + "Only support ASOF/WINDOW join between tables"); } SRealTableNode* pLeft = (SRealTableNode*)pJoinTable->pLeft; if (TSDB_SUPER_TABLE != pLeft->pMeta->tableType && TSDB_CHILD_TABLE != pLeft->pMeta->tableType && TSDB_NORMAL_TABLE != pLeft->pMeta->tableType) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN, - "Unsupported WINDOW join table type"); + "Unsupported ASOF/WINDOW join table type"); } SRealTableNode* pRight = (SRealTableNode*)pJoinTable->pRight; if (TSDB_SUPER_TABLE != pRight->pMeta->tableType && TSDB_CHILD_TABLE != pRight->pMeta->tableType && TSDB_NORMAL_TABLE != pRight->pMeta->tableType) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN, - "Unsupported WINDOW join table type"); + "Unsupported ASOF/WINDOW join table type"); } - return addPrimEqCond(&pJoinTable->winPrimCond, pLeft, pRight); + return addPrimJoinEqCond(&pJoinTable->addPrimCond, pLeft, pRight, pJoinTable->joinType, pJoinTable->subType); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 91cf4dd1d1..5689d26855 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -549,7 +549,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pJoin->isLowLevelJoin = pJoinTable->isLowLevelJoin; pJoin->pWindowOffset = nodesCloneNode(pJoinTable->pWindowOffset); pJoin->pJLimit = nodesCloneNode(pJoinTable->pJLimit); - pJoin->winPrimEqCond = nodesCloneNode(pJoinTable->winPrimCond); + pJoin->addPrimEqCond = nodesCloneNode(pJoinTable->addPrimCond); pJoin->node.pChildren = nodesMakeList(); pJoin->seqWinGroup = (JOIN_STYPE_WIN == pJoinTable->subType) && pSelect->hasAggFuncs; if (NULL == pJoin->node.pChildren) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index f6d35daad8..7f5b797dfc 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -98,8 +98,8 @@ static SJoinOptimizeOpt gJoinOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = { static SJoinOptimizeOpt gJoinWhereOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = { /* NONE OUTER SEMI ANTI ASOF WINDOW */ /*INNER*/ {{PUSH_DOWN_ALL_COND}, {0}, {0}, {0}, {0}, {0}}, -/*LEFT*/ {{0}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_ALL_COND}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}}, -/*RIGHT*/ {{0}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_ALL_COND}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}}, +/*LEFT*/ {{0}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}}, +/*RIGHT*/ {{0}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}}, /*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}}, }; @@ -757,8 +757,14 @@ static bool pdcJoinIsPrimEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { } SOperatorNode* pOper = (SOperatorNode*)pCond; - if (OP_TYPE_EQUAL != pOper->opType && JOIN_STYPE_ASOF != pJoin->subType) { - return false; + if (OP_TYPE_EQUAL != pOper->opType) { + if (JOIN_STYPE_ASOF != pJoin->subType) { + return false; + } + if (OP_TYPE_GREATER_THAN != pOper->opType && OP_TYPE_GREATER_EQUAL != pOper->opType && + OP_TYPE_LOWER_THAN != pOper->opType && OP_TYPE_LOWER_EQUAL != pOper->opType) { + return false; + } } SSHashObj* pLeftTables = NULL; @@ -809,8 +815,7 @@ static int32_t pdcJoinSplitPrimInLogicCond(SJoinLogicNode* pJoin, SNode** ppPrim SNodeList* pOnConds = NULL; SNode* pCond = NULL; WHERE_EACH(pCond, pLogicCond->pParameterList) { - if (pdcJoinIsPrimEqualCond(pJoin, pCond)) { - nodesDestroyNode(*ppPrimEqCond); + if (pdcJoinIsPrimEqualCond(pJoin, pCond) && (NULL == *ppPrimEqCond)) { *ppPrimEqCond = nodesCloneNode(pCond); ERASE_NODE(pLogicCond->pParameterList); } else { @@ -1195,11 +1200,15 @@ static int32_t pdcJoinAddWhereFilterColsToTarget(SOptimizeContext* pCxt, SJoinLo static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { if (NULL == pJoin->pFullOnCond) { - if (IS_WINDOW_JOIN(pJoin->subType)) { + if (IS_WINDOW_JOIN(pJoin->subType) || IS_ASOF_JOIN(pJoin->subType)) { return TSDB_CODE_SUCCESS; } + + if (NULL == pJoin->node.pConditions) { + return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN); + } - if ((!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType)) || NULL == pJoin->node.pConditions) { + if (!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType)) { return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN); } } @@ -1210,15 +1219,22 @@ static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin if (errCond) { return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND); } - if (IS_WINDOW_JOIN(pJoin->subType)) { + if (IS_WINDOW_JOIN(pJoin->subType) || IS_ASOF_JOIN(pJoin->subType)) { return TSDB_CODE_SUCCESS; } return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL); - } else if (IS_WINDOW_JOIN(pJoin->subType)) { - return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND); } + if (IS_ASOF_JOIN(pJoin->subType)) { + nodesDestroyNode(pJoin->addPrimEqCond); + pJoin->addPrimEqCond = NULL; + } + + if (IS_WINDOW_JOIN(pJoin->subType)) { + return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND); + } + return TSDB_CODE_SUCCESS; } @@ -1235,6 +1251,21 @@ static int32_t pdcJoinHandleGrpJoinCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin->pTagEqCond = NULL; nodesDestroyNode(pJoin->pFullOnCond); pJoin->pFullOnCond = NULL; + if (!pJoin->allEqTags) { + SNode* pNode = NULL; + FOREACH(pNode, pJoin->pLeftEqNodes) { + SColumnNode* pCol = (SColumnNode*)pNode; + if (COLUMN_TYPE_TAG != pCol->colType && PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) { + return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND); + } + } + FOREACH(pNode, pJoin->pRightEqNodes) { + SColumnNode* pCol = (SColumnNode*)pNode; + if (COLUMN_TYPE_TAG != pCol->colType && PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) { + return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND); + } + } + } break; default: break; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 23cb15a741..d164ec042b 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -812,7 +812,7 @@ static int32_t setColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, return TSDB_CODE_PLAN_INTERNAL_ERROR; } SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); - if (QUERY_NODE_COLUMN == nodeType(pParam)) { + if (QUERY_NODE_COLUMN != nodeType(pParam)) { planError("invalid primary cond left timetruncate param type, leftParamType:%d", nodeType(pParam)); return TSDB_CODE_PLAN_INTERNAL_ERROR; } @@ -856,7 +856,7 @@ static int32_t setColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId, return TSDB_CODE_PLAN_INTERNAL_ERROR; } SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); - if (QUERY_NODE_COLUMN == nodeType(pParam)) { + if (QUERY_NODE_COLUMN != nodeType(pParam)) { planError("invalid primary cond right timetruncate param type, rightParamType:%d", nodeType(pParam)); return TSDB_CODE_PLAN_INTERNAL_ERROR; } @@ -915,11 +915,17 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi if (TSDB_CODE_SUCCESS == code) { code = setColEqCond(pJoin->pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin); } + if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) { + code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc); + } + if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) { + code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc); + } } - if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->winPrimEqCond) { + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->addPrimEqCond) { SNode* pPrimKeyCond = NULL; - code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->winPrimEqCond, + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->addPrimEqCond, &pPrimKeyCond); if (TSDB_CODE_SUCCESS == code) { code = setColEqCond(pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin); diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 240a054373..9d5619a61f 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -19,7 +19,7 @@ static char* getUsageErrFormat(int32_t errCode) { switch (errCode) { case TSDB_CODE_PLAN_EXPECTED_TS_EQUAL: - return "left.ts = right.ts is expected in join expression"; + return "primary timestamp equal condition is expected in join conditions"; case TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN: return "not support cross join"; case TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND: diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 62fda148cd..e732d1c587 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1285,7 +1285,7 @@ int32_t toCharFunction(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOu } /** Time functions **/ -static int64_t offsetFromTz(char *timezone, int64_t factor) { +int64_t offsetFromTz(char *timezone, int64_t factor) { char *minStr = &timezone[3]; int64_t minutes = taosStr2Int64(minStr, NULL, 10); memset(minStr, 0, strlen(minStr)); diff --git a/tests/script/tsim/join/join.sim b/tests/script/tsim/join/join.sim index d1e4214ac9..289d386979 100644 --- a/tests/script/tsim/join/join.sim +++ b/tests/script/tsim/join/join.sim @@ -69,6 +69,7 @@ run tsim/join/left_asof_join.sim run tsim/join/right_asof_join.sim run tsim/join/left_win_join.sim run tsim/join/right_win_join.sim +run tsim/join/join_scalar.sim print ================== restart server to commit data into disk system sh/exec.sh -n dnode1 -s stop -x SIGINT @@ -87,5 +88,6 @@ run tsim/join/left_asof_join.sim run tsim/join/right_asof_join.sim run tsim/join/left_win_join.sim run tsim/join/right_win_join.sim +run tsim/join/join_scalar.sim system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/join/join_scalar.sim b/tests/script/tsim/join/join_scalar.sim new file mode 100644 index 0000000000..001df82396 --- /dev/null +++ b/tests/script/tsim/join/join_scalar.sim @@ -0,0 +1,81 @@ +sql connect +sql use test0; + +sql select a.ts, a.col1, b.ts, b.col1 from sta a left join sta b on timetruncate(a.ts, 1h) = timetruncate(b.ts, 1h); +if $rows != 64 then + return -1 +endi + +sql select a.ts, a.col1, b.ts, b.col1 from sta a join sta b on timetruncate(a.ts, 1h) = timetruncate(b.ts, 1h); +if $rows != 64 then + return -1 +endi + +sql select a.ts, a.col1, b.ts, b.col1 from sta a join sta b on timetruncate(a.ts, 1h) = timetruncate(b.ts, 1h) and a.col1=b.col1; +if $rows != 12 then + return -1 +endi + +sql select a.ts, a.col1, b.ts, b.col1 from sta a join sta b on timetruncate(a.ts, 1m) = b.ts; +if $rows != 16 then + return -1 +endi + +sql select a.ts, b.col1 from sta a left join sta b on timetruncate(b.ts, 1h) = a.ts; +if $rows != 8 then + return -1 +endi + +sql select a.ts, b.col1,timetruncate(a.col1, 1h) from sta a left join sta b on a.ts = b.ts and timetruncate(a.col1, 1h) = timetruncate(a.col1, 1h); +if $rows != 12 then + return -1 +endi + +sql select a.ts, b.col1 from sta a left semi join sta b on timetruncate(a.ts, 1h) = timetruncate(b.ts, 1h); +if $rows != 8 then + return -1 +endi + +sql select a.ts, b.col1 from sta a left anti join sta b on timetruncate(a.ts, 1h) = timetruncate(b.ts, 1h); +if $rows != 0 then + return -1 +endi + +sql select a.ts, b.col1 from sta a left asof join sta b on timetruncate(a.ts, 1h) > timetruncate(b.ts, 1h); +if $rows != 8 then + return -1 +endi +sql select a.ts, b.col1 from sta a left asof join sta b on timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) jlimit 2; +if $rows != 16 then + return -1 +endi +sql select a.ts, b.col1 from sta a left asof join sta b on timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) jlimit 8; +if $rows != 64 then + return -1 +endi +sql select a.ts, b.col1 from sta a left asof join sta b on timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) jlimit 9; +if $rows != 64 then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on a.col1 =b.col1 and timetruncate(a.ts, 1h) > timetruncate(b.ts, 1h) jlimit 2; +if $rows != 8 then + return -1 +endi + +sql select a.ts, b.ts from sta a left asof join sta b on timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) jlimit 2 where timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h); +if $rows != 16 then + return -1 +endi + +sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts=b.ts and a.ts=b.ts; +sql_error select a.ts, b.ts from sta a left asof join sta b on timetruncate(a.ts, 1h) > timetruncate(b.ts, 1h) and timetruncate(a.ts, 1h) > timetruncate(b.ts, 1h); +sql_error select a.ts, b.ts from sta a left asof join sta b on timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) and a.ts > timetruncate(b.ts, 1s) jlimit 2; +sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts > timetruncate(b.ts, 1s) and timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) jlimit 2; +sql_error select a.ts, b.ts from sta a left asof join sta b on timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) and a.ts =b.col1 jlimit 2; +sql_error select a.ts, b.col1 from sta a left join sta b on timetruncate(b.ts, 1h) + 1 = a.ts; +sql_error select a.ts, b.col1 from sta a left join sta b on timetruncate(b.ts, 1h) = a.ts + 1; +sql_error select a.ts, b.col1 from sta a left join sta b on b.ts + 1 = a.ts + 1; + + + diff --git a/tests/script/tsim/join/left_asof_join.sim b/tests/script/tsim/join/left_asof_join.sim index 8341da1078..97a3a7e0dd 100644 --- a/tests/script/tsim/join/left_asof_join.sim +++ b/tests/script/tsim/join/left_asof_join.sim @@ -960,10 +960,31 @@ if $data11 != 2 then return -1 endi +sql select a.ts, b.ts from sta a left asof join sta b; +if $rows != 8 then + return -1 +endi +sql select a.ts, b.ts from sta a left asof join sta b jlimit 3; +if $rows != 22 then + return -1 +endi +sql select a.ts, b.ts from sta a left asof join sta b where a.ts > b.ts; +if $rows != 0 then + return -1 +endi +sql select a.ts, b.ts from sta a left asof join sta b jlimit 3 where a.ts > b.ts; +if $rows != 10 then + return -1 +endi + +sql_error select a.ts, b.ts from sta a asof join sta b on a.ts = b.ts; +sql_error select a.ts, b.ts from sta a full asof join sta b on a.ts = b.ts; +sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts != b.ts; sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts >=b.ts and a.col1=a.ts; sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts >=b.ts and a.col1 > 1; sql_error select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts and a.col1 > b.col1 jlimit 2 order by a.ts; sql_error select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts and a.col1 = 1 jlimit 2 order by a.ts; sql_error select a.t1, a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 having(a.ts>0) order by a.t1, a.ts, b.ts; sql_error select count(*) from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 where a.ts > '2023-11-17 16:29:00.000' slimit 1; +sql_error select a.ts, b.ts from (select * from sta) a left asof join sta b where a.ts = b.ts; diff --git a/tests/script/tsim/join/left_semi_join.sim b/tests/script/tsim/join/left_semi_join.sim index 83b55f1dc6..e7fa4da0f3 100644 --- a/tests/script/tsim/join/left_semi_join.sim +++ b/tests/script/tsim/join/left_semi_join.sim @@ -74,3 +74,6 @@ if $data11 != 4 then return -1 endi +sql_error select a.ts, b.ts from sta a left semi join sta b jlimit 3 where a.ts > b.ts; +sql_error select a.ts, b.ts from sta a left semi join sta b where a.ts > b.ts; +sql_error select a.ts, b.ts from sta a left semi join sta b on a.ts > 1 where a.ts = b.ts; diff --git a/tests/script/tsim/join/left_win_join.sim b/tests/script/tsim/join/left_win_join.sim index 3d9932a69c..46d0095dcd 100644 --- a/tests/script/tsim/join/left_win_join.sim +++ b/tests/script/tsim/join/left_win_join.sim @@ -581,3 +581,7 @@ sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, sql_error select a.ts from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 having(a.ts > 0) order by count(*); sql_error select a.ts, count(*),last(b.ts) from sta a left window join sta b window_offset(-1s, 1s) slimit 1; sql_error select a.ts, b.ts from sta a left window join sta b on a.t1=1 window_offset(-1s, 1s) order by a.t1, a.ts; +sql_error select a.ts, b.ts from sta a left window join sta b on a.ts > 1 window_offset(-1s, 1s) where a.ts = b.ts; +sql_error select a.ts, b.ts from sta a left window join sta b on a.ts =b.ts window_offset(-1s, 1s) where a.ts = b.ts; +sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1, 1) where a.ts = b.ts; +sql_error select a.ts, b.ts from (select * from sta) a left window join sta b window_offset(-1s, 1s) where a.ts = b.ts;