From 6fc364db199ada4a9f9b4410ed1a5bc2cd40faea Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 23 Apr 2023 13:53:55 +0800 Subject: [PATCH 1/4] fix(query): add null ptr check. --- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index fa8870835c..eb383df48d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -88,6 +88,10 @@ void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double } void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { + if (pLoadInfo == NULL) { + return NULL; + } + for (int32_t i = 0; i < pLoadInfo->numOfStt; ++i) { pLoadInfo[i].currentLoadBlockIndex = 1; pLoadInfo[i].blockIndex[0] = -1; From af65f9703bf809091c3b6c7579f143c9f80b253f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 23 Apr 2023 17:36:59 +0800 Subject: [PATCH 2/4] fix:tmq error if consume callback is earlier than consume --- utils/test/c/tmqSim.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index f2de219f4e..530b142173 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -690,11 +690,12 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { } static int32_t g_once_commit_flag = 0; +static int32_t g_once_consume_flag = 0; static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); - if (0 == g_once_commit_flag) { + if (g_once_consume_flag == 1 && 0 == g_once_commit_flag) { g_once_commit_flag = 1; notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); } @@ -773,8 +774,6 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { void loop_consume(SThreadInfo* pInfo) { int32_t code; - int32_t once_flag = 0; - int64_t totalMsgs = 0; int64_t totalRows = 0; @@ -834,8 +833,8 @@ void loop_consume(SThreadInfo* pInfo) { lastTotalMsgs = totalMsgs; } - if (0 == once_flag) { - once_flag = 1; + if (0 == g_once_consume_flag) { + g_once_consume_flag = 1; notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM); } From 623ea6df5fcd8f016dc36bb3303be5bd2e0bb6cd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 23 Apr 2023 17:42:56 +0800 Subject: [PATCH 3/4] fix(stream): add task status check. --- source/libs/stream/src/stream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 86ba91f76d..046dab380e 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -52,7 +52,7 @@ void streamCleanUp() { void streamSchedByTimer(void* param, void* tmrId) { SStreamTask* pTask = (void*)param; - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { + if (streamTaskShouldStop(&pTask->status)) { streamMetaReleaseTask(NULL, pTask); return; } From 7e9432606d245dc4df81c06e4d78135a627c1b2b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 24 Apr 2023 09:50:39 +0800 Subject: [PATCH 4/4] fix(tmq): revoke the fix, it will definitly cause the deadlock. --- tests/system-test/7-tmq/tmqCommon.py | 2 +- utils/test/c/tmqSim.c | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index f63c70a4c6..32bb22c8cc 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -170,7 +170,7 @@ class TMQCom: if tdSql.getData(i, 1) == 1: loopFlag = 0 break - time.sleep(0.02) + time.sleep(0.10) return def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1): diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index 530b142173..f2de219f4e 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -690,12 +690,11 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) { } static int32_t g_once_commit_flag = 0; -static int32_t g_once_consume_flag = 0; static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); - if (g_once_consume_flag == 1 && 0 == g_once_commit_flag) { + if (0 == g_once_commit_flag) { g_once_commit_flag = 1; notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); } @@ -774,6 +773,8 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { void loop_consume(SThreadInfo* pInfo) { int32_t code; + int32_t once_flag = 0; + int64_t totalMsgs = 0; int64_t totalRows = 0; @@ -833,8 +834,8 @@ void loop_consume(SThreadInfo* pInfo) { lastTotalMsgs = totalMsgs; } - if (0 == g_once_consume_flag) { - g_once_consume_flag = 1; + if (0 == once_flag) { + once_flag = 1; notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM); }