fix(stream): add lost codes caused by merge 3.0.

This commit is contained in:
Haojun Liao 2023-08-17 18:51:39 +08:00
parent 58c0a3d8ef
commit a18989df88
3 changed files with 32 additions and 11 deletions

View File

@ -184,6 +184,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp)

View File

@ -40,11 +40,15 @@ typedef struct SNodeEntry {
} SNodeEntry;
typedef struct SStreamVnodeRevertIndex {
SArray* pDBList;
SArray* pNodeEntryList;
int64_t ts; // snapshot ts
} SStreamVnodeRevertIndex;
typedef struct SVgroupChangeInfo {
SHashObj* pDBMap;
SArray* pUpdateNodeList; //SArray<SNodeUpdateInfo>
} SVgroupChangeInfo;
static int32_t mndNodeCheckSentinel = 0;
static SStreamVnodeRevertIndex execNodeList;
@ -68,6 +72,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
int64_t streamId, int32_t taskId);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans);
@ -86,7 +91,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheckReq);
mndSetMsgHandle(pMnode, TDMT_MND_NODECHECK_TIMER, mndProcessNodeCheck);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
@ -100,6 +105,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
@ -868,9 +874,9 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
pMsg->checkpointId = checkpointId;
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)};
// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
int32_t size = sizeof(SMStreamDoCheckpointMsg);
SRpcMsg rpcMsg = { .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
return 0;
}
@ -1741,11 +1747,6 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
typedef struct SVgroupChangeInfo {
SHashObj* pDBMap;
SArray* pUpdateNodeList; //SArray<SNodeUpdateInfo>
} SVgroupChangeInfo;
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo, int64_t streamId, int32_t taskId) {
pMsg->streamId = streamId;
pMsg->taskId = taskId;
@ -1973,7 +1974,7 @@ static SArray* mndTakeVgroupSnapshot(SMnode* pMnode) {
return pVgroupListSnapshot;
}
int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
SSdb *pSdb = pMnode->pSdb;
// check all streams that involved this vnode should update the epset info
@ -2093,6 +2094,21 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
return 0;
}
typedef struct SMStreamNodeCheckMsg{} SMStreamNodeCheckMsg;
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
SMStreamNodeCheckMsg *pMsg = rpcMallocCont(sizeof(SMStreamNodeCheckMsg));
SRpcMsg rpcMsg = { .msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = sizeof(SMStreamNodeCheckMsg)};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
return 0;
}
// todo: this process should be executed by the write queue worker of the mnode
//int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// SMnode *pMnode = pReq->info.node;

View File

@ -171,6 +171,10 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp
qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
pTask->id.idStr);
} else {
if (pBlock->type == STREAM_INPUT__TRANS_STATE) {
pTask->status.appendTranstateBlock = true;
}
int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
// input queue is full, upstream is blocked now
status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;