diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e4af676259..a9e5fe628b 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -232,10 +232,8 @@ typedef struct SSnapContext { } SSnapContext; typedef struct STqReader { -// int64_t ver; SPackedData msg2; - int8_t setMsg; SSubmitReq2 submit; int32_t nextBlk; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 6d4a8a1c76..4f92330000 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -329,31 +329,24 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) { } int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { -// ASSERT(pReader->msg2.msgStr == NULL && msgStr && msgLen && (ver >= 0)); - pReader->msg2.msgStr = msgStr; pReader->msg2.msgLen = msgLen; pReader->msg2.ver = ver; -// pReader->ver = ver; tqDebug("tq reader set msg %p %d", msgStr, msgLen); - - if (pReader->setMsg == 0) { - SDecoder decoder; - tDecoderInit(&decoder, pReader->msg2.msgStr, pReader->msg2.msgLen); - if (tDecodeSSubmitReq2(&decoder, &pReader->submit) < 0) { - tDecoderClear(&decoder); - tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%"PRId64, msgLen, ver); - return -1; - } + SDecoder decoder; + tDecoderInit(&decoder, pReader->msg2.msgStr, pReader->msg2.msgLen); + if (tDecodeSSubmitReq2(&decoder, &pReader->submit) < 0) { tDecoderClear(&decoder); - pReader->setMsg = 1; + tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%"PRId64, msgLen, ver); + return -1; } + tDecoderClear(&decoder); return 0; } bool tqNextDataBlock2(STqReader* pReader) { - if (pReader->msg2.msgStr == NULL || pReader->setMsg != 1) { + if (pReader->msg2.msgStr == NULL) { return false; } @@ -372,7 +365,6 @@ bool tqNextDataBlock2(STqReader* pReader) { } tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); - pReader->setMsg = 0; pReader->nextBlk = 0; pReader->msg2.msgStr = NULL; @@ -381,7 +373,6 @@ bool tqNextDataBlock2(STqReader* pReader) { bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) { if (pReader->msg2.msgStr == NULL) return false; - ASSERT(pReader->setMsg == 1); int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < blockSz) { @@ -396,7 +387,6 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) { } tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); - pReader->setMsg = 0; pReader->nextBlk = 0; pReader->msg2.msgStr = NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 734b5ad1fe..3b2d2188a6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2033,7 +2033,6 @@ FETCH_NEXT_BLOCK: int32_t current = pInfo->validBlockIndex++; SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current); - /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current, totBlockNum);