diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 290266d94a..a2b9254db7 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -67,7 +67,7 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS _err: tqError("vgId:%d, vnode %s snapshot reader failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, - tstrerror(code)); + tstrerror(terrno)); *ppReader = NULL; return code; } @@ -145,14 +145,15 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS goto _err; } - tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, pTq->pStreamMeta->path); + tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, + pTq->pStreamMeta->path); pWriter->pWriterImpl = pSnapWriter; *ppWriter = pWriter; return code; _err: tqError("vgId:%d, vnode %s snapshot writer failed to open since %s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, - tstrerror(code)); + tstrerror(terrno)); taosMemoryFree(pWriter); *ppWriter = NULL; return -1; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 69c2ead7d2..bc5067d4d6 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -813,6 +813,11 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { SArray* pList = pTask->upstreamInfo.pList; ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE); SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo)); + if (pNotSendList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); + return; + } for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i); @@ -1057,6 +1062,7 @@ static int32_t uploadCheckpointToS3(const char* id, const char* path) { } if (s3PutObjectFromFile2(filename, object, 0) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); code = -1; stError("[s3] failed to upload checkpoint:%s", filename); } else { @@ -1152,6 +1158,7 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path) { int32_t deleteCheckpoint(const char* id) { if (id == NULL || strlen(id) == 0) { + terrno = TSDB_CODE_INVALID_PARA; stError("deleteCheckpoint parameters invalid"); return -1; } diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 868ff002bf..7ef4e8ec09 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -130,7 +130,7 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) { int32_t ret = 0; char* fullname = taosMemoryCalloc(1, strlen(path) + 32); - + sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name); ret = taosStatFile(fullname, sz, NULL, NULL); @@ -259,17 +259,33 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { } int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) { int32_t code = -1; + int32_t nBytes = 0; + int32_t cap = strlen(pSnap->dbPrefixPath) + 256; + + char* path = taosMemoryCalloc(1, cap); + if (path == NULL) { + return -1; + } + + nBytes = snprintf(path, cap, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, + "checkpoint", pSnap->chkpId); + if (nBytes <= 0 || nBytes >= cap) { + terrno = TSDB_CODE_OUT_OF_RANGE; + goto _ERROR; + } - char* path = taosMemoryCalloc(1, strlen(pSnap->dbPrefixPath) + 256); - // char idstr[64] = {0}; - sprintf(path, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", - pSnap->chkpId); if (!taosIsDir(path)) { + terrno = TSDB_CODE_INVALID_MSG; goto _ERROR; } pSnapFile->pSst = taosArrayInit(16, sizeof(void*)); pSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); + if (pSnapFile->pSst == NULL || pSnapFile->pFileList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _ERROR; + } + pSnapFile->path = path; pSnapFile->snapInfo = *pSnap; if ((code = snapFileReadMeta(pSnapFile)) != 0) { @@ -313,8 +329,15 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) { } int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) { // impl later + int32_t code = 0; + SArray* pSnapInfoSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); - int32_t code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet); + if (pSnapInfoSet == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + code = streamCreateTaskDbSnapInfo(pMeta, path, pSnapInfoSet); if (code != 0) { stError("failed to do task db snap info, reason:%s", tstrerror(terrno)); taosArrayDestroy(pSnapInfoSet); @@ -322,6 +345,11 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta } SArray* pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); + if (pDbSnapSet == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(pSnapInfoSet); + return -1; + } for (int32_t i = 0; i < taosArrayGetSize(pSnapInfoSet); i++) { SStreamTaskSnap* pSnap = taosArrayGet(pSnapInfoSet, i); @@ -369,7 +397,8 @@ int32_t streamSnapReaderOpen(void* pMeta, int64_t sver, int64_t chkpId, char* pa // impl later SStreamSnapReader* pReader = taosMemoryCalloc(1, sizeof(SStreamSnapReader)); if (pReader == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } if (streamSnapHandleInit(&pReader->handle, (char*)path, pMeta) < 0) { @@ -501,11 +530,27 @@ int32_t streamSnapWriterOpen(void* pMeta, int64_t sver, int64_t ever, char* path SStreamSnapHandle* pHandle = &pWriter->handle; pHandle->currIdx = 0; + pHandle->metaPath = taosStrdup(path); + if (pHandle->metaPath == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pWriter); + } + pHandle->pDbSnapSet = taosArrayInit(8, sizeof(SBackendSnapFile2)); + if (pHandle->pDbSnapSet == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pHandle->metaPath); + taosMemoryFree(pWriter); + return -1; + } SBackendSnapFile2 snapFile = {0}; - taosArrayPush(pHandle->pDbSnapSet, &snapFile); + if (taosArrayPush(pHandle->pDbSnapSet, &snapFile) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + streamSnapWriterClose(pWriter, 0); + return -1; + } *ppWriter = pWriter; return 0;