refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-05-11 10:09:23 +08:00
parent 7f93ec2c53
commit 11ed1f54b3
2 changed files with 36 additions and 22 deletions

View File

@ -1209,6 +1209,7 @@ _EXIT:
taosMemoryFree(ppCf); taosMemoryFree(ppCf);
return code; return code;
} }
int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId) { return taskDbDoCheckpoint(arg, chkpId); } int32_t streamBackendDoCheckpoint(void* arg, int64_t chkpId) { return taskDbDoCheckpoint(arg, chkpId); }
SListNode* streamBackendAddCompare(void* backend, void* arg) { SListNode* streamBackendAddCompare(void* backend, void* arg) {

View File

@ -353,42 +353,51 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
} }
int32_t uploadCheckpointData(void* param) { int32_t uploadCheckpointData(void* param) {
SAsyncUploadArg* arg = param; SAsyncUploadArg* pParam = param;
char* path = NULL; char* path = NULL;
int32_t code = 0; int32_t code = 0;
SArray* toDelFiles = taosArrayInit(4, sizeof(void*)); SArray* toDelFiles = taosArrayInit(4, sizeof(void*));
char* taskStr = arg->taskId ? arg->taskId : "NULL"; char* taskStr = pParam->taskId ? pParam->taskId : "NULL";
void* pBackend = taskAcquireDb(arg->dbRefId); void* pBackend = taskAcquireDb(pParam->dbRefId);
if (pBackend == NULL) { if (pBackend == NULL) {
stError("s-task:%s failed to acquire db", taskStr); stError("s-task:%s failed to acquire db", taskStr);
taosMemoryFree(arg->taskId); taosMemoryFree(pParam->taskId);
taosMemoryFree(arg); taosMemoryFree(pParam);
return -1; return -1;
} }
if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, ((SStreamMeta*)arg->pMeta)->bkdChkptMgt, arg->chkpId, if ((code = taskDbGenChkpUploadData(pParam->pTask->pBackend, ((SStreamMeta*)pParam->pMeta)->bkdChkptMgt,
(int8_t)(arg->type), &path, toDelFiles)) != 0) { pParam->chkpId,
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId); (int8_t)(pParam->type), &path, toDelFiles)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64, taskStr, pParam->chkpId);
} }
if (arg->type == DATA_UPLOAD_S3) { if (pParam->type == DATA_UPLOAD_S3) {
if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) { if (code == 0 && (code = getCheckpointDataMeta(pParam->taskId, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpointId:%" PRId64 " meta", taskStr, arg->chkpId); stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 " meta", taskStr, pParam->chkpId);
} }
} }
if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) { if (code == TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to upload checkpointId:%" PRId64, taskStr, arg->chkpId); code = streamTaskBackupCheckpoint(pParam->taskId, path);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to upload checkpoint data:%s, checkpointId:%" PRId64, taskStr, path, pParam->chkpId);
} else {
stDebug("s-task:%s backup checkpointId:%"PRId64" to remote succ", taskStr, pParam->chkpId);
}
} }
taskReleaseDb(arg->dbRefId); taskReleaseDb(pParam->dbRefId);
if (code == 0) { if (code == 0) {
for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) { int32_t size = taosArrayGetSize(toDelFiles);
char* p = taosArrayGetP(toDelFiles, i); stDebug("s-task:%s remove redundant %d files", taskStr, size);
code = deleteCheckpointFile(arg->taskId, p);
stDebug("s-task:%s try to del file: %s", taskStr, p); for (int i = 0; i < size; i++) {
char* pName = taosArrayGetP(toDelFiles, i);
code = deleteCheckpointFile(pParam->taskId, pName);
stDebug("s-task:%s try to del file: %s", taskStr, pName);
if (code != 0) { if (code != 0) {
break; break;
} }
@ -396,15 +405,18 @@ int32_t uploadCheckpointData(void* param) {
} }
taosArrayDestroyP(toDelFiles, taosMemoryFree); taosArrayDestroyP(toDelFiles, taosMemoryFree);
stDebug("s-task:%s remove local checkpoint dir:%s", taskStr, path);
taosRemoveDir(path); taosRemoveDir(path);
taosMemoryFree(path); taosMemoryFree(path);
taosMemoryFree(arg->taskId);
taosMemoryFree(arg); taosMemoryFree(pParam->taskId);
taosMemoryFree(pParam);
return code; return code;
} }
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, char* taskId) { int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId, char* taskId) {
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType(); ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_DISABLE) { if (type == DATA_UPLOAD_DISABLE) {
return 0; return 0;
@ -417,7 +429,7 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, cha
SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg)); SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg));
arg->type = type; arg->type = type;
arg->taskId = taosStrdup(taskId); arg->taskId = taosStrdup(taskId);
arg->chkpId = chkpId; arg->chkpId = checkpointId;
arg->pTask = pTask; arg->pTask = pTask;
arg->dbRefId = taskGetDBRef(pTask->pBackend); arg->dbRefId = taskGetDBRef(pTask->pBackend);
arg->pMeta = pTask->pMeta; arg->pMeta = pTask->pMeta;
@ -613,6 +625,7 @@ int32_t deleteCheckpoint(const char* id) {
int32_t deleteCheckpointFile(const char* id, const char* name) { int32_t deleteCheckpointFile(const char* id, const char* name) {
char object[128] = {0}; char object[128] = {0};
snprintf(object, sizeof(object), "%s/%s", id, name); snprintf(object, sizeof(object), "%s/%s", id, name);
char* tmp = object; char* tmp = object;
s3DeleteObjects((const char**)&tmp, 1); s3DeleteObjects((const char**)&tmp, 1);
return 0; return 0;