fix(stream): fix error in generating checkpoint.

This commit is contained in:
Haojun Liao 2023-07-07 21:38:53 +08:00
parent 4e0b32cddc
commit 9c30abf95b
7 changed files with 33 additions and 29 deletions

View File

@ -622,7 +622,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
// checkpoint // checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq); int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq);
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointRsp* pRsp); int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask);

View File

@ -220,8 +220,8 @@ int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamCheckPointRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen); int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckStreamStatus(STQ* pTq); int32_t tqCheckStreamStatus(STQ* pTq);
int tqCommit(STQ*); int tqCommit(STQ*);

View File

@ -819,6 +819,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
} }
pTask->pRpcMsgList = taosArrayInit(4, POINTER_BYTES);
// sink // sink
if (pTask->outputType == TASK_OUTPUT__SMA) { if (pTask->outputType == TASK_OUTPUT__SMA) {
pTask->smaSink.vnode = pTq->pVnode; pTask->smaSink.vnode = pTq->pVnode;
@ -1253,7 +1255,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// even in halt status, the data in inputQ must be processed // even in halt status, the data in inputQ must be processed
int8_t status = pTask->status.taskStatus; int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT) { if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT || status == TASK_STATUS__CK) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.version); pTask->chkInfo.version);
streamProcessRunReq(pTask); streamProcessRunReq(pTask);
@ -1546,14 +1548,14 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
return code; return code;
} }
int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) { int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
int32_t len = msgLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamTaskCheckpointReq req= {0}; SStreamTaskCheckpointReq req = {0};
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len); tDecoderInit(&decoder, (uint8_t*)msg, len);
@ -1574,21 +1576,22 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return code; return code;
FAIL: FAIL:
return code; return code;
} }
// downstream task has complete the stream task checkpoint procedure // downstream task has complete the stream task checkpoint procedure
int32_t tqProcessStreamCheckPointRsp(STQ* pTq, int64_t sversion, char* pMsg, int32_t msgLen) { int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) {
// if this task is an agg task, rsp this message to upstream directly. // if this task is an agg task, rsp this message to upstream directly.
// if this task is an source task, send source rsp to mnode // if this task is an source task, send source rsp to mnode
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
int32_t len = msgLen - sizeof(SMsgHead); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0; int32_t code = 0;
SStreamTaskCheckpointRsp req= {0}; SStreamTaskCheckpointRsp req = {0};
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len); tDecoderInit(&decoder, (uint8_t*)msg, len);
@ -1605,7 +1608,9 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, int64_t sversion, char* pMsg, int
goto FAIL; goto FAIL;
} }
streamProcessCheckpointRsp(pMeta, pTask, &req); tqDebug("vgId:%d s-task:%s received the checkpoint rsp, handle it", vgId, pTask->id.idStr);
streamProcessCheckpointRsp(pMeta, pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return code; return code;

View File

@ -497,17 +497,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
goto _err; goto _err;
} }
} break; } break;
case TDMT_STREAM_TASK_CHECKPOINT: {
if (tqProcessStreamCheckPointReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}
} break;
case TDMT_STREAM_TASK_CHECKPOINT_RSP: {
if (tqProcessStreamCheckPointRsp(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}
} break;
case TDMT_VND_ALTER_CONFIRM: case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange; needCommit = pVnode->config.hashChange;
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
@ -684,6 +673,10 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_CHECK_POINT_SOURCE: case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg); return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT:
return tqProcessStreamCheckPointReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_RSP:
return tqProcessStreamCheckPointRsp(pVnode->pTq, pMsg);
default: default:
vError("unknown msg type:%d in stream queue", pMsg->msgType); vError("unknown msg type:%d in stream queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;

View File

@ -160,6 +160,9 @@ static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t chec
pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i); pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId, i);
streamDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
} else { // no need to dispatch msg to downstream task
qDebug("s-task:%s no down stream task, not dispatch checkpoint msg to downstream", pTask->id.idStr);
streamProcessCheckpointRsp(NULL, pTask);
} }
return 0; return 0;
@ -215,7 +218,7 @@ int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStre
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
qDebug("s-task:%s sink task set to checkpoint ready", pTask->id.idStr); qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr);
appendCheckpointIntoInputQ(pTask); appendCheckpointIntoInputQ(pTask);
streamSchedExec(pTask); streamSchedExec(pTask);
} else { } else {
@ -248,7 +251,7 @@ int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStre
* All down stream tasks have successfully completed the check point task. * All down stream tasks have successfully completed the check point task.
* Current stream task is allowed to start to do checkpoint things in ASYNC model. * Current stream task is allowed to start to do checkpoint things in ASYNC model.
*/ */
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointRsp* pRsp) { int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG);
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
@ -258,6 +261,8 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStre
pTask->id.idStr); pTask->id.idStr);
appendCheckpointIntoInputQ(pTask); appendCheckpointIntoInputQ(pTask);
streamSchedExec(pTask); streamSchedExec(pTask);
} else {
qDebug("s-task:%s %d downstream tasks are not ready, wait", pTask->id.idStr, notReady);
} }
return 0; return 0;

View File

@ -470,7 +470,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_CHECKPOINT) { } else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*) pInput; const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*) pInput;
qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT); qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT);
} else { } else {

View File

@ -45,6 +45,7 @@ const char* streamGetTaskStatusStr(int32_t status) {
case TASK_STATUS__HALT: return "halt"; case TASK_STATUS__HALT: return "halt";
case TASK_STATUS__PAUSE: return "paused"; case TASK_STATUS__PAUSE: return "paused";
case TASK_STATUS__CK: return "check-point"; case TASK_STATUS__CK: return "check-point";
case TASK_STATUS__CK_READY: return "check-point-ready";
default:return ""; default:return "";
} }
} }