TD-27
This commit is contained in:
parent
47f6402cbe
commit
c4b19166b3
|
@ -31,7 +31,7 @@ typedef void (*afterFunc)(void *);
|
|||
typedef struct {
|
||||
int fd; // File descriptor
|
||||
int nDel; // number of deletions
|
||||
int tombSize; // Number of records
|
||||
int tombSize; // deleted size
|
||||
int64_t size; // Total file size
|
||||
void * map; // Map from uid ==> position
|
||||
iterFunc iFunc;
|
||||
|
|
|
@ -241,9 +241,22 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
|
|||
}
|
||||
table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
|
||||
|
||||
// Register to meta
|
||||
if (newSuper) tsdbAddTableToMeta(pMeta, super, true);
|
||||
tsdbAddTableToMeta(pMeta, table, true);
|
||||
|
||||
// Write to meta file
|
||||
int bufLen = 0;
|
||||
if (newSuper) {
|
||||
void *buf = tsdbEncodeTable(super, &bufLen);
|
||||
tsdbInsertMetaRecord(pMeta->mfh, super->tableId.uid, buf, bufLen);
|
||||
tsdbFreeEncode(buf);
|
||||
}
|
||||
|
||||
void *buf = tsdbEncodeTable(table, &bufLen);
|
||||
tsdbInsertMetaRecord(pMeta->mfh, table->tableId.uid, buf, bufLen);
|
||||
tsdbFreeEncode(buf);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -45,6 +45,9 @@ SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, af
|
|||
mfh->iFunc = iFunc;
|
||||
mfh->aFunc = aFunc;
|
||||
mfh->appH = appH;
|
||||
mfh->nDel = 0;
|
||||
mfh->tombSize = 0;
|
||||
mfh->size = 0;
|
||||
|
||||
// OPEN MAP
|
||||
mfh->map =
|
||||
|
@ -62,6 +65,7 @@ SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, af
|
|||
free(mfh);
|
||||
return NULL;
|
||||
}
|
||||
mfh->size += TSDB_META_FILE_HEADER_SIZE;
|
||||
} else { // file exists, recover from file
|
||||
if (tsdbRestoreFromMetaFile(fname, mfh) < 0) {
|
||||
taosHashCleanup(mfh->map);
|
||||
|
@ -80,7 +84,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
|
|||
|
||||
SRecordInfo info;
|
||||
info.offset = mfh->size;
|
||||
info.size = contLen; // TODO: Here is not correct
|
||||
info.size = contLen;
|
||||
info.uid = uid;
|
||||
|
||||
mfh->size += (contLen + sizeof(SRecordInfo));
|
||||
|
@ -90,7 +94,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
|
|||
}
|
||||
|
||||
// TODO: make below a function to implement
|
||||
if (lseek(mfh->fd, info.offset, SEEK_CUR) < 0) {
|
||||
if (lseek(mfh->fd, info.offset, SEEK_SET) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -196,6 +200,11 @@ static int32_t tsdbWriteMetaHeader(int fd) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbReadMetaHeader(int fd) {
|
||||
lseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbCreateMetaFile(char *fname) {
|
||||
int fd = open(fname, O_RDWR | O_CREAT, 0755);
|
||||
if (fd < 0) return -1;
|
||||
|
@ -229,6 +238,8 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
mfh->size += TSDB_META_FILE_HEADER_SIZE;
|
||||
|
||||
mfh->fd = fd;
|
||||
|
||||
void *buf = NULL;
|
||||
|
@ -241,20 +252,30 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) {
|
|||
mfh->size += (info.size + sizeof(SRecordInfo));
|
||||
mfh->tombSize += (info.size + sizeof(SRecordInfo));
|
||||
lseek(mfh->fd, info.size, SEEK_CUR);
|
||||
mfh->size = mfh->size + sizeof(SRecordInfo) + info.size;
|
||||
mfh->tombSize = mfh->tombSize + sizeof(SRecordInfo) + info.size;
|
||||
} else {
|
||||
if (taosHashPut(mfh->map, (char *)(&info.uid), sizeof(info.uid), (void *)(&info), sizeof(SRecordInfo)) < 0) {
|
||||
if (buf) free(buf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
buf = realloc(buf, info.size);
|
||||
if (buf == NULL) return -1;
|
||||
|
||||
if (read(mfh->fd, buf, info.size) < 0) return -1;
|
||||
if (read(mfh->fd, buf, info.size) < 0) {
|
||||
if (buf) free(buf);
|
||||
return -1;
|
||||
}
|
||||
(*mfh->iFunc)(mfh->appH, buf, info.size);
|
||||
|
||||
mfh->size = mfh->size + sizeof(SRecordInfo) + info.size;
|
||||
}
|
||||
|
||||
}
|
||||
(*mfh->aFunc)(mfh->appH);
|
||||
|
||||
if (buf) free(buf);
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue