From c6932abccacd89efcee526cc7d5e71552879ca4e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 Dec 2024 18:01:17 +0800 Subject: [PATCH] feat:[TD-32166] check input params for clientRaw --- source/client/src/clientRawBlockWrite.c | 306 ++++++++++++++++++------ 1 file changed, 235 insertions(+), 71 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 3a23d38375..6a83266c5e 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -52,10 +52,21 @@ #define TMQ_META_VERSION "1.0" -static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen); -static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } +static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen); +static tb_uid_t processSuid(tb_uid_t suid, char* db) { + if (db == NULL) { + return suid; + } + return suid + MurmurHash3_32(db, strlen(db)); +} + static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t, SColCmprWrapper* pColCmprRow, cJSON** pJson) { + if (schemaRow == NULL || schemaTag == NULL || name == NULL || pColCmprRow == NULL || pJson == NULL) { + uError("invalid parameter, schemaRow:%p, schemaTag:%p, name:%p, pColCmprRow:%p, pJson:%p", schemaRow, schemaTag, + name, pColCmprRow, pJson); + return; + } int32_t code = TSDB_CODE_SUCCESS; int8_t buildDefaultCompress = 0; if (pColCmprRow->nCols <= 0) { @@ -168,6 +179,9 @@ end: } static int32_t setCompressOption(cJSON* json, uint32_t para) { + if (json == NULL) { + return TSDB_CODE_INVALID_PARA; + } uint8_t encode = COMPRESS_L1_TYPE_U32(para); int32_t code = 0; if (encode != 0) { @@ -201,6 +215,10 @@ end: return code; } static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) { + if (alterData == NULL || pJson == NULL) { + uError("invalid parameter in %s", __func__); + return; + } SMAlterStbReq req = {0}; cJSON* json = NULL; char* string = NULL; @@ -344,6 +362,10 @@ end: } static void processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) { + if (metaRsp == NULL || pJson == NULL) { + uError("invalid parameter in %s", __func__); + return; + } SVCreateStbReq req = {0}; SDecoder coder; @@ -364,6 +386,10 @@ end: } static void processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) { + if (metaRsp == NULL || pJson == NULL) { + uError("invalid parameter in %s", __func__); + return; + } SVCreateStbReq req = {0}; SDecoder coder = {0}; uDebug("alter stable data:%p", metaRsp); @@ -384,6 +410,10 @@ end: } static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { + if (json == NULL || pCreateReq == NULL) { + uError("invalid parameter in %s", __func__); + return; + } STag* pTag = (STag*)pCreateReq->ctb.pTag; char* sname = pCreateReq->ctb.stbName; char* name = pCreateReq->name; @@ -491,6 +521,10 @@ end: } static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) { + if (pJson == NULL || pCreateReq == NULL) { + uError("invalid parameter in %s", __func__); + return; + } int32_t code = 0; char* string = NULL; cJSON* json = cJSON_CreateObject(); @@ -519,6 +553,10 @@ end: } static void processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) { + if (pJson == NULL || metaRsp == NULL) { + uError("invalid parameter in %s", __func__); + return; + } SDecoder decoder = {0}; SVCreateTbBatchReq req = {0}; SVCreateTbReq* pCreateReq; @@ -549,6 +587,10 @@ end: } static void processAutoCreateTable(SMqDataRsp* rsp, char** string) { + if (rsp == NULL || string == NULL) { + uError("invalid parameter in %s", __func__); + return; + } SDecoder* decoder = NULL; SVCreateTbReq* pCreateReq = NULL; int32_t code = 0; @@ -599,6 +641,10 @@ end: } static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) { + if (pJson == NULL || metaRsp == NULL) { + uError("invalid parameter in %s", __func__); + return; + } SDecoder decoder = {0}; SVAlterTbReq vAlterTbReq = {0}; char* string = NULL; @@ -838,6 +884,10 @@ end: } static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) { + if (pJson == NULL || metaRsp == NULL) { + uError("invalid parameter in %s", __func__); + return; + } SDecoder decoder = {0}; SVDropStbReq req = {0}; cJSON* json = NULL; @@ -872,6 +922,10 @@ end: *pJson = json; } static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) { + if (pJson == NULL || metaRsp == NULL) { + uError("invalid parameter in %s", __func__); + return; + } SDeleteRes req = {0}; SDecoder coder = {0}; cJSON* json = NULL; @@ -909,6 +963,10 @@ end: } static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) { + if (pJson == NULL || metaRsp == NULL) { + uError("invalid parameter in %s", __func__); + return; + } SDecoder decoder = {0}; SVDropTbBatchReq req = {0}; cJSON* json = NULL; @@ -945,7 +1003,11 @@ end: *pJson = json; } -static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { +static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) { + if (taos == NULL || meta == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } SVCreateStbReq req = {0}; SDecoder coder; SMCreateStbReq pReq = {0}; @@ -960,8 +1022,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { goto end; } // decode and process req - void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); - int32_t len = metaLen - sizeof(SMsgHead); + void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); + uint32_t len = metaLen - sizeof(SMsgHead); tDecoderInit(&coder, data, len); if (tDecodeSVCreateStbReq(&coder, &req) < 0) { code = TSDB_CODE_INVALID_PARA; @@ -1055,7 +1117,11 @@ end: return code; } -static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { +static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) { + if (taos == NULL || meta == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } SVDropStbReq req = {0}; SDecoder coder = {0}; SMDropStbReq pReq = {0}; @@ -1070,8 +1136,8 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { goto end; } // decode and process req - void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); - int32_t len = metaLen - sizeof(SMsgHead); + void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); + uint32_t len = metaLen - sizeof(SMsgHead); tDecoderInit(&coder, data, len); if (tDecodeSVDropStbReq(&coder, &req) < 0) { code = TSDB_CODE_INVALID_PARA; @@ -1160,11 +1226,19 @@ typedef struct SVgroupCreateTableBatch { } SVgroupCreateTableBatch; static void destroyCreateTbReqBatch(void* data) { + if (data == NULL) { + uError("invalid parameter in %s", __func__); + return; + } SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data; taosArrayDestroy(pTbBatch->req.pArray); } -static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { +static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) { + if (taos == NULL || meta == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } SVCreateTbBatchReq req = {0}; SDecoder coder = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -1182,8 +1256,8 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } // decode and process req - void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); - int32_t len = metaLen - sizeof(SMsgHead); + void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); + uint32_t len = metaLen - sizeof(SMsgHead); tDecoderInit(&coder, data, len); if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) { code = TSDB_CODE_INVALID_PARA; @@ -1346,11 +1420,19 @@ typedef struct SVgroupDropTableBatch { } SVgroupDropTableBatch; static void destroyDropTbReqBatch(void* data) { + if (data == NULL) { + uError("invalid parameter in %s", __func__); + return; + } SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data; taosArrayDestroy(pTbBatch->req.pArray); } -static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { +static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) { + if (taos == NULL || meta == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } SVDropTbBatchReq req = {0}; SDecoder coder = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -1367,8 +1449,8 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } // decode and process req - void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); - int32_t len = metaLen - sizeof(SMsgHead); + void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); + uint32_t len = metaLen - sizeof(SMsgHead); tDecoderInit(&coder, data, len); if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) { code = TSDB_CODE_INVALID_PARA; @@ -1462,7 +1544,11 @@ end: return code; } -static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { +static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) { + if (taos == NULL || meta == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } SDeleteRes req = {0}; SDecoder coder = {0}; char sql[256] = {0}; @@ -1471,8 +1557,8 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen); // decode and process req - void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); - int32_t len = metaLen - sizeof(SMsgHead); + void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); + uint32_t len = metaLen - sizeof(SMsgHead); tDecoderInit(&coder, data, len); if (tDecodeDeleteRes(&coder, &req) < 0) { code = TSDB_CODE_INVALID_PARA; @@ -1497,7 +1583,11 @@ end: return code; } -static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { +static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) { + if (taos == NULL || meta == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } SVAlterTbReq req = {0}; SDecoder dcoder = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -1514,8 +1604,8 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { goto end; } // decode and process req - void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); - int32_t len = metaLen - sizeof(SMsgHead); + void* data = POINTER_SHIFT(meta, sizeof(SMsgHead)); + uint32_t len = metaLen - sizeof(SMsgHead); tDecoderInit(&dcoder, data, len); if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) { code = TSDB_CODE_INVALID_PARA; @@ -1619,7 +1709,8 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, TAOS_FIELD* fields, int numFields, int64_t reqid) { - if (!taos || !pData || !tbname) { + if (taos == NULL || pData == NULL || tbname == NULL) { + uError("invalid parameter in %s", __func__); return TSDB_CODE_INVALID_PARA; } int32_t code = TSDB_CODE_SUCCESS; @@ -1680,7 +1771,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) } int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) { - if (!taos || !pData || !tbname) { + if (taos == NULL || pData == NULL || tbname == NULL) { return TSDB_CODE_INVALID_PARA; } int32_t code = TSDB_CODE_SUCCESS; @@ -1736,6 +1827,10 @@ end: } static void* getRawDataFromRes(void* pRetrieve) { + if (pRetrieve == NULL) { + uError("invalid parameter in %s", __func__); + return NULL; + } void* rawData = NULL; // deal with compatibility if (*(int64_t*)pRetrieve == 0) { @@ -1747,6 +1842,10 @@ static void* getRawDataFromRes(void* pRetrieve) { } static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) { + if (rsp == NULL || pHashObj == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } // find schema data info int32_t code = 0; SVCreateTbReq pCreateReq = {0}; @@ -1806,11 +1905,19 @@ typedef struct { } tbInfo; static void tmqFreeMeta(void* data) { + if (data == NULL) { + uError("invalid parameter in %s", __func__); + return; + } STableMeta* pTableMeta = *(STableMeta**)data; taosMemoryFree(pTableMeta); } static void freeRawCache(void* data) { + if (data == NULL) { + uError("invalid parameter in %s", __func__); + return; + } rawCacheInfo* pRawCache = (rawCacheInfo*)data; taosHashCleanup(pRawCache->pMetaHash); taosHashCleanup(pRawCache->pNameHash); @@ -1829,6 +1936,10 @@ static int32_t initRawCacheHash() { } static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) { + if (rawData == NULL || pTableMeta == NULL || pSW == NULL) { + uError("invalid parameter in %s", __func__); + return false; + } char* p = (char*)rawData; // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each // column length | @@ -1864,6 +1975,10 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe } static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) { + if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } int32_t code = 0; void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES); if (cacheInfo == NULL) { @@ -1892,6 +2007,10 @@ end: } static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) { + if (taos == NULL || pRequest == NULL || pCatalog == NULL || conn == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } int32_t code = 0; RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0)); (*pRequest)->syncQuery = true; @@ -1911,26 +2030,38 @@ end: } typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp); -static int32_t decodeRawData(SDecoder* decoder, void* data, int32_t dataLen, _raw_decode_func_ func, +static int32_t decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func, SMqRspObj* rspObj) { - int8_t dataVersion = *(int8_t*)data; - if (dataVersion >= MQ_DATA_RSP_VERSION) { - data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t)); - dataLen -= sizeof(int8_t) + sizeof(int32_t); + if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } + int8_t dataVersion = *(int8_t*)data; + if (dataVersion >= MQ_DATA_RSP_VERSION) { + data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t)); + if (dataLen < sizeof(int8_t) + sizeof(int32_t)) { + return TSDB_CODE_INVALID_PARA; + } + dataLen -= sizeof(int8_t) + sizeof(int32_t); } - rspObj->resIter = -1; - tDecoderInit(decoder, data, dataLen); - int32_t code = func(decoder, &rspObj->dataRsp); - if (code != 0) { - SET_ERROR_MSG("decode mq taosx data rsp failed"); + rspObj->resIter = -1; + tDecoderInit(decoder, data, dataLen); + int32_t code = func(decoder, &rspObj->dataRsp); + if (code != 0) { + SET_ERROR_MSG("decode mq taosx data rsp failed"); } - return code; + return code; } static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash, SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName, STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) { + if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL || + pMeta == NULL || pSW == NULL || rawData == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } int32_t code = 0; STableMeta* pTableMeta = NULL; tbInfo* tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname)); @@ -1987,7 +2118,11 @@ end: return code; } -static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { +static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) { + if (taos == NULL || data == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } int32_t code = TSDB_CODE_SUCCESS; SQuery* pQuery = NULL; SMqRspObj rspObj = {0}; @@ -2060,7 +2195,11 @@ end: return code; } -static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) { +static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) { + if (taos == NULL || data == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } int32_t code = TSDB_CODE_SUCCESS; SQuery* pQuery = NULL; SMqRspObj rspObj = {0}; @@ -2149,6 +2288,10 @@ end: } static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) { + if (pMetaRsp == NULL || meta == NULL) { + uError("invalid parameter in %s", __func__); + return; + } if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) { processCreateStb(pMetaRsp, meta); } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) { @@ -2169,6 +2312,10 @@ static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) { } static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) { + if (pMsgRsp == NULL || string == NULL) { + uError("invalid parameter in %s", __func__); + return; + } SDecoder coder; SMqBatchMetaRsp rsp = {0}; int32_t code = 0; @@ -2214,7 +2361,10 @@ end: } char* tmq_get_json_meta(TAOS_RES* res) { - if (res == NULL) return NULL; + if (res == NULL) { + uError("invalid parameter in %s", __func__); + return NULL; + } uDebug("tmq_get_json_meta res:%p", res); if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res) && !TD_RES_TMQ_BATCH_META(res)) { return NULL; @@ -2242,6 +2392,10 @@ char* tmq_get_json_meta(TAOS_RES* res) { void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } static int32_t getOffSetLen(const SMqDataRsp* pRsp) { + if (pRsp == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } SEncoder coder = {0}; tEncoderInit(&coder, NULL, 0); if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1; @@ -2253,44 +2407,48 @@ static int32_t getOffSetLen(const SMqDataRsp* pRsp) { typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp); static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) { - int32_t len = 0; - int32_t code = 0; - SEncoder encoder = {0}; - void* buf = NULL; - tEncodeSize(encodeFunc, rspObj, len, code); - if (code < 0) { - code = TSDB_CODE_INVALID_MSG; - goto FAILED; + if (raw == NULL || encodeFunc == NULL || rspObj == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; } - len += sizeof(int8_t) + sizeof(int32_t); - buf = taosMemoryCalloc(1, len); - if (buf == NULL) { - code = terrno; - goto FAILED; + uint32_t len = 0; + int32_t code = 0; + SEncoder encoder = {0}; + void* buf = NULL; + tEncodeSize(encodeFunc, rspObj, len, code); + if (code < 0) { + code = TSDB_CODE_INVALID_MSG; + goto FAILED; } - tEncoderInit(&encoder, buf, len); - if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto FAILED; + len += sizeof(int8_t) + sizeof(int32_t); + buf = taosMemoryCalloc(1, len); + if (buf == NULL) { + code = terrno; + goto FAILED; } - int32_t offsetLen = getOffSetLen(rspObj); - if (offsetLen <= 0) { - code = TSDB_CODE_INVALID_MSG; - goto FAILED; + tEncoderInit(&encoder, buf, len); + if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto FAILED; } - if (tEncodeI32(&encoder, offsetLen) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto FAILED; + int32_t offsetLen = getOffSetLen(rspObj); + if (offsetLen <= 0) { + code = TSDB_CODE_INVALID_MSG; + goto FAILED; } - if (encodeFunc(&encoder, rspObj) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto FAILED; + if (tEncodeI32(&encoder, offsetLen) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto FAILED; } - tEncoderClear(&encoder); + if (encodeFunc(&encoder, rspObj) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto FAILED; + } + tEncoderClear(&encoder); - raw->raw = buf; - raw->raw_len = len; - return code; + raw->raw = buf; + raw->raw_len = len; + return code; FAILED: tEncoderClear(&encoder); taosMemoryFree(buf); @@ -2298,13 +2456,14 @@ FAILED: } int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) { - if (!raw || !res) { + if (raw == NULL || res == NULL) { + uError("invalid parameter in %s", __func__); return TSDB_CODE_INVALID_PARA; } SMqRspObj* rspObj = ((SMqRspObj*)res); if (TD_RES_TMQ_META(res)) { raw->raw = rspObj->metaRsp.metaRsp; - raw->raw_len = rspObj->metaRsp.metaRspLen; + raw->raw_len = rspObj->metaRsp.metaRspLen >= 0 ? rspObj->metaRsp.metaRspLen : 0; raw->raw_type = rspObj->metaRsp.resMsgType; uDebug("tmq get raw type meta:%p", raw); } else if (TD_RES_TMQ(res)) { @@ -2364,6 +2523,10 @@ static int32_t writeRawInit() { } static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) { + if (taos == NULL || buf == NULL) { + uError("invalid parameter in %s", __func__); + return TSDB_CODE_INVALID_PARA; + } if (writeRawInit() != 0) { return TSDB_CODE_INTERNAL_ERROR; } @@ -2394,15 +2557,16 @@ static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { if (taos == NULL || raw.raw == NULL || raw.raw_len <= 0) { - SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw); + uError("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw); return TSDB_CODE_INVALID_PARA; } return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type); } -static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen) { +static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) { if (taos == NULL || meta == NULL) { + uError("invalid parameter in %s", __func__); return TSDB_CODE_INVALID_PARA; } SMqBatchMetaRsp rsp = {0};