diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d9eea23d21..56ea1611f4 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1209,6 +1209,7 @@ _EXIT: taosMemoryFree(ppCf); return code; } + int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId) { return taskDbDoCheckpoint(arg, chkpId); } SListNode* streamBackendAddCompare(void* backend, void* arg) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 853c6881ba..a1e6838ac3 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -353,42 +353,51 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l } int32_t uploadCheckpointData(void* param) { - SAsyncUploadArg* arg = param; + SAsyncUploadArg* pParam = param; char* path = NULL; int32_t code = 0; SArray* toDelFiles = taosArrayInit(4, sizeof(void*)); - char* taskStr = arg->taskId ? arg->taskId : "NULL"; + char* taskStr = pParam->taskId ? pParam->taskId : "NULL"; - void* pBackend = taskAcquireDb(arg->dbRefId); + void* pBackend = taskAcquireDb(pParam->dbRefId); if (pBackend == NULL) { stError("s-task:%s failed to acquire db", taskStr); - taosMemoryFree(arg->taskId); - taosMemoryFree(arg); + taosMemoryFree(pParam->taskId); + taosMemoryFree(pParam); return -1; } - if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, ((SStreamMeta*)arg->pMeta)->bkdChkptMgt, arg->chkpId, - (int8_t)(arg->type), &path, toDelFiles)) != 0) { - stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId); + if ((code = taskDbGenChkpUploadData(pParam->pTask->pBackend, ((SStreamMeta*)pParam->pMeta)->bkdChkptMgt, + pParam->chkpId, + (int8_t)(pParam->type), &path, toDelFiles)) != 0) { + stError("s-task:%s failed to gen upload checkpoint:%" PRId64, taskStr, pParam->chkpId); } - if (arg->type == DATA_UPLOAD_S3) { - if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) { - stError("s-task:%s failed to get checkpointId:%" PRId64 " meta", taskStr, arg->chkpId); + if (pParam->type == DATA_UPLOAD_S3) { + if (code == 0 && (code = getCheckpointDataMeta(pParam->taskId, path, toDelFiles)) != 0) { + stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", taskStr, pParam->chkpId); } } - if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) { - stError("s-task:%s failed to upload checkpointId:%" PRId64, taskStr, arg->chkpId); + if (code == TSDB_CODE_SUCCESS) { + code = streamTaskBackupCheckpoint(pParam->taskId, path); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to upload checkpoint data:%s, checkpointId:%" PRId64, taskStr, path, pParam->chkpId); + } else { + stDebug("s-task:%s backup checkpointId:%"PRId64" to remote succ", taskStr, pParam->chkpId); + } } - taskReleaseDb(arg->dbRefId); + taskReleaseDb(pParam->dbRefId); if (code == 0) { - for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) { - char* p = taosArrayGetP(toDelFiles, i); - code = deleteCheckpointFile(arg->taskId, p); - stDebug("s-task:%s try to del file: %s", taskStr, p); + int32_t size = taosArrayGetSize(toDelFiles); + stDebug("s-task:%s remove redundant %d files", taskStr, size); + + for (int i = 0; i < size; i++) { + char* pName = taosArrayGetP(toDelFiles, i); + code = deleteCheckpointFile(pParam->taskId, pName); + stDebug("s-task:%s try to del file: %s", taskStr, pName); if (code != 0) { break; } @@ -396,15 +405,18 @@ int32_t uploadCheckpointData(void* param) { } taosArrayDestroyP(toDelFiles, taosMemoryFree); + + stDebug("s-task:%s remove local checkpoint dir:%s", taskStr, path); taosRemoveDir(path); taosMemoryFree(path); - taosMemoryFree(arg->taskId); - taosMemoryFree(arg); + + taosMemoryFree(pParam->taskId); + taosMemoryFree(pParam); return code; } -int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, char* taskId) { +int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId, char* taskId) { ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); if (type == DATA_UPLOAD_DISABLE) { return 0; @@ -417,7 +429,7 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, cha SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg)); arg->type = type; arg->taskId = taosStrdup(taskId); - arg->chkpId = chkpId; + arg->chkpId = checkpointId; arg->pTask = pTask; arg->dbRefId = taskGetDBRef(pTask->pBackend); arg->pMeta = pTask->pMeta; @@ -613,6 +625,7 @@ int32_t deleteCheckpoint(const char* id) { int32_t deleteCheckpointFile(const char* id, const char* name) { char object[128] = {0}; snprintf(object, sizeof(object), "%s/%s", id, name); + char* tmp = object; s3DeleteObjects((const char**)&tmp, 1); return 0;