enh(stream): add rsp for req-checkpoint

This commit is contained in:
Haojun Liao 2024-02-27 10:53:25 +08:00
parent 81a1eac87a
commit dafac591f7
13 changed files with 36 additions and 10 deletions

View File

@ -27,6 +27,7 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
bool isLeader, bool restored); bool isLeader, bool restored);
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);

View File

@ -89,6 +89,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
code = 0; code = 0;
_OVER: _OVER:

View File

@ -838,6 +838,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -86,6 +86,10 @@ typedef struct SOrphanTask {
int32_t nodeId; int32_t nodeId;
} SOrphanTask; } SOrphanTask;
typedef struct {
SMsgHead head;
} SMStreamHbRspMsg, SMStreamReqCheckpointRspMsg;
int32_t mndInitStream(SMnode *pMnode); int32_t mndInitStream(SMnode *pMnode);
void mndCleanupStream(SMnode *pMnode); void mndCleanupStream(SMnode *pMnode);
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName); SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);

View File

@ -308,8 +308,8 @@ static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
} }
} }
mError("failed to find the vgId:%d for extract last version, total existed vgs:%d", vgId, size); mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size);
return -1; return 1;
} }
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) { static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {

View File

@ -877,7 +877,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
int64_t ts = taosGetTimestampMs(); int64_t ts = taosGetTimestampMs();
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
// mWarn("checkpoint interval less than the threshold, ignore it"); // mWarn("checkpoint interval less than the threshold, ignore it");
return -1; return TSDB_CODE_SUCCESS;
} }
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
@ -2179,5 +2179,16 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
mndReleaseStream(pMnode, pStream); mndReleaseStream(pMnode, pStream);
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
{
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRspMsg)};
rsp.pCont = rpcMallocCont(rsp.contLen);
SMsgHead* pHead = rsp.pCont;
pHead->vgId = htonl(req.nodeId);
tmsgSendRsp(&rsp);
pReq->info.handle = NULL; // disable auto rsp
}
return 0; return 0;
} }

View File

@ -16,10 +16,6 @@
#include "mndStream.h" #include "mndStream.h"
#include "mndTrans.h" #include "mndTrans.h"
typedef struct {
SMsgHead head;
} SMStreamHbRspMsg;
typedef struct SFailedCheckpointInfo { typedef struct SFailedCheckpointInfo {
int64_t streamUid; int64_t streamUid;
int64_t checkpointId; int64_t checkpointId;

View File

@ -180,6 +180,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg); return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg);
case TDMT_MND_STREAM_HEARTBEAT_RSP: case TDMT_MND_STREAM_HEARTBEAT_RSP:
return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg); return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg);
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg);
default: default:
sndError("invalid snode msg:%d", pMsg->msgType); sndError("invalid snode msg:%d", pMsg->msgType);
ASSERT(0); ASSERT(0);

View File

@ -243,6 +243,7 @@ int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq); int32_t tqScanWal(STQ* pTq);

View File

@ -1225,3 +1225,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg);
} }
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg);
}

View File

@ -938,9 +938,13 @@ int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
return taosArrayGetSize(pMeta->pTaskList); return taosArrayGetSize(pMeta->pTaskList);
} }
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL; pMsg->pCont = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }

View File

@ -798,6 +798,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_HEARTBEAT_RSP: case TDMT_MND_STREAM_HEARTBEAT_RSP:
return tqProcessStreamHbRsp(pVnode->pTq, pMsg); return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
default: default:
vError("unknown msg type:%d in stream queue", pMsg->msgType); vError("unknown msg type:%d in stream queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;

View File

@ -934,9 +934,8 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
SRpcMsg msg = {.info.noResp = 1}; SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen); initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId); stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
tmsgSendReq(&pTask->info.mnodeEpset, &msg); tmsgSendReq(&pTask->info.mnodeEpset, &msg);