refactor create table
This commit is contained in:
parent
9c4e60149c
commit
30d34fd25c
|
@ -109,6 +109,7 @@ void tsdbClearTableCfg(STableCfg *config);
|
||||||
|
|
||||||
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId *id, int32_t colId, int16_t *type, int16_t *bytes, char **val);
|
int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId *id, int32_t colId, int16_t *type, int16_t *bytes, char **val);
|
||||||
char * tsdbGetTableName(TsdbRepoT *repo, const STableId *id, int16_t *bytes);
|
char * tsdbGetTableName(TsdbRepoT *repo, const STableId *id, int16_t *bytes);
|
||||||
|
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
|
||||||
|
|
||||||
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg);
|
int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg);
|
||||||
int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
|
int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId);
|
||||||
|
|
|
@ -438,6 +438,60 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) {
|
||||||
return pTable;
|
return pTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) {
|
||||||
|
if (pMsg == NULL) return NULL;
|
||||||
|
SSchema *pSchema = (SSchema *)pMsg->data;
|
||||||
|
int16_t numOfCols = htons(pMsg->numOfColumns);
|
||||||
|
int16_t numOfTags = htons(pMsg->numOfTags);
|
||||||
|
|
||||||
|
STableCfg *pCfg = (STableCfg *)calloc(1, sizeof(STableCfg));
|
||||||
|
if (pCfg == NULL) return NULL;
|
||||||
|
|
||||||
|
if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->sid)) < 0) goto _err;
|
||||||
|
STSchema *pDSchema = tdNewSchema(numOfCols);
|
||||||
|
if (pDSchema == NULL) goto _err;
|
||||||
|
for (int i = 0; i < numOfCols; i++) {
|
||||||
|
tdSchemaAddCol(pDSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
||||||
|
}
|
||||||
|
if (tsdbTableSetSchema(pCfg, pDSchema, false) < 0) goto _err;
|
||||||
|
if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err;
|
||||||
|
|
||||||
|
if (numOfTags > 0) {
|
||||||
|
STSchema *pTSchema = tdNewSchema(numOfTags);
|
||||||
|
for (int i = numOfCols; i < numOfCols + numOfTags; i++) {
|
||||||
|
tdSchemaAddCol(pTSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
||||||
|
}
|
||||||
|
if (tsdbTableSetTagSchema(pCfg, pTSchema, false) < 0) goto _err;
|
||||||
|
if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err;
|
||||||
|
if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err;
|
||||||
|
|
||||||
|
char * pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
|
||||||
|
int accBytes = 0;
|
||||||
|
SKVRowBuilder kvRowBuilder;
|
||||||
|
|
||||||
|
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err;
|
||||||
|
for (int i = 0; i < numOfTags; i++) {
|
||||||
|
STColumn *pCol = schemaColAt(pTSchema, i);
|
||||||
|
tdAddColToKVRow(&kvRowBuilder, pCol->colId, pCol->type, pTagData + accBytes);
|
||||||
|
accBytes += htons(pSchema[i+numOfCols].bytes);
|
||||||
|
}
|
||||||
|
tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false);
|
||||||
|
tdDestroyKVRowBuilder(&kvRowBuilder);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMsg->tableType == TSDB_STREAM_TABLE) {
|
||||||
|
char *sql = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema);
|
||||||
|
tsdbTableSetStreamSql(pCfg, sql, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pCfg;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbClearTableCfg(pCfg);
|
||||||
|
tfree(pCfg);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
// int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
|
// int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
|
||||||
int tsdbDropTable(TsdbRepoT *repo, STableId tableId) {
|
int tsdbDropTable(TsdbRepoT *repo, STableId tableId) {
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||||
|
|
|
@ -104,6 +104,16 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
||||||
|
|
||||||
|
STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
|
||||||
|
if (pCfg == NULL) return terrno;
|
||||||
|
int32_t code = tsdbCreateTable(pVnode->tsdb, pCfg);
|
||||||
|
|
||||||
|
tsdbClearTableCfg(pCfg);
|
||||||
|
free(pCfg);
|
||||||
|
return code;
|
||||||
|
|
||||||
|
#if 0
|
||||||
SMDCreateTableMsg *pTable = pCont;
|
SMDCreateTableMsg *pTable = pCont;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -165,6 +175,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
|
||||||
|
|
||||||
vTrace("vgId:%d, table:%s is created, result:%x", pVnode->vgId, pTable->tableId, code);
|
vTrace("vgId:%d, table:%s is created, result:%x", pVnode->vgId, pTable->tableId, code);
|
||||||
return code;
|
return code;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
|
||||||
|
|
Loading…
Reference in New Issue