enh: code optimization and memory leak
This commit is contained in:
parent
0de7f3e3a7
commit
8b3b6a8962
|
@ -87,7 +87,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
while (pVnode->refCount > 0) taosMsleep(10);
|
while (pVnode->refCount > 0) taosMsleep(10);
|
||||||
dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId);
|
dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId);
|
||||||
|
|
||||||
|
|
||||||
while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10);
|
||||||
while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10);
|
||||||
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
|
while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10);
|
||||||
|
|
|
@ -118,12 +118,12 @@ struct SSmaStat {
|
||||||
#define RSMA_FS_LOCK(r) (&(r)->lock)
|
#define RSMA_FS_LOCK(r) (&(r)->lock)
|
||||||
|
|
||||||
struct SRSmaInfoItem {
|
struct SRSmaInfoItem {
|
||||||
int8_t level;
|
int8_t level : 4;
|
||||||
int8_t triggerStat;
|
int8_t fetchLevel : 4;
|
||||||
uint8_t nSkipped; // number of skipped to fetch data from all active window
|
int8_t triggerStat;
|
||||||
int8_t fetchLevel;
|
uint16_t nSkipped; // number of skipped to fetch data from all active window
|
||||||
int32_t maxDelay; // ms
|
int32_t maxDelay; // ms
|
||||||
tmr_h tmrId;
|
tmr_h tmrId;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SRSmaInfo {
|
struct SRSmaInfo {
|
||||||
|
@ -131,8 +131,8 @@ struct SRSmaInfo {
|
||||||
int64_t suid;
|
int64_t suid;
|
||||||
int64_t refId; // refId of SRSmaStat
|
int64_t refId; // refId of SRSmaStat
|
||||||
int64_t lastRecv; // ms
|
int64_t lastRecv; // ms
|
||||||
int8_t delFlag;
|
|
||||||
int8_t assigned; // 0 idle, 1 assgined for exec
|
int8_t assigned; // 0 idle, 1 assgined for exec
|
||||||
|
int8_t delFlag;
|
||||||
int16_t padding;
|
int16_t padding;
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
SRSmaInfoItem items[TSDB_RETENTION_L2];
|
||||||
|
|
|
@ -328,7 +328,6 @@ struct SVnode {
|
||||||
SQHandle* pQuery;
|
SQHandle* pQuery;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
|
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
|
||||||
|
|
||||||
#define VND_TSDB(vnd) ((vnd)->pTsdb)
|
#define VND_TSDB(vnd) ((vnd)->pTsdb)
|
||||||
|
|
|
@ -109,7 +109,7 @@ int32_t smaBegin(SSma *pSma) {
|
||||||
/**
|
/**
|
||||||
* @brief pre-commit for rollup sma(sync commit).
|
* @brief pre-commit for rollup sma(sync commit).
|
||||||
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
|
* 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED.
|
||||||
* 2) wait all triggered fetch tasks finished
|
* 2) wait for all triggered fetch tasks to finish
|
||||||
* 3) perform persist task for qTaskInfo
|
* 3) perform persist task for qTaskInfo
|
||||||
*
|
*
|
||||||
* @param pSma
|
* @param pSma
|
||||||
|
@ -127,14 +127,14 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) {
|
||||||
// step 1: set rsma stat paused
|
// step 1: set rsma stat paused
|
||||||
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
|
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
|
||||||
|
|
||||||
// step 2: wait all triggered fetch tasks finished
|
// step 2: wait for all triggered fetch tasks to finish
|
||||||
int32_t nLoops = 0;
|
int32_t nLoops = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
if (T_REF_VAL_GET(pStat) == 0) {
|
if (T_REF_VAL_GET(pStat) == 0) {
|
||||||
smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
|
smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma));
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
|
smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
|
||||||
}
|
}
|
||||||
++nLoops;
|
++nLoops;
|
||||||
if (nLoops > 1000) {
|
if (nLoops > 1000) {
|
||||||
|
@ -319,14 +319,14 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) {
|
||||||
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
|
pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied;
|
||||||
ASSERT(pRSmaStat->commitAppliedVer > 0);
|
ASSERT(pRSmaStat->commitAppliedVer > 0);
|
||||||
|
|
||||||
// step 2: wait all triggered fetch tasks finished
|
// step 2: wait for all triggered fetch tasks to finish
|
||||||
int32_t nLoops = 0;
|
int32_t nLoops = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
if (T_REF_VAL_GET(pStat) == 0) {
|
if (T_REF_VAL_GET(pStat) == 0) {
|
||||||
smaDebug("vgId:%d, rsma commit, fetch tasks all finished", SMA_VID(pSma));
|
smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma));
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, rsma commit, fetch tasks not all finished yet", SMA_VID(pSma));
|
smaDebug("vgId:%d, rsma commit, fetch tasks are not all finished yet", SMA_VID(pSma));
|
||||||
}
|
}
|
||||||
++nLoops;
|
++nLoops;
|
||||||
if (nLoops > 1000) {
|
if (nLoops > 1000) {
|
||||||
|
|
|
@ -275,14 +275,14 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
|
||||||
}
|
}
|
||||||
taosHashCleanup(RSMA_INFO_HASH(pStat));
|
taosHashCleanup(RSMA_INFO_HASH(pStat));
|
||||||
|
|
||||||
// step 3: wait all triggered fetch tasks finished
|
// step 3: wait for all triggered fetch tasks to finish
|
||||||
int32_t nLoops = 0;
|
int32_t nLoops = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
|
if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) {
|
||||||
smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma));
|
smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma));
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma));
|
smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
|
||||||
}
|
}
|
||||||
++nLoops;
|
++nLoops;
|
||||||
if (nLoops > 1000) {
|
if (nLoops > 1000) {
|
||||||
|
|
|
@ -148,9 +148,13 @@ int32_t smaClose(SSma *pSma) {
|
||||||
|
|
||||||
int32_t smaPreClose(SSma *pSma) {
|
int32_t smaPreClose(SSma *pSma) {
|
||||||
if (pSma && VND_IS_RSMA(pSma->pVnode)) {
|
if (pSma && VND_IS_RSMA(pSma->pVnode)) {
|
||||||
SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma);
|
SSmaEnv *pEnv = NULL;
|
||||||
|
SRSmaStat *pStat = NULL;
|
||||||
|
if (!(pEnv = SMA_RSMA_ENV(pSma)) || !(pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv))) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) {
|
for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) {
|
||||||
tsem_post(&(pRSmaStat->notEmpty));
|
tsem_post(&(pStat->notEmpty));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -1524,7 +1524,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
|
SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId);
|
||||||
|
|
||||||
if (!pStat) {
|
if (!pStat) {
|
||||||
smaWarn("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
|
smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId,
|
||||||
pRSmaInfo->refId);
|
pRSmaInfo->refId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1557,10 +1557,12 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
|
||||||
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
|
SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
|
||||||
// async process
|
// async process
|
||||||
pItem->fetchLevel = pItem->level;
|
pItem->fetchLevel = pItem->level;
|
||||||
|
#if 0
|
||||||
SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid);
|
SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid);
|
||||||
SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1);
|
SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1);
|
||||||
ASSERT(qItem->level == pItem->level);
|
ASSERT(qItem->level == pItem->level);
|
||||||
ASSERT(qItem->fetchLevel == pItem->fetchLevel);
|
ASSERT(qItem->fetchLevel == pItem->fetchLevel);
|
||||||
|
#endif
|
||||||
tsem_post(&(pStat->notEmpty));
|
tsem_post(&(pStat->notEmpty));
|
||||||
smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level,
|
smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level,
|
||||||
pRSmaInfo->suid);
|
pRSmaInfo->suid);
|
||||||
|
|
|
@ -782,6 +782,9 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx
|
||||||
pBt);
|
pBt);
|
||||||
tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0);
|
tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0);
|
||||||
tdbOsFree(pNewCell);
|
tdbOsFree(pNewCell);
|
||||||
|
if (TDB_CELLDECODER_FREE_VAL(&cd)) {
|
||||||
|
tdbFree(cd.pVal);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// move to next new page
|
// move to next new page
|
||||||
|
|
Loading…
Reference in New Issue