From 1c5346b13c137f107c15f5d9ae3c1dc3769788a6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 19 Aug 2023 16:10:03 +0800 Subject: [PATCH] refactor checkpoint --- source/libs/stream/src/streamBackendRocksdb.c | 159 +++++++++++------- 1 file changed, 100 insertions(+), 59 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d2fc7741f1..cdcd482442 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -420,7 +420,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { /*param@1: checkpointId dir param@2: state - copy checkpointdir's file to state dir + copy pChkpIdDir's file to state dir opt to set hard link to previous file */ char* state = taosMemoryCalloc(1, strlen(path) + 32); @@ -770,8 +770,9 @@ int32_t streamBackendLoadCheckpointInfo(void* arg) { return 0; } -int32_t streamBackendGetAllCfHandle(SStreamMeta* pMeta, SArray* pHandle, SArray* refs) { - void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL); +int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*** ppHandle, SArray* refs) { + SArray* pHandle = taosArrayInit(16, POINTER_BYTES); + void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL); while (pIter) { int64_t id = *(int64_t*)pIter; @@ -790,6 +791,77 @@ int32_t streamBackendGetAllCfHandle(SStreamMeta* pMeta, SArray* pHandle, SArray* taosArrayPush(refs, &id); pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter); } + + int32_t nCf = taosArrayGetSize(pHandle); + + rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); + for (int i = 0; i < nCf; i++) { + ppCf[i] = taosArrayGetP(pHandle, i); + } + taosArrayDestroy(pHandle); + + *ppHandle = ppCf; + return nCf; +} +int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) { + int32_t code = -1; + char* err = NULL; + rocksdb_checkpoint_t* cp = 
rocksdb_checkpoint_object_create(db, &err); + if (cp == NULL || err != NULL) { + qError("failed to do checkpoint at:%s, reason:%s", path, err); + taosMemoryFreeClear(err); + goto _ERROR; + } + + rocksdb_checkpoint_create(cp, path, 64 << 20, &err); + if (err != NULL) { + qError("failed to do checkpoint at:%s, reason:%s", path, err); + taosMemoryFreeClear(err); + } else { + code = 0; + } +_ERROR: + rocksdb_checkpoint_object_destroy(cp); + return code; +} +int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) { + int code = -1; + char* err = NULL; + rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); + rocksdb_flushoptions_set_wait(flushOpt, 1); + + rocksdb_flush_cfs(db, flushOpt, cf, nCf, &err); + if (err != NULL) { + qError("failed to flush db before streamBackend clean up, reason:%s", err); + taosMemoryFree(err); + } + code = 0; + rocksdb_flushoptions_destroy(flushOpt); + return code; +} +int32_t chkpPreCheckDir(char* path, int64_t chkpId, char** chkpDir, char** chkpIdDir) { + int32_t code = 0; + char* pChkpDir = taosMemoryCalloc(1, 256); + char* pChkpIdDir = taosMemoryCalloc(1, 256); + + sprintf(pChkpDir, "%s%s%s", path, TD_DIRSEP, "checkpoints"); + code = taosMulModeMkDir(pChkpDir, 0755); + if (code != 0) { + qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code)); + taosMemoryFree(pChkpDir); + taosMemoryFree(pChkpIdDir); + code = -1; + return code; + } + + sprintf(pChkpIdDir, "%s%scheckpoint%" PRId64, pChkpDir, TD_DIRSEP, chkpId); + if (taosIsDir(pChkpIdDir)) { + qInfo("stream rm exist checkpoint%s", pChkpIdDir); + taosRemoveFile(pChkpIdDir); + } + *chkpDir = pChkpDir; + *chkpIdDir = pChkpIdDir; + return 0; } int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { @@ -801,66 +873,33 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { SArray* refs = taosArrayInit(16, sizeof(int64_t)); SArray* pCf = taosArrayInit(16, POINTER_BYTES); - char path[256] = 
{0}; - sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, "checkpoints"); - code = taosMulModeMkDir(path, 0755); - if (code != 0) { - qError("failed to prepare checkpoint dir, path:%s, reason:%s", path, tstrerror(code)); - return code; - } + rocksdb_column_family_handle_t** ppCf = NULL; - char checkpointDir[256] = {0}; - snprintf(checkpointDir, tListLen(checkpointDir), "%s%scheckpoint%" PRId64, path, TD_DIRSEP, checkpointId); + char* pChkpDir = NULL; + char* pChkpIdDir = NULL; + if (chkpPreCheckDir(pMeta->path, checkpointId, &pChkpDir, &pChkpIdDir) != 0) { + goto _ERROR; + } SBackendWrapper* pHandle = taosAcquireRef(streamBackendId, backendRid); - if (pHandle == NULL) { - return -1; + if (pHandle == NULL || pHandle->db == NULL) { + goto _ERROR; } - streamBackendGetAllCfHandle(pMeta, pCf, refs); + int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs); + qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, nCf); - int32_t nCf = taosArrayGetSize(pCf); - rocksdb_column_family_handle_t** ppCf = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*)); - for (int i = 0; i < nCf; i++) { - ppCf[i] = taosArrayGetP(pCf, i); - } - - qDebug("stream backend:%p start to do checkpoint at:%s, %d ", pHandle, checkpointDir, nCf); - if (taosIsDir(checkpointDir)) { - qInfo("stream rm exist checkpoint%s", checkpointDir); - taosRemoveFile(checkpointDir); - } - - if (pHandle->db != NULL) { - char* err = NULL; - - rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); - rocksdb_flushoptions_set_wait(flushOpt, 1); - - rocksdb_flush_cfs(pHandle->db, flushOpt, ppCf, nCf, &err); - if (err != NULL) { - qError("failed to flush db before streamBackend clean up, reason:%s", err); - taosMemoryFree(err); - } - rocksdb_flushoptions_destroy(flushOpt); - - rocksdb_checkpoint_t* cp = rocksdb_checkpoint_object_create(pHandle->db, &err); - if (cp == NULL || err != NULL) { - qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, 
checkpointDir, err); - taosMemoryFreeClear(err); - code = -1; - goto _ERROR; - } - rocksdb_checkpoint_create(cp, checkpointDir, 64 << 20, &err); - if (err != NULL) { - qError("stream backend:%p failed to do checkpoint at:%s, reason:%s", pHandle, checkpointDir, err); - taosMemoryFreeClear(err); + code = chkpPreFlushDb(pHandle->db, ppCf, nCf); + if (code == 0) { + code = chkpDoDbCheckpoint(pHandle->db, pChkpIdDir); + if (code != 0) { + qError("stream backend:%p failed to do checkpoint at:%s", pHandle, pChkpIdDir); } else { - code = 0; - qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, checkpointDir, + qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pHandle, pChkpIdDir, taosGetTimestampMs() - st); } - rocksdb_checkpoint_object_destroy(cp); + } else { + qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir); } for (int i = 0; i < taosArrayGetSize(refs); i++) { @@ -871,15 +910,17 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) { taosWLockLatch(&pMeta->chkpDirLock); taosArrayPush(pMeta->chkpSaved, &checkpointId); taosWUnLockLatch(&pMeta->chkpDirLock); + + // delete obsolete checkpoint + delObsoleteCheckpoint(arg, pChkpDir); } - taosArrayDestroy(refs); - taosMemoryFree(ppCf); - - delObsoleteCheckpoint(arg, path); - _ERROR: taosReleaseRef(streamBackendId, backendRid); + taosArrayDestroy(refs); + taosMemoryFree(ppCf); + taosMemoryFree(pChkpDir); + taosMemoryFree(pChkpIdDir); return code; }