diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c
index eedd8f20d6..6676a05548 100644
--- a/source/libs/stream/src/streamCheckpoint.c
+++ b/source/libs/stream/src/streamCheckpoint.c
@@ -562,76 +562,74 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
   return code;
 }
 
-int32_t uploadCheckpointData(void* param) {
-  SAsyncUploadArg* pParam = param;
+// Generate the upload data for checkpointId, push it to the remote backup target and, on success, delete the
+// files collected in toDelFiles and remove the local checkpoint directory; on failure the local data is kept.
+// The backend db reference is acquired and released by the caller; dbRefId is not used in this body.
+// Returns TSDB_CODE_SUCCESS, or the first non-zero error code met along the way.
+int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
   char* path = NULL;
   int32_t code = 0;
   SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
-  char* taskStr = pParam->taskId ? pParam->taskId : "NULL";
+  int64_t now = taosGetTimestampMs();
+  SStreamMeta* pMeta = pTask->pMeta;
+  const char* idStr = pTask->id.idStr;
 
-  void* pBackend = taskAcquireDb(pParam->dbRefId);
-  if (pBackend == NULL) {
-    stError("s-task:%s failed to acquire db", taskStr);
-    taosMemoryFree(pParam->taskId);
-    taosMemoryFree(pParam);
-    return -1;
+  if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles,
+                                      pTask->id.idStr)) != 0) {
+    stError("s-task:%s failed to gen upload checkpoint:%" PRId64, idStr, checkpointId);
   }
 
-  if ((code = taskDbGenChkpUploadData(pParam->pTask->pBackend, ((SStreamMeta*)pParam->pMeta)->bkdChkptMgt,
-                                      pParam->chkpId, (int8_t)(pParam->type), &path, toDelFiles)) != 0) {
-    stError("s-task:%s failed to gen upload checkpoint:%" PRId64, taskStr, pParam->chkpId);
-  }
-
-  if (pParam->type == DATA_UPLOAD_S3) {
-    if (code == 0 && (code = getCheckpointDataMeta(pParam->taskId, path, toDelFiles)) != 0) {
-      stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", taskStr, pParam->chkpId);
+  if (type == DATA_UPLOAD_S3) {
+    if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) {
+      stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", idStr, checkpointId);
     }
   }
 
   if (code == TSDB_CODE_SUCCESS) {
-    code = streamTaskUploadCheckpoint(pParam->taskId, path);
+    code = streamTaskUploadCheckpoint(idStr, path);
     if (code == TSDB_CODE_SUCCESS) {
-      stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", taskStr, pParam->chkpId);
+      stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
     } else {
-      stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", taskStr, pParam->chkpId, path);
+      stError("s-task:%s failed to upload checkpointId:%" PRId64 " data:%s", idStr, checkpointId, path);
     }
   }
 
-  taskReleaseDb(pParam->dbRefId);
-
-  if (code == 0) {
+  if (code == TSDB_CODE_SUCCESS) {
     int32_t size = taosArrayGetSize(toDelFiles);
-    stDebug("s-task:%s remove redundant %d files", taskStr, size);
+    stDebug("s-task:%s remove redundant %d files", idStr, size);
 
     for (int i = 0; i < size; i++) {
       char* pName = taosArrayGetP(toDelFiles, i);
-      code = deleteCheckpointFile(pParam->taskId, pName);
+      code = deleteCheckpointFile(idStr, pName);
       if (code != 0) {
-        stDebug("s-task:%s failed to del file: %s", taskStr, pName);
+        stDebug("s-task:%s failed to remove file: %s", idStr, pName);
         break;
       }
     }
 
-    stDebug("s-task:%s remove redundant files done", taskStr);
+    stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId);
   }
 
   taosArrayDestroyP(toDelFiles, taosMemoryFree);
+  double el = (taosGetTimestampMs() - now) / 1000.0;
 
   if (code == TSDB_CODE_SUCCESS) {
-    stDebug("s-task:%s remove local checkpointId:%" PRId64 " data %s", taskStr, pParam->chkpId, path);
+    stDebug("s-task:%s complete update checkpointId:%" PRId64 ", elapsed time:%.2fs remove local checkpoint data %s",
+            idStr, checkpointId, el, path);
     taosRemoveDir(path);
   } else {
-    stDebug("s-task:%s update checkpointId:%" PRId64 " keep local checkpoint data", taskStr, pParam->chkpId);
+    stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs",
+            idStr, checkpointId, el);
   }
 
   taosMemoryFree(path);
-  taosMemoryFree(pParam->taskId);
-  taosMemoryFree(pParam);
-
   return code;
 }
 
-int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId, char* taskId) {
+// Upload the data of checkpointId to the remote backup target synchronously, in the calling thread.
+// Returns 0 without uploading when there is nothing to do (e.g. remote backup is disabled), -1 when the backend
+// db cannot be acquired, otherwise the result of uploadCheckpointData().
+int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
   ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
   if (type == DATA_UPLOAD_DISABLE) {
     return 0;
@@ -641,15 +639,17 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointI
     return 0;
   }
 
-  SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg));
-  arg->type = type;
-  arg->taskId = taosStrdup(taskId);
-  arg->chkpId = checkpointId;
-  arg->pTask = pTask;
-  arg->dbRefId = taskGetDBRef(pTask->pBackend);
-  arg->pMeta = pTask->pMeta;
+  int64_t dbRefId = taskGetDBRef(pTask->pBackend);
+  void* pBackend = taskAcquireDb(dbRefId);
+  if (pBackend == NULL) {
+    stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData", pTask->id.idStr);
+    return -1;
+  }
 
-  return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL);
+  int32_t code = uploadCheckpointData(pTask, checkpointId, dbRefId, type);
+  taskReleaseDb(dbRefId);
+
+  return code;
 }
 
 int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
@@ -670,6 +670,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
     }
   }
 
+  // TODO: monitoring the checkpoint-source msg
   // send check point response to upstream task
   if (code == TSDB_CODE_SUCCESS) {
     if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@@ -679,27 +680,26 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
     }
 
     if (code != TSDB_CODE_SUCCESS) {
-      // todo: let's retry send rsp to upstream/mnode
+      // todo: let's retry send rsp to mnode, checkpoint-ready has monitor now
       stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId,
               tstrerror(code));
     }
   }
 
+  if (code == TSDB_CODE_SUCCESS) {
+    code = streamTaskRemoteBackupCheckpoint(pTask, ckId);
+    if (code != TSDB_CODE_SUCCESS) {
+      stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code));
+    }
+  } else {
+    stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
+  }
+
+  // TODO: monitoring the checkpoint-report msg
   // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
   if (code == TSDB_CODE_SUCCESS && (pTask->pMsgCb != NULL)) {
     code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
-    if (code == TSDB_CODE_SUCCESS) {
-      code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id);
-      if (code != TSDB_CODE_SUCCESS) {
-        stError("s-task:%s failed to upload checkpoint:%" PRId64 " failed", id, ckId);
-      }
-    } else {
-      stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
-    }
-  }
-
-  // clear the checkpoint info if failed
-  if (code != TSDB_CODE_SUCCESS) {
+  } else {  // clear the checkpoint info if failed
     taosThreadMutexLock(&pTask->lock);
     streamTaskClearCheckInfo(pTask, false);
     code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
@@ -710,7 +710,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
   }
 
   double el = (taosGetTimestampMs() - startTs) / 1000.0;
-  stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", id,
+  stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2fs, %s ", id,
          pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
          (code == TSDB_CODE_SUCCESS) ? "succ" : "failed");
 