From 21c8d9813210de2eb7dfac1a7c361d0ec8dca5e6 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 23 Nov 2022 21:42:06 +0800 Subject: [PATCH] more code --- include/common/tmsg.h | 22 +++++ include/util/tencode.h | 1 + source/common/src/tmsg.c | 191 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 214 insertions(+) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0e649a3678..434febbe22 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3226,6 +3226,28 @@ int32_t tDeserializeSMqAskEpReq(void* buf, int32_t bufLen, SMqAskEpReq* pReq); int32_t tSerializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); int32_t tDeserializeSMqHbReq(void* buf, int32_t bufLen, SMqHbReq* pReq); +typedef struct { + int64_t suid; + int64_t uid; + int32_t sver; + union { + SArray* aRowP; + SArray* aCol; + }; +} SSubmitTbData; + +#define SUBMIT_REQ_AUTO_CREATE_TABLE 0x1 +#define SUBMIT_REQ_COLUMN_DATA_FORMAT 0x2 +typedef struct { + int32_t flag; + SArray* aCreateTbReq; + SArray* aSubmitTbData; +} SSubmitReq2; + +int32_t tEncodeSSubmitReq2(SEncoder* pCoder, const SSubmitReq2* pReq); +int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2** ppReq); +void tDestroySSubmitReq2(SSubmitReq2* pReq); + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/util/tencode.h b/include/util/tencode.h index a6dd58297e..ff97a20507 100644 --- a/include/util/tencode.h +++ b/include/util/tencode.h @@ -116,6 +116,7 @@ static int32_t tEncodeI64v(SEncoder* pCoder, int64_t val); static int32_t tEncodeFloat(SEncoder* pCoder, float val); static int32_t tEncodeDouble(SEncoder* pCoder, double val); static int32_t tEncodeBinary(SEncoder* pCoder, const uint8_t* val, uint32_t len); +static int32_t tEncodeBinaryEx(SEncoder* pCoder, const uint8_t* val, uint32_t len); static int32_t tEncodeCStrWithLen(SEncoder* pCoder, const char* val, uint32_t len); static int32_t tEncodeCStr(SEncoder* pCoder, const char* val); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 3f8b7ac4db..c530a00bac 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6652,3 +6652,194 @@ int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) { } return 0; } + +static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData, int8_t colFmt) { + if (tStartEncode(pCoder) < 0) return -1; + + if (tEncodeI64(pCoder, pSubmitTbData->suid) < 0) return -1; + if (tEncodeI64(pCoder, pSubmitTbData->uid) < 0) return -1; + if (tEncodeI32v(pCoder, pSubmitTbData->sver) < 0) return -1; + + if (colFmt) { + // todo + ASSERT(0); + } else { + if (tEncodeI64v(pCoder, taosArrayGetSize(pSubmitTbData->aRowP)) < 0) return -1; + for (int32_t i = 0; i < taosArrayGetSize(pSubmitTbData->aRowP); i++) { + SRow *pRow = taosArrayGetP(pSubmitTbData->aRowP, i); + if (tEncodeBinary(pCoder, (uint8_t *)pRow, pRow->len) < 0) return -1; // todo + } + } + + tEndEncode(pCoder); + return 0; +} + +static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbData, int8_t colFmt) { + int32_t code = 0; + + if (tStartDecode(pCoder) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + if (tDecodeI64(pCoder, &pSubmitTbData->suid) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + if (tDecodeI64(pCoder, &pSubmitTbData->uid) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + if (tDecodeI32v(pCoder, &pSubmitTbData->sver) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + if (colFmt) { + // todo + ASSERT(0); + } else { + int64_t nRows = 0; + if (tDecodeI64v(pCoder, &nRows) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + pSubmitTbData->aRowP = taosArrayInit(nRows, sizeof(SRow *)); + if (pSubmitTbData->aRowP == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + for (int32_t i = 0; i < nRows; i++) { + SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1); + if (tDecodeBinary(pCoder, (uint8_t **)ppRow, NULL) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + } + } + + tEndDecode(pCoder); + +_exit: + if (code) { + // todo: do clear + } + return 0; +} + +int32_t tEncodeSSubmitReq2(SEncoder *pCoder, const SSubmitReq2 *pReq) { + if (tStartEncode(pCoder) < 0) return -1; + + if (tEncodeI32v(pCoder, pReq->flag) < 0) return -1; + + if (pReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) { + if (tEncodeI64v(pCoder, taosArrayGetSize(pReq->aCreateTbReq)) < 0) return -1; + for (int32_t i = 0; i < taosArrayGetSize(pReq->aCreateTbReq); ++i) { + SVCreateTbReq *pCreateTbReq = (SVCreateTbReq *)taosArrayGet(pReq->aCreateTbReq, i); + if (tEncodeSVCreateTbReq(pCoder, pCreateTbReq) < 0) return -1; + } + } + + if (tEncodeI64v(pCoder, taosArrayGetSize(pReq->aSubmitTbData)) < 0) return -1; + for (int32_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); ++i) { + SSubmitTbData *pSubmitTbData = (SSubmitTbData *)taosArrayGet(pReq->aSubmitTbData, i); + if (tEncodeSSubmitTbData(pCoder, pSubmitTbData, pReq->flag & SUBMIT_REQ_COLUMN_DATA_FORMAT) < 0) return -1; + } + + tEndEncode(pCoder); + return 0; +} + +int32_t tDecodeSSubmitReq2(SDecoder *pCoder, SSubmitReq2 **ppReq) { + int32_t code = 0; + + // alloc + SSubmitReq2 *pReq = (SSubmitReq2 *)taosMemoryCalloc(1, sizeof(SSubmitReq2)); + if (pReq == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // decode + if (tStartDecode(pCoder) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + if (tDecodeI32v(pCoder, &pReq->flag) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + if (pReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) { + int64_t nCreateTbReq = 0; + + if (tDecodeI64v(pCoder, &nCreateTbReq) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + pReq->aCreateTbReq = taosArrayInit(nCreateTbReq, sizeof(SVCreateTbReq)); + if (pReq->aCreateTbReq == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + for (int64_t i = 0; i < nCreateTbReq; ++i) { + SVCreateTbReq *pCreateTbReq = taosArrayReserve(pReq->aCreateTbReq, 1); + if (tDecodeSVCreateTbReq(pCoder, pCreateTbReq) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + } + } + + int64_t nSubmitTbData = 0; + if (tDecodeI64v(pCoder, &nSubmitTbData) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + pReq->aSubmitTbData = taosArrayInit(nSubmitTbData, sizeof(SSubmitTbData)); + if (pReq->aSubmitTbData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + for (int64_t i = 0; i < nSubmitTbData; ++i) { + SSubmitTbData *pSubmitTbData = taosArrayReserve(pReq->aSubmitTbData, 1); + if (tDecodeSSubmitTbData(pCoder, pSubmitTbData, pReq->flag & SUBMIT_REQ_COLUMN_DATA_FORMAT) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + } + + tEndDecode(pCoder); + +_exit: + if (code) { + *ppReq = NULL; + if (pReq) { + // todo: do other clear + taosMemoryFree(pReq); + } + } else { + *ppReq = pReq; + } + return 0; +} + +void tDestroySSubmitReq2(SSubmitReq2 *pReq) { + if (NULL == pReq) return; + + if (pReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) { + taosArrayDestroyEx(pReq->aCreateTbReq, NULL /* todo */); + } + + taosArrayDestroyEx(pReq->aSubmitTbData, NULL /* todo */); + + taosMemoryFree(pReq); +}