From 4490f40c961de428d95d8441d92700e8faa27f3a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 14 Jun 2024 14:43:57 +0800 Subject: [PATCH] fix(stream): do some internal refactor. --- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamData.c | 8 ++++-- source/libs/stream/src/streamTask.c | 44 ++++++++++++++++------------- 3 files changed, 30 insertions(+), 24 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 8bdc0d2343..be3da64c6a 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -174,7 +174,7 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT SArray* pRes); void destroyStreamDataBlock(SStreamDataBlock* pBlock); -int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); +int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* idstr); int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock); int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index fa4efc3c6e..fae90f4db8 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -104,10 +104,12 @@ void destroyStreamDataBlock(SStreamDataBlock* pBlock) { taosFreeQitem(pBlock); } -int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) { +int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData, const char* id) { SArray* pArray = taosArrayInit(1, sizeof(SSDataBlock)); if (pArray == NULL) { - return -1; + terrno = TSDB_CODE_OUT_OF_MEMORY; + stError("failed to prepare retrieve block, %s", id); + return terrno; } taosArrayPush(pArray, &(SSDataBlock){0}); @@ -126,7 +128,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock pData->reqId = pReq->reqId; pData->blocks = pArray; - return 0; + return TSDB_CODE_SUCCESS; } SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index e96e44c19b..7d869ce538 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -944,32 +944,36 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { static int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) { SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock)); - int8_t status = TASK_INPUT_STATUS__NORMAL; - - // enqueue - if (pData != NULL) { - stDebug("s-task:%s (child %d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, - pTask->info.selfChildId, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId); - - pData->type = STREAM_INPUT__DATA_RETRIEVE; - pData->srcVgId = 0; - streamRetrieveReqToData(pReq, pData); - if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData) == 0) { - status = TASK_INPUT_STATUS__NORMAL; - } else { - status = TASK_INPUT_STATUS__FAILED; - } - } else { // todo handle oom - /*streamTaskInputFail(pTask);*/ - /*status = TASK_INPUT_STATUS__FAILED;*/ + if (pData == NULL) { + stError("s-task:%s failed to allocated retrieve-block", pTask->id.idStr); + return terrno; } - return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; + // enqueue + stDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr, + pTask->pMeta->vgId, pTask->info.taskLevel, pReq->srcTaskId, pReq->srcNodeId, pReq->reqId); + + pData->type = STREAM_INPUT__DATA_RETRIEVE; + pData->srcVgId = 0; + + int32_t code = streamRetrieveReqToData(pReq, pData, pTask->id.idStr); + if (code != TSDB_CODE_SUCCESS) { + taosFreeQitem(pData); + return code; + } + + code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pData); + if (code != TSDB_CODE_SUCCESS) { + stError("s-task:%s failed to put retrieve-block into inputQ, inputQ is full, discard the retrieve msg", + pTask->id.idStr); + } + + return code; } int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) { int32_t code = streamTaskEnqueueRetrieve(pTask, pReq); - if(code != 0){ + if (code != 0) { return code; } return streamTrySchedExec(pTask);