From 52788aca1ccc75f253d8c2e7a617512e57e7cb21 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 19 Aug 2023 12:25:22 +0800 Subject: [PATCH] fix checkpoint error --- source/libs/stream/src/streamBackendRocksdb.c | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 93b4d3f650..d2fc7741f1 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -826,6 +826,11 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { } qDebug("stream backend:%p start to do checkpoint at:%s, %d ", pHandle, checkpointDir, nCf); + if (taosIsDir(checkpointDir)) { + qInfo("stream rm exist checkpoint%s", checkpointDir); + taosRemoveFile(checkpointDir); + } + if (pHandle->db != NULL) { char* err = NULL; @@ -841,19 +846,18 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(pHandle->db, &err); if (cp == NULL || err != NULL) { - qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err); + qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, checkpointDir, err); taosMemoryFreeClear(err); code = -1; goto _ERROR; } - rocksdb_checkpoint_create(cp, checkpointDir, 64 << 20, &err); if (err != NULL) { - qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, path, err); + qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, checkpointDir, err); taosMemoryFreeClear(err); } else { code = 0; - qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, path, + qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, checkpointDir, taosGetTimestampMs() - st); } rocksdb_checkpoint_object_destroy(cp); @@ -863,9 +867,11 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { int64_t id = *(int64_t*)taosArrayGet(refs, i); taosReleaseRef(streamBackendCfWrapperId, id); } - taosWLockLatch(&pMeta->chkpDirLock); - taosArrayPush(pMeta->chkpSaved, &checkpointId); - taosWUnLockLatch(&pMeta->chkpDirLock); + if (code == 0) { + taosWLockLatch(&pMeta->chkpDirLock); + taosArrayPush(pMeta->chkpSaved, &checkpointId); + taosWUnLockLatch(&pMeta->chkpDirLock); + } taosArrayDestroy(refs); taosMemoryFree(ppCf); @@ -874,8 +880,6 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { _ERROR: taosReleaseRef(streamBackendId, backendRid); - taosArrayDestroy(refs); - taosMemoryFree(ppCf); return code; }