fix:remove useless setmsg

This commit is contained in:
wangmm0220 2023-04-02 22:34:51 +08:00
parent afcc3d2064
commit b029c8f157
3 changed files with 7 additions and 20 deletions

View File

@ -232,10 +232,8 @@ typedef struct SSnapContext {
} SSnapContext; } SSnapContext;
typedef struct STqReader { typedef struct STqReader {
// int64_t ver;
SPackedData msg2; SPackedData msg2;
int8_t setMsg;
SSubmitReq2 submit; SSubmitReq2 submit;
int32_t nextBlk; int32_t nextBlk;

View File

@ -329,31 +329,24 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) {
} }
int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
// ASSERT(pReader->msg2.msgStr == NULL && msgStr && msgLen && (ver >= 0));
pReader->msg2.msgStr = msgStr; pReader->msg2.msgStr = msgStr;
pReader->msg2.msgLen = msgLen; pReader->msg2.msgLen = msgLen;
pReader->msg2.ver = ver; pReader->msg2.ver = ver;
// pReader->ver = ver;
tqDebug("tq reader set msg %p %d", msgStr, msgLen); tqDebug("tq reader set msg %p %d", msgStr, msgLen);
SDecoder decoder;
if (pReader->setMsg == 0) { tDecoderInit(&decoder, pReader->msg2.msgStr, pReader->msg2.msgLen);
SDecoder decoder; if (tDecodeSSubmitReq2(&decoder, &pReader->submit) < 0) {
tDecoderInit(&decoder, pReader->msg2.msgStr, pReader->msg2.msgLen);
if (tDecodeSSubmitReq2(&decoder, &pReader->submit) < 0) {
tDecoderClear(&decoder);
tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%"PRId64, msgLen, ver);
return -1;
}
tDecoderClear(&decoder); tDecoderClear(&decoder);
pReader->setMsg = 1; tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%"PRId64, msgLen, ver);
return -1;
} }
tDecoderClear(&decoder);
return 0; return 0;
} }
bool tqNextDataBlock2(STqReader* pReader) { bool tqNextDataBlock2(STqReader* pReader) {
if (pReader->msg2.msgStr == NULL || pReader->setMsg != 1) { if (pReader->msg2.msgStr == NULL) {
return false; return false;
} }
@ -372,7 +365,6 @@ bool tqNextDataBlock2(STqReader* pReader) {
} }
tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->setMsg = 0;
pReader->nextBlk = 0; pReader->nextBlk = 0;
pReader->msg2.msgStr = NULL; pReader->msg2.msgStr = NULL;
@ -381,7 +373,6 @@ bool tqNextDataBlock2(STqReader* pReader) {
bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) { bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) {
if (pReader->msg2.msgStr == NULL) return false; if (pReader->msg2.msgStr == NULL) return false;
ASSERT(pReader->setMsg == 1);
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < blockSz) { while (pReader->nextBlk < blockSz) {
@ -396,7 +387,6 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) {
} }
tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->setMsg = 0;
pReader->nextBlk = 0; pReader->nextBlk = 0;
pReader->msg2.msgStr = NULL; pReader->msg2.msgStr = NULL;

View File

@ -2033,7 +2033,6 @@ FETCH_NEXT_BLOCK:
int32_t current = pInfo->validBlockIndex++; int32_t current = pInfo->validBlockIndex++;
SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current); SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current);
/*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/
if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) {
qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current, qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current,
totBlockNum); totBlockNum);