refact: tsma code optimization

This commit is contained in:
kailixu 2022-12-03 22:34:29 +08:00
parent 614462b878
commit 3a59c5381c
5 changed files with 26 additions and 39 deletions

View File

@ -3264,9 +3264,6 @@ void tDestroySSubmitRsp2(SSubmitRsp2* pRsp, int32_t flag);
#define TSDB_MSG_FLG_ENCODE 0x1
#define TSDB_MSG_FLG_DECODE 0x2
int32_t tBuildSubmitReq(int32_t vgId, SSubmitReq2* pReq, void** pData, uint32_t* pLen, __tmalloc_fn_t fp,
__tfree_fn_t ff);
#pragma pack(pop)
#ifdef __cplusplus

View File

@ -47,9 +47,6 @@ void *taosMemoryMallocAlign(uint32_t alignment, int64_t size);
} \
} while (0)
typedef void *(*__tmalloc_fn_t)(int64_t);
typedef void (*__tfree_fn_t)(void *);
#ifdef __cplusplus
}
#endif

View File

@ -7000,31 +7000,3 @@ void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) {
}
}
}
int32_t tBuildSubmitReq(int32_t vgId, SSubmitReq2 *pReq, void **pData, uint32_t *pLen, __tmalloc_fn_t fp, __tfree_fn_t ff) {
int32_t code = TSDB_CODE_SUCCESS;
uint32_t len = 0;
void *pBuf = NULL;
tEncodeSize(tEncodeSSubmitReq2, pReq, len, code);
if (TSDB_CODE_SUCCESS == code) {
SEncoder encoder;
len += sizeof(SMsgHead);
pBuf = (*fp)(len);
if (NULL == pBuf) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
((SMsgHead *)pBuf)->vgId = htonl(vgId);
((SMsgHead *)pBuf)->contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
code = tEncodeSSubmitReq2(&encoder, pReq);
tEncoderClear(&encoder);
}
if (TSDB_CODE_SUCCESS == code) {
*pData = pBuf;
*pLen = len;
} else {
(*ff)(pBuf);
}
return code;
}

View File

@ -199,7 +199,7 @@ int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema,
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
SBatchDeleteReq* pDeleteReq, void* ppData, int32_t* pLen);
SBatchDeleteReq* pDeleteReq, void** ppData, int32_t* pLen);
// sma
int32_t smaInit();

View File

@ -304,7 +304,9 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema,
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
SBatchDeleteReq* pDeleteReq, void* ppData, int32_t* pLen) {
SBatchDeleteReq* pDeleteReq, void** ppData, int32_t* pLen) {
void* pBuf = NULL;
int32_t len = 0;
SSubmitReq2* pReq = NULL;
SArray* tagArray = NULL;
SArray* createTbArray = NULL;
@ -462,18 +464,37 @@ int32_t tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* p
taosArrayPush(pReq->aSubmitTbData, pTbData);
}
terrno = tBuildSubmitReq(TD_VID(pVnode), pReq, ppData, pLen, rpcMallocCont, rpcFreeCont);
// encode
tEncodeSize(tEncodeSSubmitReq2, pReq, len, terrno);
if (TSDB_CODE_SUCCESS == terrno) {
SEncoder encoder;
len += sizeof(SMsgHead);
pBuf = rpcMallocCont(len);
if (NULL == pBuf) {
goto _end;
}
((SMsgHead*)pBuf)->vgId = htonl(TD_VID(pVnode));
((SMsgHead*)pBuf)->contLen = htonl(len);
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead));
if (tEncodeSSubmitReq2(&encoder, pReq) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to encode submit req since %s", terrstr());
}
tEncoderClear(&encoder);
}
_end:
taosArrayDestroy(tagArray);
taosArrayDestroy(pVals);
tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE);
if (terrno != 0) {
rpcFreeCont(pBuf);
taosArrayDestroy(pDeleteReq->deleteReqs);
return TSDB_CODE_FAILED;
}
if (ppData) *ppData = pBuf;
if (pLen) *pLen = len;
return TSDB_CODE_SUCCESS;
}