Merge pull request #29522 from taosdata/fix/TS-5853-main

enh: add strict message check for submit message
This commit is contained in:
Hongze Cheng 2025-01-09 13:18:38 +08:00 committed by GitHub
commit dbfb910bff
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 116 additions and 78 deletions

View File

@ -57,9 +57,9 @@ const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b001111
#define ONE ((uint8_t)1) #define ONE ((uint8_t)1)
#define THREE ((uint8_t)3) #define THREE ((uint8_t)3)
#define DIV_8(i) ((i) >> 3) #define DIV_8(i) ((i) >> 3)
#define MOD_8(i) ((i) & 7) #define MOD_8(i) ((i)&7)
#define DIV_4(i) ((i) >> 2) #define DIV_4(i) ((i) >> 2)
#define MOD_4(i) ((i) & 3) #define MOD_4(i) ((i)&3)
#define MOD_4_TIME_2(i) (MOD_4(i) << 1) #define MOD_4_TIME_2(i) (MOD_4(i) << 1)
#define BIT1_SIZE(n) (DIV_8((n)-1) + 1) #define BIT1_SIZE(n) (DIV_8((n)-1) + 1)
#define BIT2_SIZE(n) (DIV_4((n)-1) + 1) #define BIT2_SIZE(n) (DIV_4((n)-1) + 1)
@ -201,8 +201,10 @@ int32_t tColDataSortMerge(SArray **arr);
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap, int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap,
char *data); char *data);
// for encode/decode // for encode/decode
int32_t tPutColData(uint8_t version, uint8_t *pBuf, SColData *pColData); int32_t tEncodeColData(uint8_t version, SEncoder *pEncoder, SColData *pColData);
int32_t tGetColData(uint8_t version, uint8_t *pBuf, SColData *pColData); int32_t tDecodeColData(uint8_t version, SDecoder *pDecoder, SColData *pColData);
int32_t tEncodeRow(SEncoder *pEncoder, SRow *pRow);
int32_t tDecodeRow(SDecoder *pDecoder, SRow **ppRow);
// STRUCT ================================ // STRUCT ================================
struct STColumn { struct STColumn {

View File

@ -118,6 +118,7 @@ static int32_t tDecodeI64v(SDecoder* pCoder, int64_t* val);
static int32_t tDecodeFloat(SDecoder* pCoder, float* val); static int32_t tDecodeFloat(SDecoder* pCoder, float* val);
static int32_t tDecodeDouble(SDecoder* pCoder, double* val); static int32_t tDecodeDouble(SDecoder* pCoder, double* val);
static int32_t tDecodeBool(SDecoder* pCoder, bool* val); static int32_t tDecodeBool(SDecoder* pCoder, bool* val);
static int32_t tDecodeBinaryWithSize(SDecoder* pCoder, uint32_t size, uint8_t** val);
static int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len); static int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len);
static int32_t tDecodeCStrAndLen(SDecoder* pCoder, char** val, uint32_t* len); static int32_t tDecodeCStrAndLen(SDecoder* pCoder, char** val, uint32_t* len);
static int32_t tDecodeCStr(SDecoder* pCoder, char** val); static int32_t tDecodeCStr(SDecoder* pCoder, char** val);
@ -404,6 +405,19 @@ static int32_t tDecodeBool(SDecoder* pCoder, bool* val) {
return 0; return 0;
} }
static FORCE_INLINE int32_t tDecodeBinaryWithSize(SDecoder* pCoder, uint32_t size, uint8_t** val) {
if (pCoder->pos + size > pCoder->size) {
TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE);
}
if (val) {
*val = pCoder->data + pCoder->pos;
}
pCoder->pos += size;
return 0;
}
static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len) { static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint32_t* len) {
uint32_t length = 0; uint32_t length = 0;
@ -412,21 +426,12 @@ static FORCE_INLINE int32_t tDecodeBinary(SDecoder* pCoder, uint8_t** val, uint3
*len = length; *len = length;
} }
if (pCoder->pos + length > pCoder->size) { TAOS_RETURN(tDecodeBinaryWithSize(pCoder, length, val));
TAOS_RETURN(TSDB_CODE_OUT_OF_RANGE);
}
if (val) {
*val = pCoder->data + pCoder->pos;
}
pCoder->pos += length;
return 0;
} }
static FORCE_INLINE int32_t tDecodeCStrAndLen(SDecoder* pCoder, char** val, uint32_t* len) { static FORCE_INLINE int32_t tDecodeCStrAndLen(SDecoder* pCoder, char** val, uint32_t* len) {
TAOS_CHECK_RETURN(tDecodeBinary(pCoder, (uint8_t**)val, len)); TAOS_CHECK_RETURN(tDecodeBinary(pCoder, (uint8_t**)val, len));
if (*len > 0) { // notice!!! *len maybe 0 if (*len > 0) { // notice!!! *len maybe 0
(*len) -= 1; (*len) -= 1;
} }
return 0; return 0;
@ -497,7 +502,7 @@ static FORCE_INLINE int32_t tDecodeBinaryAlloc32(SDecoder* pCoder, void** val, u
static FORCE_INLINE int32_t tDecodeCStrAndLenAlloc(SDecoder* pCoder, char** val, uint64_t* len) { static FORCE_INLINE int32_t tDecodeCStrAndLenAlloc(SDecoder* pCoder, char** val, uint64_t* len) {
TAOS_CHECK_RETURN(tDecodeBinaryAlloc(pCoder, (void**)val, len)); TAOS_CHECK_RETURN(tDecodeBinaryAlloc(pCoder, (void**)val, len));
if (*len > 0){ if (*len > 0) {
(*len) -= 1; (*len) -= 1;
} }
return 0; return 0;

View File

@ -14,8 +14,8 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tglobal.h"
#include "tmsg.h" #include "tmsg.h"
#include "tglobal.h"
#undef TD_MSG_NUMBER_ #undef TD_MSG_NUMBER_
#undef TD_MSG_DICT_ #undef TD_MSG_DICT_
@ -11639,16 +11639,14 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
TAOS_CHECK_EXIT(tEncodeU64v(pCoder, nColData)); TAOS_CHECK_EXIT(tEncodeU64v(pCoder, nColData));
for (uint64_t i = 0; i < nColData; i++) { for (uint64_t i = 0; i < nColData; i++) {
pCoder->pos += TAOS_CHECK_EXIT(tEncodeColData(SUBMIT_REQUEST_VERSION, pCoder, &aColData[i]));
tPutColData(SUBMIT_REQUEST_VERSION, pCoder->data ? pCoder->data + pCoder->pos : NULL, &aColData[i]);
} }
} else { } else {
TAOS_CHECK_EXIT(tEncodeU64v(pCoder, TARRAY_SIZE(pSubmitTbData->aRowP))); TAOS_CHECK_EXIT(tEncodeU64v(pCoder, TARRAY_SIZE(pSubmitTbData->aRowP)));
SRow **rows = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP); SRow **rows = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
for (int32_t iRow = 0; iRow < TARRAY_SIZE(pSubmitTbData->aRowP); ++iRow) { for (int32_t iRow = 0; iRow < TARRAY_SIZE(pSubmitTbData->aRowP); ++iRow) {
if (pCoder->data) memcpy(pCoder->data + pCoder->pos, rows[iRow], rows[iRow]->len); TAOS_CHECK_EXIT(tEncodeRow(pCoder, rows[iRow]));
pCoder->pos += rows[iRow]->len;
} }
} }
TAOS_CHECK_EXIT(tEncodeI64(pCoder, pSubmitTbData->ctimeMs)); TAOS_CHECK_EXIT(tEncodeI64(pCoder, pSubmitTbData->ctimeMs));
@ -11695,7 +11693,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
} }
for (int32_t i = 0; i < nColData; ++i) { for (int32_t i = 0; i < nColData; ++i) {
pCoder->pos += tGetColData(version, pCoder->data + pCoder->pos, taosArrayReserve(pSubmitTbData->aCol, 1)); TAOS_CHECK_EXIT(tDecodeColData(version, pCoder, taosArrayReserve(pSubmitTbData->aCol, 1)));
} }
} else { } else {
uint64_t nRow; uint64_t nRow;
@ -11712,8 +11710,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
TAOS_CHECK_EXIT(terrno); TAOS_CHECK_EXIT(terrno);
} }
*ppRow = (SRow *)(pCoder->data + pCoder->pos); TAOS_CHECK_EXIT(tDecodeRow(pCoder, ppRow));
pCoder->pos += (*ppRow)->len;
} }
} }

