fix: release SStreamObj when dropping
This commit is contained in:
parent
04f9419829
commit
b39c80940f
|
@ -2677,15 +2677,6 @@ typedef struct {
|
||||||
int32_t tSerializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
|
int32_t tSerializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
|
||||||
int32_t tDeserializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
|
int32_t tDeserializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t vgId;
|
|
||||||
SEpSet epSet;
|
|
||||||
} SVgEpSet;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t padding;
|
|
||||||
} SRSmaExecMsg;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t version; // for compatibility(default 0)
|
int8_t version; // for compatibility(default 0)
|
||||||
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
||||||
|
|
|
@ -38,7 +38,6 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw);
|
||||||
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
|
static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma);
|
||||||
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
|
static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
|
||||||
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
|
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
|
||||||
static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups);
|
|
||||||
static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq);
|
static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq);
|
static int32_t mndProcessDropSmaReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
|
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
|
||||||
|
@ -840,6 +839,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
mndReleaseStream(pMnode, pStream);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
mndReleaseVgroup(pMnode, pVgroup);
|
mndReleaseVgroup(pMnode, pVgroup);
|
||||||
mndReleaseStb(pMnode, pStb);
|
mndReleaseStb(pMnode, pStb);
|
||||||
|
|
|
@ -800,11 +800,6 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
||||||
return conflict;
|
return conflict;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndTransFreeObj(SSdb *pSdb) {
|
|
||||||
sdbFreeRowsByType(pSdb, SDB_STREAM);
|
|
||||||
sdbFreeRowsByType(pSdb, SDB_SMA);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
||||||
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
|
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
|
||||||
if (strlen(pTrans->dbname1) == 0 && strlen(pTrans->dbname2) == 0) {
|
if (strlen(pTrans->dbname1) == 0 && strlen(pTrans->dbname2) == 0) {
|
||||||
|
@ -832,8 +827,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndTransFreeObj(pMnode->pSdb);
|
|
||||||
|
|
||||||
mDebug("trans:%d, prepare finished", pTrans->id);
|
mDebug("trans:%d, prepare finished", pTrans->id);
|
||||||
|
|
||||||
STrans *pNew = mndAcquireTrans(pMnode, pTrans->id);
|
STrans *pNew = mndAcquireTrans(pMnode, pTrans->id);
|
||||||
|
|
|
@ -367,7 +367,6 @@ int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type);
|
||||||
*/
|
*/
|
||||||
void sdbSetApplyInfo(SSdb *pSdb, int64_t index, int64_t term, int64_t config);
|
void sdbSetApplyInfo(SSdb *pSdb, int64_t index, int64_t term, int64_t config);
|
||||||
void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config);
|
void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config);
|
||||||
void sdbFreeRowsByType(SSdb *pSdb, ESdbType type);
|
|
||||||
|
|
||||||
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
|
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
|
||||||
void sdbFreeRaw(SSdbRaw *pRaw);
|
void sdbFreeRaw(SSdbRaw *pRaw);
|
||||||
|
|
|
@ -65,25 +65,6 @@ SSdb *sdbInit(SSdbOpt *pOption) {
|
||||||
return pSdb;
|
return pSdb;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sdbFreeRowsByType(SSdb *pSdb, ESdbType type) {
|
|
||||||
SHashObj *hash = pSdb->hashObjs[type];
|
|
||||||
if (hash == NULL || !taosHashGetSize(hash)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSdbRow **ppRow = taosHashIterate(hash, NULL);
|
|
||||||
while (ppRow != NULL) {
|
|
||||||
SSdbRow *pRow = *ppRow;
|
|
||||||
if (pRow == NULL) {
|
|
||||||
ppRow = taosHashIterate(hash, ppRow);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
sdbFreeRow(pSdb, pRow, false);
|
|
||||||
ppRow = taosHashIterate(hash, ppRow);
|
|
||||||
}
|
|
||||||
taosHashClear(hash);
|
|
||||||
}
|
|
||||||
|
|
||||||
void sdbCleanup(SSdb *pSdb) {
|
void sdbCleanup(SSdb *pSdb) {
|
||||||
mDebug("start to cleanup sdb");
|
mDebug("start to cleanup sdb");
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue