TD-353
This commit is contained in:
parent
131be7b0da
commit
2f9e141512
|
@ -142,7 +142,7 @@ typedef struct {
|
||||||
} STsdbFileInfo;
|
} STsdbFileInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char* fname;
|
char fname[TSDB_FILENAME_LEN];
|
||||||
int fd;
|
int fd;
|
||||||
|
|
||||||
STsdbFileInfo info;
|
STsdbFileInfo info;
|
||||||
|
@ -345,7 +345,6 @@ void tsdbFitRetention(STsdbRepo* pRepo);
|
||||||
int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
|
int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
|
||||||
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
|
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
|
||||||
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
|
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
|
||||||
int tsdbCpySFile(SFile* src, SFile* dst);
|
|
||||||
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
|
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
|
||||||
|
|
||||||
// ------------------ tsdbRWHelper.c
|
// ------------------ tsdbRWHelper.c
|
||||||
|
@ -389,7 +388,7 @@ int tsdbLoadBlockData(SRWHelper* pHelper, SCompBlock* pCompBlock, SDataCols* t
|
||||||
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
|
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
|
||||||
|
|
||||||
char* tsdbGetMetaFileName(char* rootDir);
|
char* tsdbGetMetaFileName(char* rootDir);
|
||||||
char* tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type);
|
void tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type, char* fname);
|
||||||
int tsdbLockRepo(STsdbRepo* pRepo);
|
int tsdbLockRepo(STsdbRepo* pRepo);
|
||||||
int tsdbUnlockRepo(STsdbRepo* pRepo);
|
int tsdbUnlockRepo(STsdbRepo* pRepo);
|
||||||
char* tsdbGetDataDirName(char* rootDir);
|
char* tsdbGetDataDirName(char* rootDir);
|
||||||
|
|
|
@ -245,8 +245,7 @@ int tsdbCreateFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
|
||||||
memset((void *)pFile, 0, sizeof(SFile));
|
memset((void *)pFile, 0, sizeof(SFile));
|
||||||
pFile->fd = -1;
|
pFile->fd = -1;
|
||||||
|
|
||||||
pFile->fname = tsdbGetDataFileName(pRepo, fid, type);
|
tsdbGetDataFileName(pRepo, fid, type, pFile->fname);
|
||||||
if (pFile->fname == NULL) return -1;
|
|
||||||
|
|
||||||
if (access(pFile->fname, F_OK) == 0) {
|
if (access(pFile->fname, F_OK) == 0) {
|
||||||
tsdbError("vgId:%d file %s already exists", REPO_ID(pRepo), fid);
|
tsdbError("vgId:%d file %s already exists", REPO_ID(pRepo), fid);
|
||||||
|
@ -343,18 +342,6 @@ void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbCpySFile(SFile *src, SFile *dst) {
|
|
||||||
*dst = *src;
|
|
||||||
dst->fname = strdup(dst->fname);
|
|
||||||
|
|
||||||
if (dst->fname == NULL) {
|
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
|
void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
|
||||||
ASSERT(pFGroup != NULL);
|
ASSERT(pFGroup != NULL);
|
||||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
|
@ -380,8 +367,7 @@ static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
|
||||||
uint32_t version;
|
uint32_t version;
|
||||||
char buf[512] = "\0";
|
char buf[512] = "\0";
|
||||||
|
|
||||||
pFile->fname = tsdbGetDataFileName(pRepo, fid, type);
|
tsdbGetDataFileName(pRepo, fid, type, pFile->fname);
|
||||||
if (pFile->fname == NULL) return -1;
|
|
||||||
|
|
||||||
pFile->fd = -1;
|
pFile->fd = -1;
|
||||||
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
|
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
|
||||||
|
@ -410,10 +396,7 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbDestroyFile(SFile *pFile) {
|
static void tsdbDestroyFile(SFile *pFile) { tsdbCloseFile(pFile); }
|
||||||
tsdbCloseFile(pFile);
|
|
||||||
tfree(pFile->fname);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int compFGroup(const void *arg1, const void *arg2) {
|
static int compFGroup(const void *arg1, const void *arg2) {
|
||||||
int val1 = ((SFileGroup *)arg1)->fileId;
|
int val1 = ((SFileGroup *)arg1)->fileId;
|
||||||
|
|
|
@ -337,17 +337,8 @@ char *tsdbGetMetaFileName(char *rootDir) {
|
||||||
return fname;
|
return fname;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *tsdbGetDataFileName(STsdbRepo *pRepo, int fid, int type) {
|
void tsdbGetDataFileName(STsdbRepo *pRepo, int fid, int type, char *fname) {
|
||||||
int tlen = strlen(pRepo->rootDir) + strlen(tsdbFileSuffix[type]) + 24;
|
snprintf(fname, TSDB_FILENAME_LEN, "%s/%s/v%df%d.%s", pRepo->rootDir, TSDB_DATA_DIR_NAME, REPO_ID(pRepo), fid, tsdbFileSuffix[type]);
|
||||||
|
|
||||||
char *fname = malloc(tlen);
|
|
||||||
if (fname == NULL) {
|
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
sprintf(fname, "%s/%s/v%df%d.%s", pRepo->rootDir, TSDB_DATA_DIR_NAME, REPO_ID(pRepo), fid, tsdbFileSuffix[type]);
|
|
||||||
return fname;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbLockRepo(STsdbRepo *pRepo) {
|
int tsdbLockRepo(STsdbRepo *pRepo) {
|
||||||
|
|
|
@ -499,6 +499,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
char * dataDir = NULL;
|
char * dataDir = NULL;
|
||||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||||
STsdbCfg * pCfg = &pRepo->config;
|
STsdbCfg * pCfg = &pRepo->config;
|
||||||
|
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
SFileGroup *pGroup = NULL;
|
SFileGroup *pGroup = NULL;
|
||||||
|
|
||||||
TSKEY minKey = 0, maxKey = 0;
|
TSKEY minKey = 0, maxKey = 0;
|
||||||
|
@ -588,10 +589,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbCloseHelperFile(pHelper, 0);
|
tsdbCloseHelperFile(pHelper, 0);
|
||||||
// TODO: make it atomic with some methods
|
|
||||||
|
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||||
pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF;
|
pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF;
|
||||||
pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF;
|
pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF;
|
||||||
pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF;
|
pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF;
|
||||||
|
pthread_rwlock_unlock(&(pFileH->fhlock));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
|
|
@ -100,13 +100,12 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
|
||||||
|
|
||||||
// Set the files
|
// Set the files
|
||||||
pHelper->files.fid = pGroup->fileId;
|
pHelper->files.fid = pGroup->fileId;
|
||||||
tsdbCpySFile(&pHelper->files.headF, &pGroup->files[TSDB_FILE_TYPE_HEAD]);
|
pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD];
|
||||||
tsdbCpySFile(&pHelper->files.dataF, &pGroup->files[TSDB_FILE_TYPE_DATA]);
|
pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA];
|
||||||
tsdbCpySFile(&pHelper->files.lastF, &pGroup->files[TSDB_FILE_TYPE_LAST]);
|
pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST];
|
||||||
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
||||||
|
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, pHelper->files.nHeadF.fname);
|
||||||
pHelper->files.nHeadF.fname = tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD);
|
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, pHelper->files.nLastF.fname);
|
||||||
pHelper->files.nLastF.fname = tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open the files
|
// Open the files
|
||||||
|
@ -1036,15 +1035,10 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey)
|
||||||
static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
|
static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
|
||||||
memset((void *)&pHelper->files, 0, sizeof(pHelper->files));
|
memset((void *)&pHelper->files, 0, sizeof(pHelper->files));
|
||||||
pHelper->files.fid = -1;
|
pHelper->files.fid = -1;
|
||||||
tfree(pHelper->files.headF.fname);
|
|
||||||
pHelper->files.headF.fd = -1;
|
pHelper->files.headF.fd = -1;
|
||||||
tfree(pHelper->files.dataF.fname);
|
|
||||||
pHelper->files.dataF.fd = -1;
|
pHelper->files.dataF.fd = -1;
|
||||||
tfree(pHelper->files.lastF.fname);
|
|
||||||
pHelper->files.lastF.fd = -1;
|
pHelper->files.lastF.fd = -1;
|
||||||
tfree(pHelper->files.nHeadF.fname);
|
|
||||||
pHelper->files.nHeadF.fd = -1;
|
pHelper->files.nHeadF.fd = -1;
|
||||||
tfree(pHelper->files.nLastF.fname);
|
|
||||||
pHelper->files.nLastF.fd = -1;
|
pHelper->files.nLastF.fd = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue