fix(stream): reset task add the failed chkptId info.

This commit is contained in:
Haojun Liao 2024-11-01 18:58:22 +08:00
parent 3b129f7e36
commit b2b5a14d9d
9 changed files with 51 additions and 35 deletions

View File

@ -3797,7 +3797,14 @@ typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
} SVPauseStreamTaskReq, SVResetStreamTaskReq;
} SVPauseStreamTaskReq;
typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
int64_t chkptId;
} SVResetStreamTaskReq;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];

View File

@ -718,6 +718,7 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
SRpcHandleInfo* pInfo, int32_t code);
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue);

View File

@ -56,6 +56,7 @@ typedef struct SStreamTransMgmt {
typedef struct SStreamTaskResetMsg {
int64_t streamId;
int32_t transId;
int64_t checkpointId;
} SStreamTaskResetMsg;
typedef struct SChkptReportInfo {
@ -142,9 +143,9 @@ int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pSt
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId);
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId);
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts);
int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream);
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,

View File

@ -24,7 +24,7 @@ typedef struct SFailedCheckpointInfo {
static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode);
static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList);
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId);
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId);
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
static void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
@ -68,7 +68,7 @@ void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
}
}
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId) {
STrans *pTrans = NULL;
int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME,
" reset from failed checkpoint", &pTrans);
@ -84,7 +84,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return code;
}
code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream);
code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream, chkptId);
if (code) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@ -115,7 +115,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return code;
}
int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId) {
int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId) {
int32_t size = sizeof(SStreamTaskResetMsg);
int32_t num = taosArrayGetSize(execInfo.pKilledChkptTrans);
@ -135,8 +135,9 @@ int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t
taosArrayRemove(execInfo.pKilledChkptTrans, 0); // remove this first, append new reset trans in the tail
}
SStreamTaskResetMsg p = {.streamId = streamId, .transId = transId};
SStreamTaskResetMsg p = {.streamId = streamId, .transId = transId, .checkpointId = checkpointId};
// let's remember that this trans had been killed already
void *px = taosArrayPush(execInfo.pKilledChkptTrans, &p);
if (px == NULL) {
mError("failed to push reset-msg trans:%d into the killed chkpt trans list, size:%d", transId, num - 1);
@ -150,6 +151,7 @@ int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t
pReq->streamId = streamId;
pReq->transId = transId;
pReq->checkpointId = checkpointId;
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_TASK_RESET, .pCont = pReq, .contLen = size};
int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
@ -234,7 +236,7 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) {
} else {
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
pStream->uid, pMsg->transId);
code = mndCreateStreamResetStatusTrans(pMnode, pStream);
code = mndCreateStreamResetStatusTrans(pMnode, pStream, pMsg->checkpointId);
}
}
@ -495,10 +497,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
continue;
}
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
pInfo->checkpointId, pInfo->transId);
mInfo("stream:0x%" PRIx64 "checkpointId:%" PRId64
" transId:%d failed issue task-reset trans to reset all tasks status",
pInfo->streamUid, pInfo->checkpointId, pInfo->transId);
code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId);
code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId, pInfo->checkpointId);
if (code) {
mError("failed to create reset task trans, code:%s", tstrerror(code));
}

View File

@ -295,7 +295,7 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas
return code;
}
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t chkptId) {
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
@ -306,6 +306,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
pReq->chkptId = chkptId;
SEpSet epset = {0};
bool hasEpset = false;
@ -544,7 +545,7 @@ int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* p
return 0;
}
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId) {
SStreamTaskIter *pIter = NULL;
taosWLockLatch(&pStream->lock);
@ -564,7 +565,7 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
return code;
}
code = doSetResetAction(pMnode, pTrans, pTask);
code = doSetResetAction(pMnode, pTrans, pTask, chkptId);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamTaskIter(pIter);
taosWUnLockLatch(&pStream->lock);

View File

@ -939,7 +939,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
}
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
SVResetStreamTaskReq* pReq = (SVResetStreamTaskReq*)pMsg;
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
@ -954,17 +954,13 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
streamMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask, true);
streamTaskSetFailedCheckpointId(pTask, pReq->chkptId);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state == TASK_STATUS__CK) {
int32_t tranId = 0;
int64_t activeChkId = 0;
streamTaskGetActiveCheckpointInfo(pTask, &tranId, &activeChkId);
tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d",
pTask->id.idStr, activeChkId, tranId);
streamTaskSetStatusReady(pTask);
tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr);
} else if (pState.state == TASK_STATUS__UNINIT) {
// tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
// tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId);

View File

@ -192,7 +192,6 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
void streamTaskSetFailedCheckpointId(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask);
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);

View File

@ -562,7 +562,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
}
streamMutexUnlock(&pInfo->lock);
stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", current checkpointId:%" PRId64,
stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", latest checkpointId:%" PRId64,
pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId);
}
@ -682,15 +682,22 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
return TSDB_CODE_SUCCESS;
}
void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId) {
struct SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
if (pInfo->activeId <= 0) {
stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr);
if (failedId <= 0) {
stWarn("s-task:%s failedId is 0, not update the failed checkpoint info, current failedId:%" PRId64
" activeId:%" PRId64,
pTask->id.idStr, pInfo->failedId, pInfo->activeId);
} else {
pInfo->failedId = pInfo->activeId;
stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d)", pTask->id.idStr, pInfo->activeId,
pInfo->transId);
if (failedId <= pInfo->failedId) {
stDebug("s-task:%s failedId:%" PRId64 " not update to:%" PRId64, pTask->id.idStr, pInfo->failedId, failedId);
} else {
stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d) activeId:%" PRId64
" prev failedId:%" PRId64,
pTask->id.idStr, failedId, pInfo->transId, pInfo->activeId, pInfo->failedId);
pInfo->failedId = failedId;
}
}
}
@ -698,7 +705,7 @@ void streamTaskSetCheckpointFailed(SStreamTask* pTask) {
streamMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask).state;
if (status == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask);
streamTaskSetFailedCheckpointId(pTask, pTask->chkInfo.pActiveInfo->activeId);
}
streamMutexUnlock(&pTask->lock);
}
@ -876,8 +883,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
}
} else { // clear the checkpoint info if failed
// set failed checkpoint id before clear the checkpoint info
streamMutexLock(&pTask->lock);
streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info
streamTaskSetFailedCheckpointId(pTask, ckId);
streamMutexUnlock(&pTask->lock);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);

View File

@ -1246,13 +1246,13 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
taosMemoryFree(pInfo);
}
//NOTE: clear the checkpoint id, and keep the failed id
// NOTE: clear the checkpoint id, and keep the failed id
// failedId for a task will increase as the checkpoint I.D. increases.
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
pInfo->activeId = 0;
pInfo->transId = 0;
pInfo->allUpstreamTriggerRecv = 0;
pInfo->dispatchTrigger = false;
// pInfo->failedId = 0;
taosArrayClear(pInfo->pDispatchTriggerList);
taosArrayClear(pInfo->pCheckpointReadyRecvList);