fix(stream): set the correct guard flag.

This commit is contained in:
Haojun Liao 2023-09-14 22:44:16 +08:00
parent 3d1f29ff21
commit 39e0c57323
2 changed files with 8 additions and 3 deletions

View File

@ -551,13 +551,13 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
walApplyVer(pVnode->pWal, commitIdx);
pVnode->restored = true;
if (!pVnode->pTq->pStreamMeta->taskWillbeLaunched) {
if (pVnode->pTq->pStreamMeta->taskWillbeLaunched) {
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
return;
}
taosWLockLatch(&pVnode->pTq->pStreamMeta->lock);
if (!pVnode->pTq->pStreamMeta->taskWillbeLaunched) {
if (pVnode->pTq->pStreamMeta->taskWillbeLaunched) {
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
return;

View File

@ -794,11 +794,14 @@ void metaHbToMnode(void* param, void* tmrId) {
}
if (!enoughTimeDuration(&pMeta->hbInfo)) {
qInfo("vgId:%d not enough time, %d", pMeta->vgId, pMeta->hbInfo.tickCounter);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
taosReleaseRef(streamMetaId, rid);
return;
}
qInfo("vgId:%d start hb", pMeta->vgId);
taosRLockLatch(&pMeta->lock);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
@ -820,7 +823,7 @@ void metaHbToMnode(void* param, void* tmrId) {
STaskStatusEntry entry = {.streamId = pId->streamId, .taskId = pId->taskId, .status = (*pTask)->status.taskStatus};
taosArrayPush(hbMsg.pTaskStatus, &entry);
if (i == 0) {
if (!hasValEpset) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
hasValEpset = true;
}
@ -866,6 +869,8 @@ void metaHbToMnode(void* param, void* tmrId) {
qDebug("vgId:%d, build and send hb to mnode", pMeta->vgId);
tmsgSendReq(&epset, &msg);
} else {
qError("vgId:%d no mnd epset", pMeta->vgId);
}
taosArrayDestroy(hbMsg.pTaskStatus);