Merge pull request #25179 from taosdata/mark/3.0

fix:decode task error if task version is smaller than 2
This commit is contained in:
dapan1121 2024-03-28 13:19:45 +08:00 committed by GitHub
commit ca0f790f3e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 8 additions and 4 deletions

View File

@ -1104,14 +1104,16 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
} else if (createReq.tagVer > 0 || createReq.colVer > 0) { } else if (createReq.tagVer > 0 || createReq.colVer > 0) {
int32_t tagDelta = createReq.tagVer - pStb->tagVer; int32_t tagDelta = createReq.tagVer - pStb->tagVer;
int32_t colDelta = createReq.colVer - pStb->colVer; int32_t colDelta = createReq.colVer - pStb->colVer;
int32_t verDelta = tagDelta + colDelta;
mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d", mInfo("stb:%s, already exist while create, input tagVer:%d colVer:%d, exist tagVer:%d colVer:%d",
createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer); createReq.name, createReq.tagVer, createReq.colVer, pStb->tagVer, pStb->colVer);
if (tagDelta <= 0 && colDelta <= 0) { if (tagDelta <= 0 && colDelta <= 0) {
mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name); mInfo("stb:%s, schema version is not incremented and nothing needs to be done", createReq.name);
code = 0; code = 0;
goto _OVER; goto _OVER;
} else if ((tagDelta == 1 || colDelta == 1) && (verDelta == 1)) { } else if ((tagDelta == 1 && colDelta == 0) ||
(tagDelta == 0 && colDelta == 1) ||
(pStb->colVer == 1 && createReq.colVer > 1) ||
(pStb->tagVer == 1 && createReq.tagVer > 1)) {
isAlter = true; isAlter = true;
mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name); mInfo("stb:%s, schema version is only increased by 1 number, do alter operation", createReq.name);
} else { } else {

View File

@ -216,7 +216,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER) return -1; if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1;
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
@ -287,7 +287,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
} }
if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1;
}
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);