fix(stream): transfer the checkpoint trans id.

This commit is contained in:
Haojun Liao 2023-12-21 19:12:49 +08:00
parent 2acc8423c5
commit 84583580e2
6 changed files with 51 additions and 37 deletions

View File

@ -314,6 +314,7 @@ typedef struct SCheckpointInfo {
int32_t checkpointNotReadyTasks;
bool dispatchCheckpointTrigger;
int64_t msgVer;
int32_t transId;
} SCheckpointInfo;
typedef struct SStreamStatus {
@ -635,6 +636,7 @@ typedef struct {
int32_t nodeId;
SEpSet mgmtEps;
int32_t mnodeId;
int32_t transId;
int64_t expireTime;
} SStreamCheckpointSourceReq;
@ -677,8 +679,8 @@ typedef struct STaskStatusEntry {
int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task
int32_t relatedHTask; // has related fill-history task
int64_t activeCheckpointId; // current active checkpoint id
int32_t chkpointTransId; // checkpoint trans id
bool checkpointFailed; // denote if the checkpoint is failed or not
bool inputQChanging; // inputQ is changing or not
int64_t inputQUnchangeCounter;

View File

@ -44,12 +44,11 @@ typedef struct SStreamTransMgmt {
} SStreamTransMgmt;
typedef struct SStreamExecInfo {
SArray * pNodeList;
SArray *pNodeList;
int64_t ts; // snapshot ts
SStreamTransMgmt transMgmt;
int64_t activeCheckpoint; // active check point id
SHashObj * pTaskMap;
SArray * pTaskList;
SHashObj *pTaskMap;
SArray *pTaskList;
TdThreadMutex lock;
} SStreamExecInfo;

View File

@ -62,7 +62,7 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId);
int64_t streamId, int32_t taskId, int32_t transId);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode);
@ -997,13 +997,14 @@ static int32_t mndProcessStreamRemainChkptTmr(SRpcMsg *pReq) {
}
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId) {
int64_t streamId, int32_t taskId, int32_t transId) {
SStreamCheckpointSourceReq req = {0};
req.checkpointId = checkpointId;
req.nodeId = nodeId;
req.expireTime = -1;
req.streamId = streamId; // pTask->id.streamId;
req.taskId = taskId; // pTask->id.taskId;
req.transId = transId;
int32_t code;
int32_t blen;
@ -1093,7 +1094,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
void * buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId) < 0) {
pTask->id.taskId, pTrans->id) < 0) {
mndReleaseVgroup(pMnode, pVgObj);
taosWUnLockLatch(&pStream->lock);
goto _ERR;
@ -1162,7 +1163,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
void * buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId,
pTask->id.taskId) < 0) {
pTask->id.taskId, pTrans->id) < 0) {
mndReleaseVgroup(pMnode, pVgObj);
taosWUnLockLatch(&pStream->lock);
return -1;
@ -2837,39 +2838,36 @@ int32_t killActiveCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t le
return TSDB_CODE_SUCCESS;
}
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) {
int32_t code = TSDB_CODE_SUCCESS;
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d to reset task status", transId);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
} else {
mError("failed to acquire checkpoint trans:%d", transId);
}
// set all tasks status to be normal, refactor later to be stream level, instead of vnode level.
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
SStreamObj *pStream = mndGetStreamObj(pMnode, streamId);
if (pStream == NULL) {
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
mError("failed to acquire the streamObj:0x%" PRIx64 " to reset checkpoint, may have been dropped", pStream->uid);
} else {
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
if (conflict) {
mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", pStream->name,
pStream->sourceDb, pStream->targetDb);
continue;
}
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid);
int32_t code = createStreamResetStatusTrans(pMnode, pStream);
if (code != TSDB_CODE_SUCCESS) {
sdbCancelFetch(pSdb, pIter);
return code;
mError("stream:%s other trans exists in DB:%s, dstTable:%s failed to start reset-status trans", pStream->name,
pStream->sourceDb, pStream->targetSTbName);
} else {
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, transId:%d, create reset trans", pStream->name,
pStream->uid, transId);
code = createStreamResetStatusTrans(pMnode, pStream);
mndReleaseStream(pMnode, pStream);
}
}
return 0;
return code;
}
static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
@ -3007,6 +3005,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
bool checkpointFailed = false;
int64_t activeCheckpointId = 0;
int64_t streamId = 0;
SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
@ -3049,7 +3048,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (pTaskEntry->stage != p->stage && pTaskEntry->stage != -1) {
updateStageInfo(pTaskEntry, p->stage);
if (pTaskEntry->nodeId == SNODE_HANDLE) snodeChanged = true;
if (pTaskEntry->nodeId == SNODE_HANDLE) {
snodeChanged = true;
}
} else {
// task is idle for more than 50 sec.
if (fabs(pTaskEntry->inputQUsed - p->inputQUsed) <= DBL_EPSILON) {
@ -3073,6 +3074,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (p->checkpointFailed) {
checkpointFailed = p->checkpointFailed;
streamId = p->id.streamId;
}
}
}
@ -3111,9 +3113,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status",
execInfo.activeCheckpoint);
mndResetStatusFromCheckpoint(pMnode, activeCheckpointId);
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", activeCheckpointId);
mndResetStatusFromCheckpoint(pMnode, streamId, activeCheckpointId);
} else {
mInfo("not all vgroups are ready, wait for next HB from stream tasks");
}

