fix(stream): resolve the nodeUpdate msg compatible issue.

This commit is contained in:
Haojun Liao 2023-10-25 10:01:20 +08:00
parent 03165d5327
commit 985371e841
1 changed files with 5 additions and 5 deletions

View File

@ -1175,9 +1175,6 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1;
// todo this new attribute will be result in being incompatible with previous version
if (tEncodeI32(pEncoder, pMsg->transId) < 0) return -1;
int32_t size = taosArrayGetSize(pMsg->pNodeList);
if (tEncodeI32(pEncoder, size) < 0) return -1;
@ -1187,6 +1184,9 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda
if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1;
}
// todo this new attribute will be result in being incompatible with previous version
if (tEncodeI32(pEncoder, pMsg->transId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
@ -1196,8 +1196,6 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg*
if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1;
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) return -1;
pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
@ -1209,6 +1207,8 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg*
taosArrayPush(pMsg->pNodeList, &info);
}
if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}