diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 1cc48f02b6..b08280f9ed 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -99,6 +99,8 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda } int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) { + int32_t code = 0; + if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1; @@ -111,13 +113,17 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* if (tDecodeI32(pDecoder, &info.nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &info.prevEp) < 0) return -1; if (tDecodeSEpSet(pDecoder, &info.newEp) < 0) return -1; - taosArrayPush(pMsg->pNodeList, &info); + + void* p = taosArrayPush(pMsg->pNodeList, &info); + if (p == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1; tEndDecode(pDecoder); - return 0; + return code; } int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) { @@ -257,8 +263,18 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tDecodeI32(pDecoder, &len1) < 0) return -1; if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1; ASSERT(len1 == len2); - taosArrayPush(pReq->dataLen, &len1); - taosArrayPush(pReq->data, &data); + + void* p = taosArrayPush(pReq->dataLen, &len1); + if (p == NULL) { + tEndDecode(pDecoder); + return TSDB_CODE_OUT_OF_MEMORY; + } + + p = taosArrayPush(pReq->data, &data); + if (p == NULL) { + tEndDecode(pDecoder); + return TSDB_CODE_OUT_OF_MEMORY; + } } tEndDecode(pDecoder); @@ -371,6 +387,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { } int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { + int32_t code = 0; + if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1; @@ -413,7 +431,11 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1; entry.id.taskId = taskId; - taosArrayPush(pReq->pTaskStatus, &entry); + void* p = taosArrayPush(pReq->pTaskStatus, &entry); + if (p == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } } int32_t numOfVgs = 0; @@ -424,12 +446,20 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { for (int j = 0; j < numOfVgs; ++j) { int32_t vgId = 0; if (tDecodeI32(pDecoder, &vgId) < 0) return -1; - taosArrayPush(pReq->pUpdateNodes, &vgId); + void* p = taosArrayPush(pReq->pUpdateNodes, &vgId); + if (p == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } } if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1; tEndDecode(pDecoder); return 0; + + _err: + tEndDecode(pDecoder); + return code; } void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { @@ -572,7 +602,11 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { taosMemoryFreeClear(pInfo); return -1; } - taosArrayPush(pTask->upstreamInfo.pList, &pInfo); + void* p = taosArrayPush(pTask->upstreamInfo.pList, &pInfo); + if (p == NULL) { + tEndDecode(pDecoder); + return -1; + } } if (pTask->info.taskLevel != TASK_LEVEL__SINK) { diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 38d48a2a32..104b1c27d8 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -43,7 +43,9 @@ SStreamState *stateCreate(const char *path) { pTask->ver = 1024; pTask->id.streamId = 1023; pTask->id.taskId = 1111111; - SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL); + SStreamMeta *pMeta = NULL; + + int32_t code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pMeta); pTask->pMeta = pMeta; SStreamState *p = streamStateOpen((char *)path, pTask, 0, 0);