refactor: do some internal refactor (discard the checkpoint source msg during restoring).

This commit is contained in:
Haojun Liao 2023-09-28 22:44:13 +08:00
parent bb4ba54f28
commit 83f84d92bc
5 changed files with 21 additions and 19 deletions

View File

@ -224,7 +224,7 @@ int tqPushMsg(STQ*, tmsg_t msgType);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqScanWalAsync(STQ* pTq, bool ckPause);
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDataVerUpdateReq(STQ* pTq, char* pMsg, int32_t msgLen);

View File

@ -1638,7 +1638,7 @@ FAIL:
}
// todo error code cannot be return, since this is invoked by an mnode-launched transaction.
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -1648,6 +1648,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs
// disable auto rsp to source
pRsp->info.handle = NULL;
// todo: add counter to make sure other tasks would not be trapped in checkpoint state
SStreamCheckpointSourceReq req = {0};
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId);
@ -1657,6 +1658,14 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs
return TSDB_CODE_SUCCESS;
}
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d checkpoint-source msg received during restoring, ignore it", vgId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) {
@ -1680,6 +1689,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs
return TSDB_CODE_SUCCESS;
}
// todo: handle the partial failure cases
// downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
if (pTask->status.downstreamReady != 1) {
qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
@ -1731,8 +1741,6 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs
return code;
}
// todo: when generating checkpoint, no new tasks are allowed to add into current Vnode
// todo: when generating checkpoint, leader of mnode has transfer to other DNode?
streamMetaReleaseTask(pMeta, pTask);
return code;
}

View File

@ -605,7 +605,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
break;
case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg, pRsp);
tqProcessTaskCheckPointSourceReq(pVnode->pTq, pMsg, pRsp);
break;
case TDMT_VND_STREAM_TASK_UPDATE:
tqProcessTaskUpdateReq(pVnode->pTq, pMsg);

View File

@ -716,9 +716,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
taosArrayPush(pMeta->pTaskList, &pTask->id);
} else {
// todo this should replace the existed object put by replay creating stream task msg from mnode
stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
ASSERT(0);
tdbFree(pKey);
tdbFree(pVal);
taosMemoryFree(pTask);

View File

@ -583,16 +583,12 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
SDataRange* pRange = &pHTask->dataRange;
pRange->range.minVer = 0;
// todo remove this
// the query version range should be limited to the already processed data
pRange->range.minVer = 0;
pRange->range.maxVer = pTask->chkInfo.nextProcessVer - 1;
if (pRange->range.maxVer < pRange->range.minVer) {
pRange->range.maxVer = pRange->range.minVer;
}
pHTask->execInfo.init = taosGetTimestampMs();
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
" ver range:%" PRId64 " - %" PRId64", init:%"PRId64,
@ -890,8 +886,7 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
if (pTask->hTaskInfo.id.taskId == 0) {
if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
"-%" PRId64,
stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64,
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
} else {
stDebug(