From 83a5e2be4c435f3fe178c95bbc3f836cce48a227 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 31 Aug 2023 11:42:12 +0800 Subject: [PATCH] stream change ver --- include/libs/stream/tstream.h | 3 +- source/dnode/mnode/impl/src/mndDef.c | 2 +- source/libs/stream/src/streamTask.c | 72 ++++++++++++++-------------- 3 files changed, 38 insertions(+), 39 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ab9f606fe3..477d35e3e2 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -371,8 +371,7 @@ struct SStreamTask { int32_t transferStateAlignCnt; struct SStreamMeta* pMeta; SSHashObj* pNameMap; - - char reserve[256]; + char reserve[256]; }; typedef struct SMetaHbInfo { diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index a5c768a018..d01daee5a7 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -84,7 +84,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { // 3.0.50 ver = 3 if (tEncodeI64(pEncoder, pObj->checkpointId) < 0) return -1; - if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve)) < 0) return -1; + if (tEncodeCStrWithLen(pEncoder, pObj->reserve, sizeof(pObj->reserve) - 1) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index dc8c509f1e..6eb09b95ec 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -134,47 +134,12 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1; - if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve)) < 0) return -1; + if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } -int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) { - int64_t ver; - int64_t skip64; - int8_t skip8; - int32_t skip32; - int16_t skip16; - SEpSet epSet; - - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &ver) < 0) return -1; - - if (ver != SSTREAM_TASK_VER) return -1; - - if (tDecodeI64(pDecoder, &skip64) < 0) return -1; - if (tDecodeI32(pDecoder, &skip32) < 0) return -1; - if (tDecodeI32(pDecoder, &skip32) < 0) return -1; - if (tDecodeI8(pDecoder, &skip8) < 0) return -1; - if (tDecodeI8(pDecoder, &skip8) < 0) return -1; - if (tDecodeI16(pDecoder, &skip16) < 0) return -1; - - if (tDecodeI8(pDecoder, &skip8) < 0) return -1; - if (tDecodeI8(pDecoder, &skip8) < 0) return -1; - - if (tDecodeI32(pDecoder, &skip32) < 0) return -1; - if (tDecodeI32(pDecoder, &skip32) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1; - - if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1; - if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1; - - tEndDecode(pDecoder); - return 0; -} - int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; @@ -252,6 +217,41 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { return 0; } +int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) { + int64_t ver; + int64_t skip64; + int8_t skip8; + int32_t skip32; + int16_t skip16; + SEpSet epSet; + + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &ver) < 0) return -1; + + if (ver != SSTREAM_TASK_VER) return -1; + + if (tDecodeI64(pDecoder, &skip64) < 0) return -1; + if (tDecodeI32(pDecoder, &skip32) < 0) return -1; + if (tDecodeI32(pDecoder, &skip32) < 0) return -1; + if (tDecodeI8(pDecoder, &skip8) < 0) return -1; + if (tDecodeI8(pDecoder, &skip8) < 0) return -1; + if (tDecodeI16(pDecoder, &skip16) < 0) return -1; + + if (tDecodeI8(pDecoder, &skip8) < 0) return -1; + if (tDecodeI8(pDecoder, &skip8) < 0) return -1; + + if (tDecodeI32(pDecoder, &skip32) < 0) return -1; + if (tDecodeI32(pDecoder, &skip32) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &epSet) < 0) return -1; + + if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0) return -1; + if (tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) return -1; + + tEndDecode(pDecoder); + return 0; +} + static void freeItem(void* p) { SStreamContinueExecInfo* pInfo = p; rpcFreeCont(pInfo->msg.pCont);