refact TSDB
This commit is contained in:
parent
3e6172c62f
commit
8215bc4de3
|
@ -73,7 +73,6 @@ struct STsdb {
|
||||||
SVnode *pVnode;
|
SVnode *pVnode;
|
||||||
bool repoLocked;
|
bool repoLocked;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
STsdbCfg config;
|
|
||||||
STsdbMemTable *mem;
|
STsdbMemTable *mem;
|
||||||
STsdbMemTable *imem;
|
STsdbMemTable *imem;
|
||||||
SRtn rtn;
|
SRtn rtn;
|
||||||
|
@ -185,7 +184,7 @@ struct STsdbFS {
|
||||||
};
|
};
|
||||||
|
|
||||||
#define REPO_ID(r) TD_VID((r)->pVnode)
|
#define REPO_ID(r) TD_VID((r)->pVnode)
|
||||||
#define REPO_CFG(r) (&(r)->config)
|
#define REPO_CFG(r) (&(r)->pVnode->config.tsdbCfg)
|
||||||
#define REPO_FS(r) ((r)->fs)
|
#define REPO_FS(r) ((r)->fs)
|
||||||
#define REPO_META(r) ((r)->pVnode->pMeta)
|
#define REPO_META(r) ((r)->pVnode->pMeta)
|
||||||
#define REPO_TFS(r) ((r)->pVnode->pTfs)
|
#define REPO_TFS(r) ((r)->pVnode->pTfs)
|
||||||
|
@ -534,23 +533,6 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// tsdbDBDef
|
|
||||||
// typedef struct SDBFile SDBFile;
|
|
||||||
// typedef DB_ENV* TDBEnv;
|
|
||||||
|
|
||||||
// struct SDBFile {
|
|
||||||
// int32_t fid;
|
|
||||||
// DB* pDB;
|
|
||||||
// char* path;
|
|
||||||
// };
|
|
||||||
|
|
||||||
// int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF);
|
|
||||||
// void tsdbCloseDBF(SDBFile* pDBF);
|
|
||||||
// int32_t tsdbOpenBDBEnv(DB_ENV** ppEnv, const char* path);
|
|
||||||
// void tsdbCloseBDBEnv(DB_ENV* pEnv);
|
|
||||||
// int32_t tsdbSaveSmaToDB(SDBFile* pDBF, void* key, uint32_t keySize, void* data, uint32_t dataSize);
|
|
||||||
// void* tsdbGetSmaDataByKey(SDBFile* pDBF, void* key, uint32_t keySize, uint32_t* valueSize);
|
|
||||||
|
|
||||||
// tsdbFile
|
// tsdbFile
|
||||||
#define TSDB_FILE_HEAD_SIZE 512
|
#define TSDB_FILE_HEAD_SIZE 512
|
||||||
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
||||||
|
|
|
@ -55,7 +55,7 @@ typedef struct {
|
||||||
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
|
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
|
||||||
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
|
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
|
||||||
#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh))
|
#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh))
|
||||||
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRows)
|
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows)
|
||||||
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
|
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
|
||||||
|
|
||||||
static void tsdbStartCommit(STsdb *pRepo);
|
static void tsdbStartCommit(STsdb *pRepo);
|
||||||
|
|
|
@ -35,8 +35,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) {
|
||||||
pTsdb->pVnode = pVnode;
|
pTsdb->pVnode = pVnode;
|
||||||
pTsdb->repoLocked = false;
|
pTsdb->repoLocked = false;
|
||||||
tdbMutexInit(&pTsdb->mutex, NULL);
|
tdbMutexInit(&pTsdb->mutex, NULL);
|
||||||
pTsdb->config = pVnode->config.tsdbCfg;
|
pTsdb->fs = tsdbNewFS(REPO_CFG(pTsdb));
|
||||||
pTsdb->fs = tsdbNewFS(&pTsdb->config);
|
|
||||||
|
|
||||||
// create dir (TODO: use tfsMkdir)
|
// create dir (TODO: use tfsMkdir)
|
||||||
taosMkDir(pTsdb->path);
|
taosMkDir(pTsdb->path);
|
||||||
|
|
|
@ -311,7 +311,7 @@ static bool emptyQueryTimewindow(STsdbReadHandle* pTsdbReadHandle) {
|
||||||
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
|
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
|
||||||
// the expired data to client, even it is queried already.
|
// the expired data to client, even it is queried already.
|
||||||
static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
|
static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
|
||||||
STsdbCfg* pCfg = &pTsdb->config;
|
STsdbCfg* pCfg = REPO_CFG(pTsdb);
|
||||||
|
|
||||||
int64_t now = taosGetTimestamp(pCfg->precision);
|
int64_t now = taosGetTimestamp(pCfg->precision);
|
||||||
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
|
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
|
||||||
|
@ -404,7 +404,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, SQueryTableDataCond* pC
|
||||||
pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
|
pReadHandle->defaultLoadColumn = getDefaultLoadColumns(pReadHandle, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->config.maxRows);
|
pReadHandle->pDataCols = tdNewDataCols(1000, pReadHandle->pTsdb->pVnode->config.tsdbCfg.maxRows);
|
||||||
if (pReadHandle->pDataCols == NULL) {
|
if (pReadHandle->pDataCols == NULL) {
|
||||||
tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
|
tsdbError("%p failed to malloc buf for pDataCols, %s", pReadHandle, pReadHandle->idStr);
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
@ -889,7 +889,7 @@ static bool moveToNextRowInMem(STableCheckInfo* pCheckInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
|
static bool hasMoreDataInCache(STsdbReadHandle* pHandle) {
|
||||||
STsdbCfg* pCfg = &pHandle->pTsdb->config;
|
STsdbCfg* pCfg = REPO_CFG(pHandle->pTsdb);
|
||||||
size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
|
size_t size = taosArrayGetSize(pHandle->pTableCheckInfo);
|
||||||
assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
|
assert(pHandle->activeIndex < size && pHandle->activeIndex >= 0 && size >= 1);
|
||||||
pHandle->cur.fid = INT32_MIN;
|
pHandle->cur.fid = INT32_MIN;
|
||||||
|
@ -1169,7 +1169,7 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle,
|
||||||
|
|
||||||
static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) {
|
static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo) {
|
||||||
SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
||||||
STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
|
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
|
||||||
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
||||||
TSKEY key;
|
TSKEY key;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -1754,7 +1754,7 @@ int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* p
|
||||||
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) {
|
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock) {
|
||||||
SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
||||||
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
||||||
STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
|
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
|
||||||
|
|
||||||
initTableMemIterator(pTsdbReadHandle, pCheckInfo);
|
initTableMemIterator(pTsdbReadHandle, pCheckInfo);
|
||||||
|
|
||||||
|
@ -2198,7 +2198,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
|
||||||
int32_t numOfBlocks = 0;
|
int32_t numOfBlocks = 0;
|
||||||
int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
|
int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
|
||||||
|
|
||||||
STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
|
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
|
||||||
STimeWindow win = TSWINDOW_INITIALIZER;
|
STimeWindow win = TSWINDOW_INITIALIZER;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -2304,7 +2304,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
|
||||||
|
|
||||||
// find the start data block in file
|
// find the start data block in file
|
||||||
pTsdbReadHandle->locateStart = true;
|
pTsdbReadHandle->locateStart = true;
|
||||||
STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
|
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
|
||||||
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
|
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
|
||||||
|
|
||||||
tsdbRLockFS(pFileHandle);
|
tsdbRLockFS(pFileHandle);
|
||||||
|
@ -2405,7 +2405,7 @@ static int32_t getDataBlocksInFiles(STsdbReadHandle* pTsdbReadHandle, bool* exis
|
||||||
// find the start data block in file
|
// find the start data block in file
|
||||||
if (!pTsdbReadHandle->locateStart) {
|
if (!pTsdbReadHandle->locateStart) {
|
||||||
pTsdbReadHandle->locateStart = true;
|
pTsdbReadHandle->locateStart = true;
|
||||||
STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
|
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
|
||||||
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
|
int32_t fid = getFileIdFromKey(pTsdbReadHandle->window.skey, pCfg->days, pCfg->precision);
|
||||||
|
|
||||||
tsdbRLockFS(pFileHandle);
|
tsdbRLockFS(pFileHandle);
|
||||||
|
@ -2496,7 +2496,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
||||||
STsdbReadHandle* pTsdbReadHandle) {
|
STsdbReadHandle* pTsdbReadHandle) {
|
||||||
int numOfRows = 0;
|
int numOfRows = 0;
|
||||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
|
int32_t numOfCols = (int32_t)taosArrayGetSize(pTsdbReadHandle->pColumns);
|
||||||
STsdbCfg* pCfg = &pTsdbReadHandle->pTsdb->config;
|
STsdbCfg* pCfg = REPO_CFG(pTsdbReadHandle->pTsdb);
|
||||||
win->skey = TSKEY_INITIAL_VAL;
|
win->skey = TSKEY_INITIAL_VAL;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
|
@ -248,7 +248,8 @@ int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget) {
|
||||||
|
|
||||||
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
|
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
|
||||||
ASSERT(pBlock->numOfSubBlocks > 0);
|
ASSERT(pBlock->numOfSubBlocks > 0);
|
||||||
int8_t update = pReadh->pRepo->config.update;
|
STsdbCfg *pCfg = REPO_CFG(pReadh->pRepo);
|
||||||
|
int8_t update = pCfg->update;
|
||||||
|
|
||||||
SBlock *iBlock = pBlock;
|
SBlock *iBlock = pBlock;
|
||||||
if (pBlock->numOfSubBlocks > 1) {
|
if (pBlock->numOfSubBlocks > 1) {
|
||||||
|
@ -279,7 +280,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
|
||||||
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds,
|
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds,
|
||||||
bool mergeBitmap) {
|
bool mergeBitmap) {
|
||||||
ASSERT(pBlock->numOfSubBlocks > 0);
|
ASSERT(pBlock->numOfSubBlocks > 0);
|
||||||
int8_t update = pReadh->pRepo->config.update;
|
int8_t update = pReadh->pRepo->pVnode->config.tsdbCfg.update;
|
||||||
|
|
||||||
SBlock *iBlock = pBlock;
|
SBlock *iBlock = pBlock;
|
||||||
if (pBlock->numOfSubBlocks > 1) {
|
if (pBlock->numOfSubBlocks > 1) {
|
||||||
|
|
|
@ -60,9 +60,10 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
||||||
SSubmitBlk *pBlock = NULL;
|
SSubmitBlk *pBlock = NULL;
|
||||||
SSubmitBlkIter blkIter = {0};
|
SSubmitBlkIter blkIter = {0};
|
||||||
STSRow *row = NULL;
|
STSRow *row = NULL;
|
||||||
TSKEY now = taosGetTimestamp(pTsdb->config.precision);
|
STsdbCfg *pCfg = REPO_CFG(pTsdb);
|
||||||
TSKEY minKey = now - tsTickPerDay[pTsdb->config.precision] * pTsdb->config.keep2;
|
TSKEY now = taosGetTimestamp(pCfg->precision);
|
||||||
TSKEY maxKey = now + tsTickPerDay[pTsdb->config.precision] * pTsdb->config.days;
|
TSKEY minKey = now - tsTickPerDay[pCfg->precision] * pCfg->keep2;
|
||||||
|
TSKEY maxKey = now + tsTickPerDay[pCfg->precision] * pCfg->days;
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
pMsg->length = htonl(pMsg->length);
|
pMsg->length = htonl(pMsg->length);
|
||||||
|
|
Loading…
Reference in New Issue