From c7c1bdef6e1b86fcb8d2333abce86cfb81b83655 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Nov 2023 11:44:21 +0800 Subject: [PATCH] fix stream backend convert --- source/libs/stream/src/streamBackendRocksdb.c | 33 ++++++++++++++----- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 22f5b93416..60738b96bf 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -259,38 +259,53 @@ int32_t remoteChkp_readMetaData(char* path, SArray* list) { taosMemoryFree(metaPath); return 0; } +int32_t remoteChkp_validMetaFile(char* name, char* prename, int64_t chkpId) { + int8_t valid = 0; + for (int i = 0; i < strlen(name); i++) { + if (name[i] == '_') { + memcpy(prename, name, i); + if (taosStr2int64(name + i + 1) != chkpId) { + break; + } else { + valid = 1; + } + } + } + return valid; +} int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) { int32_t complete = 1; int32_t len = strlen(path) + 32; char* src = taosMemoryCalloc(1, len); char* dst = taosMemoryCalloc(1, len); + int8_t count = 0; for (int i = 0; i < taosArrayGetSize(list); i++) { char* p = taosArrayGetP(list, i); sprintf(src, "%s%s%s", path, TD_DIRSEP, p); + + // check file exist if (taosStatFile(src, NULL, NULL, NULL) != 0) { complete = 0; break; } + // check file name char temp[64] = {0}; - for (int j = 0; j < strlen(p); j++) { - if (p[j] == '_') { - memcpy(temp, p, j); - } - if (taosStr2int64(p + j + 1) != chkpId) { - complete = 0; - break; - } + if (remoteChkp_validMetaFile(p, temp, chkpId)) { + count++; } - if (complete == 0) break; + // rename file sprintf(dst, "%s%s%s", path, TD_DIRSEP, temp); taosRenameFile(src, dst); memset(src, 0, len); memset(dst, 0, len); } + if (count != taosArrayGetSize(list)) { + complete = 0; + } taosMemoryFree(src); taosMemoryFree(dst);