From 1bcf693f071c392f4b5b24e9b4f1ff035c8ae3fe Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 10 Apr 2024 15:06:43 +0800 Subject: [PATCH] more code --- include/common/tdataformat.h | 7 +++-- include/common/tmsg.h | 1 + source/common/src/tdataformat.c | 41 +++++++++++++++++++++++++-- source/common/src/tmsg.c | 15 +++++++--- source/dnode/vnode/src/vnd/vnodeSvr.c | 20 +++++++------ 5 files changed, 66 insertions(+), 18 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 6c8a39caf6..3f3ec30569 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -126,7 +126,7 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag); int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag); void tRowGetKey(SRow *pRow, SRowKey *key); int32_t tRowKeyCompare(const void *p1, const void *p2); -int32_t tRowKeyAssign(SRowKey* pDst, SRowKey* pSrc); +int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc); // SRowIter ================================ int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); @@ -174,6 +174,7 @@ int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool forward); void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal); uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal); int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg); +void tColDataArrGetRowKey(SColData *aColData, int32_t nColData, int32_t iRow, SRowKey *key); extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull); @@ -188,8 +189,8 @@ void tColDataSortMerge(SArray *colDataArr); int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap, char *data); // for encode/decode -int32_t tPutColData(uint8_t *pBuf, SColData *pColData); -int32_t tGetColData(uint8_t *pBuf, SColData *pColData); +int32_t tPutColData(uint8_t version, uint8_t *pBuf, SColData *pColData); +int32_t tGetColData(uint8_t version, uint8_t *pBuf, SColData *pColData); // STRUCT ================================ struct STColumn { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1be5abbf76..33e77aee2b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4037,6 +4037,7 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq); #define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 #define SUBMIT_REQ_FROM_FILE 0x4 #define TD_REQ_FROM_TAOX 0x8 +#define SUBMIT_REQUEST_VERSION (1) #define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 1585c12ac1..f13a0a0825 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -3111,7 +3111,7 @@ static int32_t tColDataCopyRowAppend(SColData *aFromColData, int32_t iFromRow, S return code; } -static FORCE_INLINE void tColDataArrGetRowKey(SColData *aColData, int32_t nColData, int32_t iRow, SRowKey *key) { +void tColDataArrGetRowKey(SColData *aColData, int32_t nColData, int32_t iRow, SRowKey *key) { SColVal cv; key->ts = ((TSKEY *)aColData[0].pData)[iRow]; @@ -3490,7 +3490,7 @@ _exit: return; } -int32_t tPutColData(uint8_t *pBuf, SColData *pColData) { +static int32_t tPutColDataVersion0(uint8_t *pBuf, SColData *pColData) { int32_t n = 0; n += tPutI16v(pBuf ? pBuf + n : NULL, pColData->cid); @@ -3532,7 +3532,7 @@ int32_t tPutColData(uint8_t *pBuf, SColData *pColData) { return n; } -int32_t tGetColData(uint8_t *pBuf, SColData *pColData) { +static int32_t tGetColDataVersion0(uint8_t *pBuf, SColData *pColData) { int32_t n = 0; n += tGetI16v(pBuf + n, &pColData->cid); @@ -3571,10 +3571,45 @@ int32_t tGetColData(uint8_t *pBuf, SColData *pColData) { n += pColData->nData; } } + pColData->cflag = 0; return n; } +static int32_t tPutColDataVersion1(uint8_t *pBuf, SColData *pColData) { + int32_t n = tPutColDataVersion0(pBuf, pColData); + n += tPutI8(pBuf ? pBuf + n : NULL, pColData->cflag); + return n; +} + +static int32_t tGetColDataVersion1(uint8_t *pBuf, SColData *pColData) { + int32_t n = tGetColDataVersion0(pBuf, pColData); + n += tGetI8(pBuf ? pBuf + n : NULL, &pColData->cflag); + return n; +} + +int32_t tPutColData(uint8_t version, uint8_t *pBuf, SColData *pColData) { + if (version == 0) { + return tPutColDataVersion0(pBuf, pColData); + } else if (version == 1) { + return tPutColDataVersion1(pBuf, pColData); + } else { + ASSERT(0); + return -1; + } +} + +int32_t tGetColData(uint8_t version, uint8_t *pBuf, SColData *pColData) { + if (version == 0) { + return tGetColDataVersion0(pBuf, pColData); + } else if (version == 1) { + return tGetColDataVersion1(pBuf, pColData); + } else { + ASSERT(0); + return -1; + } +} + #define CALC_SUM_MAX_MIN(SUM, MAX, MIN, VAL) \ do { \ (SUM) += (VAL); \ diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 3fd8d53885..e4c6c9494a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9074,7 +9074,8 @@ int32_t tDecodeSBatchDeleteReqSetCtime(SDecoder *pDecoder, SBatchDeleteReq *pReq static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData) { if (tStartEncode(pCoder) < 0) return -1; - if (tEncodeI32v(pCoder, pSubmitTbData->flags) < 0) return -1; + int32_t flags = pSubmitTbData->flags | ((SUBMIT_REQUEST_VERSION) << 8); + if (tEncodeI32v(pCoder, flags) < 0) return -1; // auto create table if (pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { @@ -9094,7 +9095,8 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm if (tEncodeU64v(pCoder, nColData) < 0) return -1; for (uint64_t i = 0; i < nColData; i++) { - pCoder->pos += tPutColData(pCoder->data ? pCoder->data + pCoder->pos : NULL, &aColData[i]); + pCoder->pos += + tPutColData(SUBMIT_REQUEST_VERSION, pCoder->data ? pCoder->data + pCoder->pos : NULL, &aColData[i]); } } else { if (tEncodeU64v(pCoder, TARRAY_SIZE(pSubmitTbData->aRowP)) < 0) return -1; @@ -9113,13 +9115,18 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbData) { int32_t code = 0; + int32_t flags; + uint8_t version; if (tStartDecode(pCoder) < 0) { code = TSDB_CODE_INVALID_MSG; goto _exit; } - if (tDecodeI32v(pCoder, &pSubmitTbData->flags) < 0) return -1; + if (tDecodeI32v(pCoder, &flags) < 0) return -1; + + pSubmitTbData->flags = flags & 0xff; + version = (flags >> 8) & 0xff; if (pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { pSubmitTbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); @@ -9163,7 +9170,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa } for (int32_t i = 0; i < nColData; ++i) { - pCoder->pos += tGetColData(pCoder->data + pCoder->pos, taosArrayReserve(pSubmitTbData->aCol, 1)); + pCoder->pos += tGetColData(version, pCoder->data + pCoder->pos, taosArrayReserve(pSubmitTbData->aCol, 1)); } } else { uint64_t nRow; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index a4af782ad7..fae28ca32c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -240,10 +240,13 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int } SSubmitTbData submitTbData; + uint8_t version; if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) { code = TSDB_CODE_INVALID_MSG; TSDB_CHECK_CODE(code, lino, _exit); } + version = (submitTbData.flags >> 8) & 0xff; + submitTbData.flags = submitTbData.flags & 0xff; if (submitTbData.flags & SUBMIT_REQ_FROM_FILE) { code = grantCheck(TSDB_GRANT_CSV); @@ -307,7 +310,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int } SColData colData = {0}; - pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData); + pCoder->pos += tGetColData(version, pCoder->data + pCoder->pos, &colData); if (colData.flag != HAS_VALUE) { code = TSDB_CODE_INVALID_MSG; goto _exit; @@ -321,7 +324,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int } for (uint64_t i = 1; i < nColData; i++) { - pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData); + pCoder->pos += tGetColData(version, pCoder->data + pCoder->pos, &colData); } } else { uint64_t nRow; @@ -1572,17 +1575,18 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in goto _exit; } - SColData *pColData = (SColData *)taosArrayGet(pSubmitTbData->aCol, 0); - TSKEY *aKey = (TSKEY *)(pColData->pData); - - for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) { - if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) { + SColData *colDataArr = TARRAY_DATA(pSubmitTbData->aCol); + SRowKey lastKey; + tColDataArrGetRowKey(colDataArr, TARRAY_SIZE(pSubmitTbData->aCol), 0, &lastKey); + for (int32_t iRow = 1; iRow < colDataArr[0].nVal; iRow++) { + SRowKey key; + tColDataArrGetRowKey(TARRAY_DATA(pSubmitTbData->aCol), TARRAY_SIZE(pSubmitTbData->aCol), iRow, &key); + if (tRowKeyCompare(&lastKey, &key) >= 0) { code = TSDB_CODE_INVALID_MSG; vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver); goto _exit; } } - } else { int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP); SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);