diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 0028efae17..00fdb7be78 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -138,5 +138,9 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); int32_t streamBackendTriggerChkp(void* pMeta, char* dst); + +int32_t streamBackendAddInUseChkpPos(void* arg, int64_t chkpId); +int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); + // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 618f3c52e7..1ebe82ba5c 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -671,13 +671,13 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { SArray* chkpDel = taosArrayInit(10, sizeof(int64_t)); SArray* chkpDup = taosArrayInit(10, sizeof(int64_t)); - int64_t minId = 0; + int64_t firsId = 0; if (taosArrayGetSize(pMeta->chkpInUse) >= 1) { - minId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0); + firsId = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0); for (int i = 0; i < taosArrayGetSize(pMeta->chkpSaved); i++) { int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpSaved, i); - if (id >= minId) { + if (id >= firsId) { taosArrayPush(chkpDup, &id); } else { taosArrayPush(chkpDel, &id); @@ -722,6 +722,7 @@ static int32_t compareCheckpoint(const void* a, const void* b) { int32_t streamBackendLoadCheckpointInfo(void* arg) { SStreamMeta* pMeta = arg; int32_t code = 0; + SArray* suffix = NULL; int32_t len = strlen(pMeta->path) + 30; char* chkpPath = taosMemoryCalloc(1, len); @@ -732,17 +733,15 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { taosMemoryFree(chkpPath); return 0; } - taosArrayClear(pMeta->chkpSaved); TdDirPtr pDir = taosOpenDir(chkpPath); - if (pDir == NULL) { taosMemoryFree(chkpPath); return 0; } TdDirEntryPtr de = NULL; - SArray* suffix = taosArrayInit(4, sizeof(int64_t)); + suffix = taosArrayInit(4, sizeof(int64_t)); while ((de = taosReadDir(pDir)) != NULL) { if (strcmp(taosGetDirEntryName(de), ".") == 0 || strcmp(taosGetDirEntryName(de), "..") == 0) continue; @@ -760,7 +759,8 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { } } taosArraySort(suffix, compareCheckpoint); - + // free previous chkpSaved + taosArrayClear(pMeta->chkpSaved); for (int i = 0; i < taosArrayGetSize(suffix); i++) { int64_t id = *(int64_t*)taosArrayGet(suffix, i); taosArrayPush(pMeta->chkpSaved, &id); @@ -909,6 +909,29 @@ _ERROR: taosArrayDestroy(refs); return code; } +int32_t streamBackendAddInUseChkpPos(void* arg, int64_t chkpId) { + if (arg == NULL) return 0; + + SStreamMeta* pMeta = arg; + taosWLockLatch(&pMeta->chkpDirLock); + taosArrayPush(pMeta->chkpInUse, &chkpId); + taosWUnLockLatch(&pMeta->chkpDirLock); + return 0; +} +int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) { + if (arg == NULL) return 0; + + SStreamMeta* pMeta = arg; + taosWLockLatch(&pMeta->chkpDirLock); + if (taosArrayGetSize(pMeta->chkpInUse) > 0) { + int64_t id = *(int64_t*)taosArrayGet(pMeta->chkpInUse, 0); + if (id == chkpId) { + taosArrayPopFrontBatch(pMeta->chkpInUse, 1); + } + } + taosWUnLockLatch(&pMeta->chkpDirLock); + return 0; +} int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { SStreamMeta* pMeta = arg; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 57abd16f1d..a8fa037592 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -169,10 +169,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->chkpCap = 8; taosInitRWLatch(&pMeta->chkpDirLock); - int64_t chkpId = streamGetLatestCheckpointId(pMeta); - pMeta->chkpId = chkpId; - - pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId); + pMeta->chkpId = streamGetLatestCheckpointId(pMeta); + pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); while (pMeta->streamBackend == NULL) { taosMsleep(2 * 1000); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); @@ -181,18 +179,15 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF qInfo("vgId:%d retry to init stream backend", pMeta->vgId); } } - pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); code = streamBackendLoadCheckpointInfo(pMeta); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - goto _err; - } + taosInitRWLatch(&pMeta->lock); taosThreadMutexInit(&pMeta->backendMutex, NULL); - qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, chkpId, stage); + qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, + stage); return pMeta; _err: @@ -202,6 +197,10 @@ _err: if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); + + // taosThreadMutexDestroy(&pMeta->backendMutex); + // taosThreadRwlockDestroy(&pMeta->lock); + taosMemoryFree(pMeta); qError("failed to open stream meta"); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 367b45b9eb..29e1232ee8 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -116,19 +116,21 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk int32_t code = 0; - int8_t chkpFlag = 0; + int8_t validChkp = 0; if (chkpId != 0) { sprintf(tdir, "%s%s%s%s%s%scheckpoint%" PRId64 "", path, TD_DIRSEP, "stream", TD_DIRSEP, "checkpoints", TD_DIRSEP, chkpId); if (taosIsDir(tdir)) { - chkpFlag = 1; + validChkp = 1; qInfo("%s start to read snap %s", STREAM_STATE_TRANSFER, tdir); + streamBackendAddInUseChkpPos(pMeta, chkpId); } else { qWarn("%s failed to read from %s, reason: dir not exist,retry to default state dir", STREAM_STATE_TRANSFER, tdir); } } - if (chkpFlag == 0) { + // no checkpoint specified or not exists invalid checkpoint, do checkpoint at default path and translate it + if (validChkp == 0) { sprintf(tdir, "%s%s%s%s%s", path, TD_DIRSEP, "stream", TD_DIRSEP, "state"); char* chkpdir = taosMemoryCalloc(1, len + 256); sprintf(chkpdir, "%s%s%s", tdir, TD_DIRSEP, "tmp"); @@ -143,6 +145,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk taosMemoryFree(tdir); return code; } + chkpId = 0; } qInfo("%s start to read dir: %s", STREAM_STATE_TRANSFER, tdir); @@ -252,6 +255,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk pHandle->pFileList = list; pHandle->seraial = 0; pHandle->offset = 0; + pHandle->handle = pMeta; return 0; _err: streamSnapHandleDestroy(pHandle); @@ -265,9 +269,12 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { SBanckendFile* pFile = handle->pBackendFile; if (handle->checkpointId == 0) { + // del tmp dir if (taosIsDir(pFile->path)) { taosRemoveDir(pFile->path); } + } else { + streamBackendDelInUseChkp(handle->handle, handle->checkpointId); } if (pFile) { taosMemoryFree(pFile->pCheckpointMeta);