refact
This commit is contained in:
parent
9241af2298
commit
b1351da7cd
|
@ -48,6 +48,7 @@ extern "C" {
|
||||||
#include <wchar.h>
|
#include <wchar.h>
|
||||||
#include <wctype.h>
|
#include <wctype.h>
|
||||||
#include <wordexp.h>
|
#include <wordexp.h>
|
||||||
|
#include <libgen.h>
|
||||||
|
|
||||||
#include <sys/mman.h>
|
#include <sys/mman.h>
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
#include "tsdbint.h"
|
#include "tsdbint.h"
|
||||||
|
#include "ttime.h"
|
||||||
|
|
||||||
extern int32_t tsTsdbMetaCompactRatio;
|
extern int32_t tsTsdbMetaCompactRatio;
|
||||||
|
|
||||||
|
@ -721,8 +722,8 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
||||||
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
|
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
STsdbCfg *pCfg = REPO_CFG(pRepo);
|
||||||
|
|
||||||
ASSERT(pSet == NULL || pSet->fid == fid);
|
ASSERT(pSet == NULL || pSet->fid == fid);
|
||||||
|
|
||||||
|
@ -776,7 +777,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbCreateCommitIters(SCommitH *pCommith) {
|
static int tsdbCreateCommitIters(SCommitH *pCommith) {
|
||||||
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
|
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||||
SMemTable *pMem = pRepo->imem;
|
SMemTable *pMem = pRepo->imem;
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
|
||||||
|
@ -893,18 +894,18 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) {
|
||||||
|
|
||||||
static void tsdbDestroyCommitH(SCommitH *pCommith) {
|
static void tsdbDestroyCommitH(SCommitH *pCommith) {
|
||||||
pCommith->pDataCols = tdFreeDataCols(pCommith->pDataCols);
|
pCommith->pDataCols = tdFreeDataCols(pCommith->pDataCols);
|
||||||
pCommith->aSubBlk = taosArrayDestroy(&pCommith->aSubBlk);
|
pCommith->aSubBlk = taosArrayDestroy(pCommith->aSubBlk);
|
||||||
pCommith->aSupBlk = taosArrayDestroy(&pCommith->aSupBlk);
|
pCommith->aSupBlk = taosArrayDestroy(pCommith->aSupBlk);
|
||||||
pCommith->aBlkIdx = taosArrayDestroy(&pCommith->aBlkIdx);
|
pCommith->aBlkIdx = taosArrayDestroy(pCommith->aBlkIdx);
|
||||||
tsdbDestroyCommitIters(pCommith);
|
tsdbDestroyCommitIters(pCommith);
|
||||||
tsdbDestroyReadH(&(pCommith->readh));
|
tsdbDestroyReadH(&(pCommith->readh));
|
||||||
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
|
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbNextCommitFid(SCommitH *pCommith) {
|
static int tsdbNextCommitFid(SCommitH *pCommith) {
|
||||||
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
|
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
STsdbCfg *pCfg = REPO_CFG(pRepo);
|
||||||
int fid = TSDB_IVLD_FID;
|
int fid = TSDB_IVLD_FID;
|
||||||
|
|
||||||
for (int i = 0; i < pCommith->niters; i++) {
|
for (int i = 0; i < pCommith->niters; i++) {
|
||||||
SCommitIter *pIter = pCommith->iters + i;
|
SCommitIter *pIter = pCommith->iters + i;
|
||||||
|
@ -1252,7 +1253,7 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
|
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
|
||||||
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
|
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||||
SMergeInfo mInfo;
|
SMergeInfo mInfo;
|
||||||
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
|
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
|
||||||
|
@ -1285,7 +1286,7 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
|
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
|
||||||
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
|
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||||
int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
|
int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
|
||||||
SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
|
SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
|
||||||
|
@ -1410,12 +1411,12 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const
|
||||||
|
|
||||||
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
|
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
|
||||||
bool isLastOneBlock) {
|
bool isLastOneBlock) {
|
||||||
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
|
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
STsdbCfg *pCfg = REPO_CFG(pRepo);
|
||||||
SBlock block;
|
SBlock block;
|
||||||
SDFile * pDFile;
|
SDFile * pDFile;
|
||||||
bool isLast;
|
bool isLast;
|
||||||
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
|
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
|
||||||
|
|
||||||
int biter = 0;
|
int biter = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -1527,7 +1528,7 @@ static void tsdbResetCommitTable(SCommitH *pCommith) {
|
||||||
|
|
||||||
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
||||||
SDiskID did;
|
SDiskID did;
|
||||||
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
|
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||||
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
|
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
|
||||||
|
|
||||||
tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id));
|
tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id));
|
||||||
|
@ -1732,9 +1733,9 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
|
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
|
||||||
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
|
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
STsdbCfg *pCfg = REPO_CFG(pRepo);
|
||||||
int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed;
|
int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed;
|
||||||
|
|
||||||
ASSERT(mergeRows > 0);
|
ASSERT(mergeRows > 0);
|
||||||
|
|
||||||
|
|
|
@ -316,8 +316,8 @@ static int tsdbCompactMeta(STsdb *pRepo) {
|
||||||
|
|
||||||
static void tsdbDestroyCompactH(SCompactH *pComph) {
|
static void tsdbDestroyCompactH(SCompactH *pComph) {
|
||||||
pComph->pDataCols = tdFreeDataCols(pComph->pDataCols);
|
pComph->pDataCols = tdFreeDataCols(pComph->pDataCols);
|
||||||
pComph->aSupBlk = taosArrayDestroy(&pComph->aSupBlk);
|
pComph->aSupBlk = taosArrayDestroy(pComph->aSupBlk);
|
||||||
pComph->aBlkIdx = taosArrayDestroy(&pComph->aBlkIdx);
|
pComph->aBlkIdx = taosArrayDestroy(pComph->aBlkIdx);
|
||||||
tsdbDestroyCompTbArray(pComph);
|
tsdbDestroyCompTbArray(pComph);
|
||||||
tsdbDestroyReadH(&(pComph->readh));
|
tsdbDestroyReadH(&(pComph->readh));
|
||||||
tsdbCloseDFileSet(TSDB_COMPACT_WSET(pComph));
|
tsdbCloseDFileSet(TSDB_COMPACT_WSET(pComph));
|
||||||
|
@ -370,7 +370,7 @@ static int tsdbCompactMeta(STsdb *pRepo) {
|
||||||
tfree(pTh->pInfo);
|
tfree(pTh->pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
pComph->tbArray = taosArrayDestroy(&pComph->tbArray);
|
pComph->tbArray = taosArrayDestroy(pComph->tbArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbCacheFSetIndex(SCompactH *pComph) {
|
static int tsdbCacheFSetIndex(SCompactH *pComph) {
|
||||||
|
|
Loading…
Reference in New Issue