From 3a59c5381c22c97f6db080c6ebb89401951ddf1a Mon Sep 17 00:00:00 2001 From: kailixu Date: Sat, 3 Dec 2022 22:34:29 +0800 Subject: [PATCH] refact: tsma code optimization --- include/common/tmsg.h | 3 --- include/os/osMemory.h | 3 --- source/common/src/tmsg.c | 28 -------------------------- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tqSink.c | 29 +++++++++++++++++++++++---- 5 files changed, 26 insertions(+), 39 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c7f3caafb5..ad992fd9db 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3264,9 +3264,6 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag); #define TSDB_MSG_FLG_ENCODE 0x1 #define TSDB_MSG_FLG_DECODE 0x2 -int32_t tBuildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen, __tmalloc_fn_t fp, - __tfree_fn_t ff); - #pragma pack(pop) #ifdef __cplusplus diff --git a/include/os/osMemory.h b/include/os/osMemory.h index a13b8f3014..4681ff6674 100644 --- a/include/os/osMemory.h +++ b/include/os/osMemory.h @@ -47,9 +47,6 @@ void *taosMemoryMallocAlign(uint32_t alignment, int64_t size); } \ } while (0) -typedef void *(*__tmalloc_fn_t)(int64_t); -typedef void (*__tfree_fn_t)(void *); - #ifdef __cplusplus } #endif diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 47738e3436..8dc4633443 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7000,31 +7000,3 @@ void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) { } } } - -int32_t tBuildSubmitReq(int32_t vgId, SSubmitReq2 *pReq, void **pData, uint32_t *pLen, __tmalloc_fn_t fp, __tfree_fn_t ff) { - int32_t code = TSDB_CODE_SUCCESS; - uint32_t len = 0; - void *pBuf = NULL; - tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); - if (TSDB_CODE_SUCCESS == code) { - SEncoder encoder; - len += sizeof(SMsgHead); - pBuf = (*fp)(len); - if (NULL == pBuf) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - ((SMsgHead *)pBuf)->vgId = htonl(vgId); - ((SMsgHead *)pBuf)->contLen = htonl(len); - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); - code = tEncodeSSubmitReq2(&encoder, pReq); - tEncoderClear(&encoder); - } - - if (TSDB_CODE_SUCCESS == code) { - *pData = pBuf; - *pLen = len; - } else { - (*ff)(pBuf); - } - return code; -} \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 1da40c8b73..dad25ea346 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -199,7 +199,7 @@ int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, - SBatchDeleteReq* pDeleteReq, void* ppData, int32_t* pLen); + SBatchDeleteReq* pDeleteReq, void** ppData, int32_t* pLen); // sma int32_t smaInit(); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 49c4973342..965fc60a2e 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -304,7 +304,9 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName, - SBatchDeleteReq* pDeleteReq, void* ppData, int32_t* pLen) { + SBatchDeleteReq* pDeleteReq, void** ppData, int32_t* pLen) { + void* pBuf = NULL; + int32_t len = 0; SSubmitReq2* pReq = NULL; SArray* tagArray = NULL; SArray* createTbArray = NULL; @@ -462,18 +464,37 @@ int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* p taosArrayPush(pReq->aSubmitTbData, pTbData); } - - terrno = tBuildSubmitReq(TD_VID(pVnode), pReq, ppData, pLen, rpcMallocCont, rpcFreeCont); - + + // encode + tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno); + if (TSDB_CODE_SUCCESS == terrno) { + SEncoder encoder; + len += sizeof(SMsgHead); + pBuf = rpcMallocCont(len); + if (NULL == pBuf) { + goto _end; + } + ((SMsgHead*)pBuf)->vgId = htonl(TD_VID(pVnode)); + ((SMsgHead*)pBuf)->contLen = htonl(len); + tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); + if (tEncodeSSubmitReq2(&encoder, pReq) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("failed to encode submit req since %s", terrstr()); + } + tEncoderClear(&encoder); + } _end: taosArrayDestroy(tagArray); taosArrayDestroy(pVals); tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); if (terrno != 0) { + rpcFreeCont(pBuf); taosArrayDestroy(pDeleteReq->deleteReqs); return TSDB_CODE_FAILED; } + if (ppData) *ppData = pBuf; + if (pLen) *pLen = len; return TSDB_CODE_SUCCESS; }