From 4364fa8a1eaedc212f46fac7ede99c383ad70956 Mon Sep 17 00:00:00 2001 From: haoranchen Date: Wed, 31 Jul 2024 10:48:06 +0800 Subject: [PATCH 01/16] recover test case for TD-31057 --- tests/parallel_test/cases.task | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index e3f5e54698..dc2897baca 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -27,9 +27,9 @@ ,,y,army,./pytest.sh python3 ./test.py -f insert/insert_basic.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f cluster/splitVgroupByLearner.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f authorith/authBasic.py -N 3 -# ,,n,army,python3 ./test.py -f cmdline/fullopt.py -,,n,army,python3 ./test.py -f query/show.py -N 3 -,,n,army,python3 ./test.py -f alter/alterConfig.py -N 3 +,,y,army,./pytest.sh python3 ./test.py -f cmdline/fullopt.py +,,y,army,./pytest.sh python3 ./test.py -f query/show.py -N 3 +,,y,army,./pytest.sh python3 ./test.py -f alter/alterConfig.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f query/subquery/subqueryBugs.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f storage/oneStageComp.py -N 3 -L 3 -D 1 ,,y,army,./pytest.sh python3 ./test.py -f storage/compressBasic.py -N 3 @@ -279,8 +279,8 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 -n 3 -i True ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-db-removewal.py -N 2 -n 1 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb-removewal.py -N 6 -n 3 -#,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 2 -n 1 -#,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 6 -n 3 +,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 2 -n 1 +,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-stb.py -N 6 -n 3 #,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeTransform-db.py -N 6 -n 3 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select.py -N 2 -n 1 ,,y,system-test,./pytest.sh python3 test.py -f 7-tmq/tmqVnodeSplit-stb-select-duplicatedata.py -N 3 -n 3 From 6f8657ed08dff8029919518e14af3bd382bb64c9 Mon Sep 17 00:00:00 2001 From: haoranchen Date: Wed, 31 Jul 2024 10:49:41 +0800 Subject: [PATCH 02/16] fix test case in fullopt.py --- tests/army/cmdline/fullopt.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/army/cmdline/fullopt.py b/tests/army/cmdline/fullopt.py index 6fc8e858a3..b80d7eac4a 100644 --- a/tests/army/cmdline/fullopt.py +++ b/tests/army/cmdline/fullopt.py @@ -60,7 +60,7 @@ class TDTestCase(TBase): "enableCoreFile 1", "fqdn 127.0.0.1", "firstEp 127.0.0.1", - "locale ENG", + "locale en_US.UTF-8", "metaCacheMaxSize 10000", "minimalTmpDirGB 5", "minimalLogDirGB 1", From fdc5b5709eca7bf1efe1d8eea0b85291a76ed698 Mon Sep 17 00:00:00 2001 From: haoranchen Date: Wed, 31 Jul 2024 11:32:08 +0800 Subject: [PATCH 03/16] Update cases.task --- tests/parallel_test/cases.task | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index dc2897baca..7397c45772 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -27,7 +27,7 @@ ,,y,army,./pytest.sh python3 ./test.py -f insert/insert_basic.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f cluster/splitVgroupByLearner.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f authorith/authBasic.py -N 3 -,,y,army,./pytest.sh python3 ./test.py -f cmdline/fullopt.py +,,n,army,python3 ./test.py -f cmdline/fullopt.py ,,y,army,./pytest.sh python3 ./test.py -f query/show.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f alter/alterConfig.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f query/subquery/subqueryBugs.py -N 3 From c02ac1fe4c1a320519042193f2b865bbcfb0db0c Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Thu, 8 Aug 2024 16:48:17 +0800 Subject: [PATCH 04/16] test: Adding test case for TD-31203 --- tests/parallel_test/cases.task | 1 + tests/system-test/2-query/agg_null.py | 162 ++++++++++++++++++++++++++ 2 files changed, 163 insertions(+) create mode 100644 tests/system-test/2-query/agg_null.py diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index e70042001d..d45627fcba 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -162,6 +162,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/stt_blocks_check.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/agg_null.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_null_none.py -Q 2 diff --git a/tests/system-test/2-query/agg_null.py b/tests/system-test/2-query/agg_null.py new file mode 100644 index 0000000000..bb4fbf41a2 --- /dev/null +++ b/tests/system-test/2-query/agg_null.py @@ -0,0 +1,162 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import numpy as np +from util.log import * +from util.cases import * +from util.sql import * +from util.common import * +from util.sqlset import * +from scipy.stats import gaussian_kde +from hyperloglog import HyperLogLog +''' +Test case for TS-5150 +''' +def approximate_percentile(data, percentile): + """ + 使用 KDE 近似计算百分位数。 + + Parameters: + - data: 包含数据的列表或数组 + - percentile: 要计算的百分位数(0到100之间) + + Returns: + - 近似百分位数的值 + """ + # 使用高斯核估计概率密度 + kde = gaussian_kde(data) + + # 生成一组足够密集的点,计算累积分布函数 + min_val = min(data) + max_val = max(data) + x = np.linspace(min_val, max_val, 1000) + cdf = np.cumsum(kde(x) / kde(x).sum()) + + # 找到最接近所需百分位数的值 + idx = np.abs(cdf - percentile / 100.0).argmin() + approximate_value = x[idx] + + return approximate_value +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + self.ts = 1537146000000 + def initdabase(self): + tdSql.execute('create database if not exists db_test vgroups 2 buffer 10') + tdSql.execute('use db_test') + tdSql.execute('create stable stb(ts timestamp, delay int) tags(groupid int)') + tdSql.execute('create table t1 using stb tags(1)') + tdSql.execute('create table t2 using stb tags(2)') + tdSql.execute('create table t3 using stb tags(3)') + tdSql.execute('create table t4 using stb tags(4)') + tdSql.execute('create table t5 using stb tags(5)') + tdSql.execute('create table t6 using stb tags(6)') + def insert_data(self): + for i in range(5000): + tdSql.execute(f"insert into t1 values({self.ts + i * 1000}, {i%5})") + tdSql.execute(f"insert into t2 values({self.ts + i * 1000}, {i%5})") + tdSql.execute(f"insert into t3 values({self.ts + i * 1000}, {i%5})") + + def verify_agg_null(self): + for i in range(20): + col_val_list = [] + tdSql.query(f'select CASE WHEN delay != 0 THEN delay ELSE NULL END from stb where ts between {1537146000000 + i * 1000} and {1537146000000 + (i+10) * 1000}') + for col_va in tdSql.queryResult: + if col_va[0] is not None: + col_val_list.append(col_va[0]) + tdSql.query(f'SELECT APERCENTILE(CASE WHEN delay != 0 THEN delay ELSE NULL END,50) AS apercentile,\ + MAX(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS maxDelay,\ + MIN(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS minDelay,\ + AVG(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS avgDelay,\ + STDDEV(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS jitter,\ + COUNT(CASE WHEN delay = 0 THEN 1 ELSE NULL END) AS timeoutCount,\ + COUNT(*) AS totalCount ,\ + ELAPSED(ts) AS elapsed_time,\ + SPREAD(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS spread,\ + SUM(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS sum,\ + HYPERLOGLOG(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS hyperloglog from stb where ts between {1537146000000 + i * 1000} and {1537146000000 + (i+10) * 1000}') + #verify apercentile + apercentile_res = tdSql.queryResult[0][0] + approximate_median = approximate_percentile(col_val_list, 50) + assert np.abs(apercentile_res - approximate_median) < 1 + #verify max + max_res = tdSql.queryResult[0][1] + tdSql.checkEqual(max_res,max(col_val_list)) + #verify min + min_res = tdSql.queryResult[0][2] + tdSql.checkEqual(min_res,min(col_val_list)) + #verify avg + avg_res = tdSql.queryResult[0][3] + tdSql.checkEqual(avg_res,np.average(col_val_list)) + #verify stddev + stddev_res = tdSql.queryResult[0][4] + assert np.abs(stddev_res - np.std(col_val_list)) < 0.0001 + #verify count of 0 + count of !0 == count(*) + count_res = tdSql.queryResult[0][6] + tdSql.checkEqual(count_res,len(col_val_list)+tdSql.queryResult[0][5]) + #verify elapsed + elapsed_res = tdSql.queryResult[0][7] + assert elapsed_res == 10000 + #verify spread + spread_res = tdSql.queryResult[0][8] + tdSql.checkEqual(spread_res,max(col_val_list) - min(col_val_list)) + #verify sum + sum_res = tdSql.queryResult[0][9] + tdSql.checkEqual(sum_res,sum(col_val_list)) + #verify hyperloglog + error_rate = 0.01 + hll = HyperLogLog(error_rate) + for col_val in col_val_list: + hll.add(col_val) + hll_res = tdSql.queryResult[0][10] + assert np.abs(hll_res - hll.card()) < 0.01 + #verify leastsquares + tdSql.query(f'SELECT leastsquares(CASE WHEN delay != 0 THEN delay ELSE NULL END,1,1) from stb where ts between {1537146000000 + i * 1000} and {1537146000000 + (i+10) * 1000}') + cleaned_data = tdSql.queryResult[0][0].strip('{}').replace(' ', '') + pairs = cleaned_data.split(',') + slope = None + intercept = None + for pair in pairs: + key, value = pair.split(':') + key = key.strip() + value = float(value.strip()) + if key == 'slop': + slope = value + elif key == 'intercept': + intercept = value + assert slope != 0 + assert intercept != 0 + #verify histogram + tdSql.query(f'SELECT histogram(CASE WHEN delay != 0 THEN delay ELSE NULL END, "user_input", "[1,3,5,7]", 1) from stb where ts between {1537146000000 + i * 1000} and {1537146000000 + (i+10) * 1000}') + cleaned_data = tdSql.queryResult[0][0].strip('{}').replace(' ', '') + pairs = cleaned_data.split(',') + count = None + for pair in pairs: + key, value = pair.split(':') + key = key.strip() + if key == 'count': + count = float(value.strip()) + assert count != 0 + def run(self): + self.initdabase() + self.insert_data() + self.verify_agg_null() + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) From 645b49023bfa092a2d1aac008a9db7d079205510 Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Thu, 8 Aug 2024 19:39:43 +0800 Subject: [PATCH 05/16] add hyperloglog to requirements.txt --- tests/requirements.txt | 1 + tests/system-test/2-query/agg_null.py | 28 +-------------------------- 2 files changed, 2 insertions(+), 27 deletions(-) diff --git a/tests/requirements.txt b/tests/requirements.txt index 5cdd9e02be..c6dd044c86 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -9,3 +9,4 @@ requests pexpect faker pyopenssl +hyperloglog \ No newline at end of file diff --git a/tests/system-test/2-query/agg_null.py b/tests/system-test/2-query/agg_null.py index bb4fbf41a2..bec879abbe 100644 --- a/tests/system-test/2-query/agg_null.py +++ b/tests/system-test/2-query/agg_null.py @@ -17,36 +17,10 @@ from util.cases import * from util.sql import * from util.common import * from util.sqlset import * -from scipy.stats import gaussian_kde from hyperloglog import HyperLogLog ''' Test case for TS-5150 ''' -def approximate_percentile(data, percentile): - """ - 使用 KDE 近似计算百分位数。 - - Parameters: - - data: 包含数据的列表或数组 - - percentile: 要计算的百分位数(0到100之间) - - Returns: - - 近似百分位数的值 - """ - # 使用高斯核估计概率密度 - kde = gaussian_kde(data) - - # 生成一组足够密集的点,计算累积分布函数 - min_val = min(data) - max_val = max(data) - x = np.linspace(min_val, max_val, 1000) - cdf = np.cumsum(kde(x) / kde(x).sum()) - - # 找到最接近所需百分位数的值 - idx = np.abs(cdf - percentile / 100.0).argmin() - approximate_value = x[idx] - - return approximate_value class TDTestCase: def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -89,7 +63,7 @@ class TDTestCase: HYPERLOGLOG(CASE WHEN delay != 0 THEN delay ELSE NULL END) AS hyperloglog from stb where ts between {1537146000000 + i * 1000} and {1537146000000 + (i+10) * 1000}') #verify apercentile apercentile_res = tdSql.queryResult[0][0] - approximate_median = approximate_percentile(col_val_list, 50) + approximate_median = np.percentile(col_val_list, 50) assert np.abs(apercentile_res - approximate_median) < 1 #verify max max_res = tdSql.queryResult[0][1] From 3826fec13de2d5cc7107075e8f2fe1904e125cd1 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 9 Aug 2024 09:28:13 +0800 Subject: [PATCH 06/16] adj error code --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 84ca2c36ea..880e73c5c0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4708,8 +4708,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash); if (pReader->pSchemaMap == NULL) { tsdbError("failed init schema hash for reader %s", pReader->idStr); - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_NULL(pReader->pSchemaMap, code, lino, _err, terrno); } tSimpleHashSetFreeFp(pReader->pSchemaMap, freeSchemaFunc); From bc14da28e5277b1fc373ca5cc506c5e6d1468db9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 9 Aug 2024 10:31:05 +0800 Subject: [PATCH 07/16] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndStream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index dc8f494914..20f0e7b105 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2420,7 +2420,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { if (pStream != NULL) { // TODO:handle error code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false); if (code) { - mError("failed to create checkpoint trans, code:%s", strerror(code)); + mError("failed to create checkpoint trans, code:%s", tstrerror(code)); } } else { // todo: wait for the create stream trans completed, and launch the checkpoint trans From 10acd19e714ece29a0d81e2250bf2195513259d4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 9 Aug 2024 10:36:10 +0800 Subject: [PATCH 08/16] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndMain.c | 2 +- source/libs/stream/src/streamSched.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 37a171e9a4..11787a015b 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -443,7 +443,7 @@ static int32_t mndInitTimer(SMnode *pMnode) { (void)taosThreadAttrInit(&thAttr); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); if ((code = taosThreadCreate(&pMnode->thread, &thAttr, mndThreadFp, pMnode)) != 0) { - mError("failed to create timer thread since %s", strerror(errno)); + mError("failed to create timer thread since %s", tstrerror(code)); TAOS_RETURN(code); } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 6506d449a6..e8c7be5204 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -107,7 +107,7 @@ void streamTaskResumeHelper(void* param, void* tmrId) { int32_t code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); if (code) { - stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, strerror(code), ref); + stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, tstrerror(code), ref); } else { stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime, ref); From 71b8f67ea681263212b5b4cd10ea687efafc2b3a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 9 Aug 2024 11:37:34 +0800 Subject: [PATCH 09/16] refactor: do some internal refactor. --- source/dnode/mnode/impl/src/mndStreamUtil.c | 3 +++ source/dnode/vnode/src/tqCommon/tqCommon.c | 1 - source/libs/stream/src/streamDispatch.c | 8 ++++++-- source/libs/stream/src/streamHb.c | 10 +++++++--- 4 files changed, 16 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 47b0fdb412..383ffe16da 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1069,6 +1069,9 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) { while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { SChkptReportInfo* px = (SChkptReportInfo *)pIter; + if (taosArrayGetSize(px->pTaskList) == 0) { + continue; + } STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0); if (pInfo == NULL) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index faca2020c5..7164c7f543 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -417,7 +417,6 @@ int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return code; } else { tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId); - terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST; } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 493b5013d0..4da108507a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1083,7 +1083,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; - int32_t code = -1; + int32_t code = 0; SRpcMsg msg = {0}; // serialize @@ -1093,9 +1093,9 @@ int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, in goto FAIL; } - code = -1; buf = rpcMallocCont(sizeof(SMsgHead) + tlen); if (buf == NULL) { + code = terrno; goto FAIL; } @@ -1119,6 +1119,10 @@ FAIL: rpcFreeCont(buf); } + if (code == -1) { + code = TSDB_CODE_INVALID_MSG; + } + return code; } diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 8513a8ba06..898e2bbc0b 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -295,10 +295,14 @@ void streamMetaHbToMnode(void* param, void* tmrId) { if (code) { stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code)); } - streamMetaRUnLock(pMeta); - streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, - "meta-hb-tmr"); + + if (code != TSDB_CODE_APP_IS_STOPPING) { + streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, + "meta-hb-tmr"); + } else { + stDebug("vgId:%d is stopping, not start hb again", pMeta->vgId); + } code = taosReleaseRef(streamMetaId, rid); if (code) { From ab2007adb75689dd3e3152f2562463e08e172600 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 9 Aug 2024 11:41:17 +0800 Subject: [PATCH 10/16] fix:[TD-31146]memory leak --- source/client/src/clientTmq.c | 2 ++ source/common/src/tmsg.c | 1 + 2 files changed, 3 insertions(+) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c3867f2821..b8474dd9f3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2088,6 +2088,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { taosWUnLockLatch(&tmq->lock); } setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; @@ -2789,6 +2790,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { pWrapper->epoch = head->epoch; (void)memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg) == NULL){ + tmqFreeRspWrapper((SMqRspWrapper*)pWrapper); taosFreeQitem(pWrapper); }else{ (void)taosWriteQitem(tmq->mqueue, pWrapper); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4015701a29..3b2672f5a2 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -10068,6 +10068,7 @@ void *tDecodeMqSubTopicEp(void *buf, SMqSubTopicEp *pTopicEp) { buf = tDecodeSMqSubVgEp(buf, &vgEp); if (taosArrayPush(pTopicEp->vgs, &vgEp) == NULL) { taosArrayDestroy(pTopicEp->vgs); + pTopicEp->vgs = NULL; return NULL; } } From d763e559e18dbaea9f9e9e96f53fccabf6b43d9c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 9 Aug 2024 13:57:57 +0800 Subject: [PATCH 11/16] refactor errno code --- source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index a3e1a64012..aeb519596d 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -293,7 +293,7 @@ int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) { (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); if (taosThreadCreate(&pMgmt->notifyThread, &thAttr, dmNotifyThreadFp, pMgmt) != 0) { code = TAOS_SYSTEM_ERROR(errno); - dError("failed to create notify thread since %s", strerror(code)); + dError("failed to create notify thread since %s", tstrerror(code)); return code; } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 537aa72d91..ee87d3b897 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -247,7 +247,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { } else { stError("failed to start stream backend at %s, reason: %s, restart from default state dir:%s", chkp, - tstrerror(TAOS_SYSTEM_ERROR(errno)), state); + tstrerror(terrno), state); code = taosMkDir(state); if (code != 0) { code = TAOS_SYSTEM_ERROR(errno); From a145d9db22518ace3680fa154d0a18ee9e7d74a6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 9 Aug 2024 14:08:08 +0800 Subject: [PATCH 12/16] fix(stream): allowed continue hb msg. --- source/libs/stream/src/streamCheckpoint.c | 20 +++++++++++++------- source/libs/stream/src/streamHb.c | 10 +++------- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index c555da9865..270f678d26 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -558,10 +558,17 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); } else { // not in restore status, must be in checkpoint status - stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 - " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, - id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, - pInfo->checkpointTime, pReq->checkpointTs); + if (pStatus.state == TASK_STATUS__CK) { + stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 + " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, + id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, + pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs); + } else { + stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64 + " checkpointVer:%" PRId64 "->%" PRId64, + id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, + pReq->checkpointVer); + } } ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && @@ -573,12 +580,11 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointTime = pReq->checkpointTs; - streamTaskClearCheckInfo(pTask, true); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); - } else { - stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus.name); } + streamTaskClearCheckInfo(pTask, true); + if (pReq->dropRelHTask) { stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint", pReq->taskId, vgId, pReq->hTaskId); diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 898e2bbc0b..d2c5cb05b7 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -297,14 +297,10 @@ void streamMetaHbToMnode(void* param, void* tmrId) { } streamMetaRUnLock(pMeta); - if (code != TSDB_CODE_APP_IS_STOPPING) { - streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, - "meta-hb-tmr"); - } else { - stDebug("vgId:%d is stopping, not start hb again", pMeta->vgId); - } - + streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, + "meta-hb-tmr"); code = taosReleaseRef(streamMetaId, rid); + if (code) { stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid); } From 4fd86887953857702971b7ca485e90aac82e0ae3 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Fri, 9 Aug 2024 14:43:50 +0800 Subject: [PATCH 13/16] postfix exchange operator blocking due to addref failed --- source/libs/executor/src/exchangeoperator.c | 6 ++++-- source/libs/executor/src/tfill.c | 5 ++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 78c0d939ad..5afae596a4 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -390,11 +390,13 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* } initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo); - pInfo->self = taosAddRef(exchangeObjRefPool, pInfo); - if (pInfo->self < 0) { + int64_t refId = taosAddRef(exchangeObjRefPool, pInfo); + if (refId < 0) { int32_t code = terrno; qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); return code; + } else { + pInfo->self = refId; } return initDataSource(numOfSources, pInfo, id); diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 3158c85987..957a5d1d2e 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -758,7 +758,10 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index); QUERY_CHECK_NULL(pv, code, lino, _end, terrno); - nodesValueNodeToVariant(pv, &pFillCol[i].fillVal); + code = nodesValueNodeToVariant(pv, &pFillCol[i].fillVal); + } + if (TSDB_CODE_SUCCESS != code) { + goto _end; } } pFillCol->numOfFillExpr = numOfFillExpr; From acf17054d3b65443d99115ab273fa7b13b92e537 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 9 Aug 2024 15:20:33 +0800 Subject: [PATCH 14/16] fix: possible error handle in syncPipeline.c --- source/libs/sync/src/syncPipeline.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index ef2cbece79..782d97f789 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -892,7 +892,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn if (pMsg->matchIndex < pNode->pLogBuf->matchIndex) { code = syncLogReplGetPrevLogTerm(pMgr, pNode, index + 1, &term); - if (term < 0 && (errno == ENFILE || errno == EMFILE)) { + if (term < 0 && (errno == ENFILE || errno == EMFILE || errno == ENOENT)) { sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64, pNode->vgId, tstrerror(code), index + 1); TAOS_RETURN(code); } From fe48c405709589a2ab930d37fbf8c35bd5b5eaba Mon Sep 17 00:00:00 2001 From: gccgdb1234 Date: Fri, 9 Aug 2024 15:49:54 +0800 Subject: [PATCH 15/16] doc: correct join error --- docs/zh/14-reference/03-taos-sql/30-join.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/14-reference/03-taos-sql/30-join.md b/docs/zh/14-reference/03-taos-sql/30-join.md index c0daeb41c0..60b634d310 100644 --- a/docs/zh/14-reference/03-taos-sql/30-join.md +++ b/docs/zh/14-reference/03-taos-sql/30-join.md @@ -202,7 +202,7 @@ SELECT ... FROM table_name1 LEFT|RIGHT ASOF JOIN table_name2 [ON ...] [JLIMIT jl 表 d1001 电压值大于 220V 且表 d1002 中同一时刻或稍早前最后时刻出现电压大于 220V 的时间及各自的电压值: ```sql -SELECT a.ts, a.voltage, a.ts, b.voltage FROM d1001 a LEFT ASOF JOIN d1002 b ON a.ts >= b.ts where a.voltage > 220 and b.voltage > 220 +SELECT a.ts, a.voltage, b.ts, b.voltage FROM d1001 a LEFT ASOF JOIN d1002 b ON a.ts >= b.ts where a.voltage > 220 and b.voltage > 220 ``` ### Left/Right Window Join From 2553bb1745bd1ad308e548c58386000c8305492e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 9 Aug 2024 15:51:58 +0800 Subject: [PATCH 16/16] fix: remove an invalid assert in syncMain.c --- source/libs/sync/src/syncMain.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 171b73eba7..fd1d3e371e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1287,7 +1287,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { } // tools - (void)syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr); // TODO: check return value + (void)syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr); // TODO: check return value if (pSyncNode->pSyncRespMgr == NULL) { sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId); goto _error; @@ -1407,7 +1407,8 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex); sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex); - if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE && (code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex)) < 0) { + if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE && + (code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex)) < 0) { TAOS_RETURN(code); } @@ -2187,7 +2188,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { } SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - ASSERT(lastIndex >= 0); + // ASSERT(lastIndex >= 0); sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId, raftStoreGetTerm(pSyncNode), pSyncNode->commitIndex, lastIndex); }