fix: inner join issue

This commit is contained in:
dapan1121 2024-03-06 11:18:57 +08:00
parent f13c9d2ca4
commit 7045126aa6
13 changed files with 199 additions and 46 deletions

View File

@ -207,6 +207,8 @@ typedef struct SViewNode {
int8_t cacheLastMode;
} SViewNode;
// Largest value accepted for a join's JLIMIT; the translator rejects anything greater.
#define JOIN_JLIMIT_MAX_VALUE 1024
// True when the join type is JOIN_TYPE_INNER and the join subtype is JOIN_STYPE_NONE.
#define IS_INNER_NONE_JOIN(_type, _stype) ((_type) == JOIN_TYPE_INNER && (_stype) == JOIN_STYPE_NONE)
// True when the join subtype is JOIN_STYPE_WIN.
#define IS_WINDOW_JOIN(_stype) ((_stype) == JOIN_STYPE_WIN)
// True when the join subtype is JOIN_STYPE_ASOF.
#define IS_ASOF_JOIN(_stype) ((_stype) == JOIN_STYPE_ASOF)

View File

@ -761,6 +761,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT TAOS_DEF_ERROR_CODE(0, 0x2670)
#define TSDB_CODE_PAR_GRP_WINDOW_NOT_ALLOWED TAOS_DEF_ERROR_CODE(0, 0x2671)
#define TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR TAOS_DEF_ERROR_CODE(0, 0x2672)
#define TSDB_CODE_PAR_INVALID_WIN_OFFSET_UNIT TAOS_DEF_ERROR_CODE(0, 0x2673)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner

View File

@ -270,6 +270,7 @@ typedef struct SMJoinFlowFlags {
typedef struct SMJoinCtx {
SMJoinFlowFlags* pFlags;
bool mergeCtxInUse;
union {
SMJoinMergeCtx mergeCtx;
SMJoinWindowCtx windowCtx;
@ -424,6 +425,7 @@ void mJoinDestroyMergeCtx(SMJoinOperatorInfo* pJoin);
void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin);
int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode);
int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode);
void mWinJoinResetWindowCache(SMJoinWinCache* pCache);
SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator);
SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator);
SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator);

View File

