diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5e647c0f9e..03cf2cc21e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -384,8 +384,8 @@ typedef struct SSinkRecorder { typedef struct STaskExecStatisInfo { int64_t created; - int64_t init; - int64_t start; + int64_t checkTs; + int64_t readyTs; int64_t startCheckpointId; int64_t startCheckpointVer; @@ -478,7 +478,7 @@ typedef struct STaskStartInfo { int64_t startTs; int64_t readyTs; int32_t tasksWillRestart; - int32_t taskStarting; // restart flag, sentinel to guard the restart procedure. + int32_t startAllTasks; // restart flag, sentinel to guard the restart procedure. SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed int64_t elapsedTime; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index f7003ca41b..6d276c11bc 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -309,7 +309,7 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pRec->numOfSubmit += 1; if ((pRec->numOfSubmit % 1000) == 0) { - double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0; + double el = (taosGetTimestampMs() - pTask->execInfo.readyTs) / 1000.0; tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 " submit into dst table, %.2fMiB duration:%.2f Sec.", pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize), diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index f2b90606d9..03553dfc2e 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -455,8 +455,8 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); } -static void setParam(SStreamTask* pTask, int64_t* initTs, bool* hasHTask, STaskId* pId) { - *initTs = pTask->execInfo.init; +static void setParam(SStreamTask* pTask, int64_t* startCheckTs, bool* hasHTask, STaskId* pId) { + *startCheckTs = pTask->execInfo.checkTs; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { *hasHTask = true; @@ -525,6 +525,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe if (pTask == NULL) { streamMetaRLock(pMeta); + // let's try to find this task in hashmap SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask != NULL) { setParam(*ppTask, &initTs, &hasHistoryTask, &fId); @@ -533,7 +534,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe if (hasHistoryTask) { streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false); } - } else { + } else { // not exist even in the hash map of meta, forget it streamMetaRUnLock(pMeta); } @@ -749,7 +750,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { int64_t st = taosGetTimestampMs(); streamMetaWLock(pMeta); - if (pMeta->startInfo.taskStarting == 1) { + if (pMeta->startInfo.startAllTasks == 1) { pMeta->startInfo.restartCount += 1; tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, pMeta->startInfo.restartCount); @@ -757,7 +758,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { return TSDB_CODE_SUCCESS; } - pMeta->startInfo.taskStarting = 1; + pMeta->startInfo.startAllTasks = 1; streamMetaWUnLock(pMeta); terrno = 0; @@ -873,7 +874,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { bool scanWal = false; streamMetaWLock(pMeta); - if (pStartInfo->taskStarting == 1) { + if (pStartInfo->startAllTasks == 1) { tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId, pMeta->startInfo.restartCount); } else { // not in starting procedure diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 72cf8295dc..166a230c76 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -579,12 +579,12 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId); int32_t numOfTasks = tqStreamTasksGetTotalNum(pMeta); if (numOfTasks > 0) { - if (pMeta->startInfo.taskStarting == 1) { + if (pMeta->startInfo.startAllTasks == 1) { pMeta->startInfo.restartCount += 1; tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId, pMeta->startInfo.restartCount); } else { - pMeta->startInfo.taskStarting = 1; + pMeta->startInfo.startAllTasks = 1; streamMetaWUnLock(pMeta); tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e3a0eef071..3c4e6f0101 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1119,7 +1119,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { .stage = pMeta->stage, .inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)), - .startTime = (*pTask)->execInfo.start, + .startTime = (*pTask)->execInfo.readyTs, .checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId, .checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer, .checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime, @@ -1329,7 +1329,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { pStartInfo->readyTs = 0; // reset the sentinel flag value to be 0 - pStartInfo->taskStarting = 0; + pStartInfo->startAllTasks = 0; } void streamMetaRLock(SStreamMeta* pMeta) { @@ -1496,7 +1496,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { streamLaunchFillHistoryTask(pTask); } - streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true); + streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, true); streamMetaReleaseTask(pMeta, pTask); continue; } @@ -1506,10 +1506,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); code = ret; - streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false); } } @@ -1601,10 +1601,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas if (ret != TSDB_CODE_SUCCESS) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); - streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->checkTs, pInfo->readyTs, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false); } } @@ -1641,7 +1641,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 streamMetaWLock(pMeta); - if (pStartInfo->taskStarting != 1) { + if (pStartInfo->startAllTasks != 1) { int64_t el = endTs - startTs; qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms", pMeta->vgId, taskId, ready, el); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 276997e5a8..53904788eb 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -56,8 +56,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) { ASSERT(pTask->status.downstreamReady == 0); pTask->status.downstreamReady = 1; - pTask->execInfo.start = taosGetTimestampMs(); - int64_t el = (pTask->execInfo.start - pTask->execInfo.init); + pTask->execInfo.readyTs = taosGetTimestampMs(); + int64_t el = (pTask->execInfo.readyTs - pTask->execInfo.checkTs); stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p->name); return TSDB_CODE_SUCCESS; @@ -391,9 +391,9 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) { EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL); - int64_t initTs = pTask->execInfo.init; - int64_t startTs = pTask->execInfo.start; - streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true); + int64_t checkTs = pTask->execInfo.checkTs; + int64_t readyTs = pTask->execInfo.readyTs; + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true); if (pTask->status.taskStatus == TASK_STATUS__HALT) { ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0)); @@ -498,7 +498,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } - int32_t startTs = pTask->execInfo.init; + int32_t startTs = pTask->execInfo.checkTs; int64_t now = taosGetTimestampMs(); streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false); @@ -603,13 +603,13 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) SDataRange* pRange = &pHTask->dataRange; // the query version range should be limited to the already processed data - pHTask->execInfo.init = taosGetTimestampMs(); + pHTask->execInfo.checkTs = taosGetTimestampMs(); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64 " verRange:%" PRId64 " - %" PRId64 ", init:%" PRId64, pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, - pRange->range.maxVer, pHTask->execInfo.init); + pRange->range.maxVer, pHTask->execInfo.checkTs); } else { stDebug("s-task:%s no fill-history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr); } @@ -767,8 +767,7 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, pTask->id.streamId, pTask->id.taskId, hStreamId, hTaskId); if (pInfo == NULL) { stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr); - - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); return terrno; } @@ -785,7 +784,7 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref); taosMemoryFree(pInfo); - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); return terrno; } @@ -816,7 +815,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId, pStatus->name); - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); return -1; // todo set the correct error code } @@ -831,11 +830,11 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId); if (pHisTask == NULL) { stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr); - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false); } else { if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr); - streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, true); + streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true); } else { // exist, but not ready, continue check downstream task status checkFillhistoryTaskStatus(pTask, pHisTask); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7badbfa9f3..fdcf9da9af 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -365,8 +365,9 @@ void tFreeStreamTask(SStreamTask* pTask) { stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64 ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64 " nextProcessVer:%" PRId64 ", checkpointCount:%d", - taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs, - pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint); + taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, + pStatis->latestUpdateTs, pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, + pStatis->checkpoint); // remove the ref by timer while (pTask->status.timerActive > 0) { diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index cfa94209f6..c16598f84c 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -80,10 +80,10 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv } int32_t streamTaskInitStatus(SStreamTask* pTask) { - pTask->execInfo.init = taosGetTimestampMs(); - + pTask->execInfo.checkTs = taosGetTimestampMs(); stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr, - pTask->execInfo.init); + pTask->execInfo.checkTs); + streamTaskCheckDownstream(pTask); return 0; }