diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 1dc1db8e9c..704bc9a2f2 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -258,7 +258,12 @@ void bkdMgtDestroy(SBkdMgt* bm); int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list); -uint32_t nextPow2(uint32_t x); +void* taskAcquireDb(int64_t refId); +void taskReleaseDb(int64_t refId); + +int64_t taskGetDBRef(void* arg); + +uint32_t nextPow2(uint32_t x); #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 662d02a48f..1a80a26d35 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1150,6 +1150,23 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) { /* 0 */ + +void* taskAcquireDb(int64_t refId) { + // acquire + void* p = taosAcquireRef(taskDbWrapperId, refId); + return p; +} +void taskReleaseDb(int64_t refId) { + // release + taosReleaseRef(taskDbWrapperId, refId); +} + +int64_t taskGetDBRef(void* arg) { + if (arg == NULL) return -1; + STaskDbWrapper* pDb = arg; + return pDb->refId; +} + int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { STaskDbWrapper* pTaskDb = arg; int64_t st = taosGetTimestampMs(); @@ -2110,8 +2127,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 return code; } int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { - STaskDbWrapper* pDb = arg; - ECHECKPOINT_BACKUP_TYPE utype = type; + STaskDbWrapper* pDb = arg; + ECHECKPOINT_BACKUP_TYPE utype = type; if (utype == DATA_UPLOAD_RSYNC) { return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 3428fc36e1..e9dfbf7693 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -20,10 +20,11 @@ typedef struct { ECHECKPOINT_BACKUP_TYPE type; - char* taskId; - int64_t chkpId; + char* taskId; + int64_t chkpId; SStreamTask* pTask; + int64_t dbRefId; } SAsyncUploadArg; static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); @@ -323,7 +324,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { taosThreadMutexLock(&p->lock); SStreamTaskState* pStatus = streamTaskGetStatus(p); - ETaskStatus prevStatus = pStatus->state; + ETaskStatus prevStatus = pStatus->state; if (pStatus->state == TASK_STATUS__CK) { ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId && @@ -426,26 +427,37 @@ int32_t uploadCheckpointData(void* param) { char* path = NULL; int32_t code = 0; SArray* toDelFiles = taosArrayInit(4, sizeof(void*)); + char* taskStr = arg->taskId ? arg->taskId : "NULL"; + + void* pBackend = taskAcquireDb(arg->dbRefId); + if (pBackend == NULL) { + stError("s-task:%s failed to acquire db", taskStr); + taosMemoryFree(arg->taskId); + taosMemoryFree(arg); + return -1; + } if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, (int8_t)(arg->type), &path, toDelFiles)) != 0) { - stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); + stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId); } if (arg->type == DATA_UPLOAD_S3) { if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) { - stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); + stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", taskStr, arg->chkpId); } } if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) { - stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId); + stError("s-task:%s failed to upload checkpoint:%" PRId64, taskStr, arg->chkpId); } + taskReleaseDb(arg->dbRefId); + if (code == 0) { for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) { char* p = taosArrayGetP(toDelFiles, i); code = deleteCheckpointFile(arg->taskId, p); - stDebug("s-task:%s try to del file: %s", arg->pTask->id.idStr, p); + stDebug("s-task:%s try to del file: %s", taskStr, p); if (code != 0) { break; } @@ -453,12 +465,11 @@ int32_t uploadCheckpointData(void* param) { } taosArrayDestroyP(toDelFiles, taosMemoryFree); - taosRemoveDir(path); taosMemoryFree(path); - taosMemoryFree(arg->taskId); taosMemoryFree(arg); + return code; } @@ -477,6 +488,7 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, cha arg->taskId = taosStrdup(taskId); arg->chkpId = chkpId; arg->pTask = pTask; + arg->dbRefId = taskGetDBRef(pTask->pBackend); return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL); } @@ -595,8 +607,8 @@ static int32_t uploadCheckpointToS3(char* id, char* path) { } static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) { - int32_t code = 0; - char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); + int32_t code = 0; + char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); sprintf(buf, "%s/%s", id, fname); if (s3GetObjectToFile(buf, dstName) != 0) { code = -1;