diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 68a99d2697..7f3a696f26 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1948,10 +1948,10 @@ static int32_t initRawCacheHash() { } static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) { - if (rawData == NULL){ + if (rawData == NULL || pSW == NULL){ return false; } - if (pTableMeta == NULL || pSW == NULL) { + if (pTableMeta == NULL) { uError("invalid parameter in %s", __func__); return false; } @@ -2074,7 +2074,7 @@ static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName, STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) { if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL || - pMeta == NULL || pSW == NULL) { + pMeta == NULL) { uError("invalid parameter in %s", __func__); return TSDB_CODE_INVALID_PARA; } @@ -2337,14 +2337,8 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) RAW_NULL_CHECK(pStmt->pVgDataBlocks); while (++rspObj.resIter < rspObj.dataRsp.blockNum) { - if (!rspObj.dataRsp.withSchema) { - goto end; - } - const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter); RAW_NULL_CHECK(tbName); - SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter); - RAW_NULL_CHECK(pSW); void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter); RAW_NULL_CHECK(pRetrieve); void* rawData = getRawDataFromRes(pRetrieve); @@ -2358,7 +2352,7 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) // find schema data info STableMeta* pTableMeta = NULL; RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName, - &pTableMeta, pSW, NULL, retry)); + &pTableMeta, NULL, NULL, retry)); char err[ERR_MSG_LEN] = {0}; code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index bb55eb2e31..15a7a9b538 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -42,18 +42,19 @@ #define PROCESS_POLL_RSP(FUNC,DATA) \ SDecoder decoder = {0}; \ - tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); \ + tDecoderInit(&decoder, POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead)), pRspWrapper->pollRsp.len - sizeof(SMqRspHead)); \ if (FUNC(&decoder, DATA) < 0) { \ tDecoderClear(&decoder); \ code = terrno; \ goto END;\ }\ tDecoderClear(&decoder);\ - (void)memcpy(DATA, pMsg->pData, sizeof(SMqRspHead)); + (void)memcpy(DATA, pRspWrapper->pollRsp.data, sizeof(SMqRspHead)); #define DELETE_POLL_RSP(FUNC,DATA) \ SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;\ - taosMemoryFreeClear(pRsp->pEpset);\ + taosMemoryFreeClear(pRsp->pEpset); \ + taosMemoryFreeClear(pRsp->data); \ FUNC(DATA); enum { @@ -190,6 +191,8 @@ typedef struct { SMqClientTopic* topicHandle; uint64_t reqId; SEpSet* pEpset; + void* data; + uint32_t len; union { struct{ SMqRspHead head; @@ -1742,7 +1745,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->resetOffsetCfg = conf->resetOffset; pTmq->replayEnable = conf->replayEnable; pTmq->sourceExcluded = conf->sourceExcluded; - pTmq->rawData = 1; + pTmq->rawData = conf->rawData;; pTmq->enableBatchMeta = conf->enableBatchMeta; tstrncpy(pTmq->user, user, TSDB_USER_LEN); if (taosGetFqdn(pTmq->fqdn) != 0) { @@ -2068,27 +2071,13 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d(%s),QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, tmqMsgTypeStr[rspType], requestId); - if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { - PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp) - } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { - PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp) - } else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { - PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp) - } else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { - PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp) - } else if (rspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { - PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp) - pRspWrapper->pollRsp.dataRsp.len = pMsg->len - sizeof(SMqRspHead); - pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)); - pMsg->pData = NULL; - } else { // invalid rspType - tqErrorC("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); - code = TSDB_CODE_TSC_INTERNAL_ERROR; - goto END; - } + pRspWrapper->tmqRspType = rspType; pRspWrapper->pollRsp.reqId = requestId; pRspWrapper->pollRsp.pEpset = pMsg->pEpSet; + pRspWrapper->pollRsp.data = pMsg->pData; + pRspWrapper->pollRsp.len = pMsg->len; + pMsg->pData = NULL; pMsg->pEpSet = NULL; END: @@ -2519,6 +2508,29 @@ END: return pRspObj; } +static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){ + int32_t code = 0; + if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { + PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp) + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { + PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp) + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { + PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp) + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) { + PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp) + } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) { + PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp) + pRspWrapper->pollRsp.dataRsp.len = pRspWrapper->pollRsp.len - sizeof(SMqRspHead); + pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead)); + pRspWrapper->pollRsp.data = NULL; + } else { + tqErrorC("invalid rsp msg received, type:%d ignored", pRspWrapper->tmqRspType); + code = TSDB_CODE_TSC_INTERNAL_ERROR; + goto END; + } +END: + return code; +} static void* tmqHandleAllRsp(tmq_t* tmq) { tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall)); @@ -2538,12 +2550,16 @@ static void* tmqHandleAllRsp(tmq_t* tmq) { } tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]); - if (pRspWrapper->code != 0) { - code = processMqRspError(tmq, pRspWrapper); - }else{ - returnVal = processMqRsp(tmq, pRspWrapper); - code = terrno; + code = processWrapperData(pRspWrapper); + if (code == 0){ + if (pRspWrapper->code != 0) { + code = processMqRspError(tmq, pRspWrapper); + }else{ + returnVal = processMqRsp(tmq, pRspWrapper); + code = terrno; + } } + tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); if(returnVal != NULL || code != 0){ diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index 22bf3a529c..d7c793df33 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -11445,9 +11445,9 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) { } for (int32_t i = 0; i < pRsp->blockNum; i++) { - void *data; - uint64_t bLen; - TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &bLen)); + void *data = NULL; + uint32_t bLen = 0; + TAOS_CHECK_EXIT(tDecodeBinary(pDecoder, (uint8_t**)&data, &bLen)); if (taosArrayPush(pRsp->blockData, &data) == NULL) { TAOS_CHECK_EXIT(terrno); } @@ -11510,7 +11510,7 @@ _exit: static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) { taosArrayDestroy(pRsp->blockDataLen); pRsp->blockDataLen = NULL; - taosArrayDestroyP(pRsp->blockData, NULL); + taosArrayDestroy(pRsp->blockData); pRsp->blockData = NULL; taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper); pRsp->blockSchema = NULL; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index f5410659b1..ec82fd8e71 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -118,7 +118,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest); -int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, +int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, SMqDataRsp* pRsp, int32_t type, int32_t vgId); void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId); @@ -147,7 +147,7 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name); // tq util int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); -int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, +int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset); void tqUpdateNodeStage(STQ* pTq, bool isLeader); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index cdedda3e9c..daf4fa65f3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -208,7 +208,7 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { tDeleteMqDataRsp(&dataRsp); } -int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, +int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, SMqDataRsp* pRsp, int32_t type, int32_t vgId) { if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) { return TSDB_CODE_INVALID_PARA; diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index a33d050460..941e74fd9b 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -263,7 +263,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat tbName = NULL; } if (pRsp->withSchema) { - SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); + pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); TSDB_CHECK_NULL(pSW, code, lino, END, terrno); TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno); pSW = NULL; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 86a5bba712..bb6751c883 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -533,7 +533,7 @@ int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPoll return 0; } -int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, +int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever) { if (pRpcHandleInfo == NULL || pRsp == NULL) { return TSDB_CODE_TMQ_INVALID_MSG; @@ -541,6 +541,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* int32_t len = 0; int32_t code = 0; + if (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){ + pRsp->withSchema = 0; + } if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP || type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {