fix(stream): handle TDMT_STREAM_CHKPT_EXEC in stream queue.

This commit is contained in:
Haojun Liao 2025-03-11 13:57:31 +08:00
parent 46dcdf36b7
commit 90aa27ee23
5 changed files with 17 additions and 2 deletions

View File

@ -47,7 +47,7 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pMsg);
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t smPutNodeMsgToWriteQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg);
int32_t smPutNodeMsgToChkptQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
#ifdef __cplusplus
}

View File

@ -102,6 +102,8 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CHKPT_EXEC, smPutNodeMsgToStreamQueue, 0) == NULL) goto _OVER;
code = 0;
_OVER:
if (code != 0) {

View File

@ -162,6 +162,9 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
case WRITE_QUEUE:
code = smPutNodeMsgToWriteQueue(pMgmt, pMsg);
break;
case STREAM_CHKPT_QUEUE:
code = smPutNodeMsgToStreamQueue(pMgmt, pMsg);
break;
default:
code = TSDB_CODE_INVALID_PARA;
rpcFreeCont(pMsg->pCont);
@ -172,7 +175,6 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
}
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, 0);
if (pWorker == NULL) {
return TSDB_CODE_INVALID_MSG;
@ -198,3 +200,10 @@ int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return taosWriteQitem(pWorker->queue, pMsg);
}
//int32_t smPutNodeMsgToChkptQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
// SSingleWorker *pWorker = &pMgmt->chkptWorker;
//
// dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
// return taosWriteQitem(pWorker->queue, pMsg);
//}

View File

@ -138,6 +138,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
return tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg);
case TDMT_STREAM_CHKPT_EXEC:
return tqStreamTaskProcessRunReq(pSnode->pMeta, pMsg, true);
default:
sndError("invalid snode msg:%d", pMsg->msgType);
return TSDB_CODE_INVALID_MSG;

View File

@ -22,6 +22,8 @@ from util.cluster import *
# should be used by -N option
class TDTestCase:
updatecfgDict = {'checkpointInterval': 60 ,
'vdebugflag':143,
'ddebugflag':143
}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)