diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a69543a6d6..f4ece1a141 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -3116,19 +3116,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (p->status != TASK_STATUS__READY) { mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status)); - -// if (p->status == TASK_STATUS__STREAM_SCAN_HISTORY) { -// bool drop = needDropRelatedFillhistoryTask(pTaskEntry, &execInfo); -// if (drop) { -// SStreamObj *pStreamObj = mndGetStreamObj(pMnode, pTaskEntry->id.streamId); -// if (pStreamObj == NULL) { -// mError("failed to acquire the streamObj:0x%" PRIx64 " it may have been dropped", pStreamObj->uid); -// } else { -// mndDropRelatedFillhistoryTask(pMnode, pTaskEntry, pStreamObj); -// mndReleaseStream(pMnode, pStreamObj); -// } -// } -// } } } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index c75185b0ea..1c9361fa61 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -288,10 +288,11 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { } int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { - SStreamMeta* pMeta = p->pMeta; - int32_t vgId = pMeta->vgId; - const char* id = p->id.idStr; - int32_t code = 0; + SStreamMeta* pMeta = p->pMeta; + int32_t vgId = pMeta->vgId; + const char* id = p->id.idStr; + int32_t code = 0; + SCheckpointInfo* pCKInfo = &p->chkInfo; if (p->info.fillHistory == 1) { return code; @@ -303,25 +304,33 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { taosThreadMutexLock(&p->lock); - ASSERT(p->chkInfo.checkpointId <= p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId && - p->chkInfo.checkpointVer <= p->chkInfo.processedVer); - p->chkInfo.checkpointId = p->chkInfo.checkpointingId; - p->chkInfo.checkpointVer = p->chkInfo.processedVer; + SStreamTaskState* pStatus = streamTaskGetStatus(p); + if (pStatus->state == TASK_STATUS__CK) { + ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId && + pCKInfo->checkpointVer <= pCKInfo->processedVer); - streamTaskClearCheckInfo(p, false); - SStreamTaskState* pState = streamTaskGetStatus(p); + pCKInfo->checkpointId = pCKInfo->checkpointingId; + pCKInfo->checkpointVer = pCKInfo->processedVer; - code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); - taosThreadMutexUnlock(&p->lock); + streamTaskClearCheckInfo(p, false); + code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + taosThreadMutexUnlock(&p->lock); + } else { + stDebug("s-task:%s vgId:%d status:%s not keep the checkpoint metaInfo, checkpoint:%" PRId64 " failed", id, vgId, + pStatus->name, pCKInfo->checkpointingId); + taosThreadMutexUnlock(&p->lock); + + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } if (code != TSDB_CODE_SUCCESS) { stDebug("s-task:%s vgId:%d handle event:checkpoint-done failed", id, vgId); - return -1; + return code; } stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64 ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: normal, prev:%s", - vgId, id, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, pState->name); + vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name); // save the task if not sink task if (p->info.taskLevel != TASK_LEVEL__SINK) { @@ -343,6 +352,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { streamMetaWUnLock(pMeta); } + return code; } @@ -474,7 +484,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { code = streamSaveTaskCheckpointInfo(pTask, ckId); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId, - tstrerror(terrno)); + tstrerror(code)); } else { code = streamTaskUploadChkp(pTask, ckId, (char*)pTask->id.idStr); if (code != 0) { diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index d3287a6b96..8b5b2da061 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -160,7 +160,6 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); - taosMsleep(WAIT_FOR_DURATION); return TSDB_CODE_SUCCESS; } @@ -172,10 +171,6 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue); if (qItem == NULL) { -// if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) { -// taosMsleep(WAIT_FOR_DURATION); -// continue; -// } // restore the token to bucket if (*numOfBlocks > 0) {