diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 654a0b6abc..f407290c00 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -825,6 +825,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaReopen(SStreamMeta* pMeta); +void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3ae0eb1ddf..2717f1b78c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1974,6 +1974,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { return -1; } + streamMetaInitBackend(pMeta); + if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) { tqError("vgId:%d failed to load stream tasks", vgId); streamMetaWUnLock(pMeta); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 41392ba27b..7a8147f83b 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -169,10 +169,15 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) } int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) { tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); + + streamMetaWLock(pWriter->pTq->pStreamMeta); int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta); if (code == 0) { + streamMetaInitBackend(pWriter->pTq->pStreamMeta); code = streamStateLoadTasks(pWriter); } + + streamMetaWUnLock(pWriter->pTq->pStreamMeta); tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); taosMemoryFree(pWriter); return code; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index e578638e9d..7ec0490e3f 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -144,7 +144,6 @@ int32_t tqRestartStreamTasks(STQ* pTq) { } streamMetaWLock(pMeta); - code = streamMetaReopen(pMeta); if (code != TSDB_CODE_SUCCESS) { tqError("vgId:%d failed to reopen stream meta", vgId); @@ -153,6 +152,7 @@ int32_t tqRestartStreamTasks(STQ* pTq) { return code; } + streamMetaInitBackend(pMeta); int64_t el = taosGetTimestampMs() - st; tqInfo("vgId:%d close&reload state elapsed time:%.3fms", vgId, el/1000.); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7013b43a6f..228da65021 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -262,18 +262,33 @@ int32_t streamMetaReopen(SStreamMeta* pMeta) { } } - // todo: not wait in a critical region - while ((pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId)) == NULL) { - stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); - taosMsleep(100); + taosMemoryFree(defaultPath); + taosMemoryFree(newPath); + + return 0; +} + +// todo refactor: the lock shoud be restricted in one function +void streamMetaInitBackend(SStreamMeta* pMeta) { + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); + if (pMeta->streamBackend == NULL) { + streamMetaWUnLock(pMeta); + + while (1) { + streamMetaWLock(pMeta); + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); + if (pMeta->streamBackend != NULL) { + break; + } + + streamMetaWUnLock(pMeta); + stInfo("vgId:%d failed to init stream backend, retry in 100ms", pMeta->vgId); + taosMsleep(100); + } } pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); streamBackendLoadCheckpointInfo(pMeta); - - taosMemoryFree(defaultPath); - taosMemoryFree(newPath); - return 0; } void streamMetaClear(SStreamMeta* pMeta) {