@ -457,6 +457,7 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS
}
}
bool initParam = pSrcParam0 ? true : false;
if (TSDB_CODE_SUCCESS == code) {
code = buildGroupCacheOperatorParam(&pGcParam0, 0, *leftVg, *leftUid, pPost->leftNeedCache, pSrcParam0);
pSrcParam0 = NULL;
@ -466,7 +467,7 @@ static int32_t buildSeqStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SS
pSrcParam1 = NULL;
}
if (TSDB_CODE_SUCCESS == code) {
code = buildMergeJoinOperatorParam(ppParam, pSrcParam0 ? true : false, pGcParam0, pGcParam1);
code = buildMergeJoinOperatorParam(ppParam, initParam, pGcParam0, pGcParam1);
}
if (TSDB_CODE_SUCCESS != code) {
if (pSrcParam0) {

View File

@ -2598,19 +2598,7 @@ void mAsofJoinGroupReset(SMJoinOperatorInfo* pJoin) {
pWin->eqPostDone = false;
pWin->lastTs = INT64_MIN;
pCache->outRowIdx = 0;
pCache->rowNum = 0;
pCache->grpIdx = 0;
if (pCache->grpsQueue) {
TSWAP(pCache->grps, pCache->grpsQueue);
}
taosArrayClear(pCache->grps);
if (pCache->outBlk) {
blockDataCleanup(pCache->outBlk);
}
mWinJoinResetWindowCache(pCache);
mJoinResetGroupTableCtx(pJoin->probe);
mJoinResetGroupTableCtx(pJoin->build);
@ -3004,19 +2992,7 @@ void mWinJoinGroupReset(SMJoinOperatorInfo* pJoin) {
pWin->eqPostDone = false;
pWin->lastTs = INT64_MIN;
pCache->outRowIdx = 0;
pCache->rowNum = 0;
pCache->grpIdx = 0;
if (pCache->grpsQueue) {
TSWAP(pCache->grps, pCache->grpsQueue);
}
taosArrayClear(pCache->grps);
if (pCache->outBlk) {
blockDataCleanup(pCache->outBlk);
}
mWinJoinResetWindowCache(pCache);
mJoinResetGroupTableCtx(pJoin->probe);
mJoinResetGroupTableCtx(pJoin->build);

View File

@ -674,11 +674,11 @@ SOperatorInfo** mJoinBuildDownstreams(SMJoinOperatorInfo* pInfo, SOperatorInfo**
return p;
}
int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t *numOfDownstream, bool *newDownstreams) {
int32_t mJoinInitDownstreamInfo(SMJoinOperatorInfo* pInfo, SOperatorInfo*** pDownstream, int32_t *numOfDownstream, bool *newDownstreams) {
if (1 == *numOfDownstream) {
*newDownstreams = true;
pDownstream = mJoinBuildDownstreams(pInfo, pDownstream);
if (NULL == pDownstream) {
*pDownstream = mJoinBuildDownstreams(pInfo, *pDownstream);
if (NULL == *pDownstream) {
return TSDB_CODE_OUT_OF_MEMORY;
}
*numOfDownstream = 2;
@ -807,10 +807,10 @@ static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoi
return TSDB_CODE_SUCCESS;
}
static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat) {
static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode, SOperatorInfo** pDownstream, int32_t idx, SQueryStat* pStat, bool sameDs) {
SMJoinTableCtx* pTable = &pJoin->tbs[idx];
pTable->downStream = pDownstream[idx];
pTable->blkId = pDownstream[idx]->resultDataBlockId;
pTable->blkId = getOperatorResultBlockId(pDownstream[idx], sameDs ? idx : 0);
MJ_ERR_RET(mJoinInitPrimKeyInfo(pTable, (0 == idx) ? pJoinNode->leftPrimSlotId : pJoinNode->rightPrimSlotId));
MJ_ERR_RET(mJoinInitKeyColsInfo(pTable, (0 == idx) ? pJoinNode->pEqLeft : pJoinNode->pEqRight, JOIN_TYPE_FULL == pJoin->joinType));
@ -1011,9 +1011,11 @@ static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode*
if ((JOIN_STYPE_ASOF == pJoin->subType && (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType)))
|| (JOIN_STYPE_WIN == pJoin->subType)) {
pJoin->ctx.mergeCtxInUse = false;
return mJoinInitWindowCtx(pJoin, pJoinNode);
}
pJoin->ctx.mergeCtxInUse = true;
return mJoinInitMergeCtx(pJoin, pJoinNode);
}
@ -1423,8 +1425,38 @@ void mJoinResetMergeCtx(SMJoinMergeCtx* pCtx) {
pCtx->hashJoin = false;
}
/*
 * Put a window-join cache back into its empty state.
 *
 * The row/group counters are zeroed, the group array is emptied (after
 * swapping it with grpsQueue when a queue array exists) and the output
 * block, if one has been created, is cleaned up.
 */
void mWinJoinResetWindowCache(SMJoinWinCache* pCache) {
  /* Rewind the counters; these three stores are independent of each other. */
  pCache->grpIdx = 0;
  pCache->rowNum = 0;
  pCache->outRowIdx = 0;

  /* When a queue array exists, exchange it with grps before clearing grps. */
  if (NULL != pCache->grpsQueue) {
    TSWAP(pCache->grps, pCache->grpsQueue);
  }
  taosArrayClear(pCache->grps);

  /* The output block is optional; only clean it up when present. */
  if (NULL != pCache->outBlk) {
    blockDataCleanup(pCache->outBlk);
  }
}
/*
 * Reset a window-join context: clear its boolean state flags, set lastTs
 * back to INT64_MIN and reset the embedded cache.
 */
void mJoinResetWindowCtx(SMJoinWindowCtx* pCtx) {
  /* Boolean state flags all return to false. */
  pCtx->eqPostDone = false;
  pCtx->lastProbeGrp = false;
  pCtx->lastEqGrp = false;
  pCtx->grpRemains = false;

  pCtx->lastTs = INT64_MIN;

  mWinJoinResetWindowCache(&pCtx->cache);
}
/*
 * Reset the join context that is currently active for this operator.
 *
 * SMJoinCtx declares mergeCtx and windowCtx inside a union, so only one of
 * them is valid at a time; mergeCtxInUse records which one was initialised.
 * Exactly one reset routine is therefore called: resetting the merge context
 * unconditionally would write merge-context fields over a live window context.
 */
void mJoinResetCtx(SMJoinOperatorInfo* pJoin) {
  if (pJoin->ctx.mergeCtxInUse) {
    mJoinResetMergeCtx(&pJoin->ctx.mergeCtx);
  } else {
    mJoinResetWindowCtx(&pJoin->ctx.windowCtx);
  }
}
void mJoinResetOperator(struct SOperatorInfo* pOperator) {
@ -1612,7 +1644,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
}
pInfo->pOperator = pOperator;
MJ_ERR_JRET(mJoinInitDownstreamInfo(pInfo, pDownstream, &numOfDownstream, &newDownstreams));
MJ_ERR_JRET(mJoinInitDownstreamInfo(pInfo, &pDownstream, &numOfDownstream, &newDownstreams));
setOperatorInfo(pOperator, "MergeJoinOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
@ -1620,8 +1652,8 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
MJ_ERR_JRET(mJoinHandleConds(pInfo, pJoinNode));
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0]);
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1]);
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 0, &pJoinNode->inputStat[0], newDownstreams);
mJoinInitTableInfo(pInfo, pJoinNode, pDownstream, 1, &pJoinNode->inputStat[1], newDownstreams);
MJ_ERR_JRET(mJoinInitCtx(pInfo, pJoinNode));
MJ_ERR_JRET(mJoinSetImplFp(pInfo));

