diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index cd69bc0d92..e34ec07eac 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1175,9 +1175,6 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1; - // todo this new attribute will be result in being incompatible with previous version - if (tEncodeI32(pEncoder, pMsg->transId) < 0) return -1; - int32_t size = taosArrayGetSize(pMsg->pNodeList); if (tEncodeI32(pEncoder, size) < 0) return -1; @@ -1187,6 +1184,9 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda if (tEncodeSEpSet(pEncoder, &pInfo->prevEp) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pInfo->newEp) < 0) return -1; } + + // todo this new attribute will be result in being incompatible with previous version + if (tEncodeI32(pEncoder, pMsg->transId) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } @@ -1196,8 +1196,6 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1; - int32_t size = 0; if (tDecodeI32(pDecoder, &size) < 0) return -1; pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo)); @@ -1209,6 +1207,8 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* taosArrayPush(pMsg->pNodeList, &info); } + if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1; + tEndDecode(pDecoder); return 0; }