refactor more
This commit is contained in:
parent
e871eb8cf6
commit
a614570ccf
|
@ -12,7 +12,7 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#if !defined(_TD_TSDB_H_)
|
||||
#ifndef _TD_TSDB_H_
|
||||
#define _TD_TSDB_H_
|
||||
|
||||
#include <pthread.h>
|
||||
|
@ -30,8 +30,15 @@ extern "C" {
|
|||
#define TSDB_VERSION_MAJOR 1
|
||||
#define TSDB_VERSION_MINOR 0
|
||||
|
||||
#define TSDB_INVALID_SUPER_TABLE_ID -1
|
||||
|
||||
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
|
||||
enum { TSDB_PRECISION_MILLI, TSDB_PRECISION_MICRO, TSDB_PRECISION_NANO };
|
||||
typedef enum {
|
||||
TSDB_SUPER_TABLE, // super table
|
||||
TSDB_NTABLE, // table not created from super table
|
||||
TSDB_STABLE // table created from super table
|
||||
} TSDB_TABLE_TYPE;
|
||||
|
||||
typedef struct {
|
||||
int8_t precision;
|
||||
|
@ -63,6 +70,28 @@ typedef struct {
|
|||
int32_t tid; // the table ID in the repository.
|
||||
} STableId;
|
||||
|
||||
// --------- TSDB TABLE configuration
|
||||
typedef struct {
|
||||
TSDB_TABLE_TYPE type;
|
||||
STableId tableId;
|
||||
int64_t superUid;
|
||||
STSchema * schema;
|
||||
STSchema * tagSchema;
|
||||
SDataRow tagValues;
|
||||
} STableCfg;
|
||||
|
||||
int tsdbInitTableCfg(STableCfg *config, TSDB_TABLE_TYPE type, int64_t uid, int32_t tid);
|
||||
int tsdbTableSetSuperUid(STableCfg *config, int64_t uid);
|
||||
int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup);
|
||||
int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
|
||||
int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup);
|
||||
void tsdbClearTableCfg(STableCfg *config);
|
||||
|
||||
int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg);
|
||||
int tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId);
|
||||
int tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
|
||||
|
||||
|
||||
// Submit message for this TSDB
|
||||
typedef struct {
|
||||
int32_t numOfTables;
|
||||
|
@ -88,25 +117,8 @@ typedef struct STsdbRepoInfo {
|
|||
int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository
|
||||
// TODO: Other informations to add
|
||||
} STsdbRepoInfo;
|
||||
|
||||
STsdbRepoInfo *tsdbGetStatus(tsdb_repo_t *pRepo);
|
||||
|
||||
// the meter configuration
|
||||
typedef struct {
|
||||
STableId tableId;
|
||||
|
||||
int64_t stableUid;
|
||||
int64_t createdTime;
|
||||
|
||||
int32_t numOfCols; // number of columns. For table form super table, not includes the tag schema
|
||||
STSchema *schema; // If numOfCols == schema_->numOfCols, it is a normal table, stableName = NULL
|
||||
// If numOfCols < schema->numOfCols, it is a table created from super table
|
||||
// assert(numOfCols <= schema->numOfCols);
|
||||
|
||||
SDataRow tagValues; // NULL if it is normal table
|
||||
// otherwise, it contains the tag values.
|
||||
} STableCfg;
|
||||
|
||||
// the meter information report structure
|
||||
typedef struct {
|
||||
STableCfg tableCfg;
|
||||
|
@ -114,6 +126,7 @@ typedef struct {
|
|||
int64_t tableTotalDataSize; // In bytes
|
||||
int64_t tableTotalDiskSize; // In bytes
|
||||
} STableInfo;
|
||||
STableInfo * tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid);
|
||||
|
||||
// -- For table manipulation
|
||||
|
||||
|
@ -124,8 +137,6 @@ typedef struct {
|
|||
*
|
||||
* @return 0 for success, -1 for failure and the error number is set
|
||||
*/
|
||||
int32_t tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg);
|
||||
int32_t tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
|
||||
|
||||
/**
|
||||
* Drop a table in a repository and free all the resources it takes
|
||||
|
@ -135,7 +146,6 @@ int32_t tsdbAlterTable(tsdb_repo_t *repo, STableCfg *pCfg);
|
|||
*
|
||||
* @return 0 for success, -1 for failure and the error number is set
|
||||
*/
|
||||
int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId);
|
||||
|
||||
/**
|
||||
* Get the information of a table in the repository
|
||||
|
@ -145,7 +155,6 @@ int32_t tsdbDropTable(tsdb_repo_t *pRepo, STableId tableId);
|
|||
*
|
||||
* @return a table information handle for success, NULL for failure and the error number is set
|
||||
*/
|
||||
STableInfo *tsdbGetTableInfo(tsdb_repo_t *pRepo, STableId tid);
|
||||
|
||||
// -- FOR INSERT DATA
|
||||
/**
|
||||
|
|
|
@ -31,53 +31,23 @@ extern "C" {
|
|||
// Initially, there are 4 tables
|
||||
#define TSDB_INIT_NUMBER_OF_SUPER_TABLE 4
|
||||
|
||||
typedef enum {
|
||||
TSDB_SUPER_TABLE, // super table
|
||||
TSDB_NTABLE, // table not created from super table
|
||||
TSDB_STABLE // table created from super table
|
||||
} TSDB_TABLE_TYPE;
|
||||
|
||||
#define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL)
|
||||
|
||||
// ---------- TSDB TABLE DEFINITION
|
||||
typedef struct STable {
|
||||
STableId tableId;
|
||||
TSDB_TABLE_TYPE type;
|
||||
|
||||
int64_t createdTime;
|
||||
|
||||
// super table UID -1 for normal table
|
||||
int32_t stableUid;
|
||||
|
||||
int32_t numOfCols;
|
||||
|
||||
// Schema for this table
|
||||
// For TSDB_SUPER_TABLE, it is the schema including tags
|
||||
// For TSDB_NTABLE, it is only the schema, not including tags
|
||||
// For TSDB_STABLE, it is NULL
|
||||
STSchema *pSchema;
|
||||
|
||||
// Tag value for this table
|
||||
// For TSDB_SUPER_TABLE and TSDB_NTABLE, it is NULL
|
||||
// For TSDB_STABLE, it is the tag value string
|
||||
SDataRow pTagVal;
|
||||
|
||||
// Object content;
|
||||
// For TSDB_SUPER_TABLE, it is the index of tables created from it
|
||||
// For TSDB_STABLE and TSDB_NTABLE, it is the cache data
|
||||
STableId tableId;
|
||||
int32_t superUid; // Super table UID
|
||||
STSchema * schema;
|
||||
STSchema * tagSchema;
|
||||
SDataRow tagVal;
|
||||
union {
|
||||
void *pData;
|
||||
void *pIndex;
|
||||
void *pData; // For TSDB_NTABLE and TSDB_STABLE, it is the skiplist for cache data
|
||||
void *pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
|
||||
} content;
|
||||
|
||||
// A handle to deal with event
|
||||
void *eventHandler;
|
||||
|
||||
// A handle to deal with stream
|
||||
void *streamHandler;
|
||||
|
||||
struct STable *next;
|
||||
|
||||
void * eventHandler; // TODO
|
||||
void * streamHandler; // TODO
|
||||
struct STable *next; // TODO: remove the next
|
||||
} STable;
|
||||
|
||||
void * tsdbEncodeTable(STable *pTable, int *contLen);
|
||||
|
|
|
@ -297,17 +297,17 @@ STsdbRepoInfo *tsdbGetStatus(tsdb_repo_t *pRepo) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int32_t tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg) {
|
||||
int tsdbCreateTable(tsdb_repo_t *repo, STableCfg *pCfg) {
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
return tsdbCreateTableImpl(pRepo->tsdbMeta, pCfg);
|
||||
}
|
||||
|
||||
int32_t tsdbAlterTable(tsdb_repo_t *pRepo, STableCfg *pCfg) {
|
||||
int tsdbAlterTable(tsdb_repo_t *pRepo, STableCfg *pCfg) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbDropTable(tsdb_repo_t *repo, STableId tableId) {
|
||||
int tsdbDropTable(tsdb_repo_t *repo, STableId tableId) {
|
||||
// TODO
|
||||
if (repo == NULL) return -1;
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
|
@ -334,6 +334,87 @@ int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize a table configuration
|
||||
*/
|
||||
int tsdbInitTableCfg(STableCfg *config, TSDB_TABLE_TYPE type, int64_t uid, int32_t tid) {
|
||||
if (config == NULL) return -1;
|
||||
if (type != TSDB_NTABLE && type != TSDB_STABLE) return -1;
|
||||
|
||||
memset((void *)config, 0, sizeof(STableCfg));
|
||||
|
||||
config->type = type;
|
||||
config->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
||||
config->tableId.uid = uid;
|
||||
config->tableId.tid = tid;
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the super table UID of the created table
|
||||
*/
|
||||
int tsdbTableSetSuperUid(STableCfg *config, int64_t uid) {
|
||||
if (config->type != TSDB_STABLE) return -1;
|
||||
if (uid == TSDB_INVALID_SUPER_TABLE_ID) return -1;
|
||||
|
||||
config->superUid = uid;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the table schema in the configuration
|
||||
* @param config the configuration to set
|
||||
* @param pSchema the schema to set
|
||||
* @param dup use the schema directly or duplicate one for use
|
||||
*
|
||||
* @return 0 for success and -1 for failure
|
||||
*/
|
||||
int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup) {
|
||||
if (dup) {
|
||||
config->schema = tdDupSchema(pSchema);
|
||||
} else {
|
||||
config->schema = pSchema;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the table schema in the configuration
|
||||
* @param config the configuration to set
|
||||
* @param pSchema the schema to set
|
||||
* @param dup use the schema directly or duplicate one for use
|
||||
*
|
||||
* @return 0 for success and -1 for failure
|
||||
*/
|
||||
int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) {
|
||||
if (config->type != TSDB_STABLE) return -1;
|
||||
|
||||
if (dup) {
|
||||
config->tagSchema = tdDupSchema(pSchema);
|
||||
} else {
|
||||
config->tagSchema = pSchema;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup) {
|
||||
if (config->type != TSDB_STABLE) return -1;
|
||||
|
||||
if (dup) {
|
||||
config->tagValues = tdDataRowDup(row);
|
||||
} else {
|
||||
config->tagValues = row;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbClearTableCfg(STableCfg *config) {
|
||||
if (config->schema) tdFreeSchema(config->schema);
|
||||
if (config->tagSchema) tdFreeSchema(config->tagSchema);
|
||||
if (config->tagValues) tdFreeDataRow(config->tagValues);
|
||||
}
|
||||
|
||||
// Check the configuration and set default options
|
||||
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
|
||||
// Check precision
|
||||
|
|
|
@ -132,59 +132,61 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
|
|||
}
|
||||
|
||||
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
|
||||
if (tsdbCheckTableCfg(pCfg) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (tsdbCheckTableCfg(pCfg) < 0) return -1;
|
||||
|
||||
STable *pSTable = NULL;
|
||||
STable *super = NULL;
|
||||
int newSuper = 0;
|
||||
|
||||
if (IS_CREATE_STABLE(pCfg)) { // to create a TSDB_STABLE, check if super table exists
|
||||
pSTable = tsdbGetTableByUid(pMeta, pCfg->stableUid);
|
||||
if (pSTable == NULL) { // super table not exists, try to create it
|
||||
if (pCfg->type == TSDB_STABLE) {
|
||||
super = tsdbGetTableByUid(pMeta, pCfg->superUid);
|
||||
if (super == NULL) { // super table not exists, try to create it
|
||||
newSuper = 1;
|
||||
pSTable = (STable *)calloc(1, sizeof(STable));
|
||||
if (pSTable == NULL) return -1;
|
||||
// TODO: use function to implement create table object
|
||||
super = (STable *)calloc(1, sizeof(STable));
|
||||
if (super == NULL) return -1;
|
||||
|
||||
pSTable->tableId.uid = pCfg->stableUid;
|
||||
pSTable->tableId.tid = -1;
|
||||
pSTable->type = TSDB_SUPER_TABLE;
|
||||
// pSTable->createdTime = pCfg->createdTime; // The created time is not required
|
||||
pSTable->stableUid = -1;
|
||||
pSTable->numOfCols = pCfg->numOfCols;
|
||||
pSTable->pSchema = tdDupSchema(pCfg->schema);
|
||||
pSTable->content.pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1,
|
||||
super->type = TSDB_SUPER_TABLE;
|
||||
super->tableId.uid = pCfg->superUid;
|
||||
super->tableId.tid = -1;
|
||||
super->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
||||
super->schema = tdDupSchema(pCfg->schema);
|
||||
super->tagSchema = tdDupSchema(pCfg->tagSchema);
|
||||
super->tagVal = tdDataRowDup(pCfg->tagValues);
|
||||
super->content.pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1,
|
||||
0, NULL); // Allow duplicate key, no lock
|
||||
if (pSTable->content.pIndex == NULL) {
|
||||
free(pSTable);
|
||||
|
||||
if (super->content.pIndex == NULL) {
|
||||
tdFreeSchema(super->schema);
|
||||
tdFreeSchema(super->tagSchema);
|
||||
tdFreeDataRow(super->tagVal);
|
||||
free(super);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
if (pSTable->type != TSDB_SUPER_TABLE) return -1;
|
||||
if (super->type != TSDB_SUPER_TABLE) return -1;
|
||||
}
|
||||
}
|
||||
|
||||
STable *pTable = (STable *)malloc(sizeof(STable));
|
||||
if (pTable == NULL) {
|
||||
if (newSuper) tsdbFreeTable(pSTable);
|
||||
STable *table = (STable *)malloc(sizeof(STable));
|
||||
if (table == NULL) {
|
||||
if (newSuper) tsdbFreeTable(super);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pTable->tableId = pCfg->tableId;
|
||||
pTable->createdTime = pCfg->createdTime;
|
||||
table->tableId = pCfg->tableId;
|
||||
if (IS_CREATE_STABLE(pCfg)) { // TSDB_STABLE
|
||||
pTable->type = TSDB_STABLE;
|
||||
pTable->stableUid = pCfg->stableUid;
|
||||
pTable->pTagVal = tdDataRowDup(pCfg->tagValues);
|
||||
table->type = TSDB_STABLE;
|
||||
table->superUid = pCfg->superUid;
|
||||
table->tagVal = tdDataRowDup(pCfg->tagValues);
|
||||
} else { // TSDB_NTABLE
|
||||
pTable->type = TSDB_NTABLE;
|
||||
pTable->stableUid = -1;
|
||||
pTable->pSchema = tdDupSchema(pCfg->schema);
|
||||
table->type = TSDB_NTABLE;
|
||||
table->superUid = -1;
|
||||
table->schema = tdDupSchema(pCfg->schema);
|
||||
}
|
||||
pTable->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, 0, 8, 0, 0, NULL);
|
||||
table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, 0, 8, 0, 0, NULL);
|
||||
|
||||
if (newSuper) tsdbAddTableToMeta(pMeta, pSTable);
|
||||
tsdbAddTableToMeta(pMeta, pTable);
|
||||
if (newSuper) tsdbAddTableToMeta(pMeta, super);
|
||||
tsdbAddTableToMeta(pMeta, table);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -236,9 +238,9 @@ int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) {
|
|||
static int tsdbFreeTable(STable *pTable) {
|
||||
// TODO: finish this function
|
||||
if (pTable->type == TSDB_STABLE) {
|
||||
tdFreeDataRow(pTable->pTagVal);
|
||||
tdFreeDataRow(pTable->tagVal);
|
||||
} else {
|
||||
tdFreeSchema(pTable->pSchema);
|
||||
tdFreeSchema(pTable->schema);
|
||||
}
|
||||
|
||||
// Free content
|
||||
|
|
Loading…
Reference in New Issue