commit
009478db8a
|
@ -607,7 +607,7 @@ static void doInitGlobalConfig() {
|
|||
cfg.minValue = TSDB_MIN_CACHE_BLOCK_SIZE;
|
||||
cfg.maxValue = TSDB_MAX_CACHE_BLOCK_SIZE;
|
||||
cfg.ptrLength = 0;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_Mb;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "blocks";
|
||||
|
@ -617,7 +617,7 @@ static void doInitGlobalConfig() {
|
|||
cfg.minValue = TSDB_MIN_TOTAL_BLOCKS;
|
||||
cfg.maxValue = TSDB_MAX_TOTAL_BLOCKS;
|
||||
cfg.ptrLength = 0;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_BYTE;
|
||||
cfg.unitType = TAOS_CFG_UTYPE_NONE;
|
||||
taosInitConfigOption(cfg);
|
||||
|
||||
cfg.option = "days";
|
||||
|
|
|
@ -92,6 +92,7 @@ typedef struct {
|
|||
STSchema * schema;
|
||||
STSchema * tagSchema;
|
||||
SDataRow tagValues;
|
||||
char * sql;
|
||||
} STableCfg;
|
||||
|
||||
int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid);
|
||||
|
@ -101,6 +102,7 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
|
|||
int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup);
|
||||
int tsdbTableSetName(STableCfg *config, char *name, bool dup);
|
||||
int tsdbTableSetSName(STableCfg *config, char *sname, bool dup);
|
||||
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup);
|
||||
void tsdbClearTableCfg(STableCfg *config);
|
||||
|
||||
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId* id, int32_t colId, int16_t *type, int16_t *bytes, char **val);
|
||||
|
|
|
@ -85,12 +85,13 @@ typedef struct STable {
|
|||
TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure
|
||||
struct STable *next; // TODO: remove the next
|
||||
struct STable *prev;
|
||||
tstr * name; // NOTE: there a flexible string here
|
||||
tstr * name; // NOTE: there a flexible string here
|
||||
char * sql;
|
||||
} STable;
|
||||
|
||||
#define TSDB_GET_TABLE_LAST_KEY(tb) ((tb)->lastKey)
|
||||
|
||||
void * tsdbEncodeTable(STable *pTable, int *contLen);
|
||||
void tsdbEncodeTable(STable *pTable, char *buf, int *contLen);
|
||||
STable *tsdbDecodeTable(void *cont, int contLen);
|
||||
void tsdbFreeEncode(void *cont);
|
||||
|
||||
|
|
|
@ -446,7 +446,7 @@ int32_t tsdbInsertData(TsdbRepoT *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * p
|
|||
*/
|
||||
int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid) {
|
||||
if (config == NULL) return -1;
|
||||
if (type != TSDB_NORMAL_TABLE && type != TSDB_CHILD_TABLE) return -1;
|
||||
if (type != TSDB_CHILD_TABLE && type != TSDB_NORMAL_TABLE && type != TSDB_STREAM_TABLE) return -1;
|
||||
|
||||
memset((void *)config, 0, sizeof(STableCfg));
|
||||
|
||||
|
@ -455,6 +455,7 @@ int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t t
|
|||
config->tableId.uid = uid;
|
||||
config->tableId.tid = tid;
|
||||
config->name = NULL;
|
||||
config->sql = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -540,12 +541,26 @@ int tsdbTableSetSName(STableCfg *config, char *sname, bool dup) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) {
|
||||
if (config->type != TSDB_STREAM_TABLE) return -1;
|
||||
|
||||
if (dup) {
|
||||
config->sql = strdup(sql);
|
||||
if (config->sql == NULL) return -1;
|
||||
} else {
|
||||
config->sql = sql;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbClearTableCfg(STableCfg *config) {
|
||||
if (config->schema) tdFreeSchema(config->schema);
|
||||
if (config->tagSchema) tdFreeSchema(config->tagSchema);
|
||||
if (config->tagValues) tdFreeDataRow(config->tagValues);
|
||||
tfree(config->name);
|
||||
tfree(config->sname);
|
||||
tfree(config->sql);
|
||||
}
|
||||
|
||||
int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
||||
|
|
|
@ -15,7 +15,6 @@ static int32_t tsdbCheckTableCfg(STableCfg *pCfg);
|
|||
static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx);
|
||||
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
|
||||
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
|
||||
static int tsdbEstimateTableEncodeSize(STable *pTable);
|
||||
static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFromIdx);
|
||||
|
||||
/**
|
||||
|
@ -28,16 +27,10 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rm
|
|||
* @return binary content for success
|
||||
* NULL fro failure
|
||||
*/
|
||||
void *tsdbEncodeTable(STable *pTable, int *contLen) {
|
||||
if (pTable == NULL) return NULL;
|
||||
void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
|
||||
if (pTable == NULL) return;
|
||||
|
||||
*contLen = tsdbEstimateTableEncodeSize(pTable);
|
||||
if (*contLen < 0) return NULL;
|
||||
|
||||
void *ret = calloc(1, *contLen);
|
||||
if (ret == NULL) return NULL;
|
||||
|
||||
void *ptr = ret;
|
||||
void *ptr = buf;
|
||||
T_APPEND_MEMBER(ptr, pTable, STable, type);
|
||||
// Encode name, todo refactor
|
||||
*(int *)ptr = varDataLen(pTable->name);
|
||||
|
@ -55,11 +48,16 @@ void *tsdbEncodeTable(STable *pTable, int *contLen) {
|
|||
ptr = tdEncodeSchema(ptr, pTable->tagSchema);
|
||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
||||
tdTagRowCpy(ptr, pTable->tagVal);
|
||||
ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal) + ((STagRow *)pTable->tagVal)->dataLen);
|
||||
} else {
|
||||
ptr = tdEncodeSchema(ptr, pTable->schema);
|
||||
}
|
||||
|
||||
return ret;
|
||||
if (pTable->type == TSDB_STREAM_TABLE) {
|
||||
ptr = taosEncodeString(ptr, pTable->sql);
|
||||
}
|
||||
|
||||
*contLen = (char *)ptr - buf;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -97,10 +95,15 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
|
|||
pTable->tagSchema = tdDecodeSchema(&ptr);
|
||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
||||
pTable->tagVal = tdTagRowDecode(ptr);
|
||||
ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal) + ((STagRow *)pTable->tagVal)->dataLen);
|
||||
} else {
|
||||
pTable->schema = tdDecodeSchema(&ptr);
|
||||
}
|
||||
|
||||
if (pTable->type == TSDB_STREAM_TABLE) {
|
||||
ptr = taosDecodeString(ptr, &(pTable->sql));
|
||||
}
|
||||
|
||||
return pTable;
|
||||
}
|
||||
|
||||
|
@ -213,7 +216,7 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
|
|||
}
|
||||
|
||||
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) {
|
||||
if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE) {
|
||||
if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) {
|
||||
return pTable->schema;
|
||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
||||
STable *pSuper = tsdbGetTableByUid(pMeta, pTable->superUid);
|
||||
|
@ -286,6 +289,76 @@ char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes) {
|
|||
}
|
||||
}
|
||||
|
||||
static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) {
|
||||
STable *pTable = NULL;
|
||||
size_t tsize = 0;
|
||||
|
||||
pTable = (STable *)calloc(1, sizeof(STable));
|
||||
if (pTable == NULL) {
|
||||
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pTable->type = pCfg->type;
|
||||
|
||||
if (isSuper) {
|
||||
pTable->type = TSDB_SUPER_TABLE;
|
||||
pTable->tableId.uid = pCfg->superUid;
|
||||
pTable->tableId.tid = -1;
|
||||
pTable->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
||||
pTable->schema = tdDupSchema(pCfg->schema);
|
||||
pTable->tagSchema = tdDupSchema(pCfg->tagSchema);
|
||||
|
||||
tsize = strnlen(pCfg->sname, TSDB_TABLE_NAME_LEN);
|
||||
pTable->name = calloc(1, tsize + VARSTR_HEADER_SIZE + 1);
|
||||
if (pTable->name == NULL) {
|
||||
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
STR_WITH_SIZE_TO_VARSTR(pTable->name, pCfg->sname, tsize);
|
||||
|
||||
STColumn *pColSchema = schemaColAt(pTable->tagSchema, 0);
|
||||
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes, 1, 0, 0,
|
||||
getTagIndexKey); // Allow duplicate key, no lock
|
||||
if (pTable->pIndex == NULL) {
|
||||
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
} else {
|
||||
pTable->type = pCfg->type;
|
||||
pTable->tableId.uid = pCfg->tableId.uid;
|
||||
pTable->tableId.tid = pCfg->tableId.tid;
|
||||
pTable->lastKey = TSKEY_INITIAL_VAL;
|
||||
|
||||
tsize = strnlen(pCfg->name, TSDB_TABLE_NAME_LEN);
|
||||
pTable->name = calloc(1, tsize + VARSTR_HEADER_SIZE + 1);
|
||||
if (pTable->name == NULL) {
|
||||
terrno = TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
STR_WITH_SIZE_TO_VARSTR(pTable->name, pCfg->name, tsize);
|
||||
|
||||
if (pCfg->type == TSDB_CHILD_TABLE) {
|
||||
pTable->superUid = pCfg->superUid;
|
||||
pTable->tagVal = tdDataRowDup(pCfg->tagValues);
|
||||
} else if (pCfg->type == TSDB_NORMAL_TABLE) {
|
||||
pTable->superUid = -1;
|
||||
pTable->schema = tdDupSchema(pCfg->schema);
|
||||
} else {
|
||||
ASSERT(pCfg->type == TSDB_STREAM_TABLE);
|
||||
pTable->superUid = -1;
|
||||
pTable->schema = tdDupSchema(pCfg->schema);
|
||||
pTable->sql = strdup(pCfg->sql);
|
||||
}
|
||||
}
|
||||
|
||||
return pTable;
|
||||
|
||||
_err:
|
||||
tsdbFreeTable(pTable);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
@ -306,61 +379,19 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
|
|||
super = tsdbGetTableByUid(pMeta, pCfg->superUid);
|
||||
if (super == NULL) { // super table not exists, try to create it
|
||||
newSuper = 1;
|
||||
// TODO: use function to implement create table object
|
||||
super = (STable *)calloc(1, sizeof(STable));
|
||||
super = tsdbNewTable(pCfg, true);
|
||||
if (super == NULL) return -1;
|
||||
|
||||
super->type = TSDB_SUPER_TABLE;
|
||||
super->tableId.uid = pCfg->superUid;
|
||||
super->tableId.tid = -1;
|
||||
super->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
||||
super->schema = tdDupSchema(pCfg->schema);
|
||||
super->tagSchema = tdDupSchema(pCfg->tagSchema);
|
||||
super->tagVal = NULL;
|
||||
|
||||
// todo refactor extract method
|
||||
size_t size = strnlen(pCfg->sname, TSDB_TABLE_NAME_LEN);
|
||||
super->name = calloc(1, size + VARSTR_HEADER_SIZE + 1);
|
||||
STR_WITH_SIZE_TO_VARSTR(super->name, pCfg->sname, size);
|
||||
|
||||
// index the first tag column
|
||||
STColumn* pColSchema = schemaColAt(super->tagSchema, 0);
|
||||
super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes,
|
||||
1, 0, 1, getTagIndexKey); // Allow duplicate key, no lock
|
||||
|
||||
if (super->pIndex == NULL) {
|
||||
tdFreeSchema(super->schema);
|
||||
tdFreeSchema(super->tagSchema);
|
||||
tdFreeTagRow(super->tagVal);
|
||||
free(super);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
if (super->type != TSDB_SUPER_TABLE) return -1;
|
||||
}
|
||||
}
|
||||
|
||||
STable *table = (STable *)calloc(1, sizeof(STable));
|
||||
STable *table = tsdbNewTable(pCfg, false);
|
||||
if (table == NULL) {
|
||||
if (newSuper) tsdbFreeTable(super);
|
||||
return -1;
|
||||
}
|
||||
|
||||
table->tableId = pCfg->tableId;
|
||||
|
||||
size_t size = strnlen(pCfg->name, TSDB_TABLE_NAME_LEN);
|
||||
table->name = calloc(1, size + VARSTR_HEADER_SIZE + 1);
|
||||
STR_WITH_SIZE_TO_VARSTR(table->name, pCfg->name, size);
|
||||
|
||||
table->lastKey = 0;
|
||||
if (IS_CREATE_STABLE(pCfg)) { // TSDB_CHILD_TABLE
|
||||
table->type = TSDB_CHILD_TABLE;
|
||||
table->superUid = pCfg->superUid;
|
||||
table->tagVal = tdTagRowDup(pCfg->tagValues);
|
||||
} else { // TSDB_NORMAL_TABLE
|
||||
table->type = TSDB_NORMAL_TABLE;
|
||||
table->superUid = -1;
|
||||
table->schema = tdDupSchema(pCfg->schema);
|
||||
if (newSuper) {
|
||||
tsdbFreeTable(super);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// Register to meta
|
||||
|
@ -375,15 +406,15 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
|
|||
|
||||
// Write to meta file
|
||||
int bufLen = 0;
|
||||
char *buf = malloc(1024*1024);
|
||||
if (newSuper) {
|
||||
void *buf = tsdbEncodeTable(super, &bufLen);
|
||||
tsdbEncodeTable(super, buf, &bufLen);
|
||||
tsdbInsertMetaRecord(pMeta->mfh, super->tableId.uid, buf, bufLen);
|
||||
tsdbFreeEncode(buf);
|
||||
}
|
||||
|
||||
void *buf = tsdbEncodeTable(table, &bufLen);
|
||||
tsdbEncodeTable(table, buf, &bufLen);
|
||||
tsdbInsertMetaRecord(pMeta->mfh, table->tableId.uid, buf, bufLen);
|
||||
tsdbFreeEncode(buf);
|
||||
tfree(buf);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -441,13 +472,18 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) {
|
|||
}
|
||||
|
||||
static int tsdbFreeTable(STable *pTable) {
|
||||
// TODO: finish this function
|
||||
if (pTable == NULL) return 0;
|
||||
|
||||
if (pTable->type == TSDB_CHILD_TABLE) {
|
||||
tdFreeTagRow(pTable->tagVal);
|
||||
} else {
|
||||
tdFreeSchema(pTable->schema);
|
||||
}
|
||||
|
||||
if (pTable->type == TSDB_STREAM_TABLE) {
|
||||
tfree(pTable->sql);
|
||||
}
|
||||
|
||||
// Free content
|
||||
if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) {
|
||||
tdFreeSchema(pTable->tagSchema);
|
||||
|
@ -494,6 +530,9 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
|
|||
if (pTable->type == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index
|
||||
tsdbAddTableIntoIndex(pMeta, pTable);
|
||||
}
|
||||
if (pTable->type == TSDB_STREAM_TABLE && addIdx) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
pMeta->nTables++;
|
||||
}
|
||||
|
@ -525,7 +564,6 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFrom
|
|||
|
||||
tSkipListDestroyIter(pIter);
|
||||
|
||||
// TODO: Remove the table from the list
|
||||
if (pTable->prev != NULL) {
|
||||
pTable->prev->next = pTable->next;
|
||||
if (pTable->next != NULL) {
|
||||
|
@ -539,6 +577,9 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable, bool rmFrom
|
|||
if (pTable->type == TSDB_CHILD_TABLE && rmFromIdx) {
|
||||
tsdbRemoveTableFromIndex(pMeta, pTable);
|
||||
}
|
||||
if (pTable->type == TSDB_STREAM_TABLE && rmFromIdx) {
|
||||
// TODO
|
||||
}
|
||||
|
||||
pMeta->nTables--;
|
||||
}
|
||||
|
@ -603,26 +644,6 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbEstimateTableEncodeSize(STable *pTable) {
|
||||
int size = 0;
|
||||
size += T_MEMBER_SIZE(STable, type);
|
||||
size += sizeof(int) + varDataLen(pTable->name);
|
||||
size += T_MEMBER_SIZE(STable, tableId);
|
||||
size += T_MEMBER_SIZE(STable, superUid);
|
||||
size += T_MEMBER_SIZE(STable, sversion);
|
||||
|
||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||
size += tdGetSchemaEncodeSize(pTable->schema);
|
||||
size += tdGetSchemaEncodeSize(pTable->tagSchema);
|
||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
||||
STagRow *pTagRow = (STagRow *)(pTable->tagVal);
|
||||
size += dataRowLen(pTable->tagVal) + pTagRow->dataLen;
|
||||
} else {
|
||||
size += tdGetSchemaEncodeSize(pTable->schema);
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
char *getTSTupleKey(const void * data) {
|
||||
SDataRow row = (SDataRow)data;
|
||||
|
|
|
@ -217,6 +217,28 @@ static FORCE_INLINE void *taosDecodeVariant64(void *buf, uint64_t *value) {
|
|||
return NULL; // error happened
|
||||
}
|
||||
|
||||
static FORCE_INLINE void *taosEncodeString(void *buf, char *value) {
|
||||
size_t size = strlen(value);
|
||||
|
||||
buf = taosEncodeVariant64(buf, size);
|
||||
memcpy(buf, value, size);
|
||||
|
||||
return POINTER_SHIFT(buf, size);
|
||||
}
|
||||
|
||||
static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
|
||||
uint64_t size = 0;
|
||||
|
||||
buf = taosDecodeVariant64(buf, &size);
|
||||
*value = (char *)malloc(size + 1);
|
||||
if (*value == NULL) return NULL;
|
||||
memcpy(*value, buf, size);
|
||||
|
||||
(*value)[size] = '\0';
|
||||
|
||||
return POINTER_SHIFT(buf, size);
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -106,6 +106,7 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
|
|||
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
||||
SMDCreateTableMsg *pTable = pCont;
|
||||
int32_t code = 0;
|
||||
char sql[1024] = "\0";
|
||||
|
||||
vTrace("vgId:%d, table:%s, start to create", pVnode->vgId, pTable->tableId);
|
||||
int16_t numOfColumns = htons(pTable->numOfColumns);
|
||||
|
@ -151,6 +152,11 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
|
|||
tsdbTableSetTagValue(&tCfg, dataRow, false);
|
||||
}
|
||||
|
||||
if (pTable->tableType == TSDB_STREAM_TABLE) {
|
||||
// TODO: set sql value
|
||||
tsdbTableSetStreamSql(&tCfg, sql, false);
|
||||
}
|
||||
|
||||
code = tsdbCreateTable(pVnode->tsdb, &tCfg);
|
||||
tdFreeDataRow(dataRow);
|
||||
tfree(pDestTagSchema);
|
||||
|
|
Loading…
Reference in New Issue