diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9b96a41516..9137965849 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -314,6 +314,7 @@ typedef struct SCheckpointInfo { int32_t checkpointNotReadyTasks; bool dispatchCheckpointTrigger; int64_t msgVer; + int32_t transId; } SCheckpointInfo; typedef struct SStreamStatus { @@ -635,6 +636,7 @@ typedef struct { int32_t nodeId; SEpSet mgmtEps; int32_t mnodeId; + int32_t transId; int64_t expireTime; } SStreamCheckpointSourceReq; @@ -677,8 +679,8 @@ typedef struct STaskStatusEntry { int64_t verStart; // start version in WAL, only valid for source task int64_t verEnd; // end version in WAL, only valid for source task int64_t processedVer; // only valid for source task - int32_t relatedHTask; // has related fill-history task int64_t activeCheckpointId; // current active checkpoint id + int32_t chkpointTransId; // checkpoint trans id bool checkpointFailed; // denote if the checkpoint is failed or not bool inputQChanging; // inputQ is changing or not int64_t inputQUnchangeCounter; diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b0f6273acc..4d06b16297 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -44,12 +44,11 @@ typedef struct SStreamTransMgmt { } SStreamTransMgmt; typedef struct SStreamExecInfo { - SArray * pNodeList; + SArray *pNodeList; int64_t ts; // snapshot ts SStreamTransMgmt transMgmt; - int64_t activeCheckpoint; // active check point id - SHashObj * pTaskMap; - SArray * pTaskList; + SHashObj *pTaskMap; + SArray *pTaskList; TdThreadMutex lock; } SStreamExecInfo; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d5bb8b2b92..b04a707728 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -62,7 +62,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter); static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq); static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq); static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, - int64_t streamId, int32_t taskId); + int64_t streamId, int32_t taskId, int32_t transId); static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static SArray *extractNodeListFromStream(SMnode *pMnode); @@ -997,13 +997,14 @@ static int32_t mndProcessStreamRemainChkptTmr(SRpcMsg *pReq) { } static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId, - int64_t streamId, int32_t taskId) { + int64_t streamId, int32_t taskId, int32_t transId) { SStreamCheckpointSourceReq req = {0}; req.checkpointId = checkpointId; req.nodeId = nodeId; req.expireTime = -1; req.streamId = streamId; // pTask->id.streamId; req.taskId = taskId; // pTask->id.taskId; + req.transId = transId; int32_t code; int32_t blen; @@ -1093,7 +1094,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre void * buf; int32_t tlen; if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, - pTask->id.taskId) < 0) { + pTask->id.taskId, pTrans->id) < 0) { mndReleaseVgroup(pMnode, pVgObj); taosWUnLockLatch(&pStream->lock); goto _ERR; @@ -1162,7 +1163,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream void * buf; int32_t tlen; if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId, - pTask->id.taskId) < 0) { + pTask->id.taskId, pTrans->id) < 0) { mndReleaseVgroup(pMnode, pVgObj); taosWUnLockLatch(&pStream->lock); return -1; @@ -2837,39 +2838,36 @@ int32_t killActiveCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t le return TSDB_CODE_SUCCESS; } -static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) { +static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) { + int32_t code = TSDB_CODE_SUCCESS; + STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { mInfo("kill checkpoint transId:%d to reset task status", transId); mndKillTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans); + } else { + mError("failed to acquire checkpoint trans:%d", transId); } - // set all tasks status to be normal, refactor later to be stream level, instead of vnode level. - SSdb * pSdb = pMnode->pSdb; - SStreamObj *pStream = NULL; - void * pIter = NULL; - while (1) { - pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); - if (pIter == NULL) { - break; - } - + SStreamObj *pStream = mndGetStreamObj(pMnode, streamId); + if (pStream == NULL) { + code = TSDB_CODE_STREAM_TASK_NOT_EXIST; + mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid); + } else { bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false); if (conflict) { - mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", pStream->name, - pStream->sourceDb, pStream->targetDb); - continue; - } - - mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid); - int32_t code = createStreamResetStatusTrans(pMnode, pStream); - if (code != TSDB_CODE_SUCCESS) { - sdbCancelFetch(pSdb, pIter); - return code; + mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name, + pStream->sourceDb, pStream->targetSTbName); + } else { + mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name, + pStream->uid, transId); + code = createStreamResetStatusTrans(pMnode, pStream); + mndReleaseStream(pMnode, pStream); } } - return 0; + + return code; } static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) { @@ -3007,6 +3005,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { bool checkpointFailed = false; int64_t activeCheckpointId = 0; + int64_t streamId = 0; SDecoder decoder = {0}; tDecoderInit(&decoder, pReq->pCont, pReq->contLen); @@ -3049,7 +3048,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) { updateStageInfo(pTaskEntry, p->stage); - if (pTaskEntry->nodeId == SNODE_HANDLE) snodeChanged = true; + if (pTaskEntry->nodeId == SNODE_HANDLE) { + snodeChanged = true; + } } else { // task is idle for more than 50 sec. if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) { @@ -3073,6 +3074,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (p->checkpointFailed) { checkpointFailed = p->checkpointFailed; + streamId = p->id.streamId; } } } @@ -3111,9 +3113,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { if (allReady || snodeChanged) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal - mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", - execInfo.activeCheckpoint); - mndResetStatusFromCheckpoint(pMnode, activeCheckpointId); + mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", activeCheckpointId); + mndResetStatusFromCheckpoint(pMnode, streamId, activeCheckpointId); } else { mInfo("not all vgroups are ready, wait for next HB from stream tasks"); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 4a99d54de1..2a766f21ec 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -25,6 +25,7 @@ typedef struct { SStreamTask* pTask; } SAsyncUploadArg; + int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; @@ -34,6 +35,7 @@ int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckp if (tEncodeSEpSet(pEncoder, &pReq->mgmtEps) < 0) return -1; if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } @@ -47,6 +49,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo if (tDecodeSEpSet(pDecoder, &pReq->mgmtEps) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1; tEndDecode(pDecoder); return 0; } @@ -149,6 +152,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); + pTask->chkInfo.transId = pReq->transId; pTask->chkInfo.checkpointingId = pReq->checkpointId; pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); pTask->chkInfo.startTs = taosGetTimestampMs(); @@ -273,8 +277,9 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) { pTask->chkInfo.failedId = 0; pTask->chkInfo.startTs = 0; // clear the recorded start time pTask->chkInfo.checkpointNotReadyTasks = 0; - // pTask->chkInfo.checkpointAlignCnt = 0; + pTask->chkInfo.transId = 0; pTask->chkInfo.dispatchCheckpointTrigger = false; + streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks if (clearChkpReadyMsg) { streamClearChkptReadyMsg(pTask); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 355e17db9a..d1df74b8d3 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -940,6 +940,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1; if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1; if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1; + if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1; } int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes); @@ -978,6 +979,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1; if (tDecodeI64(pDecoder, &entry.activeCheckpointId) < 0) return -1; if (tDecodeI8(pDecoder, (int8_t*)&entry.checkpointFailed) < 0) return -1; + if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1; entry.id.taskId = taskId; taosArrayPush(pReq->pTaskStatus, &entry); @@ -1103,6 +1105,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { if ((*pTask)->chkInfo.checkpointingId != 0) { entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId); entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId; + entry.chkpointTransId = (*pTask)->chkInfo.transId; } if ((*pTask)->exec.pWalReader != NULL) { @@ -1350,10 +1353,13 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) { } taosThreadMutexLock(&pTask->lock); - ETaskStatus s = streamTaskGetStatus(pTask, NULL); + char* p = NULL; + ETaskStatus s = streamTaskGetStatus(pTask, &p); if (s == TASK_STATUS__CK) { streamTaskSetCheckpointFailedId(pTask); stDebug("s-task:%s mark the checkpoint:%"PRId64" failed", pTask->id.idStr, pTask->chkInfo.checkpointingId); + } else { + stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, p); } taosThreadMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 335f9d27d5..4d343a484d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -796,5 +796,6 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc) pDst->sinkDataSize = pSrc->sinkDataSize; pDst->activeCheckpointId = pSrc->activeCheckpointId; pDst->checkpointFailed = pSrc->checkpointFailed; + pDst->chkpointTransId = pSrc->chkpointTransId; }