diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5bcee58bba..007c9c81e0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3226,6 +3226,20 @@ int32_t tDecodeSSubmitReq2(SDecoder* pCoder, SSubmitReq2** ppReq); void tDestroySSubmitTbData(SSubmitTbData* pTbData); void tDestroySSubmitReq2(SSubmitReq2* pReq); +typedef struct { + int32_t code; + int32_t affectedRows; + SArray* aCreateTbRsp; // SArray +} SSubmitRsp2; + +int32_t tCreateSSubmitRsp2(SSubmitRsp2** ppRsp); +void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag); +int32_t tEncodeSSubmitRsp2(SEncoder* pCoder, const SSubmitRsp2* pRsp); +int32_t tDecodeSSubmitRsp2(SDecoder* pCoder, SSubmitRsp2* pRsp); + +#define TSDB_MSG_FLG_ENCODE 0x1 +#define TSDB_MSG_FLG_DECODE 0x2 + #pragma pack(pop) #ifdef __cplusplus diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 76127d727c..7c865723f3 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6857,3 +6857,104 @@ void tDestroySSubmitReq2(SSubmitReq2 *pReq) { taosArrayDestroyEx(pReq->aSubmitTbData, (FDelete)destroySSubmitTbData); taosMemoryFree(pReq); } + +int32_t tEncodeSSubmitRsp2(SEncoder *pCoder, const SSubmitRsp2 *pRsp) { + if (tStartEncode(pCoder) < 0) return -1; + + if (tEncodeI32v(pCoder, pRsp->code) < 0) return -1; + if (tEncodeI32v(pCoder, pRsp->affectedRows) < 0) return -1; + if (tEncodeI32v(pCoder, taosArrayGetSize(pRsp->aCreateTbRsp)) < 0) return -1; + for (int32_t i = 0; i < taosArrayGetSize(pRsp->aCreateTbRsp); ++i) { + if (tEncodeSVCreateTbRsp(pCoder, taosArrayGet(pRsp->aCreateTbRsp, i)) < 0) return -1; + } + + tEndEncode(pCoder); + return 0; +} + +int32_t tDecodeSSubmitRsp2(SDecoder *pCoder, SSubmitRsp2 *pRsp) { + int32_t code = 0; + + memset(pRsp, 0, sizeof(SSubmitRsp2)); + + // decode + if (tStartDecode(pCoder) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + if (tDecodeI32v(pCoder, &pRsp->code) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + if (tDecodeI32v(pCoder, &pRsp->affectedRows) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + int32_t nCreateTbRsp; + if (tDecodeI32v(pCoder, &nCreateTbRsp) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + + pRsp->aCreateTbRsp = taosArrayInit(nCreateTbRsp, sizeof(SVCreateTbRsp)); + if (pRsp->aCreateTbRsp == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + for (int32_t i = 0; i < nCreateTbRsp; ++i) { + SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pRsp->aCreateTbRsp, 1); + if (tDecodeSVCreateTbRsp(pCoder, pCreateTbRsp) < 0) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + } + + tEndDecode(pCoder); + +_exit: + if (code) { + if (pRsp->aCreateTbRsp) { + taosArrayDestroyEx(pRsp->aCreateTbRsp, NULL /* todo */); + } + } + return code; +} + +int32_t tCreateSSubmitRsp2(SSubmitRsp2 **ppRsp) { + int32_t code = 0; + + SSubmitRsp2 *pRsp = (SSubmitRsp2 *)taosMemoryCalloc(1, sizeof(*pRsp)); + if (pRsp == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + pRsp->aCreateTbRsp = taosArrayInit(16, sizeof(SVCreateTbRsp)); + if (pRsp->aCreateTbRsp == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + +_exit: + if (code) { + *ppRsp = NULL; + if (pRsp) { + if (pRsp->aCreateTbRsp) taosArrayDestroy(pRsp->aCreateTbRsp); + taosMemoryFree(pRsp); + } + } else { + *ppRsp = pRsp; + } + return code; +} + +void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) { + if (pRsp) { + taosArrayDestroyEx(pRsp->aCreateTbRsp, NULL /* TODO: set according to flag */); + taosMemoryFree(pRsp); + } +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2618e45e53..d3cc133be1 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -822,43 +822,39 @@ static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, return TSDB_CODE_SUCCESS; } -static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) { - ASSERT(pMsg != NULL); - SSubmitMsgIter msgIter = {0}; - SMeta *pMeta = pVnode->pMeta; - SSubmitBlk *pBlock = NULL; +// static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) { +// ASSERT(pMsg != NULL); +// SSubmitMsgIter msgIter = {0}; +// SMeta *pMeta = pVnode->pMeta; +// SSubmitBlk *pBlock = NULL; - if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; - while (true) { - if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; - if (pBlock == NULL) break; +// if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; +// while (true) { +// if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; +// if (pBlock == NULL) break; - vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags); - } +// vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags); +// } - return 0; -} +// return 0; +// } static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { #if 1 int32_t code = 0; - SDecoder dc = {0}; - SSubmitRsp submitRsp = {0}; + SSubmitRsp2 submitRsp = {0}; SSubmitReq2 *pSubmitReq = NULL; SArray *newTbUids = NULL; + // decode + SDecoder dc = {0}; tDecoderInit(&dc, (char *)pReq + sizeof(SMsgHead), len - sizeof(SMsgHead)); if (tDecodeSSubmitReq2(&dc, &pSubmitReq) < 0) { code = TSDB_CODE_INVALID_MSG; goto _exit; } - - submitRsp.pArray = taosArrayInit(1, sizeof(SSubmitBlkRsp)); - if (submitRsp.pArray == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _exit; - } + tDecoderClear(&dc); // auto create table if (pSubmitReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) { @@ -874,6 +870,12 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq } } + // check + for (int32_t i = 0; i < taosArrayGetSize(pSubmitReq->aSubmitTbData); ++i) { + SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, i); + // TODO + } + // insert table data for (int32_t iSubmitTbData = 0; iSubmitTbData < taosArrayGetSize(pSubmitReq->aSubmitTbData); iSubmitTbData++) { SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, iSubmitTbData);