enh: support timetruncate function
This commit is contained in:
parent
479279b75e
commit
85cca872a5
|
@ -131,7 +131,7 @@ typedef struct SJoinLogicNode {
|
|||
SNode* pWindowOffset;
|
||||
SNode* pJLimit;
|
||||
EJoinAlgorithm joinAlgo;
|
||||
SNode* winPrimEqCond;
|
||||
SNode* addPrimEqCond;
|
||||
SNode* pPrimKeyEqCond;
|
||||
SNode* pColEqCond;
|
||||
SNode* pColOnCond;
|
||||
|
|
|
@ -245,7 +245,7 @@ typedef struct SJoinTableNode {
|
|||
EJoinSubType subType;
|
||||
SNode* pWindowOffset;
|
||||
SNode* pJLimit;
|
||||
SNode* winPrimCond;
|
||||
SNode* addPrimCond;
|
||||
bool hasSubQuery;
|
||||
bool isLowLevelJoin;
|
||||
SNode* pParent;
|
||||
|
|
|
@ -82,6 +82,7 @@ int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarP
|
|||
int32_t toJsonFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t toTimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t toCharFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int64_t offsetFromTz(char *timezone, int64_t factor);
|
||||
int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
|
|
|
@ -82,51 +82,59 @@ typedef struct SMJoinNMatchCtx {
|
|||
int32_t grpIdx;
|
||||
} SMJoinNMatchCtx;
|
||||
|
||||
// for now timetruncate only
|
||||
typedef struct SMJoinPrimExprCtx {
|
||||
int64_t truncateUnit;
|
||||
int64_t timezoneUnit;
|
||||
int32_t targetSlotId;
|
||||
} SMJoinPrimExprCtx;
|
||||
|
||||
typedef struct SMJoinTableCtx {
|
||||
EJoinTableType type;
|
||||
int32_t downStreamIdx;
|
||||
SOperatorInfo* downStream;
|
||||
bool dsInitDone;
|
||||
bool dsFetchDone;
|
||||
EJoinTableType type;
|
||||
int32_t downStreamIdx;
|
||||
SOperatorInfo* downStream;
|
||||
bool dsInitDone;
|
||||
bool dsFetchDone;
|
||||
SNode* primExpr;
|
||||
SMJoinPrimExprCtx primCtx;
|
||||
|
||||
int32_t blkId;
|
||||
SQueryStat inputStat;
|
||||
int32_t blkId;
|
||||
SQueryStat inputStat;
|
||||
|
||||
uint64_t lastInGid;
|
||||
SSDataBlock* remainInBlk;
|
||||
uint64_t lastInGid;
|
||||
SSDataBlock* remainInBlk;
|
||||
|
||||
SMJoinColMap* primCol;
|
||||
char* primData;
|
||||
SMJoinColMap* primCol;
|
||||
char* primData;
|
||||
|
||||
int32_t finNum;
|
||||
SMJoinColMap* finCols;
|
||||
int32_t finNum;
|
||||
SMJoinColMap* finCols;
|
||||
|
||||
int32_t keyNum;
|
||||
int32_t keyNullSize;
|
||||
SMJoinColInfo* keyCols;
|
||||
char* keyBuf;
|
||||
char* keyData;
|
||||
int32_t keyNum;
|
||||
int32_t keyNullSize;
|
||||
SMJoinColInfo* keyCols;
|
||||
char* keyBuf;
|
||||
char* keyData;
|
||||
|
||||
bool newBlk;
|
||||
SSDataBlock* blk;
|
||||
int32_t blkRowIdx;
|
||||
bool newBlk;
|
||||
SSDataBlock* blk;
|
||||
int32_t blkRowIdx;
|
||||
|
||||
// merge join
|
||||
|
||||
int64_t grpTotalRows;
|
||||
int32_t grpIdx;
|
||||
bool noKeepEqGrpRows;
|
||||
bool multiEqGrpRows;
|
||||
int64_t eqRowLimit;
|
||||
int64_t eqRowNum;
|
||||
SArray* eqGrps;
|
||||
SArray* createdBlks;
|
||||
int64_t grpTotalRows;
|
||||
int32_t grpIdx;
|
||||
bool noKeepEqGrpRows;
|
||||
bool multiEqGrpRows;
|
||||
int64_t eqRowLimit;
|
||||
int64_t eqRowNum;
|
||||
SArray* eqGrps;
|
||||
SArray* createdBlks;
|
||||
|
||||
// hash join
|
||||
|
||||
int32_t grpArrayIdx;
|
||||
SArray* pGrpArrays;
|
||||
int32_t grpArrayIdx;
|
||||
SArray* pGrpArrays;
|
||||
|
||||
bool multiRowsGrp;
|
||||
int32_t grpRowIdx;
|
||||
|
@ -134,11 +142,11 @@ typedef struct SMJoinTableCtx {
|
|||
SMJoinHashGrpRows* pHashGrpRows;
|
||||
SSHashObj* pGrpHash;
|
||||
|
||||
int64_t rowBitmapSize;
|
||||
int64_t rowBitmapOffset;
|
||||
char* pRowBitmap;
|
||||
int64_t rowBitmapSize;
|
||||
int64_t rowBitmapOffset;
|
||||
char* pRowBitmap;
|
||||
|
||||
SMJoinNMatchCtx nMatchCtx;
|
||||
SMJoinNMatchCtx nMatchCtx;
|
||||
} SMJoinTableCtx;
|
||||
|
||||
typedef struct SMJoinMatchInfo {
|
||||
|
@ -321,6 +329,7 @@ typedef struct SMJoinOperatorInfo {
|
|||
#define MJOIN_PROBE_TB_ROWS_DONE(_tb) ((_tb)->blkRowIdx >= (_tb)->blk->info.rows)
|
||||
#define FJOIN_PROBE_TB_ROWS_DONE(_tb) ((NULL == (_tb)->blk) || ((_tb)->blkRowIdx >= (_tb)->blk->info.rows))
|
||||
#define MJOIN_BUILD_TB_ROWS_DONE(_tb) ((NULL == (_tb)->blk) || ((_tb)->blkRowIdx >= (_tb)->blk->info.rows))
|
||||
#define MJOIN_TB_GRP_ROWS_DONE(_tb, _grpJoin) ((_tb)->dsFetchDone || ((_grpJoin) && NULL == (_tb)->blk && NULL != (_tb)->remainInBlk))
|
||||
|
||||
#define BLK_IS_FULL(_blk) ((_blk)->info.rows == (_blk)->info.capacity)
|
||||
|
||||
|
@ -378,7 +387,7 @@ typedef struct SMJoinOperatorInfo {
|
|||
#define MJOIN_GET_TB_COL_TS(_col, _ts, _tb) \
|
||||
do { \
|
||||
if (NULL != (_tb)->blk && (_tb)->blkRowIdx < (_tb)->blk->info.rows) { \
|
||||
(_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCol->srcSlot); \
|
||||
(_col) = taosArrayGet((_tb)->blk->pDataBlock, (_tb)->primCtx.targetSlotId); \
|
||||
(_ts) = *((int64_t*)(_col)->pData + (_tb)->blkRowIdx); \
|
||||
} else { \
|
||||
(_ts) = INT64_MAX; \
|
||||
|
@ -423,6 +432,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator);
|
|||
SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator);
|
||||
void mJoinResetGroupTableCtx(SMJoinTableCtx* pCtx);
|
||||
void mJoinResetTableCtx(SMJoinTableCtx* pCtx);
|
||||
void mLeftJoinGroupReset(SMJoinOperatorInfo* pJoin);
|
||||
void mWinJoinGroupReset(SMJoinOperatorInfo* pJoin);
|
||||
bool mJoinRetrieveBlk(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb);
|
||||
void mJoinSetDone(SOperatorInfo* pOperator);
|
||||
|
|
|
@ -480,8 +480,8 @@ static bool mLeftJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoi
|
|||
}
|
||||
|
||||
if (buildGot) {
|
||||
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
|
||||
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
|
||||
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
|
||||
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
|
||||
if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) {
|
||||
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
|
||||
continue;
|
||||
|
@ -585,7 +585,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && (pJoin->build->dsFetchDone || (pCtx->groupJoin && NULL == pJoin->build->blk))) {
|
||||
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && MJOIN_TB_GRP_ROWS_DONE(pJoin->build, pCtx->groupJoin)) {
|
||||
pCtx->probeNEqGrp.blk = pJoin->probe->blk;
|
||||
pCtx->probeNEqGrp.beginIdx = pJoin->probe->blkRowIdx;
|
||||
pCtx->probeNEqGrp.readIdx = pCtx->probeNEqGrp.beginIdx;
|
||||
|
@ -1856,7 +1856,7 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow
|
|||
do {
|
||||
grp.blk = pTable->blk;
|
||||
|
||||
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
|
||||
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
|
||||
|
||||
if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2176,7 +2176,7 @@ SSDataBlock* mAsofLowerJoinDo(struct SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && (pJoin->build->dsFetchDone || (pCtx->groupJoin && NULL == pJoin->build->blk))) {
|
||||
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && MJOIN_TB_GRP_ROWS_DONE(pJoin->build, pCtx->groupJoin)) {
|
||||
pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
|
||||
pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
|
||||
pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1;
|
||||
|
@ -2312,7 +2312,7 @@ int32_t mAsofGreaterFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) {
|
|||
}
|
||||
|
||||
int32_t mAsofGreaterSkipEqRows(SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
|
||||
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
|
||||
|
||||
if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
|
||||
*wholeBlk = false;
|
||||
|
@ -2468,8 +2468,8 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p
|
|||
}
|
||||
|
||||
if (buildGot) {
|
||||
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
|
||||
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
|
||||
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
|
||||
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
|
||||
if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) {
|
||||
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
|
||||
MJOIN_POP_TB_BLK(&pCtx->cache);
|
||||
|
@ -2564,7 +2564,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && (pJoin->build->dsFetchDone || (pCtx->groupJoin && NULL == pJoin->build->blk))) {
|
||||
if (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe) && MJOIN_TB_GRP_ROWS_DONE(pJoin->build, pCtx->groupJoin)) {
|
||||
pCtx->probeGrp.beginIdx = pJoin->probe->blkRowIdx;
|
||||
pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
|
||||
pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1;
|
||||
|
@ -2663,8 +2663,8 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin
|
|||
}
|
||||
|
||||
if (buildGot && !pCtx->lowerRowsAcq) {
|
||||
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
|
||||
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
|
||||
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCtx.targetSlotId);
|
||||
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCtx.targetSlotId);
|
||||
if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) {
|
||||
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
|
||||
continue;
|
||||
|
@ -2681,7 +2681,7 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin
|
|||
|
||||
int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) {
|
||||
SSDataBlock* pBlk = build->blk;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCol->srcSlot);
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId);
|
||||
if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) >= pCtx->winBeginTs) {
|
||||
for (; build->blkRowIdx < pBlk->info.rows; build->blkRowIdx++) {
|
||||
if (*((int64_t*)pCol->pData + build->blkRowIdx) < pCtx->winBeginTs) {
|
||||
|
@ -2715,7 +2715,7 @@ int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SM
|
|||
|
||||
int32_t mWinJoinAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) {
|
||||
SSDataBlock* pBlk = build->blk;
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCol->srcSlot);
|
||||
SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCtx.targetSlotId);
|
||||
SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
|
||||
|
||||
if (*((int64_t*)pCol->pData + build->blkRowIdx) > pCtx->winEndTs) {
|
||||
|
@ -2771,7 +2771,7 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) {
|
|||
int32_t grpNum = taosArrayGetSize(pCache->grps);
|
||||
for (int32_t i = 0; i < grpNum; ++i) {
|
||||
SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
|
||||
SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCol->srcSlot);
|
||||
SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
|
||||
if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) < pCtx->winBeginTs) {
|
||||
pCache->rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx);
|
||||
if (pGrp->blk == pCache->outBlk) {
|
||||
|
@ -2861,7 +2861,7 @@ int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) {
|
|||
}
|
||||
|
||||
SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps);
|
||||
SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCol->srcSlot);
|
||||
SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCtx.targetSlotId);
|
||||
if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) <= pCtx->winEndTs) {
|
||||
pCache->rowNum += pGrp->blk->info.rows - pGrp->endIdx - 1;
|
||||
if (pCache->rowNum >= pCtx->jLimit) {
|
||||
|
|
|
@ -25,11 +25,12 @@
|
|||
#include "thash.h"
|
||||
#include "tmsg.h"
|
||||
#include "ttypes.h"
|
||||
#include "functionMgt.h"
|
||||
#include "mergejoin.h"
|
||||
|
||||
|
||||
int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
|
||||
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
|
||||
|
||||
pGrp->beginIdx = pTable->blkRowIdx;
|
||||
pGrp->readIdx = pTable->blkRowIdx;
|
||||
|
@ -768,6 +769,44 @@ static int32_t mJoinInitFinColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoinTableCtx* pTable) {
|
||||
if (NULL == pNode) {
|
||||
pCtx->targetSlotId = pTable->primCol->srcSlot;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (QUERY_NODE_TARGET != nodeType(pNode)) {
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
STargetNode* pTarget = (STargetNode*)pNode;
|
||||
if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr)) {
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
SFunctionNode* pFunc = (SFunctionNode*)pTarget->pExpr;
|
||||
if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
if (4 != pFunc->pParameterList->length && 5 != pFunc->pParameterList->length) {
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
SValueNode* pUnit = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
|
||||
SValueNode* pCurrTz = (5 == pFunc->pParameterList->length) ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 2) : NULL;
|
||||
SValueNode* pTimeZone = (5 == pFunc->pParameterList->length) ? (SValueNode*)nodesListGetNode(pFunc->pParameterList, 4) : (SValueNode*)nodesListGetNode(pFunc->pParameterList, 3);
|
||||
|
||||
pCtx->truncateUnit = pUnit->typeData;
|
||||
if (NULL != pCurrTz && 0 == pCurrTz->typeData) {
|
||||
pCtx->timezoneUnit = offsetFromTz(pTimeZone->datum.p, TSDB_TICK_PER_SECOND(pFunc->node.resType.precision));
|
||||
}
|
||||
|
||||
pCtx->targetSlotId = pTarget->slotId;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) {
|
||||
SMJoinTableCtx* pTable = &pJoin->tbs[idx];
|
||||
pTable->downStream = pDownstream[idx];
|
||||
|
@ -807,6 +846,8 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi
|
|||
} else {
|
||||
pTable->multiEqGrpRows = true;
|
||||
}
|
||||
|
||||
MJ_ERR_RET(mJoinInitPrimExprCtx(pTable->primExpr, &pTable->primCtx, pTable));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -842,26 +883,56 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin
|
|||
pInfo->build->downStreamIdx = buildIdx;
|
||||
pInfo->probe->downStreamIdx = probeIdx;
|
||||
|
||||
if (0 == buildIdx) {
|
||||
pInfo->build->primExpr = pJoinNode->leftPrimExpr;
|
||||
pInfo->probe->primExpr = pJoinNode->rightPrimExpr;
|
||||
} else {
|
||||
pInfo->build->primExpr = pJoinNode->rightPrimExpr;
|
||||
pInfo->probe->primExpr = pJoinNode->leftPrimExpr;
|
||||
}
|
||||
|
||||
pInfo->build->type = E_JOIN_TB_BUILD;
|
||||
pInfo->probe->type = E_JOIN_TB_PROBE;
|
||||
}
|
||||
|
||||
int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
|
||||
if (NULL == pTable->primExpr) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SMJoinPrimExprCtx* pCtx = &pTable->primCtx;
|
||||
SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot);
|
||||
SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId);
|
||||
if (0 != pCtx->timezoneUnit) {
|
||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
((int64_t*)pPrimOut->pData)[i] = (((int64_t*)pPrimIn->pData)[i] - pCtx->timezoneUnit) / pCtx->truncateUnit * pCtx->truncateUnit;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||
((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
|
||||
SSDataBlock* pTmp = NULL;
|
||||
int32_t dsIdx = pTable->downStreamIdx;
|
||||
if (E_JOIN_TB_PROBE == pTable->type) {
|
||||
if (pTable->remainInBlk) {
|
||||
SSDataBlock* pTmp = pTable->remainInBlk;
|
||||
pTmp = pTable->remainInBlk;
|
||||
pTable->remainInBlk = NULL;
|
||||
(*pJoin->grpResetFp)(pJoin);
|
||||
pTable->lastInGid = pTmp->info.id.groupId;
|
||||
return pTmp;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
if (pTable->dsFetchDone) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx);
|
||||
pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, dsIdx);
|
||||
if (NULL == pTmp) {
|
||||
pTable->dsFetchDone = true;
|
||||
return NULL;
|
||||
|
@ -869,11 +940,11 @@ SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTa
|
|||
|
||||
if (0 == pTable->lastInGid) {
|
||||
pTable->lastInGid = pTmp->info.id.groupId;
|
||||
return pTmp;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
if (pTable->lastInGid == pTmp->info.id.groupId) {
|
||||
return pTmp;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
pTable->remainInBlk = pTmp;
|
||||
|
@ -886,10 +957,10 @@ SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTa
|
|||
while (true) {
|
||||
if (pTable->remainInBlk) {
|
||||
if (pTable->remainInBlk->info.id.groupId == pProbe->lastInGid) {
|
||||
SSDataBlock* pTmp = pTable->remainInBlk;
|
||||
pTmp = pTable->remainInBlk;
|
||||
pTable->remainInBlk = NULL;
|
||||
pTable->lastInGid = pTmp->info.id.groupId;
|
||||
return pTmp;
|
||||
goto _return;
|
||||
}
|
||||
|
||||
if (pTable->remainInBlk->info.id.groupId > pProbe->lastInGid) {
|
||||
|
@ -912,7 +983,10 @@ SSDataBlock* mJoinGrpRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTa
|
|||
pTable->remainInBlk = pTmp;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
_return:
|
||||
|
||||
mJoinLaunchPrimExpr(pTmp, pTable);
|
||||
return pTmp;
|
||||
}
|
||||
|
||||
static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable) {
|
||||
|
@ -923,6 +997,8 @@ static FORCE_INLINE SSDataBlock* mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, SM
|
|||
SSDataBlock* pTmp = getNextBlockFromDownstreamRemain(pJoin->pOperator, pTable->downStreamIdx);
|
||||
if (NULL == pTmp) {
|
||||
pTable->dsFetchDone = true;
|
||||
} else {
|
||||
mJoinLaunchPrimExpr(pTmp, pTable);
|
||||
}
|
||||
|
||||
return pTmp;
|
||||
|
@ -1022,7 +1098,7 @@ void mJoinResetForBuildTable(SMJoinTableCtx* pTable) {
|
|||
}
|
||||
|
||||
int32_t mJoinBuildEqGroups(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, bool restart) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
|
||||
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCtx.targetSlotId);
|
||||
SMJoinGrpRows* pGrp = NULL;
|
||||
|
||||
if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
|
||||
|
@ -1496,6 +1572,7 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
|
|||
switch (pJoin->subType) {
|
||||
case JOIN_STYPE_OUTER:
|
||||
pJoin->joinFp = mLeftJoinDo;
|
||||
pJoin->grpResetFp = mLeftJoinGroupReset;
|
||||
break;
|
||||
case JOIN_STYPE_SEMI:
|
||||
pJoin->joinFp = mSemiJoinDo;
|
||||
|
|
|
@ -66,7 +66,7 @@ enum {
|
|||
};
|
||||
|
||||
#define COL_DISPLAY_WIDTH 18
|
||||
#define JT_MAX_LOOP 10000
|
||||
#define JT_MAX_LOOP 20000
|
||||
|
||||
#define LEFT_BLK_ID 0
|
||||
#define RIGHT_BLK_ID 1
|
||||
|
@ -107,6 +107,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
bool filter;
|
||||
bool asc;
|
||||
bool grpJoin;
|
||||
int32_t leftMaxRows;
|
||||
int32_t leftMaxGrpRows;
|
||||
int32_t rightMaxRows;
|
||||
|
@ -119,6 +120,7 @@ typedef struct {
|
|||
int64_t jLimit;
|
||||
int64_t winStartOffset;
|
||||
int64_t winEndOffset;
|
||||
int64_t inGrpId;
|
||||
|
||||
int32_t leftTotalRows;
|
||||
int32_t rightTotalRows;
|
||||
|
@ -194,12 +196,13 @@ typedef struct {
|
|||
int32_t cond;
|
||||
bool filter;
|
||||
bool asc;
|
||||
bool grpJoin;
|
||||
SExecTaskInfo* pTask;
|
||||
} SJoinTestParam;
|
||||
|
||||
|
||||
SJoinTestCtx jtCtx = {0};
|
||||
SJoinTestCtrl jtCtrl = {0, 0, 0, 0};
|
||||
SJoinTestCtrl jtCtrl = {1, 1, 1, 0};
|
||||
SJoinTestStat jtStat = {0};
|
||||
SJoinTestResInfo jtRes = {0};
|
||||
|
||||
|
@ -794,6 +797,7 @@ SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param
|
|||
p->joinType = param->joinType;
|
||||
p->subType = param->subType;
|
||||
p->asofOpType = param->asofOp;
|
||||
p->grpJoin = param->grpJoin;
|
||||
if (p->subType == JOIN_STYPE_WIN || param->jLimit > 1 || taosRand() % 2) {
|
||||
SLimitNode* limitNode = (SLimitNode*)nodesMakeNode(QUERY_NODE_LIMIT);
|
||||
limitNode->limit = param->jLimit;
|
||||
|
@ -824,6 +828,7 @@ SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param
|
|||
jtCtx.winEndOffset = pEnd->datum.i;
|
||||
}
|
||||
|
||||
jtCtx.grpJoin = param->grpJoin;
|
||||
jtCtx.joinType = param->joinType;
|
||||
jtCtx.subType = param->subType;
|
||||
jtCtx.asc = param->asc;
|
||||
|
@ -831,6 +836,7 @@ SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param
|
|||
jtCtx.asofOpType = param->asofOp;
|
||||
jtCtx.leftColOnly = (JOIN_TYPE_LEFT == param->joinType && JOIN_STYPE_SEMI == param->subType);
|
||||
jtCtx.rightColOnly = (JOIN_TYPE_RIGHT == param->joinType && JOIN_STYPE_SEMI == param->subType);
|
||||
jtCtx.inGrpId = 1;
|
||||
|
||||
createColCond(p, param->cond);
|
||||
createFilterStart(p, param->filter);
|
||||
|
@ -1137,6 +1143,10 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
|
|||
taosArrayPush((blkId == LEFT_BLK_ID) ? jtCtx.leftBlkList : jtCtx.rightBlkList, ppBlk);
|
||||
}
|
||||
|
||||
if (jtCtx.grpJoin) {
|
||||
(*ppBlk)->info.id.groupId = jtCtx.inGrpId;
|
||||
}
|
||||
|
||||
jtCtx.inputStat |= (1 << blkId);
|
||||
|
||||
SArray* pTableRows = NULL;
|
||||
|
@ -1184,6 +1194,9 @@ void createGrpRows(SSDataBlock** ppBlk, int32_t blkId, int32_t grpRows) {
|
|||
*ppBlk = createDummyBlock((blkId == LEFT_BLK_ID) ? LEFT_BLK_ID : RIGHT_BLK_ID);
|
||||
blockDataEnsureCapacity(*ppBlk, jtCtx.blkRows);
|
||||
taosArrayPush((blkId == LEFT_BLK_ID) ? jtCtx.leftBlkList : jtCtx.rightBlkList, ppBlk);
|
||||
if (jtCtx.grpJoin) {
|
||||
(*ppBlk)->info.id.groupId = jtCtx.inGrpId;
|
||||
}
|
||||
}
|
||||
|
||||
filterOut = (peerFilterNum > 0 && (jtCtx.subType != JOIN_STYPE_ASOF && jtCtx.subType != JOIN_STYPE_WIN)) ? true : false;
|
||||
|
@ -1350,6 +1363,9 @@ void makeAppendBlkData(SSDataBlock** ppLeft, SSDataBlock** ppRight, int32_t left
|
|||
*ppLeft = createDummyBlock(LEFT_BLK_ID);
|
||||
blockDataEnsureCapacity(*ppLeft, jtCtx.blkRows);
|
||||
taosArrayPush(jtCtx.leftBlkList, ppLeft);
|
||||
if (jtCtx.grpJoin) {
|
||||
(*ppLeft)->info.id.groupId = jtCtx.inGrpId;
|
||||
}
|
||||
}
|
||||
|
||||
createRowData(*ppLeft, 0, i, vRange);
|
||||
|
@ -1361,6 +1377,9 @@ void makeAppendBlkData(SSDataBlock** ppLeft, SSDataBlock** ppRight, int32_t left
|
|||
*ppRight = createDummyBlock(RIGHT_BLK_ID);
|
||||
blockDataEnsureCapacity(*ppRight, jtCtx.blkRows);
|
||||
taosArrayPush(jtCtx.rightBlkList, ppRight);
|
||||
if (jtCtx.grpJoin) {
|
||||
(*ppRight)->info.id.groupId = jtCtx.inGrpId;
|
||||
}
|
||||
}
|
||||
|
||||
createRowData(*ppRight, rightOffset, i, vRange);
|
||||
|
@ -2215,18 +2234,39 @@ void createTsEqGrpRows(SSDataBlock** ppLeft, SSDataBlock** ppRight, int32_t left
|
|||
taosArrayPush(jtCtx.leftBlkList, ppLeft);
|
||||
}
|
||||
|
||||
if (jtCtx.grpJoin) {
|
||||
(*ppLeft)->info.id.groupId = jtCtx.inGrpId;
|
||||
}
|
||||
|
||||
if (NULL == *ppRight && rightGrpRows > 0) {
|
||||
*ppRight = createDummyBlock(RIGHT_BLK_ID);
|
||||
blockDataEnsureCapacity(*ppRight, jtCtx.blkRows);
|
||||
taosArrayPush(jtCtx.rightBlkList, ppRight);
|
||||
}
|
||||
|
||||
if (jtCtx.grpJoin) {
|
||||
(*ppRight)->info.id.groupId = jtCtx.inGrpId;
|
||||
}
|
||||
|
||||
|
||||
makeAppendBlkData(ppLeft, ppRight, leftGrpRows, rightGrpRows);
|
||||
|
||||
appendEqGrpRes(leftGrpRows, rightGrpRows);
|
||||
}
|
||||
|
||||
void forceFlushResRows() {
|
||||
if (JOIN_STYPE_ASOF == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) {
|
||||
ASSERT(OP_TYPE_LOWER_EQUAL == jtCtx.asofOpType || OP_TYPE_LOWER_THAN == jtCtx.asofOpType);
|
||||
chkAppendAsofGreaterResRows(true);
|
||||
} else if (JOIN_STYPE_WIN == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) {
|
||||
chkAppendWinResRows(true);
|
||||
}
|
||||
|
||||
taosArrayClear(jtCtx.rightRowsList);
|
||||
taosArrayClear(jtCtx.rightFilterOut);
|
||||
taosArrayClear(jtCtx.leftRowsList);
|
||||
|
||||
}
|
||||
|
||||
void createBothBlkRowsData(void) {
|
||||
SSDataBlock* pLeft = NULL;
|
||||
|
@ -2269,6 +2309,13 @@ void createBothBlkRowsData(void) {
|
|||
}
|
||||
}
|
||||
|
||||
if (jtCtx.grpJoin && (0 == taosRand() % 3)) {
|
||||
forceFlushResRows();
|
||||
jtCtx.inGrpId++;
|
||||
pLeft = NULL;
|
||||
pRight = NULL;
|
||||
}
|
||||
|
||||
switch (grpType) {
|
||||
case 0:
|
||||
createGrpRows(&pLeft, LEFT_BLK_ID, leftGrpRows);
|
||||
|
@ -2286,12 +2333,7 @@ void createBothBlkRowsData(void) {
|
|||
}
|
||||
}
|
||||
|
||||
if (JOIN_STYPE_ASOF == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) {
|
||||
ASSERT(OP_TYPE_LOWER_EQUAL == jtCtx.asofOpType || OP_TYPE_LOWER_THAN == jtCtx.asofOpType);
|
||||
chkAppendAsofGreaterResRows(true);
|
||||
} else if (JOIN_STYPE_WIN == jtCtx.subType && taosArrayGetSize(jtCtx.leftRowsList) > 0) {
|
||||
chkAppendWinResRows(true);
|
||||
}
|
||||
forceFlushResRows();
|
||||
}
|
||||
|
||||
void createDummyBlkList(int32_t leftMaxRows, int32_t leftMaxGrpRows, int32_t rightMaxRows, int32_t rightMaxGrpRows, int32_t blkRows) {
|
||||
|
@ -2382,6 +2424,9 @@ void printColList(char* title, bool left, int32_t* colList, bool filter, char* o
|
|||
}
|
||||
|
||||
void printInputRowData(SSDataBlock* pBlk, int32_t* rowIdx) {
|
||||
if (jtCtx.grpJoin) {
|
||||
printf("%5" PRIu64, pBlk->info.id.groupId);
|
||||
}
|
||||
for (int32_t c = 0; c < MAX_SLOT_NUM; ++c) {
|
||||
SColumnInfoData* pCol = (SColumnInfoData*)taosArrayGet(pBlk->pDataBlock, c);
|
||||
ASSERT(pCol->info.type == jtInputColType[c]);
|
||||
|
@ -2418,13 +2463,13 @@ void printInputData() {
|
|||
break;
|
||||
}
|
||||
|
||||
printf("\t--------------------------blk end-------------------------------");
|
||||
printf("\t--------------------------blk end------------------------------- ");
|
||||
jtCtx.leftBlkReadIdx++;
|
||||
leftRowIdx = 0;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
printf("%72s", " ");
|
||||
printf("%*s", jtCtx.grpJoin ? 77 : 72, " ");
|
||||
}
|
||||
|
||||
if (jtCtx.rightBlkReadIdx < taosArrayGetSize(jtCtx.rightBlkList)) {
|
||||
|
@ -2435,7 +2480,7 @@ void printInputData() {
|
|||
break;
|
||||
}
|
||||
|
||||
printf("\t--------------------------blk end----------------------------\t");
|
||||
printf("\t%*s--------------------------blk end----------------------------\t", jtCtx.grpJoin ? 5 : 0, " ");
|
||||
jtCtx.rightBlkReadIdx++;
|
||||
rightRowIdx = 0;
|
||||
break;
|
||||
|
@ -2486,9 +2531,9 @@ void printBasicInfo(char* caseName) {
|
|||
char inputStat[4] = {0};
|
||||
printf("\n%dth TEST [%s] START\nBasic Info:\n\t asc:%d\n\t filter:%d\n\t maxRows:left-%d right-%d\n\t "
|
||||
"maxGrpRows:left-%d right-%d\n\t blkRows:%d\n\t colCond:%s\n\t joinType:%s\n\t "
|
||||
"subType:%s\n\t inputStat:%s\n", jtCtx.loopIdx, caseName, jtCtx.asc, jtCtx.filter, jtCtx.leftMaxRows, jtCtx.rightMaxRows,
|
||||
"subType:%s\n\t inputStat:%s\n\t groupJoin:%s\n", jtCtx.loopIdx, caseName, jtCtx.asc, jtCtx.filter, jtCtx.leftMaxRows, jtCtx.rightMaxRows,
|
||||
jtCtx.leftMaxGrpRows, jtCtx.rightMaxGrpRows, jtCtx.blkRows, jtColCondStr[jtCtx.colCond], jtJoinTypeStr[jtCtx.joinType],
|
||||
jtSubTypeStr[jtCtx.subType], getInputStatStr(inputStat));
|
||||
jtSubTypeStr[jtCtx.subType], getInputStatStr(inputStat), jtCtx.grpJoin ? "true" : "false");
|
||||
|
||||
if (JOIN_STYPE_ASOF == jtCtx.subType) {
|
||||
printf("\t asofOp:%s\n\t JLimit:%" PRId64 "\n", getAsofOpStr(), jtCtx.jLimit);
|
||||
|
@ -2755,7 +2800,7 @@ void runSingleTest(char* caseName, SJoinTestParam* param) {
|
|||
bool contLoop = true;
|
||||
|
||||
SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param);
|
||||
createDummyBlkList(200, 200, 200, 200, 20);
|
||||
createDummyBlkList(12, 3, 12, 3, 3);
|
||||
|
||||
while (contLoop) {
|
||||
rerunBlockedHere();
|
||||
|
@ -2799,6 +2844,7 @@ TEST(innerJoin, noCondTest) {
|
|||
param.subType = JOIN_STYPE_NONE;
|
||||
param.cond = TEST_NO_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -2824,6 +2870,7 @@ TEST(innerJoin, eqCondTest) {
|
|||
param.subType = JOIN_STYPE_NONE;
|
||||
param.cond = TEST_EQ_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -2849,6 +2896,7 @@ TEST(innerJoin, onCondTest) {
|
|||
param.subType = JOIN_STYPE_NONE;
|
||||
param.cond = TEST_ON_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -2874,6 +2922,7 @@ TEST(innerJoin, fullCondTest) {
|
|||
param.subType = JOIN_STYPE_NONE;
|
||||
param.cond = TEST_FULL_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -2904,9 +2953,11 @@ TEST(leftOuterJoin, noCondTest) {
|
|||
param.asc = true;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
@ -2927,6 +2978,7 @@ TEST(leftOuterJoin, eqCondTest) {
|
|||
param.subType = JOIN_STYPE_OUTER;
|
||||
param.cond = TEST_EQ_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -2952,6 +3004,7 @@ TEST(leftOuterJoin, onCondTest) {
|
|||
param.subType = JOIN_STYPE_OUTER;
|
||||
param.cond = TEST_ON_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -2977,6 +3030,7 @@ TEST(leftOuterJoin, fullCondTest) {
|
|||
param.subType = JOIN_STYPE_OUTER;
|
||||
param.cond = TEST_FULL_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -3004,6 +3058,7 @@ TEST(fullOuterJoin, noCondTest) {
|
|||
param.subType = JOIN_STYPE_OUTER;
|
||||
param.cond = TEST_NO_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -3029,6 +3084,7 @@ TEST(fullOuterJoin, eqCondTest) {
|
|||
param.subType = JOIN_STYPE_OUTER;
|
||||
param.cond = TEST_EQ_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -3055,6 +3111,7 @@ TEST(fullOuterJoin, onCondTest) {
|
|||
param.subType = JOIN_STYPE_OUTER;
|
||||
param.cond = TEST_ON_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -3080,6 +3137,7 @@ TEST(fullOuterJoin, fullCondTest) {
|
|||
param.subType = JOIN_STYPE_OUTER;
|
||||
param.cond = TEST_FULL_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -3107,6 +3165,7 @@ TEST(leftSemiJoin, noCondTest) {
|
|||
param.subType = JOIN_STYPE_SEMI;
|
||||
param.cond = TEST_NO_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -3132,6 +3191,7 @@ TEST(leftSemiJoin, eqCondTest) {
|
|||
param.subType = JOIN_STYPE_SEMI;
|
||||
param.cond = TEST_EQ_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -3158,6 +3218,7 @@ TEST(leftSemiJoin, onCondTest) {
|
|||
param.subType = JOIN_STYPE_SEMI;
|
||||
param.cond = TEST_ON_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -3183,6 +3244,7 @@ TEST(leftSemiJoin, fullCondTest) {
|
|||
param.subType = JOIN_STYPE_SEMI;
|
||||
param.cond = TEST_FULL_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -3210,6 +3272,7 @@ TEST(leftAntiJoin, noCondTest) {
|
|||
param.subType = JOIN_STYPE_ANTI;
|
||||
param.cond = TEST_NO_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -3235,6 +3298,7 @@ TEST(leftAntiJoin, eqCondTest) {
|
|||
param.subType = JOIN_STYPE_ANTI;
|
||||
param.cond = TEST_EQ_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -3261,6 +3325,7 @@ TEST(leftAntiJoin, onCondTest) {
|
|||
param.subType = JOIN_STYPE_ANTI;
|
||||
param.cond = TEST_ON_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -3286,6 +3351,7 @@ TEST(leftAntiJoin, fullCondTest) {
|
|||
param.subType = JOIN_STYPE_ANTI;
|
||||
param.cond = TEST_FULL_COND;
|
||||
param.asc = true;
|
||||
param.grpJoin = false;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
|
@ -3317,10 +3383,12 @@ TEST(leftAsofJoin, noCondGreaterThanTest) {
|
|||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1;
|
||||
|
||||
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
@ -3345,10 +3413,12 @@ TEST(leftAsofJoin, noCondGreaterEqTest) {
|
|||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1;
|
||||
|
||||
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
@ -3374,9 +3444,11 @@ TEST(leftAsofJoin, noCondEqTest) {
|
|||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1;
|
||||
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
@ -3402,9 +3474,11 @@ TEST(leftAsofJoin, noCondLowerThanTest) {
|
|||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1;
|
||||
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
@ -3431,9 +3505,11 @@ TEST(leftAsofJoin, noCondLowerEqTest) {
|
|||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1;
|
||||
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
@ -3462,9 +3538,11 @@ TEST(leftWinJoin, noCondProjectionTest) {
|
|||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1;
|
||||
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.grpJoin = taosRand() % 2 ? true : false;
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
|
|
@ -476,7 +476,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
|
|||
COPY_SCALAR_FIELD(joinAlgo);
|
||||
CLONE_NODE_FIELD(pWindowOffset);
|
||||
CLONE_NODE_FIELD(pJLimit);
|
||||
CLONE_NODE_FIELD(winPrimEqCond);
|
||||
CLONE_NODE_FIELD(addPrimEqCond);
|
||||
CLONE_NODE_FIELD(pPrimKeyEqCond);
|
||||
CLONE_NODE_FIELD(pColEqCond);
|
||||
CLONE_NODE_FIELD(pColOnCond);
|
||||
|
|
|
@ -56,7 +56,6 @@ char* getJoinSTypeString(EJoinSubType type) {
|
|||
|
||||
char* getFullJoinTypeString(EJoinType type, EJoinSubType stype) {
|
||||
static char* joinFullType[][8] = {
|
||||
{},
|
||||
{"INNER", "INNER", "INNER", "INNER", "INNER", "INNER ANY", "INNER", "INNER"},
|
||||
{"LEFT", "LEFT", "LEFT OUTER", "LEFT SEMI", "LEFT ANTI", "LEFT ANY", "LEFT ASOF", "LEFT WINDOW"},
|
||||
{"RIGHT", "RIGHT", "RIGHT OUTER", "RIGHT SEMI", "RIGHT ANTI", "RIGHT ANY", "RIGHT ASOF", "RIGHT WINDOW"},
|
||||
|
@ -816,7 +815,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
SJoinTableNode* pJoin = (SJoinTableNode*)pNode;
|
||||
nodesDestroyNode(pJoin->pWindowOffset);
|
||||
nodesDestroyNode(pJoin->pJLimit);
|
||||
nodesDestroyNode(pJoin->winPrimCond);
|
||||
nodesDestroyNode(pJoin->addPrimCond);
|
||||
nodesDestroyNode(pJoin->pLeft);
|
||||
nodesDestroyNode(pJoin->pRight);
|
||||
nodesDestroyNode(pJoin->pOnCond);
|
||||
|
@ -1286,7 +1285,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
destroyLogicNode((SLogicNode*)pLogicNode);
|
||||
nodesDestroyNode(pLogicNode->pWindowOffset);
|
||||
nodesDestroyNode(pLogicNode->pJLimit);
|
||||
nodesDestroyNode(pLogicNode->winPrimEqCond);
|
||||
nodesDestroyNode(pLogicNode->addPrimEqCond);
|
||||
nodesDestroyNode(pLogicNode->pPrimKeyEqCond);
|
||||
nodesDestroyNode(pLogicNode->pColEqCond);
|
||||
nodesDestroyNode(pLogicNode->pColOnCond);
|
||||
|
|
|
@ -3244,7 +3244,7 @@ static int32_t replaceTbName(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
|||
return pRewriteCxt.errCode;
|
||||
}
|
||||
|
||||
static int32_t addPrimEqCond(SNode** pCond, SRealTableNode* leftTable, SRealTableNode* rightTable) {
|
||||
static int32_t addPrimJoinEqCond(SNode** pCond, SRealTableNode* leftTable, SRealTableNode* rightTable, EJoinType joinType, EJoinSubType subType) {
|
||||
struct STableMeta* pLMeta = leftTable->pMeta;
|
||||
struct STableMeta* pRMeta = rightTable->pMeta;
|
||||
|
||||
|
@ -3256,7 +3256,13 @@ static int32_t addPrimEqCond(SNode** pCond, SRealTableNode* leftTable, SRealTabl
|
|||
SOperatorNode* pOp = (SOperatorNode*)*pCond;
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
||||
pOp->opType = OP_TYPE_EQUAL;
|
||||
if (IS_WINDOW_JOIN(subType)) {
|
||||
pOp->opType = OP_TYPE_EQUAL;
|
||||
} else if (JOIN_TYPE_LEFT == joinType) {
|
||||
pOp->opType = OP_TYPE_GREATER_EQUAL;
|
||||
} else {
|
||||
pOp->opType = OP_TYPE_LOWER_EQUAL;
|
||||
}
|
||||
|
||||
SColumnNode* pLeft = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pLeft) {
|
||||
|
@ -3302,25 +3308,25 @@ static int32_t checkJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoinTabl
|
|||
"Join requires valid time series input");
|
||||
}
|
||||
|
||||
if (JOIN_STYPE_WIN == pJoinTable->subType) {
|
||||
if (IS_ASOF_JOIN(pJoinTable->subType) || IS_WINDOW_JOIN(pJoinTable->subType)) {
|
||||
if (QUERY_NODE_REAL_TABLE != nodeType(pJoinTable->pLeft) || QUERY_NODE_REAL_TABLE != nodeType(pJoinTable->pRight)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN,
|
||||
"Only support WINDOW join between tables");
|
||||
"Only support ASOF/WINDOW join between tables");
|
||||
}
|
||||
|
||||
SRealTableNode* pLeft = (SRealTableNode*)pJoinTable->pLeft;
|
||||
if (TSDB_SUPER_TABLE != pLeft->pMeta->tableType && TSDB_CHILD_TABLE != pLeft->pMeta->tableType && TSDB_NORMAL_TABLE != pLeft->pMeta->tableType) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN,
|
||||
"Unsupported WINDOW join table type");
|
||||
"Unsupported ASOF/WINDOW join table type");
|
||||
}
|
||||
|
||||
SRealTableNode* pRight = (SRealTableNode*)pJoinTable->pRight;
|
||||
if (TSDB_SUPER_TABLE != pRight->pMeta->tableType && TSDB_CHILD_TABLE != pRight->pMeta->tableType && TSDB_NORMAL_TABLE != pRight->pMeta->tableType) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN,
|
||||
"Unsupported WINDOW join table type");
|
||||
"Unsupported ASOF/WINDOW join table type");
|
||||
}
|
||||
|
||||
return addPrimEqCond(&pJoinTable->winPrimCond, pLeft, pRight);
|
||||
return addPrimJoinEqCond(&pJoinTable->addPrimCond, pLeft, pRight, pJoinTable->joinType, pJoinTable->subType);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -549,7 +549,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
pJoin->isLowLevelJoin = pJoinTable->isLowLevelJoin;
|
||||
pJoin->pWindowOffset = nodesCloneNode(pJoinTable->pWindowOffset);
|
||||
pJoin->pJLimit = nodesCloneNode(pJoinTable->pJLimit);
|
||||
pJoin->winPrimEqCond = nodesCloneNode(pJoinTable->winPrimCond);
|
||||
pJoin->addPrimEqCond = nodesCloneNode(pJoinTable->addPrimCond);
|
||||
pJoin->node.pChildren = nodesMakeList();
|
||||
pJoin->seqWinGroup = (JOIN_STYPE_WIN == pJoinTable->subType) && pSelect->hasAggFuncs;
|
||||
if (NULL == pJoin->node.pChildren) {
|
||||
|
|
|
@ -98,8 +98,8 @@ static SJoinOptimizeOpt gJoinOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = {
|
|||
static SJoinOptimizeOpt gJoinWhereOpt[JOIN_TYPE_MAX_VALUE][JOIN_STYPE_MAX_VALUE] = {
|
||||
/* NONE OUTER SEMI ANTI ASOF WINDOW */
|
||||
/*INNER*/ {{PUSH_DOWN_ALL_COND}, {0}, {0}, {0}, {0}, {0}},
|
||||
/*LEFT*/ {{0}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_ALL_COND}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}},
|
||||
/*RIGHT*/ {{0}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_ALL_COND}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}},
|
||||
/*LEFT*/ {{0}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}, {PUSH_DOWN_LEFT_FLT}},
|
||||
/*RIGHT*/ {{0}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}, {PUSH_DOWN_RIGHT_FLT}},
|
||||
/*FULL*/ {{0}, {0}, {0}, {0}, {0}, {0}},
|
||||
};
|
||||
|
||||
|
@ -757,8 +757,14 @@ static bool pdcJoinIsPrimEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
|
|||
}
|
||||
|
||||
SOperatorNode* pOper = (SOperatorNode*)pCond;
|
||||
if (OP_TYPE_EQUAL != pOper->opType && JOIN_STYPE_ASOF != pJoin->subType) {
|
||||
return false;
|
||||
if (OP_TYPE_EQUAL != pOper->opType) {
|
||||
if (JOIN_STYPE_ASOF != pJoin->subType) {
|
||||
return false;
|
||||
}
|
||||
if (OP_TYPE_GREATER_THAN != pOper->opType && OP_TYPE_GREATER_EQUAL != pOper->opType &&
|
||||
OP_TYPE_LOWER_THAN != pOper->opType && OP_TYPE_LOWER_EQUAL != pOper->opType) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
SSHashObj* pLeftTables = NULL;
|
||||
|
@ -809,8 +815,7 @@ static int32_t pdcJoinSplitPrimInLogicCond(SJoinLogicNode* pJoin, SNode** ppPrim
|
|||
SNodeList* pOnConds = NULL;
|
||||
SNode* pCond = NULL;
|
||||
WHERE_EACH(pCond, pLogicCond->pParameterList) {
|
||||
if (pdcJoinIsPrimEqualCond(pJoin, pCond)) {
|
||||
nodesDestroyNode(*ppPrimEqCond);
|
||||
if (pdcJoinIsPrimEqualCond(pJoin, pCond) && (NULL == *ppPrimEqCond)) {
|
||||
*ppPrimEqCond = nodesCloneNode(pCond);
|
||||
ERASE_NODE(pLogicCond->pParameterList);
|
||||
} else {
|
||||
|
@ -1195,11 +1200,15 @@ static int32_t pdcJoinAddWhereFilterColsToTarget(SOptimizeContext* pCxt, SJoinLo
|
|||
|
||||
static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||
if (NULL == pJoin->pFullOnCond) {
|
||||
if (IS_WINDOW_JOIN(pJoin->subType)) {
|
||||
if (IS_WINDOW_JOIN(pJoin->subType) || IS_ASOF_JOIN(pJoin->subType)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (NULL == pJoin->node.pConditions) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN);
|
||||
}
|
||||
|
||||
if ((!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType)) || NULL == pJoin->node.pConditions) {
|
||||
if (!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType)) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN);
|
||||
}
|
||||
}
|
||||
|
@ -1210,15 +1219,22 @@ static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin
|
|||
if (errCond) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND);
|
||||
}
|
||||
if (IS_WINDOW_JOIN(pJoin->subType)) {
|
||||
if (IS_WINDOW_JOIN(pJoin->subType) || IS_ASOF_JOIN(pJoin->subType)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
|
||||
} else if (IS_WINDOW_JOIN(pJoin->subType)) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND);
|
||||
}
|
||||
|
||||
if (IS_ASOF_JOIN(pJoin->subType)) {
|
||||
nodesDestroyNode(pJoin->addPrimEqCond);
|
||||
pJoin->addPrimEqCond = NULL;
|
||||
}
|
||||
|
||||
if (IS_WINDOW_JOIN(pJoin->subType)) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1235,6 +1251,21 @@ static int32_t pdcJoinHandleGrpJoinCond(SOptimizeContext* pCxt, SJoinLogicNode*
|
|||
pJoin->pTagEqCond = NULL;
|
||||
nodesDestroyNode(pJoin->pFullOnCond);
|
||||
pJoin->pFullOnCond = NULL;
|
||||
if (!pJoin->allEqTags) {
|
||||
SNode* pNode = NULL;
|
||||
FOREACH(pNode, pJoin->pLeftEqNodes) {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
if (COLUMN_TYPE_TAG != pCol->colType && PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND);
|
||||
}
|
||||
}
|
||||
FOREACH(pNode, pJoin->pRightEqNodes) {
|
||||
SColumnNode* pCol = (SColumnNode*)pNode;
|
||||
if (COLUMN_TYPE_TAG != pCol->colType && PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) {
|
||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
|
|
@ -812,7 +812,7 @@ static int32_t setColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId,
|
|||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN == nodeType(pParam)) {
|
||||
if (QUERY_NODE_COLUMN != nodeType(pParam)) {
|
||||
planError("invalid primary cond left timetruncate param type, leftParamType:%d", nodeType(pParam));
|
||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
|
@ -856,7 +856,7 @@ static int32_t setColEqCond(SNode* pEqCond, int32_t subType, int16_t leftBlkId,
|
|||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
SNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (QUERY_NODE_COLUMN == nodeType(pParam)) {
|
||||
if (QUERY_NODE_COLUMN != nodeType(pParam)) {
|
||||
planError("invalid primary cond right timetruncate param type, rightParamType:%d", nodeType(pParam));
|
||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
|
@ -915,11 +915,17 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setColEqCond(pJoin->pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->leftPrimExpr) {
|
||||
code = addDataBlockSlot(pCxt, &pJoin->leftPrimExpr, pLeftDesc);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->rightPrimExpr) {
|
||||
code = addDataBlockSlot(pCxt, &pJoin->rightPrimExpr, pRightDesc);
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->winPrimEqCond) {
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->addPrimEqCond) {
|
||||
SNode* pPrimKeyCond = NULL;
|
||||
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->winPrimEqCond,
|
||||
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->addPrimEqCond,
|
||||
&pPrimKeyCond);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = setColEqCond(pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
static char* getUsageErrFormat(int32_t errCode) {
|
||||
switch (errCode) {
|
||||
case TSDB_CODE_PLAN_EXPECTED_TS_EQUAL:
|
||||
return "left.ts = right.ts is expected in join expression";
|
||||
return "primary timestamp equal condition is expected in join conditions";
|
||||
case TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN:
|
||||
return "not support cross join";
|
||||
case TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND:
|
||||
|
|
|
@ -1285,7 +1285,7 @@ int32_t toCharFunction(SScalarParam* pInput, int32_t inputNum, SScalarParam* pOu
|
|||
}
|
||||
|
||||
/** Time functions **/
|
||||
static int64_t offsetFromTz(char *timezone, int64_t factor) {
|
||||
int64_t offsetFromTz(char *timezone, int64_t factor) {
|
||||
char *minStr = &timezone[3];
|
||||
int64_t minutes = taosStr2Int64(minStr, NULL, 10);
|
||||
memset(minStr, 0, strlen(minStr));
|
||||
|
|
|
@ -69,6 +69,7 @@ run tsim/join/left_asof_join.sim
|
|||
run tsim/join/right_asof_join.sim
|
||||
run tsim/join/left_win_join.sim
|
||||
run tsim/join/right_win_join.sim
|
||||
run tsim/join/join_scalar.sim
|
||||
|
||||
print ================== restart server to commit data into disk
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
@ -87,5 +88,6 @@ run tsim/join/left_asof_join.sim
|
|||
run tsim/join/right_asof_join.sim
|
||||
run tsim/join/left_win_join.sim
|
||||
run tsim/join/right_win_join.sim
|
||||
run tsim/join/join_scalar.sim
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
sql connect
|
||||
sql use test0;
|
||||
|
||||
sql select a.ts, a.col1, b.ts, b.col1 from sta a left join sta b on timetruncate(a.ts, 1h) = timetruncate(b.ts, 1h);
|
||||
if $rows != 64 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, a.col1, b.ts, b.col1 from sta a join sta b on timetruncate(a.ts, 1h) = timetruncate(b.ts, 1h);
|
||||
if $rows != 64 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, a.col1, b.ts, b.col1 from sta a join sta b on timetruncate(a.ts, 1h) = timetruncate(b.ts, 1h) and a.col1=b.col1;
|
||||
if $rows != 12 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, a.col1, b.ts, b.col1 from sta a join sta b on timetruncate(a.ts, 1m) = b.ts;
|
||||
if $rows != 16 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.col1 from sta a left join sta b on timetruncate(b.ts, 1h) = a.ts;
|
||||
if $rows != 8 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.col1,timetruncate(a.col1, 1h) from sta a left join sta b on a.ts = b.ts and timetruncate(a.col1, 1h) = timetruncate(a.col1, 1h);
|
||||
if $rows != 12 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.col1 from sta a left semi join sta b on timetruncate(a.ts, 1h) = timetruncate(b.ts, 1h);
|
||||
if $rows != 8 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.col1 from sta a left anti join sta b on timetruncate(a.ts, 1h) = timetruncate(b.ts, 1h);
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.col1 from sta a left asof join sta b on timetruncate(a.ts, 1h) > timetruncate(b.ts, 1h);
|
||||
if $rows != 8 then
|
||||
return -1
|
||||
endi
|
||||
sql select a.ts, b.col1 from sta a left asof join sta b on timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) jlimit 2;
|
||||
if $rows != 16 then
|
||||
return -1
|
||||
endi
|
||||
sql select a.ts, b.col1 from sta a left asof join sta b on timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) jlimit 8;
|
||||
if $rows != 64 then
|
||||
return -1
|
||||
endi
|
||||
sql select a.ts, b.col1 from sta a left asof join sta b on timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) jlimit 9;
|
||||
if $rows != 64 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from sta a left asof join sta b on a.col1 =b.col1 and timetruncate(a.ts, 1h) > timetruncate(b.ts, 1h) jlimit 2;
|
||||
if $rows != 8 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from sta a left asof join sta b on timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) jlimit 2 where timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h);
|
||||
if $rows != 16 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts=b.ts and a.ts=b.ts;
|
||||
sql_error select a.ts, b.ts from sta a left asof join sta b on timetruncate(a.ts, 1h) > timetruncate(b.ts, 1h) and timetruncate(a.ts, 1h) > timetruncate(b.ts, 1h);
|
||||
sql_error select a.ts, b.ts from sta a left asof join sta b on timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) and a.ts > timetruncate(b.ts, 1s) jlimit 2;
|
||||
sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts > timetruncate(b.ts, 1s) and timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) jlimit 2;
|
||||
sql_error select a.ts, b.ts from sta a left asof join sta b on timetruncate(a.ts, 1h) >= timetruncate(b.ts, 1h) and a.ts =b.col1 jlimit 2;
|
||||
sql_error select a.ts, b.col1 from sta a left join sta b on timetruncate(b.ts, 1h) + 1 = a.ts;
|
||||
sql_error select a.ts, b.col1 from sta a left join sta b on timetruncate(b.ts, 1h) = a.ts + 1;
|
||||
sql_error select a.ts, b.col1 from sta a left join sta b on b.ts + 1 = a.ts + 1;
|
||||
|
||||
|
||||
|
|
@ -960,10 +960,31 @@ if $data11 != 2 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from sta a left asof join sta b;
|
||||
if $rows != 8 then
|
||||
return -1
|
||||
endi
|
||||
sql select a.ts, b.ts from sta a left asof join sta b jlimit 3;
|
||||
if $rows != 22 then
|
||||
return -1
|
||||
endi
|
||||
sql select a.ts, b.ts from sta a left asof join sta b where a.ts > b.ts;
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql select a.ts, b.ts from sta a left asof join sta b jlimit 3 where a.ts > b.ts;
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql_error select a.ts, b.ts from sta a asof join sta b on a.ts = b.ts;
|
||||
sql_error select a.ts, b.ts from sta a full asof join sta b on a.ts = b.ts;
|
||||
sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts != b.ts;
|
||||
sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts >=b.ts and a.col1=a.ts;
|
||||
sql_error select a.ts, b.ts from sta a left asof join sta b on a.ts >=b.ts and a.col1 > 1;
|
||||
sql_error select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts and a.col1 > b.col1 jlimit 2 order by a.ts;
|
||||
sql_error select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts and a.col1 = 1 jlimit 2 order by a.ts;
|
||||
sql_error select a.t1, a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 having(a.ts>0) order by a.t1, a.ts, b.ts;
|
||||
sql_error select count(*) from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 where a.ts > '2023-11-17 16:29:00.000' slimit 1;
|
||||
sql_error select a.ts, b.ts from (select * from sta) a left asof join sta b where a.ts = b.ts;
|
||||
|
||||
|
|
|
@ -74,3 +74,6 @@ if $data11 != 4 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql_error select a.ts, b.ts from sta a left semi join sta b jlimit 3 where a.ts > b.ts;
|
||||
sql_error select a.ts, b.ts from sta a left semi join sta b where a.ts > b.ts;
|
||||
sql_error select a.ts, b.ts from sta a left semi join sta b on a.ts > 1 where a.ts = b.ts;
|
||||
|
|
|
@ -581,3 +581,7 @@ sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s,
|
|||
sql_error select a.ts from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 having(a.ts > 0) order by count(*);
|
||||
sql_error select a.ts, count(*),last(b.ts) from sta a left window join sta b window_offset(-1s, 1s) slimit 1;
|
||||
sql_error select a.ts, b.ts from sta a left window join sta b on a.t1=1 window_offset(-1s, 1s) order by a.t1, a.ts;
|
||||
sql_error select a.ts, b.ts from sta a left window join sta b on a.ts > 1 window_offset(-1s, 1s) where a.ts = b.ts;
|
||||
sql_error select a.ts, b.ts from sta a left window join sta b on a.ts =b.ts window_offset(-1s, 1s) where a.ts = b.ts;
|
||||
sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1, 1) where a.ts = b.ts;
|
||||
sql_error select a.ts, b.ts from (select * from sta) a left window join sta b window_offset(-1s, 1s) where a.ts = b.ts;
|
||||
|
|
Loading…
Reference in New Issue