View File

@ -2742,6 +2742,38 @@ static bool isWindowJoinProbeTablePrimCol(SSelectStmt* pSelect, SNode* p
return false;
}
// Walker context for checkColContains: carries the node being searched for and the search result.
typedef struct SCheckColContaisCtx {
// Node to look for; visited column nodes are compared against it with nodesEqualNode.
SNode* pTarget;
// Set to true by checkColContains once an equal column node has been visited.
bool contains;
} SCheckColContaisCtx;
/*
 * Expression-walker callback that searches for a specific column node.
 *
 * pContext must point to an SCheckColContaisCtx. When the visited node is a
 * column node equal to the context's pTarget, the context is flagged and the
 * walk is ended; every other node lets the walk continue.
 */
static EDealRes checkColContains(SNode* pNode, void* pContext) {
  SCheckColContaisCtx* pSearch = (SCheckColContaisCtx*)pContext;

  /* Short-circuit keeps the equality check limited to column nodes. */
  bool matched = (QUERY_NODE_COLUMN == nodeType(pNode)) && nodesEqualNode(pSearch->pTarget, pNode);
  if (!matched) {
    return DEAL_RES_CONTINUE;
  }

  pSearch->contains = true;
  return DEAL_RES_END;
}
/*
 * Report whether pNode is a column node that also occurs in the ON condition
 * of the statement's join table.
 *
 * pSelect->pFromTable is cast to SJoinTableNode without a type check here, so
 * the caller is expected to pass a statement whose FROM clause is a join.
 * Returns false for any node that is not a column.
 */
