enh: add strict message check for submit message

This commit is contained in:
Hongze Cheng 2025-01-08 17:29:32 +08:00
parent 0b71316894
commit 00b0a46060
5 changed files with 92 additions and 77 deletions

View File

@ -57,9 +57,9 @@ const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b001111
#define ONE ((uint8_t)1)
#define THREE ((uint8_t)3)
#define DIV_8(i) ((i) >> 3)
#define MOD_8(i) ((i) & 7)
#define MOD_8(i) ((i)&7)
#define DIV_4(i) ((i) >> 2)
#define MOD_4(i) ((i) & 3)
#define MOD_4(i) ((i)&3)
#define MOD_4_TIME_2(i) (MOD_4(i) << 1)
#define BIT1_SIZE(n) (DIV_8((n)-1) + 1)
#define BIT2_SIZE(n) (DIV_4((n)-1) + 1)
@ -201,8 +201,8 @@ int32_t tColDataSortMerge(SArray **arr);
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap,
char *data);
// for encode/decode
int32_t tPutColData(uint8_t version, uint8_t *pBuf, SColData *pColData);
int32_t tGetColData(uint8_t version, uint8_t *pBuf, SColData *pColData);
int32_t tPutColData(uint8_t version, SEncoder *pEncoder, SColData *pColData);
int32_t tGetColData(uint8_t version, SDecoder *pDecoder, SColData *pColData);
// STRUCT ================================
struct STColumn {

View File

@ -118,6 +118,7 @@ static int32_t tDecodeI64v(SDecoder* pCoder, int64_t* val);
static int32_t tDecodeFloat(SDecoder* pCoder, float* val);
static int32_t tDecodeDouble(SDecoder* pCoder, double* val);
static int32_t tDecodeBool(SDecoder* pCoder, bool* val);
static int32_t tDecodeBinaryWithSize(SDecoder* pCoder, uint32_t size, uint8_t** val);
static int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len);
static int32_t tDecodeCStrAndLen(SDecoder* pCoder, char** val, uint32_t* len);
static int32_t tDecodeCStr(SDecoder* pCoder, char** val);
@ -404,6 +405,19 @@ static int32_t tDecodeBool(SDecoder* pCoder, bool* val) {
return 0;
}
static FORCE_INLINE int32_t tDecodeBinaryWithSize(SDecoder* pCoder, uint32_t size, uint8_t** val) {
if (pCoder->pos + size > pCoder->size) {
TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE);
}
if (val) {
*val = pCoder->data + pCoder->pos;
}
pCoder->pos += size;
return 0;
}
static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len) {
uint32_t length = 0;
@ -412,21 +426,12 @@ static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint3
*len = length;
}
if (pCoder->pos + length > pCoder->size) {
TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE);
}
if (val) {
*val = pCoder->data + pCoder->pos;
}
pCoder->pos += length;
return 0;
TAOS_RETURN(tDecodeBinaryWithSize(pCoder, length, val));
}
static FORCE_INLINE int32_t tDecodeCStrAndLen(SDecoder* pCoder, char** val, uint32_t* len) {
TAOS_CHECK_RETURN(tDecodeBinary(pCoder, (uint8_t**)val, len));
if (*len > 0) { // notice!!! *len maybe 0
if (*len > 0) { // notice!!! *len maybe 0
(*len) -= 1;
}
return 0;
@ -497,7 +502,7 @@ static FORCE_INLINE int32_t tDecodeBinaryAlloc32(SDecoder* pCoder, void** val, u
static FORCE_INLINE int32_t tDecodeCStrAndLenAlloc(SDecoder* pCoder, char** val, uint64_t* len) {
TAOS_CHECK_RETURN(tDecodeBinaryAlloc(pCoder, (void**)val, len));
if (*len > 0){
if (*len > 0) {
(*len) -= 1;
}
return 0;

View File

@ -14,8 +14,8 @@
*/
#define _DEFAULT_SOURCE
#include "tglobal.h"
#include "tmsg.h"
#include "tglobal.h"
#undef TD_MSG_NUMBER_
#undef TD_MSG_DICT_
@ -11639,8 +11639,7 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
TAOS_CHECK_EXIT(tEncodeU64v(pCoder, nColData));
for (uint64_t i = 0; i < nColData; i++) {
pCoder->pos +=
tPutColData(SUBMIT_REQUEST_VERSION, pCoder->data ? pCoder->data + pCoder->pos : NULL, &aColData[i]);
tPutColData(SUBMIT_REQUEST_VERSION, pCoder, &aColData[i]);
}
} else {
TAOS_CHECK_EXIT(tEncodeU64v(pCoder, TARRAY_SIZE(pSubmitTbData->aRowP)));
@ -11695,7 +11694,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
}
for (int32_t i = 0; i < nColData; ++i) {
pCoder->pos += tGetColData(version, pCoder->data + pCoder->pos, taosArrayReserve(pSubmitTbData->aCol, 1));
tGetColData(version, pCoder, taosArrayReserve(pSubmitTbData->aCol, 1));
}
} else {
uint64_t nRow;

View File

@ -2673,7 +2673,7 @@ static void (*tColDataGetValueImpl[])(SColData *pColData, int32_t iVal, SColVal
};
int32_t tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) {
if (iVal < 0 || iVal >= pColData->nVal ||
(pColData->flag <= 0 || pColData->flag >= sizeof(tColDataGetValueImpl)/POINTER_BYTES)){
(pColData->flag <= 0 || pColData->flag >= sizeof(tColDataGetValueImpl) / POINTER_BYTES)) {
return TSDB_CODE_INVALID_PARA;
}
tColDataGetValueImpl[pColData->flag](pColData, iVal, pColVal);
@ -3689,25 +3689,25 @@ _exit:
return 0;
}
static int32_t tPutColDataVersion0(uint8_t *pBuf, SColData *pColData) {
int32_t n = 0;
static int32_t tPutColDataVersion0(SEncoder *pEncoder, SColData *pColData) {
int32_t code = 0;
n += tPutI16v(pBuf ? pBuf + n : NULL, pColData->cid);
n += tPutI8(pBuf ? pBuf + n : NULL, pColData->type);
n += tPutI32v(pBuf ? pBuf + n : NULL, pColData->nVal);
n += tPutI8(pBuf ? pBuf + n : NULL, pColData->flag);
if ((code = tEncodeI16v(pEncoder, pColData->cid))) return code;
if ((code = tEncodeI8(pEncoder, pColData->type))) return code;
if ((code = tEncodeI32v(pEncoder, pColData->nVal))) return code;
if ((code = tEncodeI8(pEncoder, pColData->flag))) return code;
// bitmap
switch (pColData->flag) {
case (HAS_NULL | HAS_NONE):
case (HAS_VALUE | HAS_NONE):
case (HAS_VALUE | HAS_NULL):
if (pBuf) (void)memcpy(pBuf + n, pColData->pBitMap, BIT1_SIZE(pColData->nVal));
n += BIT1_SIZE(pColData->nVal);
code = tEncodeFixed(pEncoder, pColData->pBitMap, BIT1_SIZE(pColData->nVal));
if (code) return code;
break;
case (HAS_VALUE | HAS_NULL | HAS_NONE):
if (pBuf) (void)memcpy(pBuf + n, pColData->pBitMap, BIT2_SIZE(pColData->nVal));
n += BIT2_SIZE(pColData->nVal);
code = tEncodeFixed(pEncoder, pColData->pBitMap, BIT2_SIZE(pColData->nVal));
if (code) return code;
break;
default:
break;
@ -3716,40 +3716,46 @@ static int32_t tPutColDataVersion0(uint8_t *pBuf, SColData *pColData) {
// value
if (pColData->flag & HAS_VALUE) {
if (IS_VAR_DATA_TYPE(pColData->type)) {
if (pBuf) (void)memcpy(pBuf + n, pColData->aOffset, pColData->nVal << 2);
n += (pColData->nVal << 2);
code = tEncodeFixed(pEncoder, pColData->aOffset, pColData->nVal << 2);
if (code) return code;
n += tPutI32v(pBuf ? pBuf + n : NULL, pColData->nData);
if (pBuf) (void)memcpy(pBuf + n, pColData->pData, pColData->nData);
n += pColData->nData;
code = tEncodeI32v(pEncoder, pColData->nData);
if (code) return code;
code = tEncodeFixed(pEncoder, pColData->pData, pColData->nData);
if (code) return code;
} else {
if (pBuf) (void)memcpy(pBuf + n, pColData->pData, pColData->nData);
n += pColData->nData;
code = tEncodeFixed(pEncoder, pColData->pData, pColData->nData);
if (code) return code;
}
}
return n;
return code;
}
static int32_t tGetColDataVersion0(uint8_t *pBuf, SColData *pColData) {
int32_t n = 0;
static int32_t tGetColDataVersion0(SDecoder *pDecoder, SColData *pColData) {
int32_t code = 0;
n += tGetI16v(pBuf + n, &pColData->cid);
n += tGetI8(pBuf + n, &pColData->type);
n += tGetI32v(pBuf + n, &pColData->nVal);
n += tGetI8(pBuf + n, &pColData->flag);
if ((code = tDecodeI16v(pDecoder, &pColData->cid))) return code;
if ((code = tDecodeI8(pDecoder, &pColData->type))) return code;
if ((code = tDecodeI32v(pDecoder, &pColData->nVal))) return code;
if ((code = tDecodeI8(pDecoder, &pColData->flag))) return code;
if (pColData->type <= 0 || pColData->type >= TSDB_DATA_TYPE_MAX || pColData->flag <= 0 || pColData->flag >= 8) {
return TSDB_CODE_INVALID_PARA;
}
// bitmap
switch (pColData->flag) {
case (HAS_NULL | HAS_NONE):
case (HAS_VALUE | HAS_NONE):
case (HAS_VALUE | HAS_NULL):
pColData->pBitMap = pBuf + n;
n += BIT1_SIZE(pColData->nVal);
code = tDecodeBinaryWithSize(pDecoder, BIT1_SIZE(pColData->nVal), &pColData->pBitMap);
if (code) return code;
break;
case (HAS_VALUE | HAS_NULL | HAS_NONE):
pColData->pBitMap = pBuf + n;
n += BIT2_SIZE(pColData->nVal);
code = tDecodeBinaryWithSize(pDecoder, BIT2_SIZE(pColData->nVal), &pColData->pBitMap);
if (code) return code;
break;
default:
break;
@ -3758,50 +3764,54 @@ static int32_t tGetColDataVersion0(uint8_t *pBuf, SColData *pColData) {
// value
if (pColData->flag & HAS_VALUE) {
if (IS_VAR_DATA_TYPE(pColData->type)) {
pColData->aOffset = (int32_t *)(pBuf + n);
n += (pColData->nVal << 2);
code = tDecodeBinaryWithSize(pDecoder, pColData->nVal << 2, (uint8_t **)&pColData->aOffset);
if (code) return code;
n += tGetI32v(pBuf + n, &pColData->nData);
pColData->pData = pBuf + n;
n += pColData->nData;
code = tDecodeI32v(pDecoder, &pColData->nData);
if (code) return code;
code = tDecodeBinaryWithSize(pDecoder, pColData->nData, &pColData->pData);
if (code) return code;
} else {
pColData->pData = pBuf + n;
pColData->nData = TYPE_BYTES[pColData->type] * pColData->nVal;
n += pColData->nData;
code = tDecodeBinaryWithSize(pDecoder, pColData->nData, &pColData->pData);
if (code) return code;
}
}
pColData->cflag = 0;
return n;
return code;
}
static int32_t tPutColDataVersion1(uint8_t *pBuf, SColData *pColData) {
int32_t n = tPutColDataVersion0(pBuf, pColData);
n += tPutI8(pBuf ? pBuf + n : NULL, pColData->cflag);
return n;
static int32_t tPutColDataVersion1(SEncoder *pEncoder, SColData *pColData) {
int32_t code = tPutColDataVersion0(pEncoder, pColData);
if (code) return code;
return tEncodeI8(pEncoder, pColData->cflag);
}
static int32_t tGetColDataVersion1(uint8_t *pBuf, SColData *pColData) {
int32_t n = tGetColDataVersion0(pBuf, pColData);
n += tGetI8(pBuf ? pBuf + n : NULL, &pColData->cflag);
return n;
static int32_t tGetColDataVersion1(SDecoder *pDecoder, SColData *pColData) {
int32_t code = tGetColDataVersion0(pDecoder, pColData);
if (code) return code;
code = tDecodeI8(pDecoder, &pColData->cflag);
return code;
}
int32_t tPutColData(uint8_t version, uint8_t *pBuf, SColData *pColData) {
int32_t tPutColData(uint8_t version, SEncoder *pEncoder, SColData *pColData) {
if (version == 0) {
return tPutColDataVersion0(pBuf, pColData);
return tPutColDataVersion0(pEncoder, pColData);
} else if (version == 1) {
return tPutColDataVersion1(pBuf, pColData);
return tPutColDataVersion1(pEncoder, pColData);
} else {
return TSDB_CODE_INVALID_PARA;
}
}
int32_t tGetColData(uint8_t version, uint8_t *pBuf, SColData *pColData) {
int32_t tGetColData(uint8_t version, SDecoder *pDecoder, SColData *pColData) {
if (version == 0) {
return tGetColDataVersion0(pBuf, pColData);
return tGetColDataVersion0(pDecoder, pColData);
} else if (version == 1) {
return tGetColDataVersion1(pBuf, pColData);
return tGetColDataVersion1(pDecoder, pColData);
} else {
return TSDB_CODE_INVALID_PARA;
}

View File

@ -318,7 +318,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
}
SColData colData = {0};
pCoder->pos += tGetColData(version, pCoder->data + pCoder->pos, &colData);
tGetColData(version, pCoder, &colData);
if (colData.flag != HAS_VALUE) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
@ -332,7 +332,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
}
for (uint64_t i = 1; i < nColData; i++) {
pCoder->pos += tGetColData(version, pCoder->data + pCoder->pos, &colData);
tGetColData(version, pCoder, &colData);
}
} else {
uint64_t nRow;
@ -816,7 +816,7 @@ _exit:
_err:
vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
tstrerror(code), ver);
tstrerror(terrno), ver);
return code;
}
@ -1448,7 +1448,8 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
vAlterTbRsp.pMeta = &vMetaRsp;
}
if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL || vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
if (vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL) {
int64_t uid = metaGetTableEntryUidByName(pVnode->pMeta, vAlterTbReq.tbName);
if (uid == 0) {
vError("vgId:%d, %s failed at %s:%d since table %s not found", TD_VID(pVnode), __func__, __FILE__, __LINE__,
@ -1456,8 +1457,8 @@ static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t ver, void *pReq, i
goto _exit;
}
SArray* tbUids = taosArrayInit(4, sizeof(int64_t));
void* p = taosArrayPush(tbUids, &uid);
SArray *tbUids = taosArrayInit(4, sizeof(int64_t));
void *p = taosArrayPush(tbUids, &uid);
TSDB_CHECK_NULL(p, code, lino, _exit, terrno);
vDebug("vgId:%d, remove tags value altered table:%s from query table list", TD_VID(pVnode), vAlterTbReq.tbName);