TD-34
This commit is contained in:
parent
305523f47a
commit
e0863b4c0b
|
@ -34,15 +34,22 @@ typedef enum {
|
||||||
TSDB_FILE_TYPE_MAX
|
TSDB_FILE_TYPE_MAX
|
||||||
} TSDB_FILE_TYPE;
|
} TSDB_FILE_TYPE;
|
||||||
|
|
||||||
|
#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX)
|
||||||
|
|
||||||
extern const char *tsdbFileSuffix[];
|
extern const char *tsdbFileSuffix[];
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
int fd;
|
||||||
char fname[128];
|
char fname[128];
|
||||||
int64_t size; // total size of the file
|
int64_t size; // total size of the file
|
||||||
int64_t tombSize; // unused file size
|
int64_t tombSize; // unused file size
|
||||||
|
int32_t totalBlocks;
|
||||||
|
int32_t totalSubBlocks;
|
||||||
} SFile;
|
} SFile;
|
||||||
|
|
||||||
|
#define TSDB_IS_FILE_OPENED(f) ((f)->fd != -1)
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t fileId;
|
int32_t fileId;
|
||||||
SFile files[TSDB_FILE_TYPE_MAX];
|
SFile files[TSDB_FILE_TYPE_MAX];
|
||||||
|
@ -50,14 +57,26 @@ typedef struct {
|
||||||
|
|
||||||
// TSDB file handle
|
// TSDB file handle
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t daysPerFile;
|
int maxFGroups;
|
||||||
int32_t keep;
|
int numOfFGroups;
|
||||||
int32_t minRowPerFBlock;
|
|
||||||
int32_t maxRowsPerFBlock;
|
|
||||||
int32_t maxTables;
|
|
||||||
SFileGroup fGroup[];
|
SFileGroup fGroup[];
|
||||||
} STsdbFileH;
|
} STsdbFileH;
|
||||||
|
|
||||||
|
#define TSDB_MIN_FILE_ID(fh) (fh)->fGroup[0].fileId
|
||||||
|
#define TSDB_MAX_FILE_ID(fh) (fh)->fGroup[(fh)->numOfFGroups - 1].fileId
|
||||||
|
|
||||||
|
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles);
|
||||||
|
void tsdbCloseFileH(STsdbFileH *pFileH);
|
||||||
|
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
|
||||||
|
int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t len;
|
||||||
|
int32_t padding; // For padding purpose
|
||||||
|
int64_t offset;
|
||||||
|
} SCompIdx;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* if numOfSubBlocks == -1, then the SCompBlock is a sub-block
|
* if numOfSubBlocks == -1, then the SCompBlock is a sub-block
|
||||||
* if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
|
* if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
|
||||||
|
@ -78,13 +97,31 @@ typedef struct {
|
||||||
TSKEY keyLast;
|
TSKEY keyLast;
|
||||||
} SCompBlock;
|
} SCompBlock;
|
||||||
|
|
||||||
#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX)
|
typedef struct {
|
||||||
|
int32_t delimiter; // For recovery usage
|
||||||
|
int32_t checksum; // TODO: decide if checksum logic in this file or make it one API
|
||||||
|
int64_t uid;
|
||||||
|
int32_t padding; // For padding purpose
|
||||||
|
int32_t numOfBlocks; // TODO: make the struct padding
|
||||||
|
SCompBlock blocks[];
|
||||||
|
} SCompInfo;
|
||||||
|
|
||||||
STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock,
|
// TODO: take pre-calculation into account
|
||||||
int32_t maxRowsPerFBlock, int32_t maxTables);
|
typedef struct {
|
||||||
|
int16_t colId; // Column ID
|
||||||
|
int16_t len; // Column length
|
||||||
|
int32_t type : 8;
|
||||||
|
int32_t offset : 24;
|
||||||
|
} SCompCol;
|
||||||
|
|
||||||
|
// TODO: Take recover into account
|
||||||
|
typedef struct {
|
||||||
|
int32_t delimiter; // For recovery usage
|
||||||
|
int32_t numOfCols; // For recovery usage
|
||||||
|
int64_t uid; // For recovery usage
|
||||||
|
SCompCol cols[];
|
||||||
|
} SCompData;
|
||||||
|
|
||||||
void tsdbCloseFile(STsdbFileH *pFileH);
|
|
||||||
int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables);
|
|
||||||
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
|
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,72 +27,126 @@
|
||||||
#define TSDB_FILE_HEAD_SIZE 512
|
#define TSDB_FILE_HEAD_SIZE 512
|
||||||
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t len;
|
|
||||||
int32_t padding; // For padding purpose
|
|
||||||
int64_t offset;
|
|
||||||
} SCompIdx;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t delimiter; // For recovery usage
|
|
||||||
int32_t checksum; // TODO: decide if checksum logic in this file or make it one API
|
|
||||||
int64_t uid;
|
|
||||||
int32_t padding; // For padding purpose
|
|
||||||
int32_t numOfBlocks; // TODO: make the struct padding
|
|
||||||
SCompBlock blocks[];
|
|
||||||
} SCompInfo;
|
|
||||||
|
|
||||||
// TODO: take pre-calculation into account
|
|
||||||
typedef struct {
|
|
||||||
int16_t colId; // Column ID
|
|
||||||
int16_t len; // Column length
|
|
||||||
int32_t type : 8;
|
|
||||||
int32_t offset : 24;
|
|
||||||
} SCompCol;
|
|
||||||
|
|
||||||
// TODO: Take recover into account
|
|
||||||
typedef struct {
|
|
||||||
int32_t delimiter; // For recovery usage
|
|
||||||
int32_t numOfCols; // For recovery usage
|
|
||||||
int64_t uid; // For recovery usage
|
|
||||||
SCompCol cols[];
|
|
||||||
} SCompData;
|
|
||||||
|
|
||||||
const char *tsdbFileSuffix[] = {
|
const char *tsdbFileSuffix[] = {
|
||||||
".head", // TSDB_FILE_TYPE_HEAD
|
".head", // TSDB_FILE_TYPE_HEAD
|
||||||
".data", // TSDB_FILE_TYPE_DATA
|
".data", // TSDB_FILE_TYPE_DATA
|
||||||
".last" // TSDB_FILE_TYPE_LAST
|
".last" // TSDB_FILE_TYPE_LAST
|
||||||
};
|
};
|
||||||
|
|
||||||
static int tsdbWriteFileHead(int fd, SFile *pFile) {
|
static int compFGroupKey(const void *key, const void *fgroup);
|
||||||
|
static int compFGroup(const void *arg1, const void *arg2);
|
||||||
|
static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname);
|
||||||
|
static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile);
|
||||||
|
static int tsdbWriteFileHead(SFile *pFile);
|
||||||
|
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
|
||||||
|
|
||||||
|
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
|
||||||
|
STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles);
|
||||||
|
if (pFileH == NULL) { // TODO: deal with ERROR here
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pFileH->maxFGroups = maxFiles;
|
||||||
|
|
||||||
|
DIR *dir = opendir(dataDir);
|
||||||
|
if (dir == NULL) {
|
||||||
|
free(pFileH);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct dirent *dp;
|
||||||
|
while ((dp = readdir(dir)) != NULL) {
|
||||||
|
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 1) == 0) continue;
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
return pFileH;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); }
|
||||||
|
|
||||||
|
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) {
|
||||||
|
if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1;
|
||||||
|
|
||||||
|
SFileGroup fGroup;
|
||||||
|
SFileGroup *pFGroup = &fGroup;
|
||||||
|
if (fid < TSDB_MIN_FILE_ID(pFileH) || fid > TSDB_MAX_FILE_ID(pFileH) ||
|
||||||
|
bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey) ==
|
||||||
|
NULL) {
|
||||||
|
pFGroup->fileId = fid;
|
||||||
|
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
|
if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) {
|
||||||
|
// TODO: deal with the ERROR here, remove those creaed file
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pFileH->fGroup[pFileH->numOfFGroups++] = fGroup;
|
||||||
|
qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
|
||||||
|
SFileGroup *pGroup =
|
||||||
|
bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey);
|
||||||
|
if (pGroup == NULL) return -1;
|
||||||
|
|
||||||
|
// Remove from disk
|
||||||
|
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
|
remove(pGroup->files[type].fname);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Adjust the memory
|
||||||
|
int filesBehind = pFileH->numOfFGroups - (((char *)pGroup - (char *)(pFileH->fGroup)) / sizeof(SFileGroup) + 1);
|
||||||
|
if (filesBehind > 0) {
|
||||||
|
memmove((void *)pGroup, (void *)((char *)pGroup + sizeof(SFileGroup)), sizeof(SFileGroup) * filesBehind);
|
||||||
|
}
|
||||||
|
pFileH->numOfFGroups--;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int compFGroupKey(const void *key, const void *fgroup) {
|
||||||
|
int fid = *(int *)key;
|
||||||
|
SFileGroup *pFGroup = (SFileGroup *)fgroup;
|
||||||
|
return (fid - pFGroup->fileId);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int compFGroup(const void *arg1, const void *arg2) {
|
||||||
|
return ((SFileGroup *)arg1)->fileId - ((SFileGroup *)arg2)->fileId;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbWriteFileHead(SFile *pFile) {
|
||||||
char head[TSDB_FILE_HEAD_SIZE] = "\0";
|
char head[TSDB_FILE_HEAD_SIZE] = "\0";
|
||||||
|
|
||||||
pFile->size += TSDB_FILE_HEAD_SIZE;
|
pFile->size += TSDB_FILE_HEAD_SIZE;
|
||||||
|
|
||||||
// TODO: write version and File statistic to the head
|
// TODO: write version and File statistic to the head
|
||||||
lseek(fd, 0, SEEK_SET);
|
lseek(pFile->fd, 0, SEEK_SET);
|
||||||
if (write(fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1;
|
if (write(pFile->fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbWriteHeadFileIdx(int fd, int maxTables, SFile *pFile) {
|
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
|
||||||
int size = sizeof(SCompIdx) * maxTables;
|
int size = sizeof(SCompIdx) * maxTables;
|
||||||
void *buf = calloc(1, size);
|
void *buf = calloc(1, size);
|
||||||
if (buf == NULL) return -1;
|
if (buf == NULL) return -1;
|
||||||
|
|
||||||
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
|
if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
|
||||||
free(buf);
|
free(buf);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (write(fd, buf, size) < 0) {
|
if (write(pFile->fd, buf, size) < 0) {
|
||||||
free(buf);
|
free(buf);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pFile->size += size;
|
pFile->size += size;
|
||||||
|
|
||||||
|
free(buf);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -104,12 +158,27 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the function
|
||||||
* Create a file and set the SFile object
|
if (TSDB_IS_FILE_OPENED(pFile)) return -1;
|
||||||
*/
|
|
||||||
|
pFile->fd = open(pFile->fname, oflag, 0755);
|
||||||
|
if (pFile->fd < 0) return -1;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbCloseFile(SFile *pFile) {
|
||||||
|
if (!TSDB_IS_FILE_OPENED(pFile)) return -1;
|
||||||
|
int ret = close(pFile->fd);
|
||||||
|
pFile->fd = -1;
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) {
|
static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) {
|
||||||
memset((void *)pFile, 0, sizeof(SFile));
|
memset((void *)pFile, 0, sizeof(SFile));
|
||||||
pFile->type = type;
|
pFile->type = type;
|
||||||
|
pFile->fd = -1;
|
||||||
|
|
||||||
tsdbGetFileName(dataDir, fileId, type, pFile->fname);
|
tsdbGetFileName(dataDir, fileId, type, pFile->fname);
|
||||||
if (access(pFile->fname, F_OK) == 0) {
|
if (access(pFile->fname, F_OK) == 0) {
|
||||||
|
@ -117,93 +186,28 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables,
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int fd = open(pFile->fname, O_WRONLY | O_CREAT, 0755);
|
if (tsdbOpenFileForWrite(pFile, O_WRONLY | O_CREAT) < 0) {
|
||||||
if (fd < 0) return -1;
|
// TODO: deal with the ERROR here
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (type == TSDB_FILE_TYPE_HEAD) {
|
if (type == TSDB_FILE_TYPE_HEAD) {
|
||||||
if (tsdbWriteHeadFileIdx(fd, maxTables, pFile) < 0) {
|
if (tsdbWriteHeadFileIdx(pFile, maxTables) < 0) {
|
||||||
close(fd);
|
tsdbCloseFile(pFile);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbWriteFileHead(fd, pFile) < 0) {
|
if (tsdbWriteFileHead(pFile) < 0) {
|
||||||
close(fd);
|
tsdbCloseFile(pFile);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
close(fd);
|
tsdbCloseFile(pFile);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbRemoveFile(SFile *pFile) {
|
|
||||||
if (pFile == NULL) return -1;
|
|
||||||
return remove(pFile->fname);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a file group with fileId and return a SFileGroup object
|
|
||||||
int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables) {
|
|
||||||
if (dataDir == NULL || pFGroup == NULL) return -1;
|
|
||||||
|
|
||||||
memset((void *)pFGroup, 0, sizeof(SFileGroup));
|
|
||||||
|
|
||||||
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
|
|
||||||
if (tsdbCreateFile(dataDir, fileId, type, maxTables, &(pFGroup->files[type])) < 0) {
|
|
||||||
// TODO: deal with the error here, remove the created files
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pFGroup->fileId = fileId;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initialize the TSDB file handle
|
|
||||||
*/
|
|
||||||
STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock,
|
|
||||||
int32_t maxRowsPerFBlock, int32_t maxTables) {
|
|
||||||
STsdbFileH *pTsdbFileH =
|
|
||||||
(STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * tsdbGetMaxNumOfFiles(keep, daysPerFile));
|
|
||||||
if (pTsdbFileH == NULL) return NULL;
|
|
||||||
|
|
||||||
pTsdbFileH->daysPerFile = daysPerFile;
|
|
||||||
pTsdbFileH->keep = keep;
|
|
||||||
pTsdbFileH->minRowPerFBlock = minRowsPerFBlock;
|
|
||||||
pTsdbFileH->maxRowsPerFBlock = maxRowsPerFBlock;
|
|
||||||
pTsdbFileH->maxTables = maxTables;
|
|
||||||
|
|
||||||
// Open the directory to read information of each file
|
|
||||||
DIR *dir = opendir(dataDir);
|
|
||||||
if (dir == NULL) {
|
|
||||||
free(pTsdbFileH);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
char fname[256];
|
|
||||||
|
|
||||||
struct dirent *dp;
|
|
||||||
while ((dp = readdir(dir)) != NULL) {
|
|
||||||
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue;
|
|
||||||
if (true /* check if the file is the .head file */) {
|
|
||||||
int fileId = 0;
|
|
||||||
int vgId = 0;
|
|
||||||
sscanf(dp->d_name, "v%df%d.head", &vgId, &fileId);
|
|
||||||
// TODO
|
|
||||||
|
|
||||||
// Open head file
|
|
||||||
|
|
||||||
// Open data file
|
|
||||||
|
|
||||||
// Open last file
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return pTsdbFileH;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey,
|
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey,
|
||||||
TSKEY *maxKey) {
|
TSKEY *maxKey) {
|
||||||
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
|
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
|
||||||
|
|
|
@ -182,7 +182,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO
|
||||||
char dataDir[128] = "\0";
|
char dataDir[128] = "\0";
|
||||||
tsdbGetDataDirName(pRepo, dataDir);
|
tsdbGetDataDirName(pRepo, dataDir);
|
||||||
pRepo->tsdbFileH =
|
pRepo->tsdbFileH =
|
||||||
tsdbInitFile(dataDir, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->maxTables);
|
tsdbInitFileH(dataDir, pCfg->maxTables);
|
||||||
if (pRepo->tsdbFileH == NULL) {
|
if (pRepo->tsdbFileH == NULL) {
|
||||||
free(pRepo->rootDir);
|
free(pRepo->rootDir);
|
||||||
tsdbFreeCache(pRepo->tsdbCache);
|
tsdbFreeCache(pRepo->tsdbCache);
|
||||||
|
@ -787,7 +787,7 @@ static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) {
|
||||||
|
|
||||||
for (int tid = 0; tid < maxTables; tid++) {
|
for (int tid = 0; tid < maxTables; tid++) {
|
||||||
if (iters[tid] == NULL) continue;
|
if (iters[tid] == NULL) continue;
|
||||||
tSkipListDestroy(iters[tid]);
|
tSkipListDestroyIter(iters[tid]);
|
||||||
}
|
}
|
||||||
|
|
||||||
free(iters);
|
free(iters);
|
||||||
|
@ -836,42 +836,42 @@ static void *tsdbCommitToFile(void *arg) {
|
||||||
SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols);
|
SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols);
|
||||||
void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock);
|
void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock);
|
||||||
|
|
||||||
int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
|
// int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
|
||||||
int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
|
// int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
|
||||||
|
|
||||||
for (int fid = sfid; fid <= efid; fid++) {
|
// for (int fid = sfid; fid <= efid; fid++) {
|
||||||
TSKEY minKey = 0, maxKey = 0;
|
// TSKEY minKey = 0, maxKey = 0;
|
||||||
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
|
// tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
|
||||||
|
|
||||||
// tsdbOpenFileForWrite(pRepo, fid);
|
// // tsdbOpenFileForWrite(pRepo, fid);
|
||||||
|
|
||||||
for (int tid = 0; tid < pCfg->maxTables; tid++) {
|
// for (int tid = 0; tid < pCfg->maxTables; tid++) {
|
||||||
STable *pTable = pMeta->tables[tid];
|
// STable *pTable = pMeta->tables[tid];
|
||||||
if (pTable == NULL || pTable->imem == NULL) continue;
|
// if (pTable == NULL || pTable->imem == NULL) continue;
|
||||||
if (iters[tid] == NULL) { // create table iterator
|
// if (iters[tid] == NULL) { // create table iterator
|
||||||
iters[tid] = tSkipListCreateIter(pTable->imem->pData);
|
// iters[tid] = tSkipListCreateIter(pTable->imem->pData);
|
||||||
// TODO: deal with the error
|
// // TODO: deal with the error
|
||||||
if (iters[tid] == NULL) break;
|
// if (iters[tid] == NULL) break;
|
||||||
if (!tSkipListIterNext(iters[tid])) {
|
// if (!tSkipListIterNext(iters[tid])) {
|
||||||
// assert(0);
|
// // assert(0);
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
// Init row data part
|
// // Init row data part
|
||||||
cols[0] = (SDataCol *)buf;
|
// cols[0] = (SDataCol *)buf;
|
||||||
for (int col = 1; col < schemaNCols(pTable->schema); col++) {
|
// for (int col = 1; col < schemaNCols(pTable->schema); col++) {
|
||||||
cols[col] = (SDataCol *)((char *)(cols[col - 1]) + sizeof(SDataCol) + colBytes(schemaColAt(pTable->schema, col-1)) * pCfg->maxRowsPerFileBlock);
|
// cols[col] = (SDataCol *)((char *)(cols[col - 1]) + sizeof(SDataCol) + colBytes(schemaColAt(pTable->schema, col-1)) * pCfg->maxRowsPerFileBlock);
|
||||||
}
|
// }
|
||||||
|
|
||||||
// Loop the iterator
|
// // Loop the iterator
|
||||||
int rowsRead = 0;
|
// int rowsRead = 0;
|
||||||
while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) >
|
// while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) >
|
||||||
0) {
|
// 0) {
|
||||||
// printf("rowsRead:%d-----------\n", rowsRead);
|
// // printf("rowsRead:%d-----------\n", rowsRead);
|
||||||
int k = 0;
|
// int k = 0;
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
|
|
||||||
tsdbDestroyTableIters(iters, pCfg->maxTables);
|
tsdbDestroyTableIters(iters, pCfg->maxTables);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue