fix invalid read

This commit is contained in:
Yihao Deng 2024-04-28 06:47:31 +00:00
parent 1737ee47a2
commit af567ea2cb
1 changed files with 18 additions and 10 deletions

View File

@ -20,10 +20,11 @@
typedef struct { typedef struct {
ECHECKPOINT_BACKUP_TYPE type; ECHECKPOINT_BACKUP_TYPE type;
char* taskId; char* taskId;
int64_t chkpId; int64_t chkpId;
SStreamTask* pTask; SStreamTask* pTask;
char* taskStr;
} SAsyncUploadArg; } SAsyncUploadArg;
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
@ -320,7 +321,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
taosThreadMutexLock(&p->lock); taosThreadMutexLock(&p->lock);
SStreamTaskState* pStatus = streamTaskGetStatus(p); SStreamTaskState* pStatus = streamTaskGetStatus(p);
ETaskStatus prevStatus = pStatus->state; ETaskStatus prevStatus = pStatus->state;
if (pStatus->state == TASK_STATUS__CK) { if (pStatus->state == TASK_STATUS__CK) {
ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId && ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId &&
@ -421,26 +422,27 @@ int32_t uploadCheckpointData(void* param) {
char* path = NULL; char* path = NULL;
int32_t code = 0; int32_t code = 0;
SArray* toDelFiles = taosArrayInit(4, sizeof(void*)); SArray* toDelFiles = taosArrayInit(4, sizeof(void*));
char* taskStr = arg->taskStr != NULL ? arg->taskStr : "NULL";
if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId,
(int8_t)(arg->type), &path, toDelFiles)) != 0) { (int8_t)(arg->type), &path, toDelFiles)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId);
} }
if (arg->type == DATA_UPLOAD_S3) { if (arg->type == DATA_UPLOAD_S3) {
if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) { if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", taskStr, arg->chkpId);
} }
} }
if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) { if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) {
stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId); stError("s-task:%s failed to upload checkpoint:%" PRId64, taskStr, arg->chkpId);
} }
if (code == 0) { if (code == 0) {
for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) { for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) {
char* p = taosArrayGetP(toDelFiles, i); char* p = taosArrayGetP(toDelFiles, i);
code = deleteCheckpointFile(arg->taskId, p); code = deleteCheckpointFile(arg->taskId, p);
stDebug("s-task:%s try to del file: %s", arg->pTask->id.idStr, p); stDebug("s-task:%s try to del file: %s", arg->taskStr != NULL ? arg->taskStr : "NULL", p);
if (code != 0) { if (code != 0) {
break; break;
} }
@ -451,8 +453,8 @@ int32_t uploadCheckpointData(void* param) {
taosRemoveDir(path); taosRemoveDir(path);
taosMemoryFree(path); taosMemoryFree(path);
taosMemoryFree(arg->taskId); taosMemoryFree(arg->taskId);
taosMemoryFree(arg->taskStr);
taosMemoryFree(arg); taosMemoryFree(arg);
return code; return code;
} }
@ -473,6 +475,12 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
arg->taskId = taosStrdup(taskId); arg->taskId = taosStrdup(taskId);
arg->chkpId = chkpId; arg->chkpId = chkpId;
arg->pTask = pTask; arg->pTask = pTask;
if (arg->pTask->id.idStr != NULL) {
arg->taskStr = taosStrdup(arg->pTask->id.idStr);
return 0;
} else {
arg->taskStr = NULL;
}
return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL); return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL);
} }
@ -591,8 +599,8 @@ static int32_t uploadCheckpointToS3(char* id, char* path) {
} }
static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) {
int32_t code = 0; int32_t code = 0;
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
sprintf(buf, "%s/%s", id, fname); sprintf(buf, "%s/%s", id, fname);
if (s3GetObjectToFile(buf, dstName) != 0) { if (s3GetObjectToFile(buf, dstName) != 0) {
code = -1; code = -1;