diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8f3e100db6..3c475d0a03 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -840,11 +840,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int3 int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaReopen(SStreamMeta* pMeta); +void streamMetaClear(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); -int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta); int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 2ab710176d..50f413bcc9 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -181,5 +181,5 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) } int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { - return streamMetaReloadAllTasks(pWriter->pTq->pStreamMeta); + return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta); } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b1d49bf31b..a761d15eff 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -712,9 +712,9 @@ int32_t resetStreamTaskStatus(SStreamMeta* pMeta) { } static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { - int32_t vgId = pMeta->vgId; - int32_t code = 0; - int64_t st = taosGetTimestampMs(); + int32_t vgId = pMeta->vgId; + int32_t code = 0; + int64_t st = taosGetTimestampMs(); while(1) { int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); @@ -736,17 +736,9 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } streamMetaWLock(pMeta); - code = streamMetaReopen(pMeta); - if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - code = terrno; - return code; - } + streamMetaClear(pMeta); - streamMetaInitBackend(pMeta); int64_t el = taosGetTimestampMs() - st; - tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); code = streamMetaLoadAllTasks(pMeta); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 807f120cb7..23cb6f5a35 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -31,7 +31,6 @@ int32_t streamMetaId = 0; int32_t taskDbWrapperId = 0; static void metaHbToMnode(void* param, void* tmrId); -static void streamMetaClear(SStreamMeta* pMeta); static int32_t streamMetaBegin(SStreamMeta* pMeta); static void streamMetaCloseImpl(void* arg); @@ -395,41 +394,6 @@ _err: return NULL; } -int32_t streamMetaReopen(SStreamMeta* pMeta) { - streamMetaClear(pMeta); - - // NOTE: role should not be changed during reopen meta - pMeta->streamBackendRid = -1; - pMeta->streamBackend = NULL; - - char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128); - sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); - taosRemoveDir(defaultPath); - - char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128); - sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received"); - - int32_t code = taosStatFile(newPath, NULL, NULL, NULL); - if (code == 0) { - // directory exists - code = taosRenameFile(newPath, defaultPath); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - stError("vgId:%d failed to rename file, from %s to %s, code:%s", pMeta->vgId, newPath, defaultPath, - tstrerror(terrno)); - - taosMemoryFree(defaultPath); - taosMemoryFree(newPath); - return -1; - } - } - - taosMemoryFree(defaultPath); - taosMemoryFree(newPath); - - return 0; -} - // todo refactor: the lock shoud be restricted in one function void streamMetaInitBackend(SStreamMeta* pMeta) { pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); @@ -829,28 +793,27 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { taosArrayDestroy(pRecycleList); } -int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) { - if (pMeta == NULL) return 0; - - return streamMetaLoadAllTasks(pMeta); -} int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; - int32_t vgId = pMeta->vgId; - - stInfo("vgId:%d load stream tasks from meta files", vgId); - - if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { - stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno)); - return -1; - } - void* pKey = NULL; int32_t kLen = 0; void* pVal = NULL; int32_t vLen = 0; SDecoder decoder; - SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId)); + + if (pMeta == NULL) { + return TSDB_CODE_SUCCESS; + } + + SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId)); + int32_t vgId = pMeta->vgId; + stInfo("vgId:%d load stream tasks from meta files", vgId); + + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { + stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno)); + taosArrayDestroy(pRecycleList); + return -1; + } tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {