fix(mnode): memory leak
This commit is contained in:
parent
b6660ca5b2
commit
a2757eb4f3
|
@ -636,6 +636,7 @@ typedef struct {
|
||||||
|
|
||||||
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
|
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
|
||||||
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj);
|
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj);
|
||||||
|
void tFreeStreamObj(SStreamObj* pObj);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char streamName[TSDB_STREAM_FNAME_LEN];
|
char streamName[TSDB_STREAM_FNAME_LEN];
|
||||||
|
|
|
@ -34,6 +34,7 @@ int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate);
|
||||||
SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
|
SDbObj *mndAcquireDbByStb(SMnode *pMnode, const char *stbName);
|
||||||
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
|
int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreate, SDbObj *pDb);
|
||||||
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
|
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb);
|
||||||
|
void mndFreeStb(SStbObj *pStb);
|
||||||
|
|
||||||
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
|
void mndExtractDbNameFromStbFullName(const char *stbFullName, char *dst);
|
||||||
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
|
void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t dstSize);
|
||||||
|
|
|
@ -116,6 +116,25 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tFreeStreamObj(SStreamObj *pStream) {
|
||||||
|
taosMemoryFree(pStream->sql);
|
||||||
|
taosMemoryFree(pStream->ast);
|
||||||
|
taosMemoryFree(pStream->physicalPlan);
|
||||||
|
if (pStream->outputSchema.nCols) taosMemoryFree(pStream->outputSchema.pSchema);
|
||||||
|
|
||||||
|
int32_t sz = taosArrayGetSize(pStream->tasks);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
||||||
|
int32_t taskSz = taosArrayGetSize(pLevel);
|
||||||
|
for (int32_t j = 0; j < taskSz; j++) {
|
||||||
|
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||||
|
tFreeSStreamTask(pTask);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pLevel);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pStream->tasks);
|
||||||
|
}
|
||||||
|
|
||||||
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
|
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
|
||||||
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
|
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
|
||||||
if (pVgEpNew == NULL) return NULL;
|
if (pVgEpNew == NULL) return NULL;
|
||||||
|
|
|
@ -266,6 +266,15 @@ _OVER:
|
||||||
return pRow;
|
return pRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mndFreeStb(SStbObj *pStb) {
|
||||||
|
taosArrayDestroy(pStb->pFuncs);
|
||||||
|
taosMemoryFreeClear(pStb->pColumns);
|
||||||
|
taosMemoryFreeClear(pStb->pTags);
|
||||||
|
taosMemoryFreeClear(pStb->comment);
|
||||||
|
taosMemoryFreeClear(pStb->pAst1);
|
||||||
|
taosMemoryFreeClear(pStb->pAst2);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
|
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
|
||||||
mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb);
|
mTrace("stb:%s, perform insert action, row:%p", pStb->name, pStb);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -273,12 +282,7 @@ static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb) {
|
||||||
|
|
||||||
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
|
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb) {
|
||||||
mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
|
mTrace("stb:%s, perform delete action, row:%p", pStb->name, pStb);
|
||||||
taosArrayDestroy(pStb->pFuncs);
|
mndFreeStb(pStb);
|
||||||
taosMemoryFreeClear(pStb->pColumns);
|
|
||||||
taosMemoryFreeClear(pStb->pTags);
|
|
||||||
taosMemoryFreeClear(pStb->comment);
|
|
||||||
taosMemoryFreeClear(pStb->pAst1);
|
|
||||||
taosMemoryFreeClear(pStb->pAst2);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -167,6 +167,9 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream) {
|
||||||
|
|
||||||
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
|
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream) {
|
||||||
mTrace("stream:%s, perform delete action", pStream->name);
|
mTrace("stream:%s, perform delete action", pStream->name);
|
||||||
|
taosWLockLatch(&pStream->lock);
|
||||||
|
tFreeStreamObj(pStream);
|
||||||
|
taosWUnLockLatch(&pStream->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -493,10 +496,17 @@ static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStre
|
||||||
|
|
||||||
stbObj.uid = pStream->targetStbUid;
|
stbObj.uid = pStream->targetStbUid;
|
||||||
|
|
||||||
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) goto _OVER;
|
if (mndAddStbToTrans(pMnode, pTrans, pDb, &stbObj) < 0) {
|
||||||
|
mndFreeStb(&stbObj);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
tFreeSMCreateStbReq(&createReq);
|
||||||
|
mndFreeStb(&stbObj);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
_OVER:
|
_OVER:
|
||||||
|
tFreeSMCreateStbReq(&createReq);
|
||||||
mndReleaseStb(pMnode, pStb);
|
mndReleaseStb(pMnode, pStb);
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -715,6 +725,7 @@ _OVER:
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
|
||||||
tFreeSCMCreateStreamReq(&createStreamReq);
|
tFreeSCMCreateStreamReq(&createStreamReq);
|
||||||
|
tFreeStreamObj(&streamObj);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue