diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index f418b7e94c..f7515cc359 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -2385,7 +2385,7 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) end: uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code)); - tDeleteSTaosxRsp(&rspObj.dataRsp); + tDeleteMqDataRsp(&rspObj.dataRsp); tDecoderClear(&decoder); qDestroyQuery(pQuery); taosHashCleanup(pVgroupHash); diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index cb8c98bde4..3f8e686559 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -11734,7 +11734,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa } TAOS_CHECK_EXIT(tDecodeSVCreateTbReq(pCoder, pSubmitTbData->pCreateTbReq)); - dataAfterCreate = pCoder->data; + dataAfterCreate = pCoder->data + pCoder->pos; posAfterCreate = pCoder->pos; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 5a6489d182..072db703a6 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -242,6 +242,7 @@ int64_t tqGetResultBlockTime(STqReader *pReader); int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver, SArray* rawList); +void tqReaderClearSubmitMsg(STqReader *pReader); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SMqDataRsp* pRsp, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 1ab76019a9..796a4a8742 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -514,6 +514,13 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i return code; } +void tqReaderClearSubmitMsg(STqReader *pReader) { + tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); + pReader->nextBlk = 0; + pReader->msg.msgStr = NULL; +} + + SWalReader* tqGetWalReader(STqReader* pReader) { if (pReader == NULL) { return NULL; @@ -551,14 +558,12 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true); - tqDebug("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%"PRId64, pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid); + tqDebug("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid); pReader->nextBlk++; } - tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); - pReader->nextBlk = 0; - pReader->msg.msgStr = NULL; - tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, blockSz, uid); + tqReaderClearSubmitMsg(pReader); + tqDebug("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid); END: tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid); @@ -584,10 +589,7 @@ bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) { tqDebug("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid); pReader->nextBlk++; } - - tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); - pReader->nextBlk = 0; - pReader->msg.msgStr = NULL; + tqReaderClearSubmitMsg(pReader); tqDebug("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid); END: diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 0e697ccd8b..5e30f07e2b 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -344,7 +344,6 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int if (taosHashGet(pRequest->uidHash, &pExec->pTqReader->lastBlkUid, LONG_BYTES) != NULL) { tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 " is already exists", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid); terrno = TSDB_CODE_TMQ_DUPLICATE_UID; - pReader->nextBlk = 0; goto END; } else { code = taosHashPut(pRequest->uidHash, &pExec->pTqReader->lastBlkUid, LONG_BYTES, &pExec->pTqReader->lastBlkUid, LONG_BYTES); @@ -357,7 +356,6 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 ", this submit data is metadata and previous data is data", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid); terrno = TSDB_CODE_TMQ_DUPLICATE_UID; pRsp->createTableNum = 0; - pReader->nextBlk = 0; goto END; } @@ -391,26 +389,23 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int continue; } *totalRows += pBlock->info.rows; - blockDataFreeRes(pBlock); } - SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); - if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL){ + void** pSW = taosArrayGet(pSchemas, i); + if (taosArrayPush(pRsp->blockSchema, pSW) == NULL){ tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId); continue; } + *pSW = NULL; pRsp->blockNum++; } tqDebug("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows); END: - if (code != 0){ + if (code != 0) { tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code)); - taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); - taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper); - } else { - taosArrayDestroy(pBlocks); - taosArrayDestroy(pSchemas); } + taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); + taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper); } int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest) { @@ -430,6 +425,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData while (tqNextBlockImpl(pReader, NULL)) { tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList); if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){ + tqReaderClearSubmitMsg(pReader); goto END; } } @@ -437,6 +433,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList); if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){ + tqReaderClearSubmitMsg(pReader); goto END; } } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index c9d1454fbc..0d70292501 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -859,13 +859,6 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, } if (TSDB_CODE_SUCCESS == code) { code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size); - SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0}; - SDecoder dc = {0}; - tDecoderInit(&dc, POINTER_SHIFT(dst->pData, sizeof(SSubmitReq2Msg)), dst->size - sizeof(SSubmitReq2Msg)); - if (tDecodeSubmitReq(&dc, pSubmitReq, NULL) < 0) { - code = TSDB_CODE_INVALID_MSG; - } - tDecoderClear(&dc); } if (TSDB_CODE_SUCCESS == code) { code = (NULL == taosArrayPush(pDataBlocks, &dst) ? terrno : TSDB_CODE_SUCCESS);