diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a137c10ed5..20cd415a6f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1134,6 +1134,7 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status)); ready = false; + break; } if (pEntry->hTaskId != 0) { @@ -1153,6 +1154,27 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { return ready ? 0 : -1; } +int64_t getStreamTaskLastReadyState(SArray *pTaskList, int64_t streamId) { + int64_t ts = -1; + int32_t taskId = -1; + + for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) { + STaskId *p = taosArrayGet(pTaskList, i); + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); + if (pEntry == NULL || pEntry->id.streamId != streamId) { + continue; + } + + if (pEntry->status == TASK_STATUS__READY && ts < pEntry->startTime) { + ts = pEntry->startTime; + taskId = pEntry->id.taskId; + } + } + + mDebug("stream:0x%" PRIx64 " last ready ts:%" PRId64 " s-task:0x%x", streamId, ts, taskId); + return ts; +} + typedef struct { int64_t streamId; int64_t duration; @@ -1191,6 +1213,15 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { continue; } + taosThreadMutexLock(&execInfo.lock); + int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid); + if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) { + taosThreadMutexUnlock(&execInfo.lock); + sdbRelease(pSdb, pStream); + continue; + } + taosThreadMutexUnlock(&execInfo.lock); + SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration}; taosArrayPush(pList, &in);