From c9475060de3d834be1153e4dc6584b73441c070f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Dec 2023 09:34:49 +0800 Subject: [PATCH] fix(stream): fix race condition. --- source/dnode/mnode/impl/src/mndStreamTrans.c | 4 ++-- source/dnode/vnode/src/tq/tqStreamTask.c | 7 ++++++- source/dnode/vnode/src/tq/tqUtil.c | 8 ++++++-- source/libs/stream/src/streamMeta.c | 9 ++------- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index b8a1c3d4ae..8f94843500 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -90,7 +90,7 @@ bool streamTransConflictOtherTrans(SMnode* pMnode, int64_t streamUid, const char if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { if (strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) { - mWarn("conflict with other transId:%d streamUid:%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, + mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, tInfo.name); return true; } else { @@ -98,7 +98,7 @@ bool streamTransConflictOtherTrans(SMnode* pMnode, int64_t streamUid, const char } } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) { - mWarn("conflict with other transId:%d streamUid:%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, + mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, tInfo.name); return true; } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 99cc3a36ea..4fc1c6cc01 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -126,10 +126,13 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t tqStopStreamTasks(STQ* pTq) { SStreamMeta* pMeta = pTq->pStreamMeta; int32_t vgId = TD_VID(pTq->pVnode); - int32_t num = taosArrayGetSize(pMeta->pTaskList); + streamMetaRLock(pMeta); + + int32_t num = taosArrayGetSize(pMeta->pTaskList); tqDebug("vgId:%d stop all %d stream task(s)", vgId, num); if (num == 0) { + streamMetaRUnLock(pMeta); return TSDB_CODE_SUCCESS; } @@ -149,6 +152,8 @@ int32_t tqStopStreamTasks(STQ* pTq) { } taosArrayDestroy(pTaskList); + + streamMetaRUnLock(pMeta); return 0; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index d3ea1f19e5..64c733d63d 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -39,6 +39,8 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) { SStreamMeta* pMeta = pTq->pStreamMeta; int64_t stage = pMeta->stage; + streamMetaWLock(pMeta); + pMeta->stage = state.term; // mark the sign to send msg before close all tasks @@ -52,9 +54,11 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) { state.term, stage, isLeader); streamMetaStartHb(pMeta); } else { - tqInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d", pMeta->vgId, state.term, stage, - isLeader); + tqInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId, + state.term, stage, isLeader, pMeta->sendMsgBeforeClosing); } + + streamMetaWUnLock(pMeta); } static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d1df74b8d3..3504c97398 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1328,14 +1328,9 @@ int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, voi } SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) { - SArray* pTaskList = NULL; - bool sendMsg = false; - - streamMetaWLock(pMeta); - pTaskList = taosArrayDup(pMeta->pTaskList, NULL); - sendMsg = pMeta->sendMsgBeforeClosing; - streamMetaWUnLock(pMeta); + SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL); + bool sendMsg = pMeta->sendMsgBeforeClosing; if (!sendMsg) { stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId); return pTaskList;