diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 97be6b981e..6b4451f678 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -196,11 +196,6 @@ void streamBackendCleanup(void* arg) { return; } -/* - * checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--| - * checkpointInUse: |--cp2--|--cp4--|, checkpointDir in checkpointInUse do replicate trans, cannot del until - * replication is finished - */ int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) { SStreamMeta* pMeta = arg; taosWLockLatch(&pMeta->checkpointDirLock); @@ -218,12 +213,16 @@ int32_t getLatestCheckpoint(void* arg, int64_t* checkpoint) { taosWUnLockLatch(&pMeta->checkpointDirLock); return 0; } +/* + * checkpointSave |--cp1--|--cp2--|--cp3--|--cp4--|--cp5--| + * checkpointInUse: |--cp2--|--cp4--| + * checkpointInUse is doing translation, cannot del until + * replication is finished + */ int32_t delObsoleteCheckpoint(void* arg, const char* path) { SStreamMeta* pMeta = arg; taosWLockLatch(&pMeta->checkpointDirLock); - int64_t checkpointId = pMeta->checkpointTs; - taosArrayPush(pMeta->checkpointSaved, &checkpointId); SArray* checkpointDel = taosArrayInit(10, sizeof(int64_t)); SArray* checkpointDup = taosArrayInit(10, sizeof(int64_t)); @@ -241,13 +240,16 @@ int32_t delObsoleteCheckpoint(void* arg, const char* path) { } } } else { - for (int i = taosArrayGetSize(pMeta->checkpointSaved); i >= 0; i--) { + int32_t sz = taosArrayGetSize(pMeta->checkpointSaved); + int32_t dsz = sz - pMeta->checkpointCap; // del size + + for (int i = 0; i < dsz; i++) { int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); - if (taosArrayGetSize(checkpointDup) < pMeta->checkpointCap) { - taosArrayPush(checkpointDup, &id); - } else { - taosArrayPush(checkpointDel, &id); - } + taosArrayPush(checkpointDel, &id); + } + for (int i = dsz < 0 ? 0 : dsz; i < sz; i++) { + int64_t id = *(int64_t*)taosArrayGet(pMeta->checkpointSaved, i); + taosArrayPush(checkpointDup, &id); } } taosArrayDestroy(pMeta->checkpointSaved); @@ -301,6 +303,10 @@ int32_t streamBackendDoCheckpoint(void* arg, const char* path) { } rocksdb_checkpoint_object_destroy(cp); } + taosWLockLatch(&pMeta->checkpointDirLock); + taosArrayPush(pMeta->checkpointSaved, &checkpointId); + taosWUnLockLatch(&pMeta->checkpointDirLock); + delObsoleteCheckpoint(arg, path); _ERROR: taosReleaseRef(streamBackendId, backendRid);