diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 3027abc4be..ae74ed204e 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -169,11 +169,12 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask); void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); void clearBufferedDispatchMsg(SStreamTask* pTask); -int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); -SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); -SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, - SArray* pRes); -void destroyStreamDataBlock(SStreamDataBlock* pBlock); +int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); +int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg, + SStreamDataBlock** pBlock); +int32_t createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes, + SStreamDataBlock** pBlock); +void destroyStreamDataBlock(SStreamDataBlock* pBlock); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* idstr); int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock); @@ -214,8 +215,8 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); void streamMetaRemoveDB(void* arg, char* key); void streamMetaHbToMnode(void* param, void* tmrId); -SMetaHbInfo* createMetaHbInfo(int64_t* pRid); -void* destroyMetaHbInfo(SMetaHbInfo* pInfo); +int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes); +void destroyMetaHbInfo(SMetaHbInfo* pInfo); void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta); void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount); int32_t streamMetaSendHbHelper(SStreamMeta* pMeta); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d0ad38ea79..8de923e900 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -29,10 +29,9 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList); static void checkpointTriggerMonitorFn(void* param, void* tmrId); -int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, - int32_t transId, int32_t srcTaskId) { - SStreamDataBlock* pChkpoint; - +int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId, + int32_t srcTaskId, SStreamDataBlock** pRes) { + SStreamDataBlock* pChkpoint = NULL; int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pChkpoint); if (code) { return code; diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index bf499293b4..4235b6c1bb 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -15,13 +15,11 @@ #include "streamInt.h" -SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) { - SStreamDataBlock* pData; - +int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg, SStreamDataBlock** pRes) { + SStreamDataBlock* pData = NULL; int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen, (void**)&pData); if (code) { - terrno = code; - return NULL; + return terrno = code; } pData->type = blockType; @@ -32,7 +30,7 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum); if (pArray == NULL) { taosFreeQitem(pData); - return NULL; + return code; } ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen))); @@ -69,37 +67,36 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe } pData->blocks = pArray; - return pData; + *pRes = pData; + + return code; } -SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, - SArray* pRes) { - SStreamDataBlock* pStreamBlocks; - - int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize, (void**)&pStreamBlocks); +int32_t createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes, + SStreamDataBlock** pBlock) { + int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize, (void**)pBlock); if (code) { taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); - terrno = code; - return NULL; + return terrno = code; } - pStreamBlocks->srcTaskId = pTask->id.taskId; - pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK; - pStreamBlocks->blocks = pRes; + (*pBlock)->srcTaskId = pTask->id.taskId; + (*pBlock)->type = STREAM_INPUT__DATA_BLOCK; + (*pBlock)->blocks = pRes; if (pItem == NULL) { - return pStreamBlocks; + return code; } if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; - pStreamBlocks->sourceVer = pSubmit->ver; + (*pBlock)->sourceVer = pSubmit->ver; } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem; - pStreamBlocks->sourceVer = pMerged->ver; + (*pBlock)->sourceVer = pMerged->ver; } - return pStreamBlocks; + return code; } void destroyStreamDataBlock(SStreamDataBlock* pBlock) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 2023da43ad..9e4b6bc09d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1408,8 +1408,10 @@ static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchR static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { int8_t status = 0; - SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); - if (pBlock == NULL) { + SStreamDataBlock* pBlock = NULL; + + int32_t code = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId, &pBlock); + if (code) { streamTaskInputFail(pTask); status = TASK_INPUT_STATUS__FAILED; stError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, @@ -1419,7 +1421,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp pTask->status.appendTranstateBlock = true; } - int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pBlock); + code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pBlock); // input queue is full, upstream is blocked now status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b2c491de05..238db27d60 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -67,8 +67,10 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return TSDB_CODE_SUCCESS; } - SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes); - if (pStreamBlocks == NULL) { + SStreamDataBlock* pStreamBlocks = NULL; + + int32_t code = createStreamBlockFromResults(pItem, pTask, size, pRes, &pStreamBlocks); + if (code) { stError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno)); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return TSDB_CODE_OUT_OF_MEMORY; @@ -77,7 +79,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* stDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, SIZE_IN_MiB(size)); - int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks); + code = doOutputResultBlockImpl(pTask, pStreamBlocks); if (code != TSDB_CODE_SUCCESS) { // back pressure and record position return code; } @@ -187,7 +189,12 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* static int32_t handleSanhistoryResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) { int32_t code = TSDB_CODE_SUCCESS; if (taosArrayGetSize(pRes) > 0) { - SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); + SStreamDataBlock* pStreamBlocks = NULL; + code = createStreamBlockFromResults(NULL, pTask, size, pRes, &pStreamBlocks); + if (code) { + return code; + } + code = doOutputResultBlockImpl(pTask, pStreamBlocks); if (code != TSDB_CODE_SUCCESS) { // should not have error code stError("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code)); diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index 16cb23de10..d8ee15c909 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -275,11 +275,11 @@ void streamMetaHbToMnode(void* param, void* tmrId) { taosReleaseRef(streamMetaId, rid); } -SMetaHbInfo* createMetaHbInfo(int64_t* pRid) { +int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) { + *pRes = NULL; SMetaHbInfo* pInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo)); if (pInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return pInfo; + return TSDB_CODE_OUT_OF_MEMORY; } pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); @@ -287,10 +287,10 @@ SMetaHbInfo* createMetaHbInfo(int64_t* pRid) { pInfo->stopFlag = 0; pInfo->msgSendTs = -1; pInfo->hbCount = 0; - return pInfo; + return TSDB_CODE_SUCCESS; } -void* destroyMetaHbInfo(SMetaHbInfo* pInfo) { +void destroyMetaHbInfo(SMetaHbInfo* pInfo) { if (pInfo != NULL) { tCleanupStreamHbMsg(&pInfo->hbMsg); @@ -301,8 +301,6 @@ void* destroyMetaHbInfo(SMetaHbInfo* pInfo) { taosMemoryFree(pInfo); } - - return NULL; } void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 3940866488..d9c937a14f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -402,9 +402,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); metaRefMgtAdd(pMeta->vgId, pRid); - pMeta->pHbInfo = createMetaHbInfo(pRid); - if (pMeta->pHbInfo == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = createMetaHbInfo(pRid, &pMeta->pHbInfo); + if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -538,7 +537,8 @@ void streamMetaCloseImpl(void* arg) { taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosHashCleanup(pMeta->startInfo.pFailedTaskSet); - pMeta->pHbInfo = destroyMetaHbInfo(pMeta->pHbInfo); + destroyMetaHbInfo(pMeta->pHbInfo); + pMeta->pHbInfo = NULL; taosMemoryFree(pMeta->path); taosThreadMutexDestroy(&pMeta->backendMutex);