more refact
This commit is contained in:
parent
d083bcb7e9
commit
ef3102baf6
|
@ -13,39 +13,39 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef _TD_TSDB_BUFFER_H_
|
// #ifndef _TD_TSDB_BUFFER_H_
|
||||||
#define _TD_TSDB_BUFFER_H_
|
// #define _TD_TSDB_BUFFER_H_
|
||||||
|
|
||||||
typedef struct {
|
// typedef struct {
|
||||||
int64_t blockId;
|
// int64_t blockId;
|
||||||
int offset;
|
// int offset;
|
||||||
int remain;
|
// int remain;
|
||||||
char data[];
|
// char data[];
|
||||||
} STsdbBufBlock;
|
// } STsdbBufBlock;
|
||||||
|
|
||||||
typedef struct {
|
// typedef struct {
|
||||||
pthread_cond_t poolNotEmpty;
|
// pthread_cond_t poolNotEmpty;
|
||||||
int bufBlockSize;
|
// int bufBlockSize;
|
||||||
int tBufBlocks;
|
// int tBufBlocks;
|
||||||
int nBufBlocks;
|
// int nBufBlocks;
|
||||||
int nRecycleBlocks;
|
// int nRecycleBlocks;
|
||||||
int nElasticBlocks;
|
// int nElasticBlocks;
|
||||||
int64_t index;
|
// int64_t index;
|
||||||
SList* bufBlockList;
|
// SList* bufBlockList;
|
||||||
} STsdbBufPool;
|
// } STsdbBufPool;
|
||||||
|
|
||||||
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
|
// #define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
|
||||||
|
|
||||||
STsdbBufPool* tsdbNewBufPool();
|
// STsdbBufPool* tsdbNewBufPool();
|
||||||
void tsdbFreeBufPool(STsdbBufPool* pBufPool);
|
// void tsdbFreeBufPool(STsdbBufPool* pBufPool);
|
||||||
int tsdbOpenBufPool(STsdb* pRepo);
|
// int tsdbOpenBufPool(STsdb* pRepo);
|
||||||
void tsdbCloseBufPool(STsdb* pRepo);
|
// void tsdbCloseBufPool(STsdb* pRepo);
|
||||||
SListNode* tsdbAllocBufBlockFromPool(STsdb* pRepo);
|
// SListNode* tsdbAllocBufBlockFromPool(STsdb* pRepo);
|
||||||
int tsdbExpandPool(STsdb* pRepo, int32_t oldTotalBlocks);
|
// int tsdbExpandPool(STsdb* pRepo, int32_t oldTotalBlocks);
|
||||||
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic);
|
// void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic);
|
||||||
|
|
||||||
// health cite
|
// // health cite
|
||||||
STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
|
// STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
|
||||||
void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
|
// void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
|
||||||
|
|
||||||
#endif /* _TD_TSDB_BUFFER_H_ */
|
// #endif /* _TD_TSDB_BUFFER_H_ */
|
||||||
|
|
|
@ -43,8 +43,8 @@ extern "C" {
|
||||||
#include "tsdbLog.h"
|
#include "tsdbLog.h"
|
||||||
// Meta
|
// Meta
|
||||||
#include "tsdbMeta.h"
|
#include "tsdbMeta.h"
|
||||||
// Buffer
|
// // Buffer
|
||||||
#include "tsdbBuffer.h"
|
// #include "tsdbBuffer.h"
|
||||||
// MemTable
|
// MemTable
|
||||||
#include "tsdbMemTable.h"
|
#include "tsdbMemTable.h"
|
||||||
// File
|
// File
|
||||||
|
@ -74,7 +74,7 @@ struct STsdb {
|
||||||
STsdbAppH appH;
|
STsdbAppH appH;
|
||||||
STsdbStat stat;
|
STsdbStat stat;
|
||||||
STsdbMeta* tsdbMeta;
|
STsdbMeta* tsdbMeta;
|
||||||
STsdbBufPool* pPool;
|
// STsdbBufPool* pPool;
|
||||||
SMemTable* mem;
|
SMemTable* mem;
|
||||||
SMemTable* imem;
|
SMemTable* imem;
|
||||||
STsdbFS* fs;
|
STsdbFS* fs;
|
||||||
|
@ -105,18 +105,18 @@ int32_t tsdbLoadLastCache(STsdb *pRepo, STable* pTable);
|
||||||
void tsdbGetRootDir(int repoid, char dirName[]);
|
void tsdbGetRootDir(int repoid, char dirName[]);
|
||||||
void tsdbGetDataDir(int repoid, char dirName[]);
|
void tsdbGetDataDir(int repoid, char dirName[]);
|
||||||
|
|
||||||
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdb* pRepo) {
|
// static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdb* pRepo) {
|
||||||
ASSERT(pRepo != NULL);
|
// ASSERT(pRepo != NULL);
|
||||||
if (pRepo->mem == NULL) return NULL;
|
// if (pRepo->mem == NULL) return NULL;
|
||||||
|
|
||||||
SListNode* pNode = listTail(pRepo->mem->bufBlockList);
|
// SListNode* pNode = listTail(pRepo->mem->bufBlockList);
|
||||||
if (pNode == NULL) return NULL;
|
// if (pNode == NULL) return NULL;
|
||||||
|
|
||||||
STsdbBufBlock* pBufBlock = NULL;
|
// STsdbBufBlock* pBufBlock = NULL;
|
||||||
tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock));
|
// tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock));
|
||||||
|
|
||||||
return pBufBlock;
|
// return pBufBlock;
|
||||||
}
|
// }
|
||||||
|
|
||||||
// static FORCE_INLINE int tsdbGetNextMaxTables(int tid) {
|
// static FORCE_INLINE int tsdbGetNextMaxTables(int tid) {
|
||||||
// ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES);
|
// ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES);
|
||||||
|
|
|
@ -13,6 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
|
||||||
#include "tsdbHealth.h"
|
#include "tsdbHealth.h"
|
||||||
#include "tsdbint.h"
|
#include "tsdbint.h"
|
||||||
|
|
||||||
|
@ -210,4 +212,6 @@ void tsdbRecycleBufferBlock(STsdbBufPool *pPool, SListNode *pNode, bool bELastic
|
||||||
TD_DLIST_NELES(pPool->bufBlockList));
|
TD_DLIST_NELES(pPool->bufBlockList));
|
||||||
} else
|
} else
|
||||||
pPool->nBufBlocks--;
|
pPool->nBufBlocks--;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
|
@ -575,7 +575,7 @@ _err:
|
||||||
// rename meta.tmp -> meta
|
// rename meta.tmp -> meta
|
||||||
tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf),
|
tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf),
|
||||||
TSDB_FILE_FULL_NAME(pMFile));
|
TSDB_FILE_FULL_NAME(pMFile));
|
||||||
taosRename(mf.f.aname, pMFile->f.aname);
|
taosRenameFile(mf.f.aname, pMFile->f.aname);
|
||||||
tstrncpy(mf.f.aname, pMFile->f.aname, TSDB_FILENAME_LEN);
|
tstrncpy(mf.f.aname, pMFile->f.aname, TSDB_FILENAME_LEN);
|
||||||
tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN);
|
tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN);
|
||||||
// update current meta file info
|
// update current meta file info
|
||||||
|
|
|
@ -89,13 +89,6 @@ STsdb *tsdbOpen(STsdbCfg *pCfg, STsdbAppH *pAppH) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbOpenBufPool(pRepo) < 0) {
|
|
||||||
tsdbError("vgId:%d failed to open TSDB repository while opening buffer pool since %s", config.tsdbId,
|
|
||||||
tstrerror(terrno));
|
|
||||||
tsdbClose(pRepo, false);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbOpenFS(pRepo) < 0) {
|
if (tsdbOpenFS(pRepo) < 0) {
|
||||||
tsdbError("vgId:%d failed to open TSDB repository while opening FS since %s", config.tsdbId, tstrerror(terrno));
|
tsdbError("vgId:%d failed to open TSDB repository while opening FS since %s", config.tsdbId, tstrerror(terrno));
|
||||||
tsdbClose(pRepo, false);
|
tsdbClose(pRepo, false);
|
||||||
|
@ -145,7 +138,6 @@ int tsdbClose(STsdb *repo, int toCommit) {
|
||||||
pRepo->imem = NULL;
|
pRepo->imem = NULL;
|
||||||
|
|
||||||
tsdbCloseFS(pRepo);
|
tsdbCloseFS(pRepo);
|
||||||
tsdbCloseBufPool(pRepo);
|
|
||||||
tsdbCloseMeta(pRepo);
|
tsdbCloseMeta(pRepo);
|
||||||
tsdbFreeRepo(pRepo);
|
tsdbFreeRepo(pRepo);
|
||||||
tsdbDebug("vgId:%d repository is closed", vgId);
|
tsdbDebug("vgId:%d repository is closed", vgId);
|
||||||
|
@ -193,19 +185,19 @@ int tsdbUnlockRepo(STsdb *pRepo) {
|
||||||
// return 0;
|
// return 0;
|
||||||
// }
|
// }
|
||||||
|
|
||||||
int tsdbCheckCommit(STsdb *pRepo) {
|
// int tsdbCheckCommit(STsdb *pRepo) {
|
||||||
ASSERT(pRepo->mem != NULL);
|
// ASSERT(pRepo->mem != NULL);
|
||||||
STsdbCfg *pCfg = &(pRepo->config);
|
// STsdbCfg *pCfg = &(pRepo->config);
|
||||||
|
|
||||||
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
// STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
||||||
ASSERT(pBufBlock != NULL);
|
// ASSERT(pBufBlock != NULL);
|
||||||
if ((pRepo->mem->extraBuffList != NULL) ||
|
// if ((pRepo->mem->extraBuffList != NULL) ||
|
||||||
((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) {
|
// ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) {
|
||||||
// trigger commit
|
// // trigger commit
|
||||||
if (tsdbAsyncCommit(pRepo) < 0) return -1;
|
// if (tsdbAsyncCommit(pRepo) < 0) return -1;
|
||||||
}
|
// }
|
||||||
return 0;
|
// return 0;
|
||||||
}
|
// }
|
||||||
|
|
||||||
STsdbMeta *tsdbGetMeta(STsdb *pRepo) { return pRepo->tsdbMeta; }
|
STsdbMeta *tsdbGetMeta(STsdb *pRepo) { return pRepo->tsdbMeta; }
|
||||||
|
|
||||||
|
@ -612,13 +604,6 @@ static STsdb *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRepo->pPool = tsdbNewBufPool(pCfg);
|
|
||||||
if (pRepo->pPool == NULL) {
|
|
||||||
tsdbError("vgId:%d failed to create buffer pool since %s", REPO_ID(pRepo), tstrerror(terrno));
|
|
||||||
tsdbFreeRepo(pRepo);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRepo->fs = tsdbNewFS(pCfg);
|
pRepo->fs = tsdbNewFS(pCfg);
|
||||||
if (pRepo->fs == NULL) {
|
if (pRepo->fs == NULL) {
|
||||||
tsdbError("vgId:%d failed to TSDB file system since %s", REPO_ID(pRepo), tstrerror(terrno));
|
tsdbError("vgId:%d failed to TSDB file system since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
|
@ -632,7 +617,6 @@ static STsdb *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
|
||||||
static void tsdbFreeRepo(STsdb *pRepo) {
|
static void tsdbFreeRepo(STsdb *pRepo) {
|
||||||
if (pRepo) {
|
if (pRepo) {
|
||||||
tsdbFreeFS(pRepo->fs);
|
tsdbFreeFS(pRepo->fs);
|
||||||
tsdbFreeBufPool(pRepo->pPool);
|
|
||||||
tsdbFreeMeta(pRepo->tsdbMeta);
|
tsdbFreeMeta(pRepo->tsdbMeta);
|
||||||
tsdbFreeMergeBuf(pRepo->mergeBuf);
|
tsdbFreeMergeBuf(pRepo->mergeBuf);
|
||||||
// tsdbFreeMemTable(pRepo->mem);
|
// tsdbFreeMemTable(pRepo->mem);
|
||||||
|
|
|
@ -13,6 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
|
||||||
#include "tdataformat.h"
|
#include "tdataformat.h"
|
||||||
#include "tfunctional.h"
|
#include "tfunctional.h"
|
||||||
#include "tsdbRowMergeBuf.h"
|
#include "tsdbRowMergeBuf.h"
|
||||||
|
@ -1000,3 +1002,5 @@ static int tsdbUpdateTableLatestInfo(STsdb *pRepo, STable *pTable, SMemRow row)
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
Loading…
Reference in New Issue