enh: add merge join primary expr type

This commit is contained in:
dapan1121 2025-02-26 16:30:36 +08:00
parent 4224d55b39
commit e41cd44c51
7 changed files with 103 additions and 46 deletions

View File

@ -61,6 +61,7 @@ typedef struct SExprNode {
bool asAlias;
bool asParam;
bool asPosition;
bool joinSrc;
int32_t projIdx;
int32_t relatedTo;
int32_t bindExprID;
@ -209,6 +210,7 @@ typedef struct STableNode {
char tableAlias[TSDB_TABLE_NAME_LEN];
uint8_t precision;
bool singleTable;
bool inJoin;
} STableNode;
struct STableMeta;

View File

@ -40,6 +40,11 @@ typedef enum EJoinTableType {
E_JOIN_TB_PROBE
} EJoinTableType;
typedef enum EPrimExprType {
E_PRIM_TIMETRUNCATE = 1,
E_PRIM_VALUE
} EPrimExprType;
#define MJOIN_TBTYPE(_type) (E_JOIN_TB_BUILD == (_type) ? "BUILD" : "PROBE")
#define IS_FULL_OUTER_JOIN(_jtype, _stype) ((_jtype) == JOIN_TYPE_FULL && (_stype) == JOIN_STYPE_OUTER)
@ -87,9 +92,15 @@ typedef struct SMJoinNMatchCtx {
// for now timetruncate only
typedef struct SMJoinPrimExprCtx {
EPrimExprType type;
// FOR TIMETRUNCATE
int64_t truncateUnit;
int64_t timezoneUnit;
int32_t targetSlotId;
// FOR VALUE
int64_t constTs;
} SMJoinPrimExprCtx;
typedef struct SMJoinTableCtx {

View File

@ -894,21 +894,7 @@ static int32_t mJoinInitFinColsInfo(SMJoinTableCtx* pTable, SNodeList* pList) {
return TSDB_CODE_SUCCESS;
}
static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoinTableCtx* pTable) {
if (NULL == pNode) {
pCtx->targetSlotId = pTable->primCol->srcSlot;
return TSDB_CODE_SUCCESS;
}
if (QUERY_NODE_TARGET != nodeType(pNode)) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
STargetNode* pTarget = (STargetNode*)pNode;
if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr)) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
static int32_t mJoinInitFuncPrimExprCtx(SMJoinPrimExprCtx* pCtx, STargetNode* pTarget) {
SFunctionNode* pFunc = (SFunctionNode*)pTarget->pExpr;
if (FUNCTION_TYPE_TIMETRUNCATE != pFunc->funcType) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
@ -939,6 +925,47 @@ static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoi
pCtx->timezoneUnit = offsetFromTz(varDataVal(pTimeZone->datum.p), TSDB_TICK_PER_SECOND(pFunc->node.resType.precision));
}
pCtx->type = E_PRIM_TIMETRUNCATE;
return TSDB_CODE_SUCCESS;
}
static int32_t mJoinInitValPrimExprCtx(SMJoinPrimExprCtx* pCtx, STargetNode* pTarget) {
SValueNode* pVal = (SValueNode*)pTarget->pExpr;
if (TSDB_DATA_TYPE_TIMESTAMP != pVal->node.resType.type) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
pCtx->constTs = pVal->datum.i;
pCtx->type = E_PRIM_VALUE;
return TSDB_CODE_SUCCESS;
}
static int32_t mJoinInitPrimExprCtx(SNode* pNode, SMJoinPrimExprCtx* pCtx, SMJoinTableCtx* pTable) {
if (NULL == pNode) {
pCtx->targetSlotId = pTable->primCol->srcSlot;
return TSDB_CODE_SUCCESS;
}
if (QUERY_NODE_TARGET != nodeType(pNode)) {
qError("primary expr node is not target, type:%d", nodeType(pNode));
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
STargetNode* pTarget = (STargetNode*)pNode;
if (QUERY_NODE_FUNCTION != nodeType(pTarget->pExpr) && QUERY_NODE_VALUE != nodeType(pTarget->pExpr)) {
qError("Invalid primary expr node type:%d", nodeType(pTarget->pExpr));
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
if (QUERY_NODE_FUNCTION == nodeType(pTarget->pExpr)) {
MJ_ERR_RET(mJoinInitFuncPrimExprCtx(pCtx, pTarget));
} else if (QUERY_NODE_VALUE == nodeType(pTarget->pExpr)) {
MJ_ERR_RET(mJoinInitValPrimExprCtx(pCtx, pTarget));
}
pCtx->targetSlotId = pTarget->slotId;
return TSDB_CODE_SUCCESS;
@ -1045,14 +1072,16 @@ int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
return TSDB_CODE_SUCCESS;
}
SMJoinPrimExprCtx* pCtx = &pTable->primCtx;
SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot);
if (NULL == pPrimIn) {
SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId);
if (NULL == pPrimOut) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId);
if (NULL == pPrimOut) {
SMJoinPrimExprCtx* pCtx = &pTable->primCtx;
switch (pCtx->type) {
case E_PRIM_TIMETRUNCATE: {
SColumnInfoData* pPrimIn = taosArrayGet(pBlock->pDataBlock, pTable->primCol->srcSlot);
if (NULL == pPrimIn) {
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
@ -1065,6 +1094,15 @@ int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) {
((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit;
}
}
break;
}
case E_PRIM_VALUE: {
MJ_ERR_RET(colDataSetNItems(pPrimOut, 0, (char*)&pCtx->constTs, pBlock->info.rows, false));
break;
}
default:
break;
}
return TSDB_CODE_SUCCESS;
}

View File

@ -105,6 +105,7 @@ static int32_t exprNodeCopy(const SExprNode* pSrc, SExprNode* pDst) {
COPY_SCALAR_FIELD(asAlias);
COPY_SCALAR_FIELD(asParam);
COPY_SCALAR_FIELD(asPosition);
COPY_SCALAR_FIELD(joinSrc);
COPY_SCALAR_FIELD(projIdx);
COPY_SCALAR_FIELD(relatedTo);
COPY_SCALAR_FIELD(bindExprID);
@ -242,6 +243,7 @@ static int32_t tableNodeCopy(const STableNode* pSrc, STableNode* pDst) {
COPY_CHAR_ARRAY_FIELD(tableAlias);
COPY_SCALAR_FIELD(precision);
COPY_SCALAR_FIELD(singleTable);
COPY_SCALAR_FIELD(inJoin);
return TSDB_CODE_SUCCESS;
}
@ -321,8 +323,6 @@ static int32_t joinTableNodeCopy(const SJoinTableNode* pSrc, SJoinTableNode* pDs
CLONE_NODE_FIELD(addPrimCond);
COPY_SCALAR_FIELD(hasSubQuery);
COPY_SCALAR_FIELD(isLowLevelJoin);
COPY_SCALAR_FIELD(leftNoOrderQuery);
COPY_SCALAR_FIELD(rightNoOrderQuery);
CLONE_NODE_FIELD(pLeft);
CLONE_NODE_FIELD(pRight);
CLONE_NODE_FIELD(pOnCond);
@ -544,11 +544,8 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
COPY_SCALAR_FIELD(grpJoin);
COPY_SCALAR_FIELD(hashJoinHint);
COPY_SCALAR_FIELD(batchScanHint);
COPY_SCALAR_FIELD(leftNoOrderQuery);
COPY_SCALAR_FIELD(rightNoOrderQuery);
COPY_SCALAR_FIELD(noPrimKeyEqCond);
CLONE_NODE_FIELD(pLeftConstPrim);
CLONE_NODE_FIELD(pRightConstPrim);
COPY_SCALAR_FIELD(constPrimGot);
CLONE_NODE_FIELD(pLeftOnCond);
CLONE_NODE_FIELD(pRightOnCond);
COPY_SCALAR_FIELD(timeRangeTarget);

View File

@ -42,12 +42,12 @@ int32_t translatePostCreateStream(SParseContext* pParseCxt, SQuery* pQuery, SSDa
int32_t translatePostCreateSmaIndex(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock);
int32_t translatePostCreateTSMA(SParseContext* pParseCxt, SQuery* pQuery, SSDataBlock* pBlock);
int32_t buildQueryAfterParse(SQuery** pQuery, SNode* pRootNode, int16_t placeholderNo, SArray** pPlaceholderValues);
int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinParent);
int32_t translateTable(STranslateContext* pCxt, SNode** pTable, bool inJoin);
int32_t getMetaDataFromHash(const char* pKey, int32_t len, SHashObj* pHash, void** pOutput);
void tfreeSParseQueryRes(void* p);
#ifdef TD_ENTERPRISE
int32_t translateView(STranslateContext* pCxt, SNode** pTable, SName* pName);
int32_t translateView(STranslateContext* pCxt, SNode** pTable, SName* pName, bool inJoin);
int32_t getViewMetaFromMetaCache(STranslateContext* pCxt, SName* pName, SViewMeta** ppViewMeta);
#endif
#ifdef __cplusplus

View File

@ -207,6 +207,7 @@ static int32_t findAndReplaceNode(SCalcConstContext* pCxt, SNode** pRoot, SNode*
static int32_t calcConstProject(SCalcConstContext* pCxt, SNode* pProject, bool dual, SNode** pNew) {
SArray* pAssociation = NULL;
if (NULL != ((SExprNode*)pProject)->pAssociation) {
pAssociation = taosArrayDup(((SExprNode*)pProject)->pAssociation, NULL);
if (NULL == pAssociation) {
@ -227,6 +228,10 @@ static int32_t calcConstProject(SCalcConstContext* pCxt, SNode* pProject, bool d
for (int32_t i = 0; i < size; ++i) {
SAssociationNode* pAssNode = taosArrayGet(pAssociation, i);
SNode** pCol = pAssNode->pPlace;
if (((SExprNode*)pAssNode->pAssociationNode)->joinSrc) {
continue;
}
if (*pCol == pAssNode->pAssociationNode) {
tstrncpy(aliasName, ((SExprNode*)*pCol)->aliasName, TSDB_COL_NAME_LEN);
SArray* pOrigAss = NULL;

View File

@ -1383,7 +1383,7 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p
pCol->numOfPKs = pTable->pMeta->tableInfo.numOfPKs;
}
static int32_t setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColumnNode** pColRef) {
static int32_t setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColumnNode** pColRef, bool joinSrc) {
SColumnNode* pCol = *pColRef;
if (NULL == pExpr->pAssociation) {
@ -1413,6 +1413,7 @@ static int32_t setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SCo
tstrncpy(pCol->node.userAlias, pExpr->userAlias, TSDB_COL_NAME_LEN);
}
pCol->node.resType = pExpr->resType;
pCol->node.joinSrc = pTable->table.inJoin && joinSrc;
return TSDB_CODE_SUCCESS;
}
@ -1494,7 +1495,7 @@ static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* p
code = nodesListStrictAppend(pList, (SNode*)pCol);
if (TSDB_CODE_SUCCESS == code) {
SListCell* pCell = nodesListGetCell(pList, LIST_LENGTH(pList) - 1);
code = setColumnInfoByExpr(pTempTable, (SExprNode*)pNode, (SColumnNode**)&pCell->pNode);
code = setColumnInfoByExpr(pTempTable, (SExprNode*)pNode, (SColumnNode**)&pCell->pNode, true);
}
if (TSDB_CODE_SUCCESS == code) {
if (!skipProjRef)
@ -1591,7 +1592,7 @@ static int32_t findAndSetColumn(STranslateContext* pCxt, SColumnNode** pColRef,
}
}
if (pFoundExpr) {
code = setColumnInfoByExpr(pTempTable, pFoundExpr, pColRef);
code = setColumnInfoByExpr(pTempTable, pFoundExpr, pColRef, SQL_CLAUSE_FROM != pCxt->currClause);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
@ -5097,9 +5098,12 @@ static int32_t setJoinTimeLineResMode(STranslateContext* pCxt) {
return TSDB_CODE_SUCCESS;
}
int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinParent) {
int32_t translateTable(STranslateContext* pCxt, SNode** pTable, bool inJoin) {
SSelectStmt* pCurrSmt = (SSelectStmt*)(pCxt->pCurrStmt);
int32_t code = TSDB_CODE_SUCCESS;
((STableNode*)*pTable)->inJoin = inJoin;
switch (nodeType(*pTable)) {
case QUERY_NODE_REAL_TABLE: {
SRealTableNode* pRealTable = (SRealTableNode*)*pTable;
@ -5115,7 +5119,7 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinPare
}
#ifdef TD_ENTERPRISE
if (TSDB_VIEW_TABLE == pRealTable->pMeta->tableType && (!pCurrSmt->tagScan || pCxt->pParseCxt->biMode)) {
return translateView(pCxt, pTable, &name);
return translateView(pCxt, pTable, &name, inJoin);
}
code = translateAudit(pCxt, pRealTable, &name);
#endif
@ -5172,10 +5176,10 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinPare
SJoinTableNode* pJoinTable = (SJoinTableNode*)*pTable;
code = translateJoinTable(pCxt, pJoinTable);
if (TSDB_CODE_SUCCESS == code) {
code = translateTable(pCxt, &pJoinTable->pLeft, (SNode*)pJoinTable);
code = translateTable(pCxt, &pJoinTable->pLeft, true);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateTable(pCxt, &pJoinTable->pRight, (SNode*)pJoinTable);
code = translateTable(pCxt, &pJoinTable->pRight, true);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkJoinTable(pCxt, pJoinTable);
@ -7132,7 +7136,7 @@ static int32_t translateWhere(STranslateContext* pCxt, SSelectStmt* pSelect) {
static int32_t translateFrom(STranslateContext* pCxt, SNode** pTable) {
pCxt->currClause = SQL_CLAUSE_FROM;
return translateTable(pCxt, pTable, NULL);
return translateTable(pCxt, pTable, false);
}
static int32_t checkLimit(STranslateContext* pCxt, SSelectStmt* pSelect) {