View File

@ -2673,7 +2673,7 @@ static void (*tColDataGetValueImpl[])(SColData *pColData, int32_t iVal, SColVal
}; };
int32_t tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) { int32_t tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal) {
if (iVal < 0 || iVal >= pColData->nVal || if (iVal < 0 || iVal >= pColData->nVal ||
(pColData->flag <= 0 || pColData->flag >= sizeof(tColDataGetValueImpl)/POINTER_BYTES)){ (pColData->flag <= 0 || pColData->flag >= sizeof(tColDataGetValueImpl) / POINTER_BYTES)) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
tColDataGetValueImpl[pColData->flag](pColData, iVal, pColVal); tColDataGetValueImpl[pColData->flag](pColData, iVal, pColVal);
@ -3689,25 +3689,25 @@ _exit:
return 0; return 0;
} }
static int32_t tPutColDataVersion0(uint8_t *pBuf, SColData *pColData) { static int32_t tEncodeColDataVersion0(SEncoder *pEncoder, SColData *pColData) {
int32_t n = 0; int32_t code = 0;
n += tPutI16v(pBuf ? pBuf + n : NULL, pColData->cid); if ((code = tEncodeI16v(pEncoder, pColData->cid))) return code;
n += tPutI8(pBuf ? pBuf + n : NULL, pColData->type); if ((code = tEncodeI8(pEncoder, pColData->type))) return code;
n += tPutI32v(pBuf ? pBuf + n : NULL, pColData->nVal); if ((code = tEncodeI32v(pEncoder, pColData->nVal))) return code;
n += tPutI8(pBuf ? pBuf + n : NULL, pColData->flag); if ((code = tEncodeI8(pEncoder, pColData->flag))) return code;
// bitmap // bitmap
switch (pColData->flag) { switch (pColData->flag) {
case (HAS_NULL | HAS_NONE): case (HAS_NULL | HAS_NONE):
case (HAS_VALUE | HAS_NONE): case (HAS_VALUE | HAS_NONE):
case (HAS_VALUE | HAS_NULL): case (HAS_VALUE | HAS_NULL):
if (pBuf) (void)memcpy(pBuf + n, pColData->pBitMap, BIT1_SIZE(pColData->nVal)); code = tEncodeFixed(pEncoder, pColData->pBitMap, BIT1_SIZE(pColData->nVal));
n += BIT1_SIZE(pColData->nVal); if (code) return code;
break; break;
case (HAS_VALUE | HAS_NULL | HAS_NONE): case (HAS_VALUE | HAS_NULL | HAS_NONE):
if (pBuf) (void)memcpy(pBuf + n, pColData->pBitMap, BIT2_SIZE(pColData->nVal)); code = tEncodeFixed(pEncoder, pColData->pBitMap, BIT2_SIZE(pColData->nVal));
n += BIT2_SIZE(pColData->nVal); if (code) return code;
break; break;
default: default:
break; break;
@ -3716,40 +3716,46 @@ static int32_t tPutColDataVersion0(uint8_t *pBuf, SColData *pColData) {
// value // value
if (pColData->flag & HAS_VALUE) { if (pColData->flag & HAS_VALUE) {
if (IS_VAR_DATA_TYPE(pColData->type)) { if (IS_VAR_DATA_TYPE(pColData->type)) {
if (pBuf) (void)memcpy(pBuf + n, pColData->aOffset, pColData->nVal << 2); code = tEncodeFixed(pEncoder, pColData->aOffset, pColData->nVal << 2);
n += (pColData->nVal << 2); if (code) return code;
n += tPutI32v(pBuf ? pBuf + n : NULL, pColData->nData); code = tEncodeI32v(pEncoder, pColData->nData);
if (pBuf) (void)memcpy(pBuf + n, pColData->pData, pColData->nData); if (code) return code;
n += pColData->nData;
code = tEncodeFixed(pEncoder, pColData->pData, pColData->nData);
if (code) return code;
} else { } else {
if (pBuf) (void)memcpy(pBuf + n, pColData->pData, pColData->nData); code = tEncodeFixed(pEncoder, pColData->pData, pColData->nData);
n += pColData->nData; if (code) return code;
} }
} }
return n; return code;
} }
static int32_t tGetColDataVersion0(uint8_t *pBuf, SColData *pColData) { static int32_t tDecodeColDataVersion0(SDecoder *pDecoder, SColData *pColData) {
int32_t n = 0; int32_t code = 0;
n += tGetI16v(pBuf + n, &pColData->cid); if ((code = tDecodeI16v(pDecoder, &pColData->cid))) return code;
n += tGetI8(pBuf + n, &pColData->type); if ((code = tDecodeI8(pDecoder, &pColData->type))) return code;
n += tGetI32v(pBuf + n, &pColData->nVal); if ((code = tDecodeI32v(pDecoder, &pColData->nVal))) return code;
n += tGetI8(pBuf + n, &pColData->flag); if ((code = tDecodeI8(pDecoder, &pColData->flag))) return code;
if (pColData->type <= 0 || pColData->type >= TSDB_DATA_TYPE_MAX || pColData->flag <= 0 || pColData->flag >= 8) {
return TSDB_CODE_INVALID_PARA;
}
// bitmap // bitmap
switch (pColData->flag) { switch (pColData->flag) {
case (HAS_NULL | HAS_NONE): case (HAS_NULL | HAS_NONE):
case (HAS_VALUE | HAS_NONE): case (HAS_VALUE | HAS_NONE):
case (HAS_VALUE | HAS_NULL): case (HAS_VALUE | HAS_NULL):
pColData->pBitMap = pBuf + n; code = tDecodeBinaryWithSize(pDecoder, BIT1_SIZE(pColData->nVal), &pColData->pBitMap);
n += BIT1_SIZE(pColData->nVal); if (code) return code;
break; break;
case (HAS_VALUE | HAS_NULL | HAS_NONE): case (HAS_VALUE | HAS_NULL | HAS_NONE):
pColData->pBitMap = pBuf + n; code = tDecodeBinaryWithSize(pDecoder, BIT2_SIZE(pColData->nVal), &pColData->pBitMap);
n += BIT2_SIZE(pColData->nVal); if (code) return code;
break; break;
default: default:
break; break;
@ -3758,55 +3764,74 @@ static int32_t tGetColDataVersion0(uint8_t *pBuf, SColData *pColData) {
// value // value
if (pColData->flag & HAS_VALUE) { if (pColData->flag & HAS_VALUE) {
if (IS_VAR_DATA_TYPE(pColData->type)) { if (IS_VAR_DATA_TYPE(pColData->type)) {
pColData->aOffset = (int32_t *)(pBuf + n); code = tDecodeBinaryWithSize(pDecoder, pColData->nVal << 2, (uint8_t **)&pColData->aOffset);
n += (pColData->nVal << 2); if (code) return code;
n += tGetI32v(pBuf + n, &pColData->nData); code = tDecodeI32v(pDecoder, &pColData->nData);
pColData->pData = pBuf + n; if (code) return code;
n += pColData->nData;
code = tDecodeBinaryWithSize(pDecoder, pColData->nData, &pColData->pData);
if (code) return code;
} else { } else {
pColData->pData = pBuf + n;
pColData->nData = TYPE_BYTES[pColData->type] * pColData->nVal; pColData->nData = TYPE_BYTES[pColData->type] * pColData->nVal;
n += pColData->nData; code = tDecodeBinaryWithSize(pDecoder, pColData->nData, &pColData->pData);
if (code) return code;
} }
} }
pColData->cflag = 0; pColData->cflag = 0;
return n; return code;
} }
static int32_t tPutColDataVersion1(uint8_t *pBuf, SColData *pColData) { static int32_t tEncodeColDataVersion1(SEncoder *pEncoder, SColData *pColData) {
int32_t n = tPutColDataVersion0(pBuf, pColData); int32_t code = tEncodeColDataVersion0(pEncoder, pColData);
n += tPutI8(pBuf ? pBuf + n : NULL, pColData->cflag); if (code) return code;
return n; return tEncodeI8(pEncoder, pColData->cflag);
} }
static int32_t tGetColDataVersion1(uint8_t *pBuf, SColData *pColData) { static int32_t tDecodeColDataVersion1(SDecoder *pDecoder, SColData *pColData) {
int32_t n = tGetColDataVersion0(pBuf, pColData); int32_t code = tDecodeColDataVersion0(pDecoder, pColData);
n += tGetI8(pBuf ? pBuf + n : NULL, &pColData->cflag); if (code) return code;
return n;
code = tDecodeI8(pDecoder, &pColData->cflag);
return code;
} }
int32_t tPutColData(uint8_t version, uint8_t *pBuf, SColData *pColData) { int32_t tEncodeColData(uint8_t version, SEncoder *pEncoder, SColData *pColData) {
if (version == 0) { if (version == 0) {
return tPutColDataVersion0(pBuf, pColData); return tEncodeColDataVersion0(pEncoder, pColData);
} else if (version == 1) { } else if (version == 1) {
return tPutColDataVersion1(pBuf, pColData); return tEncodeColDataVersion1(pEncoder, pColData);
} else { } else {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
} }
int32_t tGetColData(uint8_t version, uint8_t *pBuf, SColData *pColData) { int32_t tDecodeColData(uint8_t version, SDecoder *pDecoder, SColData *pColData) {
if (version == 0) { if (version == 0) {
return tGetColDataVersion0(pBuf, pColData); return tDecodeColDataVersion0(pDecoder, pColData);
} else if (version == 1) { } else if (version == 1) {
return tGetColDataVersion1(pBuf, pColData); return tDecodeColDataVersion1(pDecoder, pColData);
} else { } else {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
} }
int32_t tEncodeRow(SEncoder *pEncoder, SRow *pRow) { return tEncodeFixed(pEncoder, pRow, pRow->len); }
int32_t tDecodeRow(SDecoder *pDecoder, SRow **ppRow) {
if (ppRow == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (pDecoder->pos + sizeof(SRow) > pDecoder->size) {
return TSDB_CODE_OUT_OF_RANGE;
}
SRow *pRow = (SRow *)(pDecoder->data + pDecoder->pos);
return tDecodeBinaryWithSize(pDecoder, pRow->len, (uint8_t **)ppRow);
}
#define CALC_SUM_MAX_MIN(SUM, MAX, MIN, VAL) \ #define CALC_SUM_MAX_MIN(SUM, MAX, MIN, VAL) \
do { \ do { \
(SUM) += (VAL); \ (SUM) += (VAL); \

View File

@ -318,7 +318,12 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
} }
SColData colData = {0}; SColData colData = {0};
pCoder->pos += tGetColData(version, pCoder->data + pCoder->pos, &colData); code = tDecodeColData(version, pCoder, &colData);
if (code) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
if (colData.flag != HAS_VALUE) { if (colData.flag != HAS_VALUE) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _exit; goto _exit;
@ -332,7 +337,11 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
} }
for (uint64_t i = 1; i < nColData; i++) { for (uint64_t i = 1; i < nColData; i++) {
pCoder->pos += tGetColData(version, pCoder->data + pCoder->pos, &colData); code = tDecodeColData(version, pCoder, &colData);
if (code) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
} }
} else { } else {
uint64_t nRow; uint64_t nRow;
@ -816,7 +825,7 @@ _exit:
_err: _err:
vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
tstrerror(code), ver); tstrerror(terrno), ver);
return code; return code;
} }