From a2dcba10a7fe4a65c7fc48558d81ca8b99324d7b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 8 Feb 2025 23:20:10 +0800 Subject: [PATCH 1/4] refactor(stream): 1) notify closing in main thread. 2) not saving the halt status. --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 13 +++++++++++++ source/dnode/vnode/src/tq/tq.c | 5 ++++- source/libs/stream/src/streamExec.c | 4 ++-- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 4400d7fac0..47533a102a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -711,6 +711,18 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) { pMgmt->state.openVnodes = 0; dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum); + dInfo("notify all streams closed in all %d vnodes", numOfVnodes); + if (ppVnodes != NULL) { + for (int32_t i = 0; i < numOfVnodes; ++i) { + if (ppVnodes[i] != NULL) { + if (ppVnodes[i]->pImpl != NULL) { + tqNotifyClose(ppVnodes[i]->pImpl->pTq); + } + } + } + } + dInfo("notify close stream completed in %d vnodes", numOfVnodes); + for (int32_t t = 0; t < threadNum; ++t) { SVnodeThread *pThread = &threads[t]; if (pThread->vnodeNum == 0) continue; @@ -718,6 +730,7 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) { TdThreadAttr thAttr; (void)taosThreadAttrInit(&thAttr); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) { dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno)); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5b19d4cd87..1c0f73249c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -174,7 +174,10 @@ void tqNotifyClose(STQ* pTq) { if (pTq == NULL) { return; } - streamMetaNotifyClose(pTq->pStreamMeta); + + if (pTq->pStreamMeta 
!= NULL) { + streamMetaNotifyClose(pTq->pStreamMeta); + } } void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 89cb4153fe..5e099712ca 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -464,8 +464,8 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) { // 2. send msg to mnode to launch a checkpoint to keep the state for current stream code = streamTaskSendCheckpointReq(pStreamTask); - // 3. assign the status to the value that will be kept in disk - pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask).state; + // 3. do not assign the current (halt) status to the value that will be kept on disk; + // the default task status should be ready or similar, not halt. // 4. open the inputQ for all upstream tasks streamTaskOpenAllUpstreamInput(pStreamTask); From cb7e2cb1e06af504f3d1f1ca7f479a97d95168db Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 8 Feb 2025 23:47:29 +0800 Subject: [PATCH 2/4] refactor(stream): record the elapsed time for close stream tasks. 
--- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 47533a102a..00fa2a8c95 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -711,7 +711,8 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) { pMgmt->state.openVnodes = 0; dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum); - dInfo("notify all streams closed in all %d vnodes", numOfVnodes); + int64_t st = taosGetTimestampMs(); + dInfo("notify all streams closed in all %d vnodes, ts:%" PRId64, numOfVnodes, st); if (ppVnodes != NULL) { for (int32_t i = 0; i < numOfVnodes; ++i) { if (ppVnodes[i] != NULL) { @@ -721,7 +722,9 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) { } } } - dInfo("notify close stream completed in %d vnodes", numOfVnodes); + + int64_t et = taosGetTimestampMs(); + dInfo("notify close stream completed in %d vnodes, elapsed time: %" PRId64 "ms", numOfVnodes, et - st); for (int32_t t = 0; t < threadNum; ++t) { SVnodeThread *pThread = &threads[t]; From ec954bf4083b278e0275c60a87a0724e501eab84 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 9 Feb 2025 21:13:10 +0800 Subject: [PATCH 3/4] refactor(stream): record the dropped stream. 
--- source/dnode/mnode/impl/src/mndDb.c | 3 --- source/dnode/mnode/impl/src/mndStream.c | 1 + 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 1d1f5744d4..2271017298 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1705,9 +1705,6 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { TAOS_CHECK_GOTO(mndSetDropDbPrepareLogs(pMnode, pTrans, pDb), NULL, _OVER); TAOS_CHECK_GOTO(mndSetDropDbCommitLogs(pMnode, pTrans, pDb), NULL, _OVER); - /*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ - /*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ - /*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ TAOS_CHECK_GOTO(mndDropStreamByDb(pMnode, pTrans, pDb), NULL, _OVER); #ifdef TD_ENTERPRISE TAOS_CHECK_GOTO(mndDropViewByDb(pMnode, pTrans, pDb), NULL, _OVER); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 30953736eb..e78996c231 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2643,6 +2643,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream); if (pStream == NULL || code != 0) { // stream has been dropped already mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId); + void *p = taosArrayPush(pStreamList, &pInfo->streamId); taosArrayDestroy(pList); continue; } From 694b6385d55a3cac8a59c116f8b850b4bcf51c8d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 9 Feb 2025 21:40:33 +0800 Subject: [PATCH 4/4] refactor(stream): adjust reset and mark failed Id. 
--- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 06b7b33cd8..bcf60f5c32 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -960,9 +960,9 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) { tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); streamMutexLock(&pTask->lock); - streamTaskClearCheckInfo(pTask, true); streamTaskSetFailedCheckpointId(pTask, pReq->chkptId); + streamTaskClearCheckInfo(pTask, true); // clear flag set during do checkpoint, and open inputQ for all upstream tasks SStreamTaskState pState = streamTaskGetStatus(pTask);