refactor: check return value for each function.

This commit is contained in:
Haojun Liao 2024-07-17 17:57:08 +08:00
parent 0100bb51b9
commit 2b00d6549d
7 changed files with 54 additions and 50 deletions

View File

@ -169,11 +169,12 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask);
void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
void clearBufferedDispatchMsg(SStreamTask* pTask);
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize,
SArray* pRes);
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg,
SStreamDataBlock** pBlock);
int32_t createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes,
SStreamDataBlock** pBlock);
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* idstr);
int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock);
@ -214,8 +215,8 @@ int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
void streamMetaRemoveDB(void* arg, char* key);
void streamMetaHbToMnode(void* param, void* tmrId);
SMetaHbInfo* createMetaHbInfo(int64_t* pRid);
void* destroyMetaHbInfo(SMetaHbInfo* pInfo);
int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes);
void destroyMetaHbInfo(SMetaHbInfo* pInfo);
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta);
void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount);
int32_t streamMetaSendHbHelper(SStreamMeta* pMeta);

View File

@ -29,10 +29,9 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
static void checkpointTriggerMonitorFn(void* param, void* tmrId);
int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId, int32_t srcTaskId) {
SStreamDataBlock* pChkpoint;
int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
int32_t srcTaskId, SStreamDataBlock** pRes) {
SStreamDataBlock* pChkpoint = NULL;
int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pChkpoint);
if (code) {
return code;

View File

@ -15,13 +15,11 @@
#include "streamInt.h"
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
SStreamDataBlock* pData;
int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg, SStreamDataBlock** pRes) {
SStreamDataBlock* pData = NULL;
int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen, (void**)&pData);
if (code) {
terrno = code;
return NULL;
return terrno = code;
}
pData->type = blockType;
@ -32,7 +30,7 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe
SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
if (pArray == NULL) {
taosFreeQitem(pData);
return NULL;
return code;
}
ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
@ -69,37 +67,36 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe
}
pData->blocks = pArray;
return pData;
*pRes = pData;
return code;
}
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize,
SArray* pRes) {
SStreamDataBlock* pStreamBlocks;
int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize, (void**)&pStreamBlocks);
int32_t createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes,
SStreamDataBlock** pBlock) {
int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize, (void**)pBlock);
if (code) {
taosArrayClearEx(pRes, (FDelete)blockDataFreeRes);
terrno = code;
return NULL;
return terrno = code;
}
pStreamBlocks->srcTaskId = pTask->id.taskId;
pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlocks->blocks = pRes;
(*pBlock)->srcTaskId = pTask->id.taskId;
(*pBlock)->type = STREAM_INPUT__DATA_BLOCK;
(*pBlock)->blocks = pRes;
if (pItem == NULL) {
return pStreamBlocks;
return code;
}
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
pStreamBlocks->sourceVer = pSubmit->ver;
(*pBlock)->sourceVer = pSubmit->ver;
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem;
pStreamBlocks->sourceVer = pMerged->ver;
(*pBlock)->sourceVer = pMerged->ver;
}
return pStreamBlocks;
return code;
}
void destroyStreamDataBlock(SStreamDataBlock* pBlock) {

View File

@ -1408,8 +1408,10 @@ static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchR
static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
int8_t status = 0;
SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
if (pBlock == NULL) {
SStreamDataBlock* pBlock = NULL;
int32_t code = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId, &pBlock);
if (code) {
streamTaskInputFail(pTask);
status = TASK_INPUT_STATUS__FAILED;
stError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
@ -1419,7 +1421,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp
pTask->status.appendTranstateBlock = true;
}
int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pBlock);
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pBlock);
// input queue is full, upstream is blocked now
status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
}

View File

@ -67,8 +67,10 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
return TSDB_CODE_SUCCESS;
}
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes);
if (pStreamBlocks == NULL) {
SStreamDataBlock* pStreamBlocks = NULL;
int32_t code = createStreamBlockFromResults(pItem, pTask, size, pRes, &pStreamBlocks);
if (code) {
stError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno));
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return TSDB_CODE_OUT_OF_MEMORY;
@ -77,7 +79,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
stDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
SIZE_IN_MiB(size));
int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks);
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) { // back pressure and record position
return code;
}
@ -187,7 +189,12 @@ int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t*
static int32_t handleSanhistoryResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
int32_t code = TSDB_CODE_SUCCESS;
if (taosArrayGetSize(pRes) > 0) {
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
SStreamDataBlock* pStreamBlocks = NULL;
code = createStreamBlockFromResults(NULL, pTask, size, pRes, &pStreamBlocks);
if (code) {
return code;
}
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) { // should not have error code
stError("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));

View File

@ -275,11 +275,11 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
taosReleaseRef(streamMetaId, rid);
}
SMetaHbInfo* createMetaHbInfo(int64_t* pRid) {
int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) {
*pRes = NULL;
SMetaHbInfo* pInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return pInfo;
return TSDB_CODE_OUT_OF_MEMORY;
}
pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
@ -287,10 +287,10 @@ SMetaHbInfo* createMetaHbInfo(int64_t* pRid) {
pInfo->stopFlag = 0;
pInfo->msgSendTs = -1;
pInfo->hbCount = 0;
return pInfo;
return TSDB_CODE_SUCCESS;
}
void* destroyMetaHbInfo(SMetaHbInfo* pInfo) {
void destroyMetaHbInfo(SMetaHbInfo* pInfo) {
if (pInfo != NULL) {
tCleanupStreamHbMsg(&pInfo->hbMsg);
@ -301,8 +301,6 @@ void* destroyMetaHbInfo(SMetaHbInfo* pInfo) {
taosMemoryFree(pInfo);
}
return NULL;
}
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) {

View File

@ -402,9 +402,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
metaRefMgtAdd(pMeta->vgId, pRid);
pMeta->pHbInfo = createMetaHbInfo(pRid);
if (pMeta->pHbInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
code = createMetaHbInfo(pRid, &pMeta->pHbInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
@ -538,7 +537,8 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
pMeta->pHbInfo = destroyMetaHbInfo(pMeta->pHbInfo);
destroyMetaHbInfo(pMeta->pHbInfo);
pMeta->pHbInfo = NULL;
taosMemoryFree(pMeta->path);
taosThreadMutexDestroy(&pMeta->backendMutex);