diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ff6e4de128..3d46342d6a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3807,6 +3807,7 @@ int32_t tDecodeMqMetaRsp(SDecoder* pDecoder, SMqMetaRsp* pRsp); void tDeleteMqMetaRsp(SMqMetaRsp *pRsp); #define MQ_DATA_RSP_VERSION 100 + typedef struct { SMqRspHead head; STqOffsetVal reqOffset; @@ -3818,33 +3819,27 @@ typedef struct { SArray* blockData; SArray* blockTbName; SArray* blockSchema; - int64_t sleepTime; +} SMqDataRspCommon; + +typedef struct { + SMqDataRspCommon common; + int64_t sleepTime; } SMqDataRsp; -int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const SMqDataRsp* pRsp); -int32_t tDecodeMqDataRsp(SDecoder* pDecoder, SMqDataRsp* pRsp, int8_t dataVersion); -void tDeleteMqDataRsp(SMqDataRsp* pRsp); +int32_t tEncodeMqDataRsp(SEncoder* pEncoder, const void* pRsp); +int32_t tDecodeMqDataRsp(SDecoder* pDecoder, void* pRsp); +void tDeleteMqDataRsp(void* pRsp); typedef struct { - SMqRspHead head; - STqOffsetVal reqOffset; - STqOffsetVal rspOffset; - int32_t blockNum; - int8_t withTbName; - int8_t withSchema; - SArray* blockDataLen; - SArray* blockData; - SArray* blockTbName; - SArray* blockSchema; - // the following attributes are extended from SMqDataRsp - int32_t createTableNum; - SArray* createTableLen; - SArray* createTableReq; + SMqDataRspCommon common; + int32_t createTableNum; + SArray* createTableLen; + SArray* createTableReq; } STaosxRsp; -int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const STaosxRsp* pRsp); -int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, STaosxRsp* pRsp, int8_t dateVersion); -void tDeleteSTaosxRsp(STaosxRsp* pRsp); +int32_t tEncodeSTaosxRsp(SEncoder* pEncoder, const void* pRsp); +int32_t tDecodeSTaosxRsp(SDecoder* pDecoder, void* pRsp); +void tDeleteSTaosxRsp(void* pRsp); typedef struct { SMqRspHead head; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 6c3603b4e0..9507472df0 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -221,7 +221,11 @@ typedef struct { SSchemaWrapper schema; int32_t resIter; SReqResultInfo resInfo; - SMqDataRsp rsp; +} SMqRspObjCommon; + +typedef struct { + SMqRspObjCommon common; + SMqDataRsp rsp; } SMqRspObj; typedef struct { @@ -233,14 +237,8 @@ typedef struct { } SMqMetaRspObj; typedef struct { - int8_t resType; - char topic[TSDB_TOPIC_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - int32_t vgId; - SSchemaWrapper schema; - int32_t resIter; - SReqResultInfo resInfo; - STaosxRsp rsp; + SMqRspObjCommon common; + STaosxRsp rsp; } SMqTaosxRspObj; typedef struct SReqRelInfo { @@ -320,7 +318,7 @@ int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols); static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { SMqRspObj* msg = (SMqRspObj*)res; - return (SReqResultInfo*)&msg->resInfo; + return (SReqResultInfo*)&msg->common.resInfo; } SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 9464baea52..2ec98b3d67 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -343,12 +343,12 @@ void taos_free_result(TAOS_RES *res) { } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj *pRsp = (SMqTaosxRspObj *)res; tDeleteSTaosxRsp(&pRsp->rsp); - doFreeReqResultInfo(&pRsp->resInfo); + doFreeReqResultInfo(&pRsp->common.resInfo); taosMemoryFree(pRsp); } else if (TD_RES_TMQ(res)) { SMqRspObj *pRsp = (SMqRspObj *)res; tDeleteMqDataRsp(&pRsp->rsp); - doFreeReqResultInfo(&pRsp->resInfo); + doFreeReqResultInfo(&pRsp->common.resInfo); taosMemoryFree(pRsp); } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj *pRspObj = (SMqMetaRspObj *)res; @@ -417,7 +417,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { } else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { SMqRspObj *msg = ((SMqRspObj *)res); SReqResultInfo *pResultInfo; - if (msg->resIter == -1) { + if (msg->common.resIter == -1) { pResultInfo = tmqGetNextResInfo(res, true); } else { pResultInfo = tmqGetCurResInfo(res); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index d25a5332d7..ae60270d8b 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1623,11 +1623,16 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { uDebug(LOG_ID_TAG " write raw data, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); pRequest->syncQuery = true; - rspObj.resIter = -1; - rspObj.resType = RES_TYPE__TMQ; + rspObj.common.resIter = -1; + rspObj.common.resType = RES_TYPE__TMQ; + int8_t dataVersion = *(int8_t*)data; + if (dataVersion >= MQ_DATA_RSP_VERSION){ + data += sizeof(int8_t) + sizeof(int32_t); + dataLen -= sizeof(int8_t) + sizeof(int32_t); + } tDecoderInit(&decoder, data, dataLen); - code = tDecodeMqDataRsp(&decoder, &rspObj.rsp, *(int8_t*)data); + code = tDecodeMqDataRsp(&decoder, &rspObj.rsp); if (code != 0) { code = TSDB_CODE_INVALID_MSG; goto end; @@ -1656,13 +1661,13 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { goto end; } pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - while (++rspObj.resIter < rspObj.rsp.blockNum) { - void* pRetrieve = taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); - if (!rspObj.rsp.withSchema) { + while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) { + void* pRetrieve = taosArrayGetP(rspObj.rsp.common.blockData, rspObj.common.resIter); + if (!rspObj.rsp.common.withSchema) { goto end; } - const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter); + const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.common.blockTbName, rspObj.common.resIter); if (!tbName) { code = TSDB_CODE_TMQ_INVALID_MSG; goto end; @@ -1693,7 +1698,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } - SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.common.blockSchema, rspObj.common.resIter); TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD)); if (fields == NULL) { goto end; @@ -1752,11 +1757,17 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) } uDebug(LOG_ID_TAG " write raw metadata, data:%p, dataLen:%d", LOG_ID_VALUE, data, dataLen); pRequest->syncQuery = true; - rspObj.resIter = -1; - rspObj.resType = RES_TYPE__TMQ_METADATA; + rspObj.common.resIter = -1; + rspObj.common.resType = RES_TYPE__TMQ_METADATA; + + int8_t dataVersion = *(int8_t*)data; + if (dataVersion >= MQ_DATA_RSP_VERSION){ + data += sizeof(int8_t) + sizeof(int32_t); + dataLen -= sizeof(int8_t) + sizeof(int32_t); + } tDecoderInit(&decoder, data, dataLen); - code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp, *(int8_t*)data); + code = tDecodeSTaosxRsp(&decoder, &rspObj.rsp); if (code != 0) { code = TSDB_CODE_INVALID_MSG; goto end; @@ -1786,14 +1797,14 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) } pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.blockNum); - while (++rspObj.resIter < rspObj.rsp.blockNum) { - void* pRetrieve = taosArrayGetP(rspObj.rsp.blockData, rspObj.resIter); - if (!rspObj.rsp.withSchema) { + uDebug(LOG_ID_TAG " write raw metadata block num:%d", LOG_ID_VALUE, rspObj.rsp.common.blockNum); + while (++rspObj.common.resIter < rspObj.rsp.common.blockNum) { + void* pRetrieve = taosArrayGetP(rspObj.rsp.common.blockData, rspObj.common.resIter); + if (!rspObj.rsp.common.withSchema) { goto end; } - const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.blockTbName, rspObj.resIter); + const char* tbName = (const char*)taosArrayGetP(rspObj.rsp.common.blockTbName, rspObj.common.resIter); if (!tbName) { code = TSDB_CODE_TMQ_INVALID_MSG; goto end; @@ -1865,7 +1876,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } - SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.common.blockSchema, rspObj.common.resIter); TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD)); if (fields == NULL) { goto end; @@ -1944,6 +1955,64 @@ char* tmq_get_json_meta(TAOS_RES* res) { void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } +static int32_t getOffSetLen(const void *rsp){ + const SMqDataRspCommon *pRsp = rsp; + SEncoder coder = {0}; + tEncoderInit(&coder, NULL, 0); + if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1; + if (tEncodeSTqOffsetVal(&coder, &pRsp->rspOffset) < 0) return -1; + int32_t pos = coder.pos; + tEncoderClear(&coder); + return pos; +} + +typedef int32_t __encode_func__(SEncoder *pEncoder, const void *pRsp); + +static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, void* rspObj, tmq_raw_data* raw){ + int32_t len = 0; + int32_t code = 0; + SEncoder encoder = {0}; + void* buf = NULL; + tEncodeSize(encodeFunc, rspObj, len, code); + if (code < 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto FAILED; + } + len += sizeof(int8_t) + sizeof(int32_t); + buf = taosMemoryCalloc(1, len); + if(buf == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto FAILED; + } + tEncoderInit(&encoder, buf, len); + if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto FAILED; + } + int32_t offsetLen = getOffSetLen(rspObj); + if(offsetLen <= 0){ + terrno = TSDB_CODE_INVALID_MSG; + goto FAILED; + } + if (tEncodeI32(&encoder, offsetLen) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto FAILED; + } + if(encodeFunc(&encoder, rspObj) < 0){ + terrno = TSDB_CODE_INVALID_MSG; + goto FAILED; + } + tEncoderClear(&encoder); + + raw->raw = buf; + raw->raw_len = len; + return 0; +FAILED: + tEncoderClear(&encoder); + taosMemoryFree(buf); + return terrno; +} + int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { if (!raw || !res) { terrno = TSDB_CODE_INVALID_PARA; @@ -1957,42 +2026,19 @@ int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { uDebug("tmq get raw type meta:%p", raw); } else if (TD_RES_TMQ(res)) { SMqRspObj* rspObj = ((SMqRspObj*)res); - - int32_t len = 0; - int32_t code = 0; - tEncodeSize(tEncodeMqDataRsp, &rspObj->rsp, len, code); - if (code < 0) { - return -1; + if(encodeMqDataRsp(tEncodeMqDataRsp, &rspObj->rsp, raw) != 0){ + uError("tmq get raw type error:%d", terrno); + return terrno; } - - void* buf = taosMemoryCalloc(1, len); - SEncoder encoder = {0}; - tEncoderInit(&encoder, buf, len); - tEncodeMqDataRsp(&encoder, &rspObj->rsp); - tEncoderClear(&encoder); - - raw->raw = buf; - raw->raw_len = len; raw->raw_type = RES_TYPE__TMQ; uDebug("tmq get raw type data:%p", raw); } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj* rspObj = ((SMqTaosxRspObj*)res); - int32_t len = 0; - int32_t code = 0; - tEncodeSize(tEncodeSTaosxRsp, &rspObj->rsp, len, code); - if (code < 0) { - return -1; + if(encodeMqDataRsp(tEncodeSTaosxRsp, &rspObj->rsp, raw) != 0){ + uError("tmq get raw type error:%d", terrno); + return terrno; } - - void* buf = taosMemoryCalloc(1, len); - SEncoder encoder = {0}; - tEncoderInit(&encoder, buf, len); - tEncodeSTaosxRsp(&encoder, &rspObj->rsp); - tEncoderClear(&encoder); - - raw->raw = buf; - raw->raw_len = len; raw->raw_type = RES_TYPE__TMQ_METADATA; uDebug("tmq get raw type metadata:%p", raw); } else { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 4cac1febe5..d4b7f59d5c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -622,9 +622,9 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c if (TD_RES_TMQ(pRes)) { SMqRspObj* pRspObj = (SMqRspObj*)pRes; - pTopicName = pRspObj->topic; - vgId = pRspObj->vgId; - offsetVal = pRspObj->rsp.rspOffset; + pTopicName = pRspObj->common.topic; + vgId = pRspObj->common.vgId; + offsetVal = pRspObj->rsp.common.rspOffset; } else if (TD_RES_TMQ_META(pRes)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes; pTopicName = pMetaRspObj->topic; @@ -632,9 +632,9 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c offsetVal = pMetaRspObj->metaRsp.rspOffset; } else if (TD_RES_TMQ_METADATA(pRes)) { SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes; - pTopicName = pRspObj->topic; - vgId = pRspObj->vgId; - offsetVal = pRspObj->rsp.rspOffset; + pTopicName = pRspObj->common.topic; + vgId = pRspObj->common.vgId; + offsetVal = pRspObj->rsp.common.rspOffset; } else { code = TSDB_CODE_TMQ_INVALID_MSG; goto end; @@ -1377,7 +1377,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - if(tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead))) < 0){ + if(tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp) < 0){ tDecoderClear(&decoder); taosReleaseRef(tmqMgmt.rsetId, refId); code = TSDB_CODE_OUT_OF_MEMORY; @@ -1387,9 +1387,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead)); char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.rspOffset); + tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.common.rspOffset); tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64, - tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId); + tmq->consumerId, vgId, pRspWrapper->dataRsp.common.reqOffset.version, buf, rspType, requestId); } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); @@ -1404,7 +1404,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - if(tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead))) < 0){ + if(tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp) < 0){ tDecoderClear(&decoder); taosReleaseRef(tmqMgmt.rsetId, refId); code = TSDB_CODE_OUT_OF_MEMORY; @@ -1598,6 +1598,9 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj)); + if(pRspObj == NULL) { + return NULL; + } pRspObj->resType = RES_TYPE__TMQ_META; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); @@ -1611,8 +1614,7 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { void changeByteEndian(char* pData){ char* p = pData; - // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | - // version: + // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length | version: int32_t blockVersion = *(int32_t*)p; ASSERT(blockVersion == BLOCK_VERSION_1); *(int32_t*)p = BLOCK_VERSION_2; @@ -1649,7 +1651,7 @@ static void tmqGetRawDataRowsPrecisionFromRes(void *pRetrieve, void** rawData, i } } -static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) { +static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObjCommon* pRspObj, SMqDataRspCommon* pDataRsp) { (*numOfRows) = 0; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); @@ -1660,14 +1662,14 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; - bool needTransformSchema = !pRspObj->rsp.withSchema; - if (!pRspObj->rsp.withSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable - pRspObj->rsp.withSchema = true; - pRspObj->rsp.blockSchema = taosArrayInit(pRspObj->rsp.blockNum, sizeof(void*)); + bool needTransformSchema = !pDataRsp->withSchema; + if (!pDataRsp->withSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable + pDataRsp->withSchema = true; + pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*)); } // extract the rows in this data packet - for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { - void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, i); + for (int32_t i = 0; i < pDataRsp->blockNum; ++i) { + void* pRetrieve = taosArrayGetP(pDataRsp->blockData, i); void* rawData = NULL; int64_t rows = 0; // deal with compatibility @@ -1679,7 +1681,7 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg if (needTransformSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema); if(schema){ - taosArrayPush(pRspObj->rsp.blockSchema, &schema); + taosArrayPush(pDataRsp->blockSchema, &schema); } } } @@ -1687,18 +1689,24 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); - pRspObj->resType = RES_TYPE__TMQ; + if(pRspObj == NULL){ + return NULL; + } + pRspObj->common.resType = RES_TYPE__TMQ; memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp)); - tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, pRspObj); + tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common); return pRspObj; } SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); - pRspObj->resType = RES_TYPE__TMQ_METADATA; + if(pRspObj == NULL){ + return NULL; + } + pRspObj->common.resType = RES_TYPE__TMQ_METADATA; memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp)); - tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, (SMqRspObj*)pRspObj); + tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, &pRspObj->common, &pRspObj->rsp.common); return pRspObj; } @@ -1891,7 +1899,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); - SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp; + SMqDataRspCommon* pDataRsp = (SMqDataRspCommon*)&pollRspWrapper->dataRsp; if (pDataRsp->head.epoch == consumerEpoch) { taosWLockLatch(&tmq->lock); @@ -1994,8 +2002,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); + SMqDataRspCommon* pDataRsp = (SMqDataRspCommon*)&pollRspWrapper->taosxRsp; - if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { + if (pDataRsp->head.epoch == consumerEpoch) { taosWLockLatch(&tmq->lock); SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); pollRspWrapper->vgHandle = pVg; @@ -2009,11 +2018,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { return NULL; } - updateVgInfo(pVg, &pollRspWrapper->taosxRsp.reqOffset, &pollRspWrapper->taosxRsp.rspOffset, - pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId, - pollRspWrapper->taosxRsp.blockNum != 0); + updateVgInfo(pVg, &pDataRsp->reqOffset, &pDataRsp->rspOffset, + pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId, + pDataRsp->blockNum != 0); - if (pollRspWrapper->taosxRsp.blockNum == 0) { + if (pDataRsp->blockNum == 0) { tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId); pVg->emptyBlockReceiveTs = taosGetTimestampMs(); @@ -2034,7 +2043,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset); tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, - tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, + tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); taosFreeQitem(pRspWrapper); @@ -2042,7 +2051,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { return pRsp; } else { tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); + tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); @@ -2205,15 +2214,11 @@ const char* tmq_get_topic_name(TAOS_RES* res) { if (res == NULL) { return NULL; } - if (TD_RES_TMQ(res)) { - SMqRspObj* pRspObj = (SMqRspObj*)res; - return strchr(pRspObj->topic, '.') + 1; + if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { + return strchr(((SMqRspObjCommon*)res)->topic, '.') + 1; } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; return strchr(pMetaRspObj->topic, '.') + 1; - } else if (TD_RES_TMQ_METADATA(res)) { - SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; - return strchr(pRspObj->topic, '.') + 1; } else { return NULL; } @@ -2224,15 +2229,11 @@ const char* tmq_get_db_name(TAOS_RES* res) { return NULL; } - if (TD_RES_TMQ(res)) { - SMqRspObj* pRspObj = (SMqRspObj*)res; - return strchr(pRspObj->db, '.') + 1; + if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { + return strchr(((SMqRspObjCommon*)res)->db, '.') + 1; } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; return strchr(pMetaRspObj->db, '.') + 1; - } else if (TD_RES_TMQ_METADATA(res)) { - SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; - return strchr(pRspObj->db, '.') + 1; } else { return NULL; } @@ -2242,15 +2243,11 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { if (res == NULL) { return -1; } - if (TD_RES_TMQ(res)) { - SMqRspObj* pRspObj = (SMqRspObj*)res; - return pRspObj->vgId; + if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { + return ((SMqRspObjCommon*)res)->vgId; } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; return pMetaRspObj->vgId; - } else if (TD_RES_TMQ_METADATA(res)) { - SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; - return pRspObj->vgId; } else { return -1; } @@ -2260,24 +2257,19 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) { if (res == NULL) { return TSDB_CODE_INVALID_PARA; } - if (TD_RES_TMQ(res)) { - SMqRspObj* pRspObj = (SMqRspObj*)res; - STqOffsetVal* pOffset = &pRspObj->rsp.reqOffset; - if (pOffset->type == TMQ_OFFSET__LOG) { - return pRspObj->rsp.reqOffset.version; + if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { + SMqDataRspCommon* common = (SMqDataRspCommon*)POINTER_SHIFT(res, sizeof(SMqRspObjCommon)); + STqOffsetVal* pOffset = &common->reqOffset; + if (common->reqOffset.type == TMQ_OFFSET__LOG) { + return common->reqOffset.version; } else { - tscError("invalid offset type:%d", pOffset->type); + tscError("invalid offset type:%d", common->reqOffset.type); } } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pRspObj = (SMqMetaRspObj*)res; if (pRspObj->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) { return pRspObj->metaRsp.rspOffset.version; } - } else if (TD_RES_TMQ_METADATA(res)) { - SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; - if (pRspObj->rsp.reqOffset.type == TMQ_OFFSET__LOG) { - return pRspObj->rsp.reqOffset.version; - } } else { tscError("invalid tmq type:%d", *(int8_t*)res); } @@ -2290,20 +2282,15 @@ const char* tmq_get_table_name(TAOS_RES* res) { if (res == NULL) { return NULL; } - if (TD_RES_TMQ(res)) { - SMqRspObj* pRspObj = (SMqRspObj*)res; - if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || - pRspObj->resIter >= pRspObj->rsp.blockNum) { + if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) { + SMqDataRspCommon* common = (SMqDataRspCommon*)POINTER_SHIFT(res, sizeof(SMqRspObjCommon)); + + SMqRspObjCommon* pRspObj = (SMqRspObjCommon*)res; + if (!common->withTbName || common->blockTbName == NULL || pRspObj->resIter < 0 || + pRspObj->resIter >= common->blockNum) { return NULL; } - return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); - } else if (TD_RES_TMQ_METADATA(res)) { - SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; - if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || - pRspObj->resIter >= pRspObj->rsp.blockNum) { - return NULL; - } - return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); + return (const char*)taosArrayGetP(common->blockTbName, pRspObj->resIter); } return NULL; } @@ -2640,17 +2627,17 @@ void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, cons } SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { - SMqRspObj* pRspObj = (SMqRspObj*)res; + SMqDataRspCommon* common = (SMqDataRspCommon*)POINTER_SHIFT(res, sizeof(SMqRspObjCommon)); + SMqRspObjCommon* pRspObj = (SMqRspObjCommon*)res; pRspObj->resIter++; - - if (pRspObj->resIter < pRspObj->rsp.blockNum) { - if (pRspObj->rsp.withSchema) { + if (pRspObj->resIter < common->blockNum) { + if (common->withSchema) { doFreeReqResultInfo(&pRspObj->resInfo); - SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter); + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(common->blockSchema, pRspObj->resIter); setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols); } - void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter); + void* pRetrieve = taosArrayGetP(common->blockData, pRspObj->resIter); void* rawData = NULL; int64_t rows = 0; int32_t precision = 0; @@ -2685,14 +2672,14 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { SMqDataRsp rsp; SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); - tDecodeMqDataRsp(&decoder, &rsp, *(int8_t*)POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead))); + tDecodeMqDataRsp(&decoder, &rsp); tDecoderClear(&decoder); SMqRspHead* pHead = pMsg->pData; tmq_topic_assignment assignment = {.begin = pHead->walsver, .end = pHead->walever + 1, - .currentOffset = rsp.rspOffset.version, + .currentOffset = rsp.common.rspOffset.version, .vgId = pParam->vgId}; taosThreadMutexLock(&pCommon->mutex); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index a38b745db8..ae230d21aa 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -8897,7 +8897,7 @@ void tDeleteMqMetaRsp(SMqMetaRsp *pRsp) { taosMemoryFree(pRsp->metaRsp); } -int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) { +int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRspCommon *pRsp) { if (tEncodeSTqOffsetVal(pEncoder, &pRsp->reqOffset) < 0) return -1; if (tEncodeSTqOffsetVal(pEncoder, &pRsp->rspOffset) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->blockNum) < 0) return -1; @@ -8922,14 +8922,13 @@ int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) { return 0; } -int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { - if (tEncodeI8(pEncoder, MQ_DATA_RSP_VERSION) < 0) return -1; +int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const void *pRsp) { if (tEncodeMqDataRspCommon(pEncoder, pRsp) < 0) return -1; - if (tEncodeI64(pEncoder, pRsp->sleepTime) < 0) return -1; + if (tEncodeI64(pEncoder, ((SMqDataRsp*)pRsp)->sleepTime) < 0) return -1; return 0; } -int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) { +int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRspCommon *pRsp) { if (tDecodeSTqOffsetVal(pDecoder, &pRsp->reqOffset) < 0) return -1; if (tDecodeSTqOffsetVal(pDecoder, &pRsp->rspOffset) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->blockNum) < 0) return -1; @@ -8975,19 +8974,17 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) { return 0; } -int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp, int8_t dataVersion) { - if (dataVersion >= MQ_DATA_RSP_VERSION){ - if (tDecodeI8(pDecoder, &dataVersion) < 0) return -1; - } +int32_t tDecodeMqDataRsp(SDecoder *pDecoder, void *pRsp) { if (tDecodeMqDataRspCommon(pDecoder, pRsp) < 0) return -1; if (!tDecodeIsEnd(pDecoder)) { - if (tDecodeI64(pDecoder, &pRsp->sleepTime) < 0) return -1; + if (tDecodeI64(pDecoder, &((SMqDataRsp*)pRsp)->sleepTime) < 0) return -1; } return 0; } -void tDeleteMqDataRsp(SMqDataRsp *pRsp) { +static void tDeleteMqDataRspCommon(void *rsp) { + SMqDataRspCommon *pRsp = rsp; pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen); taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); pRsp->blockData = NULL; @@ -8999,10 +8996,14 @@ void tDeleteMqDataRsp(SMqDataRsp *pRsp) { tOffsetDestroy(&pRsp->rspOffset); } -int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { - if (tEncodeI8(pEncoder, MQ_DATA_RSP_VERSION) < 0) return -1; - if (tEncodeMqDataRspCommon(pEncoder, (const SMqDataRsp *)pRsp) < 0) return -1; +void tDeleteMqDataRsp(void *rsp) { + tDeleteMqDataRspCommon(rsp); +} +int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const void *rsp) { + if (tEncodeMqDataRspCommon(pEncoder, rsp) < 0) return -1; + + const STaosxRsp *pRsp = (const STaosxRsp *)rsp; if (tEncodeI32(pEncoder, pRsp->createTableNum) < 0) return -1; if (pRsp->createTableNum) { for (int32_t i = 0; i < pRsp->createTableNum; i++) { @@ -9014,19 +9015,17 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { return 0; } -int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp, int8_t dataVersion) { - if (dataVersion >= MQ_DATA_RSP_VERSION){ - if (tDecodeI8(pDecoder, &dataVersion) < 0) return -1; - } - if (tDecodeMqDataRspCommon(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1; +int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, void *rsp) { + if (tDecodeMqDataRspCommon(pDecoder, rsp) < 0) return -1; + STaosxRsp *pRsp = (STaosxRsp *)rsp; if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1; if (pRsp->createTableNum) { pRsp->createTableLen = taosArrayInit(pRsp->createTableNum, sizeof(int32_t)); pRsp->createTableReq = taosArrayInit(pRsp->createTableNum, sizeof(void *)); for (int32_t i = 0; i < pRsp->createTableNum; i++) { void * pCreate = NULL; - uint64_t len; + uint64_t len = 0; if (tDecodeBinaryAlloc(pDecoder, &pCreate, &len) < 0) return -1; int32_t l = (int32_t)len; taosArrayPush(pRsp->createTableLen, &l); @@ -9037,20 +9036,13 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp, int8_t dataVersion return 0; } -void tDeleteSTaosxRsp(STaosxRsp *pRsp) { - pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen); - taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); - pRsp->blockData = NULL; - taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper); - pRsp->blockSchema = NULL; - taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); - pRsp->blockTbName = NULL; +void tDeleteSTaosxRsp(void *rsp) { + tDeleteMqDataRspCommon(rsp); + STaosxRsp *pRsp = (STaosxRsp *)rsp; pRsp->createTableLen = taosArrayDestroy(pRsp->createTableLen); taosArrayDestroyP(pRsp->createTableReq, (FDelete)taosMemoryFree); pRsp->createTableReq = NULL; - tOffsetDestroy(&pRsp->reqOffset); - tOffsetDestroy(&pRsp->rspOffset); } int32_t tEncodeSSingleDeleteReq(SEncoder *pEncoder, const SSingleDeleteReq *pReq) { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index ed9333f480..c7ae941389 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -648,7 +648,6 @@ static SMqConsumerObj* buildSubConsumer(SMnode *pMnode, SCMSubscribeReq *subscri _over: mndReleaseConsumer(pMnode, pExistedConsumer); tDeleteSMqConsumerObj(pConsumerNew); - taosArrayDestroyP(subscribe->topicNames, (FDelete)taosMemoryFree); return NULL; } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 2a076cfc61..bd8b73ed33 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -119,8 +119,8 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded); -int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); -int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, +int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, void* pRsp, int32_t numOfCols, int8_t precision); +int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp, int32_t type, int32_t vgId); int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId); @@ -154,9 +154,9 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); // tq util int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); -int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, +int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const void* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); -int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset); +int32_t tqInitDataRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset); void tqUpdateNodeStage(STQ* pTq, bool isLeader); int32_t tqSetDstTableDataPayload(uint64_t suid, const STSchema* pTSchema, int32_t blockIndex, SSDataBlock* pDataBlock, SSubmitTbData* pTableData, const char* id); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b2b65b54cb..43308ce1f5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -153,10 +153,10 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { } SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, req.reqOffset); - dataRsp.blockNum = 0; + tqInitDataRsp(&dataRsp.common, req.reqOffset); + dataRsp.common.blockNum = 0; char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset); + tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.reqOffset); tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, reqId:0x%" PRIx64, req.consumerId, vgId, buf, req.reqId); @@ -165,7 +165,7 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) { return 0; } -int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, +int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const void* pRsp, int32_t type, int32_t vgId) { int64_t sver = 0, ever = 0; walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); @@ -174,11 +174,11 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* char buf1[TSDB_OFFSET_LEN] = {0}; char buf2[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf1, TSDB_OFFSET_LEN, &pRsp->reqOffset); - tFormatOffset(buf2, TSDB_OFFSET_LEN, &pRsp->rspOffset); + tFormatOffset(buf1, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->reqOffset); + tFormatOffset(buf2, TSDB_OFFSET_LEN, &((SMqDataRspCommon*)pRsp)->rspOffset); tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s, reqId:0x%" PRIx64, - vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); + vgId, pReq->consumerId, pReq->epoch, ((SMqDataRspCommon*)pRsp)->blockNum, buf1, buf2, pReq->reqId); return 0; } @@ -499,7 +499,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { taosRUnLockLatch(&pTq->lock); SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, req.reqOffset); + tqInitDataRsp(&dataRsp.common, req.reqOffset); if (req.useSnapshot == true) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey); @@ -508,10 +508,10 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { return -1; } - dataRsp.rspOffset.type = TMQ_OFFSET__LOG; + dataRsp.common.rspOffset.type = TMQ_OFFSET__LOG; if (reqOffset.type == TMQ_OFFSET__LOG) { - dataRsp.rspOffset.version = reqOffset.version; + dataRsp.common.rspOffset.version = reqOffset.version; } else if (reqOffset.type < 0) { STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey); if (pOffset != NULL) { @@ -522,17 +522,17 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { return -1; } - dataRsp.rspOffset.version = pOffset->val.version; + dataRsp.common.rspOffset.version = pOffset->val.version; tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%" PRId64, consumerId, vgId, - req.subKey, dataRsp.rspOffset.version); + req.subKey, dataRsp.common.rspOffset.version); } else { if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { - dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position + dataRsp.common.rspOffset.version = sver; // not consume yet, set the earliest position } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { - dataRsp.rspOffset.version = ever; + dataRsp.common.rspOffset.version = ever; } tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%" PRId64, consumerId, vgId, req.subKey, - dataRsp.rspOffset.version); + dataRsp.common.rspOffset.version); } } else { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey, diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 017d5247d8..08f1689f2f 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -15,7 +15,7 @@ #include "tq.h" -int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { +int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, void* pRsp, int32_t numOfCols, int8_t precision) { int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) { @@ -30,22 +30,22 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); actualLen += sizeof(SRetrieveTableRspForTmq); - taosArrayPush(pRsp->blockDataLen, &actualLen); - taosArrayPush(pRsp->blockData, &buf); + taosArrayPush(((SMqDataRspCommon*)pRsp)->blockDataLen, &actualLen); + taosArrayPush(((SMqDataRspCommon*)pRsp)->blockData, &buf); return TSDB_CODE_SUCCESS; } -static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) { +static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, void* pRsp) { SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pTqReader->pSchemaWrapper); if (pSW == NULL) { return -1; } - taosArrayPush(pRsp->blockSchema, &pSW); + taosArrayPush(((SMqDataRspCommon*)pRsp)->blockSchema, &pSW); return 0; } -static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) { +static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, void* pRsp, int32_t n) { SMetaReader mr = {0}; metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK); @@ -57,7 +57,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in for (int32_t i = 0; i < n; i++) { char* tbName = taosStrdup(mr.me.name); - taosArrayPush(pRsp->blockTbName, &tbName); + taosArrayPush(((SMqDataRspCommon*)pRsp)->blockTbName, &tbName); } metaReaderClear(&mr); return 0; @@ -125,7 +125,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* return code; } - pRsp->blockNum++; + pRsp->common.blockNum++; if (pDataBlock == NULL) { blockDataDestroy(pHandle->block); pHandle->block = NULL; @@ -149,7 +149,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* return code; } - pRsp->blockNum++; + pRsp->common.blockNum++; totalRows += pDataBlock->info.rows; if (totalRows >= tmqRowSize) { break; @@ -158,8 +158,8 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* } tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", - pHandle->consumerId, vgId, pRsp->blockNum, totalRows); - qStreamExtractOffset(task, &pRsp->rspOffset); + pHandle->consumerId, vgId, pRsp->common.blockNum, totalRows); + qStreamExtractOffset(task, &pRsp->common.rspOffset); return 0; } @@ -186,7 +186,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta tqDebug("tmqsnap task execute end, get %p", pDataBlock); if (pDataBlock != NULL && pDataBlock->info.rows > 0) { - if (pRsp->withTbName) { + if (pRsp->common.withTbName) { if (pOffset->type == TMQ_OFFSET__LOG) { int64_t uid = pExec->pTqReader->lastBlkUid; if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) { @@ -194,21 +194,21 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta } } else { char* tbName = taosStrdup(qExtractTbnameFromTask(task)); - taosArrayPush(pRsp->blockTbName, &tbName); + taosArrayPush(pRsp->common.blockTbName, &tbName); } } - if (pRsp->withSchema) { + if (pRsp->common.withSchema) { if (pOffset->type == TMQ_OFFSET__LOG) { tqAddBlockSchemaToRsp(pExec, pRsp); } else { SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); - taosArrayPush(pRsp->blockSchema, &pSW); + taosArrayPush(pRsp->common.blockSchema, &pSW); } } tqAddBlockDataToRsp(pDataBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pDataBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision); - pRsp->blockNum++; + pRsp->common.blockNum++; if (pOffset->type == TMQ_OFFSET__LOG) { continue; } else { @@ -234,13 +234,13 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta } tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1); - qStreamExtractOffset(task, &pRsp->rspOffset); + qStreamExtractOffset(task, &pRsp->common.rspOffset); break; } - if (pRsp->blockNum > 0) { + if (pRsp->common.blockNum > 0) { tqDebug("tmqsnap task exec exited, get data"); - qStreamExtractOffset(task, &pRsp->rspOffset); + qStreamExtractOffset(task, &pRsp->common.rspOffset); break; } } @@ -268,7 +268,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) { goto loop_table; } - if (pRsp->withTbName) { + if (pRsp->common.withTbName) { int64_t uid = pExec->pTqReader->lastBlkUid; if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { goto loop_table; @@ -312,8 +312,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR *totalRows += pBlock->info.rows; blockDataFreeRes(pBlock); SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); - taosArrayPush(pRsp->blockSchema, &pSW); - pRsp->blockNum++; + taosArrayPush(pRsp->common.blockSchema, &pSW); + pRsp->common.blockNum++; } continue; loop_table: @@ -336,7 +336,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) { goto loop_db; } - if (pRsp->withTbName) { + if (pRsp->common.withTbName) { int64_t uid = pExec->pTqReader->lastBlkUid; if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { goto loop_db; @@ -380,8 +380,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR *totalRows += pBlock->info.rows; blockDataFreeRes(pBlock); SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); - taosArrayPush(pRsp->blockSchema, &pSW); - pRsp->blockNum++; + taosArrayPush(pRsp->common.blockSchema, &pSW); + pRsp->common.blockNum++; } continue; loop_db: diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 5e5c77265b..a5bb01f6c0 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -18,7 +18,7 @@ static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp, int32_t vgId); -int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { +int32_t tqInitDataRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) { tOffsetCopy(&pRsp->reqOffset, &pOffset); tOffsetCopy(&pRsp->rspOffset, &pOffset); @@ -39,7 +39,7 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) { streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader); } -static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) { +static int32_t tqInitTaosxRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) { tOffsetCopy(&pRsp->reqOffset, &pOffset); tOffsetCopy(&pRsp->rspOffset, &pOffset); @@ -110,9 +110,9 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand SMqDataRsp dataRsp = {0}; tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer + 1); - tqInitDataRsp(&dataRsp, *pOffsetVal); + tqInitDataRsp(&dataRsp.common, *pOffsetVal); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId, - pHandle->subKey, vgId, dataRsp.rspOffset.version); + pHandle->subKey, vgId, dataRsp.common.rspOffset.version); int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); tDeleteMqDataRsp(&dataRsp); @@ -137,7 +137,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, terrno = 0; SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, *pOffset); + tqInitDataRsp(&dataRsp.common, *pOffset); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); int code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest); @@ -146,11 +146,11 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, } // till now, all data has been transferred to consumer, new data needs to push client once arrived. - if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.blockNum == 0) { + if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST && dataRsp.common.blockNum == 0) { // lock taosWLockLatch(&pTq->lock); int64_t ver = walGetCommittedVer(pTq->pVnode->pWal); - if (dataRsp.rspOffset.version > ver) { // check if there are data again to avoid lost data + if (dataRsp.common.rspOffset.version > ver) { // check if there are data again to avoid lost data code = tqRegisterPushHandle(pTq, pHandle, pMsg); taosWUnLockLatch(&pTq->lock); goto end; @@ -158,15 +158,15 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, taosWUnLockLatch(&pTq->lock); } - tOffsetCopy(&dataRsp.reqOffset, pOffset); // reqOffset represents the current date offset, may be changed if wal not exists - code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); + tOffsetCopy(&dataRsp.common.reqOffset, pOffset); // reqOffset represents the current date offset, may be changed if wal not exists + code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); end : { char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset); + tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.common.rspOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d", - consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); + consumerId, pHandle->subKey, vgId, dataRsp.common.blockNum, buf, pRequest->reqId, code); tDeleteMqDataRsp(&dataRsp); return code; } @@ -194,7 +194,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, int32_t vgId = TD_VID(pTq->pVnode); SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, *offset); + tqInitTaosxRsp(&taosxRsp.common, *offset); if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { @@ -214,13 +214,13 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 ",ts:%" PRId64, - pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, - taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts); - if (taosxRsp.blockNum > 0) { - code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); + pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.common.blockNum, taosxRsp.common.rspOffset.type, + taosxRsp.common.rspOffset.uid, taosxRsp.common.rspOffset.ts); + if (taosxRsp.common.blockNum > 0) { + code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } else { - tOffsetCopy(offset, &taosxRsp.rspOffset); + tOffsetCopy(offset, &taosxRsp.common.rspOffset); } } @@ -235,9 +235,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, ASSERT(savedEpoch <= pRequest->epoch); if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer); code = tqSendDataRsp( - pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, + pHandle, pMsg, pRequest, &taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } @@ -249,9 +249,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, // process meta if (pHead->msgType != TDMT_VND_SUBMIT) { if (totalRows > 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer); code = tqSendDataRsp( - pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, + pHandle, pMsg, pRequest, &taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } @@ -292,9 +292,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); + tqOffsetResetToLog(&taosxRsp.common.rspOffset, fetchVer + 1); code = tqSendDataRsp( - pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, + pHandle, pMsg, pRequest, &taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } else { @@ -386,7 +386,7 @@ int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPoll return 0; } -int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, +int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const void* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever) { int32_t len = 0; int32_t code = 0; @@ -394,7 +394,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { tEncodeSize(tEncodeMqDataRsp, pRsp, len, code); } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { - tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); + tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code); } if (code < 0) { @@ -418,7 +418,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) { tEncodeMqDataRsp(&encoder, pRsp); } else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { - tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp); + tEncodeSTaosxRsp(&encoder, pRsp); } tEncoderClear(&encoder);