Merge pull request #27770 from taosdata/fix/syntax

fix(stream): do not reset the failed checkpointId
This commit is contained in:
Haojun Liao 2024-09-11 09:03:51 +08:00 committed by GitHub
commit c33b57fa65
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 55 additions and 20 deletions

View File

@ -653,7 +653,7 @@ static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask
return code;
}
code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, TSDB_CODE_VND_INVALID_VGROUP_ID, 0);
code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf);
}

View File

@ -235,6 +235,16 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
return code;
}
if (pActiveInfo->failedId >= checkpointId) {
stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
"discard the checkpoint-trigger block",
id, vgId, checkpointId, transId, pActiveInfo->failedId);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
if (pTask->chkInfo.checkpointId == checkpointId) {
{ // send checkpoint-ready msg to upstream
SRpcMsg msg = {0};
@ -531,15 +541,20 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
}
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
pTask->chkInfo.startTs = 0; // clear the recorded start time
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
streamMutexLock(&pTask->chkInfo.pActiveInfo->lock);
streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo);
streamMutexLock(&pInfo->lock);
streamTaskClearActiveInfo(pInfo);
if (clearChkpReadyMsg) {
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
streamClearChkptReadyMsg(pInfo);
}
streamMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);
streamMutexUnlock(&pInfo->lock);
stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%"PRId64", current checkpointId:%"PRId64,
pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId);
}
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
@ -669,7 +684,7 @@ void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr);
} else {
pInfo->failedId = pInfo->activeId;
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, pInfo->activeId,
stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d)", pTask->id.idStr, pInfo->activeId,
pInfo->transId);
}
}

View File

@ -726,9 +726,10 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
}
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int32_t code = 0;
SStreamDataBlock* pBlock = NULL;
const char* id = pTask->id.idStr;
int32_t code = 0;
SStreamDataBlock* pBlock = NULL;
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
if (numOfElems > 0) {
@ -746,10 +747,15 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
return 0;
}
if (pTask->chkInfo.pActiveInfo->dispatchTrigger) {
stDebug("s-task:%s already send checkpoint-trigger, no longer dispatch any other data", id);
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
return 0;
if (pInfo->dispatchTrigger) {
if ((pInfo->activeId != 0) && (pInfo->failedId < pInfo->activeId)) {
stDebug("s-task:%s already send checkpoint-trigger, no longer dispatch any other data", id);
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
return 0;
} else {
stDebug("s-task:%s dispatch trigger set, and ignore since current active checkpointId:%" PRId64 " failed", id,
pInfo->activeId);
}
}
if (pTask->msgInfo.pData != NULL) {
@ -788,8 +794,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
// outputQ should be empty here, otherwise, set the checkpoint failed due to the retrieve req happens
if (streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) > 0) {
stError("s-task:%s items are still in outputQ due to downstream retrieve, failed to init trigger dispatch",
pTask->id.idStr);
stError(
"s-task:%s items are still in outputQ due to downstream retrieve, failed to init and discard "
"checkpoint-trigger dispatch",
pTask->id.idStr);
streamTaskSetCheckpointFailed(pTask);
clearBufferedDispatchMsg(pTask);
continue;
@ -1478,6 +1486,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t numOfFailed = 0;
bool triggerDispatchRsp = false;
taosMsleep(500);
// we only set the dispatch msg info for current checkpoint trans
streamMutexLock(&pTask->lock);
triggerDispatchRsp = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) &&

View File

@ -1190,6 +1190,7 @@ void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t c
pTask->chkInfo.pActiveInfo->transId = transId;
pTask->chkInfo.pActiveInfo->activeId = checkpointId;
pTask->chkInfo.pActiveInfo->failedId = checkpointId;
stDebug("s-task:%s set failed checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
}
int32_t streamTaskCreateActiveChkptInfo(SActiveCheckpointInfo** pRes) {
@ -1239,12 +1240,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
taosMemoryFree(pInfo);
}
// NOTE: resets the active checkpoint state (activeId, transId, trigger flags and lists) but deliberately keeps failedId
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
pInfo->activeId = 0; // clear the checkpoint id
pInfo->activeId = 0;
pInfo->transId = 0;
pInfo->allUpstreamTriggerRecv = 0;
pInfo->dispatchTrigger = false;
pInfo->failedId = 0;
// pInfo->failedId = 0;
taosArrayClear(pInfo->pDispatchTriggerList);
taosArrayClear(pInfo->pCheckpointReadyRecvList);

View File

@ -109,7 +109,7 @@ endi
print ===== idle for 70 sec for checkpoint gen
sleep 70000
print ===== idle 60 sec completed , continue
print ===== idle 70 sec completed , continue
print ===== step 1 over
@ -127,9 +127,12 @@ sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 wate
sql_error create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt2 as select _wstart, count(*) c1, sum(a) c3 from t1 interval(10s);
sql create stream if not exists streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt2 as select _wstart, count(*) c1, sum(a) c3 from t1 interval(10s);
print start to check stream status
sleep 1000
run tsim/stream/checkTaskStatus.sim
print pause stream2
sql pause stream streams2;
sql insert into t1 values(1648791213001,1,12,3,1.0);

View File

@ -608,7 +608,7 @@ $loop_count = 0
print step 7
loop4:
sleep 100
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 10 then

View File

@ -283,7 +283,8 @@ class TDTestCase:
fake.random_int(min=-0, max=32767, step=1) , fake.random_int(min=-0, max=127, step=1) ,
fake.pyfloat() , fake.pyfloat() , fake.pystr() , fake.pystr() , ts + i, fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() ,
fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr() , fake.pystr()))
time.sleep(1)
tdSql.query("select count(*) from stable_1;")
tdSql.checkData(0,0,10*num_random*n)
tdSql.query("select count(*) from hn_table_1_r;")
@ -291,6 +292,10 @@ class TDTestCase:
# stream data check
tdCom.check_stream_task_status(stream_name,vgroups,90)
print("sleep 30s")
time.sleep(30)
print("check--------------------------------------------------------------------------")
tdSql.query("select startts,wend,max_int from stream_max_stable_1 ;")
tdSql.checkRows(20)
tdSql.query("select sum(max_int) from stream_max_stable_1 ;")