From 85f87ae246900401712c0c83e3d15c266b55b305 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 26 Mar 2022 05:45:48 -0400 Subject: [PATCH] bugfix --- .gitignore | 1 + include/libs/nodes/plannodes.h | 2 + include/libs/nodes/querynodes.h | 3 +- source/libs/nodes/src/nodesCloneFuncs.c | 16 ++++---- source/libs/nodes/src/nodesCodeFuncs.c | 7 ++++ source/libs/nodes/src/nodesTraverseFuncs.c | 6 +++ source/libs/parser/inc/parUtil.h | 2 + source/libs/parser/src/parAstCreater.c | 7 ++++ source/libs/parser/src/parTranslater.c | 4 ++ source/libs/planner/src/planLogicCreater.c | 8 +++- source/libs/planner/src/planPhysiCreater.c | 46 +++++++++++++++++++--- 11 files changed, 88 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index 1bfbf00cd5..b62bd62d9c 100644 --- a/.gitignore +++ b/.gitignore @@ -89,6 +89,7 @@ tests/examples/JDBC/JDBCDemo/.project tests/examples/JDBC/JDBCDemo/.settings/ source/libs/parser/inc/sql.* tests/script/tmqResult.txt +tests/tmqResult.txt # Emacs # -*- mode: gitignore; -*- diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 7b0a563907..c87dd0ec8b 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -97,6 +97,7 @@ typedef struct SWindowLogicNode { int8_t slidingUnit; SFillNode* pFill; int64_t sessionGap; + SNode* pTspk; } SWindowLogicNode; typedef struct SSortLogicNode { @@ -228,6 +229,7 @@ typedef struct SWinodwPhysiNode { typedef struct SIntervalPhysiNode { SWinodwPhysiNode window; + SNode* pTspk; // timestamp primary key int64_t interval; int64_t offset; int64_t sliding; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 5609bb4269..1ba7ae10d5 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -191,12 +191,13 @@ typedef struct SStateWindowNode { typedef struct SSessionWindowNode { ENodeType type; // QUERY_NODE_SESSION_WINDOW - SNode* pCol; + SNode* pCol; // timestamp primary key SNode* pGap; // gap between two session window(in microseconds) } SSessionWindowNode; typedef struct SIntervalWindowNode { ENodeType type; // QUERY_NODE_INTERVAL_WINDOW + SNode* pCol; // timestamp primary key SNode* pInterval; // SValueNode SNode* pOffset; // SValueNode SNode* pSliding; // SValueNode diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 161427b4b5..17c66f6ed7 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -278,16 +278,18 @@ static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNo } static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pDst) { + COPY_ALL_SCALAR_FIELDS; COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); - COPY_SCALAR_FIELD(winType); + // COPY_SCALAR_FIELD(winType); CLONE_NODE_LIST_FIELD(pFuncs); - COPY_SCALAR_FIELD(interval); - COPY_SCALAR_FIELD(offset); - COPY_SCALAR_FIELD(sliding); - COPY_SCALAR_FIELD(intervalUnit); - COPY_SCALAR_FIELD(slidingUnit); + // COPY_SCALAR_FIELD(interval); + // COPY_SCALAR_FIELD(offset); + // COPY_SCALAR_FIELD(sliding); + // COPY_SCALAR_FIELD(intervalUnit); + // COPY_SCALAR_FIELD(slidingUnit); CLONE_NODE_FIELD(pFill); - COPY_SCALAR_FIELD(sessionGap); + // COPY_SCALAR_FIELD(sessionGap); + CLONE_NODE_FIELD(pTspk); return (SNode*)pDst; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 4225b5c9a8..849764c995 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -988,6 +988,7 @@ static const char* jkIntervalPhysiPlanSliding = "Sliding"; static const char* jkIntervalPhysiPlanIntervalUnit = "intervalUnit"; static const char* jkIntervalPhysiPlanSlidingUnit = "slidingUnit"; static const char* jkIntervalPhysiPlanFill = "Fill"; +static const char* jkIntervalPhysiPlanTsPk = "TsPk"; static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) { const SIntervalPhysiNode* pNode = (const SIntervalPhysiNode*)pObj; @@ -1011,6 +1012,9 @@ static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkIntervalPhysiPlanFill, nodeToJson, pNode->pFill); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkIntervalPhysiPlanTsPk, nodeToJson, pNode->pTspk); + } return code; } @@ -1037,6 +1041,9 @@ static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkIntervalPhysiPlanFill, (SNode**)&pNode->pFill); } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkIntervalPhysiPlanTsPk, (SNode**)&pNode->pTspk); + } return code; } diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index 3af5a6d8cc..7eaa049946 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -99,6 +99,9 @@ static EDealRes walkNode(SNode* pNode, ETraversalOrder order, FNodeWalker walker if (DEAL_RES_ERROR != res) { res = walkNode(pInterval->pFill, order, walker, pContext); } + if (DEAL_RES_ERROR != res) { + res = walkNode(pInterval->pCol, order, walker, pContext); + } break; } case QUERY_NODE_NODE_LIST: @@ -225,6 +228,9 @@ static EDealRes rewriteNode(SNode** pRawNode, ETraversalOrder order, FNodeRewrit if (DEAL_RES_ERROR != res) { res = rewriteNode(&(pInterval->pFill), order, rewriter, pContext); } + if (DEAL_RES_ERROR != res) { + res = rewriteNode(&(pInterval->pCol), order, rewriter, pContext); + } break; } case QUERY_NODE_NODE_LIST: diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 171b406e18..742ab303d3 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -30,6 +30,8 @@ extern "C" { #define parserDebug(param, ...) qDebug("PARSER: " param, __VA_ARGS__) #define parserTrace(param, ...) qTrace("PARSER: " param, __VA_ARGS__) +#define PK_TS_COL_INTERNAL_NAME "_rowts" + typedef struct SMsgBuf { int32_t len; char *buf; diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 5b2f36878b..d9ee1309ad 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -702,6 +702,13 @@ SNode* createStateWindowNode(SAstCreateContext* pCxt, SNode* pCol) { SNode* createIntervalWindowNode(SAstCreateContext* pCxt, SNode* pInterval, SNode* pOffset, SNode* pSliding, SNode* pFill) { SIntervalWindowNode* interval = (SIntervalWindowNode*)nodesMakeNode(QUERY_NODE_INTERVAL_WINDOW); CHECK_OUT_OF_MEM(interval); + interval->pCol = nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == interval->pCol) { + nodesDestroyNode(interval); + CHECK_OUT_OF_MEM(interval->pCol); + } + ((SColumnNode*)interval->pCol)->colId = PRIMARYKEY_TIMESTAMP_COL_ID; + strcpy(((SColumnNode*)interval->pCol)->colName, PK_TS_COL_INTERNAL_NAME); interval->pInterval = pInterval; interval->pOffset = pOffset; interval->pSliding = pSliding; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 19c6cb3f27..3e88ce0831 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -256,6 +256,10 @@ static bool findAndSetColumn(SColumnNode* pCol, const STableNode* pTable) { bool found = false; if (QUERY_NODE_REAL_TABLE == nodeType(pTable)) { const STableMeta* pMeta = ((SRealTableNode*)pTable)->pMeta; + if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && 0 == strcmp(pCol->colName, PK_TS_COL_INTERNAL_NAME)) { + setColumnInfoBySchema((SRealTableNode*)pTable, pMeta->schema, false, pCol); + return true; + } int32_t nums = pMeta->tableInfo.numOfTags + pMeta->tableInfo.numOfColumns; for (int32_t i = 0; i < nums; ++i) { if (0 == strcmp(pCol->colName, pMeta->schema[i].name)) { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 7e3b67f809..0eb4ec23fa 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -88,7 +88,7 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) { } static int32_t rewriteExpr(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) { - static int32_t rewriteId = 1; + static int32_t rewriteId = 1; // todo modify SNameExprCxt nameCxt = { .rewriteId = rewriteId }; nodesWalkList(pExprs, doNameExpr, &nameCxt); SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs }; @@ -461,6 +461,12 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval); pWindow->slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit); + pWindow->pTspk = nodesCloneNode(pInterval->pCol); + if (NULL == pWindow->pTspk) { + nodesDestroyNode(pWindow); + return TSDB_CODE_OUT_OF_MEMORY; + } + if (NULL != pInterval->pFill) { pWindow->pFill = nodesCloneNode(pInterval->pFill); if (NULL == pWindow->pFill) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index dbaf64bdbf..fe8bafbed3 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -17,9 +17,14 @@ #include "functionMgt.h" +typedef struct SSlotIdInfo { + int16_t slotId; + bool set; +} SSlotIdInfo; + typedef struct SSlotIndex { int16_t dataBlockId; - int16_t slotId; + SArray* pSlotIdsInfo; // duplicate name slot } SSlotIndex; typedef struct SPhysiPlanContext { @@ -79,7 +84,19 @@ static int32_t createTarget(SNode* pNode, int16_t dataBlockId, int16_t slotId, S } static int32_t putSlotToHashImpl(int16_t dataBlockId, int16_t slotId, const char* pName, int32_t len, SHashObj* pHash) { - SSlotIndex index = { .dataBlockId = dataBlockId, .slotId = slotId }; + SSlotIndex* pIndex = taosHashGet(pHash, pName, len); + if (NULL != pIndex) { + SSlotIdInfo info = { .slotId = slotId, .set = false }; + taosArrayPush(pIndex->pSlotIdsInfo, &info); + return TSDB_CODE_SUCCESS; + } + + SSlotIndex index = { .dataBlockId = dataBlockId, .pSlotIdsInfo = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SSlotIdInfo)) }; + if (NULL == index.pSlotIdsInfo) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SSlotIdInfo info = { .slotId = slotId, .set = false }; + taosArrayPush(index.pSlotIdsInfo, &info); return taosHashPut(pHash, pName, len, &index, sizeof(SSlotIndex)); } @@ -90,7 +107,7 @@ static int32_t putSlotToHash(int16_t dataBlockId, int16_t slotId, SNode* pNode, } static int32_t createDataBlockDescHash(SPhysiPlanContext* pCxt, int32_t capacity, int16_t dataBlockId, SHashObj** pDescHash) { - SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + SHashObj* pHash = taosHashInit(capacity, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (NULL == pHash) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -149,6 +166,18 @@ static int32_t createDataBlockDesc(SPhysiPlanContext* pCxt, SNodeList* pList, SD return code; } +static int16_t getUnsetSlotId(const SArray* pSlotIdsInfo) { + int32_t size = taosArrayGetSize(pSlotIdsInfo); + for (int32_t i = 0; i < size; ++i) { + SSlotIdInfo* pInfo = taosArrayGet(pSlotIdsInfo, i); + if (!pInfo->set) { + pInfo->set = true; + return pInfo->slotId; + } + } + return ((SSlotIdInfo*)taosArrayGet(pSlotIdsInfo, 0))->slotId; +} + static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc, const char* pStmtName, bool output) { int32_t code = TSDB_CODE_SUCCESS; SHashObj* pHash = taosArrayGetP(pCxt->pLocationHelper, pDataBlockDesc->dataBlockId); @@ -167,7 +196,7 @@ static int32_t addDataBlockSlotsImpl(SPhysiPlanContext* pCxt, SNodeList* pList, slotId = nextSlotId; ++nextSlotId; } else { - slotId = pIndex->slotId; + slotId = getUnsetSlotId(pIndex->pSlotIdsInfo); } if (TSDB_CODE_SUCCESS == code) { @@ -217,7 +246,7 @@ static EDealRes doSetSlotId(SNode* pNode, void* pContext) { return DEAL_RES_ERROR; } ((SColumnNode*)pNode)->dataBlockId = pIndex->dataBlockId; - ((SColumnNode*)pNode)->slotId = pIndex->slotId; + ((SColumnNode*)pNode)->slotId = ((SSlotIdInfo*)taosArrayGet(pIndex->pSlotIdsInfo, 0))->slotId; return DEAL_RES_IGNORE_CHILD; } return DEAL_RES_CONTINUE; @@ -792,6 +821,13 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil return TSDB_CODE_OUT_OF_MEMORY; } + SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc); + int32_t code = setNodeSlotId(pCxt, pChildTupe->dataBlockId, -1, pWindowLogicNode->pTspk, &pInterval->pTspk); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode(pInterval); + return code; + } + return createWindowPhysiNodeFinalize(pCxt, pChildren, &pInterval->window, pWindowLogicNode, pPhyNode); }