diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 4a1b3961cd..53f9cc3c92 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -168,10 +168,8 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback) } int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) { tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); - int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId); - if (code == 0) { - code = streamStateLoadTasks(pWriter); - } + // int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId); + int32_t code = streamStateLoadTasks(pWriter); tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); taosMemoryFree(pWriter); return code; diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index f19068ea88..35c8a4102e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -256,6 +256,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) if (code) { pReader->streamStateDone = 1; pReader->pStreamStateReader = NULL; + code = 0; goto _err; } } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 775eaebf89..ba05d9968b 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -922,7 +922,8 @@ int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32 rocksdb_flushoptions_destroy(flushOpt); return code; } -int32_t chkpPreCheckDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) { + +int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) { int32_t code = 0; char* pChkpDir = taosMemoryCalloc(1, 256); char* pChkpIdDir = taosMemoryCalloc(1, 256); @@ -1026,7 +1027,7 @@ int32_t taskBackendDoCheckpoint(void* arg, uint64_t chkpId) { char* pChkpDir = NULL; char* pChkpIdDir = NULL; - if (chkpPreCheckDir(pBackend->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) { + if (chkpPreBuildDir(pBackend->path, chkpId, &pChkpDir, &pChkpIdDir) != 0) { goto _EXIT; } // Get all cf and acquire cfWrappter diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8cce4dc2f9..72399865e6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -806,7 +806,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { continue; } - // do duplicate task check. + // do duplicate task checke int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (p == NULL) { diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 8a4500dd86..42d77fc4c0 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -270,7 +270,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { if (handle->checkpointId == 0) { // del tmp dir - if (taosIsDir(pFile->path)) { + if (pFile && taosIsDir(pFile->path)) { taosRemoveDir(pFile->path); } } else {