fix: resolve memory leak on exceptions in mndCreateTopic
This commit is contained in:
parent
14e2e1e574
commit
9e2062f54a
|
@ -377,6 +377,10 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
|
||||||
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
|
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
|
||||||
const char *userName) {
|
const char *userName) {
|
||||||
mInfo("start to create topic:%s", pCreate->name);
|
mInfo("start to create topic:%s", pCreate->name);
|
||||||
|
STrans *pTrans = NULL;
|
||||||
|
int32_t code = -1;
|
||||||
|
SNode *pAst = NULL;
|
||||||
|
SQueryPlan *pPlan = NULL;
|
||||||
|
|
||||||
SMqTopicObj topicObj = {0};
|
SMqTopicObj topicObj = {0};
|
||||||
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
@ -401,7 +405,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
if (pCreate->withMeta) {
|
if (pCreate->withMeta) {
|
||||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
return -1;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
topicObj.ast = taosStrdup(pCreate->ast);
|
topicObj.ast = taosStrdup(pCreate->ast);
|
||||||
|
@ -409,32 +413,21 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
|
|
||||||
qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast);
|
qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast);
|
||||||
|
|
||||||
SNode *pAst = NULL;
|
|
||||||
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
|
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
|
||||||
taosMemoryFree(topicObj.ast);
|
|
||||||
taosMemoryFree(topicObj.sql);
|
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
return -1;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryPlan *pPlan = NULL;
|
|
||||||
|
|
||||||
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
|
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
|
||||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
|
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
|
||||||
mError("failed to create topic:%s since %s", pCreate->name, terrstr());
|
mError("failed to create topic:%s since %s", pCreate->name, terrstr());
|
||||||
taosMemoryFree(topicObj.ast);
|
goto _OUT;
|
||||||
taosMemoryFree(topicObj.sql);
|
|
||||||
nodesDestroyNode(pAst);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
|
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
|
||||||
if (topicObj.ntbColIds == NULL) {
|
if (topicObj.ntbColIds == NULL) {
|
||||||
taosMemoryFree(topicObj.ast);
|
|
||||||
taosMemoryFree(topicObj.sql);
|
|
||||||
nodesDestroyNode(pAst);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
extractTopicTbInfo(pAst, &topicObj);
|
extractTopicTbInfo(pAst, &topicObj);
|
||||||
|
@ -446,25 +439,18 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
|
|
||||||
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
|
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
taosMemoryFree(topicObj.ast);
|
goto _OUT;
|
||||||
taosMemoryFree(topicObj.sql);
|
|
||||||
nodesDestroyNode(pAst);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
|
if (nodesNodeToString((SNode *)pPlan, false, &topicObj.physicalPlan, NULL) != 0) {
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
taosMemoryFree(topicObj.ast);
|
goto _OUT;
|
||||||
taosMemoryFree(topicObj.sql);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
nodesDestroyNode(pAst);
|
|
||||||
nodesDestroyNode((SNode *)pPlan);
|
|
||||||
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
|
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
|
||||||
if (pStb == NULL) {
|
if (pStb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
||||||
return -1;
|
goto _OUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
strcpy(topicObj.stbName, pCreate->subStbName);
|
strcpy(topicObj.stbName, pCreate->subStbName);
|
||||||
|
@ -483,13 +469,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
/*topicObj.withTbName = 1;*/
|
/*topicObj.withTbName = 1;*/
|
||||||
/*topicObj.withSchema = 1;*/
|
/*topicObj.withSchema = 1;*/
|
||||||
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-topic");
|
pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-topic");
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
taosMemoryFreeClear(topicObj.ast);
|
goto _OUT;
|
||||||
taosMemoryFreeClear(topicObj.sql);
|
|
||||||
taosMemoryFreeClear(topicObj.physicalPlan);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||||
|
@ -499,9 +482,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
|
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
|
||||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||||
taosMemoryFreeClear(topicObj.physicalPlan);
|
goto _OUT;
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||||
|
@ -530,17 +511,16 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
|
tEncodeSize(tEncodeSTqCheckInfo, &info, len, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
mndTransDrop(pTrans);
|
goto _OUT;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
SEncoder encoder;
|
SEncoder encoder;
|
||||||
tEncoderInit(&encoder, abuf, len);
|
tEncoderInit(&encoder, abuf, len);
|
||||||
if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
|
if (tEncodeSTqCheckInfo(&encoder, &info) < 0) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
mndTransDrop(pTrans);
|
goto _OUT;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
|
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
|
||||||
|
@ -553,32 +533,32 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
mndTransDrop(pTrans);
|
goto _OUT;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
buf = NULL;
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
taosMemoryFreeClear(topicObj.physicalPlan);
|
goto _OUT;
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
|
||||||
|
_OUT:
|
||||||
taosMemoryFreeClear(topicObj.physicalPlan);
|
taosMemoryFreeClear(topicObj.physicalPlan);
|
||||||
taosMemoryFreeClear(topicObj.sql);
|
taosMemoryFreeClear(topicObj.sql);
|
||||||
taosMemoryFreeClear(topicObj.ast);
|
taosMemoryFreeClear(topicObj.ast);
|
||||||
taosArrayDestroy(topicObj.ntbColIds);
|
taosArrayDestroy(topicObj.ntbColIds);
|
||||||
|
|
||||||
if (topicObj.schema.nCols) {
|
if (topicObj.schema.nCols) {
|
||||||
taosMemoryFreeClear(topicObj.schema.pSchema);
|
taosMemoryFreeClear(topicObj.schema.pSchema);
|
||||||
}
|
}
|
||||||
|
nodesDestroyNode(pAst);
|
||||||
|
nodesDestroyNode((SNode *)pPlan);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
||||||
|
|
Loading…
Reference in New Issue