fix(stream): add checkpoint status check.

This commit is contained in:
Haojun Liao 2024-01-11 16:47:51 +08:00
parent d4fd544c74
commit 9703018f56
3 changed files with 25 additions and 33 deletions

View File

@ -3116,19 +3116,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
// if (p->status == TASK_STATUS__STREAM_SCAN_HISTORY) {
// bool drop = needDropRelatedFillhistoryTask(pTaskEntry, &execInfo);
// if (drop) {
// SStreamObj *pStreamObj = mndGetStreamObj(pMnode, pTaskEntry->id.streamId);
// if (pStreamObj == NULL) {
// mError("failed to acquire the streamObj:0x%" PRIx64 " it may have been dropped", pStreamObj->uid);
// } else {
// mndDropRelatedFillhistoryTask(pMnode, pTaskEntry, pStreamObj);
// mndReleaseStream(pMnode, pStreamObj);
// }
// }
// }
}
}

View File

@ -288,10 +288,11 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
}
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
SStreamMeta* pMeta = p->pMeta;
int32_t vgId = pMeta->vgId;
const char* id = p->id.idStr;
int32_t code = 0;
SStreamMeta* pMeta = p->pMeta;
int32_t vgId = pMeta->vgId;
const char* id = p->id.idStr;
int32_t code = 0;
SCheckpointInfo* pCKInfo = &p->chkInfo;
if (p->info.fillHistory == 1) {
return code;
@ -303,25 +304,33 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
taosThreadMutexLock(&p->lock);
ASSERT(p->chkInfo.checkpointId <= p->chkInfo.checkpointingId && p->chkInfo.checkpointingId == checkpointId &&
p->chkInfo.checkpointVer <= p->chkInfo.processedVer);
p->chkInfo.checkpointId = p->chkInfo.checkpointingId;
p->chkInfo.checkpointVer = p->chkInfo.processedVer;
SStreamTaskState* pStatus = streamTaskGetStatus(p);
if (pStatus->state == TASK_STATUS__CK) {
ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId &&
pCKInfo->checkpointVer <= pCKInfo->processedVer);
streamTaskClearCheckInfo(p, false);
SStreamTaskState* pState = streamTaskGetStatus(p);
pCKInfo->checkpointId = pCKInfo->checkpointingId;
pCKInfo->checkpointVer = pCKInfo->processedVer;
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&p->lock);
streamTaskClearCheckInfo(p, false);
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&p->lock);
} else {
stDebug("s-task:%s vgId:%d status:%s not keep the checkpoint metaInfo, checkpoint:%" PRId64 " failed", id, vgId,
pStatus->name, pCKInfo->checkpointingId);
taosThreadMutexUnlock(&p->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
if (code != TSDB_CODE_SUCCESS) {
stDebug("s-task:%s vgId:%d handle event:checkpoint-done failed", id, vgId);
return -1;
return code;
}
stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: normal, prev:%s",
vgId, id, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.nextProcessVer, pState->name);
vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name);
// save the task if not sink task
if (p->info.taskLevel != TASK_LEVEL__SINK) {
@ -343,6 +352,7 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
streamMetaWUnLock(pMeta);
}
return code;
}
@ -474,7 +484,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
code = streamSaveTaskCheckpointInfo(pTask, ckId);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s commit taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", pTask->id.idStr, ckId,
tstrerror(terrno));
tstrerror(code));
} else {
code = streamTaskUploadChkp(pTask, ckId, (char*)pTask->id.idStr);
if (code != 0) {

View File

@ -160,7 +160,6 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
// no available token in bucket for sink task, let's wait for a little bit
if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) {
stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id);
taosMsleep(WAIT_FOR_DURATION);
return TSDB_CODE_SUCCESS;
}
@ -172,10 +171,6 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue);
if (qItem == NULL) {
// if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
// taosMsleep(WAIT_FOR_DURATION);
// continue;
// }
// restore the token to bucket
if (*numOfBlocks > 0) {