diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 4400d7fac0..47533a102a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -711,6 +711,18 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) { pMgmt->state.openVnodes = 0; dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum); + dInfo("notify all streams closed in all %d vnodes", numOfVnodes); + if (ppVnodes != NULL) { + for (int32_t i = 0; i < numOfVnodes; ++i) { + if (ppVnodes[i] != NULL) { + if (ppVnodes[i]->pImpl != NULL) { + tqNotifyClose(ppVnodes[i]->pImpl->pTq); + } + } + } + } + dInfo("notify close stream completed in %d vnodes", numOfVnodes); + for (int32_t t = 0; t < threadNum; ++t) { SVnodeThread *pThread = &threads[t]; if (pThread->vnodeNum == 0) continue; @@ -718,6 +730,7 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) { TdThreadAttr thAttr; (void)taosThreadAttrInit(&thAttr); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) { dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno)); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5b19d4cd87..1c0f73249c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -174,7 +174,10 @@ void tqNotifyClose(STQ* pTq) { if (pTq == NULL) { return; } - streamMetaNotifyClose(pTq->pStreamMeta); + + if (pTq->pStreamMeta != NULL) { + streamMetaNotifyClose(pTq->pStreamMeta); + } } void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 89cb4153fe..5e099712ca 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -464,8 +464,8 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) { // 2. send msg to mnode to launch a checkpoint to keep the state for current stream code = streamTaskSendCheckpointReq(pStreamTask); - // 3. assign the status to the value that will be kept in disk - pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask).state; + // 3. the default task status should be ready or something, not halt. + // status to the value that will be kept in disk // 4. open the inputQ for all upstream tasks streamTaskOpenAllUpstreamInput(pStreamTask);