From 9a8d03f0ca34fefc0562a8c60ffa9d47a4d21c43 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 2 Apr 2024 14:03:02 +0800 Subject: [PATCH 01/27] adj last function --- source/libs/parser/src/parTranslater.c | 4 ++-- tests/script/tsim/parser/first_last_query.sim | 4 ++-- tests/script/tsim/parser/join_multitables.sim | 16 ++++++------- tests/script/tsim/parser/last_cache_query.sim | 4 ++-- tests/script/tsim/parser/lastrow2.sim | 6 ++--- tests/script/tsim/query/bi_star_table.sim | 24 +++++++++---------- tests/script/tsim/query/cache_last.sim | 5 ++-- .../system-test/2-query/last_and_last_row.py | 4 ++-- 8 files changed, 34 insertions(+), 33 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 2110013310..1ea0ec8574 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3368,9 +3368,9 @@ static int32_t createMultiResFuncsParas(STranslateContext* pCxt, SNodeList* pSrc SNode* pPara = NULL; FOREACH(pPara, pSrcParas) { if (nodesIsStar(pPara)) { - code = createAllColumns(pCxt, true, &pExprs); + code = createAllColumns(pCxt, false, &pExprs); } else if (nodesIsTableStar(pPara)) { - code = createTableAllCols(pCxt, (SColumnNode*)pPara, true, &pExprs); + code = createTableAllCols(pCxt, (SColumnNode*)pPara, false, &pExprs); } else { code = nodesListMakeStrictAppend(&pExprs, nodesCloneNode(pPara)); } diff --git a/tests/script/tsim/parser/first_last_query.sim b/tests/script/tsim/parser/first_last_query.sim index 533f59f4b2..8d02f817e3 100644 --- a/tests/script/tsim/parser/first_last_query.sim +++ b/tests/script/tsim/parser/first_last_query.sim @@ -298,7 +298,7 @@ if $data01 != 112 then return -1 endi -if $data02 != @tm0@ then +if $data03 != @tm0@ then return -1 endi @@ -310,7 +310,7 @@ if $data11 != 421 then return -1 endi -if $data12 != @tm1@ then +if $data13 != @tm1@ then return -1 endi diff --git a/tests/script/tsim/parser/join_multitables.sim b/tests/script/tsim/parser/join_multitables.sim index d2f8ea4a88..59e41bc9d9 100644 --- a/tests/script/tsim/parser/join_multitables.sim +++ b/tests/script/tsim/parser/join_multitables.sim @@ -805,16 +805,16 @@ endi if $data04 != 01 then return -1 endi -if $data05 != @21-03-01 01:00:00.000@ then +if $data[0][10] != @21-03-01 01:00:00.000@ then return -1 endi -if $data06 != 9911 then +if $data[0][11] != 9911 then return -1 endi -if $data07 != 9911.000000000 then +if $data[0][12] != 9911.000000000 then return -1 endi -if $data08 != 11 then +if $data[0][13] != 11 then return -1 endi @@ -837,16 +837,16 @@ endi if $data04 != 05 then return -1 endi -if $data05 != @21-03-01 05:00:00.000@ then +if $data[0][10] != @21-03-01 05:00:00.000@ then return -1 endi -if $data06 != 9915 then +if $data[0][11] != 9915 then return -1 endi -if $data07 != 9915.000000000 then +if $data[0][12] != 9915.000000000 then return -1 endi -if $data08 != 15 then +if $data[0][13] != 15 then return -1 endi diff --git a/tests/script/tsim/parser/last_cache_query.sim b/tests/script/tsim/parser/last_cache_query.sim index 30196e0b62..7bf6a51731 100644 --- a/tests/script/tsim/parser/last_cache_query.sim +++ b/tests/script/tsim/parser/last_cache_query.sim @@ -243,11 +243,11 @@ endi if $data04 != @70-01-01 07:59:57.000@ then return -1 endi -if $data05 != @21-05-12 10:10:12.000@ then +if $data06 != @21-05-12 10:10:12.000@ then print $data00 return -1 endi -if $data06 != @70-01-01 07:59:57.000@ then +if $data07 != @70-01-01 07:59:57.000@ then return -1 endi diff --git a/tests/script/tsim/parser/lastrow2.sim b/tests/script/tsim/parser/lastrow2.sim index 33267e3cfd..278de7ab49 100644 --- a/tests/script/tsim/parser/lastrow2.sim +++ b/tests/script/tsim/parser/lastrow2.sim @@ -54,13 +54,13 @@ sql select last_row(*), ts, 'abc', 123.981, tbname from m1 if $rows != 1 then return -1 endi -if $data02 != @19-01-01 01:01:01.000@ then +if $data03 != @19-01-01 01:01:01.000@ then return -1 endi -if $data03 != @abc@ then +if $data04 != @abc@ then return -1 endi -if $data04 != 123.981000000 then +if $data05 != 123.981000000 then print expect 123.981000000, actual: $data04 return -1 endi diff --git a/tests/script/tsim/query/bi_star_table.sim b/tests/script/tsim/query/bi_star_table.sim index 6bd6938678..1d71d6a68e 100644 --- a/tests/script/tsim/query/bi_star_table.sim +++ b/tests/script/tsim/query/bi_star_table.sim @@ -33,29 +33,29 @@ if $data06 != tba1 then endi sql select last(*) from db1.sta; -if $cols != 4 then +if $cols != 7 then return -1 endi -if $data03 != tba2 then +if $data06 != tba2 then return -1 endi sql select last_row(*) from db1.sta; -if $cols != 4 then +if $cols != 7 then return -1 endi -if $data03 != tba2 then +if $data06 != tba2 then return -1 endi sql select first(*) from db1.sta; -if $cols != 4 then +if $cols != 7 then return -1 endi -if $data03 != tba1 then +if $data06 != tba1 then return -1 endi @@ -71,29 +71,29 @@ if $data06 != tba1 then endi sql select last(b.*) from db1.sta b; -if $cols != 4 then +if $cols != 7 then return -1 endi -if $data03 != tba2 then +if $data06 != tba2 then return -1 endi sql select last_row(b.*) from db1.sta b; -if $cols != 4 then +if $cols != 7 then return -1 endi -if $data03 != tba2 then +if $data06 != tba2 then return -1 endi sql select first(b.*) from db1.sta b; -if $cols != 4 then +if $cols != 7 then return -1 endi -if $data03 != tba1 then +if $data06 != tba1 then return -1 endi diff --git a/tests/script/tsim/query/cache_last.sim b/tests/script/tsim/query/cache_last.sim index 65eb46de69..f936f822a5 100644 --- a/tests/script/tsim/query/cache_last.sim +++ b/tests/script/tsim/query/cache_last.sim @@ -35,11 +35,12 @@ if $data03 != b then return -1 endi sql explain select count(*), last(*) from sta; -if $data00 != @-> Merge (columns=4 width=226 input_order=unknown output_order=unknown mode=column)@ then +if $data00 != @-> Merge (columns=5 width=230 input_order=unknown output_order=unknown mode=column)@ then + print $data00 return -1 endi sql explain select first(f1), last(*) from sta; -if $data00 != @-> Merge (columns=4 width=226 input_order=unknown output_order=unknown mode=column)@ then +if $data00 != @-> Merge (columns=5 width=230 input_order=unknown output_order=unknown mode=column)@ then return -1 endi sql select first(f1), last(*) from sta; diff --git a/tests/system-test/2-query/last_and_last_row.py b/tests/system-test/2-query/last_and_last_row.py index b04b3a75f3..ce25c3dd99 100644 --- a/tests/system-test/2-query/last_and_last_row.py +++ b/tests/system-test/2-query/last_and_last_row.py @@ -152,8 +152,8 @@ class TDTestCase: tdSql.checkRows(1) tdSql.checkData(0, 0, last_ts4) tdSql.checkData(0, 1, 5 * maxRange - 1) - tdSql.checkData(0, 2, last_ts4) - tdSql.checkData(0, 3, 4 * maxRange + 1) + tdSql.checkData(0, 3, last_ts4) + tdSql.checkData(0, 4, 4 * maxRange + 1) explain_res = self.explain_sql(sql) self.check_explain_res_no_row("Last Row Scan", explain_res, sql) From 71ff0bcb843601ab6df73d97330925697197c1a5 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 2 Apr 2024 17:24:47 +0800 Subject: [PATCH 02/27] adj ci --- tests/script/tsim/parser/last_both_query.sim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/script/tsim/parser/last_both_query.sim b/tests/script/tsim/parser/last_both_query.sim index 5f86412199..1cfb2a316f 100644 --- a/tests/script/tsim/parser/last_both_query.sim +++ b/tests/script/tsim/parser/last_both_query.sim @@ -243,11 +243,11 @@ endi if $data04 != @70-01-01 07:59:57.000@ then return -1 endi -if $data05 != @21-05-12 10:10:12.000@ then +if $data06 != @21-05-12 10:10:12.000@ then print $data00 return -1 endi -if $data06 != @70-01-01 07:59:57.000@ then +if $data07 != @70-01-01 07:59:57.000@ then return -1 endi From e0a5b2dd9df895a25053cf64ea04e018c8c7e07c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 8 Apr 2024 08:55:13 +0800 Subject: [PATCH 03/27] split last tag --- source/libs/planner/src/planOptimizer.c | 12 +++++++++--- tests/system-test/2-query/last_and_last_row.py | 12 ++++++------ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 40e1d12a13..d322174fd0 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2641,7 +2641,8 @@ static bool isNeedSplitCacheLastFunc(SFunctionNode* pFunc, SScanLogicNode* pScan int32_t funcType = pFunc->funcType; if ((FUNCTION_TYPE_LAST_ROW != funcType || (FUNCTION_TYPE_LAST_ROW == funcType && TSDB_CACHE_MODEL_LAST_VALUE == pScan->cacheLastMode)) && (FUNCTION_TYPE_LAST != funcType || (FUNCTION_TYPE_LAST == funcType && (TSDB_CACHE_MODEL_LAST_ROW == pScan->cacheLastMode || - QUERY_NODE_OPERATOR == nodeType(nodesListGetNode(pFunc->pParameterList, 0)) || QUERY_NODE_VALUE == nodeType(nodesListGetNode(pFunc->pParameterList, 0))))) && + QUERY_NODE_OPERATOR == nodeType(nodesListGetNode(pFunc->pParameterList, 0)) || QUERY_NODE_VALUE == nodeType(nodesListGetNode(pFunc->pParameterList, 0)) || + COLUMN_TYPE_COLUMN != ((SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0))->colType))) && FUNCTION_TYPE_SELECT_VALUE != funcType && FUNCTION_TYPE_GROUP_KEY != funcType) { return true; } @@ -2661,8 +2662,9 @@ static bool lastRowScanOptCheckFuncList(SLogicNode* pNode, int8_t cacheLastModel if (FUNCTION_TYPE_LAST == pAggFunc->funcType) { if (QUERY_NODE_COLUMN == nodeType(pParam)) { SColumnNode* pCol = (SColumnNode*)pParam; - if (pCol->colType != COLUMN_TYPE_COLUMN) { - return false; + if (pCol->colType != COLUMN_TYPE_COLUMN && TSDB_CACHE_MODEL_LAST_ROW != cacheLastModel) { + needSplitFuncCount++; + *hasOtherFunc = true; } if (lastColId != pCol->colId) { lastColId = pCol->colId; @@ -3070,6 +3072,10 @@ static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg, if (TSDB_CODE_SUCCESS != code) { return code; } + code = nodesCollectColumnsFromNode((SNode*)list, NULL, COLLECT_COL_TYPE_TAG, &pScan->pScanPseudoCols); + if (TSDB_CODE_SUCCESS != code) { + return code; + } nodesFree(list); bool found = false; FOREACH(pNode, pScan->pScanCols) { diff --git a/tests/system-test/2-query/last_and_last_row.py b/tests/system-test/2-query/last_and_last_row.py index ce25c3dd99..cd572b05cb 100644 --- a/tests/system-test/2-query/last_and_last_row.py +++ b/tests/system-test/2-query/last_and_last_row.py @@ -302,8 +302,8 @@ class TDTestCase: tdSql.checkRows(1) tdSql.checkData(0, 0, last_ts4) tdSql.checkData(0, 1, 5 * maxRange - 1) - tdSql.checkData(0, 2, last_ts4) - tdSql.checkData(0, 3, 4 * maxRange + 1) + tdSql.checkData(0, 3, last_ts4) + tdSql.checkData(0, 4, 4 * maxRange + 1) explain_res = self.explain_sql(sql) self.check_explain_res_has_row("Last Row Scan", explain_res, sql) @@ -453,8 +453,8 @@ class TDTestCase: tdSql.checkRows(1) tdSql.checkData(0, 0, last_ts4) tdSql.checkData(0, 1, 5 * maxRange - 1) - tdSql.checkData(0, 2, last_ts4) - tdSql.checkData(0, 3, 4 * maxRange + 1) + tdSql.checkData(0, 3, last_ts4) + tdSql.checkData(0, 4, 4 * maxRange + 1) explain_res = self.explain_sql(sql) self.check_explain_res_has_row("Last Row Scan", explain_res, sql) @@ -587,7 +587,7 @@ class TDTestCase: explain_res = self.explain_sql(sql) self.check_explain_res_has_row("Last Row Scan", explain_res, sql) - self.check_explain_res_no_row("Table Scan", explain_res, sql) + self.check_explain_res_has_row("Table Scan", explain_res, sql) sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_both_model.st;' tdSql.query(sql) @@ -607,7 +607,7 @@ class TDTestCase: tdSql.checkData(0, 0, last_ts4) tdSql.checkData(0, 1, 5 * maxRange - 1) #tdSql.checkData(0, 2, last_ts4) - tdSql.checkData(0, 3, 4 * maxRange + 1) + tdSql.checkData(0, 4, 4 * maxRange + 1) explain_res = self.explain_sql(sql) self.check_explain_res_has_row("Last Row Scan", explain_res, sql) From 0f6e87b3c931c7af219c5099f68679d90ae2798c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 9 Apr 2024 14:53:50 +0800 Subject: [PATCH 04/27] add tag for last row --- source/libs/planner/src/planOptimizer.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index d322174fd0..0cc6a5057d 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2943,6 +2943,13 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic } } } + FOREACH(pColNode, pScan->pScanPseudoCols) { + if (nodesEqualNode(pParamNode, pColNode)) { + if (funcType != FUNCTION_TYPE_LAST) { + nodesListMakeAppend(&pLastRowCols, nodesCloneNode(pColNode)); + } + } + } } } From 20efb35c6edcbf9a8066157bc0680472d5a625de Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 9 Apr 2024 19:09:20 +0800 Subject: [PATCH 05/27] add tag for split plan --- source/libs/planner/src/planOptimizer.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 0cc6a5057d..1eaa37cc03 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3104,6 +3104,10 @@ static int32_t splitCacheLastFuncOptCreateAggLogicNode(SAggLogicNode** pNewAgg, if (TSDB_CODE_SUCCESS != code) { return code; } + code = createColumnByRewriteExprs(pScan->pScanPseudoCols, &pScan->node.pTargets); + if (TSDB_CODE_SUCCESS != code) { + return code; + } OPTIMIZE_FLAG_CLEAR_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_SCAN_PATH); } From f55ac240d5bc61bcc3d4c3e0390ba728e9be7234 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 11 Apr 2024 11:45:44 +0800 Subject: [PATCH 06/27] tbname supported --- source/libs/parser/src/parTranslater.c | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3a37d841aa..8910325912 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2464,6 +2464,20 @@ static int32_t translateFunctionImpl(STranslateContext* pCxt, SFunctionNode** pF static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode** pFunc) { SNode* pParam = NULL; + if (strcmp((*pFunc)->functionName, "tbname") == 0 && (*pFunc)->pParameterList != NULL) { + pParam = nodesListGetNode((*pFunc)->pParameterList, 0); + if(pParam && nodeType(pParam) == QUERY_NODE_VALUE) { + if (pCxt && pCxt->pCurrStmt && pCxt->pCurrStmt->type == QUERY_NODE_SELECT_STMT && + ((SSelectStmt*)pCxt->pCurrStmt)->pFromTable && + nodeType(((SSelectStmt*)pCxt->pCurrStmt)->pFromTable) == QUERY_NODE_REAL_TABLE) { + SRealTableNode* pRealTable = (SRealTableNode*)((SSelectStmt*)pCxt->pCurrStmt)->pFromTable; + if (strcmp(((SValueNode*)pParam)->literal, pRealTable->table.tableName) == 0) { + nodesClearList((*pFunc)->pParameterList); + (*pFunc)->pParameterList = NULL; + } + } + } + } FOREACH(pParam, (*pFunc)->pParameterList) { if (isMultiResFunc(pParam)) { pCxt->errCode = TSDB_CODE_FUNC_FUNTION_PARA_NUM; @@ -2822,7 +2836,9 @@ static EDealRes doCheckAggColCoexist(SNode** pNode, void* pContext) { return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode); } } - if (partionByTbname && QUERY_NODE_COLUMN == nodeType(*pNode) && ((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) { + if (partionByTbname && + ((QUERY_NODE_COLUMN == nodeType(*pNode) && ((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) || + (QUERY_NODE_FUNCTION == nodeType(*pNode) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)*pNode)->funcType))) { return rewriteExprToGroupKeyFunc(pCxt->pTranslateCxt, pNode); } if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) { From af97f9e0007938bd94db517331c6ed7496a25d4d Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 11 Apr 2024 16:48:58 +0800 Subject: [PATCH 07/27] add config --- include/common/tglobal.h | 1 + source/common/src/tglobal.c | 8 ++++++- source/libs/parser/src/parTranslater.c | 4 ++-- tests/script/tsim/parser/first_last_query.sim | 4 ++-- tests/script/tsim/parser/join_multitables.sim | 16 ++++++------- tests/script/tsim/parser/last_both_query.sim | 4 ++-- tests/script/tsim/parser/last_cache_query.sim | 4 ++-- tests/script/tsim/parser/lastrow2.sim | 6 ++--- tests/script/tsim/query/bi_star_table.sim | 24 +++++++++---------- tests/script/tsim/query/cache_last.sim | 5 ++-- .../system-test/2-query/last_and_last_row.py | 16 ++++++------- 11 files changed, 49 insertions(+), 43 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 7a7f19c3af..e04488a68c 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -158,6 +158,7 @@ extern int32_t tsMetaCacheMaxSize; extern int32_t tsSlowLogThreshold; extern int32_t tsSlowLogScope; extern int32_t tsTimeSeriesThreshold; +extern bool tsMultiResultFunctionStarReturnTags; // client extern int32_t tsMinSlidingTime; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3381d52050..dae86f1a32 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -170,6 +170,7 @@ int32_t tsMetaCacheMaxSize = -1; // MB int32_t tsSlowLogThreshold = 3; // seconds int32_t tsSlowLogScope = SLOW_LOG_TYPE_ALL; int32_t tsTimeSeriesThreshold = 50; +bool tsMultiResultFunctionStarReturnTags = false; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, @@ -545,6 +546,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; + + if (cfgAddBool(pCfg, "multiResultFunctionStarReturnTags", tsMultiResultFunctionStarReturnTags, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1; return 0; } @@ -1103,6 +1106,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsKeepAliveIdle = cfgGetItem(pCfg, "keepAliveIdle")->i32; tsExperimental = cfgGetItem(pCfg, "experimental")->bval; + + tsMultiResultFunctionStarReturnTags = cfgGetItem(pCfg, "multiResultFunctionStarReturnTags")->bval; return 0; } @@ -1742,7 +1747,8 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, char *name) { {"shellActivityTimer", &tsShellActivityTimer}, {"slowLogThreshold", &tsSlowLogThreshold}, {"useAdapter", &tsUseAdapter}, - {"experimental", &tsExperimental}}; + {"experimental", &tsExperimental}, + {"multiResultFunctionStarReturnTags", &tsMultiResultFunctionStarReturnTags} }; if (taosCfgSetOption(debugOptions, tListLen(debugOptions), pItem, true) != 0) { taosCfgSetOption(options, tListLen(options), pItem, false); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 1ea0ec8574..778c9b1590 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3368,9 +3368,9 @@ static int32_t createMultiResFuncsParas(STranslateContext* pCxt, SNodeList* pSrc SNode* pPara = NULL; FOREACH(pPara, pSrcParas) { if (nodesIsStar(pPara)) { - code = createAllColumns(pCxt, false, &pExprs); + code = createAllColumns(pCxt, !tsMultiResultFunctionStarReturnTags, &pExprs); } else if (nodesIsTableStar(pPara)) { - code = createTableAllCols(pCxt, (SColumnNode*)pPara, false, &pExprs); + code = createTableAllCols(pCxt, (SColumnNode*)pPara, !tsMultiResultFunctionStarReturnTags, &pExprs); } else { code = nodesListMakeStrictAppend(&pExprs, nodesCloneNode(pPara)); } diff --git a/tests/script/tsim/parser/first_last_query.sim b/tests/script/tsim/parser/first_last_query.sim index 8d02f817e3..533f59f4b2 100644 --- a/tests/script/tsim/parser/first_last_query.sim +++ b/tests/script/tsim/parser/first_last_query.sim @@ -298,7 +298,7 @@ if $data01 != 112 then return -1 endi -if $data03 != @tm0@ then +if $data02 != @tm0@ then return -1 endi @@ -310,7 +310,7 @@ if $data11 != 421 then return -1 endi -if $data13 != @tm1@ then +if $data12 != @tm1@ then return -1 endi diff --git a/tests/script/tsim/parser/join_multitables.sim b/tests/script/tsim/parser/join_multitables.sim index 59e41bc9d9..d2f8ea4a88 100644 --- a/tests/script/tsim/parser/join_multitables.sim +++ b/tests/script/tsim/parser/join_multitables.sim @@ -805,16 +805,16 @@ endi if $data04 != 01 then return -1 endi -if $data[0][10] != @21-03-01 01:00:00.000@ then +if $data05 != @21-03-01 01:00:00.000@ then return -1 endi -if $data[0][11] != 9911 then +if $data06 != 9911 then return -1 endi -if $data[0][12] != 9911.000000000 then +if $data07 != 9911.000000000 then return -1 endi -if $data[0][13] != 11 then +if $data08 != 11 then return -1 endi @@ -837,16 +837,16 @@ endi if $data04 != 05 then return -1 endi -if $data[0][10] != @21-03-01 05:00:00.000@ then +if $data05 != @21-03-01 05:00:00.000@ then return -1 endi -if $data[0][11] != 9915 then +if $data06 != 9915 then return -1 endi -if $data[0][12] != 9915.000000000 then +if $data07 != 9915.000000000 then return -1 endi -if $data[0][13] != 15 then +if $data08 != 15 then return -1 endi diff --git a/tests/script/tsim/parser/last_both_query.sim b/tests/script/tsim/parser/last_both_query.sim index 1cfb2a316f..5f86412199 100644 --- a/tests/script/tsim/parser/last_both_query.sim +++ b/tests/script/tsim/parser/last_both_query.sim @@ -243,11 +243,11 @@ endi if $data04 != @70-01-01 07:59:57.000@ then return -1 endi -if $data06 != @21-05-12 10:10:12.000@ then +if $data05 != @21-05-12 10:10:12.000@ then print $data00 return -1 endi -if $data07 != @70-01-01 07:59:57.000@ then +if $data06 != @70-01-01 07:59:57.000@ then return -1 endi diff --git a/tests/script/tsim/parser/last_cache_query.sim b/tests/script/tsim/parser/last_cache_query.sim index 7bf6a51731..30196e0b62 100644 --- a/tests/script/tsim/parser/last_cache_query.sim +++ b/tests/script/tsim/parser/last_cache_query.sim @@ -243,11 +243,11 @@ endi if $data04 != @70-01-01 07:59:57.000@ then return -1 endi -if $data06 != @21-05-12 10:10:12.000@ then +if $data05 != @21-05-12 10:10:12.000@ then print $data00 return -1 endi -if $data07 != @70-01-01 07:59:57.000@ then +if $data06 != @70-01-01 07:59:57.000@ then return -1 endi diff --git a/tests/script/tsim/parser/lastrow2.sim b/tests/script/tsim/parser/lastrow2.sim index 278de7ab49..33267e3cfd 100644 --- a/tests/script/tsim/parser/lastrow2.sim +++ b/tests/script/tsim/parser/lastrow2.sim @@ -54,13 +54,13 @@ sql select last_row(*), ts, 'abc', 123.981, tbname from m1 if $rows != 1 then return -1 endi -if $data03 != @19-01-01 01:01:01.000@ then +if $data02 != @19-01-01 01:01:01.000@ then return -1 endi -if $data04 != @abc@ then +if $data03 != @abc@ then return -1 endi -if $data05 != 123.981000000 then +if $data04 != 123.981000000 then print expect 123.981000000, actual: $data04 return -1 endi diff --git a/tests/script/tsim/query/bi_star_table.sim b/tests/script/tsim/query/bi_star_table.sim index 1d71d6a68e..6bd6938678 100644 --- a/tests/script/tsim/query/bi_star_table.sim +++ b/tests/script/tsim/query/bi_star_table.sim @@ -33,29 +33,29 @@ if $data06 != tba1 then endi sql select last(*) from db1.sta; -if $cols != 7 then +if $cols != 4 then return -1 endi -if $data06 != tba2 then +if $data03 != tba2 then return -1 endi sql select last_row(*) from db1.sta; -if $cols != 7 then +if $cols != 4 then return -1 endi -if $data06 != tba2 then +if $data03 != tba2 then return -1 endi sql select first(*) from db1.sta; -if $cols != 7 then +if $cols != 4 then return -1 endi -if $data06 != tba1 then +if $data03 != tba1 then return -1 endi @@ -71,29 +71,29 @@ if $data06 != tba1 then endi sql select last(b.*) from db1.sta b; -if $cols != 7 then +if $cols != 4 then return -1 endi -if $data06 != tba2 then +if $data03 != tba2 then return -1 endi sql select last_row(b.*) from db1.sta b; -if $cols != 7 then +if $cols != 4 then return -1 endi -if $data06 != tba2 then +if $data03 != tba2 then return -1 endi sql select first(b.*) from db1.sta b; -if $cols != 7 then +if $cols != 4 then return -1 endi -if $data06 != tba1 then +if $data03 != tba1 then return -1 endi diff --git a/tests/script/tsim/query/cache_last.sim b/tests/script/tsim/query/cache_last.sim index f936f822a5..65eb46de69 100644 --- a/tests/script/tsim/query/cache_last.sim +++ b/tests/script/tsim/query/cache_last.sim @@ -35,12 +35,11 @@ if $data03 != b then return -1 endi sql explain select count(*), last(*) from sta; -if $data00 != @-> Merge (columns=5 width=230 input_order=unknown output_order=unknown mode=column)@ then - print $data00 +if $data00 != @-> Merge (columns=4 width=226 input_order=unknown output_order=unknown mode=column)@ then return -1 endi sql explain select first(f1), last(*) from sta; -if $data00 != @-> Merge (columns=5 width=230 input_order=unknown output_order=unknown mode=column)@ then +if $data00 != @-> Merge (columns=4 width=226 input_order=unknown output_order=unknown mode=column)@ then return -1 endi sql select first(f1), last(*) from sta; diff --git a/tests/system-test/2-query/last_and_last_row.py b/tests/system-test/2-query/last_and_last_row.py index cd572b05cb..b04b3a75f3 100644 --- a/tests/system-test/2-query/last_and_last_row.py +++ b/tests/system-test/2-query/last_and_last_row.py @@ -152,8 +152,8 @@ class TDTestCase: tdSql.checkRows(1) tdSql.checkData(0, 0, last_ts4) tdSql.checkData(0, 1, 5 * maxRange - 1) - tdSql.checkData(0, 3, last_ts4) - tdSql.checkData(0, 4, 4 * maxRange + 1) + tdSql.checkData(0, 2, last_ts4) + tdSql.checkData(0, 3, 4 * maxRange + 1) explain_res = self.explain_sql(sql) self.check_explain_res_no_row("Last Row Scan", explain_res, sql) @@ -302,8 +302,8 @@ class TDTestCase: tdSql.checkRows(1) tdSql.checkData(0, 0, last_ts4) tdSql.checkData(0, 1, 5 * maxRange - 1) - tdSql.checkData(0, 3, last_ts4) - tdSql.checkData(0, 4, 4 * maxRange + 1) + tdSql.checkData(0, 2, last_ts4) + tdSql.checkData(0, 3, 4 * maxRange + 1) explain_res = self.explain_sql(sql) self.check_explain_res_has_row("Last Row Scan", explain_res, sql) @@ -453,8 +453,8 @@ class TDTestCase: tdSql.checkRows(1) tdSql.checkData(0, 0, last_ts4) tdSql.checkData(0, 1, 5 * maxRange - 1) - tdSql.checkData(0, 3, last_ts4) - tdSql.checkData(0, 4, 4 * maxRange + 1) + tdSql.checkData(0, 2, last_ts4) + tdSql.checkData(0, 3, 4 * maxRange + 1) explain_res = self.explain_sql(sql) self.check_explain_res_has_row("Last Row Scan", explain_res, sql) @@ -587,7 +587,7 @@ class TDTestCase: explain_res = self.explain_sql(sql) self.check_explain_res_has_row("Last Row Scan", explain_res, sql) - self.check_explain_res_has_row("Table Scan", explain_res, sql) + self.check_explain_res_no_row("Table Scan", explain_res, sql) sql = f'select last_row(ts), last(ts), last_row(id), last(id) from last_test_both_model.st;' tdSql.query(sql) @@ -607,7 +607,7 @@ class TDTestCase: tdSql.checkData(0, 0, last_ts4) tdSql.checkData(0, 1, 5 * maxRange - 1) #tdSql.checkData(0, 2, last_ts4) - tdSql.checkData(0, 4, 4 * maxRange + 1) + tdSql.checkData(0, 3, 4 * maxRange + 1) explain_res = self.explain_sql(sql) self.check_explain_res_has_row("Last Row Scan", explain_res, sql) From deff7a76b1f8b9cf397b43bccd8eb854f122ce55 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Thu, 11 Apr 2024 18:07:10 +0800 Subject: [PATCH 08/27] add test case --- source/libs/parser/src/parTranslater.c | 15 +- tests/parallel_test/cases.task | 4 + tests/system-test/2-query/tbname.py | 194 +++++++++++++++++++++++++ 3 files changed, 207 insertions(+), 6 deletions(-) create mode 100644 tests/system-test/2-query/tbname.py diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8910325912..32503203e5 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2472,7 +2472,7 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode** pFunc nodeType(((SSelectStmt*)pCxt->pCurrStmt)->pFromTable) == QUERY_NODE_REAL_TABLE) { SRealTableNode* pRealTable = (SRealTableNode*)((SSelectStmt*)pCxt->pCurrStmt)->pFromTable; if (strcmp(((SValueNode*)pParam)->literal, pRealTable->table.tableName) == 0) { - nodesClearList((*pFunc)->pParameterList); + NODES_DESTORY_LIST((*pFunc)->pParameterList); (*pFunc)->pParameterList = NULL; } } @@ -2724,11 +2724,14 @@ static bool hasTbnameFunction(SNodeList* pPartitionByList) { return false; } -static bool fromSubtable(SNode* table) { +static bool fromSingleTable(SNode* table) { if (NULL == table) return false; - if (table->type == QUERY_NODE_REAL_TABLE && ((SRealTableNode*)table)->pMeta && - ((SRealTableNode*)table)->pMeta->tableType == TSDB_CHILD_TABLE) { - return true; + if (table->type == QUERY_NODE_REAL_TABLE && ((SRealTableNode*)table)->pMeta) { + int8_t type = ((SRealTableNode*)table)->pMeta->tableType; + if(type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE + || type == TSDB_SYSTEM_TABLE) { + return true; + } } return false; } @@ -2827,7 +2830,7 @@ static EDealRes doCheckAggColCoexist(SNode** pNode, void* pContext) { } SNode* pPartKey = NULL; bool partionByTbname = false; - if (fromSubtable(((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pFromTable) || + if (fromSingleTable(((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pFromTable) || hasTbnameFunction(((SSelectStmt*)pCxt->pTranslateCxt->pCurrStmt)->pPartitionByList)) { partionByTbname = true; } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 2402f6a94a..43d7c2b765 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -105,6 +105,10 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_cache_scan.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_cache_scan.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_cache_scan.py -Q 4 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname.py -Q 2 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py diff --git a/tests/system-test/2-query/tbname.py b/tests/system-test/2-query/tbname.py new file mode 100644 index 0000000000..bb2bb43d14 --- /dev/null +++ b/tests/system-test/2-query/tbname.py @@ -0,0 +1,194 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import * +from util.cases import * +from util.sql import * + + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + def td29092(self, dbname="db"): + tdSql.execute(f'use {dbname}') + tdSql.execute('CREATE STABLE `st` (`ts` TIMESTAMP, `v1` INT) TAGS (`t1` INT);') + tdSql.execute('CREATE STABLE `st2` (`ts` TIMESTAMP, `v1` INT) TAGS (`t1` INT);') + tdSql.execute('CREATE TABLE `t1` USING `st` (`t1`) TAGS (1);') + tdSql.execute('CREATE TABLE `t2` USING `st` (`t1`) TAGS (2);') + tdSql.execute('CREATE TABLE `t21` USING `st2` (`t1`) TAGS (21);') + tdSql.execute('CREATE TABLE `nt` (`ts` TIMESTAMP, `v1` INT);') + + now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000) + for i in range(3): + tdSql.execute( + f"insert into {dbname}.t1 values ( { now_time + i * 1000 }, {i} )" + ) + tdSql.execute( + f"insert into {dbname}.t2 values ( { now_time + i * 1000 }, {i} )" + ) + tdSql.execute( + f"insert into {dbname}.nt values ( { now_time + i * 1000 }, {i} )" + ) + + tdLog.debug(f"-------------- step1: normal table test ------------------") + tdSql.query("select tbname, count(*) from nt;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, "nt") + tdSql.checkData(0, 1, 3) + + tdSql.query("select nt.tbname, count(*) from nt;") + tdSql.checkRows(1) + tdSql.checkData(0, 0, "nt") + tdSql.checkData(0, 1, 3) + + tdSql.query("select tbname, count(*) from nt group by tbname") + tdSql.checkRows(1) + tdSql.checkData(0, 0, "nt") + tdSql.checkData(0, 1, 3) + + tdSql.query("select nt.tbname, count(*) from nt group by tbname") + tdSql.checkRows(1) + tdSql.checkData(0, 0, "nt") + tdSql.checkData(0, 1, 3) + + tdSql.query("select nt.tbname, count(*) from nt group by nt.tbname") + tdSql.checkRows(1) + tdSql.checkData(0, 0, "nt") + tdSql.checkData(0, 1, 3) + + tdLog.debug(f"-------------- step2: system table test ------------------") + tdSql.query("select tbname, count(*) from information_schema.ins_dnodes") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 1) + + tdSql.query("select ins_dnodes.tbname, count(*) from information_schema.ins_dnodes") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 1) + + tdSql.query("select tbname, count(*) from information_schema.ins_dnodes group by tbname") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 1) + + tdSql.query("select ins_dnodes.tbname, count(*) from information_schema.ins_dnodes group by tbname") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 1) + + tdSql.query("select ins_dnodes.tbname, count(*) from information_schema.ins_dnodes group by ins_dnodes.tbname") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 1) + + tdLog.debug(f"-------------- step3: subtable test ------------------") + tdSql.query("select tbname, count(*) from t1") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 3) + + tdSql.query("select t1.tbname, count(*) from t1") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 3) + + tdSql.query("select tbname, count(*) from t1 group by tbname") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 3) + + tdSql.query("select t1.tbname, count(*) from t1 group by tbname") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 3) + + tdSql.query("select t1.tbname, count(*) from t1 group by t1.tbname") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 3) + + tdSql.error("select t1.tbname, count(*) from t2 group by t1.tbname") + tdSql.error("select t1.tbname, count(*) from t1 group by t2.tbname") + tdSql.error("select t2.tbname, count(*) from t1 group by t1.tbname") + + tdLog.debug(f"-------------- step4: super table test ------------------") + tdSql.query("select tbname, count(*) from st group by tbname") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 3) + tdSql.checkData(1, 1, 3) + + tdSql.query("select tbname, count(*) from st partition by tbname") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 3) + tdSql.checkData(1, 1, 3) + + tdSql.query("select ts, t1 from st where st.tbname=\"t1\"") + tdSql.checkRows(3) + tdSql.checkData(0, 1, 1) + tdSql.checkData(1, 1, 1) + tdSql.checkData(2, 1, 1) + + tdSql.query("select tbname, ts from st where tbname=\"t2\"") + tdSql.checkRows(3) + + tdSql.query("select tbname, ts from st where tbname=\"t2\" order by tbname") + tdSql.checkRows(3) + + tdSql.query("select tbname, ts from st where tbname=\"t2\" order by st.tbname") + tdSql.checkRows(3) + + tdSql.query("select tbname, count(*) from st where tbname=\"t2\" group by tbname order by tbname") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 3) + + tdSql.query("select tbname, count(*) from st group by tbname order by tbname") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 3) + tdSql.checkData(1, 1, 3) + + tdSql.query("select tbname, count(*) from st group by st.tbname order by st.tbname") + tdSql.checkRows(2) + tdSql.checkData(0, 1, 3) + tdSql.checkData(1, 1, 3) + + tdLog.debug(f"-------------- step4: join test ------------------") + tdSql.query("select t1.tbname, t2.tbname from t1, t2 where t1.ts=t2.ts and t1.tbname!=t2.tbname") + tdSql.checkRows(3) + + tdSql.query("select t1.tbname, t2.tbname from t1, t2 where t1.ts=t2.ts and t1.tbname!=t2.tbname order by t1.tbname") + tdSql.checkRows(3) + + tdSql.query("select st.tbname, st2.tbname from st, st2 where st.ts=st2.ts and st.tbname!=st2.tbname order by st.tbname") + tdSql.checkRows(0) + + tdSql.execute(f"insert into t21 values ( { now_time + 1000 }, 1 )") + tdSql.query("select st.tbname, st2.tbname from st, st2 where st.ts=st2.ts and st.tbname!=st2.tbname order by st.tbname") + tdSql.checkRows(2) + + tdSql.query("select t1.tbname, st2.tbname from t1, st2 where t1.ts=st2.ts and t1.tbname!=st2.tbname order by t1.tbname") + tdSql.checkRows(1) + + tdSql.query("select nt.ts, st.tbname from nt, st where nt.ts=st.ts order by st.tbname") + tdSql.checkRows(6) + + tdSql.query("select nt.ts, t1.tbname from nt, t1 where nt.ts=t1.ts order by t1.tbname") + tdSql.checkRows(3) + + def run(self): + tdSql.prepare() + + self.td29092() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) From d4ebd2ec71cb7a1cb10928d95baada6c7bd132ee Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 11 Apr 2024 18:16:24 +0800 Subject: [PATCH 09/27] add ci --- tests/script/tsim/query/cache_last_tag.sim | 189 +++++++++++++++++++++ 1 file changed, 189 insertions(+) create mode 100644 tests/script/tsim/query/cache_last_tag.sim diff --git a/tests/script/tsim/query/cache_last_tag.sim b/tests/script/tsim/query/cache_last_tag.sim new file mode 100644 index 0000000000..458254625f --- /dev/null +++ b/tests/script/tsim/query/cache_last_tag.sim @@ -0,0 +1,189 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +sql alter local "multiResultFunctionStarReturnTags" "0"; + +print step1===================== +sql drop database if exists test; +sql create database test vgroups 4 CACHEMODEL 'both'; +sql use test; +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(3,3,3); +sql create table t4 using st tags(NULL,4,4); + +sql insert into t1 values(1648791211000,1,1,1); +sql insert into t1 values(1648791211001,2,2,2); +sql insert into t2 values(1648791211002,3,3,3); +sql insert into t2 values(1648791211003,4,4,4); +sql insert into t3 values(1648791211004,5,5,5); +sql insert into t3 values(1648791211005,6,6,6); +sql insert into t4 values(1648791211007,NULL,NULL,NULL); + +sql select last(*),last_row(*) from st; + +if $cols != 8 then + print ======cols=$cols + return -1 +endi + +sql alter local "multiResultFunctionStarReturnTags" "1"; + +sql select last(*),last_row(*) from st; + +if $cols != 14 then + print ======cols=$cols + return -1 +endi + +sql select last(*) from st; + +if $cols != 7 then + return -1 +endi + +sql select last_row(*) from st; + +if $cols != 7 then + return -1 +endi + +sql select last(*),last_row(*) from t1; + +if $cols != 8 then + return -1 +endi + +print step2===================== + +sql drop database if exists test1; +sql create database test1 vgroups 4 CACHEMODEL 'last_row'; +sql use test1; +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(3,3,3); +sql create table t4 using st tags(NULL,4,4); + +sql insert into t1 values(1648791211000,1,1,1); +sql insert into t1 values(1648791211001,2,2,2); +sql insert into t2 values(1648791211002,3,3,3); +sql insert into t2 values(1648791211003,4,4,4); +sql insert into t3 values(1648791211004,5,5,5); +sql insert into t3 values(1648791211005,6,6,6); +sql insert into t4 values(1648791211007,NULL,NULL,NULL); + +sql select last(*),last_row(*) from st; + +if $cols != 14 then + return -1 +endi + +sql select last(*) from st; + +if $cols != 7 then + return -1 +endi + +return -1 + +sql select last_row(*) from st; + +if $cols != 7 then + return -1 +endi + +sql select last(*),last_row(*) from t1; + +if $cols != 8 then + return -1 +endi + +print step3===================== + +sql drop database if exists test2; +sql create database test2 vgroups 4 CACHEMODEL 'last_value'; +sql use test2; +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(3,3,3); +sql create table t4 using st tags(NULL,4,4); + +sql insert into t1 values(1648791211000,1,1,1); +sql insert into t1 values(1648791211001,2,2,2); +sql insert into t2 values(1648791211002,3,3,3); +sql insert into t2 values(1648791211003,4,4,4); +sql insert into t3 values(1648791211004,5,5,5); +sql insert into t3 values(1648791211005,6,6,6); +sql insert into t4 values(1648791211007,NULL,NULL,NULL); + +sql select last(*),last_row(*) from st; + +if $cols != 14 then + return -1 +endi + +sql select last(*) from st; + +if $cols != 7 then + return -1 +endi + +sql select last_row(*) from st; + +if $cols != 7 then + return -1 +endi + +sql select last(*),last_row(*) from t1; + +if $cols != 8 then + return -1 +endi + +sql drop database if exists test4; +sql create database test4 vgroups 4; +sql use test4; +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); +sql create table t3 using st tags(3,3,3); +sql create table t4 using st tags(NULL,4,4); + +sql insert into t1 values(1648791211000,1,1,1); +sql insert into t1 values(1648791211001,2,2,2); +sql insert into t2 values(1648791211002,3,3,3); +sql insert into t2 values(1648791211003,4,4,4); +sql insert into t3 values(1648791211004,5,5,5); +sql insert into t3 values(1648791211005,6,6,6); +sql insert into t4 values(1648791211007,NULL,NULL,NULL); + +sql select last(*),last_row(*) from st; + +if $cols != 14 then + return -1 +endi + +sql select last(*) from st; + +if $cols != 7 then + return -1 +endi + +sql select last_row(*) from st; + +if $cols != 7 then + return -1 +endi + +sql select last(*),last_row(*) from t1; + +if $cols != 8 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT From d799212fb2a1973291489425fea9fcde2493da3b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 12 Apr 2024 10:02:18 +0800 Subject: [PATCH 10/27] refactor: do some internal refactor. --- include/dnode/vnode/tqCommon.h | 2 + source/dnode/snode/inc/sndInt.h | 1 - source/dnode/snode/src/snode.c | 42 ++--------- source/dnode/vnode/src/tq/tq.c | 88 +--------------------- source/dnode/vnode/src/tqCommon/tqCommon.c | 61 +++++++++++++++ 5 files changed, 72 insertions(+), 122 deletions(-) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 22a176f0bb..93e0064192 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -40,4 +40,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); +int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode); + #endif // TDENGINE_TQ_COMMON_H diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 024c3c6bae..8c5d056893 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -31,7 +31,6 @@ extern "C" { #endif struct SSnode { - char* path; SStreamMeta* pMeta; SMsgCb msgCb; }; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 3bef5b595b..bd07974c3f 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -32,6 +32,7 @@ static STaskId replaceStreamTaskId(SStreamTask *pTask) { pTask->id.taskId = pTask->streamTaskId.taskId; return id; } + static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) { ASSERT(pTask->info.fillHistory); pTask->id.taskId = pId->taskId; @@ -48,42 +49,16 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer streamTaskOpenAllUpstreamInput(pTask); - STaskId taskId = {0}; - if (pTask->info.fillHistory) { - taskId = replaceStreamTaskId(pTask); + code = tqExpandStreamTask(pTask, pSnode->pMeta, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; } - pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); - if (pTask->pState == NULL) { - sndError("s-task:%s failed to open state for task", pTask->id.idStr); - return -1; - } else { - sndDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); - } - - if (pTask->info.fillHistory) { - restoreStreamTaskId(pTask, &taskId); - } - - int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); - SReadHandle handle = { - .checkpointId = pTask->chkInfo.checkpointId, - .vnode = NULL, - .numOfVgroups = numOfVgroups, - .pStateBackend = pTask->pState, - .fillHistory = pTask->info.fillHistory, - .winRange = pTask->dataRange.window, - }; - initStreamStateAPI(&handle.api); - - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, SNODE_HANDLE, pTask->id.taskId); - ASSERT(pTask->exec.pExecutor); - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); - streamTaskResetUpstreamStageInfo(pTask); streamSetupScheduleTrigger(pTask); SCheckpointInfo *pChkInfo = &pTask->chkInfo; + // checkpoint ver is the kept version, handled data should be the next version. if (pChkInfo->checkpointId != 0) { pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; @@ -117,11 +92,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pSnode->path = taosStrdup(path); - if (pSnode->path == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; - } pSnode->msgCb = pOption->msgCb; pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback); @@ -140,7 +110,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { return pSnode; FAIL: - taosMemoryFree(pSnode->path); taosMemoryFree(pSnode); return NULL; } @@ -156,7 +125,6 @@ void sndClose(SSnode *pSnode) { streamMetaNotifyClose(pSnode->pMeta); streamMetaCommit(pSnode->pMeta); streamMetaClose(pSnode->pMeta); - taosMemoryFree(pSnode->path); taosMemoryFree(pSnode); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7886967be0..fb47414fb9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -699,22 +699,6 @@ end: static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } -static STaskId replaceStreamTaskId(SStreamTask* pTask) { - ASSERT(pTask->info.fillHistory); - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - - pTask->id.streamId = pTask->streamTaskId.streamId; - pTask->id.taskId = pTask->streamTaskId.taskId; - - return id; -} - -static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { - ASSERT(pTask->info.fillHistory); - pTask->id.taskId = pId->taskId; - pTask->id.streamId = pId->streamId; -} - int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { int32_t vgId = TD_VID(pTq->pVnode); tqDebug("s-task:0x%x start to expand task", pTask->id.taskId); @@ -724,74 +708,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { return code; } - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - STaskId taskId = {0}; - if (pTask->info.fillHistory) { - taskId = replaceStreamTaskId(pTask); - } - - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); - if (pTask->pState == NULL) { - tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId); - return -1; - } - - tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); - if (pTask->info.fillHistory) { - restoreStreamTaskId(pTask, &taskId); - } - - SReadHandle handle = { - .checkpointId = pTask->chkInfo.checkpointId, - .vnode = pTq->pVnode, - .initTqReader = 1, - .pStateBackend = pTask->pState, - .fillHistory = pTask->info.fillHistory, - .winRange = pTask->dataRange.window, - }; - - initStorageAPI(&handle.api); - - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); - if (pTask->exec.pExecutor == NULL) { - return -1; - } - - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); - } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - STaskId taskId = {0}; - if (pTask->info.fillHistory) { - taskId = replaceStreamTaskId(pTask); - } - - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); - if (pTask->pState == NULL) { - tqError("s-task:%s (vgId:%d) failed to open state for task", pTask->id.idStr, vgId); - return -1; - } else { - tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); - } - - if (pTask->info.fillHistory) { - restoreStreamTaskId(pTask, &taskId); - } - - SReadHandle handle = { - .checkpointId = pTask->chkInfo.checkpointId, - .vnode = NULL, - .numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), - .pStateBackend = pTask->pState, - .fillHistory = pTask->info.fillHistory, - .winRange = pTask->dataRange.window, - }; - - initStorageAPI(&handle.api); - - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); - if (pTask->exec.pExecutor == NULL) { - return -1; - } - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); + code = tqExpandStreamTask(pTask, pTq->pStreamMeta, pTq->pVnode); + if (code != TSDB_CODE_SUCCESS) { + return code; } // sink @@ -827,6 +746,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { streamTaskResetUpstreamStageInfo(pTask); streamSetupScheduleTrigger(pTask); + SCheckpointInfo* pChkInfo = &pTask->chkInfo; // checkpoint ver is the kept version, handled data should be the next version. diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 2fa9f9a9ff..f85bb8cee5 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -23,6 +23,67 @@ typedef struct STaskUpdateEntry { int32_t transId; } STaskUpdateEntry; +static STaskId replaceStreamTaskId(SStreamTask* pTask) { + ASSERT(pTask->info.fillHistory); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + + pTask->id.streamId = pTask->streamTaskId.streamId; + pTask->id.taskId = pTask->streamTaskId.taskId; + + return id; +} + +static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { + ASSERT(pTask->info.fillHistory); + pTask->id.taskId = pId->taskId; + pTask->id.streamId = pId->streamId; +} + +int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) { + int32_t vgId = pMeta->vgId; + STaskId taskId = {0}; + + if (pTask->info.fillHistory) { + taskId = replaceStreamTaskId(pTask); + } + + pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1); + if (pTask->pState == NULL) { + tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId); + return -1; + } else { + tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); + } + + if (pTask->info.fillHistory) { + restoreStreamTaskId(pTask, &taskId); + } + + SReadHandle handle = { + .checkpointId = pTask->chkInfo.checkpointId, + .pStateBackend = pTask->pState, + .fillHistory = pTask->info.fillHistory, + .winRange = pTask->dataRange.window, + }; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + handle.vnode = pVnode; + handle.initTqReader = 1; + } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { + handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); + } + + initStorageAPI(&handle.api); + + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); + if (pTask->exec.pExecutor == NULL) { + tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); + return -1; + } + + qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); + return TSDB_CODE_SUCCESS; +} + int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); From d9aaa3d083bfeb190f29ae92506efd5156435c83 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Fri, 12 Apr 2024 11:06:41 +0800 Subject: [PATCH 11/27] Update index.md --- docs/zh/14-reference/12-config/index.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/zh/14-reference/12-config/index.md b/docs/zh/14-reference/12-config/index.md index 4d47f0771c..607b5881bd 100755 --- a/docs/zh/14-reference/12-config/index.md +++ b/docs/zh/14-reference/12-config/index.md @@ -230,6 +230,16 @@ taos -C | 缺省值 | 1 | | 补充说明 | 该参数设置为 1 时,如果查询中含有 GROUP BY,PARTITION BY 以及 INTERVAL 子句且相应的组或窗口内数据为空或者NULL, 对应的组或窗口将不返回查询结果 | +### multiResultFunctionStarReturnTags + +| 属性 | 说明 | +| -------- | ---------------------------------------------------------------------------------------------------------------------------------------------- | +| 适用范围 | 仅客户端适用 | +| 含义 | 查询超级表时,last(\*)/last_row(\*)/first(\*) 是否返回标签列 | +| 取值范围 | 0:不返回标签列,1:返回标签列 | +| 缺省值 | 0 | +| 补充说明 | 该参数设置为 0 时,last(\*)/last_row(\*)/first(\*) 只返回超级表的普通列;为 1 时,返回超级表的普通列和标签列 | + ## 区域相关 ### timezone From 678b3126e9fcc893a8047e3778e48512af98b355 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Fri, 12 Apr 2024 11:15:36 +0800 Subject: [PATCH 12/27] Update index.md --- docs/zh/14-reference/12-config/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/14-reference/12-config/index.md b/docs/zh/14-reference/12-config/index.md index 607b5881bd..1bada64431 100755 --- a/docs/zh/14-reference/12-config/index.md +++ b/docs/zh/14-reference/12-config/index.md @@ -235,7 +235,7 @@ taos -C | 属性 | 说明 | | -------- | ---------------------------------------------------------------------------------------------------------------------------------------------- | | 适用范围 | 仅客户端适用 | -| 含义 | 查询超级表时,last(\*)/last_row(\*)/first(\*) 是否返回标签列 | +| 含义 | 查询超级表时,last(\*)/last_row(\*)/first(\*) 是否返回标签列;查询普通表、子表时,不受该参数影响。 | | 取值范围 | 0:不返回标签列,1:返回标签列 | | 缺省值 | 0 | | 补充说明 | 该参数设置为 0 时,last(\*)/last_row(\*)/first(\*) 只返回超级表的普通列;为 1 时,返回超级表的普通列和标签列 | From f0a7f53dc8dde5710619dbf2ec22d378bdff8809 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Fri, 12 Apr 2024 11:21:14 +0800 Subject: [PATCH 13/27] Update 10-function.md --- docs/zh/12-taos-sql/10-function.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index 0482022d95..81b6732f6b 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -954,7 +954,7 @@ FIRST(expr) **使用说明**: -- 如果要返回各个列的首个(时间戳最小)非 NULL 值,可以使用 FIRST(\*); +- 如果要返回各个列的首个(时间戳最小)非 NULL 值,可以使用 FIRST(\*);查询超级表,且multiResultFunctionStarReturnTags设置为 0 时,这个是默认值,FIRST(\*)只返回超级表的普通列;设置为 1 时,返回超级表的普通列和标签列。 - 如果结果集中的某列全部为 NULL 值,则该列的返回结果也是 NULL; - 如果结果集中所有列全部为 NULL 值,则不返回结果。 @@ -1006,7 +1006,7 @@ LAST(expr) **使用说明**: -- 如果要返回各个列的最后(时间戳最大)一个非 NULL 值,可以使用 LAST(\*); +- 如果要返回各个列的最后(时间戳最大)一个非 NULL 值,可以使用 LAST(\*);查询超级表,且multiResultFunctionStarReturnTags设置为 0 时,这个是默认值,LAST(\*)只返回超级表的普通列;设置为 1 时,返回超级表的普通列和标签列。 - 如果结果集中的某列全部为 NULL 值,则该列的返回结果也是 NULL;如果结果集中所有列全部为 NULL 值,则不返回结果。 - 在用于超级表时,时间戳完全一样且同为最大的数据行可能有多个,那么会从中随机返回一条,而并不保证多次运行所挑选的数据行必然一致。 @@ -1026,7 +1026,7 @@ LAST_ROW(expr) **适用于**:表和超级表。 **使用说明**: - +- 如果要返回各个列的最后一条记录(时间戳最大),可以使用 LAST_ROW(\*);查询超级表,且multiResultFunctionStarReturnTags设置为 0 时,这个是默认值,LAST_ROW(\*)只返回超级表的普通列;设置为 1 时,返回超级表的普通列和标签列。 - 在用于超级表时,时间戳完全一样且同为最大的数据行可能有多个,那么会从中随机返回一条,而并不保证多次运行所挑选的数据行必然一致。 - 不能与 INTERVAL 一起使用。 From 15c18af221948986b94f559d689abb238339c552 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 13 Apr 2024 18:27:38 +0800 Subject: [PATCH 14/27] fix(stream): fix init error. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 12 +++++++----- source/libs/stream/src/streamMeta.c | 3 --- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index f85bb8cee5..1c3a760bab 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -74,13 +74,15 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) initStorageAPI(&handle.api); - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); - if (pTask->exec.pExecutor == NULL) { - tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); - return -1; + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) { + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); + if (pTask->exec.pExecutor == NULL) { + tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); + return -1; + } + qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8d5e4f3c87..ea18e791a6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -591,19 +591,16 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { - tFreeStreamTask(pTask); return -1; } taosArrayPush(pMeta->pTaskList, &pTask->id); if (streamMetaSaveTask(pMeta, pTask) < 0) { - tFreeStreamTask(pTask); return -1; } if (streamMetaCommit(pMeta) < 0) { - tFreeStreamTask(pTask); return -1; } From 31a728b1b83de9f3563f648fae722da2bc9edcc4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Apr 2024 09:49:41 +0800 Subject: [PATCH 15/27] fix(stream): update the stream task meta table. --- source/common/src/systable.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index e0c0cc89ab..80b3efb05d 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -180,7 +180,12 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "stage", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "in_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, - {.name = "info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "checkpointId", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpointInfo", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema userTblsSchema[] = { From 665107ad3a439d0c8ec4d4b608d9843f7651b378 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Apr 2024 14:23:37 +0800 Subject: [PATCH 16/27] enh(stream): add more info in meta table. --- include/libs/stream/tstream.h | 53 ++++++++++------- source/common/src/systable.c | 10 +++- source/dnode/mnode/impl/src/mndStream.c | 69 ++++++++++++++++++++++- source/dnode/mnode/impl/src/mndStreamHb.c | 8 ++- source/dnode/snode/src/snode.c | 3 + source/dnode/vnode/src/tq/tq.c | 2 + source/libs/stream/src/streamCheckpoint.c | 1 + source/libs/stream/src/streamMeta.c | 54 +++++++++++++----- source/libs/stream/src/streamStart.c | 2 +- source/libs/stream/src/streamTask.c | 12 ++-- 10 files changed, 167 insertions(+), 47 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c12bb146b4..5cecb1af42 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -304,9 +304,9 @@ typedef struct SStreamTaskId { typedef struct SCheckpointInfo { int64_t startTs; - int64_t checkpointId; - - int64_t checkpointVer; // latest checkpointId version + int64_t checkpointId; // latest checkpoint id + int64_t checkpointVer; // latest checkpoint offset in wal + int64_t checkpointTime; // latest checkpoint time int64_t processedVer; int64_t nextProcessVer; // current offset in WAL, not serialize it int64_t failedId; // record the latest failed checkpoint id @@ -386,6 +386,9 @@ typedef struct STaskExecStatisInfo { int64_t created; int64_t init; int64_t start; + int64_t startCheckpointId; + int64_t startCheckpointVer; + int64_t step1Start; double step1El; int64_t step2Start; @@ -672,24 +675,34 @@ typedef struct { int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp); int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); +typedef struct STaskCkptInfo { + int64_t latestId; // saved checkpoint id + int64_t latestVer; // saved checkpoint ver + int64_t latestTime; // latest checkpoint time + int64_t activeId; // current active checkpoint id + int32_t activeTransId; // checkpoint trans id + int8_t failed; // denote if the checkpoint is failed or not +} STaskCkptInfo; + typedef struct STaskStatusEntry { - STaskId id; - int32_t status; - int32_t statusLastDuration; // to record the last duration of current status - int64_t stage; - int32_t nodeId; - int64_t verStart; // start version in WAL, only valid for source task - int64_t verEnd; // end version in WAL, only valid for source task - int64_t processedVer; // only valid for source task - int64_t checkpointId; // current active checkpoint id - int32_t chkpointTransId; // checkpoint trans id - int8_t checkpointFailed; // denote if the checkpoint is failed or not - bool inputQChanging; // inputQ is changing or not - int64_t inputQUnchangeCounter; - double inputQUsed; // in MiB - double inputRate; - double sinkQuota; // existed quota size for sink task - double sinkDataSize; // sink to dst data size + STaskId id; + int32_t status; + int32_t statusLastDuration; // to record the last duration of current status + int64_t stage; + int32_t nodeId; + SVersionRange verRange; // start/end version in WAL, only valid for source task + int64_t processedVer; // only valid for source task + bool inputQChanging; // inputQ is changing or not + int64_t inputQUnchangeCounter; + double inputQUsed; // in MiB + double inputRate; + double sinkQuota; // existed quota size for sink task + double sinkDataSize; // sink to dst data size + int64_t startTime; + int64_t startCheckpointId; + int64_t startCheckpointVer; + int64_t hTaskId; + STaskCkptInfo checkpointInfo; } STaskStatusEntry; typedef struct SStreamHbMsg { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 80b3efb05d..9f1509077c 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -159,6 +159,8 @@ static const SSysDbTableSchema userStbsSchema[] = { static const SSysDbTableSchema streamSchema[] = { {.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "stream_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "history_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "source_db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, @@ -182,9 +184,13 @@ static const SSysDbTableSchema streamTaskSchema[] = { // {.name = "out_queue", .bytes = 20, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "info", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, - {.name = "checkpointId", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, - {.name = "checkpointInfo", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "start_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "start_ver", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, + {.name = "checkpoint_id", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "checkpoint_version", .bytes = 25, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "ds_err_info", .bytes = 25, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "history_task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_task_status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8f9afb2adc..05b06e83a8 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1304,9 +1304,31 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false); + // create time pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->createTime, false); + // stream id + char buf[128] = {0}; + int32_t len = tintToHex(pStream->uid, &buf[4]); + buf[2] = '0'; + buf[3] = 'x'; + varDataSetLen(buf, len + 2); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, buf, false); + + // related fill-history stream id + memset(buf, 0, tListLen(buf)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pStream->hTaskUid != 0) { + len = tintToHex(pStream->hTaskUid, &buf[4]); + varDataSetLen(buf, len + 2); + colDataSetVal(pColInfo, numOfRows, buf, false); + } else { + colDataSetVal(pColInfo, numOfRows, buf, true); + } + + // related fill-history stream id char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(sql, pStream->sql, sizeof(sql)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -1469,13 +1491,14 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); // colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); + // info if (pTask->info.taskLevel == TASK_LEVEL__SINK) { const char *sinkStr = "%.2fMiB"; sprintf(buf, sinkStr, pe->sinkDataSize); } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; - sprintf(buf, offsetStr, pe->processedVer, pe->verStart, pe->verEnd); + sprintf(buf, offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); } STR_TO_VARSTR(vbuf, buf); @@ -1483,6 +1506,50 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false); + // start_time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startTime, false); + + // start id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startCheckpointId, false); + + // start ver + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->startCheckpointVer, false); + + // checkpoint time + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pe->checkpointInfo.latestTime != 0) { + colDataSetVal(pColInfo, numOfRows, (const char *)&pe->checkpointInfo.latestTime, false); + } else { + colDataSetVal(pColInfo, numOfRows, 0, true); + } + + // checkpoint_id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestId, false); + + // checkpoint info + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->checkpointInfo.latestVer, false); + + // ds_err_info + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, 0, true); + + // history_task_id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pe->hTaskId != 0) { + colDataSetVal(pColInfo, numOfRows, (const char*)&pe->hTaskId, false); + } else { + colDataSetVal(pColInfo, numOfRows, 0, true); + } + + // history_task_status + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, 0, true); + return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index c8f943b931..1fedee3bcf 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -294,12 +294,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } streamTaskStatusCopy(pTaskEntry, p); - if ((p->checkpointId != 0) && p->checkpointFailed) { + + STaskCkptInfo *pChkInfo = &p->checkpointInfo; + if ((pChkInfo->activeId != 0) && pChkInfo->failed) { mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, - p->checkpointId, p->chkpointTransId); + pChkInfo->activeId, pChkInfo->activeTransId); SFailedCheckpointInfo info = { - .transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId}; + .transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId}; addIntoCheckpointList(pFailedTasks, &info); } } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index bd07974c3f..f17716eda0 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -63,6 +63,9 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer if (pChkInfo->checkpointId != 0) { pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; pChkInfo->processedVer = pChkInfo->checkpointVer; + pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer; + pTask->execInfo.startCheckpointId = pChkInfo->checkpointId; + sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fb47414fb9..92dc55c0c3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -753,6 +753,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { if (pChkInfo->checkpointId != 0) { pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; pChkInfo->processedVer = pChkInfo->checkpointVer; + pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer; + pTask->execInfo.startCheckpointId = pChkInfo->checkpointId; tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7f52c5d2f0..86ee2b837d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -309,6 +309,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { pCKInfo->checkpointId = pCKInfo->checkpointingId; pCKInfo->checkpointVer = pCKInfo->processedVer; + pCKInfo->checkpointTime = pCKInfo->startTs; streamTaskClearCheckInfo(p, false); taosThreadMutexUnlock(&p->lock); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ea18e791a6..3c22f33f93 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -957,11 +957,18 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeDouble(pEncoder, ps->sinkQuota) < 0) return -1; if (tEncodeDouble(pEncoder, ps->sinkDataSize) < 0) return -1; if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1; - if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1; - if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1; - if (tEncodeI64(pEncoder, ps->checkpointId) < 0) return -1; - if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1; - if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->verRange.minVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->verRange.maxVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.activeId) < 0) return -1; + if (tEncodeI8(pEncoder, ps->checkpointInfo.failed) < 0) return -1; + if (tEncodeI32(pEncoder, ps->checkpointInfo.activeTransId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.latestId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.latestVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->checkpointInfo.latestTime) < 0) return -1; + if (tEncodeI64(pEncoder, ps->startTime) < 0) return -1; + if (tEncodeI64(pEncoder, ps->startCheckpointId) < 0) return -1; + if (tEncodeI64(pEncoder, ps->startCheckpointVer) < 0) return -1; + if (tEncodeI64(pEncoder, ps->hTaskId) < 0) return -1; } int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes); @@ -996,11 +1003,19 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeDouble(pDecoder, &entry.sinkQuota) < 0) return -1; if (tDecodeDouble(pDecoder, &entry.sinkDataSize) < 0) return -1; if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1; - if (tDecodeI64(pDecoder, &entry.checkpointId) < 0) return -1; - if (tDecodeI8(pDecoder, &entry.checkpointFailed) < 0) return -1; - if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.verRange.minVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.verRange.maxVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.activeId) < 0) return -1; + if (tDecodeI8(pDecoder, &entry.checkpointInfo.failed) < 0) return -1; + if (tDecodeI32(pDecoder, &entry.checkpointInfo.activeTransId) < 0) return -1; + + if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestId) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.checkpointInfo.latestTime) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.startTime) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.startCheckpointId) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.startCheckpointVer) < 0) return -1; + if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1; entry.id.taskId = taskId; taosArrayPush(pReq->pTaskStatus, &entry); @@ -1102,7 +1117,16 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { .status = streamTaskGetStatus(*pTask)->state, .nodeId = hbMsg.vgId, .stage = pMeta->stage, + .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), + .startTime = (*pTask)->execInfo.start, + .checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId, + .checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer, + .checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime, + .hTaskId = (*pTask)->hTaskInfo.id.taskId, + + .startCheckpointId = (*pTask)->execInfo.startCheckpointId, + .startCheckpointVer = (*pTask)->execInfo.startCheckpointVer, }; entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); @@ -1112,11 +1136,11 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { } if ((*pTask)->chkInfo.checkpointingId != 0) { - entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0; - entry.checkpointId = (*pTask)->chkInfo.checkpointingId; - entry.chkpointTransId = (*pTask)->chkInfo.transId; + entry.checkpointInfo.failed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId) ? 1 : 0; + entry.checkpointInfo.activeId = (*pTask)->chkInfo.checkpointingId; + entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId; - if (entry.checkpointFailed) { + if (entry.checkpointInfo.failed) { stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId); } } @@ -1127,7 +1151,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { entry.processedVer = (*pTask)->chkInfo.processedVer; } - walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd); + walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer); } addUpdateNodeIntoHbMsg(*pTask, &hbMsg); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index f2a694a554..32bd3742ad 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -44,7 +44,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId); static void doProcessDownstreamReadyRsp(SStreamTask* pTask); int32_t streamTaskSetReady(SStreamTask* pTask) { - int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); + int32_t numOfDowns = streamTaskGetNumOfDownstream(pTask); SStreamTaskState* p = streamTaskGetStatus(pTask); if ((p->state == TASK_STATUS__SCAN_HISTORY) && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c34e162326..7badbfa9f3 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -849,13 +849,15 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) pDst->inputQUsed = pSrc->inputQUsed; pDst->inputRate = pSrc->inputRate; pDst->processedVer = pSrc->processedVer; - pDst->verStart = pSrc->verStart; - pDst->verEnd = pSrc->verEnd; + pDst->verRange = pSrc->verRange; pDst->sinkQuota = pSrc->sinkQuota; pDst->sinkDataSize = pSrc->sinkDataSize; - pDst->checkpointId = pSrc->checkpointId; - pDst->checkpointFailed = pSrc->checkpointFailed; - pDst->chkpointTransId = pSrc->chkpointTransId; + pDst->checkpointInfo = pSrc->checkpointInfo; + pDst->startCheckpointId = pSrc->startCheckpointId; + pDst->startCheckpointVer = pSrc->startCheckpointVer; + + pDst->startTime = pSrc->startTime; + pDst->hTaskId = pSrc->hTaskId; } static int32_t taskPauseCallback(SStreamTask* pTask, void* param) { From ac2853f28c14ac5f43173c520c546be994e44674 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 15 Apr 2024 15:36:54 +0800 Subject: [PATCH 17/27] fix: drop table after commit --- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 79 ++++------------------- 1 file changed, 11 insertions(+), 68 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 700d6b10b7..db57ec9835 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -37,7 +37,6 @@ typedef struct { int64_t cid; int64_t now; TSKEY nextKey; - TSKEY maxDelKey; int32_t fid; int32_t expLevel; SDiskID did; @@ -46,7 +45,6 @@ typedef struct { STFileSet *fset; TABLEID tbid[1]; bool hasTSData; - bool skipTsRow; } ctx[1]; // reader @@ -128,21 +126,8 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) { continue; } } - /* - extern int8_t tsS3Enabled; - int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs); - committer->ctx->skipTsRow = false; - if (tsS3Enabled && nlevel > 1 && committer->ctx->did.level == nlevel - 1) { - committer->ctx->skipTsRow = true; - } - */ int64_t ts = TSDBROW_TS(&row->row); - - if (committer->ctx->skipTsRow && ts <= committer->ctx->maxKey) { - ts = committer->ctx->maxKey + 1; - } - if (ts > committer->ctx->maxKey) { committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts); code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid); @@ -175,15 +160,13 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { int64_t numRecord = 0; SMetaInfo info; - if (committer->ctx->fset == NULL && !committer->ctx->hasTSData) { - if (committer->ctx->maxKey < committer->ctx->maxDelKey) { - committer->ctx->nextKey = committer->ctx->maxKey + 1; - } else { - committer->ctx->nextKey = TSKEY_MAX; - } - return 0; + if (committer->tsdb->imem->nDel == 0) { + goto _exit; } + // do not need to write tomb data if there is no ts data + bool skip = (committer->ctx->fset == NULL && !committer->ctx->hasTSData); + committer->ctx->tbid->suid = 0; committer->ctx->tbid->uid = 0; for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) { @@ -210,9 +193,11 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) { record->skey = TMAX(record->skey, committer->ctx->minKey); record->ekey = TMIN(record->ekey, committer->ctx->maxKey); - numRecord++; - code = tsdbFSetWriteTombRecord(committer->writer, record); - TSDB_CHECK_CODE(code, lino, _exit); + if (!skip) { + numRecord++; + code = tsdbFSetWriteTombRecord(committer->writer, record); + TSDB_CHECK_CODE(code, lino, _exit); + } } code = tsdbIterMergerNext(committer->tombIterMerger); @@ -406,31 +391,6 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { // reset nextKey committer->ctx->nextKey = TSKEY_MAX; - committer->ctx->skipTsRow = false; - - extern int8_t tsS3Enabled; - extern int32_t tsS3UploadDelaySec; - long s3Size(const char *object_name); - int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs); - if (tsS3Enabled && nlevel > 1 && committer->ctx->fset) { - STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA]; - if (fobj && fobj->f->did.level == nlevel - 1) { - // if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay - const char *object_name = taosDirEntryBaseName((char *)fobj->fname); - - if (taosCheckExistFile(fobj->fname)) { - int32_t mtime = 0; - taosStatFile(fobj->fname, NULL, &mtime, NULL); - if (mtime < committer->ctx->now - tsS3UploadDelaySec) { - committer->ctx->skipTsRow = true; - } - } else /*if (s3Size(object_name) > 0) */ { - committer->ctx->skipTsRow = true; - } - } - // new fset can be written with ts data - } - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); @@ -519,28 +479,11 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co STbData *tbData = TCONTAINER_OF(node, STbData, rbtn); for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) { - if (delData->sKey < committer->ctx->nextKey) { - committer->ctx->nextKey = delData->sKey; - } + committer->ctx->nextKey = TMIN(committer->ctx->nextKey, delData->sKey); } } } - committer->ctx->maxDelKey = TSKEY_MIN; - TSKEY minKey = TSKEY_MAX; - TSKEY maxKey = TSKEY_MIN; - if (TARRAY2_SIZE(committer->fsetArr) > 0) { - STFileSet *fset = TARRAY2_LAST(committer->fsetArr); - tsdbFidKeyRange(fset->fid, committer->minutes, committer->precision, &minKey, &committer->ctx->maxDelKey); - - fset = TARRAY2_FIRST(committer->fsetArr); - tsdbFidKeyRange(fset->fid, committer->minutes, committer->precision, &minKey, &maxKey); - } - - if (committer->ctx->nextKey < TMIN(tsdb->imem->minKey, minKey)) { - committer->ctx->nextKey = TMIN(tsdb->imem->minKey, minKey); - } - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); From 34a15d1431eefb8aaf6d1923f9337ed0dc0c43a8 Mon Sep 17 00:00:00 2001 From: dapan1121 <72057773+dapan1121@users.noreply.github.com> Date: Mon, 15 Apr 2024 16:07:09 +0800 Subject: [PATCH 18/27] Update 10-function.md --- docs/zh/12-taos-sql/10-function.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index 81b6732f6b..71accc6322 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -954,7 +954,7 @@ FIRST(expr) **使用说明**: -- 如果要返回各个列的首个(时间戳最小)非 NULL 值,可以使用 FIRST(\*);查询超级表,且multiResultFunctionStarReturnTags设置为 0 时,这个是默认值,FIRST(\*)只返回超级表的普通列;设置为 1 时,返回超级表的普通列和标签列。 +- 如果要返回各个列的首个(时间戳最小)非 NULL 值,可以使用 FIRST(\*);查询超级表,且multiResultFunctionStarReturnTags设置为 0 (默认值) 时,FIRST(\*)只返回超级表的普通列;设置为 1 时,返回超级表的普通列和标签列。 - 如果结果集中的某列全部为 NULL 值,则该列的返回结果也是 NULL; - 如果结果集中所有列全部为 NULL 值,则不返回结果。 @@ -1006,7 +1006,7 @@ LAST(expr) **使用说明**: -- 如果要返回各个列的最后(时间戳最大)一个非 NULL 值,可以使用 LAST(\*);查询超级表,且multiResultFunctionStarReturnTags设置为 0 时,这个是默认值,LAST(\*)只返回超级表的普通列;设置为 1 时,返回超级表的普通列和标签列。 +- 如果要返回各个列的最后(时间戳最大)一个非 NULL 值,可以使用 LAST(\*);查询超级表,且multiResultFunctionStarReturnTags设置为 0 (默认值) 时,LAST(\*)只返回超级表的普通列;设置为 1 时,返回超级表的普通列和标签列。 - 如果结果集中的某列全部为 NULL 值,则该列的返回结果也是 NULL;如果结果集中所有列全部为 NULL 值,则不返回结果。 - 在用于超级表时,时间戳完全一样且同为最大的数据行可能有多个,那么会从中随机返回一条,而并不保证多次运行所挑选的数据行必然一致。 @@ -1026,7 +1026,7 @@ LAST_ROW(expr) **适用于**:表和超级表。 **使用说明**: -- 如果要返回各个列的最后一条记录(时间戳最大),可以使用 LAST_ROW(\*);查询超级表,且multiResultFunctionStarReturnTags设置为 0 时,这个是默认值,LAST_ROW(\*)只返回超级表的普通列;设置为 1 时,返回超级表的普通列和标签列。 +- 如果要返回各个列的最后一条记录(时间戳最大),可以使用 LAST_ROW(\*);查询超级表,且multiResultFunctionStarReturnTags设置为 0 (默认值) 时,LAST_ROW(\*)只返回超级表的普通列;设置为 1 时,返回超级表的普通列和标签列。 - 在用于超级表时,时间戳完全一样且同为最大的数据行可能有多个,那么会从中随机返回一条,而并不保证多次运行所挑选的数据行必然一致。 - 不能与 INTERVAL 一起使用。 From af5af1c6ea6e5eef4f2337f8df4c670fb01f7c47 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Mon, 15 Apr 2024 16:28:29 +0800 Subject: [PATCH 19/27] Update index.md --- docs/en/14-reference/12-config/index.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/en/14-reference/12-config/index.md b/docs/en/14-reference/12-config/index.md index af88978603..0bdf143a60 100755 --- a/docs/en/14-reference/12-config/index.md +++ b/docs/en/14-reference/12-config/index.md @@ -231,6 +231,16 @@ Please note the `taoskeeper` needs to be installed and running to create the `lo | Default Value | 0 | | Notes | When multiple of the above functions act on the same column at the same time and no alias is specified, if the order by clause refers to the column name, column selection ambiguous will occur because the aliases of multiple columns are the same. | +### multiResultFunctionStarReturnTags + +| Attribute | Description | +| ------------- | --------------------------------------------------------------------------------------------------------------- | +| Applicable | Client only | +| Meaning | When querying a super table, whether last(\*)/last_row(\*)/first(\*) returns tags is affected by this parameter. When querying a normal table or subtable, this parameter has no effect. | +| Value Range | 0: do not return tags, 1: return tags | +| Default Value | 0 | +| Notes | When this parameter is set to 0, last(\*)/last_row(\*)/first(\*) only returns the columns of the super table; When it is 1, return the columns and tags of the super table. | + ## Locale Parameters ### timezone From fd9ed4fb70f19b98422bf6a63eeb2f6422af4bd4 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Mon, 15 Apr 2024 16:37:25 +0800 Subject: [PATCH 20/27] Update 10-function.md --- docs/en/12-taos-sql/10-function.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index b4f1cf65da..89ff8aeb5e 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -952,7 +952,7 @@ FIRST(expr) **More explanation**: -- FIRST(\*) can be used to get the first non-null value of all columns +- FIRST(\*) can be used to get the first non-null value of all columns;When querying a super table and multiResultFunctionStarReturnTags is set to 0 (default), FIRST(\*) only returns columns of super table; When set to 1, returns columns and tags of the super table. - NULL will be returned if all the values of the specified column are all NULL - A result will NOT be returned if all the columns in the result set are all NULL @@ -1014,7 +1014,7 @@ LAST(expr) **More explanation**: -- LAST(\*) can be used to get the last non-NULL value of all columns +- LAST(\*) can be used to get the last non-NULL value of all columns; When querying a super table and multiResultFunctionStarReturnTags is set to 0 (default), LAST(\*) only returns columns of super table; When set to 1, returns columns and tags of the super table. - If the values of a column in the result set are all NULL, NULL is returned for that column; if all columns in the result are all NULL, no result will be returned. - When it's used on a STable, if there are multiple values with the timestamp in the result set, one of them will be returned randomly and it's not guaranteed that the same value is returned if the same query is run multiple times. @@ -1035,6 +1035,7 @@ LAST_ROW(expr) **More explanations**: +- LAST_ROW(\*) can be used to get the last value of all columns; When querying a super table and multiResultFunctionStarReturnTags is set to 0 (default), LAST_ROW(\*) only returns columns of super table; When set to 1, returns columns and tags of the super table. - When it's used on a STable, if there are multiple values with the timestamp in the result set, one of them will be returned randomly and it's not guaranteed that the same value is returned if the same query is run multiple times. - Can't be used with `INTERVAL`. From 01cbd7fb4994510624dad1fddee750422e99be62 Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Mon, 15 Apr 2024 16:38:08 +0800 Subject: [PATCH 21/27] Update 10-function.md --- docs/en/12-taos-sql/10-function.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index 89ff8aeb5e..07be7ae5ce 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -952,7 +952,7 @@ FIRST(expr) **More explanation**: -- FIRST(\*) can be used to get the first non-null value of all columns;When querying a super table and multiResultFunctionStarReturnTags is set to 0 (default), FIRST(\*) only returns columns of super table; When set to 1, returns columns and tags of the super table. +- FIRST(\*) can be used to get the first non-null value of all columns; When querying a super table and multiResultFunctionStarReturnTags is set to 0 (default), FIRST(\*) only returns columns of super table; When set to 1, returns columns and tags of the super table. - NULL will be returned if all the values of the specified column are all NULL - A result will NOT be returned if all the columns in the result set are all NULL From b936969a013e7cee86034f5ee31c0d58307dc0a9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 15 Apr 2024 18:25:23 +0800 Subject: [PATCH 22/27] more fix --- source/dnode/vnode/src/vnd/vnodeSvr.c | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b5e049b692..8fcd64373f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -2060,12 +2060,14 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in tDecoderInit(pCoder, pReq, len); tDecodeDeleteRes(pCoder, pRes); - for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) { - uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid); - code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey); - if (code) goto _err; - code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, pRes->ctimeMs); - if (code) goto _err; + if (pRes->affectedRows > 0) { + for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) { + uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid); + code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey); + if (code) goto _err; + code = metaUpdateChangeTimeWithLock(pVnode->pMeta, uid, pRes->ctimeMs); + if (code) goto _err; + } } code = tdProcessRSmaDelete(pVnode->pSma, ver, pRes, pReq, len); From 9cce9c221d94c9868c1d634d0bbd418340cf0610 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Mon, 15 Apr 2024 18:32:06 +0800 Subject: [PATCH 23/27] cos/multi-write: empty impl for tsdb async compact --- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8a2438d5a4..e91437a699 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -2296,5 +2296,5 @@ _OVER: int32_t vnodeProcessCompactVnodeReqImpl(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; } -int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync); +int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; } #endif From 87dfc1f931a0c8311ca3d8c8b6d3d27f3079b677 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Apr 2024 18:40:51 +0800 Subject: [PATCH 24/27] fix(stream):fix error in unit test cases. --- source/dnode/mnode/impl/test/stream/stream.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/test/stream/stream.cpp b/source/dnode/mnode/impl/test/stream/stream.cpp index 8d106b1ede..ae00f47ab7 100644 --- a/source/dnode/mnode/impl/test/stream/stream.cpp +++ b/source/dnode/mnode/impl/test/stream/stream.cpp @@ -62,8 +62,8 @@ SRpcMsg buildHbReq() { entry.id.taskId = 5; entry.id.streamId = defStreamId; - entry.checkpointId = 1; - entry.checkpointFailed = true; + entry.checkpointInfo.activeId = 1; + entry.checkpointInfo.failed = true; taosArrayPush(msg.pTaskStatus, &entry); } From 1ac192c069900ed8c034ca90bca11dbc679899e9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Apr 2024 18:41:38 +0800 Subject: [PATCH 25/27] fix(stream): keep the original tsdb scan version range. --- include/libs/stream/tstream.h | 3 ++- source/dnode/mnode/impl/src/mndStream.c | 7 ++++++- source/dnode/vnode/src/tq/tq.c | 16 ++++++++-------- source/dnode/vnode/src/tq/tqStreamTask.c | 14 ++++++++------ source/libs/executor/src/executor.c | 4 ++-- source/libs/stream/src/streamStart.c | 6 ++++-- 6 files changed, 30 insertions(+), 20 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5cecb1af42..8bced20ca3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -445,6 +445,7 @@ struct SStreamTask { SCheckpointInfo chkInfo; STaskExec exec; SDataRange dataRange; + SVersionRange step2Range; SHistoryTaskInfo hTaskInfo; STaskId streamTaskId; STaskExecStatisInfo execInfo; @@ -901,4 +902,4 @@ void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp); } #endif -#endif /* ifndef _STREAM_H_ */ \ No newline at end of file +#endif /* ifndef _STREAM_H_ */ diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 05b06e83a8..5d18d0d22e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1541,7 +1541,12 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // history_task_id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); if (pe->hTaskId != 0) { - colDataSetVal(pColInfo, numOfRows, (const char*)&pe->hTaskId, false); + memset(idstr, 0, tListLen(idstr)); + len = tintToHex(pe->hTaskId, &idstr[4]); + idstr[2] = '0'; + idstr[3] = 'x'; + varDataSetLen(idstr, len + 2); + colDataSetVal(pColInfo, numOfRows, idstr, false); } else { colDataSetVal(pColInfo, numOfRows, 0, true); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 92dc55c0c3..791a2c2d92 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -800,33 +800,33 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) { const char* id = pTask->id.idStr; int64_t nextProcessedVer = pStreamTask->hTaskInfo.haltVer; - SVersionRange* pRange = &pTask->dataRange.range; + SVersionRange* pStep2Range = &pTask->step2Range; // if it's an source task, extract the last version in wal. bool done = streamHistoryTaskSetVerRangeStep2(pTask, nextProcessedVer); pTask->execInfo.step2Start = taosGetTimestampMs(); if (done) { - qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pRange->minVer, - pRange->maxVer, 0.0); + qDebug("s-task:%s scan wal(step 2) verRange:%" PRId64 "-%" PRId64 " ended, elapsed time:%.2fs", id, pStep2Range->minVer, + pStep2Range->maxVer, 0.0); streamTaskPutTranstateIntoInputQ(pTask); streamExecTask(pTask); // exec directly } else { STimeWindow* pWindow = &pTask->dataRange.window; - tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 + tqDebug("s-task:%s level:%d verRange:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 ", do secondary scan-history from WAL after halt the related stream task:%s", - id, pTask->info.taskLevel, pRange->minVer, pRange->maxVer, pWindow->skey, pWindow->ekey, + id, pTask->info.taskLevel, pStep2Range->minVer, pStep2Range->maxVer, pWindow->skey, pWindow->ekey, pStreamTask->id.idStr); ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__WAITING); - streamSetParamForStreamScannerStep2(pTask, pRange, pWindow); + streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow); - int64_t dstVer = pTask->dataRange.range.minVer; + int64_t dstVer =pStep2Range->minVer; pTask->chkInfo.nextProcessVer = dstVer; walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer); tqDebug("s-task:%s wal reader start scan WAL verRange:%" PRId64 "-%" PRId64 ", set sched-status:%d", id, dstVer, - pTask->dataRange.range.maxVer, TASK_SCHED_STATUS__INACTIVE); + pStep2Range->maxVer, TASK_SCHED_STATUS__INACTIVE); /*int8_t status = */ streamTaskSetSchedStatusInactive(pTask); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 73508202d9..19e53c7d15 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -242,21 +242,23 @@ int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { // todo handle memory error bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { const char* id = pTask->id.idStr; - int64_t maxVer = pTask->dataRange.range.maxVer; + int64_t maxVer = pTask->step2Range.maxVer; - if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { + if ((pTask->info.fillHistory == 1) && ver > maxVer) { if (!pTask->status.appendTranstateBlock) { qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal anymore, add transfer-state block into inputQ", id, ver, maxVer); double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0; - qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, range:%" PRId64 "-%" PRId64 ", elapsed time:%.2fs", + id, pTask->step2Range.minVer, maxVer, el); /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); return true; } else { - qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal", - id, ver, maxVer); + qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the ver range:%" PRId64 "-%" PRId64 + ", not scan wal", + id, ver, pTask->step2Range.minVer, maxVer); } } @@ -389,7 +391,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { } int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); - int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX; + int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->step2Range.maxVer : INT64_MAX; taosThreadMutexLock(&pTask->lock); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 26a80cc6b5..12c5504007 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -926,8 +926,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2; - qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64 - " - %" PRId64, + qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 "-%" PRId64 ", window:%" PRId64 + "-%" PRId64, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, pWindow->ekey); return 0; diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 32bd3742ad..3abca307da 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -868,8 +868,10 @@ bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVe } else { // 2. do secondary scan of the history data, the time window remain, and the version range is updated to // [pTask->dataRange.range.maxVer, ver1] - pRange->minVer = walScanStartVer; - pRange->maxVer = nextProcessVer - 1; + pTask->step2Range.minVer = walScanStartVer; + pTask->step2Range.maxVer = nextProcessVer - 1; + stDebug("s-task:%s set step2 verRange:%" PRId64 "-%" PRId64 ", step1 verRange:%" PRId64 "-%" PRId64, pTask->id.idStr, + pTask->step2Range.minVer, pTask->step2Range.maxVer, pRange->minVer, pRange->maxVer); return false; } } From d92f0706e797f503c7a5db73f41ef6448f0cc461 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Apr 2024 18:43:58 +0800 Subject: [PATCH 26/27] fix(tsdb): check the boundary value when reseting range. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 69f4f82459..6100b8363e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4807,7 +4807,13 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { bool asc = ASCENDING_TRAVERSE(pReader->info.order); int32_t step = asc ? 1 : -1; - int64_t ts = asc ? pReader->info.window.skey - 1 : pReader->info.window.ekey + 1; + + int64_t ts = 0; + if (asc) { + ts = (pReader->info.window.skey > INT64_MIN)? pReader->info.window.skey-1:pReader->info.window.skey; + } else { + ts = (pReader->info.window.ekey < INT64_MAX)? pReader->info.window.ekey + 1:pReader->info.window.ekey; + } resetAllDataBlockScanInfo(pStatus->pTableMap, ts, step); // no data in files, let's try buffer in memory From eba924776ff34f7257054a5450dbc6bfee34b2d9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Apr 2024 23:26:18 +0800 Subject: [PATCH 27/27] fix(stream): update test cases. --- tests/system-test/0-others/information_schema.py | 2 +- tests/system-test/1-insert/drop.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index c3d65482fc..ffdd9d191d 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -221,7 +221,7 @@ class TDTestCase: tdSql.checkEqual(20470,len(tdSql.queryResult)) tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") - tdSql.checkEqual(True, len(tdSql.queryResult) in range(215, 230)) + tdSql.checkEqual(True, len(tdSql.queryResult) in range(226, 241)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult)) diff --git a/tests/system-test/1-insert/drop.py b/tests/system-test/1-insert/drop.py index 8775450ff0..21817ef20d 100644 --- a/tests/system-test/1-insert/drop.py +++ b/tests/system-test/1-insert/drop.py @@ -147,11 +147,11 @@ class TDTestCase: tdSql.execute(f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname') tdSql.query(f'select * from information_schema.ins_streams where stream_name = "{stream_name}"') print(tdSql.queryResult) - tdSql.checkEqual(tdSql.queryResult[0][2],f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname') + tdSql.checkEqual(tdSql.queryResult[0][4],f'create stream {stream_name} trigger at_once ignore expired 0 into stb as select * from {self.dbname}.{stbname} partition by tbname') tdSql.execute(f'drop stream {stream_name}') tdSql.execute(f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb') tdSql.query(f'select * from information_schema.ins_streams where stream_name = "{stream_name}"') - tdSql.checkEqual(tdSql.queryResult[0][2],f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb') + tdSql.checkEqual(tdSql.queryResult[0][4],f'create stream {stream_name} trigger at_once ignore expired 0 into stb1 as select * from tb') tdSql.execute(f'drop database {self.dbname}') def run(self): self.drop_ntb_check()