From af567ea2cb2b7107b7ea46ad09696f2bca42f83f Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Sun, 28 Apr 2024 06:47:31 +0000 Subject: [PATCH 1/5] fix invalid read --- source/libs/stream/src/streamCheckpoint.c | 28 +++++++++++++++-------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e6d7c2fde8..c0082507b4 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -20,10 +20,11 @@ typedef struct { ECHECKPOINT_BACKUP_TYPE type; - char* taskId; - int64_t chkpId; + char* taskId; + int64_t chkpId; SStreamTask* pTask; + char* taskStr; } SAsyncUploadArg; int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { @@ -320,7 +321,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { taosThreadMutexLock(&p->lock); SStreamTaskState* pStatus = streamTaskGetStatus(p); - ETaskStatus prevStatus = pStatus->state; + ETaskStatus prevStatus = pStatus->state; if (pStatus->state == TASK_STATUS__CK) { ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId && @@ -421,26 +422,27 @@ int32_t uploadCheckpointData(void* param) { char* path = NULL; int32_t code = 0; SArray* toDelFiles = taosArrayInit(4, sizeof(void*)); + char* taskStr = arg->taskStr != NULL ? arg->taskStr : "NULL"; if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, (int8_t)(arg->type), &path, toDelFiles)) != 0) { - stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", arg->pTask->id.idStr, arg->chkpId); + stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId); } if (arg->type == DATA_UPLOAD_S3) { if (code == 0 && (code = getChkpMeta(arg->taskId, path, toDelFiles)) != 0) { - stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", arg->pTask->id.idStr, arg->chkpId); + stError("s-task:%s failed to get checkpoint:%" PRId64 " meta", taskStr, arg->chkpId); } } if (code == 0 && (code = streamTaskBackupCheckpoint(arg->taskId, path)) != 0) { - stError("s-task:%s failed to upload checkpoint:%" PRId64, arg->pTask->id.idStr, arg->chkpId); + stError("s-task:%s failed to upload checkpoint:%" PRId64, taskStr, arg->chkpId); } if (code == 0) { for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) { char* p = taosArrayGetP(toDelFiles, i); code = deleteCheckpointFile(arg->taskId, p); - stDebug("s-task:%s try to del file: %s", arg->pTask->id.idStr, p); + stDebug("s-task:%s try to del file: %s", arg->taskStr != NULL ? arg->taskStr : "NULL", p); if (code != 0) { break; } @@ -451,8 +453,8 @@ int32_t uploadCheckpointData(void* param) { taosRemoveDir(path); taosMemoryFree(path); - taosMemoryFree(arg->taskId); + taosMemoryFree(arg->taskStr); taosMemoryFree(arg); return code; } @@ -473,6 +475,12 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { arg->taskId = taosStrdup(taskId); arg->chkpId = chkpId; arg->pTask = pTask; + if (arg->pTask->id.idStr != NULL) { + arg->taskStr = taosStrdup(arg->pTask->id.idStr); + return 0; + } else { + arg->taskStr = NULL; + } return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL); } @@ -591,8 +599,8 @@ static int32_t uploadCheckpointToS3(char* id, char* path) { } static int32_t downloadCheckpointByNameS3(char* id, char* fname, char* dstName) { - int32_t code = 0; - char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); + int32_t code = 0; + char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); sprintf(buf, "%s/%s", id, fname); if (s3GetObjectToFile(buf, dstName) != 0) { code = -1; From fac853058031251342c2c1cb391a966e7bf3da6f Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Sun, 28 Apr 2024 06:50:33 +0000 Subject: [PATCH 2/5] fix invalid read --- source/libs/stream/src/streamCheckpoint.c | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index c0082507b4..b52c512c37 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -24,7 +24,6 @@ typedef struct { int64_t chkpId; SStreamTask* pTask; - char* taskStr; } SAsyncUploadArg; int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { @@ -422,7 +421,7 @@ int32_t uploadCheckpointData(void* param) { char* path = NULL; int32_t code = 0; SArray* toDelFiles = taosArrayInit(4, sizeof(void*)); - char* taskStr = arg->taskStr != NULL ? arg->taskStr : "NULL"; + char* taskStr = arg->taskId ? arg->taskId : "NULL"; if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, (int8_t)(arg->type), &path, toDelFiles)) != 0) { @@ -442,7 +441,7 @@ int32_t uploadCheckpointData(void* param) { for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) { char* p = taosArrayGetP(toDelFiles, i); code = deleteCheckpointFile(arg->taskId, p); - stDebug("s-task:%s try to del file: %s", arg->taskStr != NULL ? arg->taskStr : "NULL", p); + stDebug("s-task:%s try to del file: %s", taskStr, p); if (code != 0) { break; } @@ -475,12 +474,6 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { arg->taskId = taosStrdup(taskId); arg->chkpId = chkpId; arg->pTask = pTask; - if (arg->pTask->id.idStr != NULL) { - arg->taskStr = taosStrdup(arg->pTask->id.idStr); - return 0; - } else { - arg->taskStr = NULL; - } return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL); } From 9e19c5292443f1ca0f474f5bf6c7b01c4a1284c3 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Sun, 28 Apr 2024 06:50:54 +0000 Subject: [PATCH 3/5] Remove unnecessary memory free in uploadCheckpointData function --- source/libs/stream/src/streamCheckpoint.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b52c512c37..ec96ad7108 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -453,7 +453,6 @@ int32_t uploadCheckpointData(void* param) { taosRemoveDir(path); taosMemoryFree(path); taosMemoryFree(arg->taskId); - taosMemoryFree(arg->taskStr); taosMemoryFree(arg); return code; } From 97aadd3aa717cbe6202cebb432aba3793f8ff7ee Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 29 Apr 2024 03:17:33 +0000 Subject: [PATCH 4/5] avoid invalid read --- source/libs/stream/inc/streamBackendRocksdb.h | 7 ++++++- source/libs/stream/src/streamBackendRocksdb.c | 21 +++++++++++++++++-- source/libs/stream/src/streamCheckpoint.c | 19 ++++++++++++++++- 3 files changed, 43 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 1dc1db8e9c..704bc9a2f2 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -258,7 +258,12 @@ void bkdMgtDestroy(SBkdMgt* bm); int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list); -uint32_t nextPow2(uint32_t x); +void* taskAcquireDb(int64_t refId); +void taskReleaseDb(int64_t refId); + +int64_t taskGetDBRef(void* arg); + +uint32_t nextPow2(uint32_t x); #ifdef __cplusplus } #endif diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 662d02a48f..1a80a26d35 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1150,6 +1150,23 @@ int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId) { /* 0 */ + +void* taskAcquireDb(int64_t refId) { + // acquire + void* p = taosAcquireRef(taskDbWrapperId, refId); + return p; +} +void taskReleaseDb(int64_t refId) { + // release + taosReleaseRef(taskDbWrapperId, refId); +} + +int64_t taskGetDBRef(void* arg) { + if (arg == NULL) return -1; + STaskDbWrapper* pDb = arg; + return pDb->refId; +} + int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) { STaskDbWrapper* pTaskDb = arg; int64_t st = taosGetTimestampMs(); @@ -2110,8 +2127,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 return code; } int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { - STaskDbWrapper* pDb = arg; - ECHECKPOINT_BACKUP_TYPE utype = type; + STaskDbWrapper* pDb = arg; + ECHECKPOINT_BACKUP_TYPE utype = type; if (utype == DATA_UPLOAD_RSYNC) { return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 824a33dd13..b9a73b38a3 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -24,6 +24,7 @@ typedef struct { int64_t chkpId; SStreamTask* pTask; + int64_t dbRefId; } SAsyncUploadArg; static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); @@ -428,6 +429,14 @@ int32_t uploadCheckpointData(void* param) { SArray* toDelFiles = taosArrayInit(4, sizeof(void*)); char* taskStr = arg->taskId ? arg->taskId : "NULL"; + void* pBackend = taskAcquireDb(arg->dbRefId); + if (pBackend == NULL) { + stError("s-task:%s failed to acquire db", taskStr); + taosMemoryFree(arg->taskId); + taosMemoryFree(arg); + return -1; + } + if ((code = taskDbGenChkpUploadData(arg->pTask->pBackend, arg->pTask->pMeta->bkdChkptMgt, arg->chkpId, (int8_t)(arg->type), &path, toDelFiles)) != 0) { stError("s-task:%s failed to gen upload checkpoint:%" PRId64 "", taskStr, arg->chkpId); @@ -442,6 +451,8 @@ int32_t uploadCheckpointData(void* param) { stError("s-task:%s failed to upload checkpoint:%" PRId64, taskStr, arg->chkpId); } + taskReleaseDb(arg->dbRefId); + if (code == 0) { for (int i = 0; i < taosArrayGetSize(toDelFiles); i++) { char* p = taosArrayGetP(toDelFiles, i); @@ -454,11 +465,11 @@ int32_t uploadCheckpointData(void* param) { } taosArrayDestroyP(toDelFiles, taosMemoryFree); - taosRemoveDir(path); taosMemoryFree(path); taosMemoryFree(arg->taskId); taosMemoryFree(arg); + return code; } @@ -472,11 +483,17 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, cha return 0; } + // void* p = taskAcquireDb(pTask->pBackend); + // if (p == NULL) { + // return 0; + // } + SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg)); arg->type = type; arg->taskId = taosStrdup(taskId); arg->chkpId = chkpId; arg->pTask = pTask; + arg->dbRefId = taskGetDBRef(pTask->pBackend); return streamMetaAsyncExec(pTask->pMeta, uploadCheckpointData, arg, NULL); } From b54198ba4af8c1ba68b1c46eda5d63ff682190ad Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Mon, 29 Apr 2024 03:18:38 +0000 Subject: [PATCH 5/5] avoid invalid read --- source/libs/stream/src/streamCheckpoint.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b9a73b38a3..e9dfbf7693 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -483,11 +483,6 @@ int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t chkpId, cha return 0; } - // void* p = taskAcquireDb(pTask->pBackend); - // if (p == NULL) { - // return 0; - // } - SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg)); arg->type = type; arg->taskId = taosStrdup(taskId);