TD-34
This commit is contained in:
parent
0d8cc7972f
commit
cc543aaf50
|
@ -105,11 +105,12 @@ SDataRow tdDataRowDup(SDataRow row);
|
||||||
|
|
||||||
// ----------------- Data column structure
|
// ----------------- Data column structure
|
||||||
typedef struct SDataCol {
|
typedef struct SDataCol {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
int bytes;
|
int16_t colId;
|
||||||
int len;
|
int bytes;
|
||||||
int offset;
|
int len;
|
||||||
void * pData;
|
int offset;
|
||||||
|
void * pData;
|
||||||
} SDataCol;
|
} SDataCol;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -122,9 +123,9 @@ typedef struct {
|
||||||
SDataCol cols[];
|
SDataCol cols[];
|
||||||
} SDataCols;
|
} SDataCols;
|
||||||
|
|
||||||
#define keyCol(cols) (&((cols)->cols[0])) // Key column
|
#define keyCol(pCols) (&((pCols)->cols[0])) // Key column
|
||||||
#define dataColsKeyFirst(cols) ((int64_t *)(keyCol(cols)->pData))[0]
|
#define dataColsKeyFirst(pCols) ((int64_t *)(keyCol(pCols)->pData))[0]
|
||||||
#define dataColsKeyLast(cols) ((int64_t *)(keyCol(cols)->pData))[(cols)->numOfPoints - 1]
|
#define dataColsKeyLast(pCols) ((int64_t *)(keyCol(pCols)->pData))[(pCols)->numOfPoints - 1]
|
||||||
|
|
||||||
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
|
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
|
||||||
void tdResetDataCols(SDataCols *pCols);
|
void tdResetDataCols(SDataCols *pCols);
|
||||||
|
|
|
@ -324,6 +324,7 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema) {
|
||||||
pCols->cols[i].type = colType(schemaColAt(pSchema, i));
|
pCols->cols[i].type = colType(schemaColAt(pSchema, i));
|
||||||
pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i));
|
pCols->cols[i].bytes = colBytes(schemaColAt(pSchema, i));
|
||||||
pCols->cols[i].offset = colOffset(schemaColAt(pSchema, i));
|
pCols->cols[i].offset = colOffset(schemaColAt(pSchema, i));
|
||||||
|
pCols->cols[i].colId = colColId(schemaColAt(pSchema, i));
|
||||||
}
|
}
|
||||||
|
|
||||||
return pCols;
|
return pCols;
|
||||||
|
|
|
@ -176,6 +176,14 @@ uint32_t ip2uint(const char *const ip_addr);
|
||||||
void taosSetAllocMode(int mode, const char* path, bool autoDump);
|
void taosSetAllocMode(int mode, const char* path, bool autoDump);
|
||||||
void taosDumpMemoryLeak();
|
void taosDumpMemoryLeak();
|
||||||
|
|
||||||
|
#define TD_EQ 0x1
|
||||||
|
#define TD_GT 0x2
|
||||||
|
#define TD_LT 0x4
|
||||||
|
#define TD_GE (TD_EQ | TD_GT)
|
||||||
|
#define TD_LE (TD_EQ | TD_LT)
|
||||||
|
void *taosbsearch(const void *key, const void *base, size_t nmemb, size_t size,
|
||||||
|
int (*compar)(const void *, const void *), int flags);
|
||||||
|
|
||||||
#ifdef TAOS_MEM_CHECK
|
#ifdef TAOS_MEM_CHECK
|
||||||
|
|
||||||
void * taos_malloc(size_t size, const char *file, uint32_t line);
|
void * taos_malloc(size_t size, const char *file, uint32_t line);
|
||||||
|
|
|
@ -617,3 +617,72 @@ char *taosCharsetReplace(char *charsetstr) {
|
||||||
|
|
||||||
return strdup(charsetstr);
|
return strdup(charsetstr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define elePtrAt(base, size, idx) (void *)((char *)(base) + (size) * (idx))
|
||||||
|
void * taosbsearch(const void *key, const void *base, size_t nmemb, size_t size, int (*compar)(const void *, const void *), int flags) {
|
||||||
|
// TODO: need to check the correctness of this function
|
||||||
|
int l = 0;
|
||||||
|
int r = nmemb;
|
||||||
|
int idx = 0;
|
||||||
|
int comparison;
|
||||||
|
|
||||||
|
if (flags == TD_EQ) {
|
||||||
|
return bsearch(key, base, nmemb, size, compar);
|
||||||
|
} else if (flags == TD_GE) {
|
||||||
|
if ((*compar)(key, elePtrAt(base, size, 0)) <= 0) return elePtrAt(base, size, 0);
|
||||||
|
if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) > 0) return NULL;
|
||||||
|
|
||||||
|
while (l < r) {
|
||||||
|
idx = (l + r) / 2;
|
||||||
|
comparison = (*compar)(key, elePtrAt(base, size, idx));
|
||||||
|
if (comparison < 0) {
|
||||||
|
r = idx;
|
||||||
|
} else if (comparison > 0) {
|
||||||
|
l = idx + 1;
|
||||||
|
} else {
|
||||||
|
return elePtrAt(base, size, idx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*compar)(key, elePtrAt(base, size, idx) < 0)) {
|
||||||
|
return elePtrAt(base, size, idx);
|
||||||
|
} else {
|
||||||
|
if (idx + 1 > nmemb - 1) {
|
||||||
|
return NULL;
|
||||||
|
} else {
|
||||||
|
return elePtrAt(base, size, idx + 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (flags == TD_LE) {
|
||||||
|
if ((*compar)(key, elePtrAt(base, size, nmemb - 1)) >= 0) return elePtrAt(base, size, nmemb - 1);
|
||||||
|
if ((*compar)(key, elePtrAt(base, size, 0)) < 0) return NULL;
|
||||||
|
|
||||||
|
while (l < r) {
|
||||||
|
idx = (l + r) / 2;
|
||||||
|
comparison = (*compar)(key, elePtrAt(base, size, idx));
|
||||||
|
if (comparison < 0) {
|
||||||
|
r = idx;
|
||||||
|
} else if (comparison > 0) {
|
||||||
|
l = idx + 1;
|
||||||
|
} else {
|
||||||
|
return elePtrAt(base, size, idx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((*compar)(key, elePtrAt(base, size, idx)) > 0) {
|
||||||
|
return elePtrAt(base, size, idx);
|
||||||
|
} else {
|
||||||
|
if (idx == 0) {
|
||||||
|
return NULL;
|
||||||
|
} else {
|
||||||
|
return elePtrAt(base, size, idx - 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
assert(0);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
|
#include "dataformat.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "tglobalcfg.h"
|
#include "tglobalcfg.h"
|
||||||
|
|
||||||
|
@ -69,20 +70,26 @@ typedef struct {
|
||||||
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles);
|
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles);
|
||||||
void tsdbCloseFileH(STsdbFileH *pFileH);
|
void tsdbCloseFileH(STsdbFileH *pFileH);
|
||||||
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
|
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
|
||||||
|
int tsdbOpenFile(SFile *pFile, int oflag);
|
||||||
|
SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid);
|
||||||
int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid);
|
int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int32_t padding; // For padding purpose
|
int32_t offset;
|
||||||
int64_t offset;
|
int32_t hasLast : 1;
|
||||||
} SCompIdx;
|
int32_t numOfSuperBlocks : 31;
|
||||||
|
int32_t checksum;
|
||||||
|
TSKEY maxKey;
|
||||||
|
} SCompIdx; /* sizeof(SCompIdx) = 24 */
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* if numOfSubBlocks == -1, then the SCompBlock is a sub-block
|
* if numOfSubBlocks == 0, then the SCompBlock is a sub-block
|
||||||
* if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
|
* if numOfSubBlocks >= 1, then the SCompBlock is a super-block
|
||||||
* the data block offset and length
|
* - if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
|
||||||
* if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the
|
* the data block offset and length
|
||||||
* binary
|
* - if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the
|
||||||
|
* binary
|
||||||
*/
|
*/
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t last : 1; // If the block in data file or last file
|
int64_t last : 1; // If the block in data file or last file
|
||||||
|
@ -101,11 +108,12 @@ typedef struct {
|
||||||
int32_t delimiter; // For recovery usage
|
int32_t delimiter; // For recovery usage
|
||||||
int32_t checksum; // TODO: decide if checksum logic in this file or make it one API
|
int32_t checksum; // TODO: decide if checksum logic in this file or make it one API
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
int32_t padding; // For padding purpose
|
|
||||||
int32_t numOfBlocks; // TODO: make the struct padding
|
|
||||||
SCompBlock blocks[];
|
SCompBlock blocks[];
|
||||||
} SCompInfo;
|
} SCompInfo;
|
||||||
|
|
||||||
|
int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables);
|
||||||
|
int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf);
|
||||||
|
|
||||||
// TODO: take pre-calculation into account
|
// TODO: take pre-calculation into account
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int16_t colId; // Column ID
|
int16_t colId; // Column ID
|
||||||
|
@ -122,6 +130,8 @@ typedef struct {
|
||||||
SCompCol cols[];
|
SCompCol cols[];
|
||||||
} SCompData;
|
} SCompData;
|
||||||
|
|
||||||
|
int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols);
|
||||||
|
|
||||||
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
|
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,7 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname);
|
||||||
static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile);
|
static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile);
|
||||||
static int tsdbWriteFileHead(SFile *pFile);
|
static int tsdbWriteFileHead(SFile *pFile);
|
||||||
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
|
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
|
||||||
|
static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid);
|
||||||
|
|
||||||
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
|
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
|
||||||
STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles);
|
STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles);
|
||||||
|
@ -70,9 +71,7 @@ int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables)
|
||||||
|
|
||||||
SFileGroup fGroup;
|
SFileGroup fGroup;
|
||||||
SFileGroup *pFGroup = &fGroup;
|
SFileGroup *pFGroup = &fGroup;
|
||||||
if (fid < TSDB_MIN_FILE_ID(pFileH) || fid > TSDB_MAX_FILE_ID(pFileH) ||
|
if (tsdbSearchFGroup(pFileH, fid) == NULL) {
|
||||||
bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey) ==
|
|
||||||
NULL) {
|
|
||||||
pFGroup->fileId = fid;
|
pFGroup->fileId = fid;
|
||||||
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
|
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) {
|
if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) {
|
||||||
|
@ -107,6 +106,86 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) {
|
||||||
|
SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
|
||||||
|
if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
|
||||||
|
|
||||||
|
if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1;
|
||||||
|
// TODO: need to check the correctness
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) {
|
||||||
|
SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
|
||||||
|
|
||||||
|
if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1;
|
||||||
|
|
||||||
|
if (read(pFile->fd, buf, pIdx->len) < 0) return -1;
|
||||||
|
|
||||||
|
// TODO: need to check the correctness
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbWriteBlockToFileImpl(SFile * pFile, // File to write
|
||||||
|
SDataCols * pCols, // Data column buffer
|
||||||
|
int numOfPointsToWrie, // Number of points to write to the file
|
||||||
|
SCompBlock *pBlock // SCompBlock to hold block information to return
|
||||||
|
) {
|
||||||
|
// pBlock->last = 0;
|
||||||
|
// pBlock->offset = lseek(pFile->fd, 0, SEEK_END);
|
||||||
|
// // pBlock->algorithm = ;
|
||||||
|
// pBlock->numOfPoints = pCols->numOfPoints;
|
||||||
|
// // pBlock->sversion = ;
|
||||||
|
// // pBlock->len = ;
|
||||||
|
// pBlock->numOfSubBlocks = 1;
|
||||||
|
// pBlock->keyFirst = dataColsKeyFirst(pCols);
|
||||||
|
// pBlock->keyLast = dataColsKeyLast(pCols);
|
||||||
|
// for (int i = 0; i < pCols->numOfCols; i++) {
|
||||||
|
// // TODO: if all col value is NULL, do not save it
|
||||||
|
// pBlock->numOfCols++;
|
||||||
|
// pCompData->numOfCols++;
|
||||||
|
// SCompCol *pCompCol = pCompData->cols + i;
|
||||||
|
// pCompCol->colId = pCols->cols[i].colId;
|
||||||
|
// pCompCol->type = pCols->cols[i].type;
|
||||||
|
|
||||||
|
// // pCompCol->len = ;
|
||||||
|
// // pCompCol->offset = ;
|
||||||
|
// }
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols) {
|
||||||
|
memset((void *)pBlock, 0, sizeof(SCompBlock));
|
||||||
|
SFile *pFile = NULL;
|
||||||
|
SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols);
|
||||||
|
if (pCompData == NULL) return -1;
|
||||||
|
pCompData->delimiter = TSDB_FILE_DELIMITER;
|
||||||
|
// pCompData->uid = ;
|
||||||
|
|
||||||
|
if (isMerge) {
|
||||||
|
TSKEY keyFirst = dataColsKeyFirst(pCols);
|
||||||
|
// 1. Binary search the block the data can merged into
|
||||||
|
|
||||||
|
if (1/* the data should only merged into last file */) {
|
||||||
|
} else {
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Write directly to the file without merge
|
||||||
|
if (1/*pCols->numOfPoints < pCfg->minRowsPerFileBlock*/) {
|
||||||
|
// TODO: write the data to the last file
|
||||||
|
} else {
|
||||||
|
// TODO: wirte the data to the data file
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: need to update pIdx
|
||||||
|
|
||||||
|
if (pCompData) free(pCompData);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int compFGroupKey(const void *key, const void *fgroup) {
|
static int compFGroupKey(const void *key, const void *fgroup) {
|
||||||
int fid = *(int *)key;
|
int fid = *(int *)key;
|
||||||
SFileGroup *pFGroup = (SFileGroup *)fgroup;
|
SFileGroup *pFGroup = (SFileGroup *)fgroup;
|
||||||
|
@ -158,7 +237,7 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the function
|
int tsdbOpenFile(SFile *pFile, int oflag) { // TODO: change the function
|
||||||
if (TSDB_IS_FILE_OPENED(pFile)) return -1;
|
if (TSDB_IS_FILE_OPENED(pFile)) return -1;
|
||||||
|
|
||||||
pFile->fd = open(pFile->fname, oflag, 0755);
|
pFile->fd = open(pFile->fname, oflag, 0755);
|
||||||
|
@ -167,6 +246,16 @@ static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the f
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SFileGroup * tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid) {
|
||||||
|
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid);
|
||||||
|
if (pGroup == NULL) return NULL;
|
||||||
|
|
||||||
|
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
|
tsdbOpenFile(&(pGroup->files[type]), O_RDWR);
|
||||||
|
}
|
||||||
|
return pGroup;
|
||||||
|
}
|
||||||
|
|
||||||
static int tsdbCloseFile(SFile *pFile) {
|
static int tsdbCloseFile(SFile *pFile) {
|
||||||
if (!TSDB_IS_FILE_OPENED(pFile)) return -1;
|
if (!TSDB_IS_FILE_OPENED(pFile)) return -1;
|
||||||
int ret = close(pFile->fd);
|
int ret = close(pFile->fd);
|
||||||
|
@ -186,7 +275,7 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables,
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbOpenFileForWrite(pFile, O_WRONLY | O_CREAT) < 0) {
|
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) {
|
||||||
// TODO: deal with the ERROR here
|
// TODO: deal with the ERROR here
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -212,4 +301,12 @@ void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t file
|
||||||
TSKEY *maxKey) {
|
TSKEY *maxKey) {
|
||||||
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
|
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
|
||||||
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
|
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) {
|
||||||
|
if (pFileH->numOfFGroups == 0 || fid < pFileH->fGroup[0].fileId || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId)
|
||||||
|
return NULL;
|
||||||
|
void *ptr = bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey);
|
||||||
|
if (ptr == NULL) return NULL;
|
||||||
|
return (SFileGroup *)ptr;
|
||||||
}
|
}
|
|
@ -764,6 +764,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
|
||||||
|
|
||||||
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
|
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
|
||||||
int numOfRows = 0;
|
int numOfRows = 0;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
SSkipListNode *node = tSkipListIterGet(pIter);
|
SSkipListNode *node = tSkipListIterGet(pIter);
|
||||||
if (node == NULL) break;
|
if (node == NULL) break;
|
||||||
|
@ -776,6 +777,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
if (numOfRows > maxRowsToRead) break;
|
if (numOfRows > maxRowsToRead) break;
|
||||||
} while (tSkipListIterNext(pIter));
|
} while (tSkipListIterNext(pIter));
|
||||||
|
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -865,24 +867,122 @@ static void *tsdbCommitData(void *arg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) {
|
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) {
|
||||||
|
int flag = 0;
|
||||||
|
|
||||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
STsdbCfg * pCfg = &pRepo->config;
|
STsdbCfg * pCfg = &pRepo->config;
|
||||||
|
SFile tFile, lFile;
|
||||||
|
SFileGroup *pGroup = NULL;
|
||||||
|
SCompIdx * pIndices = NULL;
|
||||||
|
SCompInfo * pCompInfo = NULL;
|
||||||
|
size_t compInfoSize = 0;
|
||||||
|
SCompBlock compBlock;
|
||||||
|
SCompBlock *pBlock = &compBlock;
|
||||||
|
|
||||||
TSKEY minKey = 0, maxKey = 0;
|
TSKEY minKey = 0, maxKey = 0;
|
||||||
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
|
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
|
||||||
|
|
||||||
for (int tid = 0; tid < pCfg->maxTables; tid++) {
|
for (int tid = 0; tid < pCfg->maxTables; tid++) {
|
||||||
STable *pTable = pMeta->tables[tid];
|
STable * pTable = pMeta->tables[tid];
|
||||||
SSkipListIterator *pIter = iters[tid];
|
SSkipListIterator *pIter = iters[tid];
|
||||||
|
int isLoadCompBlocks = 0;
|
||||||
|
|
||||||
if (pIter == NULL) continue;
|
if (pIter == NULL) continue;
|
||||||
tdInitDataCols(pCols, pTable->schema);
|
tdInitDataCols(pCols, pTable->schema);
|
||||||
|
|
||||||
while (tsdbReadRowsFromCache(pIter, maxKey, pCfg->maxRowsPerFileBlock, pCols)) {
|
int numOfWrites = 0;
|
||||||
// TODO
|
// while (tsdbReadRowsFromCache(pIter, maxKey, pCfg->maxRowsPerFileBlock, pCols)) {
|
||||||
int k = 0;
|
// break;
|
||||||
}
|
// if (!flag) {
|
||||||
|
// // There are data to commit to this file, we need to create/open it for read/write.
|
||||||
|
// // At the meantime, we set the flag to prevent further create/open operations
|
||||||
|
// if (tsdbCreateFGroup(pFileH, pRepo->rootDir, fid, pCfg->maxTables) < 0) {
|
||||||
|
// // TODO: deal with the ERROR here
|
||||||
|
// }
|
||||||
|
// // Open files for commit
|
||||||
|
// pGroup = tsdbOpenFilesForCommit(pFileH, fid);
|
||||||
|
// if (pGroup == NULL) {
|
||||||
|
// // TODO: deal with the ERROR here
|
||||||
|
// }
|
||||||
|
// // TODO: open .h file and if neccessary, open .l file
|
||||||
|
// {}
|
||||||
|
// pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
|
||||||
|
// if (pIndices == NULL) {
|
||||||
|
// // TODO: deal with the ERROR
|
||||||
|
// }
|
||||||
|
// // load the SCompIdx part
|
||||||
|
// if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) {
|
||||||
|
// // TODO: deal with the ERROR here
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // TODO: sendfile those not need changed table content
|
||||||
|
// for (int ttid = 0; ttid < tid; ttid++) {
|
||||||
|
// // SCompIdx *pIdx = &pIndices[ttid];
|
||||||
|
// // if (pIdx->len > 0) {
|
||||||
|
// // lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, pIdx->offset, 0, SEEK_CUR);
|
||||||
|
// // sendfile(fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len);
|
||||||
|
// // }
|
||||||
|
// }
|
||||||
|
// flag = 1;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// SCompIdx *pIdx = &pIndices[tid];
|
||||||
|
|
||||||
|
// /* The first time to write to the table, need to decide
|
||||||
|
// * if it is neccessary to load the SComplock part. If it
|
||||||
|
// * is needed, just load it, or, just use sendfile and
|
||||||
|
// * append it.
|
||||||
|
// */
|
||||||
|
// if (numOfWrites == 0 && pIdx->offset > 0) {
|
||||||
|
// if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) {
|
||||||
|
// pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len);
|
||||||
|
// if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {
|
||||||
|
// // TODO: deal with the ERROR here
|
||||||
|
// }
|
||||||
|
// if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1;
|
||||||
|
// } else {
|
||||||
|
// // TODO: sendfile the prefix part
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) {
|
||||||
|
// // // TODO: deal with the ERROR here
|
||||||
|
// // }
|
||||||
|
|
||||||
|
// // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock);
|
||||||
|
|
||||||
|
|
||||||
|
// // if (1 /* the SCompBlock part is not loaded*/) {
|
||||||
|
// // // Append to .data file generate a SCompBlock and record it
|
||||||
|
// // } else {
|
||||||
|
// // }
|
||||||
|
|
||||||
|
// // // TODO: need to reset the pCols
|
||||||
|
|
||||||
|
// numOfWrites++;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// if (pCols->numOfPoints > 0) {
|
||||||
|
// // TODO: still has data to commit, commit it
|
||||||
|
// }
|
||||||
|
|
||||||
|
// if (1/* SCompBlock part is loaded, write it to .head file*/) {
|
||||||
|
// // TODO
|
||||||
|
// } else {
|
||||||
|
// // TODO: use sendfile send the old part and append the newly added part
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Write the SCompIdx part
|
||||||
|
|
||||||
|
// Close all files and return
|
||||||
|
if (flag) {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pIndices) free(pIndices);
|
||||||
|
if (pCompInfo) free(pCompInfo);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
Loading…
Reference in New Issue