From 6aaac7c0dcb656bf08d3248f15acd5ecf40bcd9c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 25 Mar 2024 17:57:13 +0800 Subject: [PATCH 1/4] fix:decode task error if task version is smaller than 2 --- source/libs/stream/src/streamTask.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4ca784a32f..13a4f5012d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -287,7 +287,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; } if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; + if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; + } if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; tEndDecode(pDecoder); From 8e45bebd66770855794a1f6e83ec9cc512793b46 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 26 Mar 2024 15:16:09 +0800 Subject: [PATCH 2/4] fix:decode task error if task version is smaller than 2 --- source/libs/stream/src/streamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 13a4f5012d..c7a1a00a46 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -216,7 +216,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; - if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1; + if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1; if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; From f4fed29739d97f04603f315025930a685913cceb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 27 Mar 2024 15:04:37 +0800 Subject: [PATCH 3/4] fix:write schema error if schema ver is 1 more than origin version --- source/dnode/mnode/impl/src/mndStb.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 7ee1b36916..a507631128 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1104,14 +1104,16 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { } else if (createReq.tagVer > 0 || createReq.colVer > 0) { int32_t tagDelta = createReq.tagVer - pStb->tagVer; int32_t colDelta = createReq.colVer - pStb->colVer; - int32_t verDelta = tagDelta + colDelta; mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d", createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer); if (tagDelta <= 0 && colDelta <= 0) { mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name); code = 0; goto _OVER; - } else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) { + } else if ((tagDelta == 1 && colDelta == 0) || + (tagDelta == 0 && colDelta == 1) || + pStb->colVer == 1 || + pStb->tagVer == 1) { isAlter = true; mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name); } else { From 4722f8be8787596d8c88939fc2f7166530eff5aa Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 27 Mar 2024 15:17:49 +0800 Subject: [PATCH 4/4] fix:write schema error if schema ver is 1 more than origin version --- source/dnode/mnode/impl/src/mndStb.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index a507631128..79c62df766 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1112,8 +1112,8 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { goto _OVER; } else if ((tagDelta == 1 && colDelta == 0) || (tagDelta == 0 && colDelta == 1) || - pStb->colVer == 1 || - pStb->tagVer == 1) { + (pStb->colVer == 1 && createReq.colVer > 1) || + (pStb->tagVer == 1 && createReq.tagVer > 1)) { isAlter = true; mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name); } else {