fix(stream): set the correct guard flag.

This commit is contained in:
Haojun Liao 2023-09-14 22:44:16 +08:00
parent 27f626769e
commit 303e8caf0c
2 changed files with 8 additions and 3 deletions

View File

@ -551,13 +551,13 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
walApplyVer(pVnode->pWal, commitIdx); walApplyVer(pVnode->pWal, commitIdx);
pVnode->restored = true; pVnode->restored = true;
if (!pVnode->pTq->pStreamMeta->taskWillbeLaunched) { if (pVnode->pTq->pStreamMeta->taskWillbeLaunched) {
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
return; return;
} }
taosWLockLatch(&pVnode->pTq->pStreamMeta->lock); taosWLockLatch(&pVnode->pTq->pStreamMeta->lock);
if (!pVnode->pTq->pStreamMeta->taskWillbeLaunched) { if (pVnode->pTq->pStreamMeta->taskWillbeLaunched) {
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock); taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
return; return;

View File

@ -793,11 +793,14 @@ void metaHbToMnode(void* param, void* tmrId) {
} }
if (!enoughTimeDuration(&pMeta->hbInfo)) { if (!enoughTimeDuration(&pMeta->hbInfo)) {
qInfo("vgId:%d not enough time, %d", pMeta->vgId, pMeta->hbInfo.tickCounter);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr); taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
taosReleaseRef(streamMetaId, rid); taosReleaseRef(streamMetaId, rid);
return; return;
} }
qInfo("vgId:%d start hb", pMeta->vgId);
taosRLockLatch(&pMeta->lock); taosRLockLatch(&pMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
@ -819,7 +822,7 @@ void metaHbToMnode(void* param, void* tmrId) {
STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus}; STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus};
taosArrayPush(hbMsg.pTaskStatus, &entry); taosArrayPush(hbMsg.pTaskStatus, &entry);
if (i == 0) { if (!hasValEpset) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset); epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
hasValEpset = true; hasValEpset = true;
} }
@ -865,6 +868,8 @@ void metaHbToMnode(void* param, void* tmrId) {
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId); qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
tmsgSendReq(&epset, &msg); tmsgSendReq(&epset, &msg);
} else {
qError("vgId:%d no mnd epset", pMeta->vgId);
} }
taosArrayDestroy(hbMsg.pTaskStatus); taosArrayDestroy(hbMsg.pTaskStatus);