refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-04-28 09:50:21 +08:00
parent 8e47adba09
commit 8adaeb59a3
6 changed files with 22 additions and 14 deletions

View File

@ -45,7 +45,7 @@ int32_t s3GetObjectBlock(const char *object_name, int64_t offset, int64_t size,
int32_t s3GetObjectsByPrefix(const char *prefix, const char *path);
void s3EvictCache(const char *path, long object_size);
long s3Size(const char *object_name);
int32_t s3GetObjectToFile(const char *object_name, char *fileName);
int32_t s3GetObjectToFile(const char *object_name, const char *fileName);
#define S3_DATA_CHUNK_PAGES (256 * 1024 * 1024)

View File

@ -1196,7 +1196,7 @@ static S3Status getObjectCallback(int bufferSize, const char *buffer, void *call
return ((wrote < (size_t)bufferSize) ? S3StatusAbortedByCallback : S3StatusOK);
}
int32_t s3GetObjectToFile(const char *object_name, char *fileName) {
int32_t s3GetObjectToFile(const char *object_name, const char *fileName) {
int64_t ifModifiedSince = -1, ifNotModifiedSince = -1;
const char *ifMatch = 0, *ifNotMatch = 0;

View File

@ -122,7 +122,7 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq,
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
void streamTaskSetCheckpointFailedId(SStreamTask* pTask);
void streamTaskSetFailedCheckpointId(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
STaskId streamTaskGetTaskId(const SStreamTask* pTask);
@ -161,7 +161,7 @@ int32_t streamTaskBackupCheckpoint(char* id, char* path);
int32_t downloadCheckpoint(char* id, char* path);
int32_t deleteCheckpoint(char* id);
int32_t deleteCheckpointFile(char* id, char* name);
int32_t downloadCheckpointByName(char* id, char* fname, char* dstName);
//int32_t downloadCheckpointDataByName(const char* id, char* fname, char* dstName);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);

View File

@ -26,6 +26,8 @@ typedef struct {
SStreamTask* pTask;
} SAsyncUploadArg;
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
@ -376,21 +378,23 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
return code;
}
void streamTaskSetCheckpointFailedId(SStreamTask* pTask) {
void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr,
pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
}
int32_t getChkpMeta(char* id, char* path, SArray* list) {
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
char* file = taosMemoryCalloc(1, strlen(path) + 32);
sprintf(file, "%s%s%s", path, TD_DIRSEP, "META_TMP");
int32_t code = downloadCheckpointByName(id, "META", file);
int32_t code = downloadCheckpointDataByName(id, "META", file);
if (code != 0) {
stDebug("chkp failed to download meta file:%s", file);
taosMemoryFree(file);
return code;
}
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
char buf[128] = {0};
if (taosReadFile(pFile, buf, sizeof(buf)) <= 0) {
@ -427,7 +431,7 @@ int32_t uploadCheckpointData(void* param) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId);
}
if (arg->type == DATA_UPLOAD_S3) {
if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) {
if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId);
}
}
@ -546,7 +550,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&pTask->lock);
streamTaskSetCheckpointFailedId(pTask);
streamTaskSetFailedCheckpointId(pTask);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
}
@ -590,7 +594,7 @@ static int32_t uploadCheckpointToS3(char* id, char* path) {
return 0;
}
static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
int32_t code = 0;
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
sprintf(buf, "%s/%s", id, fname);
@ -625,16 +629,18 @@ int32_t streamTaskBackupCheckpoint(char* id, char* path) {
}
// fileName: CURRENT
int32_t downloadCheckpointByName(char* id, char* fname, char* dstName) {
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
stError("uploadCheckpointByName parameters invalid");
return -1;
}
if (strlen(tsSnodeAddress) != 0) {
return 0;
} else if (tsS3StreamEnabled) {
return downloadCheckpointByNameS3(id, fname, dstName);
}
return 0;
}
@ -643,11 +649,13 @@ int32_t downloadCheckpoint(char* id, char* path) {
stError("downloadCheckpoint parameters invalid");
return -1;
}
if (strlen(tsSnodeAddress) != 0) {
return downloadRsync(id, path);
} else if (tsS3StreamEnabled) {
return s3GetObjectsByPrefix(id, path);
}
return 0;
}

View File

@ -1399,7 +1399,7 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
streamTaskSetFailedCheckpointId(pTask);
} else {
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name);
}

View File

@ -193,7 +193,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
streamTaskSetFailedCheckpointId(pTask);
}
taosThreadMutexUnlock(&pTask->lock);
}
@ -203,7 +203,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
taosThreadMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask)->state;
if (status == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
streamTaskSetFailedCheckpointId(pTask);
}
taosThreadMutexUnlock(&pTask->lock);