diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 9e6188e9d9..4f5198acc0 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1168,8 +1168,6 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { } } - SArray *pInvalidList = taosArrayInit(4, sizeof(STaskId)); - for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId *p = taosArrayGet(execInfo.pTaskList, i); if (p == NULL) { @@ -1181,23 +1179,6 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { continue; } - if (pEntry->status == TASK_STATUS__STOP) { - for (int32_t j = 0; j < taosArrayGetSize(pInvalidList); ++j) { - STaskId *pId = taosArrayGet(pInvalidList, j); - if (pId == NULL) { - continue; - } - - if (pEntry->id.streamId == pId->streamId) { - void *px = taosArrayPush(pInvalidList, &pEntry->id); - if (px == NULL) { - mError("failed to put stream into invalid list, code:%s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - } - break; - } - } - } - if (pEntry->status != TASK_STATUS__READY) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s, checkpoint not issued", pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status)); @@ -1215,9 +1196,6 @@ static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) { } } - removeTasksInBuf(pInvalidList, &execInfo); - taosArrayDestroy(pInvalidList); - streamMutexUnlock(&execInfo.lock); return ready ? 0 : -1; } @@ -1258,6 +1236,30 @@ static int32_t streamWaitComparFn(const void *p1, const void *p2) { return pInt1->duration > pInt2->duration ? -1 : 1; } +// all tasks of this stream should be ready, otherwise do nothing +static bool isStreamReadyHelp(int64_t now, SStreamObj* pStream) { + bool ready = false; + + streamMutexLock(&execInfo.lock); + + int64_t lastReadyTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid); + if ((lastReadyTs == -1) || ((lastReadyTs != -1) && ((now - lastReadyTs) < tsStreamCheckpointInterval * 1000))) { + if (lastReadyTs != -1) { + mInfo("not start checkpoint, stream:0x%"PRIx64" last ready ts:%"PRId64" ready duration:%"PRId64" less than threshold", + pStream->uid, lastReadyTs, now - lastReadyTs); + } else { + mInfo("not start checkpoint, stream:0x%"PRIx64" not ready now", pStream->uid); + } + + ready = false; + } else { + ready = true; + } + + streamMutexUnlock(&execInfo.lock); + return ready; +} + static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1284,20 +1286,17 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) { continue; } - streamMutexLock(&execInfo.lock); - int64_t startTs = getStreamTaskLastReadyState(execInfo.pTaskList, pStream->uid); - if (startTs != -1 && (now - startTs) < tsStreamCheckpointInterval * 1000) { - streamMutexUnlock(&execInfo.lock); + bool ready = isStreamReadyHelp(now, pStream); + if (!ready) { sdbRelease(pSdb, pStream); continue; } - streamMutexUnlock(&execInfo.lock); SCheckpointInterval in = {.streamId = pStream->uid, .duration = duration}; void *p = taosArrayPush(pList, &in); if (p) { int32_t currentSize = taosArrayGetSize(pList); - mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chpt interval threshold: %ds(%" PRId64 + mDebug("stream:%s (uid:0x%" PRIx64 ") total %d stream(s) beyond chkpt interval threshold: %ds(%" PRId64 "s), concurrently launch threshold:%d", pStream->name, pStream->uid, currentSize, tsStreamCheckpointInterval, duration / 1000, tsMaxConcurrentCheckpoint);