From bcac24b6b7ede099377b99cb85ca14e4bf53d848 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 6 Jun 2023 17:19:59 +0800 Subject: [PATCH 01/19] fix: tsma query with order by _wstart/_wend --- source/libs/parser/src/parTranslater.c | 1 + source/libs/planner/src/planOptimizer.c | 23 +++++- .../script/tsim/sma/tsmaCreateInsertQuery.sim | 74 +++++++++++++++++++ 3 files changed, 95 insertions(+), 3 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index d1c3da0041..a9fe65492d 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4940,6 +4940,7 @@ static int32_t buildTableForSampleAst(SSampleAstInfo* pInfo, SNode** pOutput) { } snprintf(pTable->table.dbName, sizeof(pTable->table.dbName), "%s", pInfo->pDbName); snprintf(pTable->table.tableName, sizeof(pTable->table.tableName), "%s", pInfo->pTableName); + snprintf(pTable->table.tableAlias, sizeof(pTable->table.tableName), "%s", pInfo->pTableName); TSWAP(pTable->pMeta, pInfo->pRollupTableMeta); *pOutput = (SNode*)pTable; return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 8b75fe6b33..36378a5414 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1385,7 +1385,18 @@ static SNode* smaIndexOptFindWStartFunc(SNodeList* pSmaFuncs) { return NULL; } -static int32_t smaIndexOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeList* pSmaFuncs, +static SNode* smaIndexOptFuncInProject(SNodeList* pProjects, SFunctionNode* pFunc) { + SNode* pProject = NULL; + FOREACH(pProject, pProjects) { + if (0 != pFunc->node.aliasName[0] && + 0 == strncmp(pFunc->node.aliasName, ((SColumnNode*)pProject)->colName, TSDB_COL_NAME_LEN)) { + return pProject; + } + } + return NULL; +} + +static int32_t smaIndexOptCreateSmaCols(SWindowLogicNode* pWindow, uint64_t tableId, SNodeList* pSmaFuncs, SNodeList** pOutput) { SNodeList* pCols = NULL; SNode* pFunc = NULL; @@ -1393,11 +1404,16 @@ static int32_t smaIndexOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNo int32_t index = 0; int32_t smaFuncIndex = -1; bool hasWStart = false; - FOREACH(pFunc, pFuncs) { + + SProjectLogicNode* pProject = (SProjectLogicNode*)pWindow->node.pParent; + FOREACH(pFunc, pWindow->pFuncs) { smaFuncIndex = smaIndexOptFindSmaFunc(pFunc, pSmaFuncs); if (smaFuncIndex < 0) { break; } else { + if (pProject && !smaIndexOptFuncInProject(pProject->pProjections, (SFunctionNode*)pFunc)) { + continue; + } code = nodesListMakeStrictAppend(&pCols, smaIndexOptCreateSmaCol(pFunc, tableId, smaFuncIndex + 1)); if (TSDB_CODE_SUCCESS != code) { break; @@ -1444,10 +1460,11 @@ static int32_t smaIndexOptCouldApplyIndex(SScanLogicNode* pScan, STableIndexInfo if (!smaIndexOptEqualInterval(pScan, pWindow, pIndex)) { return TSDB_CODE_SUCCESS; } + SNodeList* pSmaFuncs = NULL; int32_t code = nodesStringToList(pIndex->expr, &pSmaFuncs); if (TSDB_CODE_SUCCESS == code) { - code = smaIndexOptCreateSmaCols(pWindow->pFuncs, pIndex->dstTbUid, pSmaFuncs, pCols); + code = smaIndexOptCreateSmaCols(pWindow, pIndex->dstTbUid, pSmaFuncs, pCols); } nodesDestroyList(pSmaFuncs); return code; diff --git a/tests/script/tsim/sma/tsmaCreateInsertQuery.sim b/tests/script/tsim/sma/tsmaCreateInsertQuery.sim index 242231e408..60f769d2ae 100644 --- a/tests/script/tsim/sma/tsmaCreateInsertQuery.sim +++ b/tests/script/tsim/sma/tsmaCreateInsertQuery.sim @@ -340,6 +340,80 @@ if $data05 != 30.000000000 then return -1 endi +print =============== select with _wstart/order by _wstart from stb from file in designated vgroup +sql select _wstart, _wend, min(c1),max(c2),max(c1) from stb interval(5m,10s) sliding(5m) order by _wstart; +print $data00 $data01 $data02 $data03 $data04 +if $rows != 1 then + print rows $rows != 1 + return -1 +endi + +if $data02 != -13 then + print data02 $data02 != -13 + return -1 +endi + +if $data03 != 20.00000 then + print data03 $data03 != 20.00000 + return -1 +endi + +if $data04 != 20 then + print data04 $data04 != 20 + return -1 +endi + +print =============== select without _wstart/with order by _wstart from stb from file in designated vgroup +sql select _wend, min(c1),max(c2),max(c1) from stb interval(5m,10s) sliding(5m) order by _wstart; +print $data00 $data01 $data02 $data03 +if $rows != 1 then + print rows $rows != 1 + return -1 +endi + +if $data01 != -13 then + print data01 $data01 != -13 + return -1 +endi + +if $data02 != 20.00000 then + print data02 $data02 != 20.00000 + return -1 +endi + +if $data03 != 20 then + print data03 $data03 != 20 + return -1 +endi + +print =============== select * from stb from file in common vgroups +sql select _wstart, _wend, min(c1),max(c2),max(c1),max(c3) from stb interval(5m,10s) sliding(5m) order by _wstart; +print $data00 $data01 $data02 $data03 $data04 $data05 +if $rows != 1 then + print rows $rows != 1 + return -1 +endi + +if $data02 != -13 then + print data02 $data02 != -13 + return -1 +endi + +if $data03 != 20.00000 then + print data03 $data03 != 20.00000 + return -1 +endi + +if $data04 != 20 then + print data04 $data04 != 20 + return -1 +endi + +if $data05 != 30.000000000 then + print data05 $data05 != 30.000000000 + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT From 576e3d1156ae9a529e6a305d7f712e6a04dabea0 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 8 Jun 2023 11:49:49 +0800 Subject: [PATCH 02/19] chore: erase duplicated func node for window logic node --- source/libs/parser/src/parTranslater.c | 2 +- source/libs/planner/src/planLogicCreater.c | 19 ++++++++++++++++++ source/libs/planner/src/planOptimizer.c | 23 +++------------------- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index a9fe65492d..9f94c7cb8c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4940,7 +4940,7 @@ static int32_t buildTableForSampleAst(SSampleAstInfo* pInfo, SNode** pOutput) { } snprintf(pTable->table.dbName, sizeof(pTable->table.dbName), "%s", pInfo->pDbName); snprintf(pTable->table.tableName, sizeof(pTable->table.tableName), "%s", pInfo->pTableName); - snprintf(pTable->table.tableAlias, sizeof(pTable->table.tableName), "%s", pInfo->pTableName); + snprintf(pTable->table.tableAlias, sizeof(pTable->table.tableAlias), "%s", pInfo->pTableName); TSWAP(pTable->pMeta, pInfo->pRollupTableMeta); *pOutput = (SNode*)pTable; return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 5bbc9acdad..8c30beffc9 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -749,6 +749,25 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW, NULL); } + // erase duplicated funcNode by filtering colNode in pSelect->pProjectionList + int32_t funcIndex = 0; + SNode * pFunc = NULL, *pProject = NULL; + FOREACH(pFunc, pWindow->pFuncs) { + bool exist = false; + FOREACH(pProject, pSelect->pProjectionList) { + if (0 != ((SFunctionNode*)pFunc)->node.aliasName[0] && + 0 == strncmp(((SFunctionNode*)pFunc)->node.aliasName, ((SColumnNode*)pProject)->colName, TSDB_COL_NAME_LEN)) { + exist = true; + break; + } + } + if (!exist) { + nodesListErase(pWindow->pFuncs, nodesListGetCell(pWindow->pFuncs, funcIndex)); + } else { + ++funcIndex; + } + } + if (TSDB_CODE_SUCCESS == code) { code = createColumnByRewriteExprs(pWindow->pFuncs, &pWindow->node.pTargets); } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 36378a5414..8b75fe6b33 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1385,18 +1385,7 @@ static SNode* smaIndexOptFindWStartFunc(SNodeList* pSmaFuncs) { return NULL; } -static SNode* smaIndexOptFuncInProject(SNodeList* pProjects, SFunctionNode* pFunc) { - SNode* pProject = NULL; - FOREACH(pProject, pProjects) { - if (0 != pFunc->node.aliasName[0] && - 0 == strncmp(pFunc->node.aliasName, ((SColumnNode*)pProject)->colName, TSDB_COL_NAME_LEN)) { - return pProject; - } - } - return NULL; -} - -static int32_t smaIndexOptCreateSmaCols(SWindowLogicNode* pWindow, uint64_t tableId, SNodeList* pSmaFuncs, +static int32_t smaIndexOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeList* pSmaFuncs, SNodeList** pOutput) { SNodeList* pCols = NULL; SNode* pFunc = NULL; @@ -1404,16 +1393,11 @@ static int32_t smaIndexOptCreateSmaCols(SWindowLogicNode* pWindow, uint64_t tabl int32_t index = 0; int32_t smaFuncIndex = -1; bool hasWStart = false; - - SProjectLogicNode* pProject = (SProjectLogicNode*)pWindow->node.pParent; - FOREACH(pFunc, pWindow->pFuncs) { + FOREACH(pFunc, pFuncs) { smaFuncIndex = smaIndexOptFindSmaFunc(pFunc, pSmaFuncs); if (smaFuncIndex < 0) { break; } else { - if (pProject && !smaIndexOptFuncInProject(pProject->pProjections, (SFunctionNode*)pFunc)) { - continue; - } code = nodesListMakeStrictAppend(&pCols, smaIndexOptCreateSmaCol(pFunc, tableId, smaFuncIndex + 1)); if (TSDB_CODE_SUCCESS != code) { break; @@ -1460,11 +1444,10 @@ static int32_t smaIndexOptCouldApplyIndex(SScanLogicNode* pScan, STableIndexInfo if (!smaIndexOptEqualInterval(pScan, pWindow, pIndex)) { return TSDB_CODE_SUCCESS; } - SNodeList* pSmaFuncs = NULL; int32_t code = nodesStringToList(pIndex->expr, &pSmaFuncs); if (TSDB_CODE_SUCCESS == code) { - code = smaIndexOptCreateSmaCols(pWindow, pIndex->dstTbUid, pSmaFuncs, pCols); + code = smaIndexOptCreateSmaCols(pWindow->pFuncs, pIndex->dstTbUid, pSmaFuncs, pCols); } nodesDestroyList(pSmaFuncs); return code; From 90b4580ba9ccb784549cec2be8102e72bcabcdee Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 8 Jun 2023 14:16:05 +0800 Subject: [PATCH 03/19] chore: only filter window pseudo column --- source/libs/planner/src/planLogicCreater.c | 37 +++++++++++++--------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 8c30beffc9..a253405404 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -749,22 +749,29 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW, NULL); } - // erase duplicated funcNode by filtering colNode in pSelect->pProjectionList - int32_t funcIndex = 0; - SNode * pFunc = NULL, *pProject = NULL; - FOREACH(pFunc, pWindow->pFuncs) { - bool exist = false; - FOREACH(pProject, pSelect->pProjectionList) { - if (0 != ((SFunctionNode*)pFunc)->node.aliasName[0] && - 0 == strncmp(((SFunctionNode*)pFunc)->node.aliasName, ((SColumnNode*)pProject)->colName, TSDB_COL_NAME_LEN)) { - exist = true; - break; + // erase duplicated Window Pseudo funcNode by filtering colNode in pSelect->pProjectionList + if (pSelect->pProjectionList) { + int32_t funcIndex = 0; + SNode * pFunc = NULL, *pProject = NULL; + FOREACH(pFunc, pWindow->pFuncs) { + if (!fmIsWindowPseudoColumnFunc(((SFunctionNode*)pFunc)->funcId)) { + ++funcIndex; + continue; + } + bool exist = false; + FOREACH(pProject, pSelect->pProjectionList) { + if (0 != ((SFunctionNode*)pFunc)->node.aliasName[0] && + 0 == strncmp(((SFunctionNode*)pFunc)->node.aliasName, ((SColumnNode*)pProject)->colName, + TSDB_COL_NAME_LEN)) { + exist = true; + break; + } + } + if (!exist) { + nodesListErase(pWindow->pFuncs, nodesListGetCell(pWindow->pFuncs, funcIndex)); + } else { + ++funcIndex; } - } - if (!exist) { - nodesListErase(pWindow->pFuncs, nodesListGetCell(pWindow->pFuncs, funcIndex)); - } else { - ++funcIndex; } } From e56c5950aa34dc31c5f074c1aeadcda9d088204a Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 8 Jun 2023 14:29:44 +0800 Subject: [PATCH 04/19] chore: more check --- source/libs/planner/src/planLogicCreater.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index a253405404..ffabfcc0dd 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -760,7 +760,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm } bool exist = false; FOREACH(pProject, pSelect->pProjectionList) { - if (0 != ((SFunctionNode*)pFunc)->node.aliasName[0] && + if (QUERY_NODE_COLUMN == nodeType(pProject) && 0 != ((SFunctionNode*)pFunc)->node.aliasName[0] && 0 == strncmp(((SFunctionNode*)pFunc)->node.aliasName, ((SColumnNode*)pProject)->colName, TSDB_COL_NAME_LEN)) { exist = true; From c3726b82643cd8eef15bd6941df6a8508f6bd3e7 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 8 Jun 2023 14:42:10 +0800 Subject: [PATCH 05/19] chore: more check --- source/libs/planner/src/planLogicCreater.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index ffabfcc0dd..dd8fd3861d 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -750,7 +750,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm } // erase duplicated Window Pseudo funcNode by filtering colNode in pSelect->pProjectionList - if (pSelect->pProjectionList) { + if (WINDOW_TYPE_INTERVAL == pWindow->winType && pSelect->pProjectionList) { int32_t funcIndex = 0; SNode * pFunc = NULL, *pProject = NULL; FOREACH(pFunc, pWindow->pFuncs) { From e5e724a8baf4f0beed5c561a0ef8d5420a4175a7 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 8 Jun 2023 16:13:31 +0800 Subject: [PATCH 06/19] chore: code optimization --- source/libs/planner/src/planLogicCreater.c | 70 ++++++++++++++-------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index dd8fd3861d..5a8512c12d 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -732,6 +732,49 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p return code; } +static int32_t eraseDuplicatedWindowPseudoCol(SWindowLogicNode* pWindow, SNodeList* pProjections) { + int32_t code = 0; + int32_t funcIndex = 0; + SSHashObj* pHashFunc = NULL; + SNode* pFuncNode = NULL; + FOREACH(pFuncNode, pWindow->pFuncs) { + SFunctionNode* pFunc = (SFunctionNode*)pFuncNode; + if (!fmIsWindowPseudoColumnFunc(pFunc->funcId)) { + ++funcIndex; + continue; + } + if (!pHashFunc && !(pHashFunc = tSimpleHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)))) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + + void* hashVal = tSimpleHashGet(pHashFunc, &pFunc->funcId, sizeof(pFunc->funcId)); + if (!hashVal) { + tSimpleHashPut(pHashFunc, &pFunc->funcId, sizeof(pFunc->funcId), &funcIndex, sizeof(funcIndex)); + ++funcIndex; + continue; + } + + bool exist = false; + SNode* pProject = NULL; + FOREACH(pProject, pProjections) { + if (QUERY_NODE_COLUMN == nodeType(pProject) && 0 != pFunc->node.aliasName[0] && + 0 == strncmp(pFunc->node.aliasName, ((SColumnNode*)pProject)->colName, TSDB_COL_NAME_LEN)) { + exist = true; + break; + } + } + if (!exist) { + nodesListErase(pWindow->pFuncs, nodesListGetCell(pWindow->pFuncs, funcIndex)); + } else { + nodesListErase(pWindow->pFuncs, nodesListGetCell(pWindow->pFuncs, *(int32_t*)hashVal)); + tSimpleHashPut(pHashFunc, &pFunc->funcId, sizeof(pFunc->funcId), &funcIndex, sizeof(funcIndex)); + } + } + tSimpleHashCleanup(pHashFunc); + return code; +} + static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow, SLogicNode** pLogicNode) { if (pCxt->pPlanCxt->streamQuery) { @@ -749,30 +792,9 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW, NULL); } - // erase duplicated Window Pseudo funcNode by filtering colNode in pSelect->pProjectionList - if (WINDOW_TYPE_INTERVAL == pWindow->winType && pSelect->pProjectionList) { - int32_t funcIndex = 0; - SNode * pFunc = NULL, *pProject = NULL; - FOREACH(pFunc, pWindow->pFuncs) { - if (!fmIsWindowPseudoColumnFunc(((SFunctionNode*)pFunc)->funcId)) { - ++funcIndex; - continue; - } - bool exist = false; - FOREACH(pProject, pSelect->pProjectionList) { - if (QUERY_NODE_COLUMN == nodeType(pProject) && 0 != ((SFunctionNode*)pFunc)->node.aliasName[0] && - 0 == strncmp(((SFunctionNode*)pFunc)->node.aliasName, ((SColumnNode*)pProject)->colName, - TSDB_COL_NAME_LEN)) { - exist = true; - break; - } - } - if (!exist) { - nodesListErase(pWindow->pFuncs, nodesListGetCell(pWindow->pFuncs, funcIndex)); - } else { - ++funcIndex; - } - } + if (TSDB_CODE_SUCCESS == code && WINDOW_TYPE_INTERVAL == pWindow->winType && pSelect->pProjectionList) { + // erase duplicated Window Pseudo funcNode by filtering colNode in pSelect->pProjectionList + code = eraseDuplicatedWindowPseudoCol(pWindow, pSelect->pProjectionList); } if (TSDB_CODE_SUCCESS == code) { From 79625df5e4dac8f906e39ca3597c04df579f92b7 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 8 Jun 2023 18:39:51 +0800 Subject: [PATCH 07/19] chore: code optimization --- source/libs/planner/src/planLogicCreater.c | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 5a8512c12d..953ca13d3b 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -732,25 +732,25 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p return code; } -static int32_t eraseDuplicatedWindowPseudoCol(SWindowLogicNode* pWindow, SNodeList* pProjections) { +static int32_t eraseDuplicatedPseudoColumnFuncs(SNodeList* pFuncs, SNodeList* pProjections) { int32_t code = 0; int32_t funcIndex = 0; - SSHashObj* pHashFunc = NULL; + SSHashObj* pFuncHash = NULL; SNode* pFuncNode = NULL; - FOREACH(pFuncNode, pWindow->pFuncs) { + FOREACH(pFuncNode, pFuncs) { SFunctionNode* pFunc = (SFunctionNode*)pFuncNode; if (!fmIsWindowPseudoColumnFunc(pFunc->funcId)) { ++funcIndex; continue; } - if (!pHashFunc && !(pHashFunc = tSimpleHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)))) { + if (!pFuncHash && !(pFuncHash = tSimpleHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)))) { code = TSDB_CODE_OUT_OF_MEMORY; break; } - void* hashVal = tSimpleHashGet(pHashFunc, &pFunc->funcId, sizeof(pFunc->funcId)); + void* hashVal = tSimpleHashGet(pFuncHash, &pFunc->funcId, sizeof(pFunc->funcId)); if (!hashVal) { - tSimpleHashPut(pHashFunc, &pFunc->funcId, sizeof(pFunc->funcId), &funcIndex, sizeof(funcIndex)); + tSimpleHashPut(pFuncHash, &pFunc->funcId, sizeof(pFunc->funcId), &funcIndex, sizeof(funcIndex)); ++funcIndex; continue; } @@ -765,13 +765,13 @@ static int32_t eraseDuplicatedWindowPseudoCol(SWindowLogicNode* pWindow, SNodeLi } } if (!exist) { - nodesListErase(pWindow->pFuncs, nodesListGetCell(pWindow->pFuncs, funcIndex)); + nodesListErase(pFuncs, nodesListGetCell(pFuncs, funcIndex)); } else { - nodesListErase(pWindow->pFuncs, nodesListGetCell(pWindow->pFuncs, *(int32_t*)hashVal)); - tSimpleHashPut(pHashFunc, &pFunc->funcId, sizeof(pFunc->funcId), &funcIndex, sizeof(funcIndex)); + nodesListErase(pFuncs, nodesListGetCell(pFuncs, *(int32_t*)hashVal)); + tSimpleHashPut(pFuncHash, &pFunc->funcId, sizeof(pFunc->funcId), &funcIndex, sizeof(funcIndex)); } } - tSimpleHashCleanup(pHashFunc); + tSimpleHashCleanup(pFuncHash); return code; } @@ -794,7 +794,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm if (TSDB_CODE_SUCCESS == code && WINDOW_TYPE_INTERVAL == pWindow->winType && pSelect->pProjectionList) { // erase duplicated Window Pseudo funcNode by filtering colNode in pSelect->pProjectionList - code = eraseDuplicatedWindowPseudoCol(pWindow, pSelect->pProjectionList); + code = eraseDuplicatedPseudoColumnFuncs(pWindow->pFuncs, pSelect->pProjectionList); } if (TSDB_CODE_SUCCESS == code) { From 58c43901ede6b45a2f248d43c373e419a6c56d4b Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 12 Jun 2023 09:14:38 +0800 Subject: [PATCH 08/19] chore: another logic --- source/libs/parser/src/parTranslater.c | 6 +++ source/libs/planner/src/planLogicCreater.c | 48 ---------------------- 2 files changed, 6 insertions(+), 48 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3b94bae05c..5f3e402027 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3108,6 +3108,12 @@ static int32_t rewriteProjectAlias(SNodeList* pProjectionList) { if ('\0' == pExpr->userAlias[0]) { strcpy(pExpr->userAlias, pExpr->aliasName); } + if (QUERY_NODE_COLUMN == nodeType(pProject) && + ((0 == strcasecmp("_wstart", pExpr->userAlias) || 0 == strcasecmp("_wend", pExpr->userAlias) || + 0 == strcasecmp("_wduration", pExpr->userAlias)) && + '\0' != pExpr->aliasName[0])) { + continue; + } sprintf(pExpr->aliasName, "#expr_%d", no++); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 953ca13d3b..5bbc9acdad 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -732,49 +732,6 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p return code; } -static int32_t eraseDuplicatedPseudoColumnFuncs(SNodeList* pFuncs, SNodeList* pProjections) { - int32_t code = 0; - int32_t funcIndex = 0; - SSHashObj* pFuncHash = NULL; - SNode* pFuncNode = NULL; - FOREACH(pFuncNode, pFuncs) { - SFunctionNode* pFunc = (SFunctionNode*)pFuncNode; - if (!fmIsWindowPseudoColumnFunc(pFunc->funcId)) { - ++funcIndex; - continue; - } - if (!pFuncHash && !(pFuncHash = tSimpleHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)))) { - code = TSDB_CODE_OUT_OF_MEMORY; - break; - } - - void* hashVal = tSimpleHashGet(pFuncHash, &pFunc->funcId, sizeof(pFunc->funcId)); - if (!hashVal) { - tSimpleHashPut(pFuncHash, &pFunc->funcId, sizeof(pFunc->funcId), &funcIndex, sizeof(funcIndex)); - ++funcIndex; - continue; - } - - bool exist = false; - SNode* pProject = NULL; - FOREACH(pProject, pProjections) { - if (QUERY_NODE_COLUMN == nodeType(pProject) && 0 != pFunc->node.aliasName[0] && - 0 == strncmp(pFunc->node.aliasName, ((SColumnNode*)pProject)->colName, TSDB_COL_NAME_LEN)) { - exist = true; - break; - } - } - if (!exist) { - nodesListErase(pFuncs, nodesListGetCell(pFuncs, funcIndex)); - } else { - nodesListErase(pFuncs, nodesListGetCell(pFuncs, *(int32_t*)hashVal)); - tSimpleHashPut(pFuncHash, &pFunc->funcId, sizeof(pFunc->funcId), &funcIndex, sizeof(funcIndex)); - } - } - tSimpleHashCleanup(pFuncHash); - return code; -} - static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SWindowLogicNode* pWindow, SLogicNode** pLogicNode) { if (pCxt->pPlanCxt->streamQuery) { @@ -792,11 +749,6 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm code = rewriteExprsForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW, NULL); } - if (TSDB_CODE_SUCCESS == code && WINDOW_TYPE_INTERVAL == pWindow->winType && pSelect->pProjectionList) { - // erase duplicated Window Pseudo funcNode by filtering colNode in pSelect->pProjectionList - code = eraseDuplicatedPseudoColumnFuncs(pWindow->pFuncs, pSelect->pProjectionList); - } - if (TSDB_CODE_SUCCESS == code) { code = createColumnByRewriteExprs(pWindow->pFuncs, &pWindow->node.pTargets); } From b200fdba5120af4079b2e7dd82865c9e6ee0a87b Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 12 Jun 2023 10:44:01 +0800 Subject: [PATCH 09/19] chore: node type --- source/libs/parser/src/parTranslater.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 5f3e402027..ed109df1f9 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3108,7 +3108,7 @@ static int32_t rewriteProjectAlias(SNodeList* pProjectionList) { if ('\0' == pExpr->userAlias[0]) { strcpy(pExpr->userAlias, pExpr->aliasName); } - if (QUERY_NODE_COLUMN == nodeType(pProject) && + if (QUERY_NODE_FUNCTION == nodeType(pProject) && ((0 == strcasecmp("_wstart", pExpr->userAlias) || 0 == strcasecmp("_wend", pExpr->userAlias) || 0 == strcasecmp("_wduration", pExpr->userAlias)) && '\0' != pExpr->aliasName[0])) { From f5d28cdcd1fb99faa9f558bf22ad6c018c768ddc Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 12 Jun 2023 10:49:17 +0800 Subject: [PATCH 10/19] chore: more check --- source/libs/parser/src/parTranslater.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ed109df1f9..a6d010a814 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3108,12 +3108,12 @@ static int32_t rewriteProjectAlias(SNodeList* pProjectionList) { if ('\0' == pExpr->userAlias[0]) { strcpy(pExpr->userAlias, pExpr->aliasName); } - if (QUERY_NODE_FUNCTION == nodeType(pProject) && + if (QUERY_NODE_FUNCTION == nodeType(pProject) && fmIsWindowPseudoColumnFunc(((SFunctionNode*)pProject)->funcId) && ((0 == strcasecmp("_wstart", pExpr->userAlias) || 0 == strcasecmp("_wend", pExpr->userAlias) || 0 == strcasecmp("_wduration", pExpr->userAlias)) && '\0' != pExpr->aliasName[0])) { - continue; - } + continue; + } sprintf(pExpr->aliasName, "#expr_%d", no++); } return TSDB_CODE_SUCCESS; From 4cd039c38ea372f680f6c1c3fc2dabc218a13d2e Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 12 Jun 2023 16:47:03 +0800 Subject: [PATCH 11/19] chore: compare funcNode without aliasName --- source/libs/nodes/src/nodesUtilFuncs.c | 6 +++--- source/libs/parser/src/parTranslater.c | 6 ------ 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 39e288f694..830092bbf2 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1953,9 +1953,9 @@ static uint32_t funcNodeHash(const char* pKey, uint32_t len) { } static int32_t funcNodeEqual(const void* pLeft, const void* pRight, size_t len) { - if (0 != strcmp((*(const SExprNode**)pLeft)->aliasName, (*(const SExprNode**)pRight)->aliasName)) { - return 1; - } + // if (0 != strcmp((*(const SExprNode**)pLeft)->aliasName, (*(const SExprNode**)pRight)->aliasName)) { + // return 1; + // } return nodesEqualNode(*(const SNode**)pLeft, *(const SNode**)pRight) ? 0 : 1; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index a6d010a814..3b94bae05c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3108,12 +3108,6 @@ static int32_t rewriteProjectAlias(SNodeList* pProjectionList) { if ('\0' == pExpr->userAlias[0]) { strcpy(pExpr->userAlias, pExpr->aliasName); } - if (QUERY_NODE_FUNCTION == nodeType(pProject) && fmIsWindowPseudoColumnFunc(((SFunctionNode*)pProject)->funcId) && - ((0 == strcasecmp("_wstart", pExpr->userAlias) || 0 == strcasecmp("_wend", pExpr->userAlias) || - 0 == strcasecmp("_wduration", pExpr->userAlias)) && - '\0' != pExpr->aliasName[0])) { - continue; - } sprintf(pExpr->aliasName, "#expr_%d", no++); } return TSDB_CODE_SUCCESS; From 28e6c4502122cd5c79d5b111dbfde1d0ee69b424 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 13 Jun 2023 14:41:42 +0800 Subject: [PATCH 12/19] enhance: version not compatible log --- source/client/src/clientMsgHandler.c | 1 + source/dnode/mnode/impl/src/mndProfile.c | 1 + 2 files changed, 2 insertions(+) diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 6d53f2b4c5..d6fdb29b59 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -77,6 +77,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { } if ((code = taosCheckVersionCompatibleFromStr(version, connectRsp.sVer, 3)) != 0) { + tscError("version not compatible. client version: %s, server version: %s", version, connectRsp.sVer); setErrno(pRequest, code); tsem_post(&pRequest->body.rspSem); goto End; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index a1d815189c..aeec7eac00 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -227,6 +227,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { } if ((code = taosCheckVersionCompatibleFromStr(connReq.sVer, version, 3)) != 0) { + mGError("version not compatible. client version: %s, server version: %s", connReq.sVer, version); terrno = code; goto _OVER; } From 13ddd37529157ef54ebf631928c8e23d94ba2ac9 Mon Sep 17 00:00:00 2001 From: slzhou Date: Tue, 13 Jun 2023 14:51:41 +0800 Subject: [PATCH 13/19] fix: fix memory sanitizer error --- source/libs/executor/src/tsort.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 783597df67..3033441aad 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -101,7 +101,11 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) { for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { SSortSource* pSource = cmpParam->pSources[i]; blockDataDestroy(pSource->src.pBlock); + if (pSource->pageIdList) { + taosArrayDestroy(pSource->pageIdList); + } taosMemoryFreeClear(pSource); + cmpParam->pSources[i] = NULL; } cmpParam->numOfSources = 0; @@ -123,9 +127,11 @@ void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *f // release pageIdList if ((*pSource)->pageIdList) { taosArrayDestroy((*pSource)->pageIdList); + (*pSource)->pageIdList = NULL; } if ((*pSource)->param && !(*pSource)->onlyRef) { taosMemoryFree((*pSource)->param); + (*pSource)->param = NULL; } if (!(*pSource)->onlyRef && (*pSource)->src.pBlock) { From c28d7d9bf863e6dbbc6e35b96182a2c9bcdf9400 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 14 Jun 2023 12:01:54 +0800 Subject: [PATCH 14/19] fix:open info log in tmq client & modify parametes in show consumers --- include/common/tmsg.h | 3 - source/client/src/clientTmq.c | 109 +++++++++--------- source/common/src/systable.c | 6 +- source/dnode/mnode/impl/inc/mndDef.h | 1 - source/dnode/mnode/impl/src/mndConsumer.c | 24 ++-- source/dnode/mnode/impl/src/mndDef.c | 2 - tests/system-test/2-query/odbc.py | 4 +- .../system-test/7-tmq/checkOffsetRowParams.py | 4 + 8 files changed, 69 insertions(+), 84 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index aa0a243e68..f42defc788 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2035,7 +2035,6 @@ typedef struct { SArray* topicNames; // SArray int8_t withTbName; - int8_t useSnapshot; int8_t autoCommit; int32_t autoCommitInterval; int8_t resetOffsetCfg; @@ -2055,7 +2054,6 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc } tlen += taosEncodeFixedI8(buf, pReq->withTbName); - tlen += taosEncodeFixedI8(buf, pReq->useSnapshot); tlen += taosEncodeFixedI8(buf, pReq->autoCommit); tlen += taosEncodeFixedI32(buf, pReq->autoCommitInterval); tlen += taosEncodeFixedI8(buf, pReq->resetOffsetCfg); @@ -2079,7 +2077,6 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq } buf = taosDecodeFixedI8(buf, &pReq->withTbName); - buf = taosDecodeFixedI8(buf, &pReq->useSnapshot); buf = taosDecodeFixedI8(buf, &pReq->autoCommit); buf = taosDecodeFixedI32(buf, &pReq->autoCommitInterval); buf = taosDecodeFixedI8(buf, &pReq->resetOffsetCfg); diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 26887e2ade..e7927cd0ae 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -358,7 +358,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_OK; } - if (strcasecmp(key, "enable.heartbeat.background") == 0) { +// if (strcasecmp(key, "enable.heartbeat.background") == 0) { // if (strcasecmp(value, "true") == 0) { // conf->hbBgEnable = true; // return TMQ_CONF_OK; @@ -366,10 +366,10 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value // conf->hbBgEnable = false; // return TMQ_CONF_OK; // } else { - tscError("the default value of enable.heartbeat.background is true, can not be seted"); - return TMQ_CONF_INVALID; +// tscError("the default value of enable.heartbeat.background is true, can not be seted"); +// return TMQ_CONF_INVALID; // } - } +// } if (strcasecmp(key, "td.connect.ip") == 0) { conf->ip = taosStrdup(value); @@ -423,30 +423,30 @@ char** tmq_list_to_c_array(const tmq_list_t* list) { return container->pData; } -static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index, - int32_t* numOfVgroups) { - int32_t numOfTopics = taosArrayGetSize(pTopicList); - *index = -1; - *numOfVgroups = 0; - - for (int32_t i = 0; i < numOfTopics; ++i) { - SMqClientTopic* pTopic = taosArrayGet(pTopicList, i); - if (strcmp(pTopic->topicName, pName) != 0) { - continue; - } - - *numOfVgroups = taosArrayGetSize(pTopic->vgs); - for (int32_t j = 0; j < (*numOfVgroups); ++j) { - SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - if (pClientVg->vgId == vgId) { - *index = j; - return pClientVg; - } - } - } - - return NULL; -} +//static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index, +// int32_t* numOfVgroups) { +// int32_t numOfTopics = taosArrayGetSize(pTopicList); +// *index = -1; +// *numOfVgroups = 0; +// +// for (int32_t i = 0; i < numOfTopics; ++i) { +// SMqClientTopic* pTopic = taosArrayGet(pTopicList, i); +// if (strcmp(pTopic->topicName, pName) != 0) { +// continue; +// } +// +// *numOfVgroups = taosArrayGetSize(pTopic->vgs); +// for (int32_t j = 0; j < (*numOfVgroups); ++j) { +// SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); +// if (pClientVg->vgId == vgId) { +// *index = j; +// return pClientVg; +// } +// } +// } +// +// return NULL; +//} // Two problems do not need to be addressed here // 1. update to of epset. the response of poll request will automatically handle this problem @@ -573,7 +573,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN char commitBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); - tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, + tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64, tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1, totalVgroups, pMsgSendInfo->requestId); @@ -811,7 +811,9 @@ void tmqSendHbReq(void* param, void* tmrId) { offRows->vgId = pVg->vgId; offRows->rows = pVg->numOfRows; offRows->offset = pVg->offsetInfo.committedOffset; - tscDebug("report offset: %d", offRows->offset.type); + char buf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); + tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows); } } tmq->needReportOffsetRows = false; @@ -862,7 +864,7 @@ OVER: static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) { if (code != 0) { - tscDebug("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code)); + tscError("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code)); } } @@ -1161,7 +1163,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SCMSubscribeReq req = {0}; int32_t code = 0; - tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); + tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, 256); @@ -1174,7 +1176,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } req.withTbName = tmq->withTbName; - req.useSnapshot = tmq->useSnapshot; req.autoCommit = tmq->autoCommit; req.autoCommitInterval = tmq->autoCommitInterval; req.resetOffsetCfg = tmq->resetOffsetCfg; @@ -1190,7 +1191,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } tNameExtractFullName(&name, topicFName); - tscDebug("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName); + tscInfo("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName); taosArrayPush(req.topicNames, &topicFName); } @@ -1251,7 +1252,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { goto FAIL; } - tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); + tscInfo("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); taosMsleep(500); } @@ -1478,7 +1479,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN); - tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); + tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); for (int32_t j = 0; j < vgNumGet; j++) { @@ -1531,7 +1532,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) int32_t topicNumGet = taosArrayGetSize(pRsp->topics); char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; - tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", + tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); if (epoch <= tmq->epoch) { return false; @@ -1554,14 +1555,14 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); if (pTopicCur->vgs) { int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); - tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur); + tscInfo("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur); for (int32_t j = 0; j < vgNumCur; j++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); char buf[TSDB_OFFSET_LEN]; tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset); - tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, + tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows}; @@ -1591,7 +1592,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) atomic_store_8(&tmq->status, flag); atomic_store_32(&tmq->epoch, epoch); - tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId); + tscInfo("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId); return set; } @@ -1627,7 +1628,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { SMqRspHead* head = pMsg->pData; int32_t epoch = atomic_load_32(&tmq->epoch); if (head->epoch <= epoch) { - tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep", + tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep", tmq->consumerId, head->epoch, epoch); if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) { @@ -1639,7 +1640,7 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { } } else { - tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId, + tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId, head->epoch, epoch); } @@ -2067,12 +2068,12 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { void* rspObj; int64_t startTime = taosGetTimestampMs(); - tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, + tscInfo("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout); // in no topic status, delayed task also need to be processed if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { - tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId); + tscInfo("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId); taosMsleep(500); // sleep for a while return NULL; } @@ -2084,7 +2085,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { return NULL; } - tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt); + tscInfo("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt); taosMsleep(500); } } @@ -2093,7 +2094,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { tmqHandleAllDelayedTask(tmq); if (tmqPollImpl(tmq, timeout) < 0) { - tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); + tscError("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); } rspObj = tmqHandleAllRsp(tmq, timeout, false); @@ -2101,7 +2102,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { - tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId); + tscInfo("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId); return NULL; } @@ -2109,7 +2110,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; if (elapsedTime > timeout) { - tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, + tscInfo("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; } @@ -2142,7 +2143,7 @@ static void displayConsumeStatistics(const tmq_t* pTmq) { } int32_t tmq_consumer_close(tmq_t* tmq) { - tscDebug("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); + tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); displayConsumeStatistics(tmq); if (tmq->status == TMQ_CONSUMER_STATUS__READY) { @@ -2169,7 +2170,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) { tmq_list_destroy(lst); } else { - tscWarn("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId); + tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId); } taosRemoveRef(tmqMgmt.rsetId, tmq->refId); @@ -2432,7 +2433,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) { sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp); - tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); + tscInfo("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); int64_t transporterId = 0; asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); @@ -2656,7 +2657,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a char offsetFormatBuf[TSDB_OFFSET_LEN]; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset); - tscDebug("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, + tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); } @@ -2693,7 +2694,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a char offsetBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); - tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf); + tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf); pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerEnd = p->end; @@ -2772,7 +2773,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); - tscDebug("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); + tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); if (pInfo == NULL) { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 722092a043..2c8f484a32 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -361,11 +361,7 @@ static const SSysDbTableSchema consumerSchema[] = { {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, - {.name = "msg.with.table.name", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, - {.name = "experimental.snapshot.enable", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, - {.name = "enable.auto.commit", .bytes = 1, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = false}, - {.name = "auto.commit.interval.ms", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "auto.offset.reset", .bytes = TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, + {.name = "parameters", .bytes = 64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, }; static const SSysDbTableSchema offsetSchema[] = { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index f74a593b00..2dcba291cd 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -552,7 +552,6 @@ typedef struct { int64_t rebalanceTime; int8_t withTbName; - int8_t useSnapshot; int8_t autoCommit; int32_t autoCommitInterval; int32_t resetOffsetCfg; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index b5254dbb88..7730ab054d 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -697,7 +697,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId)); pConsumerNew->withTbName = subscribe.withTbName; - pConsumerNew->useSnapshot = subscribe.useSnapshot; pConsumerNew->autoCommit = subscribe.autoCommit; pConsumerNew->autoCommitInterval = subscribe.autoCommitInterval; pConsumerNew->resetOffsetCfg = subscribe.resetOffsetCfg; @@ -1186,25 +1185,16 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->withTbName, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->useSnapshot, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->autoCommit, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->autoCommitInterval, false); - - char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; + char buf[TSDB_OFFSET_LEN] = {0}; STqOffsetVal pVal = {.type = pConsumer->resetOffsetCfg}; - tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &pVal); - varDataSetLen(buf, strlen(varDataVal(buf))); + tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal); + + char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; + sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%d,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf); + varDataSetLen(parasStr, strlen(varDataVal(parasStr))); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)buf, false); + colDataSetVal(pColInfo, numOfRows, (const char *)parasStr, false); numOfRows++; } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 7f79408e8c..2e8a937f07 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -322,7 +322,6 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { } tlen += taosEncodeFixedI8(buf, pConsumer->withTbName); - tlen += taosEncodeFixedI8(buf, pConsumer->useSnapshot); tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit); tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval); tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg); @@ -382,7 +381,6 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s if(sver > 1){ buf = taosDecodeFixedI8(buf, &pConsumer->withTbName); - buf = taosDecodeFixedI8(buf, &pConsumer->useSnapshot); buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit); buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval); buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg); diff --git a/tests/system-test/2-query/odbc.py b/tests/system-test/2-query/odbc.py index b6d2ab2a3f..ea897260d1 100644 --- a/tests/system-test/2-query/odbc.py +++ b/tests/system-test/2-query/odbc.py @@ -22,8 +22,8 @@ class TDTestCase: tdSql.execute("insert into db.ctb using db.stb tags(1) (ts, c1) values (now, 1)") tdSql.query("select count(*) from information_schema.ins_columns") - # enterprise version: 295, community version: 285 - tdSql.checkData(0, 0, 295) + # enterprise version: 291, community version: 281 + tdSql.checkData(0, 0, 291) tdSql.query("select * from information_schema.ins_columns where table_name = 'ntb'") tdSql.checkRows(14) diff --git a/tests/system-test/7-tmq/checkOffsetRowParams.py b/tests/system-test/7-tmq/checkOffsetRowParams.py index 17c80c68bf..8a24148064 100644 --- a/tests/system-test/7-tmq/checkOffsetRowParams.py +++ b/tests/system-test/7-tmq/checkOffsetRowParams.py @@ -243,6 +243,10 @@ class TDTestCase: tdSql.checkData(0, 5, 0) break + tdSql.query("show consumers") + tdSql.checkRows(1) + tdSql.checkData(0, 8, "tbname:1,commit:1,interval:2000,reset:earliest") + time.sleep(2) tdLog.info("start insert data") self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) From 05393d90539282eb3df95c5b97deed2f9caabed2 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 14 Jun 2023 13:05:47 +0800 Subject: [PATCH 15/19] fix: make vmProcessCreateVnodeReq idempotent --- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 814a155cfb..dd880a87c8 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -265,6 +265,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); +#if 0 if (pMgmt->pTfs) { if (tfsDirExistAt(pMgmt->pTfs, path, (SDiskID){0})) { terrno = TSDB_CODE_VND_DIR_ALREADY_EXIST; @@ -278,8 +279,9 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return -1; } } +#endif -if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) { + if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) { tFreeSCreateVnodeReq(&req); dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr()); code = terrno; From 0fac3f7d845823c0718275a12fef53abe260d0ca Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 14 Jun 2023 19:27:47 +0800 Subject: [PATCH 16/19] fix: add compact split resume stream pause stream to tab key --- tools/shell/src/shellAuto.c | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 19a888fe82..943842947a 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -91,9 +91,14 @@ SWords shellCommands[] = { {"create stream into as select", 0, 0, NULL}, // 26 append sub sql {"create topic as select", 0, 0, NULL}, // 27 append sub sql {"create function as outputtype language ", 0, 0, NULL}, + {"create or replace as outputtype language ", 0, 0, NULL}, {"create aggregate function as outputtype bufsize language ", 0, 0, NULL}, + {"create or replace aggregate function as outputtype bufsize language ", 0, 0, NULL}, {"create user pass sysinfo 0;", 0, 0, NULL}, {"create user pass sysinfo 1;", 0, 0, NULL}, +#ifdef TD_ENTERPRISE + {"compact database ", 0, 0, NULL}, +#endif {"describe ", 0, 0, NULL}, {"delete from where ", 0, 0, NULL}, {"drop database ", 0, 0, NULL}, @@ -117,7 +122,11 @@ SWords shellCommands[] = { {"kill connection ;", 0, 0, NULL}, {"kill query ", 0, 0, NULL}, {"kill transaction ", 0, 0, NULL}, +#ifdef TD_ENTERPRISE {"merge vgroup ", 0, 0, NULL}, +#endif + {"pause stream ;", 0, 0, NULL}, + {"resume stream ;", 0, 0, NULL}, {"reset query cache;", 0, 0, NULL}, {"restore dnode ;", 0, 0, NULL}, {"restore vnode on dnode ;", 0, 0, NULL}, @@ -173,7 +182,9 @@ SWords shellCommands[] = { {"show vgroups;", 0, 0, NULL}, {"show consumers;", 0, 0, NULL}, {"show grants;", 0, 0, NULL}, +#ifdef TD_ENTERPRISE {"split vgroup ", 0, 0, NULL}, +#endif {"insert into values(", 0, 0, NULL}, {"insert into using tags(", 0, 0, NULL}, {"insert into using values(", 0, 0, NULL}, @@ -432,8 +443,6 @@ void showHelp() { kill connection ; \n\ kill query ; \n\ kill transaction ;\n\ - ----- M ----- \n\ - merge vgroup ...\n\ ----- R ----- \n\ reset query cache;\n\ restore dnode ;\n\ @@ -489,14 +498,20 @@ void showHelp() { show vgroups;\n\ show consumers;\n\ show grants;\n\ - split vgroup ...\n\ ----- T ----- \n\ trim database ;\n\ ----- U ----- \n\ use ;"); - printf("\n\n"); +#ifdef TD_ENTERPRISE + printf( + "\n\ + ----- special commands on enterpise version ----- \n\ + compact database ; \n\ + split vgroup ;\n"); +#endif + printf("\n\n"); // define in getDuration() function printf( "\ From 8018cbf63976132318e7d5555333d2962bb7b6b5 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 14 Jun 2023 19:35:55 +0800 Subject: [PATCH 17/19] fix: add blank line --- tools/shell/src/shellAuto.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 943842947a..27f3ae7a82 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -505,10 +505,10 @@ void showHelp() { #ifdef TD_ENTERPRISE printf( - "\n\ + "\n\n\ ----- special commands on enterpise version ----- \n\ compact database ; \n\ - split vgroup ;\n"); + split vgroup ;"); #endif printf("\n\n"); From d7f03678b3ce537956a246796c5c5035a9058d09 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 14 Jun 2023 19:54:32 +0800 Subject: [PATCH 18/19] fix: pause and resume stream add to show help --- tools/shell/src/shellAuto.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 27f3ae7a82..41cdb0f928 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -443,7 +443,10 @@ void showHelp() { kill connection ; \n\ kill query ; \n\ kill transaction ;\n\ + ----- P ----- \n\ + pause stream ;\n\ ----- R ----- \n\ + resume stream ;\n\ reset query cache;\n\ restore dnode ;\n\ restore vnode on dnode ;\n\ From b86fc3a697cc73ba4d0aeb59118f27106ab2804a Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 14 Jun 2023 21:38:25 +0800 Subject: [PATCH 19/19] fix: vnodeRenameVgroupId correctly --- source/dnode/vnode/src/vnd/vnodeOpen.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index b5e7c6875b..0655a46388 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -129,6 +129,12 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p return 0; } +static int32_t vnodeVgroupIdLen(int32_t vgId) { + char tmp[TSDB_FILENAME_LEN]; + sprintf(tmp, "%d", vgId); + return strlen(tmp); +} + int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs) { int32_t ret = tfsRename(pTfs, srcPath, dstPath); if (ret != 0) return ret; @@ -154,8 +160,7 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr int32_t tsdbFileVgId = atoi(tsdbFilePrefixPos + 6); if (tsdbFileVgId == srcVgId) { - char *tsdbFileSurfixPos = strstr(tsdbFilePrefixPos, "f"); - if (tsdbFileSurfixPos == NULL) continue; + char *tsdbFileSurfixPos = tsdbFilePrefixPos + 6 + vnodeVgroupIdLen(srcVgId); tsdbFilePrefixPos[6] = 0; snprintf(newRname, TSDB_FILENAME_LEN, "%s%d%s", oldRname, dstVgId, tsdbFileSurfixPos);