From 4d8548e9387fa7c7dc7290012c867036dc216ba9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 6 Nov 2023 23:42:16 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 2 ++ source/dnode/vnode/src/tq/tq.c | 11 +++++------ source/libs/stream/src/streamDispatch.c | 8 ++++---- source/libs/stream/src/streamExec.c | 19 +++++++++++++++---- source/libs/stream/src/streamStart.c | 2 +- 5 files changed, 27 insertions(+), 15 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a063f39d92..4842b4b8e2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -372,7 +372,9 @@ typedef struct STaskExecStatisInfo { int64_t init; int64_t start; int64_t step1Start; + double step1El; int64_t step2Start; + double step2El; int32_t updateCount; int64_t latestUpdateTs; int32_t processDataBlocks; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 81defbe6d0..118355383f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1102,10 +1102,6 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask } } -static void ddxx() { - -} - // this function should be executed by only one thread, so we set an sentinel to protect this function int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; @@ -1149,9 +1145,12 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } } else { if (pTask->execInfo.step2Start == 0) { - tqDebug("s-task:%s resume from paused, original step1 startTs:%" PRId64, id, pTask->execInfo.step1Start); + tqDebug("s-task:%s continue exec scan-history(step1), original step1 startTs:%" PRId64", already elapsed:%.2fs", id, + pTask->execInfo.step1Start, pTask->execInfo.step1El); } else { - tqDebug("s-task:%s already in step2, no need to scan-history data, step2 starTs:%"PRId64, id, pTask->execInfo.step2Start); + tqDebug("s-task:%s already in step2, no need to scan-history data, step2 startTs:%" PRId64, id, + pTask->execInfo.step2Start); + atomic_store_32(&pTask->status.inScanHistorySentinel, 0); streamMetaReleaseTask(pMeta, pTask); return 0; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5665e7a917..6bb15dfd23 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1106,8 +1106,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId); taosThreadMutexUnlock(&pTask->lock); - stError("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch data", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS); + stWarn("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS); } else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) { stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); @@ -1147,8 +1147,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d", - pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); + stDebug("s-task:%s failed to dispatch msg to downstream, add into timer to retry in %dms, ref:%d", + pTask->id.idStr, DISPATCH_RETRY_INTERVAL_MS, ref); streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // this message has been sent successfully, let's try next one. diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d81a5f0c2f..a3e981658a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -220,7 +220,12 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask) { } if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { - stDebug("s-task:%s level:%d inputQ is blocked, retry later", pTask->id.idStr, pTask->info.taskLevel); + int64_t el = taosGetTimestampMs() - st; + pTask->execInfo.step1El += el/1000.0; + + stDebug("s-task:%s level:%d inputQ is blocked, resume in 5sec, elapsed time:%.2fs", pTask->id.idStr, + pTask->info.taskLevel, pTask->execInfo.step1El); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; } @@ -268,14 +273,20 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask) { } int64_t el = taosGetTimestampMs() - st; + pTask->execInfo.step1El += el/1000.0; + if (el >= STREAM_SCAN_HISTORY_TIMESLICE) { - stDebug("s-task:%s fill-history:%d level:%d timeslice for scan-history exhausted", pTask->id.idStr, - pTask->info.fillHistory, pTask->info.taskLevel); + stDebug("s-task:%s fill-history:%d time slice for scan-history exhausted, elapse time:%.2fs, retry in 100ms", + pTask->id.idStr, pTask->info.fillHistory, el / 1000.0); return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; } } - return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0};; + // todo refactor + int64_t el = taosGetTimestampMs() - st; + pTask->execInfo.step1El += el/1000.0; + + return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0}; } // wait for the stream task to be idle diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 9044e65eb6..0424dcd715 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -125,7 +125,7 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) pTask->schedHistoryInfo.numOfTicks = numOfTicks; int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s scan-history start in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref); + stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref); if (pTask->schedHistoryInfo.pTimer == NULL) { pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer);