TD-27
This commit is contained in:
parent
31705da49b
commit
b87aef0959
|
@ -25,15 +25,21 @@ extern "C" {
|
|||
#define TSDB_META_FILE_NAME "META"
|
||||
#define TSDB_META_HASH_FRACTION 1.1
|
||||
|
||||
typedef int (*iterFunc)(void *, void *cont, int contLen);
|
||||
typedef void (*afterFunc)(void *);
|
||||
|
||||
typedef struct {
|
||||
int fd; // File descriptor
|
||||
int nDel; // number of deletions
|
||||
int nRecord; // Number of records
|
||||
int tombSize; // Number of records
|
||||
int64_t size; // Total file size
|
||||
void * map; // Map from uid ==> position
|
||||
iterFunc iFunc;
|
||||
afterFunc aFunc;
|
||||
void * appH;
|
||||
} SMetaFile;
|
||||
|
||||
SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables);
|
||||
SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, afterFunc aFunc, void *appH);
|
||||
int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
|
||||
int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid);
|
||||
int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t contLen);
|
||||
|
|
|
@ -77,7 +77,6 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
|
|||
static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo);
|
||||
static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
|
||||
static int tsdbOpenMetaFile(char *tsdbDir);
|
||||
static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg);
|
||||
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
|
||||
|
||||
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
|
||||
|
@ -219,25 +218,23 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int fd = tsdbOpenMetaFile(tsdbDir);
|
||||
if (fd < 0) {
|
||||
free(pRepo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (tsdbRecoverRepo(fd, &(pRepo->config)) < 0) {
|
||||
close(fd);
|
||||
free(pRepo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pRepo->tsdbCache = tsdbInitCache(5);
|
||||
if (pRepo->tsdbCache == NULL) {
|
||||
// TODO: deal with error
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pRepo->rootDir = strdup(tsdbDir);
|
||||
|
||||
pRepo->tsdbMeta = tsdbInitMeta(tsdbDir, pRepo->config.maxTables);
|
||||
if (pRepo == NULL) {
|
||||
free(pRepo->rootDir);
|
||||
free(pRepo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize);
|
||||
if (pRepo->tsdbCache == NULL) {
|
||||
tsdbFreeMeta(pRepo->tsdbMeta);
|
||||
free(pRepo->rootDir);
|
||||
free(pRepo);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pRepo->state = TSDB_REPO_STATE_ACTIVE;
|
||||
|
||||
return (tsdb_repo_t *)pRepo;
|
||||
|
@ -623,12 +620,6 @@ static int tsdbOpenMetaFile(char *tsdbDir) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbRecoverRepo(int fd, STsdbCfg *pCfg) {
|
||||
// TODO: read tsdb configuration from file
|
||||
// recover tsdb meta
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||
// TODO
|
||||
int32_t level = 0;
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
|
||||
static int tsdbFreeTable(STable *pTable);
|
||||
static int32_t tsdbCheckTableCfg(STableCfg *pCfg);
|
||||
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable);
|
||||
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx);
|
||||
static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable);
|
||||
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
|
||||
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
|
||||
|
@ -95,6 +95,36 @@ void *tsdbFreeEncode(void *cont) {
|
|||
if (cont != NULL) free(cont);
|
||||
}
|
||||
|
||||
int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
|
||||
STsdbMeta *pMeta = (STsdbMeta *)pHandle;
|
||||
|
||||
STable *pTable = tsdbDecodeTable(cont, contLen);
|
||||
if (pTable == NULL) return -1;
|
||||
|
||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||
pTable->content.pIndex =
|
||||
tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, getTupleKey);
|
||||
} else {
|
||||
pTable->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
|
||||
}
|
||||
|
||||
tsdbAddTableToMeta(pMeta, pTable, false);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbOrgMeta(void *pHandle) {
|
||||
STsdbMeta *pMeta = (STsdbMeta *)pHandle;
|
||||
|
||||
for (int i = 0; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable != NULL && pTable->type == TSDB_CHILD_TABLE) {
|
||||
tsdbAddTableIntoIndex(pMeta, pTable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize the meta handle
|
||||
* ASSUMPTIONS: VALID PARAMETER
|
||||
|
@ -119,7 +149,7 @@ STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pMeta->mfh = tsdbInitMetaFile(rootDir, maxTables);
|
||||
pMeta->mfh = tsdbInitMetaFile(rootDir, maxTables, tsdbRestoreTable, tsdbOrgMeta, pMeta);
|
||||
if (pMeta->mfh == NULL) {
|
||||
taosHashCleanup(pMeta->map);
|
||||
free(pMeta->tables);
|
||||
|
@ -211,8 +241,8 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
|
|||
}
|
||||
table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
|
||||
|
||||
if (newSuper) tsdbAddTableToMeta(pMeta, super);
|
||||
tsdbAddTableToMeta(pMeta, table);
|
||||
if (newSuper) tsdbAddTableToMeta(pMeta, super, true);
|
||||
tsdbAddTableToMeta(pMeta, table, true);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -293,7 +323,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) {
|
|||
return *(STable **)ptr;
|
||||
}
|
||||
|
||||
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable) {
|
||||
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
|
||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||
// add super table to the linked list
|
||||
if (pMeta->superList == NULL) {
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
typedef struct {
|
||||
int32_t offset;
|
||||
int32_t size;
|
||||
int64_t uid;
|
||||
} SRecordInfo;
|
||||
|
||||
static int32_t tsdbGetMetaFileName(char *rootDir, char *fname);
|
||||
|
@ -34,14 +35,17 @@ static int32_t tsdbWriteMetaHeader(int fd);
|
|||
static int tsdbCreateMetaFile(char *fname);
|
||||
static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh);
|
||||
|
||||
SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables) {
|
||||
// TODO
|
||||
SMetaFile *tsdbInitMetaFile(char *rootDir, int32_t maxTables, iterFunc iFunc, afterFunc aFunc, void *appH) {
|
||||
char fname[128] = "\0";
|
||||
if (tsdbGetMetaFileName(rootDir, fname) < 0) return NULL;
|
||||
|
||||
SMetaFile *mfh = (SMetaFile *)calloc(1, sizeof(SMetaFile));
|
||||
if (mfh == NULL) return NULL;
|
||||
|
||||
mfh->iFunc = iFunc;
|
||||
mfh->aFunc = aFunc;
|
||||
mfh->appH = appH;
|
||||
|
||||
// OPEN MAP
|
||||
mfh->map =
|
||||
taosHashInit(maxTables * TSDB_META_HASH_FRACTION, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
||||
|
@ -77,6 +81,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
|
|||
SRecordInfo info;
|
||||
info.offset = mfh->size;
|
||||
info.size = contLen; // TODO: Here is not correct
|
||||
info.uid = uid;
|
||||
|
||||
mfh->size += (contLen + sizeof(SRecordInfo));
|
||||
|
||||
|
@ -99,7 +104,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
|
|||
|
||||
fsync(mfh->fd);
|
||||
|
||||
mfh->nRecord++;
|
||||
mfh->tombSize++;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -225,7 +230,31 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) {
|
|||
}
|
||||
|
||||
mfh->fd = fd;
|
||||
// TODO: iterate to read the meta file to restore the meta data
|
||||
|
||||
void *buf = NULL;
|
||||
int buf_size = 0;
|
||||
|
||||
SRecordInfo info;
|
||||
while (1) {
|
||||
if (read(mfh->fd, (void *)(&info), sizeof(SRecordInfo)) == 0) break;
|
||||
if (info.offset < 0) {
|
||||
mfh->size += (info.size + sizeof(SRecordInfo));
|
||||
mfh->tombSize += (info.size + sizeof(SRecordInfo));
|
||||
lseek(mfh->fd, info.size, SEEK_CUR);
|
||||
} else {
|
||||
if (taosHashPut(mfh->map, (char *)(&info.uid), sizeof(info.uid), (void *)(&info), sizeof(SRecordInfo)) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
buf = realloc(buf, info.size);
|
||||
if (buf == NULL) return -1;
|
||||
|
||||
if (read(mfh->fd, buf, info.size) < 0) return -1;
|
||||
(*mfh->iFunc)(mfh->appH, buf, info.size);
|
||||
}
|
||||
|
||||
}
|
||||
(*mfh->aFunc)(mfh->appH);
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue