diff --git a/include/common/cos.h b/include/common/cos.h index 8e48533304..17c48d594b 100644 --- a/include/common/cos.h +++ b/include/common/cos.h @@ -45,7 +45,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size, int32_t s3GetObjectsByPrefix(const char *prefix, const char *path); void s3EvictCache(const char *path, long object_size); long s3Size(const char *object_name); -int32_t s3GetObjectToFile(const char *object_name, char *fileName); +int32_t s3GetObjectToFile(const char *object_name, const char *fileName); #define S3_DATA_CHUNK_PAGES (256 * 1024 * 1024) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index 990bfdcea3..8ad5fb36b5 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -1196,7 +1196,7 @@ static S3Status getObjectCallback(int bufferSize, const char *buffer, void *call return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK); } -int32_t s3GetObjectToFile(const char *object_name, char *fileName) { +int32_t s3GetObjectToFile(const char *object_name, const char *fileName) { int64_t ifModifiedSince = -1, ifNotModifiedSince = -1; const char *ifMatch = 0, *ifNotMatch = 0; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 07dce9a451..45a75ea5e7 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -122,7 +122,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); -void streamTaskSetCheckpointFailedId(SStreamTask* pTask); +void streamTaskSetFailedCheckpointId(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*); STaskId streamTaskGetTaskId(const SStreamTask* pTask); @@ -161,7 +161,7 @@ int32_t streamTaskBackupCheckpoint(char* id, char* path); int32_t downloadCheckpoint(char* id, char* path); int32_t deleteCheckpoint(char* id); int32_t deleteCheckpointFile(char* id, char* name); -int32_t downloadCheckpointByName(char* id, char* fname, char* dstName); +//int32_t downloadCheckpointDataByName(const char* id, char* fname, char* dstName); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e6d7c2fde8..8244df2995 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -26,6 +26,8 @@ typedef struct { SStreamTask* pTask; } SAsyncUploadArg; +static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); + int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; @@ -376,21 +378,23 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { return code; } -void streamTaskSetCheckpointFailedId(SStreamTask* pTask) { +void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId); } -int32_t getChkpMeta(char* id, char* path, SArray* list) { +static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { char* file = taosMemoryCalloc(1, strlen(path) + 32); sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP"); - int32_t code = downloadCheckpointByName(id, "META", file); + + int32_t code = downloadCheckpointDataByName(id, "META", file); if (code != 0) { stDebug("chkp failed to download meta file:%s", file); taosMemoryFree(file); return code; } + TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); char buf[128] = {0}; if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) { @@ -427,7 +431,7 @@ int32_t uploadCheckpointData(void* param) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); } if (arg->type == DATA_UPLOAD_S3) { - if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) { + if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) { stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); } } @@ -546,7 +550,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); taosThreadMutexUnlock(&pTask->lock); - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); } @@ -590,7 +594,7 @@ static int32_t uploadCheckpointToS3(char* id, char* path) { return 0; } -static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { +static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) { int32_t code = 0; char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); sprintf(buf, "%s/%s", id, fname); @@ -625,16 +629,18 @@ int32_t streamTaskBackupCheckpoint(char* id, char* path) { } // fileName: CURRENT -int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) { +int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { stError("uploadCheckpointByName parameters invalid"); return -1; } + if (strlen(tsSnodeAddress) != 0) { return 0; } else if (tsS3StreamEnabled) { return downloadCheckpointByNameS3(id, fname, dstName); } + return 0; } @@ -643,11 +649,13 @@ int32_t downloadCheckpoint(char* id, char* path) { stError("downloadCheckpoint parameters invalid"); return -1; } + if (strlen(tsSnodeAddress) != 0) { return downloadRsync(id, path); } else if (tsS3StreamEnabled) { return s3GetObjectsByPrefix(id, path); } + return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a464594233..03f8d2adfd 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1399,7 +1399,7 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) { SStreamTaskState* pState = streamTaskGetStatus(pTask); if (pState->state == TASK_STATUS__CK) { - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); } else { stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name); } diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index c76536aedf..b3df5755ea 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -193,7 +193,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ taosThreadMutexLock(&pTask->lock); ETaskStatus status = streamTaskGetStatus(pTask)->state; if (status == TASK_STATUS__CK) { - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); } taosThreadMutexUnlock(&pTask->lock); } @@ -203,7 +203,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ taosThreadMutexLock(&pTask->lock); ETaskStatus status = streamTaskGetStatus(pTask)->state; if (status == TASK_STATUS__CK) { - streamTaskSetCheckpointFailedId(pTask); + streamTaskSetFailedCheckpointId(pTask); } taosThreadMutexUnlock(&pTask->lock);