fix(stream):not reset the failed checkpointId

This commit is contained in:
Haojun Liao 2024-09-10 12:58:08 +08:00
parent eba32fdcb3
commit 51bee72807
4 changed files with 47 additions and 17 deletions

View File

@ -235,6 +235,16 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
return code;
}
if (pActiveInfo->failedId >= checkpointId) {
stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
"discard the checkpoint-trigger block",
id, vgId, checkpointId, transId, pActiveInfo->failedId);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
if (pTask->chkInfo.checkpointId == checkpointId) {
{ // send checkpoint-ready msg to upstream
SRpcMsg msg = {0};
@ -531,15 +541,20 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
}
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
pTask->chkInfo.startTs = 0; // clear the recorded start time
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
streamMutexLock(&pTask->chkInfo.pActiveInfo->lock);
streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo);
streamMutexLock(&pInfo->lock);
streamTaskClearActiveInfo(pInfo);
if (clearChkpReadyMsg) {
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
streamClearChkptReadyMsg(pInfo);
}
streamMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);
streamMutexUnlock(&pInfo->lock);
stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%"PRId64", current checkpointId:%"PRId64,
pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId);
}
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
@ -669,7 +684,7 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr);
} else {
pInfo->failedId = pInfo->activeId;
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, pInfo->activeId,
stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d)", pTask->id.idStr, pInfo->activeId,
pInfo->transId);
}
}

View File

@ -729,6 +729,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int32_t code = 0;
SStreamDataBlock* pBlock = NULL;
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
if (numOfElems > 0) {
@ -746,10 +747,15 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
return 0;
}
if (pTask->chkInfo.pActiveInfo->dispatchTrigger) {
if (pInfo->dispatchTrigger) {
if ((pInfo->activeId != 0) && (pInfo->failedId < pInfo->activeId)) {
stDebug("s-task:%s already send checkpoint-trigger, no longer dispatch any other data", id);
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
return 0;
} else {
stDebug("s-task:%s dispatch trigger set, and ignore since current active checkpointId:%" PRId64 " failed", id,
pInfo->activeId);
}
}
if (pTask->msgInfo.pData != NULL) {
@ -788,7 +794,9 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
// outputQ should be empty here, otherwise, set the checkpoint failed due to the retrieve req happens
if (streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) > 0) {
stError("s-task:%s items are still in outputQ due to downstream retrieve, failed to init trigger dispatch",
stError(
"s-task:%s items are still in outputQ due to downstream retrieve, failed to init and discard "
"checkpoint-trigger dispatch",
pTask->id.idStr);
streamTaskSetCheckpointFailed(pTask);
clearBufferedDispatchMsg(pTask);
@ -1478,6 +1486,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t numOfFailed = 0;
bool triggerDispatchRsp = false;
taosMsleep(500);
// we only set the dispatch msg info for current checkpoint trans
streamMutexLock(&pTask->lock);
triggerDispatchRsp = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) &&

View File

@ -1190,6 +1190,7 @@ void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t c
pTask->chkInfo.pActiveInfo->transId = transId;
pTask->chkInfo.pActiveInfo->activeId = checkpointId;
pTask->chkInfo.pActiveInfo->failedId = checkpointId;
stDebug("s-task:%s set failed checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
}
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
@ -1239,12 +1240,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
taosMemoryFree(pInfo);
}
//NOTE: clear the checkpoint id, and keep the failed id
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
pInfo->activeId = 0; // clear the checkpoint id
pInfo->activeId = 0;
pInfo->transId = 0;
pInfo->allUpstreamTriggerRecv = 0;
pInfo->dispatchTrigger = false;
pInfo->failedId = 0;
// pInfo->failedId = 0;
taosArrayClear(pInfo->pDispatchTriggerList);
taosArrayClear(pInfo->pCheckpointReadyRecvList);

View File

@ -109,7 +109,7 @@ endi
print ===== idle for 70 sec for checkpoint gen
sleep 70000
print ===== idle 60 sec completed , continue
print ===== idle 70 sec completed , continue
print ===== step 1 over
@ -127,9 +127,12 @@ sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 wate
sql_error create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt2 as select _wstart, count(*) c1, sum(a) c3 from t1 interval(10s);
sql create stream if not exists streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt2 as select _wstart, count(*) c1, sum(a) c3 from t1 interval(10s);
print start to check stream status
sleep 1000
run tsim/stream/checkTaskStatus.sim
print pause stream2
sql pause stream streams2;
sql insert into t1 values(1648791213001,1,12,3,1.0);