refactor
This commit is contained in:
parent
53cc394523
commit
cca6f87a38
|
@ -19,11 +19,11 @@
|
||||||
#include <stdbool.h>
|
#include <stdbool.h>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
#include "tdataformat.h"
|
|
||||||
#include "tname.h"
|
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
|
#include "tdataformat.h"
|
||||||
|
#include "tname.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -74,9 +74,6 @@ typedef struct {
|
||||||
|
|
||||||
typedef void TSDB_REPO_T; // use void to hide implementation details from outside
|
typedef void TSDB_REPO_T; // use void to hide implementation details from outside
|
||||||
|
|
||||||
void tsdbSetDefaultCfg(STsdbCfg *pCfg);
|
|
||||||
STsdbCfg *tsdbCreateDefaultCfg();
|
|
||||||
void tsdbFreeCfg(STsdbCfg *pCfg);
|
|
||||||
STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo);
|
STsdbCfg *tsdbGetCfg(const TSDB_REPO_T *repo);
|
||||||
|
|
||||||
// --------- TSDB REPOSITORY DEFINITION
|
// --------- TSDB REPOSITORY DEFINITION
|
||||||
|
@ -106,23 +103,14 @@ typedef struct {
|
||||||
char * sql;
|
char * sql;
|
||||||
} STableCfg;
|
} STableCfg;
|
||||||
|
|
||||||
int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid);
|
|
||||||
int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid);
|
|
||||||
int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup);
|
|
||||||
int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
|
|
||||||
int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup);
|
|
||||||
int tsdbTableSetName(STableCfg *config, char *name, bool dup);
|
|
||||||
int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
|
|
||||||
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
|
|
||||||
void tsdbClearTableCfg(STableCfg *config);
|
void tsdbClearTableCfg(STableCfg *config);
|
||||||
|
|
||||||
void* tsdbGetTableTagVal(TSDB_REPO_T* repo, const STableId* id, int32_t colId, int16_t type, int16_t bytes);
|
void * tsdbGetTableTagVal(TSDB_REPO_T *repo, const STableId *id, int32_t colId, int16_t type, int16_t bytes);
|
||||||
char* tsdbGetTableName(TSDB_REPO_T *repo, const STableId *id);
|
char * tsdbGetTableName(TSDB_REPO_T *repo, const STableId *id);
|
||||||
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
|
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
|
||||||
|
|
||||||
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
|
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
|
||||||
int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
|
int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
|
||||||
int tsdbAlterTable(TSDB_REPO_T *repo, STableCfg *pCfg);
|
|
||||||
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
|
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
|
||||||
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
|
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
|
||||||
void tsdbStartStream(TSDB_REPO_T *repo);
|
void tsdbStartStream(TSDB_REPO_T *repo);
|
||||||
|
@ -156,7 +144,7 @@ STableInfo *tsdbGetTableInfo(TSDB_REPO_T *pRepo, STableId tid);
|
||||||
*
|
*
|
||||||
* @return the number of points inserted, -1 for failure and the error number is set
|
* @return the number of points inserted, -1 for failure and the error number is set
|
||||||
*/
|
*/
|
||||||
int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * pRsp) ;
|
int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
|
||||||
|
|
||||||
// -- FOR QUERY TIME SERIES DATA
|
// -- FOR QUERY TIME SERIES DATA
|
||||||
|
|
||||||
|
@ -199,7 +187,7 @@ typedef void *TsdbPosT;
|
||||||
* @param qinfo query info handle from query processor
|
* @param qinfo query info handle from query processor
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void* qinfo);
|
TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void *qinfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
|
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
|
||||||
|
@ -207,15 +195,17 @@ TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STab
|
||||||
* all tables in this group.
|
* all tables in this group.
|
||||||
*
|
*
|
||||||
* @param tsdb tsdb handle
|
* @param tsdb tsdb handle
|
||||||
* @param pCond query condition, including time window, result set order, and basic required columns for each block
|
* @param pCond query condition, including time window, result set order, and basic required columns for each
|
||||||
|
* block
|
||||||
* @param groupInfo tableId list.
|
* @param groupInfo tableId list.
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void* qinfo);
|
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupInfo, void *qinfo);
|
||||||
|
|
||||||
SArray* tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle);
|
SArray *tsdbGetQueriedTableIdList(TsdbQueryHandleT *pHandle);
|
||||||
|
|
||||||
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo);
|
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
|
||||||
|
void *qinfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* move to next block if exists
|
* move to next block if exists
|
||||||
|
|
|
@ -404,11 +404,6 @@ STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbAlterTable(TSDB_REPO_T *pRepo, STableCfg *pCfg) {
|
|
||||||
// TODO
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
|
int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
@ -511,119 +506,6 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Initialize a table configuration
|
|
||||||
*/
|
|
||||||
int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid) {
|
|
||||||
if (config == NULL) return -1;
|
|
||||||
if (type != TSDB_CHILD_TABLE && type != TSDB_NORMAL_TABLE && type != TSDB_STREAM_TABLE) return -1;
|
|
||||||
|
|
||||||
memset((void *)config, 0, sizeof(STableCfg));
|
|
||||||
|
|
||||||
config->type = type;
|
|
||||||
config->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
|
||||||
config->tableId.uid = uid;
|
|
||||||
config->tableId.tid = tid;
|
|
||||||
config->name = NULL;
|
|
||||||
config->sql = NULL;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the super table UID of the created table
|
|
||||||
*/
|
|
||||||
int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid) {
|
|
||||||
if (config->type != TSDB_CHILD_TABLE) return -1;
|
|
||||||
if (uid == TSDB_INVALID_SUPER_TABLE_ID) return -1;
|
|
||||||
|
|
||||||
config->superUid = uid;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the table schema in the configuration
|
|
||||||
* @param config the configuration to set
|
|
||||||
* @param pSchema the schema to set
|
|
||||||
* @param dup use the schema directly or duplicate one for use
|
|
||||||
*
|
|
||||||
* @return 0 for success and -1 for failure
|
|
||||||
*/
|
|
||||||
int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup) {
|
|
||||||
if (dup) {
|
|
||||||
config->schema = tdDupSchema(pSchema);
|
|
||||||
} else {
|
|
||||||
config->schema = pSchema;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the table schema in the configuration
|
|
||||||
* @param config the configuration to set
|
|
||||||
* @param pSchema the schema to set
|
|
||||||
* @param dup use the schema directly or duplicate one for use
|
|
||||||
*
|
|
||||||
* @return 0 for success and -1 for failure
|
|
||||||
*/
|
|
||||||
int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) {
|
|
||||||
if (config->type != TSDB_CHILD_TABLE) return -1;
|
|
||||||
|
|
||||||
if (dup) {
|
|
||||||
config->tagSchema = tdDupSchema(pSchema);
|
|
||||||
} else {
|
|
||||||
config->tagSchema = pSchema;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup) {
|
|
||||||
if (config->type != TSDB_CHILD_TABLE) return -1;
|
|
||||||
|
|
||||||
if (dup) {
|
|
||||||
config->tagValues = tdKVRowDup(row);
|
|
||||||
} else {
|
|
||||||
config->tagValues = row;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int tsdbTableSetName(STableCfg *config, char *name, bool dup) {
|
|
||||||
if (dup) {
|
|
||||||
config->name = strdup(name);
|
|
||||||
if (config->name == NULL) return -1;
|
|
||||||
} else {
|
|
||||||
config->name = name;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int tsdbTableSetSName(STableCfg *config, char *sname, bool dup) {
|
|
||||||
if (config->type != TSDB_CHILD_TABLE) return -1;
|
|
||||||
|
|
||||||
if (dup) {
|
|
||||||
config->sname = strdup(sname);
|
|
||||||
if (config->sname == NULL) return -1;
|
|
||||||
} else {
|
|
||||||
config->sname = sname;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) {
|
|
||||||
if (config->type != TSDB_STREAM_TABLE) return -1;
|
|
||||||
|
|
||||||
if (dup) {
|
|
||||||
config->sql = strdup(sql);
|
|
||||||
if (config->sql == NULL) return -1;
|
|
||||||
} else {
|
|
||||||
config->sql = sql;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tsdbClearTableCfg(STableCfg *config) {
|
void tsdbClearTableCfg(STableCfg *config) {
|
||||||
if (config) {
|
if (config) {
|
||||||
if (config->schema) tdFreeSchema(config->schema);
|
if (config->schema) tdFreeSchema(config->schema);
|
||||||
|
|
|
@ -8,11 +8,18 @@
|
||||||
#define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here
|
#define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here
|
||||||
// #define TSDB_META_FILE_NAME "META"
|
// #define TSDB_META_FILE_NAME "META"
|
||||||
|
|
||||||
|
|
||||||
static int tsdbFreeTable(STable *pTable);
|
static int tsdbFreeTable(STable *pTable);
|
||||||
static int32_t tsdbCheckTableCfg(STableCfg *pCfg);
|
static int32_t tsdbCheckTableCfg(STableCfg *pCfg);
|
||||||
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx);
|
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx);
|
||||||
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx);
|
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx);
|
||||||
|
static int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid);
|
||||||
|
static int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid);
|
||||||
|
static int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup);
|
||||||
|
static int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
|
||||||
|
static int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup);
|
||||||
|
static int tsdbTableSetName(STableCfg *config, char *name, bool dup);
|
||||||
|
static int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
|
||||||
|
static int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encode a TSDB table object as a binary content
|
* Encode a TSDB table object as a binary content
|
||||||
|
@ -797,3 +804,113 @@ char *getTSTupleKey(const void * data) {
|
||||||
SDataRow row = (SDataRow)data;
|
SDataRow row = (SDataRow)data;
|
||||||
return POINTER_SHIFT(row, TD_DATA_ROW_HEAD_SIZE);
|
return POINTER_SHIFT(row, TD_DATA_ROW_HEAD_SIZE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid) {
|
||||||
|
if (config == NULL) return -1;
|
||||||
|
if (type != TSDB_CHILD_TABLE && type != TSDB_NORMAL_TABLE && type != TSDB_STREAM_TABLE) return -1;
|
||||||
|
|
||||||
|
memset((void *)config, 0, sizeof(STableCfg));
|
||||||
|
|
||||||
|
config->type = type;
|
||||||
|
config->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
||||||
|
config->tableId.uid = uid;
|
||||||
|
config->tableId.tid = tid;
|
||||||
|
config->name = NULL;
|
||||||
|
config->sql = NULL;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the super table UID of the created table
|
||||||
|
*/
|
||||||
|
static int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid) {
|
||||||
|
if (config->type != TSDB_CHILD_TABLE) return -1;
|
||||||
|
if (uid == TSDB_INVALID_SUPER_TABLE_ID) return -1;
|
||||||
|
|
||||||
|
config->superUid = uid;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the table schema in the configuration
|
||||||
|
* @param config the configuration to set
|
||||||
|
* @param pSchema the schema to set
|
||||||
|
* @param dup use the schema directly or duplicate one for use
|
||||||
|
*
|
||||||
|
* @return 0 for success and -1 for failure
|
||||||
|
*/
|
||||||
|
static int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup) {
|
||||||
|
if (dup) {
|
||||||
|
config->schema = tdDupSchema(pSchema);
|
||||||
|
} else {
|
||||||
|
config->schema = pSchema;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the table schema in the configuration
|
||||||
|
* @param config the configuration to set
|
||||||
|
* @param pSchema the schema to set
|
||||||
|
* @param dup use the schema directly or duplicate one for use
|
||||||
|
*
|
||||||
|
* @return 0 for success and -1 for failure
|
||||||
|
*/
|
||||||
|
static int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) {
|
||||||
|
if (config->type != TSDB_CHILD_TABLE) return -1;
|
||||||
|
|
||||||
|
if (dup) {
|
||||||
|
config->tagSchema = tdDupSchema(pSchema);
|
||||||
|
} else {
|
||||||
|
config->tagSchema = pSchema;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup) {
|
||||||
|
if (config->type != TSDB_CHILD_TABLE) return -1;
|
||||||
|
|
||||||
|
if (dup) {
|
||||||
|
config->tagValues = tdKVRowDup(row);
|
||||||
|
} else {
|
||||||
|
config->tagValues = row;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbTableSetName(STableCfg *config, char *name, bool dup) {
|
||||||
|
if (dup) {
|
||||||
|
config->name = strdup(name);
|
||||||
|
if (config->name == NULL) return -1;
|
||||||
|
} else {
|
||||||
|
config->name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbTableSetSName(STableCfg *config, char *sname, bool dup) {
|
||||||
|
if (config->type != TSDB_CHILD_TABLE) return -1;
|
||||||
|
|
||||||
|
if (dup) {
|
||||||
|
config->sname = strdup(sname);
|
||||||
|
if (config->sname == NULL) return -1;
|
||||||
|
} else {
|
||||||
|
config->sname = sname;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) {
|
||||||
|
if (config->type != TSDB_STREAM_TABLE) return -1;
|
||||||
|
|
||||||
|
if (dup) {
|
||||||
|
config->sql = strdup(sql);
|
||||||
|
if (config->sql == NULL) return -1;
|
||||||
|
} else {
|
||||||
|
config->sql = sql;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -131,11 +131,7 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
||||||
STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
|
return TSDB_CODE_SUCCESS;
|
||||||
if (pCfg == NULL) return terrno;
|
|
||||||
int32_t code = tsdbAlterTable(pVnode->tsdb, pCfg);
|
|
||||||
tsdbClearTableCfg(pCfg);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
||||||
|
|
Loading…
Reference in New Issue