feat:[TD-32166] check input params for clientRaw

This commit is contained in:
wangmm0220 2024-12-09 18:01:17 +08:00
parent 50087851f3
commit c6932abcca
1 changed files with 235 additions and 71 deletions

View File

@ -52,10 +52,21 @@
#define TMQ_META_VERSION "1.0"
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen);
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen);
static tb_uid_t processSuid(tb_uid_t suid, char* db) {
if (db == NULL) {
return suid;
}
return suid + MurmurHash3_32(db, strlen(db));
}
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t,
SColCmprWrapper* pColCmprRow, cJSON** pJson) {
if (schemaRow == NULL || schemaTag == NULL || name == NULL || pColCmprRow == NULL || pJson == NULL) {
uError("invalid parameter, schemaRow:%p, schemaTag:%p, name:%p, pColCmprRow:%p, pJson:%p", schemaRow, schemaTag,
name, pColCmprRow, pJson);
return;
}
int32_t code = TSDB_CODE_SUCCESS;
int8_t buildDefaultCompress = 0;
if (pColCmprRow->nCols <= 0) {
@ -168,6 +179,9 @@ end:
}
static int32_t setCompressOption(cJSON* json, uint32_t para) {
if (json == NULL) {
return TSDB_CODE_INVALID_PARA;
}
uint8_t encode = COMPRESS_L1_TYPE_U32(para);
int32_t code = 0;
if (encode != 0) {
@ -201,6 +215,10 @@ end:
return code;
}
static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** pJson) {
if (alterData == NULL || pJson == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
SMAlterStbReq req = {0};
cJSON* json = NULL;
char* string = NULL;
@ -344,6 +362,10 @@ end:
}
static void processCreateStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
if (metaRsp == NULL || pJson == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
SVCreateStbReq req = {0};
SDecoder coder;
@ -364,6 +386,10 @@ end:
}
static void processAlterStb(SMqMetaRsp* metaRsp, cJSON** pJson) {
if (metaRsp == NULL || pJson == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
SVCreateStbReq req = {0};
SDecoder coder = {0};
uDebug("alter stable data:%p", metaRsp);
@ -384,6 +410,10 @@ end:
}
static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
if (json == NULL || pCreateReq == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
STag* pTag = (STag*)pCreateReq->ctb.pTag;
char* sname = pCreateReq->ctb.stbName;
char* name = pCreateReq->name;
@ -491,6 +521,10 @@ end:
}
static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSON** pJson) {
if (pJson == NULL || pCreateReq == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
int32_t code = 0;
char* string = NULL;
cJSON* json = cJSON_CreateObject();
@ -519,6 +553,10 @@ end:
}
static void processCreateTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
if (pJson == NULL || metaRsp == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
SDecoder decoder = {0};
SVCreateTbBatchReq req = {0};
SVCreateTbReq* pCreateReq;
@ -549,6 +587,10 @@ end:
}
static void processAutoCreateTable(SMqDataRsp* rsp, char** string) {
if (rsp == NULL || string == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
SDecoder* decoder = NULL;
SVCreateTbReq* pCreateReq = NULL;
int32_t code = 0;
@ -599,6 +641,10 @@ end:
}
static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
if (pJson == NULL || metaRsp == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
SDecoder decoder = {0};
SVAlterTbReq vAlterTbReq = {0};
char* string = NULL;
@ -838,6 +884,10 @@ end:
}
static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
if (pJson == NULL || metaRsp == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
SDecoder decoder = {0};
SVDropStbReq req = {0};
cJSON* json = NULL;
@ -872,6 +922,10 @@ end:
*pJson = json;
}
static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
if (pJson == NULL || metaRsp == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
SDeleteRes req = {0};
SDecoder coder = {0};
cJSON* json = NULL;
@ -909,6 +963,10 @@ end:
}
static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
if (pJson == NULL || metaRsp == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
SDecoder decoder = {0};
SVDropTbBatchReq req = {0};
cJSON* json = NULL;
@ -945,7 +1003,11 @@ end:
*pJson = json;
}
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
static int32_t taosCreateStb(TAOS* taos, void* meta, uint32_t metaLen) {
if (taos == NULL || meta == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
SVCreateStbReq req = {0};
SDecoder coder;
SMCreateStbReq pReq = {0};
@ -960,8 +1022,8 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
uint32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
@ -1055,7 +1117,11 @@ end:
return code;
}
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
static int32_t taosDropStb(TAOS* taos, void* meta, uint32_t metaLen) {
if (taos == NULL || meta == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
SVDropStbReq req = {0};
SDecoder coder = {0};
SMDropStbReq pReq = {0};
@ -1070,8 +1136,8 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
uint32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVDropStbReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
@ -1160,11 +1226,19 @@ typedef struct SVgroupCreateTableBatch {
} SVgroupCreateTableBatch;
static void destroyCreateTbReqBatch(void* data) {
if (data == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
SVgroupCreateTableBatch* pTbBatch = (SVgroupCreateTableBatch*)data;
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
static int32_t taosCreateTable(TAOS* taos, void* meta, uint32_t metaLen) {
if (taos == NULL || meta == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
SVCreateTbBatchReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
@ -1182,8 +1256,8 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
uint32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVCreateTbBatchReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
@ -1346,11 +1420,19 @@ typedef struct SVgroupDropTableBatch {
} SVgroupDropTableBatch;
static void destroyDropTbReqBatch(void* data) {
if (data == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
SVgroupDropTableBatch* pTbBatch = (SVgroupDropTableBatch*)data;
taosArrayDestroy(pTbBatch->req.pArray);
}
static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
static int32_t taosDropTable(TAOS* taos, void* meta, uint32_t metaLen) {
if (taos == NULL || meta == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
SVDropTbBatchReq req = {0};
SDecoder coder = {0};
int32_t code = TSDB_CODE_SUCCESS;
@ -1367,8 +1449,8 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
uint32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeSVDropTbBatchReq(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
@ -1462,7 +1544,11 @@ end:
return code;
}
static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
static int32_t taosDeleteData(TAOS* taos, void* meta, uint32_t metaLen) {
if (taos == NULL || meta == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
SDeleteRes req = {0};
SDecoder coder = {0};
char sql[256] = {0};
@ -1471,8 +1557,8 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
uDebug("connId:0x%" PRIx64 " delete data, meta:%p, len:%d", *(int64_t*)taos, meta, metaLen);
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
uint32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&coder, data, len);
if (tDecodeDeleteRes(&coder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
@ -1497,7 +1583,11 @@ end:
return code;
}
static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
static int32_t taosAlterTable(TAOS* taos, void* meta, uint32_t metaLen) {
if (taos == NULL || meta == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
SVAlterTbReq req = {0};
SDecoder dcoder = {0};
int32_t code = TSDB_CODE_SUCCESS;
@ -1514,8 +1604,8 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
goto end;
}
// decode and process req
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
int32_t len = metaLen - sizeof(SMsgHead);
void* data = POINTER_SHIFT(meta, sizeof(SMsgHead));
uint32_t len = metaLen - sizeof(SMsgHead);
tDecoderInit(&dcoder, data, len);
if (tDecodeSVAlterTbReq(&dcoder, &req) < 0) {
code = TSDB_CODE_INVALID_PARA;
@ -1619,7 +1709,8 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
int taos_write_raw_block_with_fields_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname,
TAOS_FIELD* fields, int numFields, int64_t reqid) {
if (!taos || !pData || !tbname) {
if (taos == NULL || pData == NULL || tbname == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TSDB_CODE_SUCCESS;
@ -1680,7 +1771,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
}
int taos_write_raw_block_with_reqid(TAOS* taos, int rows, char* pData, const char* tbname, int64_t reqid) {
if (!taos || !pData || !tbname) {
if (taos == NULL || pData == NULL || tbname == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TSDB_CODE_SUCCESS;
@ -1736,6 +1827,10 @@ end:
}
static void* getRawDataFromRes(void* pRetrieve) {
if (pRetrieve == NULL) {
uError("invalid parameter in %s", __func__);
return NULL;
}
void* rawData = NULL;
// deal with compatibility
if (*(int64_t*)pRetrieve == 0) {
@ -1747,6 +1842,10 @@ static void* getRawDataFromRes(void* pRetrieve) {
}
static int32_t buildCreateTbMap(SMqDataRsp* rsp, SHashObj* pHashObj) {
if (rsp == NULL || pHashObj == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
// find schema data info
int32_t code = 0;
SVCreateTbReq pCreateReq = {0};
@ -1806,11 +1905,19 @@ typedef struct {
} tbInfo;
static void tmqFreeMeta(void* data) {
if (data == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
STableMeta* pTableMeta = *(STableMeta**)data;
taosMemoryFree(pTableMeta);
}
static void freeRawCache(void* data) {
if (data == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
rawCacheInfo* pRawCache = (rawCacheInfo*)data;
taosHashCleanup(pRawCache->pMetaHash);
taosHashCleanup(pRawCache->pNameHash);
@ -1829,6 +1936,10 @@ static int32_t initRawCacheHash() {
}
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
if (rawData == NULL || pTableMeta == NULL || pSW == NULL) {
uError("invalid parameter in %s", __func__);
return false;
}
char* p = (char*)rawData;
// | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each
// column length |
@ -1864,6 +1975,10 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe
}
static int32_t getRawCache(SHashObj** pVgHash, SHashObj** pNameHash, SHashObj** pMetaHash, void* key) {
if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || key == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0;
void* cacheInfo = taosHashGet(writeRawCache, &key, POINTER_BYTES);
if (cacheInfo == NULL) {
@ -1892,6 +2007,10 @@ end:
}
static int32_t buildRawRequest(TAOS* taos, SRequestObj** pRequest, SCatalog** pCatalog, SRequestConnInfo* conn) {
if (taos == NULL || pRequest == NULL || pCatalog == NULL || conn == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0;
RAW_RETURN_CHECK(buildRequest(*(int64_t*)taos, "", 0, NULL, false, pRequest, 0));
(*pRequest)->syncQuery = true;
@ -1911,26 +2030,38 @@ end:
}
typedef int32_t _raw_decode_func_(SDecoder* pDecoder, SMqDataRsp* pRsp);
static int32_t decodeRawData(SDecoder* decoder, void* data, int32_t dataLen, _raw_decode_func_ func,
static int32_t decodeRawData(SDecoder* decoder, void* data, uint32_t dataLen, _raw_decode_func_ func,
SMqRspObj* rspObj) {
int8_t dataVersion = *(int8_t*)data;
if (dataVersion >= MQ_DATA_RSP_VERSION) {
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
dataLen -= sizeof(int8_t) + sizeof(int32_t);
if (decoder == NULL || data == NULL || func == NULL || rspObj == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
int8_t dataVersion = *(int8_t*)data;
if (dataVersion >= MQ_DATA_RSP_VERSION) {
data = POINTER_SHIFT(data, sizeof(int8_t) + sizeof(int32_t));
if (dataLen < sizeof(int8_t) + sizeof(int32_t)) {
return TSDB_CODE_INVALID_PARA;
}
dataLen -= sizeof(int8_t) + sizeof(int32_t);
}
rspObj->resIter = -1;
tDecoderInit(decoder, data, dataLen);
int32_t code = func(decoder, &rspObj->dataRsp);
if (code != 0) {
SET_ERROR_MSG("decode mq taosx data rsp failed");
rspObj->resIter = -1;
tDecoderInit(decoder, data, dataLen);
int32_t code = func(decoder, &rspObj->dataRsp);
if (code != 0) {
SET_ERROR_MSG("decode mq taosx data rsp failed");
}
return code;
return code;
}
static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj* pMetaHash,
SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
pMeta == NULL || pSW == NULL || rawData == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0;
STableMeta* pTableMeta = NULL;
tbInfo* tmpInfo = (tbInfo*)taosHashGet(pNameHash, pName->tname, strlen(pName->tname));
@ -1987,7 +2118,11 @@ end:
return code;
}
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
if (taos == NULL || data == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TSDB_CODE_SUCCESS;
SQuery* pQuery = NULL;
SMqRspObj rspObj = {0};
@ -2060,7 +2195,11 @@ end:
return code;
}
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) {
static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
if (taos == NULL || data == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
int32_t code = TSDB_CODE_SUCCESS;
SQuery* pQuery = NULL;
SMqRspObj rspObj = {0};
@ -2149,6 +2288,10 @@ end:
}
static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
if (pMetaRsp == NULL || meta == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) {
processCreateStb(pMetaRsp, meta);
} else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) {
@ -2169,6 +2312,10 @@ static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) {
}
static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
if (pMsgRsp == NULL || string == NULL) {
uError("invalid parameter in %s", __func__);
return;
}
SDecoder coder;
SMqBatchMetaRsp rsp = {0};
int32_t code = 0;
@ -2214,7 +2361,10 @@ end:
}
char* tmq_get_json_meta(TAOS_RES* res) {
if (res == NULL) return NULL;
if (res == NULL) {
uError("invalid parameter in %s", __func__);
return NULL;
}
uDebug("tmq_get_json_meta res:%p", res);
if (!TD_RES_TMQ_META(res) && !TD_RES_TMQ_METADATA(res) && !TD_RES_TMQ_BATCH_META(res)) {
return NULL;
@ -2242,6 +2392,10 @@ char* tmq_get_json_meta(TAOS_RES* res) {
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }
static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
if (pRsp == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
SEncoder coder = {0};
tEncoderInit(&coder, NULL, 0);
if (tEncodeSTqOffsetVal(&coder, &pRsp->reqOffset) < 0) return -1;
@ -2253,44 +2407,48 @@ static int32_t getOffSetLen(const SMqDataRsp* pRsp) {
typedef int32_t __encode_func__(SEncoder* pEncoder, const SMqDataRsp* pRsp);
static int32_t encodeMqDataRsp(__encode_func__* encodeFunc, SMqDataRsp* rspObj, tmq_raw_data* raw) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder = {0};
void* buf = NULL;
tEncodeSize(encodeFunc, rspObj, len, code);
if (code < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
if (raw == NULL || encodeFunc == NULL || rspObj == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
len += sizeof(int8_t) + sizeof(int32_t);
buf = taosMemoryCalloc(1, len);
if (buf == NULL) {
code = terrno;
goto FAILED;
uint32_t len = 0;
int32_t code = 0;
SEncoder encoder = {0};
void* buf = NULL;
tEncodeSize(encodeFunc, rspObj, len, code);
if (code < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
tEncoderInit(&encoder, buf, len);
if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
len += sizeof(int8_t) + sizeof(int32_t);
buf = taosMemoryCalloc(1, len);
if (buf == NULL) {
code = terrno;
goto FAILED;
}
int32_t offsetLen = getOffSetLen(rspObj);
if (offsetLen <= 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
tEncoderInit(&encoder, buf, len);
if (tEncodeI8(&encoder, MQ_DATA_RSP_VERSION) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
if (tEncodeI32(&encoder, offsetLen) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
int32_t offsetLen = getOffSetLen(rspObj);
if (offsetLen <= 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
if (encodeFunc(&encoder, rspObj) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
if (tEncodeI32(&encoder, offsetLen) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
tEncoderClear(&encoder);
if (encodeFunc(&encoder, rspObj) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto FAILED;
}
tEncoderClear(&encoder);
raw->raw = buf;
raw->raw_len = len;
return code;
raw->raw = buf;
raw->raw_len = len;
return code;
FAILED:
tEncoderClear(&encoder);
taosMemoryFree(buf);
@ -2298,13 +2456,14 @@ FAILED:
}
int32_t tmq_get_raw(TAOS_RES* res, tmq_raw_data* raw) {
if (!raw || !res) {
if (raw == NULL || res == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
SMqRspObj* rspObj = ((SMqRspObj*)res);
if (TD_RES_TMQ_META(res)) {
raw->raw = rspObj->metaRsp.metaRsp;
raw->raw_len = rspObj->metaRsp.metaRspLen;
raw->raw_len = rspObj->metaRsp.metaRspLen >= 0 ? rspObj->metaRsp.metaRspLen : 0;
raw->raw_type = rspObj->metaRsp.resMsgType;
uDebug("tmq get raw type meta:%p", raw);
} else if (TD_RES_TMQ(res)) {
@ -2364,6 +2523,10 @@ static int32_t writeRawInit() {
}
static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) {
if (taos == NULL || buf == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
if (writeRawInit() != 0) {
return TSDB_CODE_INTERNAL_ERROR;
}
@ -2394,15 +2557,16 @@ static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type)
int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
if (taos == NULL || raw.raw == NULL || raw.raw_len <= 0) {
SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
uError("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
return TSDB_CODE_INVALID_PARA;
}
return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
}
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen) {
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, uint32_t metaLen) {
if (taos == NULL || meta == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
SMqBatchMetaRsp rsp = {0};