fix(stream): add the failed chkptId info to the reset-task request.
This commit is contained in:
parent
526904fdd1
commit
e7b3a3aec8
|
@ -3769,7 +3769,14 @@ typedef struct {
|
|||
SMsgHead head;
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
} SVPauseStreamTaskReq, SVResetStreamTaskReq;
|
||||
} SVPauseStreamTaskReq;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int64_t chkptId;
|
||||
} SVResetStreamTaskReq;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_STREAM_FNAME_LEN];
|
||||
|
|
|
@ -712,6 +712,7 @@ int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
|
|||
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
|
||||
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
|
||||
SRpcHandleInfo* pInfo, int32_t code);
|
||||
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId);
|
||||
|
||||
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
|
||||
int32_t streamQueueGetNumOfUnAccessedItems(const SStreamQueue* pQueue);
|
||||
|
|
|
@ -56,6 +56,7 @@ typedef struct SStreamTransMgmt {
|
|||
typedef struct SStreamTaskResetMsg {
|
||||
int64_t streamId;
|
||||
int32_t transId;
|
||||
int64_t checkpointId;
|
||||
} SStreamTaskResetMsg;
|
||||
|
||||
typedef struct SChkptReportInfo {
|
||||
|
@ -141,9 +142,9 @@ int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pSt
|
|||
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
|
||||
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId);
|
||||
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
|
||||
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId);
|
||||
int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* pTask, int64_t checkpointId, int64_t ts);
|
||||
int32_t mndStreamSetRestartAction(SMnode* pMnode, STrans *pTrans, SStreamObj* pStream);
|
||||
int32_t mndStreamSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t checkpointId,
|
||||
|
|
|
@ -24,7 +24,7 @@ typedef struct SFailedCheckpointInfo {
|
|||
|
||||
static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode);
|
||||
static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList);
|
||||
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId);
|
||||
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId);
|
||||
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
|
||||
static void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo);
|
||||
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
|
||||
|
@ -68,7 +68,7 @@ void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
||||
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream, int64_t chkptId) {
|
||||
STrans *pTrans = NULL;
|
||||
int32_t code = doCreateTrans(pMnode, pStream, NULL, TRN_CONFLICT_NOTHING, MND_STREAM_TASK_RESET_NAME,
|
||||
" reset from failed checkpoint", &pTrans);
|
||||
|
@ -84,7 +84,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
|||
return code;
|
||||
}
|
||||
|
||||
code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream);
|
||||
code = mndStreamSetResetTaskAction(pMnode, pTrans, pStream, chkptId);
|
||||
if (code) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -115,7 +115,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId) {
|
||||
int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId, int64_t checkpointId) {
|
||||
int32_t size = sizeof(SStreamTaskResetMsg);
|
||||
|
||||
int32_t num = taosArrayGetSize(execInfo.pKilledChkptTrans);
|
||||
|
@ -135,8 +135,9 @@ int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t
|
|||
taosArrayRemove(execInfo.pKilledChkptTrans, 0); // remove this first, append new reset trans in the tail
|
||||
}
|
||||
|
||||
SStreamTaskResetMsg p = {.streamId = streamId, .transId = transId};
|
||||
SStreamTaskResetMsg p = {.streamId = streamId, .transId = transId, .checkpointId = checkpointId};
|
||||
|
||||
// let's remember that this trans had been killed already
|
||||
void *px = taosArrayPush(execInfo.pKilledChkptTrans, &p);
|
||||
if (px == NULL) {
|
||||
mError("failed to push reset-msg trans:%d into the killed chkpt trans list, size:%d", transId, num - 1);
|
||||
|
@ -150,6 +151,7 @@ int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t
|
|||
|
||||
pReq->streamId = streamId;
|
||||
pReq->transId = transId;
|
||||
pReq->checkpointId = checkpointId;
|
||||
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_TASK_RESET, .pCont = pReq, .contLen = size};
|
||||
int32_t code = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
|
@ -234,7 +236,7 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) {
|
|||
} else {
|
||||
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
|
||||
pStream->uid, pMsg->transId);
|
||||
code = mndCreateStreamResetStatusTrans(pMnode, pStream);
|
||||
code = mndCreateStreamResetStatusTrans(pMnode, pStream, pMsg->checkpointId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -495,10 +497,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
|||
continue;
|
||||
}
|
||||
|
||||
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
|
||||
pInfo->checkpointId, pInfo->transId);
|
||||
mInfo("stream:0x%" PRIx64 "checkpointId:%" PRId64
|
||||
" transId:%d failed issue task-reset trans to reset all tasks status",
|
||||
pInfo->streamUid, pInfo->checkpointId, pInfo->transId);
|
||||
|
||||
code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId);
|
||||
code = mndSendResetFromCheckpointMsg(pMnode, pInfo->streamUid, pInfo->transId, pInfo->checkpointId);
|
||||
if (code) {
|
||||
mError("failed to create reset task trans, code:%s", tstrerror(code));
|
||||
}
|
||||
|
|
|
@ -295,7 +295,7 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||
static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, int64_t chkptId) {
|
||||
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
|
||||
|
@ -306,6 +306,7 @@ static int32_t doSetResetAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
|
|||
pReq->head.vgId = htonl(pTask->info.nodeId);
|
||||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
pReq->chkptId = chkptId;
|
||||
|
||||
SEpSet epset = {0};
|
||||
bool hasEpset = false;
|
||||
|
@ -544,7 +545,7 @@ int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* p
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream, int64_t chkptId) {
|
||||
SStreamTaskIter *pIter = NULL;
|
||||
|
||||
taosWLockLatch(&pStream->lock);
|
||||
|
@ -564,7 +565,7 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamObj *
|
|||
return code;
|
||||
}
|
||||
|
||||
code = doSetResetAction(pMnode, pTrans, pTask);
|
||||
code = doSetResetAction(pMnode, pTrans, pTask, chkptId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
destroyStreamTaskIter(pIter);
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
|
|
|
@ -935,7 +935,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
|
||||
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
|
||||
SVResetStreamTaskReq* pReq = (SVResetStreamTaskReq*)pMsg;
|
||||
|
||||
SStreamTask* pTask = NULL;
|
||||
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||
|
@ -950,17 +950,13 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
|
|||
streamMutexLock(&pTask->lock);
|
||||
streamTaskClearCheckInfo(pTask, true);
|
||||
|
||||
streamTaskSetFailedCheckpointId(pTask, pReq->chkptId);
|
||||
|
||||
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
|
||||
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
||||
if (pState.state == TASK_STATUS__CK) {
|
||||
int32_t tranId = 0;
|
||||
int64_t activeChkId = 0;
|
||||
streamTaskGetActiveCheckpointInfo(pTask, &tranId, &activeChkId);
|
||||
|
||||
tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d",
|
||||
pTask->id.idStr, activeChkId, tranId);
|
||||
|
||||
streamTaskSetStatusReady(pTask);
|
||||
tqDebug("s-task:%s reset checkpoint status to ready", pTask->id.idStr);
|
||||
} else if (pState.state == TASK_STATUS__UNINIT) {
|
||||
// tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
|
||||
// tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId);
|
||||
|
|
|
@ -191,7 +191,6 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
|
|||
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
|
||||
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
|
||||
|
||||
void streamTaskSetFailedCheckpointId(SStreamTask* pTask);
|
||||
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
|
||||
int32_t streamTaskGetNumOfUpstream(const SStreamTask* pTask);
|
||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate, const char*);
|
||||
|
|
|
@ -560,7 +560,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
|
|||
}
|
||||
streamMutexUnlock(&pInfo->lock);
|
||||
|
||||
stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", current checkpointId:%" PRId64,
|
||||
stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", latest checkpointId:%" PRId64,
|
||||
pTask->id.idStr, pInfo->failedId, pTask->chkInfo.checkpointId);
|
||||
}
|
||||
|
||||
|
@ -680,15 +680,22 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
|
||||
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId) {
|
||||
struct SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
||||
|
||||
if (pInfo->activeId <= 0) {
|
||||
stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr);
|
||||
if (failedId <= 0) {
|
||||
stWarn("s-task:%s failedId is 0, not update the failed checkpoint info, current failedId:%" PRId64
|
||||
" activeId:%" PRId64,
|
||||
pTask->id.idStr, pInfo->failedId, pInfo->activeId);
|
||||
} else {
|
||||
pInfo->failedId = pInfo->activeId;
|
||||
stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d)", pTask->id.idStr, pInfo->activeId,
|
||||
pInfo->transId);
|
||||
if (failedId <= pInfo->failedId) {
|
||||
stDebug("s-task:%s failedId:%" PRId64 " not update to:%" PRId64, pTask->id.idStr, pInfo->failedId, failedId);
|
||||
} else {
|
||||
stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d) activeId:%" PRId64
|
||||
" prev failedId:%" PRId64,
|
||||
pTask->id.idStr, failedId, pInfo->transId, pInfo->activeId, pInfo->failedId);
|
||||
pInfo->failedId = failedId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -696,7 +703,7 @@ void streamTaskSetCheckpointFailed(SStreamTask* pTask) {
|
|||
streamMutexLock(&pTask->lock);
|
||||
ETaskStatus status = streamTaskGetStatus(pTask).state;
|
||||
if (status == TASK_STATUS__CK) {
|
||||
streamTaskSetFailedCheckpointId(pTask);
|
||||
streamTaskSetFailedCheckpointId(pTask, pTask->chkInfo.pActiveInfo->activeId);
|
||||
}
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
}
|
||||
|
@ -874,8 +881,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
|||
code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
|
||||
}
|
||||
} else { // clear the checkpoint info if failed
|
||||
// set failed checkpoint id before clear the checkpoint info
|
||||
streamMutexLock(&pTask->lock);
|
||||
streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info
|
||||
streamTaskSetFailedCheckpointId(pTask, ckId);
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
||||
|
|
|
@ -1249,12 +1249,12 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
|
|||
}
|
||||
|
||||
// NOTE: clear the checkpoint id, and keep the failed id
|
||||
// The failedId of a task only increases: it is updated only when a larger failed checkpoint ID is reported.
|
||||
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
|
||||
pInfo->activeId = 0;
|
||||
pInfo->transId = 0;
|
||||
pInfo->allUpstreamTriggerRecv = 0;
|
||||
pInfo->dispatchTrigger = false;
|
||||
// pInfo->failedId = 0;
|
||||
|
||||
taosArrayClear(pInfo->pDispatchTriggerList);
|
||||
taosArrayClear(pInfo->pCheckpointReadyRecvList);
|
||||
|
|
Loading…
Reference in New Issue