From 90687bae1c48cc80d9217fa541bd5b4df9463910 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 19 May 2023 15:30:50 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/inc/streamInc.h | 2 +- source/libs/stream/src/stream.c | 35 ++++++++++--------------- source/libs/stream/src/streamData.c | 3 ++- source/libs/stream/src/streamDispatch.c | 2 +- source/libs/stream/src/streamExec.c | 1 + 5 files changed, 19 insertions(+), 24 deletions(-) diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index c471bc2bd8..7e1b4dcf2d 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -34,7 +34,7 @@ typedef struct { static SStreamGlobalEnv streamEnv; int32_t streamDispatch(SStreamTask* pTask); -int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); +int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 26e1c2ab43..1a5ef2e007 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -121,24 +121,22 @@ int32_t streamSchedExec(SStreamTask* pTask) { int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - int8_t status; - // enqueue data block - if (pData != NULL) { + int8_t status = 0; + if (pData == NULL) { + streamTaskInputFail(pTask); + status = TASK_INPUT_STATUS__FAILED; + qDebug("vgId:%d, s-task:%s failed to received dispatch msg, reason: out of memory", pTask->pMeta->vgId, pTask->id.idStr); + } else { pData->type = STREAM_INPUT__DATA_BLOCK; pData->srcVgId = pReq->dataSrcVgId; - // decode - /*pData->blocks = pReq->data;*/ - /*pBlock->sourceVer = pReq->sourceVer;*/ - streamDispatchReqToData(pReq, pData); + + streamConvertDispatchMsgToData(pReq, pData); if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { // input queue is full, upstream is blocked now status = TASK_INPUT_STATUS__BLOCKED; } - } else { - streamTaskInputFail(pTask); - status = TASK_INPUT_STATUS__FAILED; } // rsp by input status @@ -219,7 +217,7 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { } int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("s-task:%s receive dispatch msg from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, + qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId); // todo add the input queue buffer limitation @@ -294,16 +292,13 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); if (type == STREAM_INPUT__DATA_SUBMIT) { - int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); - SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - px->submit.msgLen, px->submit.ver, numOfBlocks, size); + px->submit.msgLen, px->submit.ver, total, size); if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, + pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); taosFreeQitem(pItem); @@ -312,22 +307,20 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, + pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); return -1; } - qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, numOfBlocks); + qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total); taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__GET_RES) { taosWriteQitem(pTask->inputQueue->queue, pItem); + qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index b70a8c93a9..2233e21f60 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -15,7 +15,7 @@ #include "streamInc.h" -int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { +int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { int32_t blockNum = pReq->blockNum; SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum); if (pArray == NULL) { @@ -39,6 +39,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.childId = pReq->upstreamChildId; } + pData->blocks = pArray; return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a757d39d3f..7e0fbd30b1 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -320,7 +320,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p msg.pCont = buf; msg.msgType = pTask->dispatchMsgType; - qDebug("dispatch from s-task:%s to taskId:%d vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); + qDebug("dispatch from s-task:%s to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); tmsgSendReq(pEpSet, &msg); code = 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d14e8102d8..ec2738558a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -272,6 +272,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (streamTaskShouldPause(&pTask->status)) { return 0; } + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {