enh(stream): send msg to mnode when the transferring state completed.

This commit is contained in:
Haojun Liao 2024-01-18 15:43:36 +08:00
parent 719d1d1b90
commit 76e6c453e7
9 changed files with 104 additions and 38 deletions

View File

@ -217,6 +217,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_COMPACT, "kill-compact", SKillCompactReq, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_TIMER, "compact-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_REQ_CHKPT, "stream-req-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)

View File

@ -640,6 +640,7 @@ typedef struct {
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq);
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq);
// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished
typedef struct {
int64_t streamId;
int64_t checkpointId;
@ -648,6 +649,7 @@ typedef struct {
SEpSet mgmtEps;
int32_t mnodeId;
int32_t transId;
int8_t mndTrigger;
int64_t expireTime;
} SStreamCheckpointSourceReq;
@ -770,6 +772,15 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
typedef struct SStreamTaskCheckpointReq {
int64_t streamId;
int32_t taskId;
int32_t nodeId;
} SStreamTaskCheckpointReq;
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq);
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
@ -839,6 +850,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
bool streamTaskIsSinkTask(const SStreamTask* pTask);
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);

View File

@ -767,7 +767,7 @@ _OVER:
pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER ||
pMsg->msgType == TDMT_MND_COMPACT_TIMER || pMsg->msgType == TDMT_MND_NODECHECK_TIMER ||
pMsg->msgType == TDMT_MND_GRANT_HB_TIMER || pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE ||
pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_TIMER) {
pMsg->msgType == TDMT_MND_STREAM_CHECKPOINT_TIMER || pMsg->msgType == TDMT_MND_STREAM_REQ_CHKPT) {
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
pMnode->stopped, state.restored, syncStr(state.state));
return -1;

View File

@ -67,7 +67,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
@ -130,6 +130,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, mndProcessStreamCheckpointInCandid);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
@ -980,22 +981,6 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
return 0;
}
static int32_t mndProcessStreamRemainChkptTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
pMsg->checkpointId = 0;
int32_t size = sizeof(SMStreamDoCheckpointMsg);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CHECKPOINT_CANDIDITATE, .pCont = pMsg, .contLen = size};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
return 0;
}
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId, int32_t transId) {
SStreamCheckpointSourceReq req = {0};
@ -1005,6 +990,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
req.streamId = streamId; // pTask->id.streamId;
req.taskId = taskId; // pTask->id.taskId;
req.transId = transId;
req.mndTrigger = 1;
int32_t code;
int32_t blen;
@ -3093,3 +3079,26 @@ SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
return NULL;
}
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamTaskCheckpointReq req = {0};
SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamTaskCheckpointReq(&decoder, &req)) {
tDecoderClear(&decoder);
terrno = TSDB_CODE_INVALID_MSG;
mError("invalid task checkpoint req msg received");
return -1;
}
tDecoderClear(&decoder);
mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);
// register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
return 0;
}

View File

@ -235,7 +235,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);

View File

@ -36,6 +36,7 @@ int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckp
if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->transId) < 0) return -1;
if (tEncodeI8(pEncoder, pReq->mndTrigger) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
@ -50,6 +51,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo
if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->transId) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->mndTrigger) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}

View File

@ -398,13 +398,14 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask);
// 3. resume the state of stream task, after this function, the stream task will run immediately.
streamTaskResume(pStreamTask);
// 3. send msg to mnode to launch a checkpoint to keep the state for current stream
streamTaskSendCheckpointReq(pStreamTask);
// streamTaskResume(pStreamTask);
stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id);
// stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id);
// 4. free it and remove fill-history task from disk meta-store
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. assign the status to the value that will be kept in disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
@ -412,20 +413,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 6. open the inputQ for all upstream tasks
streamTaskOpenAllUpstreamInput(pStreamTask);
// 7. add empty delete block
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
pDelBlock->info.rows = 0;
pDelBlock->info.version = 0;
pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
pItem->pBlock = pDelBlock;
int32_t code = streamTaskPutDataIntoInputQ(pStreamTask, (SStreamQueueItem*)pItem);
stDebug("s-task:%s append dummy delete block,res:%d", pStreamTask->id.idStr, code);
}
streamSchedExec(pStreamTask);
// streamSchedExec(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
return TSDB_CODE_SUCCESS;
}
@ -443,7 +431,7 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
} else { // drop fill-history task and open inputQ of sink task
} else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask != NULL) {
streamTaskOpenAllUpstreamInput(pStreamTask);

View File

@ -1054,6 +1054,23 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
return 0;
}
int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
return 0;
}
int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;

View File

@ -852,3 +852,41 @@ void streamTaskResume(SStreamTask* pTask) {
bool streamTaskIsSinkTask(const SStreamTask* pTask) {
return pTask->info.taskLevel == TASK_LEVEL__SINK;
}
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
int32_t code;
int32_t tlen = 0;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SStreamTaskCheckpointReq req = {0};
tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
if (code < 0) {
stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamTaskCheckpointReq(&encoder, &req)) < 0) {
rpcFreeCont(buf);
stError("s-task:%s vgId:%d encode stream task req checkpoint msg failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
tEncoderClear(&encoder);
SRpcMsg msg = {.info.noResp = 1};
initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
return 0;
}