From 974dcef723b7231318ff34357d6e35de166d038e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Jul 2022 21:18:42 +0800 Subject: [PATCH] other: merge 3.0 --- source/libs/nodes/src/nodesCloneFuncs.c | 3 + source/libs/nodes/src/nodesCodeFuncs.c | 21 ++ source/libs/planner/inc/planInt.h | 1 + source/libs/planner/src/planLogicCreater.c | 62 +++++- source/libs/planner/src/planOptimizer.c | 78 ++++++-- source/libs/planner/src/planPhysiCreater.c | 13 ++ source/libs/planner/src/planSpliter.c | 3 + source/libs/planner/src/planUtil.c | 183 ++++++++++++++++++ source/libs/planner/test/planIntervalTest.cpp | 6 + source/libs/planner/test/planSubqueryTest.cpp | 20 +- source/libs/stream/src/streamData.c | 2 + source/libs/stream/src/streamDispatch.c | 2 + source/libs/stream/src/streamExec.c | 1 + source/libs/sync/src/syncMain.c | 65 +++---- source/libs/sync/src/syncRaftCfg.c | 10 +- source/libs/transport/inc/transComm.h | 12 +- source/libs/transport/src/transCli.c | 57 +++--- source/libs/transport/src/transComm.c | 43 ++-- source/libs/transport/src/transSvr.c | 6 +- 19 files changed, 484 insertions(+), 104 deletions(-) diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index e7109b5a87..121e697630 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -332,6 +332,9 @@ static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) { COPY_SCALAR_FIELD(precision); CLONE_NODE_FIELD(pLimit); CLONE_NODE_FIELD(pSlimit); + COPY_SCALAR_FIELD(requireDataOrder); + COPY_SCALAR_FIELD(resultDataOrder); + COPY_SCALAR_FIELD(groupAction); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index e3c50de654..c92e6908f1 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -504,6 +504,9 @@ static const char* jkLogicPlanConditions = "Conditions"; static const char* jkLogicPlanChildren = "Children"; static const char* jkLogicPlanLimit = "Limit"; static const char* jkLogicPlanSlimit = "SLimit"; +static const char* jkLogicPlanRequireDataOrder = "RequireDataOrder"; +static const char* jkLogicPlanResultDataOrder = "ResultDataOrder"; +static const char* jkLogicPlanGroupAction = "GroupAction"; static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) { const SLogicNode* pNode = (const SLogicNode*)pObj; @@ -521,6 +524,15 @@ static int32_t logicPlanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkLogicPlanSlimit, nodeToJson, pNode->pSlimit); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkLogicPlanRequireDataOrder, pNode->requireDataOrder); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkLogicPlanResultDataOrder, pNode->resultDataOrder); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkLogicPlanGroupAction, pNode->groupAction); + } return code; } @@ -541,6 +553,15 @@ static int32_t jsonToLogicPlanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkLogicPlanSlimit, &pNode->pSlimit); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkLogicPlanRequireDataOrder, pNode->requireDataOrder, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkLogicPlanResultDataOrder, pNode->resultDataOrder, code); + } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkLogicPlanGroupAction, pNode->groupAction, code); + } return code; } diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index b7fba83235..88c7c26276 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -35,6 +35,7 @@ int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...); int32_t createColumnByRewriteExprs(SNodeList* pExprs, SNodeList** pList); int32_t createColumnByRewriteExpr(SNode* pExpr, SNodeList** pList); int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew); +int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requirement); int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan); int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 9ced5c1cb6..366f970710 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -250,6 +250,9 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect SScanLogicNode* pScan = NULL; int32_t code = makeScanLogicNode(pCxt, pRealTable, pSelect->hasRepeatScanFuncs, (SLogicNode**)&pScan); + pScan->node.groupAction = GROUP_ACTION_NONE; + pScan->node.resultDataOrder = DATA_ORDER_LEVEL_IN_BLOCK; + // set columns to scan if (TSDB_CODE_SUCCESS == code) { code = nodesCollectColumns(pSelect, SQL_CLAUSE_FROM, pRealTable->table.tableAlias, COLLECT_COL_TYPE_COL, @@ -336,6 +339,9 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pJoin->joinType = pJoinTable->joinType; pJoin->isSingleTableJoin = pJoinTable->table.singleTable; + pJoin->node.groupAction = GROUP_ACTION_CLEAR; + pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; + pJoin->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; int32_t code = TSDB_CODE_SUCCESS; @@ -472,6 +478,9 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, } pAgg->hasLastRow = pSelect->hasLastRowFunc; + pAgg->node.groupAction = GROUP_ACTION_SET; + pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; int32_t code = TSDB_CODE_SUCCESS; @@ -540,6 +549,10 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt pIdfRowsFunc->isTailFunc = pSelect->hasTailFunc; pIdfRowsFunc->isUniqueFunc = pSelect->hasUniqueFunc; pIdfRowsFunc->isTimeLineFunc = pSelect->hasTimeLineFunc; + pIdfRowsFunc->node.groupAction = GROUP_ACTION_KEEP; + pIdfRowsFunc->node.requireDataOrder = + pIdfRowsFunc->isTimeLineFunc ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_NONE; + pIdfRowsFunc->node.resultDataOrder = pIdfRowsFunc->node.requireDataOrder; // indefinite rows functions and _select_values functions int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsVectorFunc, &pIdfRowsFunc->pFuncs); @@ -571,6 +584,10 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p return TSDB_CODE_OUT_OF_MEMORY; } + pInterpFunc->node.groupAction = GROUP_ACTION_KEEP; + pInterpFunc->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP; + pInterpFunc->node.resultDataOrder = pInterpFunc->node.requireDataOrder; + int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsInterpFunc, &pInterpFunc->pFuncs); if (TSDB_CODE_SUCCESS == code) { code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT); @@ -642,10 +659,12 @@ static int32_t createWindowLogicNodeByState(SLogicPlanContext* pCxt, SStateWindo } pWindow->winType = WINDOW_TYPE_STATE; + pWindow->node.groupAction = GROUP_ACTION_KEEP; + pWindow->node.requireDataOrder = pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : DATA_ORDER_LEVEL_IN_GROUP; + pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; pWindow->pStateExpr = nodesCloneNode(pState->pExpr); - pWindow->pTspk = nodesCloneNode(pState->pCol); - if (NULL == pWindow->pTspk) { + if (NULL == pWindow->pStateExpr || NULL == pWindow->pTspk) { nodesDestroyNode((SNode*)pWindow); return TSDB_CODE_OUT_OF_MEMORY; } @@ -663,6 +682,9 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW pWindow->winType = WINDOW_TYPE_SESSION; pWindow->sessionGap = ((SValueNode*)pSession->pGap)->datum.i; pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? SESSION_ALGO_STREAM_SINGLE : SESSION_ALGO_MERGE; + pWindow->node.groupAction = GROUP_ACTION_KEEP; + pWindow->node.requireDataOrder = pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_IN_BLOCK : DATA_ORDER_LEVEL_IN_GROUP; + pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; pWindow->pTspk = nodesCloneNode((SNode*)pSession->pCol); if (NULL == pWindow->pTspk) { @@ -689,6 +711,9 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva pWindow->slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit); pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH; + pWindow->node.groupAction = GROUP_ACTION_KEEP; + pWindow->node.requireDataOrder = DATA_ORDER_LEVEL_IN_BLOCK; + pWindow->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; pWindow->pTspk = nodesCloneNode(pInterval->pCol); if (NULL == pWindow->pTspk) { @@ -734,6 +759,10 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect return TSDB_CODE_OUT_OF_MEMORY; } + pFill->node.groupAction = GROUP_ACTION_KEEP; + pFill->node.requireDataOrder = DATA_ORDER_LEVEL_IN_GROUP; + pFill->node.resultDataOrder = DATA_ORDER_LEVEL_IN_GROUP; + int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_WINDOW, NULL, COLLECT_COL_TYPE_ALL, &pFill->node.pTargets); if (TSDB_CODE_SUCCESS == code && NULL == pFill->node.pTargets) { code = nodesListMakeStrictAppend(&pFill->node.pTargets, @@ -768,6 +797,9 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect } pSort->groupSort = pSelect->groupSort; + pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; + pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pSort->node.resultDataOrder = pSort->groupSort ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_GLOBAL; int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_ORDER_BY, NULL, COLLECT_COL_TYPE_ALL, &pSort->node.pTargets); if (TSDB_CODE_SUCCESS == code && NULL == pSort->node.pTargets) { @@ -818,6 +850,9 @@ static int32_t createProjectLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSel TSWAP(pProject->node.pLimit, pSelect->pLimit); TSWAP(pProject->node.pSlimit, pSelect->pSlimit); + pProject->node.groupAction = GROUP_ACTION_CLEAR; + pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pProject->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; int32_t code = TSDB_CODE_SUCCESS; @@ -850,6 +885,10 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS return TSDB_CODE_OUT_OF_MEMORY; } + pPartition->node.groupAction = GROUP_ACTION_SET; + pPartition->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pPartition->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; + int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_PARTITION_BY, NULL, COLLECT_COL_TYPE_ALL, &pPartition->node.pTargets); if (TSDB_CODE_SUCCESS == code && NULL == pPartition->node.pTargets) { @@ -882,11 +921,23 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe return TSDB_CODE_OUT_OF_MEMORY; } + pAgg->node.groupAction = GROUP_ACTION_SET; + pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; + int32_t code = TSDB_CODE_SUCCESS; // set grouyp keys, agg funcs and having conditions - pAgg->pGroupKeys = nodesCloneList(pSelect->pProjectionList); - if (NULL == pAgg->pGroupKeys) { - code = TSDB_CODE_OUT_OF_MEMORY; + SNodeList* pGroupKeys = NULL; + SNode* pProjection = NULL; + FOREACH(pProjection, pSelect->pProjectionList) { + code = nodesListMakeStrictAppend(&pGroupKeys, createGroupingSetNode(pProjection)); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(pGroupKeys); + break; + } + } + if (TSDB_CODE_SUCCESS == code) { + pAgg->pGroupKeys = pGroupKeys; } // rewrite the expression in subsequent clauses @@ -1361,6 +1412,7 @@ int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) { if (TSDB_CODE_SUCCESS == code) { setLogicNodeParent(pSubplan->pNode); setLogicSubplanType(cxt.hasScan, pSubplan); + code = adjustLogicNodeDataRequirement(pSubplan->pNode, DATA_ORDER_LEVEL_NONE); } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 36b58afb76..b006ac2b0a 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1579,6 +1579,34 @@ static bool eliminateProjOptMayBeOptimized(SLogicNode* pNode) { return eliminateProjOptCheckProjColumnNames(pProjectNode); } +typedef struct CheckNewChildTargetsCxt { + SNodeList* pNewChildTargets; + bool canUse; +} CheckNewChildTargetsCxt; + +static EDealRes eliminateProjOptCanUseNewChildTargetsImpl(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + CheckNewChildTargetsCxt* pCxt = pContext; + SNode* pTarget = NULL; + FOREACH(pTarget, pCxt->pNewChildTargets) { + if (!nodesEqualNode(pTarget, pNode)) { + pCxt->canUse = false; + return DEAL_RES_END; + } + } + } + return DEAL_RES_CONTINUE; +} + +static bool eliminateProjOptCanUseNewChildTargets(SLogicNode* pChild, SNodeList* pNewChildTargets) { + if (NULL == pChild->pConditions) { + return true; + } + CheckNewChildTargetsCxt cxt = {.pNewChildTargets = pNewChildTargets, .canUse = true}; + nodesWalkExpr(pChild->pConditions, eliminateProjOptCanUseNewChildTargetsImpl, &cxt); + return cxt.canUse; +} + static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SProjectLogicNode* pProjectNode) { SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProjectNode->node.pChildren, 0); @@ -1594,8 +1622,13 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* } } } - nodesDestroyList(pChild->pTargets); - pChild->pTargets = pNewChildTargets; + if (eliminateProjOptCanUseNewChildTargets(pChild, pNewChildTargets)) { + nodesDestroyList(pChild->pTargets); + pChild->pTargets = pNewChildTargets; + } else { + nodesDestroyList(pNewChildTargets); + return TSDB_CODE_SUCCESS; + } int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild); if (TSDB_CODE_SUCCESS == code) { @@ -1873,6 +1906,8 @@ static int32_t rewriteUniqueOptCreateAgg(SIndefRowsFuncLogicNode* pIndef, SLogic TSWAP(pAgg->node.pChildren, pIndef->node.pChildren); optResetParent((SLogicNode*)pAgg); pAgg->node.precision = pIndef->node.precision; + pAgg->node.requireDataOrder = DATA_ORDER_LEVEL_IN_BLOCK; // first function requirement + pAgg->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; int32_t code = TSDB_CODE_SUCCESS; bool hasSelectPrimaryKey = false; @@ -1945,6 +1980,8 @@ static int32_t rewriteUniqueOptCreateProject(SIndefRowsFuncLogicNode* pIndef, SL TSWAP(pProject->node.pTargets, pIndef->node.pTargets); pProject->node.precision = pIndef->node.precision; + pProject->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; + pProject->node.resultDataOrder = DATA_ORDER_LEVEL_NONE; int32_t code = TSDB_CODE_SUCCESS; SNode* pNode = NULL; @@ -1973,12 +2010,17 @@ static int32_t rewriteUniqueOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeAppend(&pProject->pChildren, (SNode*)pAgg); - pAgg->pParent = pProject; - pAgg = NULL; } if (TSDB_CODE_SUCCESS == code) { + pAgg->pParent = pProject; + pAgg = NULL; code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pIndef, pProject); } + if (TSDB_CODE_SUCCESS == code) { + code = adjustLogicNodeDataRequirement( + pProject, NULL == pProject->pParent ? DATA_ORDER_LEVEL_NONE : pProject->pParent->requireDataOrder); + pProject = NULL; + } if (TSDB_CODE_SUCCESS == code) { nodesDestroyNode((SNode*)pIndef); } else { @@ -2145,11 +2187,20 @@ static bool tagScanMayBeOptimized(SLogicNode* pNode) { } SAggLogicNode* pAgg = (SAggLogicNode*)(pNode->pParent); - if (NULL == pAgg->pGroupKeys || NULL != pAgg->pAggFuncs || - planOptNodeListHasCol(pAgg->pGroupKeys) || !planOptNodeListHasTbname(pAgg->pGroupKeys)) { + if (NULL == pAgg->pGroupKeys || NULL != pAgg->pAggFuncs || planOptNodeListHasCol(pAgg->pGroupKeys) || + !planOptNodeListHasTbname(pAgg->pGroupKeys)) { return false; } - + + SNode* pGroupKey = NULL; + FOREACH(pGroupKey, pAgg->pGroupKeys) { + SNode* pGroup = NULL; + FOREACH(pGroup, ((SGroupingSetNode*)pGroupKey)->pParameterList) { + if (QUERY_NODE_COLUMN != nodeType(pGroup)) { + return false; + } + } + } return true; } @@ -2162,11 +2213,12 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp pScanNode->scanType = SCAN_TYPE_TAG; SNode* pTarget = NULL; FOREACH(pTarget, pScanNode->node.pTargets) { - if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)(pTarget))->colId) { - ERASE_NODE(pScanNode->node.pTargets); - break; - } + if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)(pTarget))->colId) { + ERASE_NODE(pScanNode->node.pTargets); + break; + } } + NODES_DESTORY_LIST(pScanNode->pScanCols); SLogicNode* pAgg = pScanNode->node.pParent; @@ -2176,8 +2228,8 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp SNode* pAggTarget = NULL; FOREACH(pAggTarget, pAgg->pTargets) { SNode* pScanTarget = NULL; - FOREACH(pScanTarget, pScanNode->node.pTargets) { - if (0 == strcmp( ((SColumnNode*)pAggTarget)->colName, ((SColumnNode*)pAggTarget)->colName )) { + FOREACH(pScanTarget, pScanNode->node.pTargets) { + if (0 == strcmp(((SColumnNode*)pAggTarget)->colName, ((SColumnNode*)pAggTarget)->colName)) { nodesListAppend(pScanTargets, nodesCloneNode(pScanTarget)); break; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index ee2457e400..0a1f8bbd0b 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -974,6 +974,17 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh return code; } +static bool projectCanMergeDataBlock(SProjectLogicNode* pProject) { + if (DATA_ORDER_LEVEL_NONE == pProject->node.resultDataOrder) { + return true; + } + if (1 != LIST_LENGTH(pProject->node.pChildren)) { + return false; + } + SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0); + return DATA_ORDER_LEVEL_GLOBAL == pChild->resultDataOrder ? true : false; +} + static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SProjectLogicNode* pProjectLogicNode, SPhysiNode** pPhyNode) { SProjectPhysiNode* pProject = @@ -982,6 +993,8 @@ static int32_t createProjectPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChild return TSDB_CODE_OUT_OF_MEMORY; } + pProject->mergeDataBlock = projectCanMergeDataBlock(pProjectLogicNode); + int32_t code = TSDB_CODE_SUCCESS; if (0 == LIST_LENGTH(pChildren)) { pProject->pProjections = nodesCloneList(pProjectLogicNode->pProjections); diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 4cbbf12385..a7ccb72792 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -657,6 +657,9 @@ static int32_t stbSplSplitWindowForPartTable(SSplitContext* pCxt, SStableSplitIn return TSDB_CODE_SUCCESS; } + if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_FILL == nodeType(pInfo->pSplitNode->pParent)) { + pInfo->pSplitNode = pInfo->pSplitNode->pParent; + } SExchangeLogicNode* pExchange = NULL; int32_t code = splCreateExchangeNode(pCxt, pInfo->pSplitNode, &pExchange); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 77e4e05530..d4bd7e5162 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "functionMgt.h" #include "planInt.h" static char* getUsageErrFormat(int32_t errCode) { @@ -121,3 +122,185 @@ int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* } return TSDB_CODE_PLAN_INTERNAL_ERROR; } + +static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel requirement) { + if (SCAN_TYPE_TABLE != pScan->scanType || SCAN_TYPE_TABLE_MERGE != pScan->scanType) { + return TSDB_CODE_SUCCESS; + } + // The lowest sort level of scan output data is DATA_ORDER_LEVEL_IN_BLOCK + if (requirement < DATA_ORDER_LEVEL_IN_BLOCK) { + requirement = DATA_ORDER_LEVEL_IN_BLOCK; + } + if (DATA_ORDER_LEVEL_IN_BLOCK == requirement) { + pScan->scanType = SCAN_TYPE_TABLE; + } else if (TSDB_SUPER_TABLE == pScan->tableType) { + pScan->scanType = SCAN_TYPE_TABLE_MERGE; + } + pScan->node.resultDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustJoinDataRequirement(SJoinLogicNode* pJoin, EDataOrderLevel requirement) { + // The lowest sort level of join input and output data is DATA_ORDER_LEVEL_GLOBAL + return TSDB_CODE_SUCCESS; +} + +static bool isKeepOrderAggFunc(SNodeList* pFuncs) { + SNode* pFunc = NULL; + FOREACH(pFunc, pFuncs) { + if (!fmIsKeepOrderFunc(((SFunctionNode*)pFunc)->funcId)) { + return false; + } + } + return true; +} + +static int32_t adjustAggDataRequirement(SAggLogicNode* pAgg, EDataOrderLevel requirement) { + // The sort level of agg with group by output data can only be DATA_ORDER_LEVEL_NONE + if (requirement > DATA_ORDER_LEVEL_NONE && (NULL != pAgg->pGroupKeys || !isKeepOrderAggFunc(pAgg->pAggFuncs))) { + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + pAgg->node.resultDataOrder = requirement; + pAgg->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustProjectDataRequirement(SProjectLogicNode* pProject, EDataOrderLevel requirement) { + pProject->node.resultDataOrder = requirement; + pProject->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustIntervalDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + // The lowest sort level of interval output data is DATA_ORDER_LEVEL_IN_GROUP + if (requirement < DATA_ORDER_LEVEL_IN_GROUP) { + requirement = DATA_ORDER_LEVEL_IN_GROUP; + } + // The sort level of interval input data is always DATA_ORDER_LEVEL_IN_BLOCK + pWindow->node.resultDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustSessionDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + if (requirement <= pWindow->node.resultDataOrder) { + return TSDB_CODE_SUCCESS; + } + pWindow->node.resultDataOrder = requirement; + pWindow->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustStateDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + if (requirement <= pWindow->node.resultDataOrder) { + return TSDB_CODE_SUCCESS; + } + pWindow->node.resultDataOrder = requirement; + pWindow->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustWindowDataRequirement(SWindowLogicNode* pWindow, EDataOrderLevel requirement) { + switch (pWindow->winType) { + case WINDOW_TYPE_INTERVAL: + return adjustIntervalDataRequirement(pWindow, requirement); + case WINDOW_TYPE_SESSION: + return adjustSessionDataRequirement(pWindow, requirement); + case WINDOW_TYPE_STATE: + return adjustStateDataRequirement(pWindow, requirement); + default: + break; + } + return TSDB_CODE_PLAN_INTERNAL_ERROR; +} + +static int32_t adjustFillDataRequirement(SFillLogicNode* pFill, EDataOrderLevel requirement) { + if (requirement <= pFill->node.requireDataOrder) { + return TSDB_CODE_SUCCESS; + } + pFill->node.resultDataOrder = requirement; + pFill->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustSortDataRequirement(SSortLogicNode* pSort, EDataOrderLevel requirement) { + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustPartitionDataRequirement(SPartitionLogicNode* pPart, EDataOrderLevel requirement) { + if (DATA_ORDER_LEVEL_GLOBAL == requirement) { + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + pPart->node.resultDataOrder = requirement; + pPart->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustIndefRowsDataRequirement(SIndefRowsFuncLogicNode* pIndef, EDataOrderLevel requirement) { + if (requirement <= pIndef->node.resultDataOrder) { + return TSDB_CODE_SUCCESS; + } + pIndef->node.resultDataOrder = requirement; + pIndef->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustInterpDataRequirement(SInterpFuncLogicNode* pInterp, EDataOrderLevel requirement) { + if (requirement <= pInterp->node.requireDataOrder) { + return TSDB_CODE_SUCCESS; + } + pInterp->node.resultDataOrder = requirement; + pInterp->node.requireDataOrder = requirement; + return TSDB_CODE_SUCCESS; +} + +int32_t adjustLogicNodeDataRequirement(SLogicNode* pNode, EDataOrderLevel requirement) { + int32_t code = TSDB_CODE_SUCCESS; + switch (nodeType(pNode)) { + case QUERY_NODE_LOGIC_PLAN_SCAN: + code = adjustScanDataRequirement((SScanLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_JOIN: + code = adjustJoinDataRequirement((SJoinLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_AGG: + code = adjustAggDataRequirement((SAggLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_PROJECT: + code = adjustProjectDataRequirement((SProjectLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY: + case QUERY_NODE_LOGIC_PLAN_EXCHANGE: + case QUERY_NODE_LOGIC_PLAN_MERGE: + break; + case QUERY_NODE_LOGIC_PLAN_WINDOW: + code = adjustWindowDataRequirement((SWindowLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_FILL: + code = adjustFillDataRequirement((SFillLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_SORT: + code = adjustSortDataRequirement((SSortLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_PARTITION: + code = adjustPartitionDataRequirement((SPartitionLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_INDEF_ROWS_FUNC: + code = adjustIndefRowsDataRequirement((SIndefRowsFuncLogicNode*)pNode, requirement); + break; + case QUERY_NODE_LOGIC_PLAN_INTERP_FUNC: + code = adjustInterpDataRequirement((SInterpFuncLogicNode*)pNode, requirement); + break; + default: + break; + } + if (TSDB_CODE_SUCCESS == code) { + SNode* pChild = NULL; + FOREACH(pChild, pNode->pChildren) { + code = adjustLogicNodeDataRequirement((SLogicNode*)pChild, pNode->requireDataOrder); + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + } + return code; +} diff --git a/source/libs/planner/test/planIntervalTest.cpp b/source/libs/planner/test/planIntervalTest.cpp index 9f34ead6ff..30f7051722 100644 --- a/source/libs/planner/test/planIntervalTest.cpp +++ b/source/libs/planner/test/planIntervalTest.cpp @@ -38,9 +38,15 @@ TEST_F(PlanIntervalTest, fill) { run("SELECT COUNT(*) FROM t1 WHERE ts > TIMESTAMP '2022-04-01 00:00:00' and ts < TIMESTAMP '2022-04-30 23:59:59' " "INTERVAL(10s) FILL(LINEAR)"); + run("SELECT COUNT(*) FROM st1 WHERE ts > TIMESTAMP '2022-04-01 00:00:00' and ts < TIMESTAMP '2022-04-30 23:59:59' " + "INTERVAL(10s) FILL(LINEAR)"); + run("SELECT COUNT(*), SUM(c1) FROM t1 " "WHERE ts > TIMESTAMP '2022-04-01 00:00:00' and ts < TIMESTAMP '2022-04-30 23:59:59' " "INTERVAL(10s) FILL(VALUE, 10, 20)"); + + run("SELECT COUNT(*) FROM st1 WHERE ts > TIMESTAMP '2022-04-01 00:00:00' and ts < TIMESTAMP '2022-04-30 23:59:59' " + "PARTITION BY TBNAME interval(10s) fill(prev)"); } TEST_F(PlanIntervalTest, selectFunc) { diff --git a/source/libs/planner/test/planSubqueryTest.cpp b/source/libs/planner/test/planSubqueryTest.cpp index 2ba3794d5c..16dd91846c 100644 --- a/source/libs/planner/test/planSubqueryTest.cpp +++ b/source/libs/planner/test/planSubqueryTest.cpp @@ -48,10 +48,28 @@ TEST_F(PlanSubqeuryTest, doubleGroupBy) { "WHERE a > 100 GROUP BY b"); } -TEST_F(PlanSubqeuryTest, withSetOperator) { +TEST_F(PlanSubqeuryTest, innerSetOperator) { useDb("root", "test"); run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)"); run("SELECT c1 FROM (SELECT c1 FROM t1 UNION SELECT c1 FROM t1)"); } + +TEST_F(PlanSubqeuryTest, innerFill) { + useDb("root", "test"); + + run("SELECT cnt FROM (SELECT _WSTART ts, COUNT(*) cnt FROM t1 " + "WHERE ts > '2022-04-01 00:00:00' and ts < '2022-04-30 23:59:59' INTERVAL(10s) FILL(LINEAR)) " + "WHERE ts > '2022-04-06 00:00:00'"); +} + +TEST_F(PlanSubqeuryTest, outerInterval) { + useDb("root", "test"); + + run("SELECT COUNT(*) FROM (SELECT * FROM st1) INTERVAL(5s)"); + + run("SELECT COUNT(*) + SUM(c1) FROM (SELECT * FROM st1) INTERVAL(5s)"); + + run("SELECT COUNT(*) FROM (SELECT ts, TOP(c1, 10) FROM st1s1) INTERVAL(5s)"); +} diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index d476980393..eb14990c0e 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -34,6 +34,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock // TODO: refactor pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); + pDataBlock->info.version = be64toh(pRetrieve->version); pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.childId = pReq->upstreamChildId; @@ -54,6 +55,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock // TODO: refactor pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); + pDataBlock->info.version = be64toh(pRetrieve->version); pDataBlock->info.type = pRetrieve->streamBlockType; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5dec33d0fb..5d4adb2896 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -108,6 +108,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->skey = htobe64(pBlock->info.window.skey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey); + pRetrieve->version = htobe64(pBlock->info.version); int32_t actualLen = 0; blockEncode(pBlock, pRetrieve->data, &actualLen, numOfCols, false); @@ -182,6 +183,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis pRetrieve->numOfRows = htonl(pBlock->info.rows); pRetrieve->skey = htobe64(pBlock->info.window.skey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey); + pRetrieve->version = htobe64(pBlock->info.version); int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); pRetrieve->numOfCols = htonl(numOfCols); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a8192b49f3..52b610228e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -159,6 +159,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { if (data == NULL) { data = qItem; streamQueueProcessSuccess(pTask->inputQueue); + if (pTask->execType == TASK_EXEC__NONE) break; /*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/ /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ /*}*/ diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e0133641b3..a453b2572c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1550,12 +1550,12 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { char logBuf[256 + 256]; if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(logBuf, sizeof(logBuf), - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64 - ", snapshot:%" PRId64 ", snapshot-term:%" PRIu64 - ", standby:%d, " - "strategy:%d, batch:%d, " - "replica-num:%d, " - "lconfig:%" PRId64 ", changing:%d, restore:%d, %s", + "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64 + ", snap-tm:%" PRIu64 + ", sby:%d, " + "stgy:%d, bch:%d, " + "r-num:%d, " + "lcfg:%" PRId64 ", chging:%d, rsto:%d, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, @@ -1573,12 +1573,12 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { char* s = (char*)taosMemoryMalloc(len); if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(s, len, - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64 - ", snapshot:%" PRId64 ", snapshot-term:%" PRIu64 - ", standby:%d, " - "strategy:%d, batch:%d, " - "replica-num:%d, " - "lconfig:%" PRId64 ", changing:%d, restore:%d, %s", + "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64 + ", snap-tm:%" PRIu64 + ", sby:%d, " + "stgy:%d, bch:%d, " + "r-num:%d, " + "lcfg:%" PRId64 ", chging:%d, rsto:%d, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, @@ -1621,12 +1621,12 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { char logBuf[256 + 256]; if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(logBuf, sizeof(logBuf), - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64 - ", snapshot:%" PRId64 ", snapshot-term:%" PRIu64 - ", standby:%d, " - "strategy:%d, batch:%d, " - "replica-num:%d, " - "lconfig:%" PRId64 ", changing:%d, restore:%d, %s", + "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64 + ", snap-tm:%" PRIu64 + ", sby:%d, " + "stgy:%d, bch:%d, " + "r-num:%d, " + "lcfg:%" PRId64 ", chging:%d, rsto:%d, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, @@ -1642,12 +1642,12 @@ void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { char* s = (char*)taosMemoryMalloc(len); if (pSyncNode != NULL && pSyncNode->pRaftCfg != NULL && pSyncNode->pRaftStore != NULL) { snprintf(s, len, - "vgId:%d, sync %s %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64 - ", snapshot:%" PRId64 ", snapshot-term:%" PRIu64 - ", standby:%d, " - "strategy:%d, batch:%d, " - "replica-num:%d, " - "lconfig:%" PRId64 ", changing:%d, restore:%d, %s", + "vgId:%d, sync %s %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64 + ", snap-tm:%" PRIu64 + ", sby:%d, " + "stgy:%d, bch:%d, " + "r-num:%d, " + "lcfg:%" PRId64 ", chging:%d, rsto:%d, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, pSyncNode->pRaftCfg->batchSize, @@ -1675,11 +1675,10 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); snprintf(s, len, - "vgId:%d, sync %s, term:%" PRIu64 ", commit:%" PRId64 ", first:%" PRId64 ", last:%" PRId64 - ", snapshot:%" PRId64 - ", standby:%d, " - "replica-num:%d, " - "lconfig:%" PRId64 ", changing:%d, restore:%d", + "vgId:%d, sync %s, tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", snap:%" PRId64 + ", sby:%d, " + "r-num:%d, " + "lcfg:%" PRId64 ", chging:%d, rsto:%d", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish); @@ -2977,7 +2976,7 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 - ", pterm:%" PRIu64 ", commit:%" PRId64 + ", pterm:%" PRIu64 ", cmt:%" PRId64 ", " "datalen:%d}, %s", host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, @@ -2992,7 +2991,7 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries from %s:%d {term:%" PRIu64 ", pre-index:%" PRIu64 ", pre-term:%" PRIu64 - ", commit:%" PRIu64 ", pterm:%" PRIu64 + ", cmt:%" PRIu64 ", pterm:%" PRIu64 ", " "datalen:%d}, %s", host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm, @@ -3007,7 +3006,7 @@ void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 - ", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s", + ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s", host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s); syncNodeEventLog(pSyncNode, logBuf); @@ -3020,7 +3019,7 @@ void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntries char logBuf[256]; snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-batch from %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64 - ", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s", + ", pterm:%" PRIu64 ", cmt:%" PRId64 ", datalen:%d, count:%d}, %s", host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount, s); syncNodeEventLog(pSyncNode, logBuf); diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 0bbeaaf5b0..c634a1bf49 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) { cJSON *pJson = syncCfg2Json(pSyncCfg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -109,10 +109,10 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { if (pSyncCfg != NULL) { int32_t len = 512; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); memset(s, 0, len); - snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); + snprintf(s, len, "{r-num:%d, my:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); char *p = s + strlen(s); for (int i = 0; i < pSyncCfg->replicaNum; ++i) { /* @@ -206,7 +206,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) { cJSON *pJson = raftCfg2Json(pRaftCfg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -285,7 +285,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); } - cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); + cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); ASSERT(code == 0); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 9dd1a745d3..3fa6344009 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -229,8 +229,8 @@ typedef struct { int8_t stop; } SAsyncPool; -SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); -void transDestroyAsyncPool(SAsyncPool* pool); +SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); +void transAsyncPoolDestroy(SAsyncPool* pool); int transAsyncSend(SAsyncPool* pool, queue* mq); bool transAsyncPoolIsEmpty(SAsyncPool* pool); @@ -322,7 +322,7 @@ typedef struct STransReq { } STransReq; void transReqQueueInit(queue* q); -void* transReqQueuePushReq(queue* q); +void* transReqQueuePush(queue* q); void* transReqQueueRemove(void* arg); void transReqQueueClear(queue* q); @@ -393,9 +393,9 @@ typedef struct SDelayQueue { uv_loop_t* loop; } SDelayQueue; -int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); -void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); -int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); +int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); +void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); +SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); bool transEpSetIsEqual(SEpSet* a, SEpSet* b); /* diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f94a7f3c37..9de8c273d9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -26,7 +26,7 @@ typedef struct SCliConn { SConnBuffer readBuf; STransQueue cliMsgs; - queue conn; + queue q; uint64_t expireTime; STransCtx ctx; @@ -451,7 +451,7 @@ void cliTimeoutCb(uv_timer_t* handle) { while (p != NULL) { while (!QUEUE_IS_EMPTY(&p->conn)) { queue* h = QUEUE_HEAD(&p->conn); - SCliConn* c = QUEUE_DATA(h, SCliConn, conn); + SCliConn* c = QUEUE_DATA(h, SCliConn, q); if (c->expireTime < currentTime) { QUEUE_REMOVE(h); transUnrefCliHandle(c); @@ -475,7 +475,7 @@ void* destroyConnPool(void* pool) { while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conn)) { queue* h = QUEUE_HEAD(&connList->conn); - SCliConn* c = QUEUE_DATA(h, SCliConn, conn); + SCliConn* c = QUEUE_DATA(h, SCliConn, q); cliDestroyConn(c, true); } connList = taosHashIterate((SHashObj*)pool, connList); @@ -501,11 +501,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { return NULL; } queue* h = QUEUE_HEAD(&plist->conn); - SCliConn* conn = QUEUE_DATA(h, SCliConn, conn); + SCliConn* conn = QUEUE_DATA(h, SCliConn, q); conn->status = ConnNormal; - QUEUE_REMOVE(&conn->conn); - QUEUE_INIT(&conn->conn); - assert(h == &conn->conn); + QUEUE_REMOVE(&conn->q); + QUEUE_INIT(&conn->q); + assert(h == &conn->q); return conn; } static int32_t allocConnRef(SCliConn* conn, bool update) { @@ -560,8 +560,8 @@ static void addConnToPool(void* pool, SCliConn* conn) { SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); // list already create before assert(plist != NULL); - QUEUE_INIT(&conn->conn); - QUEUE_PUSH(&plist->conn, &conn->conn); + QUEUE_INIT(&conn->q); + QUEUE_PUSH(&plist->conn, &conn->q); assert(!QUEUE_IS_EMPTY(&plist->conn)); } static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { @@ -614,7 +614,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { transReqQueueInit(&conn->wreqQueue); transQueueInit(&conn->cliMsgs, NULL); - QUEUE_INIT(&conn->conn); + QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; conn->status = ConnNormal; conn->broken = 0; @@ -626,8 +626,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { } static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); - QUEUE_REMOVE(&conn->conn); - QUEUE_INIT(&conn->conn); + QUEUE_REMOVE(&conn->q); + QUEUE_INIT(&conn->q); transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; @@ -735,7 +735,7 @@ void cliSend(SCliConn* pConn) { CONN_SET_PERSIST_BY_APP(pConn); } - uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue); + uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); return; _RETURN: @@ -990,7 +990,7 @@ static SCliThrd* createThrdObj() { pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); - pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb); + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb); uv_timer_init(pThrd->loop, &pThrd->timer); pThrd->timer.data = pThrd; @@ -1009,7 +1009,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { CLI_RELEASE_UV(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); - transDestroyAsyncPool(pThrd->asyncPool); + transAsyncPoolDestroy(pThrd->asyncPool); transDQDestroy(pThrd->delayQueue, destroyCmsg); taosMemoryFree(pThrd->loop); @@ -1054,6 +1054,12 @@ static void doDelayTask(void* param) { cliHandleReq(pMsg, pThrd); } +static void doCloseIdleConn(void* param) { + STaskArg* arg = param; + SCliConn* conn = arg->param1; + SCliThrd* pThrd = arg->param2; +} + static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STransConnCtx* pCtx = pMsg->ctx; @@ -1075,7 +1081,7 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { } } -bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) { +bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { if ((pResp == NULL || pResp->info.hasEpSet == 0)) { return false; } @@ -1116,7 +1122,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { */ STransConnCtx* pCtx = pMsg->ctx; int32_t code = pResp->code; - bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false; + + bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false; if (retry) { pMsg->sent = 0; pCtx->retryCnt += 1; @@ -1125,6 +1132,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (pCtx->retryCnt < pCtx->retryLimit) { transUnrefCliHandle(pConn); EPSET_FORWARD_INUSE(&pCtx->epSet); + transFreeMsg(pResp->pCont); cliSchedMsgToNextNode(pMsg, pThrd); return -1; } @@ -1148,7 +1156,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { STraceId* trace = &pResp->info.traceId; - bool hasEpSet = cliTryToExtractEpSet(pResp, &pCtx->epSet); + bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet); if (hasEpSet) { char tbuf[256] = {0}; EPSET_DEBUG_STR(&pCtx->epSet, tbuf); @@ -1336,19 +1344,18 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - if (0 != transAsyncSend(pThrd->asyncPool, &cliMsg->q)) { - tsem_destroy(sem); - taosMemoryFree(sem); + int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q); + if (ret != 0) { destroyCmsg(cliMsg); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return -1; + goto _RETURN; } tsem_wait(sem); + +_RETURN: tsem_destroy(sem); taosMemoryFree(sem); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return 0; + return ret; } /* * diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index c3cba3118c..34849df2b2 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -175,7 +175,7 @@ int transSetConnOption(uv_tcp_t* stream) { return ret; } -SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { +SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool)); pool->nAsync = sz; pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); @@ -194,7 +194,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) return pool; } -void transDestroyAsyncPool(SAsyncPool* pool) { +void transAsyncPoolDestroy(SAsyncPool* pool) { for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); // uv_close((uv_handle_t*)async, NULL); @@ -205,6 +205,14 @@ void transDestroyAsyncPool(SAsyncPool* pool) { taosMemoryFree(pool->asyncs); taosMemoryFree(pool); } +bool transAsyncPoolIsEmpty(SAsyncPool* pool) { + for (int i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = async->data; + if (!QUEUE_IS_EMPTY(&item->qmsg)) return false; + } + return true; +} int transAsyncSend(SAsyncPool* pool, queue* q) { if (atomic_load_8(&pool->stop) == 1) { return -1; @@ -228,14 +236,6 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { } return uv_async_send(async); } -bool transAsyncPoolIsEmpty(SAsyncPool* pool) { - for (int i = 0; i < pool->nAsync; i++) { - uv_async_t* async = &(pool->asyncs[i]); - SAsyncItem* item = async->data; - if (!QUEUE_IS_EMPTY(&item->qmsg)) return false; - } - return true; -} void transCtxInit(STransCtx* ctx) { // init transCtx @@ -308,7 +308,7 @@ void transReqQueueInit(queue* q) { // init req queue QUEUE_INIT(q); } -void* transReqQueuePushReq(queue* q) { +void* transReqQueuePush(queue* q) { uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq)); wreq->data = req; @@ -488,8 +488,25 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { heapDestroy(queue->heap); taosMemoryFree(queue); } +void transDQCancel(SDelayQueue* queue, SDelayTask* task) { + uv_timer_stop(queue->timer); -int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { + if (heapSize(queue->heap) <= 0) return; + heapRemove(queue->heap, &task->node); + + if (heapSize(queue->heap) != 0) { + HeapNode* minNode = heapMin(queue->heap); + if (minNode != NULL) return; + + uint64_t now = taosGetTimestampMs(); + SDelayTask* task = container_of(minNode, SDelayTask, node); + uint64_t timeout = now > task->execTime ? now - task->execTime : 0; + + uv_timer_start(queue->timer, transDQTimeout, timeout, 0); + } +} + +SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { uint64_t now = taosGetTimestampMs(); SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); task->func = func; @@ -507,7 +524,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_ tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs); heapInsert(queue->heap, &task->node); uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0); - return 0; + return task; } void transPrintEpSet(SEpSet* pEpSet) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 7b9402f954..3fb947bdba 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -434,7 +434,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) { uvPrepareSendData(smsg, &wb); transRefSrvHandle(pConn); - uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue); + uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); } static void uvStartSendResp(SSvrMsg* smsg) { @@ -697,7 +697,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { // conn set QUEUE_INIT(&pThrd->conn); - pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 1, pThrd, uvWorkerAsyncCb); + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 1, pThrd, uvWorkerAsyncCb); uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); return true; @@ -976,7 +976,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { taosThreadJoin(pThrd->thread, NULL); SRV_RELEASE_UV(pThrd->loop); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); - transDestroyAsyncPool(pThrd->asyncPool); + transAsyncPoolDestroy(pThrd->asyncPool); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); }