diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 704bc9a2f2..bb6362ff73 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -247,6 +247,7 @@ int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId); int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); int32_t taskDbBuildSnap(void* arg, SArray* pSnap); +int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo); int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 5f3eb9c630..af06ac94e7 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -146,6 +146,9 @@ static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt); +void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp); +void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp); + #define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX)); int32_t copyFiles(const char* src, const char* dst); uint32_t nextPow2(uint32_t x); @@ -1017,7 +1020,7 @@ int chkpIdComp(const void* a, const void* b) { return x < y ? -1 : 1; } -int32_t chkpLoadInfo(STaskDbWrapper* pBackend) { +int32_t taskDbLoadChkpInfo(STaskDbWrapper* pBackend) { int32_t code = 0; char* pChkpDir = taosMemoryCalloc(1, 256); @@ -1087,7 +1090,7 @@ int32_t chkpDoDbCheckpoint(rocksdb_t* db, char* path) { taosMemoryFreeClear(err); goto _ERROR; } - rocksdb_checkpoint_create(cp, path, UINT64_MAX, &err); + rocksdb_checkpoint_create(cp, path, 0, &err); if (err != NULL) { stError("failed to do checkpoint at:%s, reason:%s", path, err); taosMemoryFreeClear(err); @@ -1152,9 +1155,12 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { STaskDbWrapper* pTaskDb = *(STaskDbWrapper**)pIter; taskDbAddRef(pTaskDb); - code = taskDbDoCheckpoint(pTaskDb, pTaskDb->chkpId); + int64_t chkpId = pTaskDb->chkpId; + code = taskDbDoCheckpoint(pTaskDb, chkpId); taskDbRemoveRef(pTaskDb); + taskDbRefChkp(pTaskDb, pTaskDb->chkpId); + SStreamTask* pTask = pTaskDb->pTask; SStreamTaskSnap snap = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, @@ -1167,6 +1173,28 @@ int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { return code; } +int32_t taskDbDestroySnap(void* arg, SArray* pSnapInfo) { + if (pSnapInfo == NULL) return 0; + SStreamMeta* pMeta = arg; + int32_t code = 0; + taosThreadMutexLock(&pMeta->backendMutex); + + char buf[128] = {0}; + for (int i = 0; i < taosArrayGetSize(pSnapInfo); i++) { + SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfo, i); + sprintf(buf, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)pSnap->taskId); + STaskDbWrapper* pTaskDb = taosHashGet(pMeta->pTaskDbUnique, buf, strlen(buf)); + if (pTaskDb == NULL) { + stWarn("stream backend:%p failed to find task db, streamId:% " PRId64 "", pMeta, pSnap->streamId); + continue; + } + memset(buf, 0, sizeof(buf)); + + taskDbUnRefChkp(pTaskDb, pSnap->chkpId); + } + taosThreadMutexUnlock(&pMeta->backendMutex); + return 0; +} #ifdef BUILD_NO_CALL int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId) { // if (arg == NULL) return 0; @@ -1946,13 +1974,32 @@ void taskDbInitChkpOpt(STaskDbWrapper* pTaskDb) { pTaskDb->chkpId = -1; pTaskDb->chkpCap = 4; pTaskDb->chkpSaved = taosArrayInit(4, sizeof(int64_t)); - chkpLoadInfo(pTaskDb); + taskDbLoadChkpInfo(pTaskDb); pTaskDb->chkpInUse = taosArrayInit(4, sizeof(int64_t)); taosThreadRwlockInit(&pTaskDb->chkpDirLock, NULL); } +void taskDbRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { + taosThreadRwlockWrlock(&pTaskDb->chkpDirLock); + taosArrayPush(pTaskDb->chkpInUse, &chkp); + taosArraySort(pTaskDb->chkpInUse, chkpIdComp); + taosThreadRwlockUnlock(&pTaskDb->chkpDirLock); +} + +void taskDbUnRefChkp(STaskDbWrapper* pTaskDb, int64_t chkp) { + taosThreadRwlockWrlock(&pTaskDb->chkpDirLock); + for (int i = 0; i < taosArrayGetSize(pTaskDb->chkpInUse); i++) { + int64_t* p = taosArrayGet(pTaskDb->chkpInUse, i); + if (*p == chkp) { + taosArrayRemove(pTaskDb->chkpInUse, i); + break; + } + } + taosThreadRwlockUnlock(&pTaskDb->chkpDirLock); +} + void taskDbDestroyChkpOpt(STaskDbWrapper* pTaskDb) { taosArrayDestroy(pTaskDb->chkpSaved); taosArrayDestroy(pTaskDb->chkpInUse); @@ -2179,15 +2226,18 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 return code; } int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { + int32_t code = -1; STaskDbWrapper* pDb = arg; ECHECKPOINT_BACKUP_TYPE utype = type; + taskDbRefChkp(pDb, chkpId); if (utype == DATA_UPLOAD_RSYNC) { - return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); + code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path); } else if (utype == DATA_UPLOAD_S3) { - return taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list); + code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list); } - return -1; + taskDbUnRefChkp(pDb, chkpId); + return code; } int32_t taskDbOpenCfByKey(STaskDbWrapper* pDb, const char* key) { diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 1800324cc8..63a3e9e0df 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -75,6 +75,7 @@ struct SStreamSnapHandle { char* metaPath; SArray* pDbSnapSet; + SArray* pSnapInfoSet; int32_t currIdx; int8_t delFlag; // 0 : not del, 1: del }; @@ -140,7 +141,9 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { return taosOpenFile(fullname, opt); } -int32_t streamTaskDbGetSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); } +int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); } + +int32_t streamDestroyTasdDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); } void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (qDebugFlag & DEBUG_DEBUG) { @@ -291,29 +294,24 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) { } int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) { // impl later - SArray* pSnapSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); - int32_t code = streamTaskDbGetSnapInfo(pMeta, path, pSnapSet); + SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); + int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet); if (code != 0) { return -1; } SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); - for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) { - SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i); + for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) { + SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i); SBackendSnapFile2 snapFile = {0}; code = streamBackendSnapInitFile(path, pSnap, &snapFile); ASSERT(code == 0); taosArrayPush(pDbSnapSet, &snapFile); } - for (int32_t i = 0; i < taosArrayGetSize(pSnapSet); i++) { - SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i); - taosMemoryFree(pSnap->dbPrefixPath); - } - taosArrayDestroy(pSnapSet); - pHandle->pDbSnapSet = pDbSnapSet; + pHandle->pSnapInfoSet = pSnapInfoSet; pHandle->currIdx = 0; return 0; @@ -333,6 +331,14 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) { } taosArrayDestroy(handle->pDbSnapSet); } + streamDestroyTasdDbSnapInfo(handle->handle, handle->pSnapInfoSet); + if (handle->pSnapInfoSet) { + for (int32_t i = 0; i < taosArrayGetSize(handle->pSnapInfoSet); i++) { + SStreamTaskSnap* pSnap = taosArrayGet(handle->pSnapInfoSet, i); + taosMemoryFree(pSnap->dbPrefixPath); + } + taosArrayDestroy(handle->pSnapInfoSet); + } taosMemoryFree(handle->metaPath); return; }