From 628bb62c8a775ce8d338571205748be12dfa5dcf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 4 Apr 2023 07:20:05 +0000 Subject: [PATCH] add backpressure --- .gitignore | 1 + include/libs/stream/tstream.h | 20 +++++++++++--------- include/util/taoserror.h | 4 ++++ include/util/tqueue.h | 6 +++++- source/dnode/snode/src/snode.c | 5 +++-- source/dnode/vnode/src/tq/tq.c | 4 ++-- source/libs/executor/src/dataDispatcher.c | 6 +++++- source/libs/stream/src/stream.c | 6 +++++- source/libs/stream/src/streamExec.c | 21 ++++++++++++++++----- source/libs/stream/src/streamQueue.c | 4 +++- source/util/src/terror.c | 5 +++++ source/util/src/tqueue.c | 23 +++++++++++++++++++++-- 12 files changed, 81 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index a6e222d2e9..1e6e178e74 100644 --- a/.gitignore +++ b/.gitignore @@ -131,3 +131,4 @@ tools/BUGS tools/taos-tools tools/taosws-rs tags +.clangd diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1d301623b1..d1e6de71da 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -204,7 +204,7 @@ typedef struct { int32_t streamInit(); void streamCleanUp(); -SStreamQueue* streamQueueOpen(); +SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* queue); static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) { @@ -374,7 +374,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask); static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { - int8_t type = pItem->type; + int32_t code = 0; + int8_t type = pItem->type; if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit2* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit2*)pItem); if (pSubmitClone == NULL) { @@ -385,19 +386,20 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem } qDebug("task %d %p submit enqueue %p %p %p %d %" PRId64, pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->submit.msgStr, pSubmitClone->submit.msgLen, pSubmitClone->submit.ver); - taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); + code = taosWriteQitem(pTask->inputQueue->queue, pSubmitClone); // qStreamInput(pTask->exec.executor, pSubmitClone); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - taosWriteQitem(pTask->inputQueue->queue, pItem); + code = taosWriteQitem(pTask->inputQueue->queue, pItem); // qStreamInput(pTask->exec.executor, pItem); } else if (type == STREAM_INPUT__CHECKPOINT) { - taosWriteQitem(pTask->inputQueue->queue, pItem); + code = taosWriteQitem(pTask->inputQueue->queue, pItem); // qStreamInput(pTask->exec.executor, pItem); } else if (type == STREAM_INPUT__GET_RES) { - taosWriteQitem(pTask->inputQueue->queue, pItem); + code = taosWriteQitem(pTask->inputQueue->queue, pItem); // qStreamInput(pTask->exec.executor, pItem); } + if (code != 0) return code; if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); @@ -637,9 +639,9 @@ typedef struct SStreamMeta { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); void streamMetaClose(SStreamMeta* streamMeta); -int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); -int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen); +int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); +int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); +int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen); // SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 75b71409a8..a068de2686 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -761,11 +761,15 @@ int32_t* taosGetErrno(); // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) +#define TSDB_CODE_STREAM_BACKPRESSURE_OUT_OF_QUEUE TAOS_DEF_ERROR_CODE(0, 0x4101) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) #define TSDB_CODE_TDLITE_IVLD_OPEN_DIR TAOS_DEF_ERROR_CODE(0, 0x5101) +// UTIL +#define TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x6000) + #ifdef __cplusplus } #endif diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 1f6b205cdf..576703e842 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -84,6 +84,8 @@ typedef struct STaosQueue { int64_t memOfItems; int32_t numOfItems; int64_t threadId; + int64_t memLimit; + int64_t itemLimit; } STaosQueue; typedef struct STaosQset { @@ -106,12 +108,14 @@ void taosCloseQueue(STaosQueue *queue); void taosSetQueueFp(STaosQueue *queue, FItem itemFp, FItems itemsFp); void *taosAllocateQitem(int32_t size, EQItype itype, int64_t dataSize); void taosFreeQitem(void *pItem); -void taosWriteQitem(STaosQueue *queue, void *pItem); +int32_t taosWriteQitem(STaosQueue *queue, void *pItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem); bool taosQueueEmpty(STaosQueue *queue); void taosUpdateItemSize(STaosQueue *queue, int32_t items); int32_t taosQueueItemSize(STaosQueue *queue); int64_t taosQueueMemorySize(STaosQueue *queue); +void taosSetQueueCapacity(STaosQueue *queue, int64_t size); +void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t mem); STaosQall *taosAllocateQall(); void taosFreeQall(STaosQall *qall); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index d4ca81a6a9..df71567d00 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -66,8 +66,9 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->refCnt = 1; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; - pTask->inputQueue = streamQueueOpen(); - pTask->outputQueue = streamQueueOpen(); + + pTask->inputQueue = streamQueueOpen(0); + pTask->outputQueue = streamQueueOpen(0); if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { return -1; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 70d957a4ac..b7dceffb15 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -934,8 +934,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->refCnt = 1; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; - pTask->inputQueue = streamQueueOpen(); - pTask->outputQueue = streamQueueOpen(); + pTask->inputQueue = streamQueueOpen(128 << 10); + pTask->outputQueue = streamQueueOpen(128 << 10); if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { return -1; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index d8efcf50ca..1b7506dd0d 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -125,6 +125,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { } static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { + int32_t code = 0; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf), DEF_QITEM, 0); if (NULL == pBuf) { @@ -137,7 +138,10 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, } toDataCacheEntry(pDispatcher, pInput, pBuf); - taosWriteQitem(pDispatcher->pDataBlocks, pBuf); + code = taosWriteQitem(pDispatcher->pDataBlocks, pBuf); + if (code != 0) { + return code; + } int32_t status = updateStatus(pDispatcher); *pContinue = (status == DS_BUF_LOW || status == DS_BUF_EMPTY); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 60729c4d0e..649e31be77 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -188,6 +188,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, } int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { + int32_t code = 0; if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); @@ -198,7 +199,10 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { taosFreeQitem(pBlock); } else { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); - taosWriteQitem(pTask->outputQueue->queue, pBlock); + code = taosWriteQitem(pTask->outputQueue->queue, pBlock); + if (code != 0) { + return code; + } streamDispatch(pTask); } return 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index cb9774b584..06819a1fd4 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -20,7 +20,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { int32_t code; void* exec = pTask->exec.executor; - while(atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { + while (atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { qError("stream task wait for the end of fill history"); taosMsleep(2); continue; @@ -105,8 +105,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* } int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { - ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); + int32_t code = 0; + ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); void* exec = pTask->exec.executor; qSetStreamOpOpen(exec); @@ -164,7 +165,11 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->blocks = pRes; - streamTaskOutput(pTask, qRes); + code = streamTaskOutput(pTask, qRes); + if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { + taosFreeQitem(pRes); + return code; + } if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { qDebug("task %d scan exec dispatch block num %d", pTask->taskId, batchCnt); @@ -214,6 +219,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { #endif int32_t streamExecForAll(SStreamTask* pTask) { + int32_t code = 0; while (1) { int32_t batchCnt = 1; void* input = NULL; @@ -256,7 +262,10 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pTask->taskLevel == TASK_LEVEL__SINK) { ASSERT(((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_BLOCK); - streamTaskOutput(pTask, input); + code = streamTaskOutput(pTask, input); + if (code != 0) { + // backpressure and record position + } continue; } @@ -286,7 +295,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { qRes->sourceVer = pMerged->ver; } - if (streamTaskOutput(pTask, qRes) < 0) { + code = streamTaskOutput(pTask, qRes); + if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { + // backpressure and record position taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); streamFreeQitem(input); taosFreeQitem(qRes); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 882fba718b..20abcca197 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -15,7 +15,7 @@ #include "streamInc.h" -SStreamQueue* streamQueueOpen() { +SStreamQueue* streamQueueOpen(int64_t cap) { SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); if (pQueue == NULL) return NULL; pQueue->queue = taosOpenQueue(); @@ -24,6 +24,8 @@ SStreamQueue* streamQueueOpen() { goto FAIL; } pQueue->status = STREAM_QUEUE__SUCESS; + taosSetQueueCapacity(pQueue->queue, cap); + taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024); return pQueue; FAIL: if (pQueue->queue) taosCloseQueue(pQueue->queue); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 1f49f8f8b5..6ea3349fe4 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -632,11 +632,16 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_BACKPRESSURE_OUT_OF_QUEUE,"Out of memory in stream queue") // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags") TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open directory") +// UTIL + +TAOS_DEFINE_ERROR(TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY, "Queue out of memory") + #ifdef TAOS_ERROR_C }; #endif diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 3769da6ccd..1b0b2f63f1 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -21,6 +21,9 @@ int64_t tsRpcQueueMemoryAllowed = 0; int64_t tsRpcQueueMemoryUsed = 0; +void taosSetQueueMemoryCapacity(STaosQueue *queue, int64_t cap) { queue->memLimit = cap; } +void taosSetQueueCapacity(STaosQueue *queue, int64_t size) { queue->itemLimit = size; } + STaosQueue *taosOpenQueue() { STaosQueue *queue = taosMemoryCalloc(1, sizeof(STaosQueue)); if (queue == NULL) { @@ -153,11 +156,26 @@ void taosFreeQitem(void *pItem) { taosMemoryFree(pNode); } -void taosWriteQitem(STaosQueue *queue, void *pItem) { +int32_t taosWriteQitem(STaosQueue *queue, void *pItem) { + int32_t code = 0; STaosQnode *pNode = (STaosQnode *)(((char *)pItem) - sizeof(STaosQnode)); pNode->next = NULL; taosThreadMutexLock(&queue->mutex); + if (queue->memLimit > 0 && queue->memOfItems + pNode->size > queue->memLimit) { + code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY; + uError("item:%p failed to put into queue:%p, queue mem limit: %" PRId64 ", reason: %s" PRId64, pItem, queue, + queue->memLimit, tstrerror(code)); + + taosThreadMutexUnlock(&queue->mutex); + return code; + } else if (queue->itemLimit > 0 && queue->itemLimit + 1 > queue->itemLimit) { + code = TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY; + uError("item:%p failed to put into queue:%p, queue size limit: %" PRId64 ", reason: %s" PRId64, pItem, queue, + queue->itemLimit, tstrerror(code)); + taosThreadMutexUnlock(&queue->mutex); + return code; + } if (queue->tail) { queue->tail->next = pNode; @@ -166,15 +184,16 @@ void taosWriteQitem(STaosQueue *queue, void *pItem) { queue->head = pNode; queue->tail = pNode; } - queue->numOfItems++; queue->memOfItems += pNode->size; if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); + uTrace("item:%p is put into queue:%p, items:%d mem:%" PRId64, pItem, queue, queue->numOfItems, queue->memOfItems); taosThreadMutexUnlock(&queue->mutex); if (queue->qset) tsem_post(&queue->qset->sem); + return code; } int32_t taosReadQitem(STaosQueue *queue, void **ppItem) {