fix(stream): fix error in parsing checkpoint msg.

This commit is contained in:
Haojun Liao 2023-07-07 18:17:50 +08:00
parent b03ca31a7f
commit 128f67784a
3 changed files with 17 additions and 12 deletions

View File

@ -219,7 +219,7 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
int32_t tqProcessStreamCheckPointRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen);
int32_t tqCheckStreamStatus(STQ* pTq);

View File

@ -1492,14 +1492,14 @@ FAIL:
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) {
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
int32_t len = msgLen - sizeof(SMsgHead);
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamCheckpointSourceReq req= {0};
SStreamCheckpointSourceReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
@ -1524,11 +1524,20 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, int64_t sversion, char* pMs
taosArrayPush(pTask->pRpcMsgList, &pRpcMsg);
// todo: when generating checkpoint, no new tasks are allowed to add into current Vnode
// set the initial value for generating check point
int32_t total = 0;
taosWLockLatch(&pMeta->lock);
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList);
}
total = taosArrayGetSize(pMeta->pTaskList);
taosWUnLockLatch(&pMeta->lock);
qDebug("s-task:%s level:%d receive the checkpoint msg id:%" PRId64 " from mnode, total source checkpoint req:%d",
pTask->id.idStr, pTask->info.taskLevel, req.checkpointId, total);
streamProcessCheckpointSourceReq(pMeta, pTask, &req);
streamMetaReleaseTask(pMeta, pTask);
return code;

View File

@ -498,12 +498,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
}
} break;
case TDMT_VND_STREAM_CHECK_POINT_SOURCE: {
if (tqProcessStreamCheckPointSourceReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}
} break;
case TDMT_STREAM_TASK_CHECKPOINT: {
if (tqProcessStreamCheckPointReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
@ -688,6 +682,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in stream queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;