diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 1dc1db8e9c..704bc9a2f2 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -258,7 +258,12 @@ void bkdMgtDestroy(SBkdMgt* bm); int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list); -uint32_t nextPow2(uint32_t x); +void* taskAcquireDb(int64_t refId); +void taskReleaseDb(int64_t refId); + +int64_t taskGetDBRef(void* arg); + +uint32_t nextPow2(uint32_t x); #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 662d02a48f..1a80a26d35 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1150,6 +1150,23 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) { /* 0 */ + +void* taskAcquireDb(int64_t refId) { + // acquire + void* p = taosAcquireRef(taskDbWrapperId, refId); + return p; +} +void taskReleaseDb(int64_t refId) { + // release + taosReleaseRef(taskDbWrapperId, refId); +} + +int64_t taskGetDBRef(void* arg) { + if (arg == NULL) return -1; + STaskDbWrapper* pDb = arg; + return pDb->refId; +} + int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { STaskDbWrapper* pTaskDb = arg; int64_t st = taosGetTimestampMs(); @@ -2110,8 +2127,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 return code; } int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { - STaskDbWrapper* pDb = arg; - ECHECKPOINT_BACKUP_TYPE utype = type; + STaskDbWrapper* pDb = arg; + ECHECKPOINT_BACKUP_TYPE utype = type; if (utype == DATA_UPLOAD_RSYNC) { return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 824a33dd13..b9a73b38a3 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -24,6 +24,7 @@ typedef struct { int64_t chkpId; SStreamTask* pTask; + int64_t dbRefId; } SAsyncUploadArg; static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); @@ -428,6 +429,14 @@ int32_t uploadCheckpointData(void* param) { SArray* toDelFiles = taosArrayInit(4, sizeof(void*)); char* taskStr = arg->taskId ? arg->taskId : "NULL"; + void* pBackend = taskAcquireDb(arg->dbRefId); + if (pBackend == NULL) { + stError("s-task:%s failed to acquire db", taskStr); + taosMemoryFree(arg->taskId); + taosMemoryFree(arg); + return -1; + } + if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, (int8_t)(arg->type), &path, toDelFiles)) != 0) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId); @@ -442,6 +451,8 @@ int32_t uploadCheckpointData(void* param) { stError("s-task:%s failed to upload checkpoint:%" PRId64, taskStr, arg->chkpId); } + taskReleaseDb(arg->dbRefId); + if (code == 0) { for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) { char* p = taosArrayGetP(toDelFiles, i); @@ -454,11 +465,11 @@ int32_t uploadCheckpointData(void* param) { } taosArrayDestroyP(toDelFiles, taosMemoryFree); - taosRemoveDir(path); taosMemoryFree(path); taosMemoryFree(arg->taskId); taosMemoryFree(arg); + return code; } @@ -472,11 +483,17 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, cha return 0; } + // void* p = taskAcquireDb(pTask->pBackend); + // if (p == NULL) { + // return 0; + // } + SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg)); arg->type = type; arg->taskId = taosStrdup(taskId); arg->chkpId = chkpId; arg->pTask = pTask; + arg->dbRefId = taskGetDBRef(pTask->pBackend); return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL); }