Merge pull request #25550 from taosdata/fix/TD-29844

Fix/TD-29844
This commit is contained in:
Hongze Cheng 2024-04-29 13:48:30 +08:00 committed by GitHub
commit 0b4f5150b6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 48 additions and 14 deletions

View File

@ -258,7 +258,12 @@ void bkdMgtDestroy(SBkdMgt* bm);
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list); int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list);
uint32_t nextPow2(uint32_t x); void* taskAcquireDb(int64_t refId);
void taskReleaseDb(int64_t refId);
int64_t taskGetDBRef(void* arg);
uint32_t nextPow2(uint32_t x);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -1150,6 +1150,23 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) {
/* /*
0 0
*/ */
/*
 * Look up the object registered under `refId` in the taskDbWrapperId
 * reference set and hand it back to the caller.
 *
 * @param refId  reference id, e.g. a value obtained from taskGetDBRef().
 * @return       whatever taosAcquireRef() yields for (taskDbWrapperId, refId);
 *               callers should treat NULL as "could not acquire".
 *
 * NOTE(review): presumably every successful acquire has to be paired with a
 * taskReleaseDb(refId) call -- confirm against the ref-set implementation.
 */
void* taskAcquireDb(int64_t refId) {
  return taosAcquireRef(taskDbWrapperId, refId);
}
/*
 * Release the reference identified by `refId` in the taskDbWrapperId
 * reference set.
 *
 * @param refId  reference id previously handed to taskAcquireDb().
 *
 * The result of taosReleaseRef() is intentionally not propagated; this
 * function has no way to report it to the caller.
 */
void taskReleaseDb(int64_t refId) {
  (void)taosReleaseRef(taskDbWrapperId, refId);
}
/*
 * Read the reference id stored in a task DB wrapper.
 *
 * @param arg  pointer that is interpreted as an STaskDbWrapper; may be NULL.
 * @return     the wrapper's refId field, or -1 when `arg` is NULL.
 */
int64_t taskGetDBRef(void* arg) {
  STaskDbWrapper* pWrapper = arg;
  return (pWrapper == NULL) ? -1 : pWrapper->refId;
}
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) {
STaskDbWrapper* pTaskDb = arg; STaskDbWrapper* pTaskDb = arg;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
@ -2110,8 +2127,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
return code; return code;
} }
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
STaskDbWrapper* pDb = arg; STaskDbWrapper* pDb = arg;
ECHECKPOINT_BACKUP_TYPE utype = type; ECHECKPOINT_BACKUP_TYPE utype = type;
if (utype == DATA_UPLOAD_RSYNC) { if (utype == DATA_UPLOAD_RSYNC) {
return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); return taskDbGenChkpUploadData__rsync(pDb, chkpId, path);

View File

@ -20,10 +20,11 @@
typedef struct { typedef struct {
ECHECKPOINT_BACKUP_TYPE type; ECHECKPOINT_BACKUP_TYPE type;
char* taskId; char* taskId;
int64_t chkpId; int64_t chkpId;
SStreamTask* pTask; SStreamTask* pTask;
int64_t dbRefId;
} SAsyncUploadArg; } SAsyncUploadArg;
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
@ -323,7 +324,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
taosThreadMutexLock(&p->lock); taosThreadMutexLock(&p->lock);
SStreamTaskState* pStatus = streamTaskGetStatus(p); SStreamTaskState* pStatus = streamTaskGetStatus(p);
ETaskStatus prevStatus = pStatus->state; ETaskStatus prevStatus = pStatus->state;
if (pStatus->state == TASK_STATUS__CK) { if (pStatus->state == TASK_STATUS__CK) {
ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId && ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId &&
@ -426,26 +427,37 @@ int32_t uploadCheckpointData(void* param) {
char* path = NULL; char* path = NULL;
int32_t code = 0; int32_t code = 0;
SArray* toDelFiles = taosArrayInit(4, sizeof(void*)); SArray* toDelFiles = taosArrayInit(4, sizeof(void*));
char* taskStr = arg->taskId ? arg->taskId : "NULL";
void* pBackend = taskAcquireDb(arg->dbRefId);
if (pBackend == NULL) {
stError("s-task:%s failed to acquire db", taskStr);
taosMemoryFree(arg->taskId);
taosMemoryFree(arg);
return -1;
}
if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId,
(int8_t)(arg->type), &path, toDelFiles)) != 0) { (int8_t)(arg->type), &path, toDelFiles)) != 0) {
stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId);
} }
if (arg->type == DATA_UPLOAD_S3) { if (arg->type == DATA_UPLOAD_S3) {
if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) { if (code == 0 && (code = getCheckpointDataMeta(arg->taskId, path, toDelFiles)) != 0) {
stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", taskStr, arg->chkpId);
} }
} }
if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) { if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) {
stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId); stError("s-task:%s failed to upload checkpoint:%" PRId64, taskStr, arg->chkpId);
} }
taskReleaseDb(arg->dbRefId);
if (code == 0) { if (code == 0) {
for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) { for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) {
char* p = taosArrayGetP(toDelFiles, i); char* p = taosArrayGetP(toDelFiles, i);
code = deleteCheckpointFile(arg->taskId, p); code = deleteCheckpointFile(arg->taskId, p);
stDebug("s-task:%s try to del file: %s", arg->pTask->id.idStr, p); stDebug("s-task:%s try to del file: %s", taskStr, p);
if (code != 0) { if (code != 0) {
break; break;
} }
@ -453,12 +465,11 @@ int32_t uploadCheckpointData(void* param) {
} }
taosArrayDestroyP(toDelFiles, taosMemoryFree); taosArrayDestroyP(toDelFiles, taosMemoryFree);
taosRemoveDir(path); taosRemoveDir(path);
taosMemoryFree(path); taosMemoryFree(path);
taosMemoryFree(arg->taskId); taosMemoryFree(arg->taskId);
taosMemoryFree(arg); taosMemoryFree(arg);
return code; return code;
} }
@ -477,6 +488,7 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, cha
arg->taskId = taosStrdup(taskId); arg->taskId = taosStrdup(taskId);
arg->chkpId = chkpId; arg->chkpId = chkpId;
arg->pTask = pTask; arg->pTask = pTask;
arg->dbRefId = taskGetDBRef(pTask->pBackend);
return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL); return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL);
} }
@ -595,8 +607,8 @@ static int32_t uploadCheckpointToS3(char* id, char* path) {
} }
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) { static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
int32_t code = 0; int32_t code = 0;
char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4);
sprintf(buf, "%s/%s", id, fname); sprintf(buf, "%s/%s", id, fname);
if (s3GetObjectToFile(buf, dstName) != 0) { if (s3GetObjectToFile(buf, dstName) != 0) {
code = -1; code = -1;