more
This commit is contained in:
parent
8d8b7b1c00
commit
a511aed8d5
|
@ -58,6 +58,7 @@ int metaCommit(SMeta *pMeta);
|
||||||
STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
|
STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
|
||||||
STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
|
STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
|
||||||
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
||||||
|
STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
|
||||||
|
|
||||||
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
||||||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#define _TD_TSDB_H_
|
#define _TD_TSDB_H_
|
||||||
|
|
||||||
#include "mallocator.h"
|
#include "mallocator.h"
|
||||||
|
#include "meta.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -58,7 +59,7 @@ typedef struct STsdbCfg {
|
||||||
} STsdbCfg;
|
} STsdbCfg;
|
||||||
|
|
||||||
// STsdb
|
// STsdb
|
||||||
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF);
|
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta);
|
||||||
void tsdbClose(STsdb *);
|
void tsdbClose(STsdb *);
|
||||||
void tsdbRemove(const char *path);
|
void tsdbRemove(const char *path);
|
||||||
int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg, SSubmitRsp *pRsp);
|
int tsdbInsertData(STsdb *pTsdb, SSubmitMsg *pMsg, SSubmitRsp *pRsp);
|
||||||
|
|
|
@ -106,7 +106,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
|
||||||
|
|
||||||
// Open tsdb
|
// Open tsdb
|
||||||
sprintf(dir, "%s/tsdb", pVnode->path);
|
sprintf(dir, "%s/tsdb", pVnode->path);
|
||||||
pVnode->pTsdb = tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode));
|
pVnode->pTsdb = tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta);
|
||||||
if (pVnode->pTsdb == NULL) {
|
if (pVnode->pTsdb == NULL) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -590,3 +590,26 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
|
||||||
|
STSchemaBuilder sb;
|
||||||
|
STSchema * pTSchema = NULL;
|
||||||
|
SSchema * pSchema;
|
||||||
|
SSchemaWrapper *pSW;
|
||||||
|
|
||||||
|
pSW = metaGetTableSchema(pMeta, uid, sver, true);
|
||||||
|
if (pSW == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Rebuild a schema
|
||||||
|
tdInitTSchemaBuilder(&sb, 0);
|
||||||
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
|
pSchema = pSW->pSchema + i;
|
||||||
|
tdAddColToSchema(&sb, pSchema->type, pSchema->colId, pSchema->bytes);
|
||||||
|
}
|
||||||
|
pTSchema = tdGetSchemaFromBuilder(&sb);
|
||||||
|
tdDestroyTSchemaBuilder(&sb);
|
||||||
|
|
||||||
|
return pTSchema;
|
||||||
|
}
|
|
@ -29,4 +29,5 @@ target_link_libraries(
|
||||||
PUBLIC common
|
PUBLIC common
|
||||||
PUBLIC tkv
|
PUBLIC tkv
|
||||||
PUBLIC tfs
|
PUBLIC tfs
|
||||||
|
PUBLIC meta
|
||||||
)
|
)
|
|
@ -17,6 +17,7 @@
|
||||||
#define _TD_TSDB_DEF_H_
|
#define _TD_TSDB_DEF_H_
|
||||||
|
|
||||||
#include "mallocator.h"
|
#include "mallocator.h"
|
||||||
|
#include "meta.h"
|
||||||
#include "tcompression.h"
|
#include "tcompression.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
|
@ -48,6 +49,7 @@ struct STsdb {
|
||||||
SRtn rtn;
|
SRtn rtn;
|
||||||
SMemAllocatorFactory *pmaf;
|
SMemAllocatorFactory *pmaf;
|
||||||
STsdbFS * fs;
|
STsdbFS * fs;
|
||||||
|
SMeta * pMeta;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define REPO_ID(r) ((r)->vgId)
|
#define REPO_ID(r) ((r)->vgId)
|
||||||
|
|
|
@ -70,7 +70,7 @@ typedef struct {
|
||||||
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
|
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
|
||||||
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
|
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
|
||||||
|
|
||||||
STsdbFS *tsdbNewFS(STsdbCfg *pCfg);
|
STsdbFS *tsdbNewFS(const STsdbCfg *pCfg);
|
||||||
void * tsdbFreeFS(STsdbFS *pfs);
|
void * tsdbFreeFS(STsdbFS *pfs);
|
||||||
int tsdbOpenFS(STsdb *pRepo);
|
int tsdbOpenFS(STsdb *pRepo);
|
||||||
void tsdbCloseFS(STsdb *pRepo);
|
void tsdbCloseFS(STsdb *pRepo);
|
||||||
|
|
|
@ -420,6 +420,13 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) {
|
||||||
pCommitIter = pCommith->iters + i;
|
pCommitIter = pCommith->iters + i;
|
||||||
pCommitIter->pIter = tSkipListCreateIter(pTbData->pData);
|
pCommitIter->pIter = tSkipListCreateIter(pTbData->pData);
|
||||||
tSkipListIterNext(pCommitIter->pIter);
|
tSkipListIterNext(pCommitIter->pIter);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
// TODO
|
||||||
|
pCommitIter->pTable = (STable *)malloc(sizeof(STable));
|
||||||
|
pCommitIter->pTable->uid = pTbData->uid;
|
||||||
|
pCommitIter->pTable->pSchema = metaGetTableSchema();
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -191,7 +191,7 @@ static int tsdbAddDFileSetToStatus(SFSStatus *pStatus, const SDFileSet *pSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ================== STsdbFS
|
// ================== STsdbFS
|
||||||
STsdbFS *tsdbNewFS(STsdbCfg *pCfg) {
|
STsdbFS *tsdbNewFS(const STsdbCfg *pCfg) {
|
||||||
int keep = pCfg->keep;
|
int keep = pCfg->keep;
|
||||||
int days = pCfg->daysPerFile;
|
int days = pCfg->daysPerFile;
|
||||||
int maxFSet = TSDB_MAX_FSETS(keep, days);
|
int maxFSet = TSDB_MAX_FSETS(keep, days);
|
||||||
|
|
|
@ -15,12 +15,13 @@
|
||||||
|
|
||||||
#include "tsdbDef.h"
|
#include "tsdbDef.h"
|
||||||
|
|
||||||
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF);
|
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
|
||||||
|
SMeta *pMeta);
|
||||||
static void tsdbFree(STsdb *pTsdb);
|
static void tsdbFree(STsdb *pTsdb);
|
||||||
static int tsdbOpenImpl(STsdb *pTsdb);
|
static int tsdbOpenImpl(STsdb *pTsdb);
|
||||||
static void tsdbCloseImpl(STsdb *pTsdb);
|
static void tsdbCloseImpl(STsdb *pTsdb);
|
||||||
|
|
||||||
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) {
|
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta) {
|
||||||
STsdb *pTsdb = NULL;
|
STsdb *pTsdb = NULL;
|
||||||
|
|
||||||
// Set default TSDB Options
|
// Set default TSDB Options
|
||||||
|
@ -35,7 +36,7 @@ STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAl
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the handle
|
// Create the handle
|
||||||
pTsdb = tsdbNew(path, vgId, pTsdbCfg, pMAF);
|
pTsdb = tsdbNew(path, vgId, pTsdbCfg, pMAF, pMeta);
|
||||||
if (pTsdb == NULL) {
|
if (pTsdb == NULL) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -62,7 +63,8 @@ void tsdbClose(STsdb *pTsdb) {
|
||||||
void tsdbRemove(const char *path) { taosRemoveDir(path); }
|
void tsdbRemove(const char *path) { taosRemoveDir(path); }
|
||||||
|
|
||||||
/* ------------------------ STATIC METHODS ------------------------ */
|
/* ------------------------ STATIC METHODS ------------------------ */
|
||||||
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) {
|
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
|
||||||
|
SMeta *pMeta) {
|
||||||
STsdb *pTsdb = NULL;
|
STsdb *pTsdb = NULL;
|
||||||
|
|
||||||
pTsdb = (STsdb *)calloc(1, sizeof(STsdb));
|
pTsdb = (STsdb *)calloc(1, sizeof(STsdb));
|
||||||
|
@ -75,6 +77,7 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
|
||||||
pTsdb->vgId = vgId;
|
pTsdb->vgId = vgId;
|
||||||
tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg);
|
tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg);
|
||||||
pTsdb->pmaf = pMAF;
|
pTsdb->pmaf = pMAF;
|
||||||
|
pTsdb->pMeta = pMeta;
|
||||||
|
|
||||||
pTsdb->fs = tsdbNewFS(pTsdbCfg);
|
pTsdb->fs = tsdbNewFS(pTsdbCfg);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue