From 583ac1fc45feaf85819977861c44c9ea0fc27f6b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 1 Oct 2023 22:27:29 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 24 ++++++----- source/common/src/systable.c | 4 +- source/dnode/mnode/impl/src/mndScheduler.c | 2 +- source/dnode/mnode/impl/src/mndStream.c | 28 ++++++++++++- source/dnode/snode/src/snode.c | 4 +- source/dnode/vnode/src/tq/tq.c | 8 ++-- source/dnode/vnode/src/tq/tqStreamTask.c | 4 +- source/libs/stream/inc/streamInt.h | 17 ++++++-- source/libs/stream/src/streamMeta.c | 48 ++++++++++++++++------ source/libs/stream/src/streamQueue.c | 25 +++++------ source/libs/stream/src/streamRecover.c | 16 ++++---- 11 files changed, 120 insertions(+), 60 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 40b2be72bc..b399459230 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -44,7 +44,8 @@ extern "C" { #define NODE_ROLE_LEADER 0x2 #define NODE_ROLE_FOLLOWER 0x3 -typedef struct SStreamTask SStreamTask; +typedef struct SStreamTask SStreamTask; +typedef struct SStreamQueue SStreamQueue; #define SSTREAM_TASK_VER 2 enum { @@ -190,13 +191,6 @@ int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem); SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue); #endif -typedef struct { - STaosQueue* pQueue; - STaosQall* qall; - void* qItem; - int8_t status; -} SStreamQueue; - int32_t streamInit(); void streamCleanUp(); @@ -314,7 +308,7 @@ typedef struct STaskOutputInfo { } STaskOutputInfo; typedef struct STaskInputInfo { - int8_t status; + int8_t status; SStreamQueue* queue; } STaskInputInfo; @@ -406,7 +400,8 @@ struct SStreamTask { }; typedef struct STaskStartInfo { - int64_t ts; + int64_t startTs; + int64_t readyTs; int32_t startedAfterNodeUpdate; SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing int32_t elapsedTime; @@ -463,7 +458,7 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId); int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem); int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock); int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask); -bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ); +bool streamQueueIsFull(const SStreamQueue* pQueue, bool inputQ); typedef struct { SMsgHead head; @@ -602,6 +597,13 @@ typedef struct STaskStatusEntry { int32_t status; int32_t stage; int32_t nodeId; + int64_t verStart; // start version in WAL, only valid for source task + int64_t verEnd; // end version in WAL, only valid for source task + int64_t offset; // only valid for source task + double inputQUsed; // in MiB + double inputQCap; + double outputQUsed; // in MiB + double outputQCap; } STaskStatusEntry; typedef struct SStreamHbMsg { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 7107f0e058..e4e4f2ce99 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -161,9 +161,11 @@ static const SSysDbTableSchema streamTaskSchema[] = { {.name = "task_id", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "node_type", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "level", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "stage", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "in_queue", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "out_queue", .bytes = 35, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; static const SSysDbTableSchema userTblsSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index f152fc7c5d..d598dc11d2 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -353,7 +353,7 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan* if (pStream->conf.fillHistory) { SArray** pHSinkTaskList = taosArrayGet(pStream->pHTasksList, SINK_NODE_LEVEL); code = addSourceTask(pMnode, pVgroup, pHTaskList, *pHSinkTaskList, pStream, plan, pStream->hTaskUid, - pEpset, true, hasExtraSink, nextWindowSkey, false); + pEpset, true, hasExtraSink, nextWindowSkey, true); } sdbRelease(pSdb, pVgroup); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8cbdeff19a..11916bdb4f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1563,7 +1563,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock } pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&level, false); + colDataSetVal(pColInfo, numOfRows, (const char *)level, false); // status char status[20 + VARSTR_HEADER_SIZE] = {0}; @@ -1577,12 +1577,31 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock const char* pStatus = streamGetTaskStatusStr(pe->status); STR_TO_VARSTR(status, pStatus); + // status pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&status, false); + colDataSetVal(pColInfo, numOfRows, (const char *)status, false); + // stage pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false); + // input queue + char vbuf[30] = {0}; + char buf[25] = {0}; + const char* queueInfoStr = "%.2fMiB (%.2f%, %.2fMiB)"; + sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputQUsed/pe->inputQCap, pe->inputQCap); + STR_TO_VARSTR(vbuf, buf); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); + + // output queue + sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputQUsed/pe->outputQCap, pe->outputQCap); + STR_TO_VARSTR(vbuf, buf); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false); + numOfRows++; } } @@ -2429,6 +2448,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } else { pEntry->stage = p->stage; + pEntry->inputQUsed = p->inputQUsed; + pEntry->inputQCap = p->inputQCap; + pEntry->outputQUsed = p->outputQUsed; + pEntry->outputQCap = p->outputQCap; + pEntry->offset = p->offset; } pEntry->status = p->status; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 799c784f38..c5fd202986 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -53,9 +53,9 @@ FAIL: taosFreeQitem(pMsg); } -int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { +int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamInfoList) != 0); - int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, ver); + int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 455937b603..19bcf2a9d8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -724,11 +724,11 @@ end: void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } -int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { +int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { int32_t vgId = TD_VID(pTq->pVnode); tqDebug("s-task:0x%x start to expand task", pTask->id.taskId); - int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver); + int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, nextProcessVer); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1421,8 +1421,8 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId); tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId); } + streamMetaReleaseTask(pMeta, pTask); } - streamMetaReleaseTask(pMeta, pTask); // drop the stream task now streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); @@ -1510,7 +1510,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { streamStartScanHistoryAsync(pTask, igUntreated); - } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputInfo.queue->pQueue) == 0)) { + } else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputInfo.queue) == 0)) { tqScanWalAsync(pTq, false); } else { streamSchedExec(pTask); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index c3ef52e96f..069cc4cbbd 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -73,7 +73,7 @@ int32_t tqCheckAndRunStreamTask(STQ* pTq) { taosWLockLatch(&pMeta->lock); pTaskList = taosArrayDup(pMeta->pTaskList, NULL); taosHashClear(pMeta->startInfo.pReadyTaskSet); - pMeta->startInfo.ts = taosGetTimestampMs(); + pMeta->startInfo.startTs = taosGetTimestampMs(); taosWUnLockLatch(&pMeta->lock); // broadcast the check downstream tasks msg @@ -370,7 +370,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - if (streamQueueIsFull(pTask->inputInfo.queue->pQueue, true)) { + if (streamQueueGetNumOfItems(pTask->inputInfo.queue)) { tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 1d8fde3a48..fe4d73b566 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -40,6 +40,10 @@ extern "C" { #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec #define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) +#define STREAM_TASK_QUEUE_CAPACITY 20480 +#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) +#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50) + // clang-format off #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define stError(...) do { if (stDebugFlag & DEBUG_ERROR) { taosPrintLog("STM ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) @@ -70,6 +74,13 @@ struct STokenBucket { int64_t fillTimestamp; // fill timestamp }; +struct SStreamQueue { + STaosQueue* pQueue; + STaosQall* qall; + void* qItem; + int8_t status; +}; + extern SStreamGlobalEnv streamEnv; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; @@ -100,7 +111,6 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize); -int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue); int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); const char* streamQueueItemGetTypeStr(int32_t type); @@ -118,14 +128,15 @@ STaskId streamTaskExtractKey(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); +void streamMetaResetStartInfo(STaskStartInfo* pMeta); + SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); void streamQueueProcessSuccess(SStreamQueue* queue); void streamQueueProcessFail(SStreamQueue* queue); void* streamQueueNextItem(SStreamQueue* pQueue); void streamFreeQitem(SStreamQueueItem* data); - - +int32_t streamQueueGetItemSize(const SStreamQueue* pQueue); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5470da8360..0007b83fca 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -406,15 +406,15 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa taosArrayPush(pMeta->pTaskList, &pTask->id); -// if (streamMetaSaveTask(pMeta, pTask) < 0) { -// tFreeStreamTask(pTask); -// return -1; -// } -// -// if (streamMetaCommit(pMeta) < 0) { -// tFreeStreamTask(pTask); -// return -1; -// } + if (streamMetaSaveTask(pMeta, pTask) < 0) { + tFreeStreamTask(pTask); + return -1; + } + + if (streamMetaCommit(pMeta) < 0) { + tFreeStreamTask(pTask); + return -1; + } taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); if (pTask->info.fillHistory == 0) { @@ -706,9 +706,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { - // pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader - // In this case, we try not to start fill-history task anymore. - if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) { + if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1) < 0) { doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); return -1; @@ -776,6 +774,10 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI32(pEncoder, ps->status) < 0) return -1; if (tEncodeI32(pEncoder, ps->stage) < 0) return -1; if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->inputQUsed) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->inputQCap) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->outputQUsed) < 0) return -1; + if (tEncodeDouble(pEncoder, ps->outputQCap) < 0) return -1; } tEndEncode(pEncoder); return pEncoder->pos; @@ -796,6 +798,10 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI32(pDecoder, &entry.status) < 0) return -1; if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1; if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.inputQUsed) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.inputQCap) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.outputQUsed) < 0) return -1; + if (tDecodeDouble(pDecoder, &entry.outputQCap) < 0) return -1; entry.id.taskId = taskId; taosArrayPush(pReq->pTaskStatus, &entry); @@ -863,12 +869,23 @@ void metaHbToMnode(void* param, void* tmrId) { STaskId* pId = taosArrayGet(pMeta->pTaskList, i); SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); + + // not report the status of fill-history task if ((*pTask)->info.fillHistory == 1) { continue; } STaskStatusEntry entry = { - .id = *pId, .status = (*pTask)->status.taskStatus, .nodeId = pMeta->vgId, .stage = pMeta->stage}; + .id = *pId, + .status = (*pTask)->status.taskStatus, + .nodeId = pMeta->vgId, + .stage = pMeta->stage, + .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputInfo.queue)), + .inputQCap = STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, + .outputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->outputInfo.queue)), + .outputQCap = STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE, + }; + taosArrayPush(hbMsg.pTaskStatus, &entry); if (!hasValEpset) { @@ -1004,4 +1021,9 @@ void streamMetaStartHb(SStreamMeta* pMeta) { void streamMetaInitForSnode(SStreamMeta* pMeta) { pMeta->stage = 0; pMeta->role = NODE_ROLE_LEADER; +} + +void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { + taosHashClear(pStartInfo->pReadyTaskSet); + pStartInfo->startedAfterNodeUpdate = 0; } \ No newline at end of file diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 65400386b1..8a7827b5a7 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -17,9 +17,6 @@ #define MAX_STREAM_EXEC_BATCH_NUM 32 #define MIN_STREAM_EXEC_BATCH_NUM 4 -#define STREAM_TASK_QUEUE_CAPACITY 20480 -#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30) -#define STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE (50) #define MAX_SMOOTH_BURST_RATIO 5 // 20 sec // todo refactor: @@ -105,15 +102,14 @@ void streamQueueProcessFail(SStreamQueue* queue) { atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); } -bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ) { - bool isFull = taosQueueItemSize((STaosQueue*)pQueue) >= STREAM_TASK_QUEUE_CAPACITY; - if (isFull) { +bool streamQueueIsFull(const SStreamQueue* pQueue, bool inputQ) { + int32_t numOfItems = streamQueueGetNumOfItems(pQueue); + if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) { return true; } - int32_t threahold = (inputQ) ? STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE : STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE; - double size = SIZE_IN_MiB(taosQueueMemorySize((STaosQueue*)pQueue)); - return (size >= threahold); + int32_t threshold = (inputQ) ? STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE : STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE; + return (SIZE_IN_MiB(taosQueueMemorySize(pQueue->pQueue)) >= threshold); } int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) { @@ -123,8 +119,9 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) { return numOfItems1 + numOfItems2; } -int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue) { - return taosQueueItemSize(pQueue->pQueue); +// todo: fix it: data in Qall is not included here +int32_t streamQueueGetItemSize(const SStreamQueue* pQueue) { + return taosQueueMemorySize(pQueue->pQueue); } int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) { @@ -267,7 +264,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; - if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue, true)) { + if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pTask->inputInfo.queue, true)) { double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stTrace( "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", @@ -294,7 +291,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) msgLen, ver, total, size + SIZE_IN_MiB(msgLen)); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - if (streamQueueIsFull(pQueue, true)) { + if (streamQueueIsFull(pTask->inputInfo.queue, true)) { double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", @@ -348,7 +345,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) { STaosQueue* pQueue = pTask->outputInfo.queue->pQueue; - while (streamQueueIsFull(pQueue, false)) { + while (streamQueueIsFull(pTask->inputInfo.queue, false)) { if (streamTaskShouldStop(&pTask->status)) { stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr); return TSDB_CODE_STREAM_EXEC_CANCELLED; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 67f4108270..85e57339e0 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -58,16 +58,18 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { STaskId id = streamTaskExtractKey(pTask); taosHashPut(pMeta->startInfo.pReadyTaskSet, &id, sizeof(id), NULL, 0); + int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); - if (taosHashGetSize(pMeta->startInfo.pReadyTaskSet) == numOfTotal) { - // reset value for next time start - taosHashClear(pMeta->startInfo.pReadyTaskSet); - pMeta->startInfo.startedAfterNodeUpdate = 0; - pMeta->startInfo.elapsedTime = pTask->execInfo.start - pMeta->startInfo.ts; + STaskStartInfo* pStartInfo = &pMeta->startInfo; + pStartInfo->readyTs = pTask->execInfo.start; + pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs; + streamMetaResetStartInfo(pStartInfo); - stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, total elapsed time:%.2fs", - vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pMeta->startInfo.elapsedTime / 1000.0); + stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, startTs:%" PRId64 + ", readyTs:%" PRId64 " total elapsed time:%.2fs", + vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs, + pStartInfo->elapsedTime / 1000.0); } taosWUnLockLatch(&pMeta->lock); }