diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a9da7d5ef8..bdc2c0a5ec 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -458,7 +458,7 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId); int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask); -bool streamQueueIsFull(const SStreamQueue* pQueue, bool inputQ); +bool streamQueueIsFull(const SStreamQueue* pQueue); typedef struct { SMsgHead head; @@ -706,6 +706,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); +int32_t streamQueueGetAvailableSpace(const SStreamQueue* pQueue, int32_t* availNum, double* availSize); // common int32_t streamRestoreParam(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 6448e9d2f7..541d52d5b5 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -21,7 +21,7 @@ static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); -static void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); +static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. int32_t tqScanWal(STQ* pTq) { @@ -297,7 +297,7 @@ int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { } // todo handle memory error -void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { +bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { const char* id = pTask->id.idStr; int64_t maxVer = pTask->dataRange.range.maxVer; @@ -310,12 +310,94 @@ void handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.0; qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); /*int32_t code = */streamTaskPutTranstateIntoInputQ(pTask); - /*int32_t code = */streamSchedExec(pTask); +// /*int32_t code = */streamSchedExec(pTask); + return true; } else { qWarn("s-task:%s fill-history scan WAL, nextProcessVer:%" PRId64 " out of the maximum ver:%" PRId64 ", not scan wal", id, ver, maxVer); } } + + return false; +} + +static bool taskReadyForDataFromWal(SStreamTask* pTask) { + // non-source or fill-history tasks don't need to response the WAL scan action. + if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) { + return false; + } + + // not in ready state, do not handle the data from wal + int32_t status = pTask->status.taskStatus; + if (status != TASK_STATUS__NORMAL) { + tqTrace("s-task:%s not ready for submit block in wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); + return false; + } + + // fill-history task has entered into the last phase, no need to anything + if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { + ASSERT(status == TASK_STATUS__NORMAL); + // the maximum version of data in the WAL has reached already, the step2 is done + tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, + pTask->dataRange.range.maxVer); + return false; + } + + // check if input queue is full or not + if (streamQueueIsFull(pTask->inputInfo.queue)) { + tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); + return false; + } + + // the input queue of downstream task is full, so the output is blocked, stopped for a while + if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { + tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr); + return false; + } + + return true; +} + +static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { + const char* id = pTask->id.idStr; + int32_t numOfNewItems = 0; + + while(1) { + if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { + *numOfItems += numOfNewItems; + return numOfNewItems > 0; + } + + SStreamQueueItem* pItem = NULL; + int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); + + if ((code != TSDB_CODE_SUCCESS || pItem == NULL)/* && (numOfItems + numOfNewItems == 0)*/) { // failed, continue +// handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); +// streamMetaReleaseTask(pMeta, pTask); +// taosThreadMutexUnlock(&pTask->lock); + break; + } + + if (pItem != NULL) { + code = streamTaskPutDataIntoInputQ(pTask, pItem); + if (code == TSDB_CODE_SUCCESS) { + numOfNewItems += 1; + int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); + pTask->chkInfo.nextProcessVer = ver; + tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", id, ver); + + bool itemInFillhistory = handleFillhistoryScanComplete(pTask, ver); + if (itemInFillhistory) { + break; + } + } else { + tqError("s-task:%s append input queue failed, code: too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer); + } + } + } + + *numOfItems += numOfNewItems; + return numOfNewItems > 0; } int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { @@ -340,45 +422,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { numOfTasks = taosArrayGetSize(pTaskList); for (int32_t i = 0; i < numOfTasks; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); + STaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTask* pTask = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId); if (pTask == NULL) { continue; } - int32_t status = pTask->status.taskStatus; - - // non-source or fill-history tasks don't need to response the WAL scan action. - if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) { - streamMetaReleaseTask(pStreamMeta, pTask); - continue; - } - - const char* pStatus = streamGetTaskStatusStr(status); - if (status != TASK_STATUS__NORMAL) { - tqTrace("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, pStatus); - streamMetaReleaseTask(pStreamMeta, pTask); - continue; - } - - if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { - ASSERT(status == TASK_STATUS__NORMAL); - // the maximum version of data in the WAL has reached already, the step2 is done - tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, - pTask->dataRange.range.maxVer); - streamMetaReleaseTask(pStreamMeta, pTask); - continue; - } - - if (streamQueueIsFull(pTask->inputInfo.queue, true)) { - tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); - streamMetaReleaseTask(pStreamMeta, pTask); - continue; - } - - // downstream task has blocked the output, stopped for a while - if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { - tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr); + if (!taskReadyForDataFromWal(pTask)) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -397,7 +447,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { taosThreadMutexLock(&pTask->lock); - pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); if (pTask->status.taskStatus != TASK_STATUS__NORMAL) { tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); taosThreadMutexUnlock(&pTask->lock); @@ -405,33 +455,11 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - SStreamQueueItem* pItem = NULL; - code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr); - - if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue - handleFillhistoryScanComplete(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); - streamMetaReleaseTask(pStreamMeta, pTask); - taosThreadMutexUnlock(&pTask->lock); - continue; - } - - if (pItem != NULL) { - noDataInWal = false; - code = streamTaskPutDataIntoInputQ(pTask, pItem); - if (code == TSDB_CODE_SUCCESS) { - int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader); - pTask->chkInfo.nextProcessVer = ver; - handleFillhistoryScanComplete(pTask, ver); - tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, ver); - } else { - tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr, - pTask->chkInfo.nextProcessVer); - } - } - + bool hasNewData = doPutDataIntoInputQFromWal(pTask, maxVer, &numOfItems); taosThreadMutexUnlock(&pTask->lock); - if ((code == TSDB_CODE_SUCCESS) || (numOfItems > 0)) { + if (/*(code == TSDB_CODE_SUCCESS) || */(numOfItems > 0) || hasNewData) { + noDataInWal = false; code = streamSchedExec(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index fe4d73b566..2912c2954d 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -41,8 +41,7 @@ extern "C" { #define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) #define STREAM_TASK_QUEUE_CAPACITY 20480 -#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) -#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50) +#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) // clang-format off #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5b76354dff..7b23366c53 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -603,7 +603,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER || pBlock->type == STREAM_INPUT__TRANS_STATE); - int32_t retryCount = 0; pTask->execInfo.dispatch += 1; pTask->msgInfo.startTs = taosGetTimestampMs(); @@ -613,6 +612,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } else { // todo handle build dispatch msg failed } + int32_t retryCount = 0; while (1) { code = sendDispatchMsg(pTask, pTask->msgInfo.pData); if (code == TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 71d4beebb1..19cdf9d6bd 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -890,8 +890,8 @@ void metaHbToMnode(void* param, void* tmrId) { .outputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->outputInfo.queue)), }; - entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE; - entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE; + entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; + entry.outputRate = entry.outputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; if ((*pTask)->exec.pWalReader != NULL) { entry.offset = walReaderGetCurrentVer((*pTask)->exec.pWalReader); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 887220f840..ed2dd42bcb 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -102,14 +102,13 @@ void streamQueueProcessFail(SStreamQueue* queue) { atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); } -bool streamQueueIsFull(const SStreamQueue* pQueue, bool inputQ) { +bool streamQueueIsFull(const SStreamQueue* pQueue) { int32_t numOfItems = streamQueueGetNumOfItems(pQueue); if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) { return true; } - int32_t threshold = (inputQ) ? STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE : STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE; - return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= threshold); + return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= STREAM_TASK_QUEUE_CAPACITY_IN_SIZE); } int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) { @@ -119,6 +118,14 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) { return numOfItems1 + numOfItems2; } +int32_t streamQueueGetAvailableSpace(const SStreamQueue* pQueue, int32_t* availNum, double* availSize) { + int32_t num = streamQueueGetNumOfItems(pQueue); + *availNum = STREAM_TASK_QUEUE_CAPACITY - num; + + *availSize = STREAM_TASK_QUEUE_CAPACITY_IN_SIZE - taosQueueMemorySize(pQueue->pQueue); + return 0; +} + // todo: fix it: data in Qall is not included here int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) { return taosQueueMemorySize(pQueue->pQueue); @@ -264,11 +271,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; - if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputInfo.queue, true)) { + if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputInfo.queue)) { double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stTrace( "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", - pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); + pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); taosFreeQitem(pItem); return -1; @@ -291,11 +298,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) msgLen, ver, total, size + SIZE_IN_MiB(msgLen)); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - if (streamQueueIsFull(pTask->inputInfo.queue, true)) { + if (streamQueueIsFull(pTask->inputInfo.queue)) { double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", - pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); + pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); destroyStreamDataBlock((SStreamDataBlock*)pItem); return -1; } @@ -345,7 +352,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) { STaosQueue* pQueue = pTask->outputInfo.queue->pQueue; - while (streamQueueIsFull(pTask->inputInfo.queue, false)) { + while (streamQueueIsFull(pTask->inputInfo.queue)) { if (streamTaskShouldStop(&pTask->status)) { stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr); return TSDB_CODE_STREAM_EXEC_CANCELLED;