TD-353
This commit is contained in:
parent
112ce3b1a4
commit
adc6a190be
|
@ -177,7 +177,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_APP_ERROR, 0, 0x0509, "vnode app
|
|||
|
||||
// tsdb
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, 0, 0x0600, "tsdb invalid table id")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_TYPE, 0, 0x0601, "tsdb invalid table schema version")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_TYPE, 0, 0x0601, "tsdb invalid table type")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_SCHEMA_VERSION, 0, 0x0602, "tsdb invalid table schema version")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_ALREADY_EXIST, 0, 0x0603, "tsdb table already exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CONFIG, 0, 0x0604, "tsdb invalid configuration")
|
||||
|
@ -190,6 +190,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE, 0, 0x060A, "tsdb tag v
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE, 0, 0x060B, "tsdb timestamp is out of range")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP, 0, 0x060C, "tsdb submit message is messed up")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb invalid action")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message")
|
||||
|
||||
// query
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle")
|
||||
|
|
|
@ -108,6 +108,7 @@ int tsdbDropTable(TSDB_REPO_T *repo, STableId tableId) {
|
|||
}
|
||||
|
||||
void* tsdbGetTableTagVal(TSDB_REPO_T* repo, const STableId* id, int32_t colId, int16_t type, int16_t bytes) {
|
||||
// TODO: this function should be changed also
|
||||
STsdbMeta* pMeta = tsdbGetMeta(repo);
|
||||
STable* pTable = tsdbGetTableByUid(pMeta, id->uid);
|
||||
|
||||
|
@ -128,7 +129,10 @@ void* tsdbGetTableTagVal(TSDB_REPO_T* repo, const STableId* id, int32_t colId, i
|
|||
}
|
||||
|
||||
char *tsdbGetTableName(TSDB_REPO_T *repo, const STableId *id) {
|
||||
STsdbMeta *pMeta = tsdbGetMeta(repo);
|
||||
// TODO: need to change as thread-safe
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
STable * pTable = tsdbGetTableByUid(pMeta, id->uid);
|
||||
|
||||
if (pTable == NULL) {
|
||||
|
@ -140,19 +144,30 @@ char *tsdbGetTableName(TSDB_REPO_T *repo, const STableId *id) {
|
|||
|
||||
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
|
||||
if (pMsg == NULL) return NULL;
|
||||
|
||||
SSchema *pSchema = (SSchema *)pMsg->data;
|
||||
int16_t numOfCols = htons(pMsg->numOfColumns);
|
||||
int16_t numOfTags = htons(pMsg->numOfTags);
|
||||
|
||||
STSchemaBuilder schemaBuilder = {0};
|
||||
|
||||
STableCfg *pCfg = (STableCfg *)calloc(1, sizeof(STableCfg));
|
||||
if (pCfg == NULL) return NULL;
|
||||
if (pCfg == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->sid)) < 0) goto _err;
|
||||
if (tdInitTSchemaBuilder(&schemaBuilder, htonl(pMsg->sversion)) < 0) goto _err;
|
||||
if (tdInitTSchemaBuilder(&schemaBuilder, htonl(pMsg->sversion)) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
for (int i = 0; i < numOfCols; i++) {
|
||||
tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
||||
if (tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
if (tsdbTableSetSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err;
|
||||
if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err;
|
||||
|
@ -161,7 +176,10 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
|
|||
// Decode tag schema
|
||||
tdResetTSchemaBuilder(&schemaBuilder, htonl(pMsg->tversion));
|
||||
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
|
||||
tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
||||
if (tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
if (tsdbTableSetTagSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err;
|
||||
if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err;
|
||||
|
@ -173,9 +191,15 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
|
|||
char *pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
|
||||
|
||||
SKVRowBuilder kvRowBuilder = {0};
|
||||
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err;
|
||||
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
|
||||
tdAddColToKVRow(&kvRowBuilder, htons(pSchema[i].colId), pSchema[i].type, pTagData + accBytes);
|
||||
if (tdAddColToKVRow(&kvRowBuilder, htons(pSchema[i].colId), pSchema[i].type, pTagData + accBytes) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
accBytes += htons(pSchema[i].bytes);
|
||||
}
|
||||
|
||||
|
@ -786,92 +810,41 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
#define TSDB_SUPER_TABLE_SL_LEVEL 5 // TODO: may change here
|
||||
// #define TSDB_META_FILE_NAME "META"
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
static int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t tid) {
|
||||
if (config == NULL) return -1;
|
||||
if (type != TSDB_CHILD_TABLE && type != TSDB_NORMAL_TABLE && type != TSDB_STREAM_TABLE) return -1;
|
||||
if (type != TSDB_CHILD_TABLE && type != TSDB_NORMAL_TABLE && type != TSDB_STREAM_TABLE) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_TYPE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
memset((void *)config, 0, sizeof(STableCfg));
|
||||
memset((void *)config, 0, sizeof(*config));
|
||||
|
||||
config->type = type;
|
||||
config->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
||||
config->tableId.uid = uid;
|
||||
config->tableId.tid = tid;
|
||||
config->name = NULL;
|
||||
config->sql = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the super table UID of the created table
|
||||
*/
|
||||
static int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid) {
|
||||
if (config->type != TSDB_CHILD_TABLE) return -1;
|
||||
if (uid == TSDB_INVALID_SUPER_TABLE_ID) return -1;
|
||||
|
||||
config->superUid = uid;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the table schema in the configuration
|
||||
* @param config the configuration to set
|
||||
* @param pSchema the schema to set
|
||||
* @param dup use the schema directly or duplicate one for use
|
||||
*
|
||||
* @return 0 for success and -1 for failure
|
||||
*/
|
||||
static int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup) {
|
||||
if (dup) {
|
||||
config->schema = tdDupSchema(pSchema);
|
||||
if (config->schema == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
config->schema = pSchema;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the table schema in the configuration
|
||||
* @param config the configuration to set
|
||||
* @param pSchema the schema to set
|
||||
* @param dup use the schema directly or duplicate one for use
|
||||
*
|
||||
* @return 0 for success and -1 for failure
|
||||
*/
|
||||
static int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) {
|
||||
if (config->type != TSDB_CHILD_TABLE) return -1;
|
||||
|
||||
if (dup) {
|
||||
config->tagSchema = tdDupSchema(pSchema);
|
||||
} else {
|
||||
config->tagSchema = pSchema;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup) {
|
||||
if (config->type != TSDB_CHILD_TABLE) return -1;
|
||||
|
||||
if (dup) {
|
||||
config->tagValues = tdKVRowDup(row);
|
||||
} else {
|
||||
config->tagValues = row;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbTableSetName(STableCfg *config, char *name, bool dup) {
|
||||
if (dup) {
|
||||
config->name = strdup(name);
|
||||
if (config->name == NULL) return -1;
|
||||
if (config->name == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
config->name = name;
|
||||
}
|
||||
|
@ -879,28 +852,86 @@ static int tsdbTableSetName(STableCfg *config, char *name, bool dup) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) {
|
||||
if (config->type != TSDB_CHILD_TABLE) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_CREATE_TB_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dup) {
|
||||
config->tagSchema = tdDupSchema(pSchema);
|
||||
if (config->tagSchema == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
config->tagSchema = pSchema;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbTableSetSName(STableCfg *config, char *sname, bool dup) {
|
||||
if (config->type != TSDB_CHILD_TABLE) return -1;
|
||||
if (config->type != TSDB_CHILD_TABLE) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_CREATE_TB_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dup) {
|
||||
config->sname = strdup(sname);
|
||||
if (config->sname == NULL) return -1;
|
||||
if (config->sname == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
config->sname = sname;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid) {
|
||||
if (config->type != TSDB_CHILD_TABLE || uid == TSDB_INVALID_SUPER_TABLE_ID) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_CREATE_TB_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
config->superUid = uid;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup) {
|
||||
if (config->type != TSDB_CHILD_TABLE) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_CREATE_TB_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dup) {
|
||||
config->tagValues = tdKVRowDup(row);
|
||||
if (config->tagValues == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
config->tagValues = row;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) {
|
||||
if (config->type != TSDB_STREAM_TABLE) return -1;
|
||||
if (config->type != TSDB_STREAM_TABLE) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_CREATE_TB_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (dup) {
|
||||
config->sql = strdup(sql);
|
||||
if (config->sql == NULL) return -1;
|
||||
if (config->sql == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
config->sql = sql;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
#endif
|
Loading…
Reference in New Issue