refactor(stream): 1) notify closing in main thread. 2) not saving the halt status.

This commit is contained in:
Haojun Liao 2025-02-08 23:20:10 +08:00
parent cdc38a99b7
commit a2dcba10a7
3 changed files with 19 additions and 3 deletions

View File

@ -711,6 +711,18 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
pMgmt->state.openVnodes = 0; pMgmt->state.openVnodes = 0;
dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum); dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);
dInfo("notify all streams closed in all %d vnodes", numOfVnodes);
if (ppVnodes != NULL) {
for (int32_t i = 0; i < numOfVnodes; ++i) {
if (ppVnodes[i] != NULL) {
if (ppVnodes[i]->pImpl != NULL) {
tqNotifyClose(ppVnodes[i]->pImpl->pTq);
}
}
}
}
dInfo("notify close stream completed in %d vnodes", numOfVnodes);
for (int32_t t = 0; t < threadNum; ++t) { for (int32_t t = 0; t < threadNum; ++t) {
SVnodeThread *pThread = &threads[t]; SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum == 0) continue; if (pThread->vnodeNum == 0) continue;
@ -718,6 +730,7 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
TdThreadAttr thAttr; TdThreadAttr thAttr;
(void)taosThreadAttrInit(&thAttr); (void)taosThreadAttrInit(&thAttr);
(void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); (void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) { if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno)); dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
} }

View File

@ -174,7 +174,10 @@ void tqNotifyClose(STQ* pTq) {
if (pTq == NULL) { if (pTq == NULL) {
return; return;
} }
streamMetaNotifyClose(pTq->pStreamMeta);
if (pTq->pStreamMeta != NULL) {
streamMetaNotifyClose(pTq->pStreamMeta);
}
} }
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {

View File

@ -464,8 +464,8 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
// 2. send msg to mnode to launch a checkpoint to keep the state for current stream // 2. send msg to mnode to launch a checkpoint to keep the state for current stream
code = streamTaskSendCheckpointReq(pStreamTask); code = streamTaskSendCheckpointReq(pStreamTask);
// 3. assign the status to the value that will be kept in disk // 3. the default task status should be ready or something, not halt.
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask).state; // status to the value that will be kept in disk
// 4. open the inputQ for all upstream tasks // 4. open the inputQ for all upstream tasks
streamTaskOpenAllUpstreamInput(pStreamTask); streamTaskOpenAllUpstreamInput(pStreamTask);