diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 37fb081a87..6031710bf9 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1808,7 +1808,7 @@ int stateKeyDecode(void* k, char* buf) { return p - buf; } -int stateKeyToString(void* k, char* buf) { +int32_t stateKeyToString(void* k, char* buf) { SStateKey* key = k; int n = 0; n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->key.groupId); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8e8e300ee6..3955343fdb 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -250,9 +250,10 @@ _EXIT: streamBackendCleanup((void*)pBackend); if (code == 0) { - char* state = taosMemoryCalloc(1, strlen(pMeta->path) + 32); + int32_t len = strlen(pMeta->path) + 32; + char* state = taosMemoryCalloc(1, len); if (state != NULL) { - sprintf(state, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); + (void) snprintf(state, len, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); taosRemoveDir(state); taosMemoryFree(state); } else { @@ -379,7 +380,7 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, char* tpath = taosMemoryCalloc(1, len); TSDB_CHECK_NULL(tpath, code, lino, _err, terrno); - sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream"); + (void) snprintf(tpath, len, "%s%s%s", path, TD_DIRSEP, "stream"); pMeta->path = tpath; code = streamMetaOpenTdb(pMeta); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index ae8a71d988..88495ac760 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -119,23 +119,21 @@ static int64_t kBlockSize = 64 * 1024; int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, void* pMeta); void streamSnapHandleDestroy(SStreamSnapHandle* handle); -// static void streamBuildFname(char* path, char* file, char* fullname) - -#define STREAM_ROCKSDB_BUILD_FULLNAME(path, file, fullname) \ - do { \ - sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \ - } while (0) - int32_t streamGetFileSize(char* path, char* name, int64_t* sz) { int32_t ret = 0; + int32_t len = strlen(path) + 32; - char* fullname = taosMemoryCalloc(1, strlen(path) + 32); + char* fullname = taosMemoryCalloc(1, len); if (fullname == NULL) { stError("failed to get file:%s size, code: out of memory", name); return terrno; } - sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name); + ret = snprintf(fullname, len, "%s%s%s", path, TD_DIRSEP, name); + if (ret < 0 || ret >= len) { + stError("%s failed to set the file path for get the file size, code: out of buffer", name); + return TSDB_CODE_OUT_OF_BUFFER; + } ret = taosStatFile(fullname, sz, NULL, NULL); taosMemoryFree(fullname); @@ -146,7 +144,7 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) { TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { char fullname[256] = {0}; - STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname); + (void) snprintf(fullname, tListLen(fullname),"%s%s%s", path, TD_DIRSEP, name); return taosOpenFile(fullname, opt); } @@ -155,35 +153,74 @@ int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { retur int32_t streamDestroyTaskDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); } void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { - if (qDebugFlag & DEBUG_DEBUG) { - int16_t cap = 512; + int16_t cap = 512; + if (qDebugFlag & DEBUG_DEBUG) { char* buf = taosMemoryCalloc(1, cap); if (buf == NULL) { stError("%s failed to alloc memory, reason:%s", STREAM_STATE_TRANSFER, tstrerror(TSDB_CODE_OUT_OF_MEMORY)); return; } - int32_t nBytes = snprintf(buf + strlen(buf), cap, "["); - if (nBytes <= 0 || nBytes >= cap) { - taosMemoryFree(buf); - stError("%s failed to write buf, reason:%s", STREAM_STATE_TRANSFER, tstrerror(TSDB_CODE_OUT_OF_RANGE)); - return; - } + int32_t len = 0; + int32_t wlen = 1; - if (pSnapFile->pCurrent) sprintf(buf, "current: %s,", pSnapFile->pCurrent); - if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest); - if (pSnapFile->pOptions) sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions); - if (pSnapFile->pSst) { - for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { - char* name = taosArrayGetP(pSnapFile->pSst, i); - if (strlen(buf) + strlen(name) < cap) sprintf(buf + strlen(buf), "%s,", name); + do { + buf[0] = '['; + if (pSnapFile->pCurrent) { + len = snprintf(buf + wlen, cap - wlen, "current: %s,", pSnapFile->pCurrent); + if (len > 0 && len < (cap - wlen)) { + wlen += len; + } else { + stError("%s failed to build buf for debug, code: out of buffer", STREAM_STATE_TRANSFER); + break; + } } + + if (pSnapFile->pMainfest) { + len = snprintf(buf + wlen, cap - wlen, "MANIFEST: %s,", pSnapFile->pMainfest); + if (len > 0 && len < (cap - wlen)) { + wlen += len; + } else { + stError("%s failed to build buf for debug, code: out of buffer", STREAM_STATE_TRANSFER); + break; + } + } + + if (pSnapFile->pOptions) { + len = snprintf(buf + wlen, cap - wlen, "options: %s,", pSnapFile->pOptions); + if (len > 0 && len < (cap - wlen)) { + wlen += len; + } else { + stError("%s failed to build buf for debug, code: out of buffer", STREAM_STATE_TRANSFER); + break; + } + } + + if (pSnapFile->pSst) { + for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) { + char* name = taosArrayGetP(pSnapFile->pSst, i); + if (strlen(buf) + strlen(name) < cap) { + len = snprintf(buf + wlen, cap - wlen, "%s,", name); + if (len > 0 && len < (cap - wlen)) { + wlen += len; + } else { + stError("%s failed to build buf for debug, code: out of buffer", STREAM_STATE_TRANSFER); + break; + } + } + } + } + } while (0); + + if (wlen < cap) { + buf[wlen] = ']'; } - if ((strlen(buf)) < cap) sprintf(buf + strlen(buf) - 1, "]"); + buf[cap - 1] = '\0'; stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId, pSnapFile->snapInfo.taskId, buf); + taosMemoryFree(buf); } } @@ -771,16 +808,23 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa SBackendSnapFile2* pDbSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx); if (pDbSnapFile->inited == 0) { char idstr[64] = {0}; - sprintf(idstr, "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId)); + (void)snprintf(idstr, tListLen(idstr), "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId)); - char* path = taosMemoryCalloc(1, strlen(pHandle->metaPath) + 256); + int32_t bufLen = strlen(pHandle->metaPath) + 256; + char* path = taosMemoryCalloc(1, bufLen); if (path == NULL) { stError("s-task:0x%x failed to prepare meta header buffer, code:Out of memory", (int32_t) snapInfo.taskId); return terrno; } - sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", pHandle->metaPath, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP, - "checkpoint", snapInfo.chkpId); + int32_t ret = snprintf(path, bufLen, "%s%s%s%s%s%s%s%" PRId64 "", pHandle->metaPath, TD_DIRSEP, idstr, TD_DIRSEP, + "checkpoints", TD_DIRSEP, "checkpoint", snapInfo.chkpId); + if (ret < 0 || ret >= bufLen) { + stError("s-task:0x%x failed to set the path for take snapshot, code: out of buffer, %s", (int32_t)snapInfo.taskId, + pHandle->metaPath); + return TSDB_CODE_OUT_OF_BUFFER; + } + if (!taosIsDir(path)) { code = taosMulMkDir(path); stInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f46228fd47..d27ed520c6 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -132,8 +132,12 @@ int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool return code; } - char buf[128] = {0}; - sprintf(buf, "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId); + char buf[128] = {0}; + int32_t ret = snprintf(buf, tListLen(buf), "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId); + if (ret < 0 || ret >= tListLen(buf)) { + stError("s-task:0x%x failed to set the taskIdstr, code: out of buffer", pTask->id.taskId); + return TSDB_CODE_OUT_OF_BUFFER; + } pTask->id.idStr = taosStrdup(buf); if (pTask->id.idStr == NULL) { @@ -402,7 +406,7 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) { } char id[128] = {0}; - int32_t nBytes = sprintf(id, "0x%" PRIx64 "-0x%x", streamId, taskId); + int32_t nBytes = snprintf(id, tListLen(id), "0x%" PRIx64 "-0x%x", streamId, taskId); if (nBytes < 0 || nBytes >= sizeof(id)) { return TSDB_CODE_OUT_OF_BUFFER; } @@ -413,10 +417,14 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) { return terrno; } - (void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id); - stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath); - - return 0; + int32_t code = snprintf(pTask->backendPath, len + nBytes + 2, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id); + if (code < 0 || code >= len + nBytes + 2) { + stError("s-task:%s failed to set backend path:%s, code: out of buffer", pTask->id.idStr, pTask->backendPath); + return TSDB_CODE_OUT_OF_BUFFER; + } else { + stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath); + return 0; + } } int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) { @@ -1129,7 +1137,11 @@ SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) { int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) { char buf[128] = {0}; - sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId); + int32_t code = snprintf(buf, tListLen(buf),"0x%" PRIx64 "-0x%x", streamId, taskId); + if (code < 0 || code >= tListLen(buf)) { + return TSDB_CODE_OUT_OF_BUFFER; + } + *pId = taosStrdup(buf); if (*pId == NULL) {