From 6c0bbe684b0e2bd1a150c4540efc158921278f41 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Nov 2022 22:28:03 +0800 Subject: [PATCH 1/3] fix: heap-use-after-free in syncNodeEqPingTimer --- source/libs/sync/src/syncMain.c | 6 +++--- tests/parallel_test/cases.task | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c39558ca1d..c623628978 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1842,7 +1842,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { return; } - sNTrace(pNode, "enqueue ping msg"); + sTrace(pNode, "enqueue ping msg"); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { sError("failed to sync enqueue ping msg since %s", terrstr()); @@ -1870,7 +1870,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { } SyncTimeout* pTimeout = rpcMsg.pCont; - sNTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock); + sTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { @@ -1894,7 +1894,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { return; } - sNTrace(pNode, "enqueue heartbeat timer"); + sTrace(pNode, "enqueue heartbeat timer"); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { sError("failed to enqueue heartbeat msg since %s", terrstr()); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index e4edc43600..f0dff430b3 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -215,7 +215,7 @@ ,,y,script,./test.sh -f tsim/stream/basic2.sim ,,,script,./test.sh -f tsim/stream/drop_stream.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim -,,y,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim +,,,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic3.sim ,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim ,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim From aab1ec632c304d8d9bb3615a90cfb3349799b52a Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Nov 2022 22:29:59 +0800 Subject: [PATCH 2/3] fix: heap-use-after-free in syncNodeEqPingTimer --- source/libs/sync/src/syncMain.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c623628978..514160235f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1842,7 +1842,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { return; } - sTrace(pNode, "enqueue ping msg"); + sTrace("enqueue ping msg"); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { sError("failed to sync enqueue ping msg since %s", terrstr()); @@ -1870,7 +1870,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { } SyncTimeout* pTimeout = rpcMsg.pCont; - sTrace(pNode, "enqueue elect msg lc:%" PRId64, pTimeout->logicClock); + sTrace("enqueue elect msg lc:%" PRId64, pTimeout->logicClock); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { @@ -1894,7 +1894,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { return; } - sTrace(pNode, "enqueue heartbeat timer"); + sTrace("enqueue heartbeat timer"); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { sError("failed to enqueue heartbeat msg since %s", terrstr()); From 6478fcb2c11ddab16b3cf90413450533c90f29dd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 15 Nov 2022 22:32:52 +0800 Subject: [PATCH 3/3] fix(stream): check task --- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/wal/src/walWrite.c | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 419d4c3b9b..092128fe6e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -983,7 +983,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { rsp.status = 0; } - streamMetaReleaseTask(pTq->pStreamMeta, pTask); + if (pTask) streamMetaReleaseTask(pTq->pStreamMeta, pTask); tqDebug("tq recv task check req(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d", rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 7c3b4cba30..34195f0193 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -22,6 +22,8 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) { taosThreadMutexLock(&pWal->mutex); + wInfo("vgId:%d, restore from snapshot, version %" PRId64, pWal->cfg.vgId, ver); + void *pIter = NULL; while (1) { pIter = taosHashIterate(pWal->pRefHash, pIter);