refact vnode
This commit is contained in:
parent
cb744ab67b
commit
553f0aa11d
|
@ -43,10 +43,8 @@ typedef struct STable {
|
|||
#define TABLE_TID(t) (t)->tid
|
||||
#define TABLE_UID(t) (t)->uid
|
||||
|
||||
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta,
|
||||
STfs *pTfs);
|
||||
STsdb *tsdbOpen(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF);
|
||||
void tsdbClose(STsdb *);
|
||||
void tsdbRemove(const char *path);
|
||||
int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp);
|
||||
int tsdbPrepareCommit(STsdb *pTsdb);
|
||||
int tsdbCommit(STsdb *pTsdb);
|
||||
|
@ -167,16 +165,14 @@ struct STsdb {
|
|||
SRtn rtn;
|
||||
SMemAllocatorFactory *pmaf;
|
||||
STsdbFS *fs;
|
||||
SMeta *pMeta;
|
||||
STfs *pTfs;
|
||||
SSmaEnvs smaEnvs;
|
||||
};
|
||||
|
||||
#define REPO_ID(r) ((r)->vgId)
|
||||
#define REPO_CFG(r) (&(r)->config)
|
||||
#define REPO_FS(r) ((r)->fs)
|
||||
#define REPO_META(r) ((r)->pMeta)
|
||||
#define REPO_TFS(r) ((r)->pTfs)
|
||||
#define REPO_META(r) ((r)->pVnode->pMeta)
|
||||
#define REPO_TFS(r) ((r)->pVnode->pTfs)
|
||||
#define IS_REPO_LOCKED(r) ((r)->repoLocked)
|
||||
#define REPO_TSMA_NUM(r) ((r)->smaEnvs.nTSma)
|
||||
#define REPO_RSMA_NUM(r) ((r)->smaEnvs.nRSma)
|
||||
|
|
|
@ -100,7 +100,7 @@ int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
|
|||
|
||||
level = tsdbGetFidLevel(pSet->fid, pRtn);
|
||||
|
||||
if (tfsAllocDisk(pRepo->pTfs, level, &did) < 0) {
|
||||
if (tfsAllocDisk(pRepo->pVnode->pTfs, level, &did) < 0) {
|
||||
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
|
||||
return -1;
|
||||
}
|
||||
|
@ -427,7 +427,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) {
|
|||
pCommitIter->pTable = (STable *)taosMemoryMalloc(sizeof(STable));
|
||||
pCommitIter->pTable->uid = pTbData->uid;
|
||||
pCommitIter->pTable->tid = pTbData->uid;
|
||||
pCommitIter->pTable->pSchema = metaGetTbTSchema(pRepo->pMeta, pTbData->uid, 0);
|
||||
pCommitIter->pTable->pSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -459,7 +459,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
|
|||
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
|
||||
|
||||
if (tfsAllocDisk(pRepo->pTfs, tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) {
|
||||
if (tfsAllocDisk(REPO_TFS(pRepo), tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) {
|
||||
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -644,7 +644,7 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) {
|
|||
}
|
||||
|
||||
static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]) {
|
||||
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", tfsGetPrimaryPath(pRepo->pTfs), pRepo->vgId,
|
||||
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", tfsGetPrimaryPath(REPO_TFS(pRepo)), pRepo->vgId,
|
||||
tsdbTxnFname[ftype]);
|
||||
}
|
||||
|
||||
|
@ -912,7 +912,7 @@ static int tsdbScanRootDir(STsdb *pRepo) {
|
|||
const STfsFile *pf;
|
||||
|
||||
tsdbGetRootDir(REPO_ID(pRepo), rootDir);
|
||||
STfsDir *tdir = tfsOpendir(pRepo->pTfs, rootDir);
|
||||
STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), rootDir);
|
||||
if (tdir == NULL) {
|
||||
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
|
||||
return -1;
|
||||
|
@ -946,7 +946,7 @@ static int tsdbScanDataDir(STsdb *pRepo) {
|
|||
const STfsFile *pf;
|
||||
|
||||
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
|
||||
STfsDir *tdir = tfsOpendir(pRepo->pTfs, dataDir);
|
||||
STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), dataDir);
|
||||
if (tdir == NULL) {
|
||||
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno));
|
||||
return -1;
|
||||
|
@ -1128,7 +1128,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
tdir = tfsOpendir(pRepo->pTfs, dataDir);
|
||||
tdir = tfsOpendir(REPO_TFS(pRepo), dataDir);
|
||||
if (tdir == NULL) {
|
||||
tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
|
||||
tstrerror(terrno));
|
||||
|
|
|
@ -311,7 +311,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t
|
|||
pDFile->info.fver = tsdbGetDFSVersion(ftype);
|
||||
|
||||
tsdbGetFilename(pRepo->vgId, fid, ver, ftype, fname);
|
||||
tfsInitFile(pRepo->pTfs, &(pDFile->f), did, fname);
|
||||
tfsInitFile(REPO_TFS(pRepo), &(pDFile->f), did, fname);
|
||||
}
|
||||
|
||||
void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile) {
|
||||
|
@ -330,7 +330,7 @@ int tsdbEncodeSDFile(void **buf, SDFile *pDFile) {
|
|||
|
||||
void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile) {
|
||||
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
|
||||
buf = tfsDecodeFile(pRepo->pTfs, buf, &(pDFile->f));
|
||||
buf = tfsDecodeFile(REPO_TFS(pRepo), buf, &(pDFile->f));
|
||||
TSDB_FILE_SET_CLOSED(pDFile);
|
||||
|
||||
return buf;
|
||||
|
@ -365,7 +365,7 @@ int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T
|
|||
if (errno == ENOENT) {
|
||||
// Try to create directory recursively
|
||||
char *s = strdup(TSDB_FILE_REL_NAME(pDFile));
|
||||
if (tfsMkdirRecurAt(pRepo->pTfs, taosDirName(s), TSDB_FILE_DID(pDFile)) < 0) {
|
||||
if (tfsMkdirRecurAt(REPO_TFS(pRepo), taosDirName(s), TSDB_FILE_DID(pDFile)) < 0) {
|
||||
taosMemoryFreeClear(s);
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -15,14 +15,12 @@
|
|||
|
||||
#include "vnodeInt.h"
|
||||
|
||||
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
|
||||
SMeta *pMeta, STfs *pTfs);
|
||||
static STsdb *tsdbNew(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF);
|
||||
static void tsdbFree(STsdb *pTsdb);
|
||||
static int tsdbOpenImpl(STsdb *pTsdb);
|
||||
static void tsdbCloseImpl(STsdb *pTsdb);
|
||||
|
||||
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta,
|
||||
STfs *pTfs) {
|
||||
STsdb *tsdbOpen(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) {
|
||||
STsdb *pTsdb = NULL;
|
||||
|
||||
// Set default TSDB Options
|
||||
|
@ -37,7 +35,7 @@ STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAl
|
|||
}
|
||||
|
||||
// Create the handle
|
||||
pTsdb = tsdbNew(path, vgId, pTsdbCfg, pMAF, pMeta, pTfs);
|
||||
pTsdb = tsdbNew(path, pVnode, pTsdbCfg, pMAF);
|
||||
if (pTsdb == NULL) {
|
||||
// TODO: handle error
|
||||
return NULL;
|
||||
|
@ -61,11 +59,8 @@ void tsdbClose(STsdb *pTsdb) {
|
|||
}
|
||||
}
|
||||
|
||||
void tsdbRemove(const char *path) { taosRemoveDir(path); }
|
||||
|
||||
/* ------------------------ STATIC METHODS ------------------------ */
|
||||
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
|
||||
SMeta *pMeta, STfs *pTfs) {
|
||||
static STsdb *tsdbNew(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) {
|
||||
STsdb *pTsdb = NULL;
|
||||
|
||||
pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(STsdb));
|
||||
|
@ -75,11 +70,10 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
|
|||
}
|
||||
|
||||
pTsdb->path = strdup(path);
|
||||
pTsdb->vgId = vgId;
|
||||
pTsdb->vgId = TD_VID(pVnode);
|
||||
pTsdb->pVnode = pVnode;
|
||||
tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg);
|
||||
pTsdb->pmaf = pMAF;
|
||||
pTsdb->pMeta = pMeta;
|
||||
pTsdb->pTfs = pTfs;
|
||||
pTsdb->fs = tsdbNewFS(pTsdbCfg);
|
||||
|
||||
return pTsdb;
|
||||
|
|
|
@ -986,7 +986,7 @@ static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, in
|
|||
pCheckInfo->numOfBlocks = 0;
|
||||
|
||||
STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId};
|
||||
table.pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
|
||||
table.pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
|
||||
|
||||
if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {
|
||||
code = terrno;
|
||||
|
@ -1091,7 +1091,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl
|
|||
int32_t slotIndex) {
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
STSchema* pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
|
||||
STSchema* pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
|
||||
int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr);
|
||||
|
@ -1483,7 +1483,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
|
|||
int32_t numOfColsOfRow1 = 0;
|
||||
|
||||
if (pSchema1 == NULL) {
|
||||
pSchema1 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row1));
|
||||
pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
|
||||
}
|
||||
|
||||
if (isRow1DataRow) {
|
||||
|
@ -1496,7 +1496,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit
|
|||
if (row2) {
|
||||
isRow2DataRow = TD_IS_TP_ROW(row2);
|
||||
if (pSchema2 == NULL) {
|
||||
pSchema2 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row2));
|
||||
pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
|
||||
}
|
||||
if (isRow2DataRow) {
|
||||
numOfColsOfRow2 = schemaNCols(pSchema2);
|
||||
|
@ -2514,7 +2514,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
|||
|
||||
win->ekey = key;
|
||||
if (rv != TD_ROW_SVER(row)) {
|
||||
pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0);
|
||||
pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
|
||||
rv = TD_ROW_SVER(row);
|
||||
}
|
||||
mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema,
|
||||
|
|
|
@ -260,7 +260,7 @@ static void poolFree(void *arg, void *ptr) {
|
|||
|
||||
int32_t tsdbInitSma(STsdb *pTsdb) {
|
||||
// tSma
|
||||
int32_t numOfTSma = taosArrayGetSize(metaGetSmaTbUids(pTsdb->pMeta, false));
|
||||
int32_t numOfTSma = taosArrayGetSize(metaGetSmaTbUids(REPO_META(pTsdb), false));
|
||||
if (numOfTSma > 0) {
|
||||
atomic_store_16(&REPO_TSMA_NUM(pTsdb), (int16_t)numOfTSma);
|
||||
}
|
||||
|
@ -348,7 +348,7 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did)
|
|||
}
|
||||
|
||||
char aname[TSDB_FILENAME_LEN] = {0};
|
||||
tfsAbsoluteName(pTsdb->pTfs, did, path, aname);
|
||||
tfsAbsoluteName(REPO_TFS(pTsdb), did, path, aname);
|
||||
if (tsdbOpenDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) {
|
||||
tsdbFreeSmaEnv(pEnv);
|
||||
return NULL;
|
||||
|
@ -519,14 +519,14 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
|
|||
char rname[TSDB_FILENAME_LEN] = {0};
|
||||
|
||||
SDiskID did = {0};
|
||||
tfsAllocDisk(pTsdb->pTfs, TFS_PRIMARY_LEVEL, &did);
|
||||
tfsAllocDisk(REPO_TFS(pTsdb), TFS_PRIMARY_LEVEL, &did);
|
||||
if (did.level < 0 || did.id < 0) {
|
||||
tsdbUnlockRepo(pTsdb);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
tsdbGetSmaDir(REPO_ID(pTsdb), smaType, rname);
|
||||
|
||||
if (tfsMkdirRecurAt(pTsdb->pTfs, rname, did) != TSDB_CODE_SUCCESS) {
|
||||
if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rname, did) != TSDB_CODE_SUCCESS) {
|
||||
tsdbUnlockRepo(pTsdb);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -557,7 +557,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t
|
|||
}
|
||||
|
||||
// cache smaMeta
|
||||
STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid, true);
|
||||
STSma *pSma = metaGetSmaInfoByIndex(REPO_META(pTsdb), indexUid, true);
|
||||
if (pSma == NULL) {
|
||||
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
|
||||
taosHashCleanup(pItem->expiredWindows);
|
||||
|
@ -613,7 +613,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (!pTsdb->pMeta) {
|
||||
if (!REPO_META(pTsdb)) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
@ -1583,7 +1583,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
|
|||
// record current timezone of server side
|
||||
vCreateSmaReq.tSma.timezoneInt = tsTimezone;
|
||||
|
||||
if (metaCreateTSma(pTsdb->pMeta, &vCreateSmaReq) < 0) {
|
||||
if (metaCreateTSma(REPO_META(pTsdb), &vCreateSmaReq) < 0) {
|
||||
// TODO: handle error
|
||||
tdDestroyTSma(&vCreateSmaReq.tSma);
|
||||
return -1;
|
||||
|
@ -1610,7 +1610,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
|
|||
// }
|
||||
//
|
||||
|
||||
if (metaDropTSma(pTsdb->pMeta, vDropSmaReq.indexUid) < 0) {
|
||||
if (metaDropTSma(REPO_META(pTsdb), vDropSmaReq.indexUid) < 0) {
|
||||
// TODO: handle error
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -96,8 +96,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
|
||||
// open tsdb
|
||||
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TSDB_DIR);
|
||||
pVnode->pTsdb =
|
||||
tsdbOpen(tdir, TD_VID(pVnode), &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs);
|
||||
pVnode->pTsdb = tsdbOpen(tdir, pVnode, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode));
|
||||
if (pVnode->pTsdb == NULL) {
|
||||
vError("vgId: %d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
|
|
Loading…
Reference in New Issue