diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 59bf050677..c4ab632837 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -391,6 +391,13 @@ struct SStreamTask { char reserve[256]; }; +typedef struct STaskStartInfo { + int64_t ts; + int32_t startedAfterNodeUpdate; + int32_t readyTasks; + int32_t elapsedTime; +} STaskStartInfo; + // meta typedef struct SStreamMeta { char* path; @@ -405,7 +412,7 @@ typedef struct SStreamMeta { int32_t vgId; int64_t stage; bool leader; - int8_t taskStartedByNodeUpdate; + STaskStartInfo startInfo; SRWLatch lock; int32_t walScanCounter; void* streamBackend; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 543289d621..1e3a27d567 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1830,7 +1830,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskSet); if (updateTasks < numOfTasks) { - pMeta->taskStartedByNodeUpdate = 1; + pMeta->startInfo.startedAfterNodeUpdate = 1; tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, updateTasks, (numOfTasks - updateTasks)); taosWUnLockLatch(&pMeta->lock); @@ -1839,7 +1839,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { if (!pTq->pVnode->restored) { tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); - pMeta->taskStartedByNodeUpdate = 0; + pMeta->startInfo.startedAfterNodeUpdate = 0; taosWUnLockLatch(&pMeta->lock); } else { tqDebug("vgId:%d tasks are all updated and stopped, restart them", vgId); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b39132d675..34cc74852f 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -231,11 +231,12 @@ int32_t tqStartStreamTasks(STQ* pTq) { int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); tqDebug("vgId:%d start all %d stream task(s)", vgId, numOfTasks); - if (numOfTasks == 0) { return TSDB_CODE_SUCCESS; } + pMeta->startInfo.ts = taosGetTimestampMs(); + for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 2d96bcfffd..a6c743c87d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -552,7 +552,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) pVnode->restored = true; taosWLockLatch(&pVnode->pTq->pStreamMeta->lock); - if (pVnode->pTq->pStreamMeta->taskStartedByNodeUpdate) { + if (pVnode->pTq->pStreamMeta->startInfo.startedAfterNodeUpdate) { vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId); taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock); return; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 882c57383e..abf10487de 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -154,41 +154,6 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu int32_t taskLevel = pTask->info.taskLevel; *numOfBlocks = 0; - // if (pTask->info.taskLevel == TASK_LEVEL__SINK) { // extract block from inputQ, one-by-one - // while (1) { - // if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { - // stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); - // return TSDB_CODE_SUCCESS; - // } - // - // STokenBucket* pBucket = pTask->pTokenBucket; - // // if (!streamTaskHasAvailableToken(pBucket)) { // no available token in th bucket, ignore this - // execution - // // stInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", - // pTask->id.idStr, - // // pBucket->capacity, pBucket->rate); - // // return TSDB_CODE_SUCCESS; - // // } - // - // SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); - // if (qItem == NULL) { - // if (++retryTimes < MAX_RETRY_TIMES) { - // taosMsleep(10); - // continue; - // } - // - // return TSDB_CODE_SUCCESS; - // } - // - // stDebug("s-task:%s sink task handle block, type:%s", id, streamQueueItemGetTypeStr(qItem->type)); - // pTask->sinkRecorder.bytes += streamQueueItemGetSize(qItem); - // - // *numOfBlocks = 1; - // *pInput = qItem; - // return TSDB_CODE_SUCCESS; - // } - // } - while (1) { if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 3ae50b5b2b..d3a45a5e62 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -34,6 +34,9 @@ static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { + SStreamMeta* pMeta = pTask->pMeta; + int32_t vgId = pMeta->vgId; + if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", @@ -48,6 +51,19 @@ static void streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { int64_t el = (pTask->execInfo.start - pTask->execInfo.init); stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfReqs, el, streamGetTaskStatusStr(pTask->status.taskStatus)); + + taosWLockLatch(&pMeta->lock); + pMeta->startInfo.readyTasks += 1; + int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); + if (pMeta->startInfo.readyTasks == numOfTotal) { + // reset value for next time start + pMeta->startInfo.readyTasks = 0; + pMeta->startInfo.startedAfterNodeUpdate = 0; + pMeta->startInfo.elapsedTime = pTask->execInfo.start - pMeta->startInfo.ts; + stDebug("vgId:%d all %d task(s) are started successfully, last ready task:%s level:%d, total elapsed time:%.2f sec", + vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pMeta->startInfo.elapsedTime / 1000.0); + } + taosWUnLockLatch(&pMeta->lock); } int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {