more code
This commit is contained in:
parent
e06f5b3830
commit
1bcf693f07
|
@ -174,6 +174,7 @@ int32_t tColDataUpdateValue(SColData *pColData, SColVal *pColVal, bool forward);
|
|||
void tColDataGetValue(SColData *pColData, int32_t iVal, SColVal *pColVal);
|
||||
uint8_t tColDataGetBitValue(const SColData *pColData, int32_t iVal);
|
||||
int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMalloc, void *arg);
|
||||
void tColDataArrGetRowKey(SColData *aColData, int32_t nColData, int32_t iRow, SRowKey *key);
|
||||
|
||||
extern void (*tColDataCalcSMA[])(SColData *pColData, int64_t *sum, int64_t *max, int64_t *min, int16_t *numOfNull);
|
||||
|
||||
|
@ -188,8 +189,8 @@ void tColDataSortMerge(SArray *colDataArr);
|
|||
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap,
|
||||
char *data);
|
||||
// for encode/decode
|
||||
int32_t tPutColData(uint8_t *pBuf, SColData *pColData);
|
||||
int32_t tGetColData(uint8_t *pBuf, SColData *pColData);
|
||||
int32_t tPutColData(uint8_t version, uint8_t *pBuf, SColData *pColData);
|
||||
int32_t tGetColData(uint8_t version, uint8_t *pBuf, SColData *pColData);
|
||||
|
||||
// STRUCT ================================
|
||||
struct STColumn {
|
||||
|
|
|
@ -4037,6 +4037,7 @@ int32_t tDeserializeSMqSeekReq(void* buf, int32_t bufLen, SMqSeekReq* pReq);
|
|||
#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2
|
||||
#define SUBMIT_REQ_FROM_FILE 0x4
|
||||
#define TD_REQ_FROM_TAOX 0x8
|
||||
#define SUBMIT_REQUEST_VERSION (1)
|
||||
|
||||
#define TD_REQ_FROM_TAOX_OLD 0x1 // for compatibility
|
||||
|
||||
|
|
|
@ -3111,7 +3111,7 @@ static int32_t tColDataCopyRowAppend(SColData *aFromColData, int32_t iFromRow, S
|
|||
return code;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tColDataArrGetRowKey(SColData *aColData, int32_t nColData, int32_t iRow, SRowKey *key) {
|
||||
void tColDataArrGetRowKey(SColData *aColData, int32_t nColData, int32_t iRow, SRowKey *key) {
|
||||
SColVal cv;
|
||||
|
||||
key->ts = ((TSKEY *)aColData[0].pData)[iRow];
|
||||
|
@ -3490,7 +3490,7 @@ _exit:
|
|||
return;
|
||||
}
|
||||
|
||||
int32_t tPutColData(uint8_t *pBuf, SColData *pColData) {
|
||||
static int32_t tPutColDataVersion0(uint8_t *pBuf, SColData *pColData) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tPutI16v(pBuf ? pBuf + n : NULL, pColData->cid);
|
||||
|
@ -3532,7 +3532,7 @@ int32_t tPutColData(uint8_t *pBuf, SColData *pColData) {
|
|||
return n;
|
||||
}
|
||||
|
||||
int32_t tGetColData(uint8_t *pBuf, SColData *pColData) {
|
||||
static int32_t tGetColDataVersion0(uint8_t *pBuf, SColData *pColData) {
|
||||
int32_t n = 0;
|
||||
|
||||
n += tGetI16v(pBuf + n, &pColData->cid);
|
||||
|
@ -3571,10 +3571,45 @@ int32_t tGetColData(uint8_t *pBuf, SColData *pColData) {
|
|||
n += pColData->nData;
|
||||
}
|
||||
}
|
||||
pColData->cflag = 0;
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
static int32_t tPutColDataVersion1(uint8_t *pBuf, SColData *pColData) {
|
||||
int32_t n = tPutColDataVersion0(pBuf, pColData);
|
||||
n += tPutI8(pBuf ? pBuf + n : NULL, pColData->cflag);
|
||||
return n;
|
||||
}
|
||||
|
||||
static int32_t tGetColDataVersion1(uint8_t *pBuf, SColData *pColData) {
|
||||
int32_t n = tGetColDataVersion0(pBuf, pColData);
|
||||
n += tGetI8(pBuf ? pBuf + n : NULL, &pColData->cflag);
|
||||
return n;
|
||||
}
|
||||
|
||||
int32_t tPutColData(uint8_t version, uint8_t *pBuf, SColData *pColData) {
|
||||
if (version == 0) {
|
||||
return tPutColDataVersion0(pBuf, pColData);
|
||||
} else if (version == 1) {
|
||||
return tPutColDataVersion1(pBuf, pColData);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tGetColData(uint8_t version, uint8_t *pBuf, SColData *pColData) {
|
||||
if (version == 0) {
|
||||
return tGetColDataVersion0(pBuf, pColData);
|
||||
} else if (version == 1) {
|
||||
return tGetColDataVersion1(pBuf, pColData);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
#define CALC_SUM_MAX_MIN(SUM, MAX, MIN, VAL) \
|
||||
do { \
|
||||
(SUM) += (VAL); \
|
||||
|
|
|
@ -9074,7 +9074,8 @@ int32_t tDecodeSBatchDeleteReqSetCtime(SDecoder *pDecoder, SBatchDeleteReq *pReq
|
|||
static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData) {
|
||||
if (tStartEncode(pCoder) < 0) return -1;
|
||||
|
||||
if (tEncodeI32v(pCoder, pSubmitTbData->flags) < 0) return -1;
|
||||
int32_t flags = pSubmitTbData->flags | ((SUBMIT_REQUEST_VERSION) << 8);
|
||||
if (tEncodeI32v(pCoder, flags) < 0) return -1;
|
||||
|
||||
// auto create table
|
||||
if (pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
||||
|
@ -9094,7 +9095,8 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
|
|||
if (tEncodeU64v(pCoder, nColData) < 0) return -1;
|
||||
|
||||
for (uint64_t i = 0; i < nColData; i++) {
|
||||
pCoder->pos += tPutColData(pCoder->data ? pCoder->data + pCoder->pos : NULL, &aColData[i]);
|
||||
pCoder->pos +=
|
||||
tPutColData(SUBMIT_REQUEST_VERSION, pCoder->data ? pCoder->data + pCoder->pos : NULL, &aColData[i]);
|
||||
}
|
||||
} else {
|
||||
if (tEncodeU64v(pCoder, TARRAY_SIZE(pSubmitTbData->aRowP)) < 0) return -1;
|
||||
|
@ -9113,13 +9115,18 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
|
|||
|
||||
static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbData) {
|
||||
int32_t code = 0;
|
||||
int32_t flags;
|
||||
uint8_t version;
|
||||
|
||||
if (tStartDecode(pCoder) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (tDecodeI32v(pCoder, &pSubmitTbData->flags) < 0) return -1;
|
||||
if (tDecodeI32v(pCoder, &flags) < 0) return -1;
|
||||
|
||||
pSubmitTbData->flags = flags & 0xff;
|
||||
version = (flags >> 8) & 0xff;
|
||||
|
||||
if (pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
||||
pSubmitTbData->pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
||||
|
@ -9163,7 +9170,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < nColData; ++i) {
|
||||
pCoder->pos += tGetColData(pCoder->data + pCoder->pos, taosArrayReserve(pSubmitTbData->aCol, 1));
|
||||
pCoder->pos += tGetColData(version, pCoder->data + pCoder->pos, taosArrayReserve(pSubmitTbData->aCol, 1));
|
||||
}
|
||||
} else {
|
||||
uint64_t nRow;
|
||||
|
|
|
@ -240,10 +240,13 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
|
|||
}
|
||||
|
||||
SSubmitTbData submitTbData;
|
||||
uint8_t version;
|
||||
if (tDecodeI32v(pCoder, &submitTbData.flags) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
version = (submitTbData.flags >> 8) & 0xff;
|
||||
submitTbData.flags = submitTbData.flags & 0xff;
|
||||
|
||||
if (submitTbData.flags & SUBMIT_REQ_FROM_FILE) {
|
||||
code = grantCheck(TSDB_GRANT_CSV);
|
||||
|
@ -307,7 +310,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
|
|||
}
|
||||
|
||||
SColData colData = {0};
|
||||
pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
|
||||
pCoder->pos += tGetColData(version, pCoder->data + pCoder->pos, &colData);
|
||||
if (colData.flag != HAS_VALUE) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _exit;
|
||||
|
@ -321,7 +324,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
|
|||
}
|
||||
|
||||
for (uint64_t i = 1; i < nColData; i++) {
|
||||
pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
|
||||
pCoder->pos += tGetColData(version, pCoder->data + pCoder->pos, &colData);
|
||||
}
|
||||
} else {
|
||||
uint64_t nRow;
|
||||
|
@ -1572,17 +1575,18 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
|||
goto _exit;
|
||||
}
|
||||
|
||||
SColData *pColData = (SColData *)taosArrayGet(pSubmitTbData->aCol, 0);
|
||||
TSKEY *aKey = (TSKEY *)(pColData->pData);
|
||||
|
||||
for (int32_t iRow = 0; iRow < pColData->nVal; iRow++) {
|
||||
if (aKey[iRow] < minKey || aKey[iRow] > maxKey || (iRow > 0 && aKey[iRow] <= aKey[iRow - 1])) {
|
||||
SColData *colDataArr = TARRAY_DATA(pSubmitTbData->aCol);
|
||||
SRowKey lastKey;
|
||||
tColDataArrGetRowKey(colDataArr, TARRAY_SIZE(pSubmitTbData->aCol), 0, &lastKey);
|
||||
for (int32_t iRow = 1; iRow < colDataArr[0].nVal; iRow++) {
|
||||
SRowKey key;
|
||||
tColDataArrGetRowKey(TARRAY_DATA(pSubmitTbData->aCol), TARRAY_SIZE(pSubmitTbData->aCol), iRow, &key);
|
||||
if (tRowKeyCompare(&lastKey, &key) >= 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
vError("vgId:%d %s failed since %s, version:%" PRId64, TD_VID(pVnode), __func__, tstrerror(terrno), ver);
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
int32_t nRow = TARRAY_SIZE(pSubmitTbData->aRowP);
|
||||
SRow **aRow = (SRow **)TARRAY_DATA(pSubmitTbData->aRowP);
|
||||
|
|
Loading…
Reference in New Issue