fix(stream): disable auto rsp for checkpoint source.
This commit is contained in:
parent
5155519559
commit
7d5f76c6f1
|
@ -224,7 +224,7 @@ int tqPushMsg(STQ*, tmsg_t msgType);
|
||||||
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
|
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
|
||||||
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
|
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
|
||||||
int tqScanWalAsync(STQ* pTq, bool ckPause);
|
int tqScanWalAsync(STQ* pTq, bool ckPause);
|
||||||
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
|
||||||
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq);
|
int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq);
|
||||||
|
|
|
@ -1603,13 +1603,16 @@ FAIL:
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo error code cannot be return, since this is invoked by an mnode-launched transaction.
|
// todo error code cannot be return, since this is invoked by an mnode-launched transaction.
|
||||||
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
// disable auto rsp to source
|
||||||
|
pRsp->info.handle = NULL;
|
||||||
|
|
||||||
SStreamCheckpointSourceReq req = {0};
|
SStreamCheckpointSourceReq req = {0};
|
||||||
if (!vnodeIsRoleLeader(pTq->pVnode)) {
|
if (!vnodeIsRoleLeader(pTq->pVnode)) {
|
||||||
tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId);
|
tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId);
|
||||||
|
|
|
@ -602,7 +602,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
|
vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
|
case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
|
||||||
tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg);
|
tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg, pRsp);
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_STREAM_TASK_UPDATE:
|
case TDMT_VND_STREAM_TASK_UPDATE:
|
||||||
tqProcessTaskUpdateReq(pVnode->pTq, pMsg);
|
tqProcessTaskUpdateReq(pVnode->pTq, pMsg);
|
||||||
|
|
|
@ -854,7 +854,6 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
|
||||||
}
|
}
|
||||||
|
|
||||||
((SMsgHead*)pBuf)->vgId = htonl(pReq->mnodeId);
|
((SMsgHead*)pBuf)->vgId = htonl(pReq->mnodeId);
|
||||||
|
|
||||||
void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
|
void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
|
||||||
|
|
||||||
tEncoderInit(&encoder, (uint8_t*)abuf, len);
|
tEncoderInit(&encoder, (uint8_t*)abuf, len);
|
||||||
|
|
Loading…
Reference in New Issue