fix memory leak
This commit is contained in:
parent
a1cfa351a4
commit
273adb9895
|
@ -2956,7 +2956,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
|
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
|
||||||
// taosMemoryFree(pSubTopicEp->schema.pSchema);
|
if (pSubTopicEp->schema.nCols) taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
|
||||||
taosArrayDestroy(pSubTopicEp->vgs);
|
taosArrayDestroy(pSubTopicEp->vgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,7 +44,6 @@ typedef struct {
|
||||||
TBC* pCur;
|
TBC* pCur;
|
||||||
} SStreamStateCur;
|
} SStreamStateCur;
|
||||||
|
|
||||||
#if 1
|
|
||||||
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen);
|
int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen);
|
||||||
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen);
|
int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen);
|
||||||
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key);
|
int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key);
|
||||||
|
@ -69,8 +68,6 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
|
||||||
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
|
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
|
||||||
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
|
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -438,6 +438,7 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
|
||||||
}
|
}
|
||||||
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
||||||
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
|
||||||
|
ASSERT(numOfCols == pResInfo->numOfCols);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
|
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
|
||||||
pResInfo->fields[i].bytes = pSchema[i].bytes;
|
pResInfo->fields[i].bytes = pSchema[i].bytes;
|
||||||
|
|
|
@ -841,7 +841,7 @@ void tmqFreeImpl(void* handle) {
|
||||||
int32_t sz = taosArrayGetSize(tmq->clientTopics);
|
int32_t sz = taosArrayGetSize(tmq->clientTopics);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
if (pTopic->schema.nCols) taosMemoryFree(pTopic->schema.pSchema);
|
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
|
||||||
int32_t vgSz = taosArrayGetSize(pTopic->vgs);
|
int32_t vgSz = taosArrayGetSize(pTopic->vgs);
|
||||||
taosArrayDestroy(pTopic->vgs);
|
taosArrayDestroy(pTopic->vgs);
|
||||||
}
|
}
|
||||||
|
@ -1218,6 +1218,8 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
||||||
SMqClientTopic topic = {0};
|
SMqClientTopic topic = {0};
|
||||||
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
|
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
|
||||||
topic.schema = pTopicEp->schema;
|
topic.schema = pTopicEp->schema;
|
||||||
|
pTopicEp->schema.nCols = 0;
|
||||||
|
pTopicEp->schema.pSchema = NULL;
|
||||||
tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(topic.topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
|
||||||
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
|
tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
|
||||||
|
|
||||||
|
@ -1251,7 +1253,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
|
||||||
int32_t sz = taosArrayGetSize(tmq->clientTopics);
|
int32_t sz = taosArrayGetSize(tmq->clientTopics);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
if (pTopic->schema.nCols) taosMemoryFree(pTopic->schema.pSchema);
|
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
|
||||||
int32_t vgSz = taosArrayGetSize(pTopic->vgs);
|
int32_t vgSz = taosArrayGetSize(pTopic->vgs);
|
||||||
taosArrayDestroy(pTopic->vgs);
|
taosArrayDestroy(pTopic->vgs);
|
||||||
}
|
}
|
||||||
|
|
|
@ -487,6 +487,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
||||||
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
|
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
||||||
|
taosArrayDestroy(pConsumerNew->rebNewTopics);
|
||||||
pConsumerNew->rebNewTopics = newSub;
|
pConsumerNew->rebNewTopics = newSub;
|
||||||
subscribe.topicNames = NULL;
|
subscribe.topicNames = NULL;
|
||||||
|
|
||||||
|
|
|
@ -145,7 +145,10 @@ SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
|
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
|
||||||
if (pVgEp->qmsg) taosMemoryFree(pVgEp->qmsg);
|
if (pVgEp) {
|
||||||
|
taosMemoryFreeClear(pVgEp->qmsg);
|
||||||
|
taosMemoryFree(pVgEp);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
|
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
|
||||||
|
@ -200,18 +203,10 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L
|
||||||
}
|
}
|
||||||
|
|
||||||
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
|
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
|
||||||
if (pConsumer->currentTopics) {
|
taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree);
|
||||||
taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree);
|
||||||
}
|
taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree);
|
||||||
if (pConsumer->rebNewTopics) {
|
taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree);
|
||||||
taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree);
|
|
||||||
}
|
|
||||||
if (pConsumer->rebRemovedTopics) {
|
|
||||||
taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree);
|
|
||||||
}
|
|
||||||
if (pConsumer->assignedTopics) {
|
|
||||||
taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
|
int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
|
||||||
|
@ -428,6 +423,13 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
|
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
|
||||||
|
void *pIter = NULL;
|
||||||
|
while (1) {
|
||||||
|
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
|
taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
|
||||||
|
}
|
||||||
taosHashCleanup(pSub->consumerHash);
|
taosHashCleanup(pSub->consumerHash);
|
||||||
taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
|
taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1187,6 +1187,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
if (pCol->colId > 0 && pCol->colId == colId) {
|
if (pCol->colId > 0 && pCol->colId == colId) {
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
|
nodesDestroyList(pNodeList);
|
||||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
||||||
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
|
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1197,6 +1198,7 @@ static int32_t mndCheckAlterColForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
NEXT:
|
NEXT:
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
|
nodesDestroyList(pNodeList);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1228,6 +1230,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
|
||||||
if (pCol->colId > 0 && pCol->colId == colId) {
|
if (pCol->colId > 0 && pCol->colId == colId) {
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
|
nodesDestroyList(pNodeList);
|
||||||
terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
|
terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
|
||||||
mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId);
|
mError("stream:%s, check colId:%d conflicted", pStream->name, pCol->colId);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1238,6 +1241,7 @@ static int32_t mndCheckAlterColForStream(SMnode *pMnode, const char *stbFullName
|
||||||
NEXT:
|
NEXT:
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
|
nodesDestroyList(pNodeList);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1275,6 +1279,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName,
|
||||||
if ((pCol->colId) > 0 && (pCol->colId == colId)) {
|
if ((pCol->colId) > 0 && (pCol->colId == colId)) {
|
||||||
sdbRelease(pSdb, pSma);
|
sdbRelease(pSdb, pSma);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
|
nodesDestroyList(pNodeList);
|
||||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA;
|
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA;
|
||||||
mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId);
|
mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1285,6 +1290,7 @@ static int32_t mndCheckAlterColForTSma(SMnode *pMnode, const char *stbFullName,
|
||||||
NEXT:
|
NEXT:
|
||||||
sdbRelease(pSdb, pSma);
|
sdbRelease(pSdb, pSma);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
|
nodesDestroyList(pNodeList);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1774,8 +1780,8 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, i
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char* dbFName, char* stbFName, void **pCont, int32_t *pLen) {
|
int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char *dbFName, char *stbFName, void **pCont, int32_t *pLen) {
|
||||||
int32_t ret = -1;
|
int32_t ret = -1;
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
|
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
|
||||||
if (NULL == pDb) {
|
if (NULL == pDb) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1786,10 +1792,10 @@ int32_t mndBuildSMCreateStbRsp(SMnode *pMnode, char* dbFName, char* stbFName, vo
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
SEncoder ec = {0};
|
SEncoder ec = {0};
|
||||||
uint32_t contLen = 0;
|
uint32_t contLen = 0;
|
||||||
SMCreateStbRsp stbRsp = {0};
|
SMCreateStbRsp stbRsp = {0};
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
tNameFromString(&name, pObj->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||||
|
|
||||||
stbRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
|
stbRsp.pMeta = taosMemoryCalloc(1, sizeof(STableMetaRsp));
|
||||||
|
@ -1834,7 +1840,6 @@ _OVER:
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
|
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
|
||||||
void *alterOriData, int32_t alterOriDataLen) {
|
void *alterOriData, int32_t alterOriDataLen) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
@ -2091,6 +2096,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
if (pCol->tableId == suid) {
|
if (pCol->tableId == suid) {
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
|
nodesDestroyList(pNodeList);
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
goto NEXT;
|
goto NEXT;
|
||||||
|
@ -2099,6 +2105,7 @@ static int32_t mndCheckDropStbForTopic(SMnode *pMnode, const char *stbFullName,
|
||||||
NEXT:
|
NEXT:
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
|
nodesDestroyList(pNodeList);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -2136,6 +2143,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
|
||||||
if (pCol->tableId == suid) {
|
if (pCol->tableId == suid) {
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
|
nodesDestroyList(pNodeList);
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
goto NEXT;
|
goto NEXT;
|
||||||
|
@ -2144,6 +2152,7 @@ static int32_t mndCheckDropStbForStream(SMnode *pMnode, const char *stbFullName,
|
||||||
NEXT:
|
NEXT:
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
|
nodesDestroyList(pNodeList);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -490,8 +490,12 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
mndReleaseConsumer(pMnode, pConsumerOld);
|
mndReleaseConsumer(pMnode, pConsumerOld);
|
||||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
|
taosMemoryFree(pConsumerNew);
|
||||||
goto REB_FAIL;
|
goto REB_FAIL;
|
||||||
}
|
}
|
||||||
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
|
taosMemoryFree(pConsumerNew);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 3.3 set removed consumer
|
// 3.3 set removed consumer
|
||||||
|
@ -509,8 +513,12 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
mndReleaseConsumer(pMnode, pConsumerOld);
|
mndReleaseConsumer(pMnode, pConsumerOld);
|
||||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
|
taosMemoryFree(pConsumerNew);
|
||||||
goto REB_FAIL;
|
goto REB_FAIL;
|
||||||
}
|
}
|
||||||
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
|
taosMemoryFree(pConsumerNew);
|
||||||
}
|
}
|
||||||
#if 0
|
#if 0
|
||||||
if (consumerNum) {
|
if (consumerNum) {
|
||||||
|
|
|
@ -224,6 +224,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||||
if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
|
if (taosDecodeSSchemaWrapper(buf, &pTopic->schema) == NULL) {
|
||||||
goto TOPIC_DECODE_OVER;
|
goto TOPIC_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(buf);
|
||||||
} else {
|
} else {
|
||||||
pTopic->schema.nCols = 0;
|
pTopic->schema.nCols = 0;
|
||||||
pTopic->schema.version = 0;
|
pTopic->schema.version = 0;
|
||||||
|
@ -266,6 +267,11 @@ static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
|
||||||
|
|
||||||
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
|
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
|
||||||
mTrace("topic:%s, perform delete action", pTopic->name);
|
mTrace("topic:%s, perform delete action", pTopic->name);
|
||||||
|
taosMemoryFreeClear(pTopic->sql);
|
||||||
|
taosMemoryFreeClear(pTopic->ast);
|
||||||
|
taosMemoryFreeClear(pTopic->physicalPlan);
|
||||||
|
if (pTopic->schema.nCols) taosMemoryFreeClear(pTopic->schema.pSchema);
|
||||||
|
taosArrayDestroy(pTopic->ntbColIds);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -347,6 +353,7 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
nodesDestroyList(pNodeList);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,6 +423,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
taosMemoryFree(topicObj.sql);
|
taosMemoryFree(topicObj.sql);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
nodesDestroyNode(pAst);
|
||||||
|
nodesDestroyNode((SNode *)pPlan);
|
||||||
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
|
SStbObj *pStb = mndAcquireStb(pMnode, pCreate->subStbName);
|
||||||
if (pStb == NULL) {
|
if (pStb == NULL) {
|
||||||
|
@ -512,6 +521,10 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(topicObj.physicalPlan);
|
taosMemoryFreeClear(topicObj.physicalPlan);
|
||||||
|
taosMemoryFreeClear(topicObj.sql);
|
||||||
|
taosMemoryFreeClear(topicObj.ast);
|
||||||
|
taosArrayDestroy(topicObj.ntbColIds);
|
||||||
|
if (topicObj.schema.nCols) taosMemoryFreeClear(topicObj.schema.pSchema);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -147,6 +147,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
|
||||||
taosHashCancelIterate(pStore->pHash, pIter);
|
taosHashCancelIterate(pStore->pHash, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(buf);
|
||||||
}
|
}
|
||||||
// close and rename file
|
// close and rename file
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
|
|
|
@ -386,11 +386,12 @@ void* taosArrayDestroy(SArray* pArray) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosArrayDestroyP(SArray* pArray, FDelete fp) {
|
void taosArrayDestroyP(SArray* pArray, FDelete fp) {
|
||||||
if(!pArray) return;
|
if (pArray) {
|
||||||
for (int32_t i = 0; i < pArray->size; i++) {
|
for (int32_t i = 0; i < pArray->size; i++) {
|
||||||
fp(*(void**)TARRAY_GET_ELEM(pArray, i));
|
fp(*(void**)TARRAY_GET_ELEM(pArray, i));
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pArray);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosArrayDestroyEx(SArray* pArray, FDelete fp) {
|
void taosArrayDestroyEx(SArray* pArray, FDelete fp) {
|
||||||
|
|
Loading…
Reference in New Issue