fix(stream): rsp the checkpoint ready msg.
This commit is contained in:
parent
402a43bf67
commit
de1c67f773
|
@ -485,6 +485,10 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct SMStreamCheckpointReadyRspMsg {
|
||||||
|
int8_t placeholder;
|
||||||
|
}SMStreamCheckpointReadyRspMsg;
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
|
@ -513,6 +517,18 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
||||||
|
|
||||||
streamProcessCheckpointReadyMsg(pTask);
|
streamProcessCheckpointReadyMsg(pTask);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
|
{ // send checkpoint ready rsp
|
||||||
|
SRpcMsg rsp = {.code = 0, .info = pMsg->info, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
|
||||||
|
rsp.pCont = rpcMallocCont(rsp.contLen);
|
||||||
|
SMsgHead* pHead = rsp.pCont;
|
||||||
|
pHead->vgId = htonl(req.downstreamNodeId);
|
||||||
|
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
|
pMsg->info.handle = NULL; // disable auto rsp
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue