From 955679276fa8c4f4b88ace0c682ab34efeb69d33 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 27 May 2022 22:05:53 +0800 Subject: [PATCH 01/14] feat:optimize for group by tag --- include/libs/nodes/plannodes.h | 1 + source/libs/executor/inc/executorInt.h | 4 ++ source/libs/executor/inc/executorimpl.h | 8 +++- source/libs/executor/src/executorimpl.c | 6 +-- source/libs/executor/src/groupoperator.c | 13 ++++-- source/libs/executor/src/scanoperator.c | 51 ++++++++++++++++++++---- 6 files changed, 68 insertions(+), 15 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 3ae2d18e5d..b2244c572c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -211,6 +211,7 @@ typedef struct STableScanPhysiNode { double ratio; int32_t dataRequired; SNodeList* pDynamicScanFuncs; + SNodeList* pPartitionKeys; int64_t interval; int64_t offset; int64_t sliding; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index f2f0bc2055..85572a9e17 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -27,6 +27,10 @@ typedef struct { int32_t bytes; } SGroupKeys, SStateKeys; +int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList); +uint64_t calcGroupId(char* pData, int32_t len); +void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols); +int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index dc613ddd86..aaa2cccf47 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -334,6 +334,12 @@ typedef struct STableScanInfo { int32_t dataBlockLoadFlag; double sampleRatio; // data block sample ratio, 1 by default SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded. + + SArray* pGroupCols; + SArray* pGroupColVals; // current group column values, SArray + char* keyBuf; // group by keys for hash + int32_t groupKeyLen; // total group by column width + SHashObj* pGroupSet; // quick locate the window object for each result } STableScanInfo; typedef struct STagScanInfo { @@ -704,7 +710,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup); -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 684c657d17..96b3b9f9e3 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4477,9 +4477,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (pDataReader == NULL && terrno != 0) { return NULL; } - + SArray* groupKyes = extractPartitionColInfo(pTableScanNode->pPartitionKeys); extractTableSchemaVersion(pHandle, pTableScanNode->scan.uid, pTaskInfo); - SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, groupKyes, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; @@ -4510,7 +4510,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc; - SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo); + SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, NULL, pTaskInfo); SArray* tableIdList = extractTableIdList(pTableListInfo); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index ef770e8afc..18f24d7640 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -24,10 +24,10 @@ #include "tcompare.h" #include "thash.h" #include "ttypes.h" +#include "executorInt.h" static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity); static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len); -static uint64_t calcGroupId(char* pData, int32_t len); static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; @@ -37,7 +37,7 @@ static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pGroupColVals); } -static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { +int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** keyBuf, const SArray* pGroupColList) { *pGroupColVals = taosArrayInit(4, sizeof(SGroupKeys)); if ((*pGroupColVals) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -110,7 +110,7 @@ static bool groupKeyCompare(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlo return true; } -static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { +void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSDataBlock* pBlock, int32_t rowIndex, int32_t numOfGroupCols) { SColumnDataAgg* pColAgg = NULL; for (int32_t i = 0; i < numOfGroupCols; ++i) { @@ -137,7 +137,7 @@ static void recordNewGroupKeys(SArray* pGroupCols, SArray* pGroupColVals, SSData } } -static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { +int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { ASSERT(pKey != NULL); size_t numOfGroupCols = taosArrayGetSize(pGroupColVals); @@ -607,8 +607,13 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); taosArrayDestroy(pInfo->pGroupCols); + for(int i = 0; i < taosArrayGetSize(pInfo->pGroupColVals); i++){ + SGroupKeys key = *(SGroupKeys*)taosArrayGet(pInfo->pGroupColVals, i); + taosMemoryFree(key.pData); + } taosArrayDestroy(pInfo->pGroupColVals); taosMemoryFree(pInfo->keyBuf); + taosHashCleanup(pInfo->pGroupSet); taosMemoryFree(pInfo->columnOffset); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 32187c81a7..3131c27e28 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -33,6 +33,8 @@ #include "ttypes.h" #include "vnode.h" +#include "executorInt.h" + #define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) @@ -369,6 +371,19 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { longjmp(pOperator->pTaskInfo->env, code); } + int32_t numOfGroupCols = taosArrayGetSize(pTableScanInfo->pGroupCols); + recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, 0, numOfGroupCols); + int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals); + + uint64_t *groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len); + if (!groupId) { + pBlock->info.groupId = *groupId; + + }else{ + pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len); + taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t)); + } + // current block is filter out according to filter condition, continue load the next block if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { continue; @@ -479,21 +494,25 @@ static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFree(pTableScanInfo->pResBlock); tsdbCleanupReadHandle(pTableScanInfo->dataReader); + taosArrayDestroy(pTableScanInfo->pGroupCols); + for(int i = 0; i < taosArrayGetSize(pTableScanInfo->pGroupColVals); i++){ + SGroupKeys key = *(SGroupKeys*)taosArrayGet(pTableScanInfo->pGroupColVals, i); + taosMemoryFree(key.pData); + } + taosArrayDestroy(pTableScanInfo->pGroupColVals); + taosMemoryFree(pTableScanInfo->keyBuf); + taosHashCleanup(pTableScanInfo->pGroupSet); if (pTableScanInfo->pColMatchInfo != NULL) { taosArrayDestroy(pTableScanInfo->pColMatchInfo); } } SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, - SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { + SReadHandle* readHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - - pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + goto _error; } SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; @@ -504,7 +523,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { - return NULL; + goto _error; } if (pTableScanNode->scan.pScanPseudoCols != NULL) { @@ -533,6 +552,18 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; + // for table group + pInfo->pGroupCols = groupKyes; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); + if (pInfo->pGroupSet == NULL) { + goto _error; + } + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, groupKyes); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, NULL, NULL, getTableScannerExecInfo); @@ -540,6 +571,12 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->cost.openCost = 0; return pOperator; + +_error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); + return NULL; } SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pTaskInfo) { From 3fe2f16213339ddb24b29c7e71f9e523da06e574 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Thu, 2 Jun 2022 17:58:01 +0800 Subject: [PATCH 02/14] feat: group by distributed split --- source/libs/planner/src/planSpliter.c | 67 ++++++++++++++++++++++++++- 1 file changed, 65 insertions(+), 2 deletions(-) diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index e3c8b82e39..3aad68ae7d 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -151,8 +151,8 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) { static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { switch (nodeType(pNode)) { - // case QUERY_NODE_LOGIC_PLAN_AGG: - // return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode); + case QUERY_NODE_LOGIC_PLAN_AGG: + return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: { SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode; if (WINDOW_TYPE_INTERVAL != pWindow->winType) { @@ -365,6 +365,66 @@ static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInf } } +static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) { + SNodeList* pFunc = pMergeAgg->pAggFuncs; + pMergeAgg->pAggFuncs = NULL; + SNodeList* pGroupKeys = pMergeAgg->pGroupKeys; + pMergeAgg->pGroupKeys = NULL; + SNodeList* pTargets = pMergeAgg->node.pTargets; + pMergeAgg->node.pTargets = NULL; + SNodeList* pChildren = pMergeAgg->node.pChildren; + pMergeAgg->node.pChildren = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + SAggLogicNode* pPartAgg = nodesCloneNode(pMergeAgg); + if (NULL == pPartAgg) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { + pPartAgg->pGroupKeys = pGroupKeys; + code = createColumnByRewriteExps(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets); + } + if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { + pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets); + if (NULL == pMergeAgg->pGroupKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + pMergeAgg->node.pTargets = pTargets; + pPartAgg->node.pChildren = pChildren; + + code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs); + } + if (TSDB_CODE_SUCCESS == code) { + code = createColumnByRewriteExps(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets); + } + + nodesDestroyList(pFunc); + if (TSDB_CODE_SUCCESS == code) { + *pOutput = (SLogicNode*)pPartAgg; + } else { + nodesDestroyNode(pPartAgg); + } + + return code; +} + +static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pPartAgg = NULL; + int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT)); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + return code; +} + static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE); if (TSDB_CODE_SUCCESS == code) { @@ -386,6 +446,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(info.pSplitNode)) { + case QUERY_NODE_LOGIC_PLAN_AGG: + code = stbSplSplitAggNode(pCxt, &info); + break; case QUERY_NODE_LOGIC_PLAN_WINDOW: code = stbSplSplitWindowNode(pCxt, &info); break; From ed87c39ff1d4df083a91aa5845ebef315aed13ca Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 3 Jun 2022 16:45:33 +0800 Subject: [PATCH 03/14] fix: a problem of parser async --- source/libs/parser/src/parAstParser.c | 18 ++++++++++ source/libs/parser/test/parInitialDTest.cpp | 38 +++++++++++++-------- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 18c1341da8..7abe244261 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -199,6 +199,22 @@ static int32_t collectMetaKeyFromCreateMultiTable(SCollectMetaKeyCxt* pCxt, SCre return code; } +static int32_t collectMetaKeyFromDropTable(SCollectMetaKeyCxt* pCxt, SDropTableStmt* pStmt) { + int32_t code = TSDB_CODE_SUCCESS; + SNode* pNode = NULL; + FOREACH(pNode, pStmt->pTables) { + SDropTableClause* pClause = (SDropTableClause*)pNode; + code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache); + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache); + } + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + return code; +} + static int32_t collectMetaKeyFromAlterTable(SCollectMetaKeyCxt* pCxt, SAlterTableStmt* pStmt) { int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); if (TSDB_CODE_SUCCESS == code) { @@ -341,6 +357,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromCreateTable(pCxt, (SCreateTableStmt*)pStmt); case QUERY_NODE_CREATE_MULTI_TABLE_STMT: return collectMetaKeyFromCreateMultiTable(pCxt, (SCreateMultiTableStmt*)pStmt); + case QUERY_NODE_DROP_TABLE_STMT: + return collectMetaKeyFromDropTable(pCxt, (SDropTableStmt*)pStmt); case QUERY_NODE_ALTER_TABLE_STMT: return collectMetaKeyFromAlterTable(pCxt, (SAlterTableStmt*)pStmt); case QUERY_NODE_USE_DATABASE_STMT: diff --git a/source/libs/parser/test/parInitialDTest.cpp b/source/libs/parser/test/parInitialDTest.cpp index 5ad427d964..1b1b930851 100644 --- a/source/libs/parser/test/parInitialDTest.cpp +++ b/source/libs/parser/test/parInitialDTest.cpp @@ -24,7 +24,7 @@ class ParserInitialDTest : public ParserDdlTest {}; // todo delete // todo desc // todo describe -// todo drop account +// todo DROP account TEST_F(ParserInitialDTest, dropBnode) { useDb("root", "test"); @@ -62,51 +62,61 @@ TEST_F(ParserInitialDTest, dropCGroup) { run("DROP CONSUMER GROUP IF EXISTS cg1 ON tp1"); } -// todo drop database -// todo drop dnode -// todo drop function +// todo DROP database +// todo DROP dnode +// todo DROP function TEST_F(ParserInitialDTest, dropIndex) { useDb("root", "test"); - run("drop index index1 on t1"); + run("DROP index index1 on t1"); } TEST_F(ParserInitialDTest, dropMnode) { useDb("root", "test"); - run("drop mnode on dnode 1"); + run("DROP mnode on dnode 1"); } TEST_F(ParserInitialDTest, dropQnode) { useDb("root", "test"); - run("drop qnode on dnode 1"); + run("DROP qnode on dnode 1"); } TEST_F(ParserInitialDTest, dropSnode) { useDb("root", "test"); - run("drop snode on dnode 1"); + run("DROP snode on dnode 1"); } -// todo drop stable -// todo drop stream -// todo drop table +TEST_F(ParserInitialDTest, dropSTable) { + useDb("root", "test"); + + run("DROP STABLE st1"); +} + +// todo DROP stream + +TEST_F(ParserInitialDTest, dropTable) { + useDb("root", "test"); + + run("DROP TABLE t1"); +} TEST_F(ParserInitialDTest, dropTopic) { useDb("root", "test"); - run("drop topic tp1"); + run("DROP topic tp1"); - run("drop topic if exists tp1"); + run("DROP topic if exists tp1"); } TEST_F(ParserInitialDTest, dropUser) { login("root"); useDb("root", "test"); - run("drop user wxy"); + run("DROP user wxy"); } } // namespace ParserTest From bc6326788a311110c3eedf610ef3c4cf310d670f Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 3 Jun 2022 16:57:52 +0800 Subject: [PATCH 04/14] feat: group by distributed split --- source/libs/planner/test/planGroupByTest.cpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/source/libs/planner/test/planGroupByTest.cpp b/source/libs/planner/test/planGroupByTest.cpp index cf51603470..201df2efde 100644 --- a/source/libs/planner/test/planGroupByTest.cpp +++ b/source/libs/planner/test/planGroupByTest.cpp @@ -67,3 +67,11 @@ TEST_F(PlanGroupByTest, selectFunc) { run("SELECT MAX(c1), c2 FROM t1 GROUP BY c3"); run("SELECT MAX(c1), t1.* FROM t1 GROUP BY c3"); } + +TEST_F(PlanGroupByTest, stable) { + useDb("root", "test"); + + run("SELECT COUNT(*) FROM st1"); + + run("SELECT COUNT(*) FROM st1 GROUP BY c1"); +} From 0c94c65acb6e81c367f0d9364f803dbb274444e8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 3 Jun 2022 23:13:21 +0800 Subject: [PATCH 05/14] fix(query): add ref for job --- source/client/src/clientImpl.c | 2 +- source/libs/catalog/src/ctgAsync.c | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 774ef5f248..4c68edaff0 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1087,7 +1087,7 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc tsem_wait(&pParam->sem); } - if (pRequest->code == TSDB_CODE_SUCCESS && setupOneRowPtr) { + if (pRequest->code == TSDB_CODE_SUCCESS && pResultInfo->numOfRows > 0 && setupOneRowPtr) { doSetOneRowPtr(pResultInfo); pResultInfo->current += 1; } diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 312f0c9250..a9e808e749 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -328,6 +328,18 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* CTG_ERR_JRET(ctgInitGetQnodeTask(pJob, taskIdx++)); } + pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob); + if (pJob->refId < 0) { + ctgError("add job to ref failed, error: %s", tstrerror(terrno)); + CTG_ERR_JRET(terrno); + } + + taosAcquireRef(gCtgMgmt.jobPool, pJob->refId); + + qDebug("QID:%" PRIx64 ", job %" PRIx64 " initialized, task num %d", pJob->queryId, pJob->refId, *taskNum); + return TSDB_CODE_SUCCESS; + + _return: taosMemoryFreeClear(*job); CTG_RET(code); From 7ede8db3c4addd25f001fb5df4d599e337e7b479 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 4 Jun 2022 09:33:13 +0800 Subject: [PATCH 06/14] fix: a problem of parser async --- source/libs/function/src/functionMgt.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index 611ae8d81f..67efdc5c0b 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -226,6 +226,9 @@ static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) { } bool fmIsDistExecFunc(int32_t funcId) { + if (fmIsUserDefinedFunc(funcId)) { + return false; + } if (!fmIsVectorFunc(funcId)) { return true; } From 6e6c9bda1750b37b26375935fbeabe2272686815 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 4 Jun 2022 10:16:52 +0800 Subject: [PATCH 07/14] feat: order by distributed split --- source/libs/planner/src/planSpliter.c | 91 +++++++++++++++++--- source/libs/planner/test/planOrderByTest.cpp | 19 ++-- 2 files changed, 91 insertions(+), 19 deletions(-) diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 3aad68ae7d..a3d5086908 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -161,7 +161,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); } // case QUERY_NODE_LOGIC_PLAN_SORT: - // return stbSplHasMultiTbScan(pNode); + // return stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_SCAN: return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode); default: @@ -295,7 +295,8 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic return code; } -static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SLogicNode* pPartChild) { +static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, SNodeList* pMergeKeys, + SLogicNode* pPartChild) { SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); if (NULL == pMerge) { return TSDB_CODE_OUT_OF_MEMORY; @@ -304,25 +305,28 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicNode* pParent, S pMerge->srcGroupId = pCxt->groupId; pMerge->node.pParent = pParent; pMerge->node.precision = pPartChild->precision; - int32_t code = nodesListMakeStrictAppend(&pMerge->pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pParent)->pTspk)); - if (TSDB_CODE_SUCCESS == code) { - pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets); - if (NULL == pMerge->node.pTargets) { - code = TSDB_CODE_OUT_OF_MEMORY; - } - } - if (TSDB_CODE_SUCCESS == code) { - code = nodesListMakeAppend(&pParent->pChildren, pMerge); + pMerge->pMergeKeys = pMergeKeys; + pMerge->node.pTargets = nodesCloneList(pPartChild->pTargets); + if (NULL == pMerge->node.pTargets) { + nodesDestroyNode(pMerge); + return TSDB_CODE_OUT_OF_MEMORY; } - return code; + return nodesListMakeAppend(&pParent->pChildren, pMerge); } static int32_t stbSplSplitWindowNodeForBatch(SSplitContext* pCxt, SStableSplitInfo* pInfo) { SLogicNode* pPartWindow = NULL; int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); if (TSDB_CODE_SUCCESS == code) { - code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pPartWindow); + SNodeList* pMergeKeys = NULL; + code = nodesListMakeStrictAppend(&pMergeKeys, nodesCloneNode(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk)); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartWindow); + } + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(pMergeKeys); + } } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, @@ -425,6 +429,64 @@ static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) return code; } +static int32_t stbSplCreatePartSortNode(SSortLogicNode* pMergeSort, SLogicNode** pOutput) { + SNodeList* pSortKeys = pMergeSort->pSortKeys; + pMergeSort->pSortKeys = NULL; + SNodeList* pTargets = pMergeSort->node.pTargets; + pMergeSort->node.pTargets = NULL; + SNodeList* pChildren = pMergeSort->node.pChildren; + pMergeSort->node.pChildren = NULL; + + int32_t code = TSDB_CODE_SUCCESS; + SSortLogicNode* pPartSort = nodesCloneNode(pMergeSort); + if (NULL == pPartSort) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + + pMergeSort->node.pTargets = pTargets; + pPartSort->node.pChildren = pChildren; + if (TSDB_CODE_SUCCESS == code) { + pPartSort->pSortKeys = pSortKeys; + code = createColumnByRewriteExps(pPartSort->pSortKeys, &pPartSort->node.pTargets); + } + if (TSDB_CODE_SUCCESS == code) { + pMergeSort->pSortKeys = nodesCloneList(pPartSort->node.pTargets); + if (NULL == pMergeSort->pSortKeys) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (TSDB_CODE_SUCCESS == code) { + *pOutput = (SLogicNode*)pPartSort; + } else { + nodesDestroyNode(pPartSort); + } + + return code; +} + +static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pPartSort = NULL; + int32_t code = stbSplCreatePartSortNode((SSortLogicNode*)pInfo->pSplitNode, &pPartSort); + if (TSDB_CODE_SUCCESS == code) { + SNodeList* pMergeKeys = nodesCloneList(((SSortLogicNode*)pInfo->pSplitNode)->pSortKeys); + if (NULL != pMergeKeys) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSplitNode, pMergeKeys, pPartSort); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyList(pMergeKeys); + } + } else { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + splCreateScanSubplan(pCxt, pPartSort, SPLIT_FLAG_STABLE_SPLIT)); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + return code; +} + static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE); if (TSDB_CODE_SUCCESS == code) { @@ -452,6 +514,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { case QUERY_NODE_LOGIC_PLAN_WINDOW: code = stbSplSplitWindowNode(pCxt, &info); break; + case QUERY_NODE_LOGIC_PLAN_SORT: + code = stbSplSplitSortNode(pCxt, &info); + break; case QUERY_NODE_LOGIC_PLAN_SCAN: code = stbSplSplitScanNode(pCxt, &info); break; diff --git a/source/libs/planner/test/planOrderByTest.cpp b/source/libs/planner/test/planOrderByTest.cpp index d95d1bdf1d..ef6f438b55 100644 --- a/source/libs/planner/test/planOrderByTest.cpp +++ b/source/libs/planner/test/planOrderByTest.cpp @@ -23,20 +23,27 @@ class PlanOrderByTest : public PlannerTestBase {}; TEST_F(PlanOrderByTest, basic) { useDb("root", "test"); - // order by key is in the projection list - run("select c1 from t1 order by c1"); - // order by key is not in the projection list - run("select c1 from t1 order by c2"); + // ORDER BY key is in the projection list + run("SELECT c1 FROM t1 ORDER BY c1"); + // ORDER BY key is not in the projection list + run("SELECT c1 FROM t1 ORDER BY c2"); } TEST_F(PlanOrderByTest, expr) { useDb("root", "test"); - run("select * from t1 order by c1 + 10, c2"); + run("SELECT * FROM t1 ORDER BY c1 + 10, c2"); } TEST_F(PlanOrderByTest, nullsOrder) { useDb("root", "test"); - run("select * from t1 order by c1 desc nulls first"); + run("SELECT * FROM t1 ORDER BY c1 DESC NULLS FIRST"); +} + +TEST_F(PlanOrderByTest, stable) { + useDb("root", "test"); + + // ORDER BY key is in the projection list + run("SELECT c1 FROM st1 ORDER BY c1"); } From f1860b7918629f6e9d246eb3366393ebb06ef674 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 4 Jun 2022 10:25:55 +0800 Subject: [PATCH 08/14] fix:fix error in table scan by group --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b9507f9f06..7eb8e9fea4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -379,7 +379,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals); uint64_t *groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len); - if (!groupId) { + if (groupId) { pBlock->info.groupId = *groupId; }else{ From a3033b59e6c8db0b1737a4a27bb69158e5605806 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 4 Jun 2022 11:19:38 +0800 Subject: [PATCH 09/14] fix:fix error in table scan by group --- source/libs/executor/src/scanoperator.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7eb8e9fea4..0774ccec1c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -381,8 +381,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { uint64_t *groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len); if (groupId) { pBlock->info.groupId = *groupId; - - }else{ + }else if(len != 0){ pBlock->info.groupId = calcGroupId(pTableScanInfo->keyBuf, len); taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &pBlock->info.groupId, sizeof(uint64_t)); } From 6c1966a28e707086b09574e09bb85eaff7346d70 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 4 Jun 2022 11:30:57 +0800 Subject: [PATCH 10/14] refactor: adjust msg type --- include/common/tmsgdef.h | 50 ++++++++++++++------------- source/dnode/mnode/impl/src/mndMain.c | 8 ++--- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index fe9ffda69c..c8a809c175 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -103,6 +103,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_CREATE_QNODE, "create-qnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_QNODE, "alter-qnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_QNODE, "drop-qnode", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SNODE, "create-snode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_SNODE, "alter-snode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_SNODE, "drop-snode", NULL, NULL) @@ -115,26 +116,26 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_DB, "alter-db", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SYNC_DB, "sync-db", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_DB, "compact-db", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "get-db-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_FUNC, "create-func", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_RETRIEVE_FUNC, "retrieve-func", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_FUNC, "drop-func", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STB, "create-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STB, "alter-stb", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STB, "drop-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_SMA, "create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_SMA, "drop-sma", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_TABLE_META, "table-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_VGROUP_LIST, "vgroup-list", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_QNODE_LIST, "qnode-list", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "heartbeat", SClientHbBatchReq, SClientHbBatchRsp) TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "show", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "systable-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "status", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "grant", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp) @@ -153,8 +154,8 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "drop-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "get-db-cfg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CONFIRM_WRITE, "confirm-write", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_APPLY_MSG, "apply-msg", NULL, NULL) // Requests handled by VNODE @@ -182,8 +183,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) -// TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp) -// TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp) TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL) @@ -203,26 +202,29 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "vnode-sync-ping", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "vnode-sync-ping-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST, "vnode-sync-client-request", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, "vnode-sync-client-request-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE, "vnode-sync-request-vote", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE_REPLY, "vnode-sync-request-vote-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES, "vnode-sync-append-entries", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, "vnode-sync-append-entries-reply", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_NOOP, "vnode-sync-noop", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_UNKNOWN, "vnode-sync-unknown", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "vnode-sync-common-response", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "sync-timeout", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING, "sync-ping", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CLIENT_REQUEST_REPLY, "sync-client-request-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE, "sync-request-vote", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_REQUEST_VOTE_REPLY, "sync-request-vote-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES, "sync-append-entries", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPEND_ENTRIES_REPLY, "sync-append-entries-reply", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_NOOP, "sync-noop", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_UNKNOWN, "sync-unknown", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_COMMON_RESPONSE, "sync-common-response", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "sync-apply-msg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "vnode-alter-config", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "vnode-alter-replica", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "vnode-compact", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_START_WRITE, "start-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STOP_WRITE, "stop-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_CONFIRM_WRITE, "confirm-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "vnode-delete-data", SVDeleteReq, SVDeleteRsp) + TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) // Requests handled by QNODE TD_NEW_MSG_SEG(TDMT_QND_MSG) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 3a3fd7ebdb..48a470c215 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -625,7 +625,7 @@ int32_t mndAcquireRpcRef(SMnode *pMnode) { code = -1; } else { int32_t ref = atomic_add_fetch_32(&pMnode->rpcRef, 1); - mTrace("mnode rpc is acquired, ref:%d", ref); + // mTrace("mnode rpc is acquired, ref:%d", ref); } taosThreadRwlockUnlock(&pMnode->lock); return code; @@ -634,7 +634,7 @@ int32_t mndAcquireRpcRef(SMnode *pMnode) { void mndReleaseRpcRef(SMnode *pMnode) { taosThreadRwlockRdlock(&pMnode->lock); int32_t ref = atomic_sub_fetch_32(&pMnode->rpcRef, 1); - mTrace("mnode rpc is released, ref:%d", ref); + // mTrace("mnode rpc is released, ref:%d", ref); taosThreadRwlockUnlock(&pMnode->lock); } @@ -675,7 +675,7 @@ int32_t mndAcquireSyncRef(SMnode *pMnode) { code = -1; } else { int32_t ref = atomic_add_fetch_32(&pMnode->syncRef, 1); - mTrace("mnode sync is acquired, ref:%d", ref); + // mTrace("mnode sync is acquired, ref:%d", ref); } taosThreadRwlockUnlock(&pMnode->lock); return code; @@ -684,6 +684,6 @@ int32_t mndAcquireSyncRef(SMnode *pMnode) { void mndReleaseSyncRef(SMnode *pMnode) { taosThreadRwlockRdlock(&pMnode->lock); int32_t ref = atomic_sub_fetch_32(&pMnode->syncRef, 1); - mTrace("mnode sync is released, ref:%d", ref); + // mTrace("mnode sync is released, ref:%d", ref); taosThreadRwlockUnlock(&pMnode->lock); } From 978f73393d62a9cb46f90bccf0dad362f00dafa4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 4 Jun 2022 12:31:35 +0800 Subject: [PATCH 11/14] enh(query): add retension for the results of show databases; --- source/common/src/systable.c | 7 +-- source/dnode/mnode/impl/src/mndDb.c | 71 ++++++++++++++++++++++-- tests/script/tsim/testsuit.sim | 84 ++++++++++++++--------------- 3 files changed, 114 insertions(+), 48 deletions(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 5b8d214cb4..da42a955ba 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -87,11 +87,12 @@ static const SSysDbTableSchema userDBSchema[] = { {.name = "wal", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, {.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, - {.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "cache_model", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "single_stable", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "single_stable_model", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, + {.name = "retension", .bytes = 60 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, // {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update }; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 6d7a638c30..41f8d741a6 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1428,6 +1428,52 @@ const char *mndGetDbStr(const char *src) { return pos; } +int64_t getValOfDiffPrecision(int8_t unit, int64_t val) { + int64_t v = 0; + switch(unit) { + case 's': v = val / 1000; break; + case 'm': v = val / tsTickPerMin[TSDB_TIME_PRECISION_MILLI]; break; + case 'h': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 60); break; + case 'd': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60); break; + case 'w': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60 * 7); break; + default: + break; + } + + return v; +} + +char* buildRetension(SArray* pRetension) { + size_t size = taosArrayGetSize(pRetension); + if (size == 0) { + return NULL; + } + + char* p1 = taosMemoryCalloc(1, 100); + SRetention* p = taosArrayGet(pRetension, 0); + + int32_t len = 2; + + int64_t v1 = getValOfDiffPrecision(p->freqUnit, p->freq); + int64_t v2 = getValOfDiffPrecision(p->keepUnit, p->keep); + len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c,", v1, p->freqUnit, v2, p->keepUnit); + + p = taosArrayGet(pRetension, 1); + + v1 = getValOfDiffPrecision(p->freqUnit, p->freq); + v2 = getValOfDiffPrecision(p->keepUnit, p->keep); + len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c,", v1, p->freqUnit, v2, p->keepUnit); + + p = taosArrayGet(pRetension, 2); + + v1 = getValOfDiffPrecision(p->freqUnit, p->freq); + v2 = getValOfDiffPrecision(p->keepUnit, p->keep); + len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c", v1, p->freqUnit, v2, p->keepUnit); + + varDataSetLen(p1, len); + return p1; +} + static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, int32_t rows, int64_t numOfTables, bool sysDb) { int32_t cols = 0; @@ -1475,7 +1521,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.replications, false); - const char *src = pDb->cfg.strict ? "strict" : "nostrict"; + const char *src = pDb->cfg.strict ? "strict" : "no_strict"; char strict[24] = {0}; STR_WITH_SIZE_TO_VARSTR(strict, src, strlen(src)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -1523,7 +1569,16 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.compression, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.cacheLastRow, false); + + STR_WITH_SIZE_TO_VARSTR(strict, src, strlen(src)); + + char cacheModel[24] = {0}; + if (pDb->cfg.cacheLastRow == 0) { + STR_TO_VARSTR(cacheModel, "no_cache"); + } else { + // + } + colDataAppend(pColInfo, rows, cacheModel, false); char *prec = NULL; switch (pDb->cfg.precision) { @@ -1552,8 +1607,18 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)statusB, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false); + + char* p = buildRetension(pDb->cfg.pRetensions); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols); + if (p == NULL) { + colDataAppendNULL(pColInfo, rows); + } else { + colDataAppend(pColInfo, rows, (const char *)p, false); + taosMemoryFree(p); + } } } diff --git a/tests/script/tsim/testsuit.sim b/tests/script/tsim/testsuit.sim index 883dc1e789..b2ca1e0345 100644 --- a/tests/script/tsim/testsuit.sim +++ b/tests/script/tsim/testsuit.sim @@ -1,45 +1,45 @@ -run tsim/user/pass_alter.sim -run tsim/user/basic1.sim -run tsim/user/privilege2.sim -run tsim/user/user_len.sim -run tsim/user/privilege1.sim -run tsim/user/pass_len.sim -run tsim/table/basic1.sim -run tsim/trans/lossdata1.sim -run tsim/trans/create_db.sim -run tsim/stable/alter_metrics.sim -run tsim/stable/tag_modify.sim -run tsim/stable/alter_comment.sim -run tsim/stable/column_drop.sim -run tsim/stable/column_modify.sim -run tsim/stable/tag_rename.sim -run tsim/stable/vnode3.sim -run tsim/stable/metrics.sim -run tsim/stable/alter_insert2.sim -run tsim/stable/show.sim -run tsim/stable/alter_import.sim -run tsim/stable/tag_add.sim -run tsim/stable/tag_drop.sim -run tsim/stable/column_add.sim -run tsim/stable/alter_count.sim -run tsim/stable/values.sim -run tsim/stable/dnode3.sim -run tsim/stable/alter_insert1.sim -run tsim/stable/refcount.sim -run tsim/stable/disk.sim -run tsim/db/basic1.sim -run tsim/db/basic3.sim -run tsim/db/basic7.sim -run tsim/db/basic6.sim -run tsim/db/create_all_options.sim -run tsim/db/basic2.sim -run tsim/db/error1.sim -run tsim/db/taosdlog.sim -run tsim/db/alter_option.sim -run tsim/mnode/basic1.sim -run tsim/mnode/basic3.sim -run tsim/mnode/basic2.sim -run tsim/parser/fourArithmetic-basic.sim +#run tsim/user/pass_alter.sim +#run tsim/user/basic1.sim +#run tsim/user/privilege2.sim +#run tsim/user/user_len.sim +#run tsim/user/privilege1.sim +##run tsim/user/pass_len.sim +##run tsim/table/basic1.sim +#run tsim/trans/lossdata1.sim +#run tsim/trans/create_db.sim +##run tsim/stable/alter_metrics.sim +##run tsim/stable/tag_modify.sim +##run tsim/stable/alter_comment.sim +##run tsim/stable/column_drop.sim +##run tsim/stable/column_modify.sim +##run tsim/stable/tag_rename.sim +##run tsim/stable/vnode3.sim +##run tsim/stable/metrics.sim +##run tsim/stable/alter_insert2.sim +##run tsim/stable/show.sim +##run tsim/stable/alter_import.sim +##run tsim/stable/tag_add.sim +##run tsim/stable/tag_drop.sim +##run tsim/stable/column_add.sim +##run tsim/stable/alter_count.sim +##run tsim/stable/values.sim +##run tsim/stable/dnode3.sim +##run tsim/stable/alter_insert1.sim +##run tsim/stable/refcount.sim +##run tsim/stable/disk.sim +#run tsim/db/basic1.sim +#run tsim/db/basic3.sim +##run tsim/db/basic7.sim +#run tsim/db/basic6.sim +#run tsim/db/create_all_options.sim +#run tsim/db/basic2.sim +#run tsim/db/error1.sim +#run tsim/db/taosdlog.sim +#run tsim/db/alter_option.sim +#run tsim/mnode/basic1.sim +#run tsim/mnode/basic3.sim +#run tsim/mnode/basic2.sim +#run tsim/parser/fourArithmetic-basic.sim run tsim/parser/groupby-basic.sim run tsim/snode/basic1.sim run tsim/query/time_process.sim From 1f419e0a3642fa217dc4169729cfb6dd86996634 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 4 Jun 2022 12:31:53 +0800 Subject: [PATCH 12/14] refactor: adjust msg type --- include/common/tmsgdef.h | 5 +- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 58 ++++++++++----------- 2 files changed, 29 insertions(+), 34 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a4bbd48434..2cf188866b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -93,7 +93,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_GET_USER_AUTH, "get-user-auth", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_DNODE, "create-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CONFIG_DNODE, "config-dnode", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_ALTER_DNODE, "alter-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_DNODE, "drop-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_MNODE, "create-mnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_MNODE, "alter-mnode", NULL, NULL) @@ -159,6 +158,8 @@ enum { TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) TD_DEF_MSG_TYPE(TDMT_VND_QUERY, "query", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "query-continue", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_FETCH, "fetch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TABLE, "create-table", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TABLE, "alter-table", NULL, NULL) @@ -180,8 +181,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqPollReq, SMqDataBlkRsp) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index bbdafbdfe1..4a79513736 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -129,12 +129,7 @@ SArray *mmGetMsgHandles() { SArray *pArray = taosArrayInit(64, sizeof(SMgmtHandle)); if (pArray == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; - - // Requests handled by DNODE if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_QNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -146,7 +141,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - // Requests handled by MNODE if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_ACCT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -159,6 +153,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_DNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -172,53 +168,50 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_USE_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_COMPACT_DB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_FUNC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_VGROUP_LIST, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TOPIC, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_CFG, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_MND_GET_INDEX, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_TRANS, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIRM_WRITE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - // Requests handled by VNODE + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; - - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_CONTINUE, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER; - + if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER; @@ -233,6 +226,9 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MON_MM_INFO, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MON_MM_LOAD, mmPutNodeMsgToMonitorQueue, 0) == NULL) goto _OVER; + code = 0; _OVER: From e752d4e45f1fe7f7aee8d43253ac82438d6cf404 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 4 Jun 2022 14:21:03 +0800 Subject: [PATCH 13/14] test: update test script. --- tests/script/tsim/db/alter_option.sim | 2 +- tests/script/tsim/db/create_all_options.sim | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/script/tsim/db/alter_option.sim b/tests/script/tsim/db/alter_option.sim index f3adb4535e..7444511504 100644 --- a/tests/script/tsim/db/alter_option.sim +++ b/tests/script/tsim/db/alter_option.sim @@ -89,7 +89,7 @@ endi if $data4_db != 3 then # replica return -1 endi -if $data5_db != nostrict then # strict +if $data5_db != no_strict then # strict return -1 endi if $data6_db != 345600 then # days diff --git a/tests/script/tsim/db/create_all_options.sim b/tests/script/tsim/db/create_all_options.sim index d7d72c4a1d..88f0378d61 100644 --- a/tests/script/tsim/db/create_all_options.sim +++ b/tests/script/tsim/db/create_all_options.sim @@ -110,7 +110,7 @@ if $data4_db != 1 then # replica print expect 1, actual: $data4_db return -1 endi -if $data5_db != nostrict then # strict +if $data5_db != no_strict then # strict return -1 endi if $data6_db != 14400 then # days From 2b3ff4d88897c40ee65a72e384e0deff3ee538ea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 4 Jun 2022 14:22:30 +0800 Subject: [PATCH 14/14] fix: update the column type in show. --- source/common/src/systable.c | 2 +- source/dnode/mnode/impl/src/mndDb.c | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index da42a955ba..2b59354d60 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -87,7 +87,7 @@ static const SSysDbTableSchema userDBSchema[] = { {.name = "wal", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, {.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, - {.name = "cache_model", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "cache_model", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, {.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "single_stable_model", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 41f8d741a6..c0460ee8bf 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1571,14 +1571,19 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); STR_WITH_SIZE_TO_VARSTR(strict, src, strlen(src)); - +#if 0 char cacheModel[24] = {0}; + bool null = false; if (pDb->cfg.cacheLastRow == 0) { STR_TO_VARSTR(cacheModel, "no_cache"); + } else if (pDb->cfg.cacheLastRow == 1) { + STR_TO_VARSTR(cacheModel, "last_row_cache") } else { - // + null = true; } - colDataAppend(pColInfo, rows, cacheModel, false); + colDataAppend(pColInfo, rows, cacheModel, null); +#endif + colDataAppend(pColInfo, rows, (const char*) &pDb->cfg.cacheLastRow, false); char *prec = NULL; switch (pDb->cfg.precision) {