avoid invalid read
This commit is contained in:
parent
2beb46c9df
commit
97aadd3aa7
|
@ -258,7 +258,12 @@ void bkdMgtDestroy(SBkdMgt* bm);
|
|||
|
||||
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list);
|
||||
|
||||
uint32_t nextPow2(uint32_t x);
|
||||
void* taskAcquireDb(int64_t refId);
|
||||
void taskReleaseDb(int64_t refId);
|
||||
|
||||
int64_t taskGetDBRef(void* arg);
|
||||
|
||||
uint32_t nextPow2(uint32_t x);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1150,6 +1150,23 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) {
|
|||
/*
|
||||
0
|
||||
*/
|
||||
|
||||
void* taskAcquireDb(int64_t refId) {
|
||||
// acquire
|
||||
void* p = taosAcquireRef(taskDbWrapperId, refId);
|
||||
return p;
|
||||
}
|
||||
void taskReleaseDb(int64_t refId) {
|
||||
// release
|
||||
taosReleaseRef(taskDbWrapperId, refId);
|
||||
}
|
||||
|
||||
int64_t taskGetDBRef(void* arg) {
|
||||
if (arg == NULL) return -1;
|
||||
STaskDbWrapper* pDb = arg;
|
||||
return pDb->refId;
|
||||
}
|
||||
|
||||
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) {
|
||||
STaskDbWrapper* pTaskDb = arg;
|
||||
int64_t st = taosGetTimestampMs();
|
||||
|
@ -2110,8 +2127,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
|
|||
return code;
|
||||
}
|
||||
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
|
||||
STaskDbWrapper* pDb = arg;
|
||||
ECHECKPOINT_BACKUP_TYPE utype = type;
|
||||
STaskDbWrapper* pDb = arg;
|
||||
ECHECKPOINT_BACKUP_TYPE utype = type;
|
||||
|
||||
if (utype == DATA_UPLOAD_RSYNC) {
|
||||
return taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
|
||||
|
|
|
@ -24,6 +24,7 @@ typedef struct {
|
|||
int64_t chkpId;
|
||||
|
||||
SStreamTask* pTask;
|
||||
int64_t dbRefId;
|
||||
} SAsyncUploadArg;
|
||||
|
||||
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
|
||||
|
@ -428,6 +429,14 @@ int32_t uploadCheckpointData(void* param) {
|
|||
SArray* toDelFiles = taosArrayInit(4, sizeof(void*));
|
||||
char* taskStr = arg->taskId ? arg->taskId : "NULL";
|
||||
|
||||
void* pBackend = taskAcquireDb(arg->dbRefId);
|
||||
if (pBackend == NULL) {
|
||||
stError("s-task:%s failed to acquire db", taskStr);
|
||||
taosMemoryFree(arg->taskId);
|
||||
taosMemoryFree(arg);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId,
|
||||
(int8_t)(arg->type), &path, toDelFiles)) != 0) {
|
||||
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId);
|
||||
|
@ -442,6 +451,8 @@ int32_t uploadCheckpointData(void* param) {
|
|||
stError("s-task:%s failed to upload checkpoint:%" PRId64, taskStr, arg->chkpId);
|
||||
}
|
||||
|
||||
taskReleaseDb(arg->dbRefId);
|
||||
|
||||
if (code == 0) {
|
||||
for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) {
|
||||
char* p = taosArrayGetP(toDelFiles, i);
|
||||
|
@ -454,11 +465,11 @@ int32_t uploadCheckpointData(void* param) {
|
|||
}
|
||||
|
||||
taosArrayDestroyP(toDelFiles, taosMemoryFree);
|
||||
|
||||
taosRemoveDir(path);
|
||||
taosMemoryFree(path);
|
||||
taosMemoryFree(arg->taskId);
|
||||
taosMemoryFree(arg);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -472,11 +483,17 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, cha
|
|||
return 0;
|
||||
}
|
||||
|
||||
// void* p = taskAcquireDb(pTask->pBackend);
|
||||
// if (p == NULL) {
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg));
|
||||
arg->type = type;
|
||||
arg->taskId = taosStrdup(taskId);
|
||||
arg->chkpId = chkpId;
|
||||
arg->pTask = pTask;
|
||||
arg->dbRefId = taskGetDBRef(pTask->pBackend);
|
||||
|
||||
return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue