Merge pull request #9902 from taosdata/feature/tfs

refact tfs module
This commit is contained in:
Shengliang Guan 2022-01-19 17:52:34 +08:00 committed by GitHub
commit 61fd05654a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 671 additions and 611 deletions

View File

@ -16,77 +16,226 @@
#ifndef _TD_TFS_H_
#define _TD_TFS_H_
#include "tglobal.h"
#include "tcfg.h"
#ifdef __cplusplus
extern "C" {
#endif
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct STfs STfs;
typedef struct STfsDir STfsDir;
typedef struct {
int32_t level;
int32_t id;
} SDiskID;
#define TFS_UNDECIDED_LEVEL -1
#define TFS_UNDECIDED_ID -1
#define TFS_PRIMARY_LEVEL 0
#define TFS_PRIMARY_ID 0
#define TFS_MIN_LEVEL 0
#define TFS_MAX_LEVEL (TSDB_MAX_TIERS - 1)
// FS APIs ====================================
typedef struct {
int64_t total;
int64_t used;
int64_t avail;
} SFSMeta;
int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk);
void tfsCleanup();
void tfsUpdateSize(SFSMeta *pFSMeta);
void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id);
const char *TFS_PRIMARY_PATH();
const char *TFS_DISK_PATH(int32_t level, int32_t id);
// TFILE APIs ====================================
typedef struct {
int32_t level;
int32_t id;
char rname[TSDB_FILENAME_LEN]; // REL name
SDiskID did;
char aname[TSDB_FILENAME_LEN]; // ABS name
} TFILE;
char rname[TSDB_FILENAME_LEN]; // REL name
STfs *pTfs;
} STfsFile;
#define TFILE_LEVEL(pf) ((pf)->level)
#define TFILE_ID(pf) ((pf)->id)
#define TFILE_NAME(pf) ((pf)->aname)
#define TFILE_REL_NAME(pf) ((pf)->rname)
/**
* @brief Open a fs.
*
* @param pCfg Config of the fs.
* @param ndisk Length of the config.
* @return STfs* The fs object.
*/
STfs *tfsOpen(SDiskCfg *pCfg, int32_t ndisk);
#define tfsopen(pf, flags) open(TFILE_NAME(pf), flags)
#define tfsclose(fd) close(fd)
#define tfsremove(pf) remove(TFILE_NAME(pf))
#define tfscopy(sf, df) taosCopyFile(TFILE_NAME(sf), TFILE_NAME(df))
#define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df))
/**
* @brief Close a fs.
*
* @param pTfs The fs object to close.
*/
void tfsClose(STfs *pTfs);
void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname);
bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2);
int32_t tfsEncodeFile(void **buf, TFILE *pf);
void *tfsDecodeFile(void *buf, TFILE *pf);
void tfsbasename(const TFILE *pf, char *dest);
void tfsdirname(const TFILE *pf, char *dest);
/**
* @brief Update the disk size.
*
* @param pTfs The fs object.
*/
void tfsUpdateSize(STfs *pTfs);
// DIR APIs ====================================
int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id);
int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id);
int32_t tfsMkdir(const char *rname);
int32_t tfsRmdir(const char *rname);
int32_t tfsRename(char *orname, char *nrname);
/**
* @brief Get the disk size.
*
* @param pTfs The fs object.
*/
SDiskSize tfsGetSize(STfs *pTfs);
typedef struct TDIR TDIR;
/**
* @brief Allocate an existing available tier level from fs.
*
* @param pTfs The fs object.
* @param expLevel Disk level want to allocate.
* @param pDiskId The disk ID after allocation.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, SDiskID *pDiskId);
TDIR *tfsOpendir(const char *rname);
const TFILE *tfsReaddir(TDIR *tdir);
void tfsClosedir(TDIR *tdir);
/**
* @brief Get the primary path.
*
* @param pTfs The fs object.
* @return const char * The primary path.
*/
const char *tfsGetPrimaryPath(STfs *pTfs);
/**
* @brief Get the disk path.
*
* @param pTfs The fs object.
* @param diskId The diskId.
* @return const char * The primary path.
*/
const char *tfsGetDiskPath(STfs *pTfs, SDiskID diskId);
/**
* @brief Make directory at all levels in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsMkdir(STfs *pTfs, const char *rname);
/**
* @brief Create directories in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @param diskId The disk ID.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsMkdirAt(STfs *pTfs, const char *rname, SDiskID diskId);
/**
* @brief Recursive create directories in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @param diskId The disk ID.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId);
/**
* @brief Remove directory at all levels in tfs.
*
* @param pTfs The fs object.
* @param rname The rel name of directory.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsRmdir(STfs *pTfs, const char *rname);
/**
* @brief Rename file/directory in tfs.
*
* @param pTfs The fs object.
* @param orname The rel name of old file.
* @param nrname The rel name of new file.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsRename(STfs *pTfs, char *orname, char *nrname);
/**
* @brief Init file object in tfs.
*
* @param pTfs The fs object.
* @param pFile The file object.
* @param diskId The disk ID.
* @param rname The rel name of file.
*/
void tfsInitFile(STfs *pTfs, STfsFile *pFile, SDiskID diskId, const char *rname);
/**
* @brief Determine whether they are the same file.
*
* @param pFile1 The file object.
* @param pFile2 The file object.
* @param bool The compare result.
*/
bool tfsIsSameFile(const STfsFile *pFile1, const STfsFile *pFile2);
/**
* @brief Encode file name to a buffer.
*
* @param buf The buffer where file name are saved.
* @param pFile The file object.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsEncodeFile(void **buf, STfsFile *pFile);
/**
* @brief Decode file name from a buffer.
*
* @param pTfs The fs object.
* @param buf The buffer where file name are saved.
* @param pFile The file object.
* @return void * Buffer address after decode.
*/
void *tfsDecodeFile(STfs *pTfs, void *buf, STfsFile *pFile);
/**
* @brief Get the basename of the file.
*
* @param pFile The file object.
* @param dest The buffer where basename will be saved.
*/
void tfsBasename(const STfsFile *pFile, char *dest);
/**
* @brief Get the dirname of the file.
*
* @param pFile The file object.
* @param dest The buffer where dirname will be saved.
*/
void tfsDirname(const STfsFile *pFile, char *dest);
/**
* @brief Remove file in tfs.
*
* @param pFile The file to be removed.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsRemoveFile(const STfsFile *pFile);
/**
* @brief Copy file in tfs.
*
* @param pFile1 The src file.
* @param pFile2 The dest file.
* @return int32_t 0 for success, -1 for failure.
*/
int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2);
/**
* @brief Open a directory for traversal.
*
* @param rname The rel name of file.
* @return STfsDir* The dir object.
*/
STfsDir *tfsOpendir(STfs *pTfs, const char *rname);
/**
* @brief Get a file from dir and move to next pos.
*
* @param pDir The dir object.
* @return STfsFile* The file in dir.
*/
const STfsFile *tfsReaddir(STfsDir *pDir);
/**
* @brief Close a directory.
*
* @param pDir The dir object.
*/
void tfsClosedir(STfsDir *pDir);
#ifdef __cplusplus
}

View File

@ -58,8 +58,8 @@ int64_t taosWriteFile(FileFd fd, const void *buf, int64_t count);
void taosCloseFile(FileFd fd);
int32_t taosRenameFile(char *oldName, char *newName);
int64_t taosCopyFile(char *from, char *to);
int32_t taosRenameFile(const char *oldName, const char *newName);
int64_t taosCopyFile(const char *from, const char *to);
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath);

View File

@ -373,10 +373,14 @@ do { \
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
#define TSDB_MAX_TIERS 3
#define TSDB_MAX_DISKS_PER_TIER 16
#define TSDB_MAX_DISKS (TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER)
#define TFS_MAX_TIERS 3
#define TFS_MAX_DISKS_PER_TIER 16
#define TFS_MAX_DISKS (TFS_MAX_TIERS * TFS_MAX_DISKS_PER_TIER)
#define TFS_MIN_LEVEL 0
#define TFS_MAX_LEVEL (TFS_MAX_TIERS - 1)
#define TFS_PRIMARY_LEVEL 0
#define TFS_PRIMARY_ID 0
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
enum { TRANS_STAT_INIT = 0, TRANS_STAT_EXECUTING, TRANS_STAT_EXECUTED, TRANS_STAT_ROLLBACKING, TRANS_STAT_ROLLBACKED };
enum { TRANS_OPER_INIT = 0, TRANS_OPER_EXECUTE, TRANS_OPER_ROLLBACK };

View File

@ -137,7 +137,7 @@ int32_t tsDiskCfgNum = 0;
#ifndef _STORAGE
SDiskCfg tsDiskCfg[1];
#else
SDiskCfg tsDiskCfg[TSDB_MAX_DISKS];
SDiskCfg tsDiskCfg[TFS_MAX_DISKS];
#endif
/*

View File

@ -134,6 +134,7 @@ typedef struct SDnode {
SBnodeMgmt bmgmt;
SVnodesMgmt vmgmt;
STransMgmt tmgmt;
STfs *pTfs;
SStartupReq startup;
} SDnode;

View File

@ -43,6 +43,7 @@ extern "C" {
#include "qnode.h"
#include "snode.h"
#include "vnode.h"
#include "tfs.h"
extern int32_t dDebugFlag;

View File

@ -173,11 +173,12 @@ SDnode *dndCreate(SDnodeObjCfg *pCfg) {
return NULL;
}
SDiskCfg dCfg;
strcpy(dCfg.dir, pDnode->cfg.dataDir);
SDiskCfg dCfg = {0};
tstrncpy(dCfg.dir, pDnode->cfg.dataDir, TSDB_FILENAME_LEN);
dCfg.level = 0;
dCfg.primary = 1;
if (tfsInit(&dCfg, 1) != 0) {
pDnode->pTfs = tfsOpen(&dCfg, 1);
if (pDnode->pTfs == NULL) {
dError("failed to init tfs since %s", terrstr());
dndClose(pDnode);
return NULL;
@ -251,7 +252,7 @@ void dndClose(SDnode *pDnode) {
dndCleanupQnode(pDnode);
dndCleanupVnodes(pDnode);
dndCleanupMgmt(pDnode);
tfsCleanup();
tfsClose(pDnode->pTfs);
dndCloseImp(pDnode);
free(pDnode);
@ -313,4 +314,28 @@ void dndCleanup() {
taosStopCacheRefreshWorker();
dInfo("dnode env is cleaned up");
}
// OTHER FUNCTIONS ===================================
void taosGetDisk() {
#if 0
const double unit = 1024 * 1024 * 1024;
SDiskSize diskSize = tfsGetSize(pTfs);
tfsUpdateSize(&fsMeta);
tsTotalDataDirGB = (float)(fsMeta.total / unit);
tsUsedDataDirGB = (float)(fsMeta.used / unit);
tsAvailDataDirGB = (float)(fsMeta.avail / unit);
if (taosGetDiskSize(tsLogDir, &diskSize) == 0) {
tsTotalLogDirGB = (float)(diskSize.total / unit);
tsAvailLogDirGB = (float)(diskSize.avail / unit);
}
if (taosGetDiskSize(tsTempDir, &diskSize) == 0) {
tsTotalTmpDirGB = (float)(diskSize.total / unit);
tsAvailTmpDirectorySpace = (float)(diskSize.avail / unit);
}
#endif
}

View File

@ -381,7 +381,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
pMgmt->openVnodes, pMgmt->totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnodeCfg cfg = {.pDnode = pDnode, .vgId = pCfg->vgId};
SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId};
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
@ -587,6 +587,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
vnodeCfg.pDnode = pDnode;
vnodeCfg.pTfs = pDnode->pTfs;
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
if (pImpl == NULL) {
dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());

View File

@ -19,6 +19,7 @@
#include "mallocator.h"
#include "meta.h"
#include "common.h"
#include "tfs.h"
#ifdef __cplusplus
extern "C" {
@ -80,7 +81,7 @@ typedef struct {
} STableKeyInfo;
// STsdb
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta);
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, STfs *pTfs);
void tsdbClose(STsdb *);
void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg, SSubmitRsp *pRsp);

View File

@ -21,6 +21,7 @@
#include "meta.h"
#include "tarray.h"
#include "tfs.h"
#include "tq.h"
#include "tsdb.h"
#include "wal.h"
@ -36,7 +37,8 @@ typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef struct SVnodeCfg {
int32_t vgId;
SDnode * pDnode;
SDnode *pDnode;
STfs *pTfs;
uint64_t wsize;
uint64_t ssize;
uint64_t lsize;
@ -52,9 +54,9 @@ typedef struct SVnodeCfg {
typedef struct {
int32_t sver;
char * timezone;
char * locale;
char * charset;
const char *timezone;
const char *locale;
const char *charset;
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
PutReqToVQueryQFp putReqToVQueryQFp;
} SVnodeOpt;

View File

@ -50,6 +50,7 @@ struct STsdb {
SMemAllocatorFactory *pmaf;
STsdbFS * fs;
SMeta * pMeta;
STfs * pTfs;
};
#define REPO_ID(r) ((r)->vgId)

View File

@ -29,12 +29,15 @@
#define TSDB_FILE_INFO(tf) (&((tf)->info))
#define TSDB_FILE_F(tf) (&((tf)->f))
#define TSDB_FILE_FD(tf) ((tf)->fd)
#define TSDB_FILE_FULL_NAME(tf) TFILE_NAME(TSDB_FILE_F(tf))
#define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname)
#define TSDB_FILE_OPENED(tf) (TSDB_FILE_FD(tf) >= 0)
#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf))
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1)
#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf))
#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf))
#define TSDB_FILE_LEVEL(tf) (TSDB_FILE_F(tf)->did.level)
#define TSDB_FILE_ID(tf) (TSDB_FILE_F(tf)->did.id)
#define TSDB_FILE_DID(tf) (TSDB_FILE_F(tf)->did)
#define TSDB_FILE_REL_NAME(tf) (TSDB_FILE_F(tf)->rname)
#define TSDB_FILE_ABS_NAME(tf) (TSDB_FILE_F(tf)->aname)
#define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_FD(tf))
#define TSDB_FILE_STATE(tf) ((tf)->state)
#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
@ -54,10 +57,10 @@ typedef struct {
} SMFInfo;
typedef struct {
SMFInfo info;
TFILE f;
int fd;
uint8_t state;
SMFInfo info;
STfsFile f;
int fd;
uint8_t state;
} SMFile;
void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver);
@ -175,17 +178,17 @@ typedef struct {
} SDFInfo;
typedef struct {
SDFInfo info;
TFILE f;
int fd;
uint8_t state;
SDFInfo info;
STfsFile f;
int fd;
uint8_t state;
} SDFile;
void tsdbInitDFile(SDFile* pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype);
void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype);
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeSDFile(void* buf, SDFile* pDFile);
int tsdbCreateDFile(SDFile* pDFile, bool updateHeader);
void* tsdbDecodeSDFile(STsdb *pRepo, void* buf, SDFile* pDFile);
int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader);
int tsdbUpdateDFileHeader(SDFile* pDFile);
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version);
@ -263,7 +266,7 @@ static FORCE_INLINE int tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte
return (int)nbyte;
}
static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsremove(TSDB_FILE_F(pDFile)); }
static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); }
static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile));
@ -278,7 +281,7 @@ static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nby
}
static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
if (tfscopy(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
if (tfsCopyFile(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
@ -311,14 +314,14 @@ typedef struct {
} \
} while (0);
void tsdbInitDFileSet(SDFileSet* pSet, SDiskID did, int vid, int fid, uint32_t ver);
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet* pSet, SDiskID did, int fid, uint32_t ver);
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSet(void* buf, SDFileSet* pSet);
void* tsdbDecodeDFileSet(STsdb *pRepo, void* buf, SDFileSet* pSet);
int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet);
void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader);
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet* pSet, bool updateHeader);
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
int tsdbScanAndTryFixDFileSet(STsdb* pRepo, SDFileSet* pSet);

View File

@ -78,6 +78,7 @@ struct SVnode {
tsem_t canCommit;
SQHandle* pQuery;
SDnode* pDnode;
STfs* pTfs;
};
int vnodeScheduleTask(SVnodeTask* task);

View File

@ -97,15 +97,14 @@ int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) {
level = tsdbGetFidLevel(pSet->fid, pRtn);
tfsAllocDisk(level, &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
if (tfsAllocDisk(pRepo->pTfs, level, &did) < 0) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
if (did.level > TSDB_FSET_LEVEL(pSet)) {
// Need to move the FSET to higher level
tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs));
tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs));
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
@ -456,8 +455,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
if (tfsAllocDisk(pRepo->pTfs, tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
return -1;
}
@ -484,9 +482,9 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// Set and open commit FSET
if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) {
// Create a new FSET to write data
tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)));
tsdbInitDFileSet(pRepo, pWSet, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateDFileSet(pWSet, true) < 0) {
if (tsdbCreateDFileSet(pRepo, pWSet, true) < 0) {
tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo),
TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno));
if (pCommith->isRFileSet) {
@ -509,8 +507,8 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
// TSDB_FILE_HEAD
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pWHeadf, true) < 0) {
tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pRepo, pWHeadf, true) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf),
tstrerror(terrno));
@ -556,10 +554,10 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
}
}
} else {
tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST);
pCommith->isLFileSame = false;
if (tsdbCreateDFile(pWLastf, true) < 0) {
if (tsdbCreateDFile(pRepo, pWLastf, true) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf),
tstrerror(terrno));

View File

@ -186,8 +186,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
}
} else {
// Create new fset as compacted fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pComph->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
if (tfsAllocDisk(pRepo->pTfs, tsdbGetFidLevel(pSet->fid, &(pComph->rtn)), &did) < 0) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCompactFSetEnd(pComph);

View File

@ -23,14 +23,14 @@ static const char *tsdbTxnFname[] = {"current.t", "current"};
static int tsdbComparFidFSet(const void *arg1, const void *arg2);
static void tsdbResetFSStatus(SFSStatus *pStatus);
static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid);
static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus);
static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo);
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]);
static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]);
static int tsdbOpenFSFromCurrent(STsdb *pRepo);
static int tsdbScanAndTryFixFS(STsdb *pRepo);
static int tsdbScanRootDir(STsdb *pRepo);
static int tsdbScanDataDir(STsdb *pRepo);
static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf);
static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf);
static int tsdbRestoreCurrent(STsdb *pRepo);
static int tsdbComparTFILE(const void *arg1, const void *arg2);
static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired);
@ -97,7 +97,7 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
return tlen;
}
static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray) {
static void *tsdbDecodeDFileSetArray(STsdb*pRepo, void *buf, SArray *pArray) {
uint64_t nset;
SDFileSet dset;
@ -105,7 +105,7 @@ static void *tsdbDecodeDFileSetArray(void *buf, SArray *pArray) {
buf = taosDecodeFixedU64(buf, &nset);
for (size_t i = 0; i < nset; i++) {
buf = tsdbDecodeDFileSet(buf, &dset);
buf = tsdbDecodeDFileSet(pRepo, buf, &dset);
taosArrayPush(pArray, (void *)(&dset));
}
return buf;
@ -122,13 +122,13 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
return tlen;
}
static void *tsdbDecodeFSStatus(void *buf, SFSStatus *pStatus) {
static void *tsdbDecodeFSStatus(STsdb*pRepo, void *buf, SFSStatus *pStatus) {
tsdbResetFSStatus(pStatus);
// pStatus->pmf = &(pStatus->mf);
// buf = tsdbDecodeSMFile(buf, pStatus->pmf);
buf = tsdbDecodeDFileSetArray(buf, pStatus->df);
buf = tsdbDecodeDFileSetArray(pRepo, buf, pStatus->df);
return buf;
}
@ -311,7 +311,7 @@ int tsdbOpenFS(STsdb *pRepo) {
ASSERT(pfs != NULL);
tsdbGetTxnFname(REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current);
tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, current);
tsdbGetRtnSnap(pRepo, &pRepo->rtn);
if (access(current, F_OK) == 0) {
@ -375,7 +375,7 @@ int tsdbEndFSTxn(STsdb *pRepo) {
SFSStatus *pStatus;
// Write current file system snapshot
if (tsdbSaveFSStatus(pfs->nstatus, REPO_ID(pRepo)) < 0) {
if (tsdbSaveFSStatus(pRepo, pfs->nstatus) < 0) {
tsdbEndFSTxnWithError(pfs);
return -1;
}
@ -405,7 +405,7 @@ int tsdbEndFSTxnWithError(STsdbFS *pfs) {
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFileSetToStatus(pfs->nstatus, pSet); }
static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) {
SFSHeader fsheader;
void * pBuf = NULL;
void * ptr;
@ -413,8 +413,8 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
char tfname[TSDB_FILENAME_LEN] = "\0";
char cfname[TSDB_FILENAME_LEN] = "\0";
tsdbGetTxnFname(vid, TSDB_TXN_TEMP_FILE, tfname);
tsdbGetTxnFname(vid, TSDB_TXN_CURR_FILE, cfname);
tsdbGetTxnFname(pRepo, TSDB_TXN_TEMP_FILE, tfname);
tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, cfname);
int fd = open(tfname, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755);
if (fd < 0) {
@ -645,8 +645,9 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) {
}
}
static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), repoid, tsdbTxnFname[ftype]);
static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]) {
snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", tfsGetPrimaryPath(pRepo->pTfs), pRepo->vgId,
tsdbTxnFname[ftype]);
}
static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
@ -657,7 +658,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
char current[TSDB_FILENAME_LEN] = "\0";
void * ptr;
tsdbGetTxnFname(REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current);
tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, current);
// current file exists, try to recover
fd = open(current, O_RDONLY | O_BINARY);
@ -725,7 +726,7 @@ static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
}
ptr = buffer;
ptr = tsdbDecodeFSStatus(ptr, pStatus);
ptr = tsdbDecodeFSStatus(pRepo, ptr, pStatus);
} else {
tsdbResetFSStatus(pStatus);
}
@ -910,17 +911,17 @@ static int tsdbScanRootDir(STsdb *pRepo) {
char rootDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STsdbFS * pfs = REPO_FS(pRepo);
const TFILE *pf;
const STfsFile *pf;
tsdbGetRootDir(REPO_ID(pRepo), rootDir);
TDIR *tdir = tfsOpendir(rootDir);
STfsDir *tdir = tfsOpendir(pRepo->pTfs, rootDir);
if (tdir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno));
return -1;
}
while ((pf = tfsReaddir(tdir))) {
tfsbasename(pf, bname);
tfsBasename(pf, bname);
if (strcmp(bname, tsdbTxnFname[TSDB_TXN_CURR_FILE]) == 0 || strcmp(bname, "data") == 0) {
// Skip current file and data directory
@ -931,8 +932,8 @@ static int tsdbScanRootDir(STsdb *pRepo) {
// continue;
// }
(void)tfsremove(pf);
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), TFILE_NAME(pf));
(void)tfsRemoveFile(pf);
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), pf->aname);
}
tfsClosedir(tdir);
@ -944,21 +945,21 @@ static int tsdbScanDataDir(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STsdbFS * pfs = REPO_FS(pRepo);
const TFILE *pf;
const STfsFile *pf;
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
TDIR *tdir = tfsOpendir(dataDir);
STfsDir *tdir = tfsOpendir(pRepo->pTfs, dataDir);
if (tdir == NULL) {
tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno));
return -1;
}
while ((pf = tfsReaddir(tdir))) {
tfsbasename(pf, bname);
tfsBasename(pf, bname);
if (!tsdbIsTFileInFS(pfs, pf)) {
(void)tfsremove(pf);
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), TFILE_NAME(pf));
(void)tfsRemoveFile(pf);
tsdbDebug("vgId:%d invalid file %s is removed", REPO_ID(pRepo), pf->aname);
}
}
@ -967,7 +968,7 @@ static int tsdbScanDataDir(STsdb *pRepo) {
return 0;
}
static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf) {
SFSIter fsiter;
tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD);
SDFileSet *pSet;
@ -987,8 +988,8 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
// static int tsdbRestoreMeta(STsdb *pRepo) {
// char rootDir[TSDB_FILENAME_LEN];
// char bname[TSDB_FILENAME_LEN];
// TDIR * tdir = NULL;
// const TFILE *pf = NULL;
// STfsDir * tdir = NULL;
// const STfsFile *pf = NULL;
// const char * pattern = "^meta(-ver[0-9]+)?$";
// regex_t regex;
// STsdbFS * pfs = REPO_FS(pRepo);
@ -1007,7 +1008,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
// }
// while ((pf = tfsReaddir(tdir))) {
// tfsbasename(pf, bname);
// tfsBasename(pf, bname);
// if (strcmp(bname, "data") == 0) {
// // Skip the data/ directory
@ -1016,7 +1017,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
// if (strcmp(bname, tsdbTxnFname[TSDB_TXN_TEMP_FILE]) == 0) {
// // Skip current.t file
// tsdbInfo("vgId:%d file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
// tsdbInfo("vgId:%d file %s exists, remove it", REPO_ID(pRepo), pf->aname);
// (void)tfsremove(pf);
// continue;
// }
@ -1026,7 +1027,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
// // Match
// if (pfs->cstatus->pmf != NULL) {
// tsdbError("vgId:%d failed to restore meta since two file exists, file1 %s and file2 %s", REPO_ID(pRepo),
// TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), TFILE_NAME(pf));
// TSDB_FILE_FULL_NAME(pfs->cstatus->pmf), pf->aname);
// terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
// tfsClosedir(tdir);
// regfree(&regex);
@ -1081,7 +1082,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
// }
// } else if (code == REG_NOMATCH) {
// // Not match
// tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
// tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), pf->aname);
// tfsremove(pf);
// continue;
// } else {
@ -1108,8 +1109,8 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) {
static int tsdbRestoreDFileSet(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
TDIR * tdir = NULL;
const TFILE *pf = NULL;
STfsDir * tdir = NULL;
const STfsFile *pf = NULL;
const char * pattern = "^v[0-9]+f[0-9]+\\.(head|data|last)(-ver[0-9]+)?$";
SArray * fArray = NULL;
regex_t regex;
@ -1120,7 +1121,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
// Resource allocation and init
regcomp(&regex, pattern, REG_EXTENDED);
fArray = taosArrayInit(1024, sizeof(TFILE));
fArray = taosArrayInit(1024, sizeof(STfsFile));
if (fArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
@ -1129,7 +1130,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
return -1;
}
tdir = tfsOpendir(dataDir);
tdir = tfsOpendir(pRepo->pTfs, dataDir);
if (tdir == NULL) {
tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir,
tstrerror(terrno));
@ -1139,7 +1140,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
}
while ((pf = tfsReaddir(tdir))) {
tfsbasename(pf, bname);
tfsBasename(pf, bname);
int code = regexec(&regex, bname, 0, NULL, 0);
if (code == 0) {
@ -1152,8 +1153,8 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
}
} else if (code == REG_NOMATCH) {
// Not match
tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), TFILE_NAME(pf));
(void)tfsremove(pf);
tsdbInfo("vgId:%d invalid file %s exists, remove it", REPO_ID(pRepo), pf->aname);
(void)tfsRemoveFile(pf);
continue;
} else {
// Has other error
@ -1200,7 +1201,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) {
uint32_t tversion;
char _bname[TSDB_FILENAME_LEN];
tfsbasename(pf, _bname);
tfsBasename(pf, _bname);
tsdbParseDFilename(_bname, &tvid, &tfid, &ttype, &tversion);
ASSERT(tvid == REPO_ID(pRepo));
@ -1287,7 +1288,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) {
return -1;
}
if (tsdbSaveFSStatus(pRepo->fs->cstatus, REPO_ID(pRepo)) < 0) {
if (tsdbSaveFSStatus(pRepo, pRepo->fs->cstatus) < 0) {
tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
@ -1296,8 +1297,8 @@ static int tsdbRestoreCurrent(STsdb *pRepo) {
}
static int tsdbComparTFILE(const void *arg1, const void *arg2) {
TFILE *pf1 = (TFILE *)arg1;
TFILE *pf2 = (TFILE *)arg2;
STfsFile *pf1 = (STfsFile *)arg1;
STfsFile *pf2 = (STfsFile *)arg2;
int vid1, fid1, vid2, fid2;
TSDB_FILE_T ftype1, ftype2;
@ -1305,8 +1306,8 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) {
char bname1[TSDB_FILENAME_LEN];
char bname2[TSDB_FILENAME_LEN];
tfsbasename(pf1, bname1);
tfsbasename(pf2, bname2);
tfsBasename(pf1, bname1);
tfsBasename(pf2, bname2);
tsdbParseDFilename(bname1, &vid1, &fid1, &ftype1, &version1);
tsdbParseDFilename(bname2, &vid2, &fid2, &ftype2, &version2);

View File

@ -295,7 +295,7 @@ static int tsdbRollBackMFile(SMFile *pMFile) {
#endif
// ============== Operations on SDFile
void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver, TSDB_FILE_T ftype) {
void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype) {
char fname[TSDB_FILENAME_LEN];
TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_OK);
@ -305,8 +305,8 @@ void tsdbInitDFile(SDFile *pDFile, SDiskID did, int vid, int fid, uint32_t ver,
memset(&(pDFile->info), 0, sizeof(pDFile->info));
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
tsdbGetFilename(vid, fid, ver, ftype, fname);
tfsInitFile(&(pDFile->f), did.level, did.id, fname);
tsdbGetFilename(pRepo->vgId, fid, ver, ftype, fname);
tfsInitFile(pRepo->pTfs, &(pDFile->f), did, fname);
}
void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile) {
@ -323,9 +323,9 @@ int tsdbEncodeSDFile(void **buf, SDFile *pDFile) {
return tlen;
}
void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) {
void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile) {
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
buf = tfsDecodeFile(buf, &(pDFile->f));
buf = tfsDecodeFile(pRepo->pTfs, buf, &(pDFile->f));
TSDB_FILE_SET_CLOSED(pDFile);
return buf;
@ -352,15 +352,15 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
return buf;
}
int tsdbCreateDFile(SDFile *pDFile, bool updateHeader) {
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader) {
ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC);
pDFile->fd = open(TSDB_FILE_FULL_NAME(pDFile), O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0755);
if (pDFile->fd < 0) {
if (errno == ENOENT) {
// Try to create directory recursively
char *s = strdup(TFILE_REL_NAME(&(pDFile->f)));
if (tfsMkdirRecurAt(dirname(s), TSDB_FILE_LEVEL(pDFile), TSDB_FILE_ID(pDFile)) < 0) {
char *s = strdup(TSDB_FILE_REL_NAME(pDFile));
if (tfsMkdirRecurAt(pRepo->pTfs, dirname(s), TSDB_FILE_DID(pDFile)) < 0) {
tfree(s);
return -1;
}
@ -559,13 +559,13 @@ static int tsdbRollBackDFile(SDFile *pDFile) {
}
// ============== Operations on SDFileSet
void tsdbInitDFileSet(SDFileSet *pSet, SDiskID did, int vid, int fid, uint32_t ver) {
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver) {
pSet->fid = fid;
pSet->state = 0;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
tsdbInitDFile(pDFile, did, vid, fid, ver, ftype);
tsdbInitDFile(pRepo, pDFile, did, fid, ver, ftype);
}
}
@ -587,14 +587,14 @@ int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
return tlen;
}
void *tsdbDecodeDFileSet(void *buf, SDFileSet *pSet) {
void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) {
int32_t fid;
buf = taosDecodeFixedI32(buf, &(fid));
pSet->state = 0;
pSet->fid = fid;
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
buf = tsdbDecodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype));
buf = tsdbDecodeSDFile(pRepo, buf, TSDB_DFILE_IN_SET(pSet, ftype));
}
return buf;
}
@ -633,9 +633,9 @@ int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
return 0;
}
int tsdbCreateDFileSet(SDFileSet *pSet, bool updateHeader) {
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbCreateDFile(TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) {
if (tsdbCreateDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype), updateHeader) < 0) {
tsdbCloseDFileSet(pSet);
tsdbRemoveDFileSet(pSet);
return -1;

View File

@ -16,12 +16,13 @@
#include "tsdbDef.h"
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
SMeta *pMeta);
SMeta *pMeta, STfs *pTfs);
static void tsdbFree(STsdb *pTsdb);
static int tsdbOpenImpl(STsdb *pTsdb);
static void tsdbCloseImpl(STsdb *pTsdb);
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta) {
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta,
STfs *pTfs) {
STsdb *pTsdb = NULL;
// Set default TSDB Options
@ -36,7 +37,7 @@ STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAl
}
// Create the handle
pTsdb = tsdbNew(path, vgId, pTsdbCfg, pMAF, pMeta);
pTsdb = tsdbNew(path, vgId, pTsdbCfg, pMAF, pMeta, pTfs);
if (pTsdb == NULL) {
// TODO: handle error
return NULL;
@ -64,7 +65,7 @@ void tsdbRemove(const char *path) { taosRemoveDir(path); }
/* ------------------------ STATIC METHODS ------------------------ */
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
SMeta *pMeta) {
SMeta *pMeta, STfs *pTfs) {
STsdb *pTsdb = NULL;
pTsdb = (STsdb *)calloc(1, sizeof(STsdb));
@ -78,6 +79,7 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg);
pTsdb->pmaf = pMAF;
pTsdb->pMeta = pMeta;
pTsdb->pTfs = pTfs;
pTsdb->fs = tsdbNewFS(pTsdbCfg);
@ -494,7 +496,7 @@ uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t
}
} else { // get the named file at the specified index. If not there, return 0
fname = malloc(256);
sprintf(fname, "%s/vnode/vnode%d/%s", TFS_PRIMARY_PATH(), REPO_ID(pRepo), name);
sprintf(fname, "%s/vnode/vnode%d/%s", tfsGetPrimaryPath(pRepo->pTfs), REPO_ID(pRepo), name);
if (access(fname, F_OK) != 0) {
tfree(fname);
return 0;

View File

@ -28,6 +28,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
if (pVnodeCfg != NULL) {
cfg.vgId = pVnodeCfg->vgId;
cfg.pDnode = pVnodeCfg->pDnode;
cfg.pTfs = pVnodeCfg->pTfs;
}
// Validate options
@ -75,6 +76,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
pVnode->vgId = pVnodeCfg->vgId;
pVnode->pDnode = pVnodeCfg->pDnode;
pVnode->pTfs = pVnodeCfg->pTfs;
pVnode->path = strdup(path);
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
@ -109,7 +111,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
// Open tsdb
sprintf(dir, "%s/tsdb", pVnode->path);
pVnode->pTsdb = tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta);
pVnode->pTsdb = tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs);
if (pVnode->pTsdb == NULL) {
// TODO: handle error
return -1;

View File

@ -33,47 +33,69 @@ extern int32_t fsDebugFlag;
#define fError(...) { if (fsDebugFlag & DEBUG_ERROR) { taosPrintLog("TFS ERROR ", 255, __VA_ARGS__); }}
#define fWarn(...) { if (fsDebugFlag & DEBUG_WARN) { taosPrintLog("TFS WARN ", 255, __VA_ARGS__); }}
#define fInfo(...) { if (fsDebugFlag & DEBUG_INFO) { taosPrintLog("TFS ", 255, __VA_ARGS__); }}
#define fDebug(...) { if (fsDebugFlag & DEBUG_DEBUG) { taosPrintLog("TFS ", cqDebugFlag, __VA_ARGS__); }}
#define fTrace(...) { if (fsDebugFlag & DEBUG_TRACE) { taosPrintLog("TFS ", cqDebugFlag, __VA_ARGS__); }}
#define fDebug(...) { if (fsDebugFlag & DEBUG_DEBUG) { taosPrintLog("TFS ", fsDebugFlag, __VA_ARGS__); }}
#define fTrace(...) { if (fsDebugFlag & DEBUG_TRACE) { taosPrintLog("TFS ", fsDebugFlag, __VA_ARGS__); }}
// Global Definitions
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
typedef struct SDisk {
typedef struct {
int32_t level;
int32_t id;
char *path;
SDiskSize size;
} SDisk;
} STfsDisk;
typedef struct STier {
typedef struct {
pthread_spinlock_t lock;
int32_t level;
int16_t nextid; // next disk id to allocate
int16_t ndisk; // # of disks mounted to this tier
int16_t nAvailDisks; // # of Available disks
SDisk *disks[TSDB_MAX_DISKS_PER_TIER];
int32_t nextid; // next disk id to allocate
int32_t ndisk; // # of disks mounted to this tier
int32_t nAvailDisks; // # of Available disks
STfsDisk *disks[TFS_MAX_DISKS_PER_TIER];
SDiskSize size;
} STier;
} STfsTier;
#define TIER_LEVEL(pt) ((pt)->level)
#define TIER_NDISKS(pt) ((pt)->ndisk)
#define TIER_SIZE(pt) ((pt)->tmeta.size)
#define TIER_FREE_SIZE(pt) ((pt)->tmeta.free)
typedef struct {
STfsDisk *pDisk;
} SDiskIter;
#define DISK_AT_TIER(pt, id) ((pt)->disks[id])
#define DISK_DIR(pd) ((pd)->path)
typedef struct STfsDir {
SDiskIter iter;
SDiskID did;
char dirname[TSDB_FILENAME_LEN];
STfsFile tfile;
DIR *dir;
STfs *pTfs;
} STfsDir;
SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir);
SDisk *tfsFreeDisk(SDisk *pDisk);
int32_t tfsUpdateDiskSize(SDisk *pDisk);
typedef struct STfs {
pthread_spinlock_t lock;
SDiskSize size;
int32_t nlevel;
STfsTier tiers[TFS_MAX_TIERS];
SHashObj *hash; // name to did map
} STfs;
int32_t tfsInitTier(STier *pTier, int32_t level);
void tfsDestroyTier(STier *pTier);
SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg);
void tfsUpdateTierSize(STier *pTier);
int32_t tfsAllocDiskOnTier(STier *pTier);
void tfsPosNextId(STier *pTier);
STfsDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir);
STfsDisk *tfsFreeDisk(STfsDisk *pDisk);
int32_t tfsUpdateDiskSize(STfsDisk *pDisk);
int32_t tfsInitTier(STfsTier *pTier, int32_t level);
void tfsDestroyTier(STfsTier *pTier);
STfsDisk *tfsMountDiskToTier(STfsTier *pTier, SDiskCfg *pCfg);
void tfsUpdateTierSize(STfsTier *pTier);
int32_t tfsAllocDiskOnTier(STfsTier *pTier);
void tfsPosNextId(STfsTier *pTier);
#define tfsLockTier(pTier) pthread_spin_lock(&(pTier)->lock)
#define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock)
#define tfsLock(pTfs) pthread_spin_lock(&(pTfs)->lock)
#define tfsUnLock(pTfs) pthread_spin_unlock(&(pTfs)->lock)
#define TFS_TIER_AT(pTfs, level) (&(pTfs)->tiers[level])
#define TFS_DISK_AT(pTfs, did) ((pTfs)->tiers[(did).level].disks[(did).id])
#define TFS_PRIMARY_DISK(pTfs) ((pTfs)->tiers[0].disks[0])
#define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32)
#ifdef __cplusplus
}

View File

@ -16,227 +16,200 @@
#define _DEFAULT_SOURCE
#include "tfsInt.h"
#define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32)
static int32_t tfsMount(STfs *pTfs, SDiskCfg *pCfg);
static int32_t tfsCheck(STfs *pTfs);
static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg);
static int32_t tfsFormatDir(char *idir, char *odir);
static STfsDisk *tfsGetDiskByName(STfs *pTfs, const char *dir);
static int32_t tfsOpendirImpl(STfs *pTfs, STfsDir *pDir);
static STfsDisk *tfsNextDisk(STfs *pTfs, SDiskIter *pIter);
typedef struct {
pthread_spinlock_t lock;
SFSMeta meta;
int32_t nlevel;
STier tiers[TSDB_MAX_TIERS];
SHashObj *map; // name to did map
} SFS;
typedef struct {
SDisk *pDisk;
} SDiskIter;
#define TFS_META() (pfs->meta)
#define TFS_NLEVEL() (pfs->nlevel)
#define TFS_TIERS() (pfs->tiers)
#define TFS_TIER_AT(level) (TFS_TIERS() + (level))
#define TFS_DISK_AT(level, id) DISK_AT_TIER(TFS_TIER_AT(level), id)
#define TFS_PRIMARY_DISK() TFS_DISK_AT(TFS_PRIMARY_LEVEL, TFS_PRIMARY_ID)
#define TFS_IS_VALID_LEVEL(level) (((level) >= 0) && ((level) < TFS_NLEVEL()))
#define TFS_IS_VALID_ID(level, id) (((id) >= 0) && ((id) < TIER_NDISKS(TFS_TIER_AT(level))))
#define TFS_IS_VALID_DISK(level, id) (TFS_IS_VALID_LEVEL(level) && TFS_IS_VALID_ID(level, id))
#define tfsLock() pthread_spin_lock(&(pfs->lock))
#define tfsUnLock() pthread_spin_unlock(&(pfs->lock))
static SFS tfs = {0};
static SFS *pfs = &tfs;
// STATIC DECLARATION
static int32_t tfsMount(SDiskCfg *pCfg);
static int32_t tfsCheck();
static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg);
static int32_t tfsFormatDir(char *idir, char *odir);
static SDisk *tfsGetDiskByID(SDiskID did);
static SDisk *tfsGetDiskByName(const char *dir);
static int32_t tfsOpendirImpl(TDIR *tdir);
static void tfsInitDiskIter(SDiskIter *pIter);
static SDisk *tfsNextDisk(SDiskIter *pIter);
// FS APIs ====================================
int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk) {
if (ndisk < 0) {
STfs *tfsOpen(SDiskCfg *pCfg, int32_t ndisk) {
if (ndisk < 0 || ndisk > TFS_MAX_DISKS) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
return NULL;
}
for (int32_t level = 0; level < TSDB_MAX_TIERS; level++) {
if (tfsInitTier(TFS_TIER_AT(level), level) < 0) {
while (true) {
level--;
if (level < 0) break;
STfs *pTfs = calloc(1, sizeof(STfs));
if (pTfs == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tfsDestroyTier(TFS_TIER_AT(level));
}
if (pthread_spin_init(&pTfs->lock, 0) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
tfsClose(pTfs);
return NULL;
}
return -1;
for (int32_t level = 0; level < TFS_MAX_TIERS; level++) {
STfsTier *pTier = &pTfs->tiers[level];
if (tfsInitTier(pTier, level) < 0) {
tfsClose(pTfs);
return NULL;
}
}
pthread_spin_init(&(pfs->lock), 0);
pfs->map = taosHashInit(TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER * 2,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (pfs->map == NULL) {
pTfs->hash = taosHashInit(TFS_MAX_DISKS * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (pTfs->hash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tfsCleanup();
return -1;
tfsClose(pTfs);
return NULL;
}
for (int32_t idisk = 0; idisk < ndisk; idisk++) {
if (tfsMount(pDiskCfg + idisk) < 0) {
tfsCleanup();
return -1;
if (tfsMount(pTfs, &pCfg[idisk]) < 0) {
tfsClose(pTfs);
return NULL;
}
}
if (tfsCheck() < 0) {
tfsCleanup();
return -1;
if (tfsCheck(pTfs) < 0) {
tfsClose(pTfs);
return NULL;
}
tfsUpdateSize(NULL);
for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
tfsPosNextId(TFS_TIER_AT(level));
tfsUpdateSize(pTfs);
for (int32_t level = 0; level < pTfs->nlevel; level++) {
tfsPosNextId(&pTfs->tiers[level]);
}
return 0;
return pTfs;
}
void tfsCleanup() {
taosHashCleanup(pfs->map);
pfs->map = NULL;
void tfsClose(STfs *pTfs) {
if (pTfs == NULL) return;
pthread_spin_destroy(&(pfs->lock));
for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
tfsDestroyTier(TFS_TIER_AT(level));
for (int32_t level = 0; level < TFS_MAX_LEVEL; level++) {
tfsDestroyTier(&pTfs->tiers[level]);
}
taosHashCleanup(pTfs->hash);
pthread_spin_destroy(&pTfs->lock);
free(pTfs);
}
void tfsUpdateSize(SFSMeta *pFSMeta) {
SFSMeta fsMeta = {0};
void tfsUpdateSize(STfs *pTfs) {
SDiskSize size = {0};
if (pFSMeta == NULL) {
pFSMeta = &fsMeta;
}
memset(pFSMeta, 0, sizeof(SFSMeta));
for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
STier *pTier = TFS_TIER_AT(level);
for (int32_t level = 0; level < pTfs->nlevel; level++) {
STfsTier *pTier = &pTfs->tiers[level];
tfsUpdateTierSize(pTier);
pFSMeta->total += pTier->size.total;
pFSMeta->avail += pTier->size.avail;
pFSMeta->used += pTier->size.used;
size.total += pTier->size.total;
size.avail += pTier->size.avail;
size.used += pTier->size.used;
}
tfsLock();
pfs->meta = *pFSMeta;
tfsUnLock();
tfsLock(pTfs);
pTfs->size = size;
tfsUnLock(pTfs);
}
/* Allocate an existing available tier level
*/
void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id) {
ASSERT(expLevel >= 0);
SDiskSize tfsGetSize(STfs *pTfs) {
tfsLock(pTfs);
SDiskSize size = pTfs->size;
tfsUnLock(pTfs);
*level = expLevel;
*id = TFS_UNDECIDED_ID;
return size;
}
if (*level >= TFS_NLEVEL()) {
*level = TFS_NLEVEL() - 1;
int32_t tfsAllocDisk(STfs *pTfs, int32_t expLevel, SDiskID *pDiskId) {
pDiskId->level = expLevel;
pDiskId->id = -1;
if (pDiskId->level >= pTfs->nlevel) {
pDiskId->level--;
}
while (*level >= 0) {
*id = tfsAllocDiskOnTier(TFS_TIER_AT(*level));
if (*id == TFS_UNDECIDED_ID) {
(*level)--;
while (pDiskId->level >= 0) {
pDiskId->id = tfsAllocDiskOnTier(&pTfs->tiers[pDiskId->level]);
if (pDiskId->id < 0) {
pDiskId->level--;
continue;
}
return;
return 0;
}
*level = TFS_UNDECIDED_LEVEL;
*id = TFS_UNDECIDED_ID;
terrno = TSDB_CODE_FS_NO_VALID_DISK;
return -1;
}
const char *TFS_PRIMARY_PATH() { return DISK_DIR(TFS_PRIMARY_DISK()); }
const char *TFS_DISK_PATH(int32_t level, int32_t id) { return DISK_DIR(TFS_DISK_AT(level, id)); }
const char *tfsGetPrimaryPath(STfs *pTfs) { return TFS_PRIMARY_DISK(pTfs)->path; }
// TFILE APIs ====================================
void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname) {
ASSERT(TFS_IS_VALID_DISK(level, id));
const char *tfsGetDiskPath(STfs *pTfs, SDiskID diskId) { return TFS_DISK_AT(pTfs, diskId)->path; }
SDisk *pDisk = TFS_DISK_AT(level, id);
void tfsInitFile(STfs *pTfs, STfsFile *pFile, SDiskID diskId, const char *rname) {
STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId);
pf->level = level;
pf->id = id;
tstrncpy(pf->rname, bname, TSDB_FILENAME_LEN);
pFile->did = diskId;
tstrncpy(pFile->rname, rname, TSDB_FILENAME_LEN);
char tmpName[TMPNAME_LEN] = {0};
snprintf(tmpName, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), bname);
tstrncpy(pf->aname, tmpName, TSDB_FILENAME_LEN);
snprintf(tmpName, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
tstrncpy(pFile->aname, tmpName, TSDB_FILENAME_LEN);
pFile->pTfs = pTfs;
}
bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2) {
ASSERT(pf1 != NULL || pf2 != NULL);
if (pf1 == NULL || pf2 == NULL) return false;
if (pf1->level != pf2->level) return false;
if (pf1->id != pf2->id) return false;
if (strncmp(pf1->rname, pf2->rname, TSDB_FILENAME_LEN) != 0) return false;
bool tfsIsSameFile(const STfsFile *pFile1, const STfsFile *pFile2) {
if (pFile1 == NULL || pFile2 == NULL || pFile1->pTfs != pFile2->pTfs) return false;
if (pFile1->did.level != pFile2->did.level) return false;
if (pFile1->did.id != pFile2->did.id) return false;
if (strncmp(pFile1->rname, pFile2->rname, TSDB_FILENAME_LEN) != 0) return false;
return true;
}
int32_t tfsEncodeFile(void **buf, TFILE *pf) {
int32_t tfsEncodeFile(void **buf, STfsFile *pFile) {
int32_t tlen = 0;
tlen += taosEncodeVariantI32(buf, pf->level);
tlen += taosEncodeVariantI32(buf, pf->id);
tlen += taosEncodeString(buf, pf->rname);
tlen += taosEncodeVariantI32(buf, pFile->did.level);
tlen += taosEncodeVariantI32(buf, pFile->did.id);
tlen += taosEncodeString(buf, pFile->rname);
return tlen;
}
void *tfsDecodeFile(void *buf, TFILE *pf) {
int32_t level, id;
char *rname;
void *tfsDecodeFile(STfs *pTfs, void *buf, STfsFile *pFile) {
SDiskID diskId = {0};
char *rname = NULL;
buf = taosDecodeVariantI32(buf, &(level));
buf = taosDecodeVariantI32(buf, &(id));
buf = taosDecodeVariantI32(buf, &diskId.level);
buf = taosDecodeVariantI32(buf, &diskId.id);
buf = taosDecodeString(buf, &rname);
tfsInitFile(pf, level, id, rname);
tfsInitFile(pTfs, pFile, diskId, rname);
tfree(rname);
return buf;
}
void tfsbasename(const TFILE *pf, char *dest) {
void tfsBasename(const STfsFile *pFile, char *dest) {
char tname[TSDB_FILENAME_LEN] = "\0";
tstrncpy(tname, pf->aname, TSDB_FILENAME_LEN);
tstrncpy(tname, pFile->aname, TSDB_FILENAME_LEN);
tstrncpy(dest, basename(tname), TSDB_FILENAME_LEN);
}
void tfsdirname(const TFILE *pf, char *dest) {
void tfsDirname(const STfsFile *pFile, char *dest) {
char tname[TSDB_FILENAME_LEN] = "\0";
tstrncpy(tname, pf->aname, TSDB_FILENAME_LEN);
tstrncpy(tname, pFile->aname, TSDB_FILENAME_LEN);
tstrncpy(dest, dirname(tname), TSDB_FILENAME_LEN);
}
// DIR APIs ====================================
int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id) {
SDisk *pDisk = TFS_DISK_AT(level, id);
char aname[TMPNAME_LEN];
int32_t tfsRemoveFile(const STfsFile *pFile) {
return remove(pFile->aname);
}
snprintf(aname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), rname);
int32_t tfsCopyFile(const STfsFile *pFile1, const STfsFile *pFile2) {
return taosCopyFile(pFile1->aname, pFile2->aname);
}
int32_t tfsMkdirAt(STfs *pTfs, const char *rname, SDiskID diskId) {
STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId);
char aname[TMPNAME_LEN];
snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
if (taosMkDir(aname) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@ -245,8 +218,8 @@ int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id) {
return 0;
}
int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id) {
if (tfsMkdirAt(rname, level, id) < 0) {
int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId) {
if (tfsMkdirAt(pTfs, rname, diskId) < 0) {
if (errno == ENOENT) {
// Try to create upper
char *s = strdup(rname);
@ -259,7 +232,7 @@ int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id) {
// https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man3/dirname.3.html
char *dir = strdup(dirname(s));
if (tfsMkdirRecurAt(dir, level, id) < 0) {
if (tfsMkdirRecurAt(pTfs, dir, diskId) < 0) {
free(s);
free(dir);
return -1;
@ -267,7 +240,7 @@ int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id) {
free(s);
free(dir);
if (tfsMkdirAt(rname, level, id) < 0) {
if (tfsMkdirAt(pTfs, rname, diskId) < 0) {
return -1;
}
} else {
@ -278,11 +251,12 @@ int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id) {
return 0;
}
int32_t tfsMkdir(const char *rname) {
for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
STier *pTier = TFS_TIER_AT(level);
for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) {
if (tfsMkdirAt(rname, level, id) < 0) {
int32_t tfsMkdir(STfs *pTfs, const char *rname) {
for (int32_t level = 0; level < pTfs->nlevel; level++) {
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
for (int32_t id = 0; id < pTier->ndisk; id++) {
SDiskID did = {.id = id, .level = level};
if (tfsMkdirAt(pTfs, rname, did) < 0) {
return -1;
}
}
@ -291,16 +265,14 @@ int32_t tfsMkdir(const char *rname) {
return 0;
}
int32_t tfsRmdir(const char *rname) {
int32_t tfsRmdir(STfs *pTfs, const char *rname) {
char aname[TMPNAME_LEN] = "\0";
for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
STier *pTier = TFS_TIER_AT(level);
for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) {
SDisk *pDisk = pTier->disks[id];
snprintf(aname, TMPNAME_LEN, "%s%s%s", DISK_DIR(pDisk), TS_PATH_DELIMITER, rname);
for (int32_t level = 0; level < pTfs->nlevel; level++) {
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
for (int32_t id = 0; id < pTier->ndisk; id++) {
STfsDisk *pDisk = pTier->disks[id];
snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
taosRemoveDir(aname);
}
}
@ -308,117 +280,108 @@ int32_t tfsRmdir(const char *rname) {
return 0;
}
#if 0
int32_t tfsRename(char *orname, char *nrname) {
int32_t tfsRename(STfs *pTfs, char *orname, char *nrname) {
char oaname[TMPNAME_LEN] = "\0";
char naname[TMPNAME_LEN] = "\0";
for (int32_t level = 0; level < pfs->nlevel; level++) {
STier *pTier = TFS_TIER_AT(level);
for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) {
SDisk *pDisk = DISK_AT_TIER(pTier, id);
snprintf(oaname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), orname);
snprintf(naname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), nrname);
taosRenameFile(oaname, naname);
for (int32_t level = 0; level < pTfs->nlevel; level++) {
STfsTier *pTier = TFS_TIER_AT(pTfs, level);
for (int32_t id = 0; id < pTier->ndisk; id++) {
STfsDisk *pDisk = pTier->disks[id];
snprintf(oaname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, orname);
snprintf(naname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, nrname);
if (taosRenameFile(oaname, naname) != 0) {
return -1;
}
}
}
return 0;
}
#endif
struct TDIR {
SDiskIter iter;
int32_t level;
int32_t id;
char dirname[TSDB_FILENAME_LEN];
TFILE tfile;
DIR *dir;
};
TDIR *tfsOpendir(const char *rname) {
TDIR *tdir = (TDIR *)calloc(1, sizeof(*tdir));
if (tdir == NULL) {
STfsDir *tfsOpendir(STfs *pTfs, const char *rname) {
STfsDir *pDir = calloc(1, sizeof(STfsDir));
if (pDir == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tfsInitDiskIter(&(tdir->iter));
tstrncpy(tdir->dirname, rname, TSDB_FILENAME_LEN);
SDiskID diskId = {.id = 0, .level = 0};
pDir->iter.pDisk = TFS_DISK_AT(pTfs, diskId);
pDir->pTfs = pTfs;
tstrncpy(pDir->dirname, rname, TSDB_FILENAME_LEN);
if (tfsOpendirImpl(tdir) < 0) {
free(tdir);
if (tfsOpendirImpl(pTfs, pDir) < 0) {
free(pDir);
return NULL;
}
return tdir;
return pDir;
}
const TFILE *tfsReaddir(TDIR *tdir) {
if (tdir == NULL || tdir->dir == NULL) return NULL;
const STfsFile *tfsReaddir(STfsDir *pDir) {
if (pDir == NULL || pDir->dir == NULL) return NULL;
char bname[TMPNAME_LEN * 2] = "\0";
while (true) {
struct dirent *dp = NULL;
dp = readdir(tdir->dir);
dp = readdir(pDir->dir);
if (dp != NULL) {
// Skip . and ..
if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0) continue;
snprintf(bname, TMPNAME_LEN * 2, "%s/%s", tdir->dirname, dp->d_name);
tfsInitFile(&(tdir->tfile), tdir->level, tdir->id, bname);
return &(tdir->tfile);
snprintf(bname, TMPNAME_LEN * 2, "%s%s%s", pDir->dirname, TD_DIRSEP, dp->d_name);
tfsInitFile(pDir->pTfs, &pDir->tfile, pDir->did, bname);
return &pDir->tfile;
}
if (tfsOpendirImpl(tdir) < 0) {
if (tfsOpendirImpl(pDir->pTfs, pDir) < 0) {
return NULL;
}
if (tdir->dir == NULL) {
if (pDir->dir == NULL) {
terrno = TSDB_CODE_SUCCESS;
return NULL;
}
}
}
void tfsClosedir(TDIR *tdir) {
if (tdir) {
if (tdir->dir != NULL) {
closedir(tdir->dir);
tdir->dir = NULL;
void tfsClosedir(STfsDir *pDir) {
if (pDir) {
if (pDir->dir != NULL) {
closedir(pDir->dir);
pDir->dir = NULL;
}
free(tdir);
free(pDir);
}
}
// private
static int32_t tfsMount(SDiskCfg *pCfg) {
SDiskID did;
SDisk *pDisk = NULL;
static int32_t tfsMount(STfs *pTfs, SDiskCfg *pCfg) {
if (tfsCheckAndFormatCfg(pTfs, pCfg) < 0) {
return -1;
}
if (tfsCheckAndFormatCfg(pCfg) < 0) return -1;
did.level = pCfg->level;
pDisk = tfsMountDiskToTier(TFS_TIER_AT(did.level), pCfg);
SDiskID did = {.level = pCfg->level};
STfsDisk *pDisk = tfsMountDiskToTier(TFS_TIER_AT(pTfs, did.level), pCfg);
if (pDisk == NULL) {
fError("failed to mount disk %s to level %d since %s", pCfg->dir, pCfg->level, tstrerror(terrno));
fError("failed to mount disk %s to level %d since %s", pCfg->dir, pCfg->level, terrstr());
return -1;
}
did.id = pDisk->id;
taosHashPut(pfs->map, (void *)(pCfg->dir), strnlen(pCfg->dir, TSDB_FILENAME_LEN), (void *)(&did), sizeof(did));
if (pfs->nlevel < pCfg->level + 1) pfs->nlevel = pCfg->level + 1;
taosHashPut(pTfs->hash, (void *)(pCfg->dir), strnlen(pCfg->dir, TSDB_FILENAME_LEN), (void *)(&did), sizeof(did));
if (pTfs->nlevel < pCfg->level + 1) {
pTfs->nlevel = pCfg->level + 1;
}
return 0;
}
static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg) {
static int32_t tfsCheckAndFormatCfg(STfs *pTfs, SDiskCfg *pCfg) {
char dirName[TSDB_FILENAME_LEN] = "\0";
struct stat pstat;
if (pCfg->level < 0 || pCfg->level >= TSDB_MAX_TIERS) {
if (pCfg->level < 0 || pCfg->level >= TFS_MAX_TIERS) {
fError("failed to mount %s to FS since invalid level %d", pCfg->dir, pCfg->level);
terrno = TSDB_CODE_FS_INVLD_CFG;
return -1;
@ -431,7 +394,7 @@ static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg) {
return -1;
}
if (TFS_PRIMARY_DISK() != NULL) {
if (TFS_PRIMARY_DISK(pTfs) != NULL) {
fError("failed to mount %s to FS since duplicate primary mount", pCfg->dir);
terrno = TSDB_CODE_FS_DUP_PRIMARY;
return -1;
@ -444,7 +407,7 @@ static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg) {
return -1;
}
if (tfsGetDiskByName(dirName) != NULL) {
if (tfsGetDiskByName(pTfs, dirName) != NULL) {
fError("failed to mount %s to FS since duplicate mount", pCfg->dir);
terrno = TSDB_CODE_FS_INVLD_CFG;
return -1;
@ -494,15 +457,15 @@ static int32_t tfsFormatDir(char *idir, char *odir) {
return 0;
}
static int32_t tfsCheck() {
if (TFS_PRIMARY_DISK() == NULL) {
static int32_t tfsCheck(STfs *pTfs) {
if (TFS_PRIMARY_DISK(pTfs) == NULL) {
fError("no primary disk is set");
terrno = TSDB_CODE_FS_NO_PRIMARY_DISK;
return -1;
}
for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
if (TIER_NDISKS(TFS_TIER_AT(level)) == 0) {
for (int32_t level = 0; level < pTfs->nlevel; level++) {
if (TFS_TIER_AT(pTfs, level)->ndisk == 0) {
fError("no disk at level %d", level);
terrno = TSDB_CODE_FS_NO_MOUNT_AT_TIER;
return -1;
@ -512,66 +475,55 @@ static int32_t tfsCheck() {
return 0;
}
static SDisk *tfsGetDiskByID(SDiskID did) { return TFS_DISK_AT(did.level, did.id); }
static SDisk *tfsGetDiskByName(const char *dir) {
SDiskID did;
SDisk *pDisk = NULL;
void *pr = NULL;
pr = taosHashGet(pfs->map, (void *)dir, strnlen(dir, TSDB_FILENAME_LEN));
static STfsDisk *tfsGetDiskByName(STfs *pTfs, const char *dir) {
void *pr = taosHashGet(pTfs->hash, (void *)dir, strnlen(dir, TSDB_FILENAME_LEN));
if (pr == NULL) return NULL;
did = *(SDiskID *)pr;
pDisk = tfsGetDiskByID(did);
ASSERT(pDisk != NULL);
SDiskID did = *(SDiskID *)pr;
STfsDisk *pDisk = TFS_DISK_AT(pTfs, did);
return pDisk;
}
static int32_t tfsOpendirImpl(TDIR *tdir) {
SDisk *pDisk = NULL;
char adir[TMPNAME_LEN * 2] = "\0";
static int32_t tfsOpendirImpl(STfs *pTfs, STfsDir *pDir) {
STfsDisk *pDisk = NULL;
char adir[TMPNAME_LEN * 2] = "\0";
if (tdir->dir != NULL) {
closedir(tdir->dir);
tdir->dir = NULL;
if (pDir->dir != NULL) {
closedir(pDir->dir);
pDir->dir = NULL;
}
while (true) {
pDisk = tfsNextDisk(&(tdir->iter));
pDisk = tfsNextDisk(pTfs, &pDir->iter);
if (pDisk == NULL) return 0;
tdir->level = pDisk->level;
tdir->id = pDisk->id;
pDir->did.level = pDisk->level;
pDir->did.id = pDisk->id;
snprintf(adir, TMPNAME_LEN * 2, "%s%s%s", pDisk->path, TS_PATH_DELIMITER,tdir->dirname);
tdir->dir = opendir(adir);
if (tdir->dir != NULL) break;
snprintf(adir, TMPNAME_LEN * 2, "%s%s%s", pDisk->path, TD_DIRSEP, pDir->dirname);
pDir->dir = opendir(adir);
if (pDir->dir != NULL) break;
}
return 0;
}
static void tfsInitDiskIter(SDiskIter *pIter) { pIter->pDisk = TFS_DISK_AT(0, 0); }
static SDisk *tfsNextDisk(SDiskIter *pIter) {
SDisk *pDisk = pIter->pDisk;
static STfsDisk *tfsNextDisk(STfs *pTfs, SDiskIter *pIter) {
if (pIter == NULL) return NULL;
STfsDisk *pDisk = pIter->pDisk;
if (pDisk == NULL) return NULL;
int32_t level = pDisk->level;
int32_t id = pDisk->id;
SDiskID did = {.level = pDisk->level, .id = pDisk->id + 1};
id++;
if (id < TIER_NDISKS(TFS_TIER_AT(level))) {
pIter->pDisk = TFS_DISK_AT(level, id);
ASSERT(pIter->pDisk != NULL);
if (did.id < TFS_TIER_AT(pTfs, did.level)->ndisk) {
pIter->pDisk = TFS_DISK_AT(pTfs, did);
} else {
level++;
id = 0;
if (level < TFS_NLEVEL()) {
pIter->pDisk = TFS_DISK_AT(level, id);
ASSERT(pIter->pDisk != NULL);
did.level++;
did.id = 0;
if (did.level < pTfs->nlevel) {
pIter->pDisk = TFS_DISK_AT(pTfs, did);
} else {
pIter->pDisk = NULL;
}
@ -579,25 +531,3 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) {
return pDisk;
}
// OTHER FUNCTIONS ===================================
void taosGetDisk() {
const double unit = 1024 * 1024 * 1024;
SDiskSize diskSize;
SFSMeta fsMeta;
tfsUpdateSize(&fsMeta);
tsTotalDataDirGB = (float)(fsMeta.total / unit);
tsUsedDataDirGB = (float)(fsMeta.used / unit);
tsAvailDataDirGB = (float)(fsMeta.avail / unit);
if (taosGetDiskSize(tsLogDir, &diskSize) == 0) {
tsTotalLogDirGB = (float)(diskSize.total / unit);
tsAvailLogDirGB = (float)(diskSize.avail / unit);
}
if (taosGetDiskSize(tsTempDir, &diskSize) == 0) {
tsTotalTmpDirGB = (float)(diskSize.total / unit);
tsAvailTmpDirectorySpace = (float)(diskSize.avail / unit);
}
}

View File

@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE
#include "tfsInt.h"
SDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) {
SDisk *pDisk = calloc(1, sizeof(SDisk));
STfsDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) {
STfsDisk *pDisk = calloc(1, sizeof(STfsDisk));
if (pDisk == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
@ -36,7 +36,7 @@ SDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) {
return pDisk;
}
SDisk *tfsFreeDisk(SDisk *pDisk) {
STfsDisk *tfsFreeDisk(STfsDisk *pDisk) {
if (pDisk != NULL) {
free(pDisk->path);
free(pDisk);
@ -45,8 +45,8 @@ SDisk *tfsFreeDisk(SDisk *pDisk) {
return NULL;
}
int32_t tfsUpdateDiskSize(SDisk *pDisk) {
if (taosGetDiskSize(pDisk->path, &pDisk->size) != 0) {
int32_t tfsUpdateDiskSize(STfsDisk *pDisk) {
if (taosGetDiskSize(pDisk->path, &pDisk->size) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
fError("failed to get disk:%s size, level:%d id:%d since %s", pDisk->path, pDisk->level, pDisk->id, terrstr());
return -1;

View File

@ -16,11 +16,8 @@
#define _DEFAULT_SOURCE
#include "tfsInt.h"
#define tfsLockTier(pTier) pthread_spin_lock(&(pTier)->lock)
#define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock)
int32_t tfsInitTier(STier *pTier, int32_t level) {
memset(pTier, 0, sizeof(STier));
int32_t tfsInitTier(STfsTier *pTier, int32_t level) {
memset(pTier, 0, sizeof(STfsTier));
if (pthread_spin_init(&pTier->lock, 0) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
@ -31,17 +28,17 @@ int32_t tfsInitTier(STier *pTier, int32_t level) {
return 0;
}
void tfsDestroyTier(STier *pTier) {
for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) {
void tfsDestroyTier(STfsTier *pTier) {
for (int32_t id = 0; id < TFS_MAX_DISKS_PER_TIER; id++) {
pTier->disks[id] = tfsFreeDisk(pTier->disks[id]);
}
pTier->ndisk = 0;
pthread_spin_destroy(&(pTier->lock));
pthread_spin_destroy(&pTier->lock);
}
SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
if (pTier->ndisk >= TSDB_MAX_DISKS_PER_TIER) {
STfsDisk *tfsMountDiskToTier(STfsTier *pTier, SDiskCfg *pCfg) {
if (pTier->ndisk >= TFS_MAX_DISKS_PER_TIER) {
terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
return NULL;
}
@ -61,12 +58,12 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
id = pTier->ndisk;
}
if (id >= TSDB_MAX_DISKS_PER_TIER) {
if (id >= TFS_MAX_DISKS_PER_TIER) {
terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
return NULL;
}
SDisk *pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir);
STfsDisk *pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir);
if (pDisk == NULL) return NULL;
pTier->disks[id] = pDisk;
@ -76,15 +73,16 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
return pTier->disks[id];
}
void tfsUpdateTierSize(STier *pTier) {
void tfsUpdateTierSize(STfsTier *pTier) {
SDiskSize size = {0};
int16_t nAvailDisks = 0;
int32_t nAvailDisks = 0;
tfsLockTier(pTier);
for (int32_t id = 0; id < pTier->ndisk; id++) {
SDisk *pDisk = pTier->disks[id];
STfsDisk *pDisk = pTier->disks[id];
if (pDisk == NULL) continue;
if (tfsUpdateDiskSize(pDisk) < 0) continue;
size.total += pDisk->size.total;
size.used += pDisk->size.used;
@ -99,7 +97,7 @@ void tfsUpdateTierSize(STier *pTier) {
}
// Round-Robin to allocate disk on a tier
int32_t tfsAllocDiskOnTier(STier *pTier) {
int32_t tfsAllocDiskOnTier(STfsTier *pTier) {
terrno = TSDB_CODE_FS_NO_VALID_DISK;
tfsLockTier(pTier);
@ -110,9 +108,9 @@ int32_t tfsAllocDiskOnTier(STier *pTier) {
}
int32_t retId = -1;
for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; ++id) {
int32_t diskId = (pTier->nextid + id) % pTier->ndisk;
SDisk *pDisk = pTier->disks[diskId];
for (int32_t id = 0; id < TFS_MAX_DISKS_PER_TIER; ++id) {
int32_t diskId = (pTier->nextid + id) % pTier->ndisk;
STfsDisk *pDisk = pTier->disks[diskId];
if (pDisk == NULL) continue;
@ -128,12 +126,12 @@ int32_t tfsAllocDiskOnTier(STier *pTier) {
return retId;
}
void tfsPosNextId(STier *pTier) {
void tfsPosNextId(STfsTier *pTier) {
int32_t nextid = 0;
for (int32_t id = 1; id < pTier->ndisk; id++) {
SDisk *pLDisk = pTier->disks[nextid];
SDisk *pDisk = pTier->disks[id];
STfsDisk *pLDisk = pTier->disks[nextid];
STfsDisk *pDisk = pTier->disks[id];
if (pDisk->size.avail > TFS_MIN_DISK_FREE_SIZE && pDisk->size.avail > pLDisk->size.avail) {
nextid = id;
}

View File

@ -142,7 +142,7 @@ int64_t taosWriteFile(FileFd fd, const void *buf, int64_t n) {
int64_t taosLSeekFile(FileFd fd, int64_t offset, int32_t whence) { return (int64_t)lseek(fd, (long)offset, whence); }
int64_t taosCopyFile(char *from, char *to) {
int64_t taosCopyFile(const char *from, const char *to) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
@ -400,7 +400,7 @@ int32_t taosFsyncFile(FileFd fd) {
#endif
}
int32_t taosRenameFile(char *oldName, char *newName) {
int32_t taosRenameFile(const char *oldName, const char *newName) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
int32_t code = MoveFileEx(oldName, newName, MOVEFILE_REPLACE_EXISTING | MOVEFILE_COPY_ALLOWED);
if (code < 0) {

View File

@ -67,7 +67,31 @@ sql insert into c1 values(now+1s, 1)
sql insert into c1 values(now+2s, 2)
sql insert into c1 values(now+3s, 3)
return
print =============== query data
sql select * from c1
if $rows != 3 then
return -1
endi
print $data00 $data01
print $data10 $data11
print $data20 $data11
if $data01 != 1 then
return -1
endi
if $data11 != 2 then
return -1
endi
if $data21 != 3 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print =============== query data
sql select * from c1
if $rows != 3 then

View File

@ -1,105 +0,0 @@
############## config parameter #####################
$node1 = 192.168.0.201
$node2 = 192.168.0.202
$node3 = 192.168.0.203
$node4 = 192.168.0.204
$self = $node1
$num = 25
#deploy = 0, start = 1, stop = 2
$option = 0
print =============== option:$option
############### stop dnodes #####################
if $option == 0 then
system sh/stop_dnodes.sh
endi
############### process firstEp #####################
$firstEp = $node1 . :7100
$firstPort = 7100
if $self == $node1 then
if $option == 1 then
system sh/exec.sh -n dnode1 -s start
endi
if $option == 2 then
system sh/exec.sh -n dnode1 -s stop -x SIGINT
endi
if $option == 0 then
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c firstEp -v $firstEp
system sh/cfg.sh -n dnode1 -c secondEp -v $firstEp
system sh/cfg.sh -n dnode1 -c fqdn -v $node1
system sh/cfg.sh -n dnode1 -c serverPort -v $firstPort
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
system sh/exec.sh -n dnode1 -s start
sql connect
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node1 port $port
endw
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node2 port $port
endw
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node3 port $port
endw
$i = 0
while $i < $num
$port = $i * 100
$port = $port + 8100
$i = $i + 1
sql create dnode $node4 port $port
endw
endi
endi
############### process nodes #####################
$i = 0
while $i < $num
$index = $i + 80
$port = $i * 100
$port = $port + 8100
$dnodename = dnode . $index
$i = $i + 1
if $option == 1 then
system sh/exec.sh -n $dnodename -s start
endi
if $option == 2 then
system sh/exec.sh -n $dnodename -s stop -x SIGINT
endi
if $option == 0 then
system sh/deploy.sh -n $dnodename -i 1
system sh/cfg.sh -n $dnodename -c firstEp -v $firstEp
system sh/cfg.sh -n $dnodename -c secondEp -v $firstEp
system sh/cfg.sh -n $dnodename -c fqdn -v $self
system sh/cfg.sh -n $dnodename -c serverPort -v $port
system sh/exec.sh -n $dnodename -s start
endi
endw