Merge pull request #24895 from taosdata/fix/nullcheck
enh(stream):rsp the checkpoint ready msg.
This commit is contained in:
commit
9762c53cf9
|
@ -27,6 +27,8 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
|||
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
|
||||
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
|
||||
bool isLeader, bool restored);
|
||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
|
||||
|
|
|
@ -87,8 +87,10 @@ SArray *smGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
_OVER:
|
||||
|
|
|
@ -835,9 +835,11 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -86,6 +86,10 @@ typedef struct SOrphanTask {
|
|||
int32_t nodeId;
|
||||
} SOrphanTask;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
} SMStreamHbRspMsg, SMStreamReqCheckpointRspMsg;
|
||||
|
||||
int32_t mndInitStream(SMnode *pMnode);
|
||||
void mndCleanupStream(SMnode *pMnode);
|
||||
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName);
|
||||
|
|
|
@ -300,15 +300,16 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet*
|
|||
}
|
||||
|
||||
static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) {
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
|
||||
int32_t size = (int32_t) taosArrayGetSize(pList);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SVgroupVer* pVer = taosArrayGet(pList, i);
|
||||
if (pVer->vgId == vgId) {
|
||||
return pVer->ver;
|
||||
}
|
||||
}
|
||||
|
||||
mError("failed to find the vgId:%d for extract last version", vgId);
|
||||
return -1;
|
||||
mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size);
|
||||
return 1;
|
||||
}
|
||||
|
||||
static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) {
|
||||
|
@ -472,6 +473,9 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream
|
|||
int code =
|
||||
doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam);
|
||||
if (code != 0) {
|
||||
mError("create stream task, code:%s", tstrerror(code));
|
||||
|
||||
// todo drop the added source tasks.
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -877,7 +877,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
|
|||
int64_t ts = taosGetTimestampMs();
|
||||
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
|
||||
// mWarn("checkpoint interval less than the threshold, ignore it");
|
||||
return -1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
|
||||
|
@ -2179,5 +2179,16 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
|||
mndReleaseStream(pMnode, pStream);
|
||||
taosThreadMutexUnlock(&execInfo.lock);
|
||||
|
||||
{
|
||||
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRspMsg)};
|
||||
rsp.pCont = rpcMallocCont(rsp.contLen);
|
||||
SMsgHead* pHead = rsp.pCont;
|
||||
pHead->vgId = htonl(req.nodeId);
|
||||
|
||||
tmsgSendRsp(&rsp);
|
||||
|
||||
pReq->info.handle = NULL; // disable auto rsp
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -16,10 +16,6 @@
|
|||
#include "mndStream.h"
|
||||
#include "mndTrans.h"
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
} SMStreamHbRspMsg;
|
||||
|
||||
typedef struct SFailedCheckpointInfo {
|
||||
int64_t streamUid;
|
||||
int64_t checkpointId;
|
||||
|
|
|
@ -180,6 +180,10 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg);
|
||||
case TDMT_MND_STREAM_HEARTBEAT_RSP:
|
||||
return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg);
|
||||
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
|
||||
return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
|
||||
return tqStreamProcessCheckpointReadyRsp(pSnode->pMeta, pMsg);
|
||||
default:
|
||||
sndError("invalid snode msg:%d", pMsg->msgType);
|
||||
ASSERT(0);
|
||||
|
|
|
@ -259,6 +259,8 @@ int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
|||
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
|
||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
||||
int32_t tqScanWal(STQ* pTq);
|
||||
|
|
|
@ -1175,7 +1175,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
ASSERT(status == TASK_STATUS__HALT);
|
||||
// ASSERT(status == TASK_STATUS__HALT);
|
||||
if (status != TASK_STATUS__HALT) {
|
||||
tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr);
|
||||
// streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
|
||||
}
|
||||
}
|
||||
|
||||
// check if the checkpoint msg already sent or not.
|
||||
|
@ -1225,3 +1229,11 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg);
|
||||
}
|
||||
|
||||
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg);
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||
return tqStreamProcessCheckpointReadyRsp(pTq->pStreamMeta, pMsg);
|
||||
}
|
||||
|
|
|
@ -485,6 +485,10 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
|||
return code;
|
||||
}
|
||||
|
||||
typedef struct SMStreamCheckpointReadyRspMsg {
|
||||
SMsgHead head;
|
||||
}SMStreamCheckpointReadyRspMsg;
|
||||
|
||||
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
int32_t vgId = pMeta->vgId;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
|
@ -513,6 +517,18 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
|||
|
||||
streamProcessCheckpointReadyMsg(pTask);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
{ // send checkpoint ready rsp
|
||||
SRpcMsg rsp = {.code = 0, .info = pMsg->info, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
|
||||
rsp.pCont = rpcMallocCont(rsp.contLen);
|
||||
SMsgHead* pHead = rsp.pCont;
|
||||
pHead->vgId = htonl(req.downstreamNodeId);
|
||||
|
||||
tmsgSendRsp(&rsp);
|
||||
|
||||
pMsg->info.handle = NULL; // disable auto rsp
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -938,9 +954,17 @@ int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
|
|||
return taosArrayGetSize(pMeta->pTaskList);
|
||||
}
|
||||
|
||||
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
pMsg->pCont = NULL;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
|
||||
|
||||
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
|
||||
|
||||
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
return doProcessDummyRspMsg(pMeta, pMsg);
|
||||
}
|
|
@ -800,6 +800,10 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
|||
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
|
||||
case TDMT_MND_STREAM_HEARTBEAT_RSP:
|
||||
return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
|
||||
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
|
||||
return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in stream queue", pMsg->msgType);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
|
|
|
@ -907,7 +907,6 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
|
|||
|
||||
SStreamChkptReadyInfo info = {.upStreamTaskId = pInfo->taskId, .upstreamNodeEpset = pInfo->epSet};
|
||||
initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
|
||||
info.msg.info.noResp = 1; // refactor later.
|
||||
|
||||
stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64
|
||||
":0x%x (vgId:%d) idx:%d, vgId:%d",
|
||||
|
|
|
@ -934,9 +934,8 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
|
|||
}
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg msg = {.info.noResp = 1};
|
||||
SRpcMsg msg = {0};
|
||||
initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen);
|
||||
|
||||
stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId);
|
||||
|
||||
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
|
||||
|
|
Loading…
Reference in New Issue