TD-90
This commit is contained in:
parent
caa9cb4254
commit
268c6dd26b
|
@ -41,7 +41,6 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
|
||||||
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, uid);
|
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, uid);
|
||||||
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, tid);
|
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, tid);
|
||||||
T_APPEND_MEMBER(ptr, pTable, STable, superUid);
|
T_APPEND_MEMBER(ptr, pTable, STable, superUid);
|
||||||
// T_APPEND_MEMBER(ptr, pTable, STable, sversion);
|
|
||||||
|
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||||
ptr = tdEncodeSchema(ptr, pTable->schema);
|
ptr = tdEncodeSchema(ptr, pTable->schema);
|
||||||
|
@ -87,7 +86,6 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
|
||||||
T_READ_MEMBER(ptr, uint64_t, pTable->tableId.uid);
|
T_READ_MEMBER(ptr, uint64_t, pTable->tableId.uid);
|
||||||
T_READ_MEMBER(ptr, int32_t, pTable->tableId.tid);
|
T_READ_MEMBER(ptr, int32_t, pTable->tableId.tid);
|
||||||
T_READ_MEMBER(ptr, uint64_t, pTable->superUid);
|
T_READ_MEMBER(ptr, uint64_t, pTable->superUid);
|
||||||
// T_READ_MEMBER(ptr, int32_t, pTable->sversion);
|
|
||||||
|
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||||
pTable->schema = tdDecodeSchema(&ptr);
|
pTable->schema = tdDecodeSchema(&ptr);
|
||||||
|
@ -360,6 +358,34 @@ _err:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema) {
|
||||||
|
ASSERT(pTable->type == TSDB_SUPER_TABLE);
|
||||||
|
ASSERT(schemaVersion(pTable->tagSchema) < schemaVersion(newSchema));
|
||||||
|
STSchema *pOldSchema = pTable->tagSchema;
|
||||||
|
STSchema *pNewSchema = tdDupSchema(newSchema);
|
||||||
|
if (pNewSchema == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
|
pTable->tagSchema = pNewSchema;
|
||||||
|
tdFreeSchema(pOldSchema);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int tsdbCheckAndUpdateTable(STable *pTable, STableCfg *pCfg) {
|
||||||
|
ASSERT(pTable->type != TSDB_CHILD_TABLE);
|
||||||
|
|
||||||
|
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||||
|
if (schemaVersion(pTable->tagSchema) < schemaVersion(pCfg->tagSchema)) {
|
||||||
|
int32_t code = tsdbUpdateTableTagSchema(pTable, pCfg->tagSchema);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
// TODO: try to update the data schema
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
|
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
@ -384,6 +410,8 @@ int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg) {
|
||||||
if (super == NULL) return -1;
|
if (super == NULL) return -1;
|
||||||
} else {
|
} else {
|
||||||
if (super->type != TSDB_SUPER_TABLE) return -1;
|
if (super->type != TSDB_SUPER_TABLE) return -1;
|
||||||
|
if (super->tableId.uid != pCfg->superUid) return -1;
|
||||||
|
tsdbCheckAndUpdateTable(super, pCfg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -458,24 +486,31 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
|
||||||
if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err;
|
if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err;
|
||||||
|
|
||||||
if (numOfTags > 0) {
|
if (numOfTags > 0) {
|
||||||
int accBytes = 0;
|
// Decode tag schema
|
||||||
char *pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
|
|
||||||
|
|
||||||
SKVRowBuilder kvRowBuilder = {0};
|
|
||||||
tdResetTSchemaBuilder(&schemaBuilder, htonl(pMsg->tversion));
|
tdResetTSchemaBuilder(&schemaBuilder, htonl(pMsg->tversion));
|
||||||
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err;
|
|
||||||
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
|
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
|
||||||
tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
tdAddColToSchema(&schemaBuilder, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
||||||
tdAddColToKVRow(&kvRowBuilder, htons(pSchema[i].colId), pSchema[i].type, pTagData + accBytes);
|
|
||||||
accBytes += htons(pSchema[i].bytes);
|
|
||||||
}
|
}
|
||||||
if (tsdbTableSetTagSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err;
|
if (tsdbTableSetTagSchema(pCfg, tdGetSchemaFromBuilder(&schemaBuilder), false) < 0) goto _err;
|
||||||
if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err;
|
if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err;
|
||||||
if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err;
|
if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err;
|
||||||
|
|
||||||
|
// Decode tag values
|
||||||
|
if (pMsg->tagDataLen) {
|
||||||
|
int accBytes = 0;
|
||||||
|
char *pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
|
||||||
|
|
||||||
|
SKVRowBuilder kvRowBuilder = {0};
|
||||||
|
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err;
|
||||||
|
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
|
||||||
|
tdAddColToKVRow(&kvRowBuilder, htons(pSchema[i].colId), pSchema[i].type, pTagData + accBytes);
|
||||||
|
accBytes += htons(pSchema[i].bytes);
|
||||||
|
}
|
||||||
|
|
||||||
tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false);
|
tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false);
|
||||||
tdDestroyKVRowBuilder(&kvRowBuilder);
|
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (pMsg->tableType == TSDB_STREAM_TABLE) {
|
if (pMsg->tableType == TSDB_STREAM_TABLE) {
|
||||||
char *sql = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
|
char *sql = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
|
||||||
|
|
Loading…
Reference in New Issue