diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 9948847ecb..69c2ead7d2 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -56,6 +56,13 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint pBlock->info.childId = pTask->info.selfChildId; pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock; + if (pChkpoint->blocks == NULL) { + taosMemoryFree(pBlock); + taosFreeQitem(pChkpoint); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + taosArrayPush(pChkpoint->blocks, pBlock); taosMemoryFree(pBlock); @@ -110,7 +117,12 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId SRpcHandleInfo* pRpcInfo, int32_t code) { int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); - void* pBuf = rpcMallocCont(size); + void* pBuf = rpcMallocCont(size); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId); @@ -131,6 +143,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = size, .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); + return 0; } @@ -1006,52 +1019,78 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { } static int32_t uploadCheckpointToS3(const char* id, const char* path) { + int32_t code = 0; + int32_t nBytes = 0; + + if (s3Init() != 0) { + return -1; + } + TdDirPtr pDir = taosOpenDir(path); if (pDir == NULL) return -1; TdDirEntryPtr de = NULL; - s3Init(); while ((de = taosReadDir(pDir)) != NULL) { char* name = taosGetDirEntryName(de); if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue; char filename[PATH_MAX] = {0}; if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) { - snprintf(filename, sizeof(filename), "%s%s", path, name); + nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name); + if (nBytes <= 0 || nBytes >= sizeof(filename)) { + code = -1; + break; + } } else { - snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name); + nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name); + if (nBytes <= 0 || nBytes >= sizeof(filename)) { + code = -1; + break; + } } char object[PATH_MAX] = {0}; - snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name); + nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name); + if (nBytes <= 0 || nBytes >= sizeof(object)) { + code = -1; + break; + } if (s3PutObjectFromFile2(filename, object, 0) != 0) { - taosCloseDir(&pDir); - return -1; + code = -1; + stError("[s3] failed to upload checkpoint:%s", filename); + } else { + stDebug("[s3] upload checkpoint:%s", filename); } - stDebug("[s3] upload checkpoint:%s", filename); - // break; } - taosCloseDir(&pDir); - return 0; + return code; } int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) { - int32_t code = 0; - char* buf = taosMemoryCalloc(1, strlen(id) + strlen(dstName) + 4); + int32_t nBytes; + int32_t cap = strlen(id) + strlen(dstName) + 16; + + char* buf = taosMemoryCalloc(1, cap); if (buf == NULL) { - code = terrno = TSDB_CODE_OUT_OF_MEMORY; - return code; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + nBytes = snprintf(buf, cap, "%s/%s", id, fname); + if (nBytes <= 0 || nBytes >= cap) { + taosMemoryFree(buf); + terrno = TSDB_CODE_OUT_OF_RANGE; + return -1; } - sprintf(buf, "%s/%s", id, fname); if (s3GetObjectToFile(buf, dstName) != 0) { - code = errno; + taosMemoryFree(buf); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; } - taosMemoryFree(buf); - return code; + return 0; } ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() { @@ -1082,6 +1121,7 @@ int32_t streamTaskUploadCheckpoint(const char* id, const char* path) { // fileName: CURRENT int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) { if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) { + terrno = TSDB_CODE_INVALID_PARA; stError("down load checkpoint data parameters invalid"); return -1; } @@ -1125,9 +1165,13 @@ int32_t deleteCheckpoint(const char* id) { int32_t deleteCheckpointFile(const char* id, const char* name) { char object[128] = {0}; - snprintf(object, sizeof(object), "%s/%s", id, name); + + int32_t nBytes = snprintf(object, sizeof(object), "%s/%s", id, name); + if (nBytes <= 0 || nBytes >= sizeof(object)) { + terrno = TSDB_CODE_OUT_OF_RANGE; + return -1; + } char* tmp = object; - s3DeleteObjects((const char**)&tmp, 1); - return 0; + return s3DeleteObjects((const char**)&tmp, 1); }