static bool isWindowJoinGroupCol(SSelectStmt* pSelect, SNode* pNode) {
  if (QUERY_NODE_COLUMN == nodeType(pNode)) {
    SJoinTableNode*     pJoin = (SJoinTableNode*)pSelect->pFromTable;
    SCheckColContaisCtx search = {.pTarget = pNode, .contains = false};

    /* Walk the ON condition; the callback flags search.contains on a match. */
    nodesWalkExpr(pJoin->pOnCond, checkColContains, &search);
    return search.contains;
  }

  return false;
}
static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
STranslateContext* pCxt = (STranslateContext*)pContext;
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
@ -2770,7 +2802,7 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
}
}
if (isWindowJoinStmt(pSelect)) {
if (isWindowJoinProbeTablePrimCol(pSelect, *pNode)) {
if (isWindowJoinProbeTablePrimCol(pSelect, *pNode) || isWindowJoinGroupCol(pSelect, *pNode)) {
return rewriteExprToGroupKeyFunc(pCxt, pNode);
}
}
@ -3363,6 +3395,12 @@ static int32_t translateJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoin
if (TSDB_CODE_SUCCESS == code) {
SValueNode* pStart = (SValueNode*)((SWindowOffsetNode*)pJoinTable->pWindowOffset)->pStartOffset;
SValueNode* pEnd = (SValueNode*)((SWindowOffsetNode*)pJoinTable->pWindowOffset)->pEndOffset;
if (TIME_UNIT_MONTH == pStart->unit || TIME_UNIT_YEAR == pStart->unit) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_WIN_OFFSET_UNIT, pStart->unit);
}
if (TIME_UNIT_MONTH == pEnd->unit || TIME_UNIT_YEAR == pEnd->unit) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_WIN_OFFSET_UNIT, pEnd->unit);
}
if (pStart->datum.i > pEnd->datum.i) {
TSWAP(((SWindowOffsetNode*)pJoinTable->pWindowOffset)->pStartOffset, ((SWindowOffsetNode*)pJoinTable->pWindowOffset)->pEndOffset);
}
@ -3371,8 +3409,14 @@ static int32_t translateJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoin
return buildInvalidOperationMsg(&pCxt->msgBuf, "WINDOW_OFFSET required for WINDOW join");
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoinTable->pJLimit && *pSType != JOIN_STYPE_ASOF && *pSType != JOIN_STYPE_WIN) {
return buildInvalidOperationMsgExt(&pCxt->msgBuf, "JLIMIT not supported for %s join", getFullJoinTypeString(type, *pSType));
if (TSDB_CODE_SUCCESS == code && NULL != pJoinTable->pJLimit) {
if (*pSType != JOIN_STYPE_ASOF && *pSType != JOIN_STYPE_WIN) {
return buildInvalidOperationMsgExt(&pCxt->msgBuf, "JLIMIT not supported for %s join", getFullJoinTypeString(type, *pSType));
}
SLimitNode* pJLimit = (SLimitNode*)pJoinTable->pJLimit;
if (pJLimit->limit > JOIN_JLIMIT_MAX_VALUE) {
return buildInvalidOperationMsg(&pCxt->msgBuf, "JLIMIT value is out of valid range [0, 1024]");
}
}
return code;

View File

@ -196,6 +196,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Operator not supported multi result: %s";
case TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR:
return "Not supported window join having expr";
case TSDB_CODE_PAR_INVALID_WIN_OFFSET_UNIT:
return "Invalid WINDOW_OFFSET unit \"%s\"";
default:
return "Unknown error";
}

View File