View File

@ -25,6 +25,7 @@ typedef struct {
SStreamTask* pTask;
} SAsyncUploadArg;
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
@ -34,6 +35,7 @@ int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckp
if (tEncodeSEpSet(pEncoder, &pReq->mgmtEps) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
@ -47,6 +49,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo
if (tDecodeSEpSet(pDecoder, &pReq->mgmtEps) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
@ -149,6 +152,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
pTask->chkInfo.transId = pReq->transId;
pTask->chkInfo.checkpointingId = pReq->checkpointId;
pTask->chkInfo.checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.startTs = taosGetTimestampMs();
@ -273,8 +277,9 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
pTask->chkInfo.failedId = 0;
pTask->chkInfo.startTs = 0; // clear the recorded start time
pTask->chkInfo.checkpointNotReadyTasks = 0;
// pTask->chkInfo.checkpointAlignCnt = 0;
pTask->chkInfo.transId = 0;
pTask->chkInfo.dispatchCheckpointTrigger = false;
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
if (clearChkpReadyMsg) {
streamClearChkptReadyMsg(pTask);

View File

@ -940,6 +940,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1;
if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1;
if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1;
}
int32_t numOfVgs = taosArrayGetSize(pReq->pUpdateNodes);
@ -978,6 +979,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.activeCheckpointId) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t*)&entry.checkpointFailed) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1;
entry.id.taskId = taskId;
taosArrayPush(pReq->pTaskStatus, &entry);
@ -1103,6 +1105,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
if ((*pTask)->chkInfo.checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId);
entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId;
entry.chkpointTransId = (*pTask)->chkInfo.transId;
}
if ((*pTask)->exec.pWalReader != NULL) {
@ -1350,10 +1353,13 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
}
taosThreadMutexLock(&pTask->lock);
ETaskStatus s = streamTaskGetStatus(pTask, NULL);
char* p = NULL;
ETaskStatus s = streamTaskGetStatus(pTask, &p);
if (s == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask);
stDebug("s-task:%s mark the checkpoint:%"PRId64" failed", pTask->id.idStr, pTask->chkInfo.checkpointingId);
} else {
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, p);
}
taosThreadMutexUnlock(&pTask->lock);

View File

@ -796,5 +796,6 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
pDst->sinkDataSize = pSrc->sinkDataSize;
pDst->activeCheckpointId = pSrc->activeCheckpointId;
pDst->checkpointFailed = pSrc->checkpointFailed;
pDst->chkpointTransId = pSrc->chkpointTransId;
}