Merge pull request #29404 from taosdata/fix/tag
fix(stream): replace functions with safty ones.
This commit is contained in:
commit
94ea52ad9d
|
@ -1808,7 +1808,7 @@ int stateKeyDecode(void* k, char* buf) {
|
|||
return p - buf;
|
||||
}
|
||||
|
||||
int stateKeyToString(void* k, char* buf) {
|
||||
int32_t stateKeyToString(void* k, char* buf) {
|
||||
SStateKey* key = k;
|
||||
int n = 0;
|
||||
n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->key.groupId);
|
||||
|
|
|
@ -250,9 +250,10 @@ _EXIT:
|
|||
streamBackendCleanup((void*)pBackend);
|
||||
|
||||
if (code == 0) {
|
||||
char* state = taosMemoryCalloc(1, strlen(pMeta->path) + 32);
|
||||
int32_t len = strlen(pMeta->path) + 32;
|
||||
char* state = taosMemoryCalloc(1, len);
|
||||
if (state != NULL) {
|
||||
sprintf(state, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
|
||||
(void) snprintf(state, len, "%s%s%s", pMeta->path, TD_DIRSEP, "state");
|
||||
taosRemoveDir(state);
|
||||
taosMemoryFree(state);
|
||||
} else {
|
||||
|
@ -379,7 +380,7 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
|
|||
char* tpath = taosMemoryCalloc(1, len);
|
||||
TSDB_CHECK_NULL(tpath, code, lino, _err, terrno);
|
||||
|
||||
sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream");
|
||||
(void) snprintf(tpath, len, "%s%s%s", path, TD_DIRSEP, "stream");
|
||||
pMeta->path = tpath;
|
||||
|
||||
code = streamMetaOpenTdb(pMeta);
|
||||
|
|
|
@ -119,23 +119,21 @@ static int64_t kBlockSize = 64 * 1024;
|
|||
int32_t streamSnapHandleInit(SStreamSnapHandle* handle, char* path, void* pMeta);
|
||||
void streamSnapHandleDestroy(SStreamSnapHandle* handle);
|
||||
|
||||
// static void streamBuildFname(char* path, char* file, char* fullname)
|
||||
|
||||
#define STREAM_ROCKSDB_BUILD_FULLNAME(path, file, fullname) \
|
||||
do { \
|
||||
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, file); \
|
||||
} while (0)
|
||||
|
||||
int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
|
||||
int32_t ret = 0;
|
||||
int32_t len = strlen(path) + 32;
|
||||
|
||||
char* fullname = taosMemoryCalloc(1, strlen(path) + 32);
|
||||
char* fullname = taosMemoryCalloc(1, len);
|
||||
if (fullname == NULL) {
|
||||
stError("failed to get file:%s size, code: out of memory", name);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name);
|
||||
ret = snprintf(fullname, len, "%s%s%s", path, TD_DIRSEP, name);
|
||||
if (ret < 0 || ret >= len) {
|
||||
stError("%s failed to set the file path for get the file size, code: out of buffer", name);
|
||||
return TSDB_CODE_OUT_OF_BUFFER;
|
||||
}
|
||||
|
||||
ret = taosStatFile(fullname, sz, NULL, NULL);
|
||||
taosMemoryFree(fullname);
|
||||
|
@ -146,7 +144,7 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) {
|
|||
TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) {
|
||||
char fullname[256] = {0};
|
||||
|
||||
STREAM_ROCKSDB_BUILD_FULLNAME(path, name, fullname);
|
||||
(void) snprintf(fullname, tListLen(fullname),"%s%s%s", path, TD_DIRSEP, name);
|
||||
return taosOpenFile(fullname, opt);
|
||||
}
|
||||
|
||||
|
@ -155,35 +153,74 @@ int32_t streamCreateTaskDbSnapInfo(void* arg, char* path, SArray* pSnap) { retur
|
|||
int32_t streamDestroyTaskDbSnapInfo(void* arg, SArray* snap) { return taskDbDestroySnap(arg, snap); }
|
||||
|
||||
void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
|
||||
if (qDebugFlag & DEBUG_DEBUG) {
|
||||
int16_t cap = 512;
|
||||
|
||||
if (qDebugFlag & DEBUG_DEBUG) {
|
||||
char* buf = taosMemoryCalloc(1, cap);
|
||||
if (buf == NULL) {
|
||||
stError("%s failed to alloc memory, reason:%s", STREAM_STATE_TRANSFER, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t nBytes = snprintf(buf + strlen(buf), cap, "[");
|
||||
if (nBytes <= 0 || nBytes >= cap) {
|
||||
taosMemoryFree(buf);
|
||||
stError("%s failed to write buf, reason:%s", STREAM_STATE_TRANSFER, tstrerror(TSDB_CODE_OUT_OF_RANGE));
|
||||
return;
|
||||
int32_t len = 0;
|
||||
int32_t wlen = 1;
|
||||
|
||||
do {
|
||||
buf[0] = '[';
|
||||
if (pSnapFile->pCurrent) {
|
||||
len = snprintf(buf + wlen, cap - wlen, "current: %s,", pSnapFile->pCurrent);
|
||||
if (len > 0 && len < (cap - wlen)) {
|
||||
wlen += len;
|
||||
} else {
|
||||
stError("%s failed to build buf for debug, code: out of buffer", STREAM_STATE_TRANSFER);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pSnapFile->pMainfest) {
|
||||
len = snprintf(buf + wlen, cap - wlen, "MANIFEST: %s,", pSnapFile->pMainfest);
|
||||
if (len > 0 && len < (cap - wlen)) {
|
||||
wlen += len;
|
||||
} else {
|
||||
stError("%s failed to build buf for debug, code: out of buffer", STREAM_STATE_TRANSFER);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pSnapFile->pOptions) {
|
||||
len = snprintf(buf + wlen, cap - wlen, "options: %s,", pSnapFile->pOptions);
|
||||
if (len > 0 && len < (cap - wlen)) {
|
||||
wlen += len;
|
||||
} else {
|
||||
stError("%s failed to build buf for debug, code: out of buffer", STREAM_STATE_TRANSFER);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pSnapFile->pCurrent) sprintf(buf, "current: %s,", pSnapFile->pCurrent);
|
||||
if (pSnapFile->pMainfest) sprintf(buf + strlen(buf), "MANIFEST: %s,", pSnapFile->pMainfest);
|
||||
if (pSnapFile->pOptions) sprintf(buf + strlen(buf), "options: %s,", pSnapFile->pOptions);
|
||||
if (pSnapFile->pSst) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pSnapFile->pSst); i++) {
|
||||
char* name = taosArrayGetP(pSnapFile->pSst, i);
|
||||
if (strlen(buf) + strlen(name) < cap) sprintf(buf + strlen(buf), "%s,", name);
|
||||
if (strlen(buf) + strlen(name) < cap) {
|
||||
len = snprintf(buf + wlen, cap - wlen, "%s,", name);
|
||||
if (len > 0 && len < (cap - wlen)) {
|
||||
wlen += len;
|
||||
} else {
|
||||
stError("%s failed to build buf for debug, code: out of buffer", STREAM_STATE_TRANSFER);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if ((strlen(buf)) < cap) sprintf(buf + strlen(buf) - 1, "]");
|
||||
}
|
||||
}
|
||||
} while (0);
|
||||
|
||||
if (wlen < cap) {
|
||||
buf[wlen] = ']';
|
||||
}
|
||||
buf[cap - 1] = '\0';
|
||||
|
||||
stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId,
|
||||
pSnapFile->snapInfo.taskId, buf);
|
||||
|
||||
taosMemoryFree(buf);
|
||||
}
|
||||
}
|
||||
|
@ -771,16 +808,23 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
|
|||
SBackendSnapFile2* pDbSnapFile = taosArrayGet(pHandle->pDbSnapSet, pHandle->currIdx);
|
||||
if (pDbSnapFile->inited == 0) {
|
||||
char idstr[64] = {0};
|
||||
sprintf(idstr, "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId));
|
||||
(void)snprintf(idstr, tListLen(idstr), "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId));
|
||||
|
||||
char* path = taosMemoryCalloc(1, strlen(pHandle->metaPath) + 256);
|
||||
int32_t bufLen = strlen(pHandle->metaPath) + 256;
|
||||
char* path = taosMemoryCalloc(1, bufLen);
|
||||
if (path == NULL) {
|
||||
stError("s-task:0x%x failed to prepare meta header buffer, code:Out of memory", (int32_t) snapInfo.taskId);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", pHandle->metaPath, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP,
|
||||
"checkpoint", snapInfo.chkpId);
|
||||
int32_t ret = snprintf(path, bufLen, "%s%s%s%s%s%s%s%" PRId64 "", pHandle->metaPath, TD_DIRSEP, idstr, TD_DIRSEP,
|
||||
"checkpoints", TD_DIRSEP, "checkpoint", snapInfo.chkpId);
|
||||
if (ret < 0 || ret >= bufLen) {
|
||||
stError("s-task:0x%x failed to set the path for take snapshot, code: out of buffer, %s", (int32_t)snapInfo.taskId,
|
||||
pHandle->metaPath);
|
||||
return TSDB_CODE_OUT_OF_BUFFER;
|
||||
}
|
||||
|
||||
if (!taosIsDir(path)) {
|
||||
code = taosMulMkDir(path);
|
||||
stInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path);
|
||||
|
|
|
@ -133,7 +133,11 @@ int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool
|
|||
}
|
||||
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
|
||||
int32_t ret = snprintf(buf, tListLen(buf), "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId);
|
||||
if (ret < 0 || ret >= tListLen(buf)) {
|
||||
stError("s-task:0x%x failed to set the taskIdstr, code: out of buffer", pTask->id.taskId);
|
||||
return TSDB_CODE_OUT_OF_BUFFER;
|
||||
}
|
||||
|
||||
pTask->id.idStr = taosStrdup(buf);
|
||||
if (pTask->id.idStr == NULL) {
|
||||
|
@ -402,7 +406,7 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
char id[128] = {0};
|
||||
int32_t nBytes = sprintf(id, "0x%" PRIx64 "-0x%x", streamId, taskId);
|
||||
int32_t nBytes = snprintf(id, tListLen(id), "0x%" PRIx64 "-0x%x", streamId, taskId);
|
||||
if (nBytes < 0 || nBytes >= sizeof(id)) {
|
||||
return TSDB_CODE_OUT_OF_BUFFER;
|
||||
}
|
||||
|
@ -413,11 +417,15 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
(void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
|
||||
int32_t code = snprintf(pTask->backendPath, len + nBytes + 2, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
|
||||
if (code < 0 || code >= len + nBytes + 2) {
|
||||
stError("s-task:%s failed to set backend path:%s, code: out of buffer", pTask->id.idStr, pTask->backendPath);
|
||||
return TSDB_CODE_OUT_OF_BUFFER;
|
||||
} else {
|
||||
stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
|
||||
int32_t code = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId, &pTask->id.idStr);
|
||||
|
@ -1129,7 +1137,11 @@ SEpSet* streamTaskGetDownstreamEpInfo(SStreamTask* pTask, int32_t taskId) {
|
|||
|
||||
int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId) {
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
|
||||
int32_t code = snprintf(buf, tListLen(buf),"0x%" PRIx64 "-0x%x", streamId, taskId);
|
||||
if (code < 0 || code >= tListLen(buf)) {
|
||||
return TSDB_CODE_OUT_OF_BUFFER;
|
||||
}
|
||||
|
||||
*pId = taosStrdup(buf);
|
||||
|
||||
if (*pId == NULL) {
|
||||
|
|
Loading…
Reference in New Issue