From f6a097d96f9d0d6d357c12e1e5ea2c601a29752d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 12 Sep 2023 21:57:46 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 5 +- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- source/libs/stream/src/stream.c | 30 ---------- source/libs/stream/src/streamDispatch.c | 2 +- source/libs/stream/src/streamExec.c | 71 ++++++++++++++++-------- source/libs/stream/src/streamQueue.c | 64 ++++++++++++++++----- 6 files changed, 103 insertions(+), 71 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ac067823f8..3eb624f932 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -434,8 +434,9 @@ int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) int32_t tDecodeStreamTaskId(SDecoder* pDecoder, SStreamTaskId* pTaskId); int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); +int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask); -bool streamQueueIsFull(const STaosQueue* pQueue); +bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ); typedef struct { SMsgHead head; @@ -645,12 +646,10 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); -int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); -int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 14135b14dc..1ac2ddb9cb 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -374,7 +374,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - if (streamQueueIsFull(pTask->inputInfo.queue->pQueue)) { + if (streamQueueIsFull(pTask->inputInfo.queue->pQueue, true)) { tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 13ec509f63..d1bf6a91c5 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -212,36 +212,6 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } -int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { - int32_t code = 0; - int32_t type = pTask->outputInfo.type; - if (type == TASK_OUTPUT__TABLE) { - pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->blocks); - destroyStreamDataBlock(pBlock); - } else if (type == TASK_OUTPUT__SMA) { - pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); - destroyStreamDataBlock(pBlock); - } else { - ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); - STaosQueue* pQueue = pTask->outputInfo.queue->pQueue; - code = taosWriteQitem(pQueue, pBlock); - - int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue); - double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); - if (code != 0) { - qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost", - pTask->id.idStr, total, size, tstrerror(code)); - } else { - qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size); - } - - streamDispatchStreamBlock(pTask); - return code; - } - - return 0; -} - int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e283b43ac6..4d5234a68c 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -499,8 +499,8 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { const char* id = pTask->id.idStr; int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue); - double size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue)); if (numOfElems > 0) { + double size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue)); qDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9595d3bf02..f03a6a32d4 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -32,33 +32,58 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) { return (status == TASK_STATUS__PAUSE); } +static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBlock) { + int32_t code = 0; + int32_t type = pTask->outputInfo.type; + if (type == TASK_OUTPUT__TABLE) { + pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->blocks); + destroyStreamDataBlock(pBlock); + } else if (type == TASK_OUTPUT__SMA) { + pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); + destroyStreamDataBlock(pBlock); + } else { + ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); + code = streamTaskPutDataIntoOutputQ(pTask, pBlock); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + streamDispatchStreamBlock(pTask); + return code; + } + + return 0; +} + static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize, int32_t* totalBlocks) { int32_t numOfBlocks = taosArrayGetSize(pRes); - if (numOfBlocks > 0) { - SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes); - if (pStreamBlocks == NULL) { - qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno)); - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return -1; - } - - qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, - SIZE_IN_MB(size)); - - int32_t code = streamTaskOutputResultBlock(pTask, pStreamBlocks); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position - destroyStreamDataBlock(pStreamBlocks); - return -1; - } - - *totalSize += size; - *totalBlocks += numOfBlocks; - } else { + if (numOfBlocks == 0) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return TSDB_CODE_SUCCESS; } - return TSDB_CODE_SUCCESS; + SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes); + if (pStreamBlocks == NULL) { + qError("s-task:%s failed to create result stream data block, code:%s", pTask->id.idStr, tstrerror(terrno)); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return TSDB_CODE_OUT_OF_MEMORY; + } + + qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, + SIZE_IN_MB(size)); + + int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks); + if (code != TSDB_CODE_SUCCESS) { // back pressure and record position + //code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY + destroyStreamDataBlock(pStreamBlocks); + return code; + } + + *totalSize += size; + *totalBlocks += numOfBlocks; + + return code; } static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, @@ -236,7 +261,7 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->blocks = pRes; - code = streamTaskOutputResultBlock(pTask, qRes); + code = doOutputResultBlockImpl(pTask, qRes); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosFreeQitem(qRes); @@ -536,7 +561,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (type == STREAM_INPUT__DATA_BLOCK) { qDebug("s-task:%s sink task start to sink %d blocks", id, numOfBlocks); - streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); + doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); continue; } } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index a38095ea58..29ca351a6b 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -15,10 +15,11 @@ #include "streamInt.h" -#define MAX_STREAM_EXEC_BATCH_NUM 32 -#define MIN_STREAM_EXEC_BATCH_NUM 4 -#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480 -#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) +#define MAX_STREAM_EXEC_BATCH_NUM 32 +#define MIN_STREAM_EXEC_BATCH_NUM 4 +#define STREAM_TASK_QUEUE_CAPACITY 20480 +#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) +#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50) // todo refactor: // read data from input queue @@ -159,10 +160,15 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { } #endif -bool streamQueueIsFull(const STaosQueue* pQueue) { - bool isFull = taosQueueItemSize((STaosQueue*) pQueue) >= STREAM_TASK_INPUT_QUEUE_CAPACITY; - double size = SIZE_IN_MB(taosQueueMemorySize((STaosQueue*) pQueue)); - return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE); +bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ) { + bool isFull = taosQueueItemSize((STaosQueue*)pQueue) >= STREAM_TASK_QUEUE_CAPACITY; + if (isFull) { + return true; + } + + int32_t threahold = (inputQ) ? STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE : STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE; + double size = SIZE_IN_MB(taosQueueMemorySize((STaosQueue*)pQueue)); + return (size >= threahold); } int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) { @@ -275,15 +281,15 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) { int8_t type = pItem->type; STaosQueue* pQueue = pTask->inputInfo.queue->pQueue; - int32_t total = taosQueueItemSize(pQueue) + 1; + int32_t total = streamQueueGetNumOfItems(pTask->inputInfo.queue) + 1; if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; - if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) { + if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue, true)) { double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); qTrace( "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); + pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); taosFreeQitem(pItem); return -1; @@ -306,11 +312,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) msgLen, ver, total, size + SIZE_IN_MB(msgLen)); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - if (streamQueueIsFull(pQueue)) { + if (streamQueueIsFull(pQueue, true)) { double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); qTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); + pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); destroyStreamDataBlock((SStreamDataBlock*)pItem); return -1; } @@ -356,6 +362,38 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) return 0; } +// the result should be put into the outputQ in any cases, otherwise, the result may be lost +int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) { + STaosQueue* pQueue = pTask->outputInfo.queue->pQueue; + + while (streamQueueIsFull(pQueue, false)) { + if (streamTaskShouldStop(&pTask->status)) { + qInfo("s-task:%s discard result block due to task stop", pTask->id.idStr); + return TSDB_CODE_STREAM_EXEC_CANCELLED; + } + + int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue); + double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); + // let's wait for there are enough space to hold this result pBlock + qDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, + total, size); + taosMsleep(500); + } + + int32_t code = taosWriteQitem(pQueue, pBlock); + + int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue); + double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); + if (code != 0) { + qError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost", + pTask->id.idStr, total + 1, size, tstrerror(code)); + } else { + qInfo("s-task:%s data put into outputQ, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size); + } + + return TSDB_CODE_SUCCESS; +} + int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t cap, int32_t rate) { if (cap < 100 || rate < 50 || pBucket == NULL) { qError("failed to init sink task bucket, cap:%d, rate:%d", cap, rate);