fix(stream): fix error in build msg.
This commit is contained in:
parent
f65651f6ef
commit
fd1996a1d1
|
@ -1232,12 +1232,13 @@ int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
SMStreamHbRspMsg rsp;
|
SMStreamHbRspMsg rsp = {0};
|
||||||
int32_t len = 0;
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
|
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
|
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
|
||||||
tDecoderInit(&decoder, (uint8_t*)pMsg->pCont, len);
|
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||||
code = tDecodeStreamHbRsp(&decoder, &rsp);
|
code = tDecodeStreamHbRsp(&decoder, &rsp);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
|
|
@ -1140,15 +1140,12 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
|
||||||
SRpcMsg rpcMsg = {0};
|
SRpcMsg rpcMsg = {0};
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
|
|
||||||
SRetrieveChkptTriggerReq req =
|
SRetrieveChkptTriggerReq req = {.streamId = pTask->id.streamId,
|
||||||
{
|
|
||||||
.streamId = pTask->id.streamId,
|
|
||||||
.downstreamTaskId = pTask->id.taskId,
|
.downstreamTaskId = pTask->id.taskId,
|
||||||
.downstreamNodeId = vgId,
|
.downstreamNodeId = vgId,
|
||||||
.upstreamTaskId = pUpstreamTask->taskId,
|
.upstreamTaskId = pUpstreamTask->taskId,
|
||||||
.upstreamNodeId = pUpstreamTask->nodeId,
|
.upstreamNodeId = pUpstreamTask->nodeId,
|
||||||
.checkpointId = checkpointId,
|
.checkpointId = checkpointId};
|
||||||
};
|
|
||||||
|
|
||||||
tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret);
|
tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
|
|
|
@ -102,9 +102,10 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
|
||||||
}
|
}
|
||||||
|
|
||||||
((SMsgHead*)buf)->vgId = vgId;
|
((SMsgHead*)buf)->vgId = vgId;
|
||||||
|
char* bufx = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, buf, tlen);
|
tEncoderInit(&encoder, (uint8_t*)bufx, tlen);
|
||||||
if ((code = tEncodeStreamTaskRunReq(&encoder, &req)) < 0) {
|
if ((code = tEncodeStreamTaskRunReq(&encoder, &req)) < 0) {
|
||||||
rpcFreeCont(buf);
|
rpcFreeCont(buf);
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
Loading…
Reference in New Issue