@ -1157,7 +1157,7 @@ static int32_t pdcJoinAddPreFilterColsToTarget(SOptimizeContext* pCxt, SJoinLogi
}
static int32_t pdcJoinAddWhereFilterColsToTarget(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
if (NULL == pJoin->node.pConditions) {
if (NULL == pJoin->node.pConditions && (!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType) || NULL == pJoin->pFullOnCond)) {
return TSDB_CODE_SUCCESS;
}
@ -1165,10 +1165,15 @@ static int32_t pdcJoinAddWhereFilterColsToTarget(SOptimizeContext* pCxt, SJoinLo
SNodeList* pCondCols = nodesMakeList();
SNodeList* pTargets = NULL;
if (NULL == pCondCols) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL != pJoin->node.pConditions) {
code = nodesCollectColumnsFromNode(pJoin->node.pConditions, NULL, COLLECT_COL_TYPE_ALL, &pCondCols);
}
if (TSDB_CODE_SUCCESS == code && IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType) && NULL != pJoin->pFullOnCond) {
code = nodesCollectColumnsFromNode(pJoin->pFullOnCond, NULL, COLLECT_COL_TYPE_ALL, &pCondCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExprs(pCondCols, &pTargets);
}
@ -4365,12 +4370,16 @@ static void stbJoinOptRemoveTagEqCond(SJoinLogicNode* pJoin) {
} else if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pTagEqCond)) {
SLogicConditionNode* pTags = (SLogicConditionNode*)pJoin->pTagEqCond;
SNode* pTag = NULL;
bool found = false;
FOREACH(pTag, pTags->pParameterList) {
if (nodesEqualNode(pTag, pNode)) {
ERASE_NODE(pLogic->pParameterList);
found = true;
break;
}
}
if (found) {
ERASE_NODE(pLogic->pParameterList);
}
}
}

View File

@ -625,6 +625,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is confli
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_SUPPORT_MULTI_RESULT, "Operator not supported multi result")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR, "Invalid window join having expr")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_GRP_WINDOW_NOT_ALLOWED, "GROUP BY/PARTITION BY/WINDOW-clause can't be used in WINDOW join")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_WIN_OFFSET_UNIT, "Invalid window offset unit")
//planner
TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error")

View File

@ -105,3 +105,33 @@ if $data11 != 4 then
return -1
endi
sql select a.ts, a.col1, b.ts,b.col1 from sta a join sta b on a.ts = b.ts and a.t1=b.t1 order by a.t1, a.ts;
if $rows != 8 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data02 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data03 != 1 then
return -1
endi
if $data10 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data11 != 3 then
return -1
endi
if $data12 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data13 != 3 then
return -1
endi

View File

@ -987,4 +987,4 @@ sql_error select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts and
sql_error select a.t1, a.ts, b.ts from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 having(a.ts>0) order by a.t1, a.ts, b.ts;
sql_error select count(*) from sta a left asof join sta b on a.ts <= b.ts and a.t1=b.t1 and a.col1=b.col1 jlimit 2 where a.ts > '2023-11-17 16:29:00.000' slimit 1;
sql_error select a.ts, b.ts from (select * from sta) a left asof join sta b where a.ts = b.ts;
sql_error select a.ts, b.ts from tba1 a left asof join tba2 b jlimit 1025;

View File

@ -572,6 +572,58 @@ if $data71 != NULL then
return -1
endi
sql SELECT a.ts, count(b.*) FROM tba1 a LEFT WINDOW JOIN tba2 b ON a.col1 = b.col1 WINDOW_OFFSET(-1s, 1s) where a.col1 > 1 order by a.ts;
if $rows != 3 then
return -1
endi
if $data00 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data10 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data11 != 0 then
return -1
endi
if $data20 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data21 != 1 then
return -1
endi
sql SELECT a.ts, count(b.*) FROM tba1 a LEFT WINDOW JOIN tba2 b ON a.col1 = b.col1 WINDOW_OFFSET(-1s, 1s) order by a.col1;
if $rows != 4 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data10 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data11 != 1 then
return -1
endi
if $data20 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data21 != 0 then
return -1
endi
if $data30 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data31 != 1 then
return -1
endi
sql_error select a.col1, count(*) from sta a left window join sta b window_offset(-1s, 1s);
sql_error select b.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s);
sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(b.ts > 0);
@ -585,3 +637,4 @@ sql_error select a.ts, b.ts from sta a left window join sta b on a.ts > 1 window
sql_error select a.ts, b.ts from sta a left window join sta b on a.ts =b.ts window_offset(-1s, 1s) where a.ts = b.ts;
sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1, 1) where a.ts = b.ts;
sql_error select a.ts, b.ts from (select * from sta) a left window join sta b window_offset(-1s, 1s) where a.ts = b.ts;
sql_error select a.ts, b.ts from tba1 a left window join tba2 b window_offset(-10a, 1a) jlimit 1025;