diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 4c70f3f7b9..e0d58176ed 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -51,15 +51,7 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS SStreamSnapReader* pSnapReader = NULL; - // restore from checkpoint if checkpointid != 0 - if (checkpointId != 0) { - sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "checkpoints", - TD_DIRSEP, checkpointId); - } else { - sprintf(tdir, "%s%s%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM, TD_DIRSEP, "state"); - } - - if (streamSnapReaderOpen(pTq, sver, checkpointId, tdir, &pSnapReader) == 0) { + if (streamSnapReaderOpen(pTq, sver, checkpointId, pTq->path, &pSnapReader) == 0) { pReader->complete = 1; } else { code = -1; diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index ce82268f7e..50187cc01e 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -79,31 +79,42 @@ const char* ROCKSDB_CURRENT = "CURRENT"; const char* ROCKSDB_CHECKPOINT_META = "CHECKPOINT"; static int64_t kBlockSize = 64 * 1024; -int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path); +int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, int64_t chkpId); void streamSnapHandleDestroy(SStreamSnapHandle* handle); // static void streamBuildFname(char* path, char* file, char* fullname) #define STREAM_ROCKSDB_BUILD_FULLNAME(path, file, fullname) \ do { \ - sprintf(fullname, "%s/%s", path, file); \ + sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \ } while (0) -int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) { +int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chkpId) { // impl later + int len = strlen(path); + char* tdir = taosMemoryCalloc(1, len + 128); + memcpy(tdir, path, len); + + if (chkpId != 0) { + sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP, + chkpId); + } else { + sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state"); + } int32_t code = 0; - TdDirPtr pDir = taosOpenDir(path); + TdDirPtr pDir = taosOpenDir(tdir); if (NULL == pDir) { + qError("stream-state failed to open %s", tdir); goto _err; } SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); pHandle->pBackendFile = pFile; - pHandle->checkpointId = 0; + pHandle->checkpointId = chkpId; pHandle->seraial = 0; - pFile->path = taosStrdup(path); + pFile->path = tdir; pFile->pSst = taosArrayInit(16, sizeof(void*)); TdDirEntryPtr pDirEntry; @@ -135,7 +146,9 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) { taosCloseDir(&pDir); if (pFile->pCurrent == NULL) { + qError("stream-state failed to open %s, reason: no valid file", tdir); code = -1; + tdir = NULL; goto _err; } SArray* list = taosArrayInit(64, sizeof(SBackendFileItem)); @@ -176,11 +189,13 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) { pHandle->pFileList = list; char fullname[256] = {0}; - char* file = taosArrayGet(pHandle->pFileList, pHandle->currFileIdx); + char* file = ((SBackendFileItem*)taosArrayGet(pHandle->pFileList, pHandle->currFileIdx))->name; STREAM_ROCKSDB_BUILD_FULLNAME(pFile->path, file, fullname); pHandle->fd = taosOpenFile(fullname, TD_FILE_READ); if (pHandle->fd == NULL) { + qError("stream-state failed to open %s, reason: %s", tdir, tstrerror(errno)); + tdir = NULL; goto _err; } pHandle->seraial = 0; @@ -188,6 +203,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path) { return 0; _err: streamSnapHandleDestroy(pHandle); + taosMemoryFreeClear(tdir); code = -1; return code; @@ -195,32 +211,33 @@ _err: void streamSnapHandleDestroy(SStreamSnapHandle* handle) { SBanckendFile* pFile = handle->pBackendFile; - taosMemoryFree(pFile->pCheckpointMeta); - taosMemoryFree(pFile->pCurrent); - taosMemoryFree(pFile->pMainfest); - taosMemoryFree(pFile->pOptions); - taosMemoryFree(pFile->path); - for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { - char* sst = taosArrayGetP(pFile->pSst, i); - taosMemoryFree(sst); + if (pFile) { + taosMemoryFree(pFile->pCheckpointMeta); + taosMemoryFree(pFile->pCurrent); + taosMemoryFree(pFile->pMainfest); + taosMemoryFree(pFile->pOptions); + taosMemoryFree(pFile->path); + for (int i = 0; i < taosArrayGetSize(pFile->pSst); i++) { + char* sst = taosArrayGetP(pFile->pSst, i); + taosMemoryFree(sst); + } + taosArrayDestroy(pFile->pSst); + taosMemoryFree(pFile); } - taosArrayDestroy(pFile->pSst); - taosMemoryFree(pFile); taosArrayDestroy(handle->pFileList); taosCloseFile(&handle->fd); return; } -int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t ever, char* path, SStreamSnapReader** ppReader) { +int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* path, SStreamSnapReader** ppReader) { // impl later SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader)); if (pReader == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - pReader->handle.checkpointId = ever; - // const char* path = NULL; - if (streamSnapHandleInit(&pReader->handle, (char*)path) < 0) { + + if (streamSnapHandleInit(&pReader->handle, (char*)path, chkpId) < 0) { taosMemoryFree(pReader); return -1; } diff --git a/source/libs/tdb/test/tdbPageRecycleTest.cpp b/source/libs/tdb/test/tdbPageRecycleTest.cpp index 4d7b314917..d740bd0f94 100644 --- a/source/libs/tdb/test/tdbPageRecycleTest.cpp +++ b/source/libs/tdb/test/tdbPageRecycleTest.cpp @@ -804,7 +804,7 @@ TEST(TdbPageRecycleTest, recycly_delete_interior_ofp_nocommit) { // sprintf(&key[count - 2], "%c", i); key[count - 2] = '0' + i; - ret = tdbTbInsert(pDb, key, count, NULL, NULL, txn); + ret = tdbTbInsert(pDb, key, count, NULL, 0, txn); GTEST_ASSERT_EQ(ret, 0); } }