TD-34
This commit is contained in:
parent
016b5e8550
commit
d37876fe31
|
@ -42,13 +42,16 @@ typedef enum {
|
||||||
extern const char *tsdbFileSuffix[];
|
extern const char *tsdbFileSuffix[];
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
|
||||||
int fd;
|
|
||||||
char fname[128];
|
|
||||||
int64_t size; // total size of the file
|
int64_t size; // total size of the file
|
||||||
int64_t tombSize; // unused file size
|
int64_t tombSize; // unused file size
|
||||||
int32_t totalBlocks;
|
int32_t totalBlocks;
|
||||||
int32_t totalSubBlocks;
|
int32_t totalSubBlocks;
|
||||||
|
} SFileInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int fd;
|
||||||
|
char fname[128];
|
||||||
|
SFileInfo info;
|
||||||
} SFile;
|
} SFile;
|
||||||
|
|
||||||
#define TSDB_IS_FILE_OPENED(f) ((f)->fd != -1)
|
#define TSDB_IS_FILE_OPENED(f) ((f)->fd != -1)
|
||||||
|
@ -74,7 +77,7 @@ void tsdbCloseFileH(STsdbFileH *pFileH);
|
||||||
int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose);
|
int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose);
|
||||||
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
|
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
|
||||||
int tsdbOpenFile(SFile *pFile, int oflag);
|
int tsdbOpenFile(SFile *pFile, int oflag);
|
||||||
SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid);
|
int tsdbCloseFile(SFile *pFile); SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid);
|
||||||
int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid);
|
int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -268,7 +268,7 @@ static int compFGroup(const void *arg1, const void *arg2) {
|
||||||
static int tsdbWriteFileHead(SFile *pFile) {
|
static int tsdbWriteFileHead(SFile *pFile) {
|
||||||
char head[TSDB_FILE_HEAD_SIZE] = "\0";
|
char head[TSDB_FILE_HEAD_SIZE] = "\0";
|
||||||
|
|
||||||
pFile->size += TSDB_FILE_HEAD_SIZE;
|
pFile->info.size += TSDB_FILE_HEAD_SIZE;
|
||||||
|
|
||||||
// TODO: write version and File statistic to the head
|
// TODO: write version and File statistic to the head
|
||||||
lseek(pFile->fd, 0, SEEK_SET);
|
lseek(pFile->fd, 0, SEEK_SET);
|
||||||
|
@ -292,7 +292,7 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pFile->size += size;
|
pFile->info.size += size;
|
||||||
|
|
||||||
free(buf);
|
free(buf);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -315,6 +315,12 @@ int tsdbOpenFile(SFile *pFile, int oflag) { // TODO: change the function
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbCloseFile(SFile *pFile) {
|
||||||
|
int ret = close(pFile->fd);
|
||||||
|
pFile->fd = -1;
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) {
|
SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) {
|
||||||
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid);
|
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid);
|
||||||
if (pGroup == NULL) return NULL;
|
if (pGroup == NULL) return NULL;
|
||||||
|
@ -325,14 +331,6 @@ SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) {
|
||||||
return pGroup;
|
return pGroup;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbCloseFile(SFile *pFile) {
|
|
||||||
if (!TSDB_IS_FILE_OPENED(pFile)) return -1;
|
|
||||||
int ret = close(pFile->fd);
|
|
||||||
pFile->fd = -1;
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose) {
|
int tsdbCreateFile(char *dataDir, int fileId, char *suffix, int maxTables, SFile *pFile, int writeHeader, int toClose) {
|
||||||
memset((void *)pFile, 0, sizeof(SFile));
|
memset((void *)pFile, 0, sizeof(SFile));
|
||||||
pFile->fd = -1;
|
pFile->fd = -1;
|
||||||
|
|
|
@ -937,7 +937,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
|
||||||
} else {
|
} else {
|
||||||
pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR);
|
pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR);
|
||||||
sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len);
|
sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len);
|
||||||
hFile.size += pIdx->len;
|
hFile.info.size += pIdx->len;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
|
@ -977,8 +977,23 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: close the files and replace the .head and .last file
|
// Write the SCompIdx part
|
||||||
{}
|
if (lseek(hFile.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {/* TODO */}
|
||||||
|
if (write(hFile.fd, (void *)pIndices, sizeof(SCompIdx) * pCfg->maxTables) < 0) {/* TODO */}
|
||||||
|
|
||||||
|
// close the files
|
||||||
|
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
|
tsdbCloseFile(&pGroup->files[type]);
|
||||||
|
}
|
||||||
|
tsdbCloseFile(&hFile);
|
||||||
|
if (isNewLastFile) tsdbCloseFile(&lFile);
|
||||||
|
// TODO: replace the .head and .last file
|
||||||
|
rename(hFile.fname, pGroup->files[TSDB_FILE_TYPE_HEAD].fname);
|
||||||
|
pGroup->files[TSDB_FILE_TYPE_HEAD].info = hFile.info;
|
||||||
|
if (isNewLastFile) {
|
||||||
|
rename(lFile.fname, pGroup->files[TSDB_FILE_TYPE_LAST].fname);
|
||||||
|
pGroup->files[TSDB_FILE_TYPE_LAST].info = lFile.info;
|
||||||
|
}
|
||||||
|
|
||||||
if (pIndices) free(pIndices);
|
if (pIndices) free(pIndices);
|
||||||
if (pCompInfo) free(pCompInfo);
|
if (pCompInfo) free(pCompInfo);
|
||||||
|
|
Loading…
Reference in New Issue