diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e6d7c2fde8..c0082507b4 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -20,10 +20,11 @@ typedef struct { ECHECKPOINT_BACKUP_TYPE type; - char* taskId; - int64_t chkpId; + char* taskId; + int64_t chkpId; SStreamTask* pTask; + char* taskStr; } SAsyncUploadArg; int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { @@ -320,7 +321,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { taosThreadMutexLock(&p->lock); SStreamTaskState* pStatus = streamTaskGetStatus(p); - ETaskStatus prevStatus = pStatus->state; + ETaskStatus prevStatus = pStatus->state; if (pStatus->state == TASK_STATUS__CK) { ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId && @@ -421,26 +422,27 @@ int32_t uploadCheckpointData(void* param) { char* path = NULL; int32_t code = 0; SArray* toDelFiles = taosArrayInit(4, sizeof(void*)); + char* taskStr = arg->taskStr != NULL ? arg->taskStr : "NULL"; if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, (int8_t)(arg->type), &path, toDelFiles)) != 0) { - stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); + stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId); } if (arg->type == DATA_UPLOAD_S3) { if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) { - stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); + stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", taskStr, arg->chkpId); } } if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) { - stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId); + stError("s-task:%s failed to upload checkpoint:%" PRId64, taskStr, arg->chkpId); } if (code == 0) { for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) { char* p = taosArrayGetP(toDelFiles, i); code = deleteCheckpointFile(arg->taskId, p); - stDebug("s-task:%s try to del file: %s", arg->pTask->id.idStr, p); + stDebug("s-task:%s try to del file: %s", arg->taskStr != NULL ? arg->taskStr : "NULL", p); if (code != 0) { break; } @@ -451,8 +453,8 @@ int32_t uploadCheckpointData(void* param) { taosRemoveDir(path); taosMemoryFree(path); - taosMemoryFree(arg->taskId); + taosMemoryFree(arg->taskStr); taosMemoryFree(arg); return code; } @@ -473,6 +475,12 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { arg->taskId = taosStrdup(taskId); arg->chkpId = chkpId; arg->pTask = pTask; + if (arg->pTask->id.idStr != NULL) { + arg->taskStr = taosStrdup(arg->pTask->id.idStr); + return 0; + } else { + arg->taskStr = NULL; + } return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL); } @@ -591,8 +599,8 @@ static int32_t uploadCheckpointToS3(char* id, char* path) { } static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { - int32_t code = 0; - char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); + int32_t code = 0; + char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); sprintf(buf, "%s/%s", id, fname); if (s3GetObjectToFile(buf, dstName) != 0) { code = -1;