From 81fef75fb83dfe6a16c5c126b209b10cd5148222 Mon Sep 17 00:00:00 2001 From: slguan Date: Thu, 13 Feb 2020 22:41:30 +0800 Subject: [PATCH] #1177 --- src/inc/mnode.h | 71 ++ src/inc/sdb.h | 9 + src/inc/taosmsg.h | 11 + src/mnode/inc/mgmtChildTable.h | 21 +- src/mnode/inc/mgmtNormalTable.h | 45 +- src/mnode/inc/mgmtStreamTable.h | 45 +- src/mnode/inc/mgmtSuperTable.h | 23 +- src/mnode/inc/mgmtTable.h | 23 +- src/mnode/inc/mgmtVgroup.h | 4 + src/mnode/src/mgmtChildTable.c | 146 ++++ src/mnode/src/mgmtDb.c | 4 +- src/mnode/src/mgmtDnodeInt.c | 2 +- src/mnode/src/mgmtNormalTable.c | 181 ++++ src/mnode/src/mgmtShell.c | 14 +- src/mnode/src/mgmtStreamTable.c | 102 ++- src/mnode/src/mgmtSuperTable.c | 436 ++++++++++ src/mnode/src/mgmtTable.c | 1381 +++++-------------------------- src/mnode/src/mgmtVgroup.c | 51 +- src/sdb/inc/sdbint.h | 1 + 19 files changed, 1296 insertions(+), 1274 deletions(-) diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 0ae526af42..ef97bdfcb1 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -22,6 +22,11 @@ extern "C" { #include "os.h" +#include "taosdef.h" +#include "taosmsg.h" +#include "taoserror.h" + + #include "sdb.h" #include "tglobalcfg.h" #include "thash.h" @@ -132,6 +137,72 @@ typedef struct _tab_obj { // SSchema schema[]; } STabObj; + +typedef struct SSuperTableObj { + char tableId[TSDB_TABLE_ID_LEN + 1]; + uint64_t uid; + int32_t sid; + int32_t vgId; + int64_t createdTime; + int32_t sversion; + int32_t numOfTags; + int32_t numOfMeters; + int32_t numOfColumns; + int32_t schemaSize; + int8_t reserved[7]; + int8_t updateEnd[1]; + int16_t nextColId; + pthread_rwlock_t rwLock; + tSkipList * pSkipList; + struct SSuperTableObj *pHead; + struct SSuperTableObj *prev, *next; + int8_t* schema; +} SSuperTableObj; + +typedef struct { + char tableId[TSDB_TABLE_ID_LEN + 1]; + char superTableId[TSDB_TABLE_ID_LEN + 1]; + uint64_t uid; + int32_t sid; + int32_t vgId; + int64_t createdTime; + int8_t reserved[7]; + int8_t updateEnd[1]; + SSuperTableObj *superTable; +} SChildTableObj; + +typedef struct { + char tableId[TSDB_TABLE_ID_LEN + 1]; + uint64_t uid; + int32_t sid; + int32_t vgId; + int64_t createdTime; + int32_t sversion; + int32_t numOfColumns; + int32_t schemaSize; + char reserved[3]; + char updateEnd[1]; + int16_t nextColId; + char* schema; +} SNormalTableObj; + +typedef struct { + char tableId[TSDB_TABLE_ID_LEN + 1]; + uint64_t uid; + int32_t sid; + int32_t vgId; + int64_t createdTime; + int32_t sversion; + int32_t numOfColumns; + int32_t schemaSize; + char reserved[3]; + char updateEnd[1]; + int16_t nextColId; + char* pSql; //null-terminated string + char* schema; +} SStreamTableObj; + + typedef struct _vg_obj { uint32_t vgId; char dbName[TSDB_DB_NAME_LEN]; diff --git a/src/inc/sdb.h b/src/inc/sdb.h index a0e0a1b2f2..636a764237 100644 --- a/src/inc/sdb.h +++ b/src/inc/sdb.h @@ -136,6 +136,15 @@ void sdbCleanUpPeers(); int sdbCfgNode(char *cont); +int64_t sdbGetVersion(); + + +#define TSDB_MAX_TABLES 1000 +extern void* tsChildTableSdb; +extern void* tsNormalTableSdb; +extern void* tsStreamTableSdb; +extern void* tsSuperTableSdb; + #ifdef __cplusplus } #endif diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 9333344473..811b9590bb 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -20,6 +20,9 @@ extern "C" { #endif +#include +#include + #include "tsdb.h" #include "taoserror.h" @@ -294,6 +297,11 @@ typedef struct SMColumn { short bytes; } SMColumn; +typedef struct { + int32_t size; + int8_t* data; +} SVariableMsg; + typedef struct { short vnode; int32_t sid; @@ -310,6 +318,9 @@ typedef struct { char reserved[16]; int32_t sversion; SMColumn schema[]; + + SVariableMsg tags; + } SCreateMsg; typedef struct { diff --git a/src/mnode/inc/mgmtChildTable.h b/src/mnode/inc/mgmtChildTable.h index 635c03a110..b9fc058546 100644 --- a/src/mnode/inc/mgmtChildTable.h +++ b/src/mnode/inc/mgmtChildTable.h @@ -24,27 +24,14 @@ extern "C" { #include #include "taosdef.h" -struct SSuperTableObj; - - -typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; - char superTableId[TSDB_TABLE_ID_LEN + 1]; - int64_t uid; - int32_t sid; - int32_t vgId; - int64_t createdTime; - int32_t sversion; - char reserved[3]; - char updateEnd[1]; - struct SSuperTableObj *superTable; -} SChildTableObj; +#include "mnode.h" int32_t mgmtInitChildTables(); void mgmtCleanUpChildTables(); -int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate); -int32_t mgmtDropChildTable(SDbObj *pDb, char *meterId, int ignore); +int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid); +int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable); int32_t mgmtAlterChildTable(SDbObj *pDb, SAlterTableMsg *pAlter); +int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent); SChildTableObj* mgmtGetChildTable(char *tableId); SSchema* mgmtGetChildTableSchema(SChildTableObj *pTable); diff --git a/src/mnode/inc/mgmtNormalTable.h b/src/mnode/inc/mgmtNormalTable.h index 91a80b0329..17cc5721cd 100644 --- a/src/mnode/inc/mgmtNormalTable.h +++ b/src/mnode/inc/mgmtNormalTable.h @@ -23,41 +23,16 @@ extern "C" { #include #include -typedef struct { - char tableId[TSDB_TABLE_ID_LEN + 1]; - int64_t uid; - int32_t sid; - int32_t vgId; - int32_t sversion; // schema version - int32_t createdTime; - - int32_t numOfTags; // for metric - int32_t numOfMeters; // for metric - int32_t numOfColumns; - int32_t schemaSize; - short nextColId; - char tableType : 4; - char status : 3; - char isDirty : 1; // if the table change tag column 1 value - char reserved[15]; - char updateEnd[1]; - - pthread_rwlock_t rwLock; - tSkipList * pSkipList; - struct _tab_obj *pHead; // for metric, a link list for all meters created - // according to this metric - char *pTagData; // TSDB_TABLE_ID_LEN(metric_name)+ - // tags_value1/tags_value2/tags_value3 - struct _tab_obj *prev, *next; - char * pSql; // pointer to SQL, for SC, null-terminated string - char * pReserve1; - char * pReserve2; - char * schema; - // SSchema schema[]; -} SNormalTableObj; - - -int32_t mgmtInitSTable(); +#include "mnode.h" + +int32_t mgmtInitNormalTables(); +void mgmtCleanUpNormalTables(); +int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid); +int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable); +int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int32_t ncols); +int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName); +SNormalTableObj* mgmtGetNormalTable(char *tableId); +SSchema* mgmtGetNormalTableSchema(SNormalTableObj *pTable); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtStreamTable.h b/src/mnode/inc/mgmtStreamTable.h index 26ab05d287..b67f05771a 100644 --- a/src/mnode/inc/mgmtStreamTable.h +++ b/src/mnode/inc/mgmtStreamTable.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TBASE_MNODE_STABLE_H -#define TBASE_MNODE_STABLE_H +#ifndef TBASE_MNODE_STREAM_TABLE_H +#define TBASE_MNODE_STREAM_TABLE_H #ifdef __cplusplus extern "C" { @@ -23,41 +23,16 @@ extern "C" { #include #include -typedef struct { - char meterId[TSDB_TABLE_ID_LEN + 1]; - uint64_t uid; - STableGid gid; +#include "mnode.h" - int32_t sversion; // schema version - int32_t createdTime; +int32_t mgmtInitStreamTables(); +void mgmtCleanUpStreamTables(); +int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid); +int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable); +int32_t mgmtAlterStreamTable(SDbObj *pDb, SAlterTableMsg *pAlter); +SStreamTableObj* mgmtGetStreamTable(char *tableId); +SSchema* mgmtGetStreamTableSchema(SStreamTableObj *pTable); - int32_t numOfTags; // for metric - int32_t numOfMeters; // for metric - int32_t numOfColumns; - int32_t schemaSize; - short nextColId; - char tableType : 4; - char status : 3; - char isDirty : 1; // if the table change tag column 1 value - char reserved[15]; - char updateEnd[1]; - - pthread_rwlock_t rwLock; - tSkipList * pSkipList; - struct _tab_obj *pHead; // for metric, a link list for all meters created - // according to this metric - char *pTagData; // TSDB_TABLE_ID_LEN(metric_name)+ - // tags_value1/tags_value2/tags_value3 - struct _tab_obj *prev, *next; - char * pSql; // pointer to SQL, for SC, null-terminated string - char * pReserve1; - char * pReserve2; - char * schema; - // SSchema schema[]; -} STabObj; - - -int32_t mgmtInitSTable(); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtSuperTable.h b/src/mnode/inc/mgmtSuperTable.h index c997a1b3b0..4b21a6474a 100644 --- a/src/mnode/inc/mgmtSuperTable.h +++ b/src/mnode/inc/mgmtSuperTable.h @@ -24,25 +24,20 @@ extern "C" { #include #include "taosdef.h" - - -typedef struct { - char superTableId[TSDB_TABLE_ID_LEN + 1]; - int64_t uid; - int32_t sid; - int32_t vgId; - int32_t sversion; - int32_t createdTime; - char reserved[7]; - char updateEnd[1]; -} SSuperTableObj; +#include "mnode.h" int32_t mgmtInitSuperTables(); void mgmtCleanUpSuperTables(); int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate); -int32_t mgmtDropSuperTable(SDbObj *pDb, char *meterId, int ignore); -int32_t mgmtAlterSuperTable(SDbObj *pDb, SAlterTableMsg *pAlter); +int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pTable); SSuperTableObj* mgmtGetSuperTable(char *tableId); +int32_t mgmtFindTagCol(SSuperTableObj *pTable, const char *tagName); +int32_t mgmtAddSuperTableTag(SSuperTableObj *pTable, SSchema schema[], int32_t ntags); +int32_t mgmtDropSuperTableTag(SSuperTableObj *pTable, char *tagName); +int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pTable, char *oldTagName, char *newTagName); +int32_t mgmtAddSuperTableColumn(SSuperTableObj *pTable, SSchema schema[], int32_t ncols); +int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pTable, char *colName); + SSchema* mgmtGetSuperTableSchema(SSuperTableObj *pTable); diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index f4a5466ff8..7b445baae3 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -25,24 +25,29 @@ extern "C" { #include #include "mnode.h" + +typedef struct { + ETableType type; + void* obj; +} STableObj; + int mgmtInitMeters(); -STabObj *mgmtGetTable(char *meterId); +STableObj mgmtGetTable(char *tableId); + STabObj *mgmtGetTableInfo(char *src, char *tags[]); int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SSuperTableMetaMsg *pInfo); -int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate); -int mgmtDropMeter(SDbObj *pDb, char *meterId, int ignore); -int mgmtAlterMeter(SDbObj *pDb, SAlterTableMsg *pAlter); +int mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate); +int mgmtDropTable(SDbObj *pDb, char *meterId, int ignore); +int mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter); int mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); -int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn); +int mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pConn); void mgmtCleanUpMeters(); SSchema *mgmtGetTableSchema(STabObj *pTable); // get schema for a meter -int32_t mgmtFindTagCol(STabObj * pTable, const char * tagName); - int mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pTable); int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pTable); -int mgmtGetMetricMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); -int mgmtRetrieveMetrics(SShowObj *pShow, char *data, int rows, SConnObj *pConn); +int mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); +int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pConn); diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 121ae000b0..714080ae7c 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -33,6 +33,10 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn); int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn); void mgmtCleanUpVgroups(); + +SVgObj *mgmtGetAvailVgroup(SDbObj *pDb); +int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup); + #ifdef __cplusplus } #endif diff --git a/src/mnode/src/mgmtChildTable.c b/src/mnode/src/mgmtChildTable.c index ba278f5457..a5396ce473 100644 --- a/src/mnode/src/mgmtChildTable.c +++ b/src/mnode/src/mgmtChildTable.c @@ -35,3 +35,149 @@ #include "ttime.h" #include "tstatus.h" +#include "sdb.h" + +#include "mgmtChildTable.h" +#include "mgmtSuperTable.h" + + + +int32_t mgmtInitChildTables() { + return 0; +} + +void mgmtCleanUpChildTables() { +} + +char *mgmtBuildCreateChildTableMsg(SChildTableObj *pTable, char *pMsg, int vnode) { + +} + +int32_t mgmtCreateChildTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid) { + int numOfTables = sdbGetNumOfRows(tsChildTableSdb); + if (numOfTables >= tsMaxTables) { + mError("child table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables); + return TSDB_CODE_TOO_MANY_TABLES; + } + + char *pTagData = (char *)pCreate->schema; // it is a tag key + SSuperTableObj *pSuperTable = mgmtGetSuperTable(pTagData); + if (pSuperTable == NULL) { + mError("table:%s, corresponding super table does not exist", pCreate->meterId); + return TSDB_CODE_INVALID_TABLE; + } + + SChildTableObj *pTable = (SChildTableObj *)calloc(sizeof(SChildTableObj), 1); + if (pTable == NULL) { + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + strcpy(pTable->tableId, pCreate->meterId); + strcpy(pTable->superTableId, pSuperTable->tableId); + pTable->createdTime = taosGetTimestampMs(); + pTable->superTable = pSuperTable; + pTable->vgId = vgId; + pTable->sid = sid; + pTable->uid = (((uint64_t) pTable->vgId) << 40) + ((((uint64_t) pTable->sid) & ((1ul << 24) - 1ul)) << 16) + + ((uint64_t) sdbGetVersion() & ((1ul << 16) - 1ul)); + + SVariableMsg tags = {0}; + tags.size = mgmtGetTagsLength(pSuperTable, INT_MAX) + (uint32_t)TSDB_TABLE_ID_LEN; + tags.data = (char *)calloc(1, tags.size); + if (tags.data == NULL) { + free(pTable); + mError("table:%s, corresponding super table schema is null", pCreate->meterId); + return TSDB_CODE_INVALID_TABLE; + } + memcpy(tags.data, pTagData, tags.size); + + if (sdbInsertRow(tsStreamTableSdb, pTable, 0) < 0) { + mError("table:%s, update sdb error", pCreate->meterId); + return TSDB_CODE_SDB_ERROR; + } + + mgmtAddTimeSeries(pTable->superTable->numOfColumns - 1); + mgmtSendCreateMsgToVgroup(pTable, pVgroup); + + mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", + pTable->tableId, vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); + + return 0; +} + +int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) { + SVgObj *pVgroup; + SAcctObj *pAcct; + + pAcct = mgmtGetAcct(pDb->cfg.acct); + + if (pAcct != NULL) { + pAcct->acctInfo.numOfTimeSeries -= (pTable->superTable->numOfColumns - 1); + } + + pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + return TSDB_CODE_OTHERS; + } + + mgmtRestoreTimeSeries(pTable->superTable->numOfColumns - 1); + mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup); + sdbDeleteRow(tsChildTableSdb, pTable); + + if (pVgroup->numOfMeters <= 0) { + mgmtDropVgroup(pDb, pVgroup); + } + + return 0; +} + +SChildTableObj* mgmtGetChildTable(char *tableId) { + return (SChildTableObj *)sdbGetRow(tsChildTableSdb, tableId); +} + +int32_t mgmtModifyChildTableTagValueByName(SChildTableObj *pTable, char *tagName, char *nContent) { + int col = mgmtFindTagCol(pTable->superTable, tagName); + if (col < 0 || col > pTable->superTable->numOfTags) { + return TSDB_CODE_APP_ERROR; + } + + //TODO send msg to dnode + mTrace("Succeed to modify tag column %d of table %s", col, pTable->tableId); + return TSDB_CODE_SUCCESS; + +// int rowSize = 0; +// SSchema *schema = (SSchema *)(pSuperTable->schema + (pSuperTable->numOfColumns + col) * sizeof(SSchema)); +// +// if (col == 0) { +// pTable->isDirty = 1; +// removeMeterFromMetricIndex(pSuperTable, pTable); +// } +// memcpy(pTable->pTagData + mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN, nContent, schema->bytes); +// if (col == 0) { +// addMeterIntoMetricIndex(pMetric, pTable); +// } +// +// // Encode the string +// int size = sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW + 1; +// char *msg = (char *)malloc(size); +// if (msg == NULL) { +// mError("failed to allocate message memory while modify tag value"); +// return TSDB_CODE_APP_ERROR; +// } +// memset(msg, 0, size); +// +// mgmtMeterActionEncode(pTable, msg, size, &rowSize); +// +// int32_t ret = sdbUpdateRow(meterSdb, msg, rowSize, 1); // Need callback function +// tfree(msg); +// +// if (pTable->isDirty) pTable->isDirty = 0; +// +// if (ret < 0) { +// mError("Failed to modify tag column %d of table %s", col, pTable->meterId); +// return TSDB_CODE_APP_ERROR; +// } +// +// mTrace("Succeed to modify tag column %d of table %s", col, pTable->meterId); +// return TSDB_CODE_SUCCESS; +} + diff --git a/src/mnode/src/mgmtDb.c b/src/mnode/src/mgmtDb.c index c8bed2e497..0e324d7eda 100644 --- a/src/mnode/src/mgmtDb.c +++ b/src/mnode/src/mgmtDb.c @@ -282,7 +282,7 @@ void mgmtDropDbFromSdb(SDbObj *pDb) { STabObj *pMetric = pDb->pMetric; while (pMetric) { STabObj *pNext = pMetric->next; - mgmtDropMeter(pDb, pMetric->meterId, 0); + mgmtDropTable(pDb, pMetric->meterId, 0); pMetric = pNext; } @@ -324,7 +324,7 @@ int mgmtDropDbByName(SAcctObj *pAcct, char *name, short ignoreNotExists) { } if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - return TSDB_CODE_MONITOR_DB_FORBEIDDEN; + return TSDB_CODE_MONITOR_DB_FORBEIDEN; } return mgmtDropDb(pDb); diff --git a/src/mnode/src/mgmtDnodeInt.c b/src/mnode/src/mgmtDnodeInt.c index 6c487c798a..3576003d33 100644 --- a/src/mnode/src/mgmtDnodeInt.c +++ b/src/mnode/src/mgmtDnodeInt.c @@ -229,7 +229,7 @@ char *mgmtBuildCreateMeterIe(STabObj *pTable, char *pMsg, int vnode) { return pMsg; } -int mgmtSendCreateMsgToVgroup(STabObj *pTable, SVgObj *pVgroup) { +int mgmtSendCreateMsgToVgroup(STabObj table, SVgObj *pVgroup) { char * pMsg, *pStart; int i, msgLen = 0; SDnodeObj *pObj; diff --git a/src/mnode/src/mgmtNormalTable.c b/src/mnode/src/mgmtNormalTable.c index ba278f5457..c4e2cfbfe1 100644 --- a/src/mnode/src/mgmtNormalTable.c +++ b/src/mnode/src/mgmtNormalTable.c @@ -35,3 +35,184 @@ #include "ttime.h" #include "tstatus.h" + +#include "sdb.h" +#include "mgmtNormalTable.h" + +int32_t mgmtInitNormalTables() { + return 0; +} + +void mgmtCleanUpNormalTables() { + sdbCloseTable(tsNormalTableSdb); +} + +int32_t mgmtCreateNormalTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid) { + int numOfTables = sdbGetNumOfRows(tsChildTableSdb); + if (numOfTables >= TSDB_MAX_TABLES) { + mError("normal table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES); + return TSDB_CODE_TOO_MANY_TABLES; + } + + SNormalTableObj *pTable = (SNormalTableObj *)calloc(sizeof(SNormalTableObj), 1); + if (pTable == NULL) { + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + strcpy(pTable->tableId, pCreate->meterId); + pTable->createdTime = taosGetTimestampMs(); + pTable->vgId = vgId; + pTable->sid = sid; + pTable->uid = (((uint64_t)pTable->createdTime) << 16) + ((uint64_t)sdbGetVersion() & ((1ul << 16) - 1ul)); + pTable->sversion = 0; + pTable->numOfColumns = pCreate->numOfColumns; + + int numOfCols = pCreate->numOfColumns + pCreate->numOfTags; + pTable->schemaSize = numOfCols * sizeof(SSchema); + pTable->schema = (int8_t *)calloc(1, pTable->schemaSize); + if (pTable->schema == NULL) { + free(pTable); + mError("table:%s, no schema input", pCreate->meterId); + return TSDB_CODE_INVALID_TABLE; + } + memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); + + pTable->nextColId = 0; + for (int col = 0; col < pCreate->numOfColumns; col++) { + SSchema *tschema = (SSchema *)pTable->schema; + tschema[col].colId = pTable->nextColId++; + } + + if (sdbInsertRow(tsNormalTableSdb, pTable, 0) < 0) { + mError("table:%s, update sdb error", pCreate->meterId); + return TSDB_CODE_SDB_ERROR; + } + +// mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d", +// pTable->meterId, pTable->gid.vgId, pTable->gid.sid, pVgroup->vnodeGid[0].vnode); +// +// mgmtAddTimeSeries(pTable->numOfColumns - 1); +// mgmtSendCreateMsgToVgroup(pTable, pVgroup); + + mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", + pTable->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); + + return 0; +} + +int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) { + SVgObj *pVgroup; + SAcctObj *pAcct; + + pAcct = mgmtGetAcct(pDb->cfg.acct); + + if (pAcct != NULL) { + pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); + } + + pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + return TSDB_CODE_OTHERS; + } + + mgmtRestoreTimeSeries(pTable->numOfColumns - 1); + + mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup); + + sdbDeleteRow(tsChildTableSdb, pTable); + + if (pVgroup->numOfMeters <= 0) { + mgmtDropVgroup(pDb, pVgroup); + } + + return 0; +} + +SNormalTableObj* mgmtGetNormalTable(char *tableId) { + return (SNormalTableObj *)sdbGetRow(tsNormalTableSdb, tableId); +} + +static int32_t mgmtFindNormalTableColumnIndex(SNormalTableObj *pTable, char *colName) { + SSchema *schema = (SSchema *) pTable->schema; + for (int32_t i = 0; i < pTable->numOfColumns; i++) { + if (strcasecmp(schema[i].name, colName) == 0) { + return i; + } + } + + return -1; +} + +int32_t mgmtAddNormalTableColumn(SNormalTableObj *pTable, SSchema schema[], int ncols) { + if (ncols <= 0) { + return TSDB_CODE_APP_ERROR; + } + + for (int i = 0; i < ncols; i++) { + if (mgmtFindNormalTableColumnIndex(pTable, schema[i].name) > 0) { + return TSDB_CODE_APP_ERROR; + } + } + + SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId); + if (pDb == NULL) { + mError("table: %s not belongs to any database", pTable->tableId); + return TSDB_CODE_APP_ERROR; + } + + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + if (pAcct == NULL) { + mError("DB: %s not belongs to andy account", pDb->name); + return TSDB_CODE_APP_ERROR; + } + + pTable->schema = realloc(pTable->schema, pTable->schemaSize + sizeof(SSchema) * ncols); + + memcpy(pTable->schema + pTable->schemaSize, schema, sizeof(SSchema) * ncols); + + SSchema *tschema = (SSchema *) (pTable->schema + sizeof(SSchema) * pTable->numOfColumns); + for (int i = 0; i < ncols; i++) { + tschema[i].colId = pTable->nextColId++; + } + + pTable->schemaSize += sizeof(SSchema) * ncols; + pTable->numOfColumns += ncols; + pTable->sversion++; + pAcct->acctInfo.numOfTimeSeries += ncols; + + sdbUpdateRow(tsNormalTableSdb, pTable, 0, 1); + return TSDB_CODE_SUCCESS; +} + +int32_t mgmtDropNormalTableColumnByName(SNormalTableObj *pTable, char *colName) { + int32_t col = mgmtFindNormalTableColumnIndex(pTable, colName); + if (col < 0) { + return TSDB_CODE_APP_ERROR; + } + + SDbObj *pDb = mgmtGetDbByMeterId(pTable->tableId); + if (pDb == NULL) { + mError("table: %s not belongs to any database", pTable->tableId); + return TSDB_CODE_APP_ERROR; + } + + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + if (pAcct == NULL) { + mError("DB: %s not belongs to any account", pDb->name); + return TSDB_CODE_APP_ERROR; + } + + memmove(pTable->schema + sizeof(SSchema) * col, pTable->schema + sizeof(SSchema) * (col + 1), + sizeof(SSchema) * (pTable->numOfColumns - col - 1)); + + + pTable->schemaSize -= sizeof(SSchema); + pTable->numOfColumns--; + pTable->schema = realloc(pTable->schema, pTable->schemaSize); + pTable->sversion++; + + pAcct->acctInfo.numOfTimeSeries--; + sdbUpdateRow(tsNormalTableSdb, pTable, 0, 1); + + return TSDB_CODE_SUCCESS; +} diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 1204ba9720..aea091b752 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -230,7 +230,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) { pCreateMsg->meterId, pDb, pDb->name, pMeterDb, pMeterDb->name); assert(pDb == pMeterDb); - int32_t code = mgmtCreateMeter(pDb, pCreateMsg); + int32_t code = mgmtCreateTable(pDb, pCreateMsg); char stableName[TSDB_TABLE_ID_LEN] = {0}; strncpy(stableName, pInfo->tags, TSDB_TABLE_ID_LEN); @@ -896,7 +896,7 @@ static void mgmtInitShowMsgFp() { mgmtGetMetaFp[TSDB_MGMT_TABLE_DNODE] = mgmtGetDnodeMeta; mgmtGetMetaFp[TSDB_MGMT_TABLE_MNODE] = mgmtGetMnodeMeta; mgmtGetMetaFp[TSDB_MGMT_TABLE_VGROUP] = mgmtGetVgroupMeta; - mgmtGetMetaFp[TSDB_MGMT_TABLE_METRIC] = mgmtGetMetricMeta; + mgmtGetMetaFp[TSDB_MGMT_TABLE_METRIC] = mgmtGetSuperTableMeta; mgmtGetMetaFp[TSDB_MGMT_TABLE_MODULE] = mgmtGetModuleMeta; mgmtGetMetaFp[TSDB_MGMT_TABLE_QUERIES] = mgmtGetQueryMeta; mgmtGetMetaFp[TSDB_MGMT_TABLE_STREAMS] = mgmtGetStreamMeta; @@ -910,11 +910,11 @@ static void mgmtInitShowMsgFp() { mgmtRetrieveFp[TSDB_MGMT_TABLE_ACCT] = mgmtRetrieveAccts; mgmtRetrieveFp[TSDB_MGMT_TABLE_USER] = mgmtRetrieveUsers; mgmtRetrieveFp[TSDB_MGMT_TABLE_DB] = mgmtRetrieveDbs; - mgmtRetrieveFp[TSDB_MGMT_TABLE_TABLE] = mgmtRetrieveMeters; + mgmtRetrieveFp[TSDB_MGMT_TABLE_TABLE] = mgmtRetrieveTables; mgmtRetrieveFp[TSDB_MGMT_TABLE_DNODE] = mgmtRetrieveDnodes; mgmtRetrieveFp[TSDB_MGMT_TABLE_MNODE] = mgmtRetrieveMnodes; mgmtRetrieveFp[TSDB_MGMT_TABLE_VGROUP] = mgmtRetrieveVgroups; - mgmtRetrieveFp[TSDB_MGMT_TABLE_METRIC] = mgmtRetrieveMetrics; + mgmtRetrieveFp[TSDB_MGMT_TABLE_METRIC] = mgmtRetrieveSuperTables; mgmtRetrieveFp[TSDB_MGMT_TABLE_MODULE] = mgmtRetrieveModules; mgmtRetrieveFp[TSDB_MGMT_TABLE_QUERIES] = mgmtRetrieveQueries; mgmtRetrieveFp[TSDB_MGMT_TABLE_STREAMS] = mgmtRetrieveStreams; @@ -1097,7 +1097,7 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); if (pDb) { - code = mgmtCreateMeter(pDb, pCreate); + code = mgmtCreateTable(pDb, pCreate); } else { code = TSDB_CODE_DB_NOT_SELECTED; } @@ -1139,7 +1139,7 @@ int mgmtProcessDropTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { SDbObj *pDb = NULL; if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); - code = mgmtDropMeter(pDb, pDrop->meterId, pDrop->igNotExists); + code = mgmtDropTable(pDb, pDrop->meterId, pDrop->igNotExists); if (code == 0) { mTrace("meter:%s is dropped by user:%s", pDrop->meterId, pConn->pUser->user); // mLPrint("meter:%s is dropped by user:%s", pDrop->meterId, pConn->pUser->user); @@ -1177,7 +1177,7 @@ int mgmtProcessAlterTableMsg(char *pMsg, int msgLen, SConnObj *pConn) { pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes); } - code = mgmtAlterMeter(pDb, pAlter); + code = mgmtAlterTable(pDb, pAlter); if (code == 0) { mLPrint("meter:%s is altered by %s", pAlter->meterId, pConn->pUser->user); } diff --git a/src/mnode/src/mgmtStreamTable.c b/src/mnode/src/mgmtStreamTable.c index ba278f5457..82d93372ee 100644 --- a/src/mnode/src/mgmtStreamTable.c +++ b/src/mnode/src/mgmtStreamTable.c @@ -23,7 +23,7 @@ #include "mgmtDb.h" #include "mgmtDnodeInt.h" #include "mgmtVgroup.h" -#include "mgmtSupertableQuery.h" +#include "mgmtStreamtableQuery.h" #include "mgmtTable.h" #include "taosmsg.h" #include "tast.h" @@ -35,3 +35,103 @@ #include "ttime.h" #include "tstatus.h" + +#include "sdb.h" +#include "mgmtStreamTable.h" + +int32_t mgmtInitStreamTables() { + return 0; +} + +void mgmtCleanUpStreamTables() { +} + +int32_t mgmtCreateStreamTable(SDbObj *pDb, SCreateTableMsg *pCreate, int32_t vgId, int32_t sid) { + int numOfTables = sdbGetNumOfRows(tsStreamTableSdb); + if (numOfTables >= TSDB_MAX_TABLES) { + mError("stream table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES); + return TSDB_CODE_TOO_MANY_TABLES; + } + + SStreamTableObj *pTable = (SStreamTableObj *)calloc(sizeof(SStreamTableObj), 1); + if (pTable == NULL) { + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + strcpy(pTable->tableId, pCreate->meterId); + pTable->createdTime = taosGetTimestampMs(); + pTable->vgId = vgId; + pTable->sid = sid; + pTable->uid = (((uint64_t)pTable->createdTime) << 16) + ((uint64_t)sdbGetVersion() & ((1ul << 16) - 1ul)); + pTable->sversion = 0; + pTable->numOfColumns = pCreate->numOfColumns; + + int numOfCols = pCreate->numOfColumns + pCreate->numOfTags; + pTable->schemaSize = numOfCols * sizeof(SSchema) + pCreate->sqlLen; + pTable->schema = (int8_t *)calloc(1, pTable->schemaSize); + if (pTable->schema == NULL) { + free(pTable); + mError("table:%s, no schema input", pCreate->meterId); + return TSDB_CODE_INVALID_TABLE; + } + memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); + + pTable->nextColId = 0; + for (int col = 0; col < pCreate->numOfColumns; col++) { + SSchema *tschema = (SSchema *)pTable->schema; + tschema[col].colId = pTable->nextColId++; + } + + pTable->pSql = pTable->schema + numOfCols * sizeof(SSchema); + memcpy(pTable->pSql, (char *)(pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen); + pTable->pSql[pCreate->sqlLen - 1] = 0; + mTrace("table:%s, stream sql len:%d sql:%s", pCreate->meterId, pCreate->sqlLen, pTable->pSql); + + if (sdbInsertRow(tsStreamTableSdb, pTable, 0) < 0) { + mError("table:%s, update sdb error", pCreate->meterId); + return TSDB_CODE_SDB_ERROR; + } + +// mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d", +// pTable->meterId, pTable->gid.vgId, pTable->gid.sid, pVgroup->vnodeGid[0].vnode); +// +// mgmtAddTimeSeries(pTable->numOfColumns - 1); +// mgmtSendCreateMsgToVgroup(pTable, pVgroup); + + mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", + pTable->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); + + return 0; +} + +int32_t mgmtDropStreamTable(SDbObj *pDb, SStreamTableObj *pTable) { + SVgObj * pVgroup; + SAcctObj *pAcct; + + pAcct = mgmtGetAcct(pDb->cfg.acct); + + if (pAcct != NULL) { + pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); + } + + pVgroup = mgmtGetVgroup(pTable->vgId); + if (pVgroup == NULL) { + return TSDB_CODE_OTHERS; + } + + mgmtRestoreTimeSeries(pTable->numOfColumns - 1); + + mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup); + + sdbDeleteRow(tsChildTableSdb, pTable); + + if (pVgroup->numOfMeters <= 0) { + mgmtDropVgroup(pDb, pVgroup); + } + + return 0; +} + +SStreamTableObj* mgmtGetStreamTable(char *tableId); { + return (SStreamTableObj *)sdbGetRow(tsStreamTableSdb, tableId); +} \ No newline at end of file diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index ba278f5457..5092ee8a74 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -35,3 +35,439 @@ #include "ttime.h" #include "tstatus.h" + +#include "sdb.h" +#include "mgmtSuperTable.h" +#include "mgmtChildTable.h" + + +#define mgmtDestroyMeter(pMetric) \ + do { \ + tfree(pMetric->schema); \ + pMetric->pSkipList = tSkipListDestroy((pMetric)->pSkipList); \ + tfree(pMetric); \ + } while (0) + + +typedef struct { + char meterId[TSDB_TABLE_ID_LEN + 1]; + char type; + uint32_t cols; + char data[]; +} SMeterBatchUpdateMsg; + +typedef struct { + int32_t col; + int32_t pos; + SSchema schema; +} SchemaUnit; + +typedef struct { + char meterId[TSDB_TABLE_ID_LEN + 1]; + char action; + int32_t dataSize; + char data[]; +} SMeterUpdateMsg; + +int32_t mgmtInitSuperTables() { + return 0; +} + +void mgmtCleanUpSuperTables() { +} + +int32_t mgmtCreateSuperTable(SDbObj *pDb, SCreateTableMsg *pCreate) { + int numOfTables = sdbGetNumOfRows(tsSuperTableSdb); + if (numOfTables >= TSDB_MAX_TABLES) { + mError("super table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, TSDB_MAX_TABLES); + return TSDB_CODE_TOO_MANY_TABLES; + } + + SSuperTableObj *pMetric = (SSuperTableObj *)calloc(sizeof(SSuperTableObj), 1); + if (pMetric == NULL) { + return TSDB_CODE_SERV_OUT_OF_MEMORY; + } + + strcpy(pMetric->tableId, pCreate->meterId); + pMetric->createdTime = taosGetTimestampMs(); + pMetric->vgId = 0; + pMetric->sid = 0; + pMetric->uid = (((uint64_t)pMetric->createdTime) << 16) + ((uint64_t)sdbGetVersion() & ((1ul << 16) - 1ul)); + pMetric->sversion = 0; + pMetric->numOfColumns = pCreate->numOfColumns; + pMetric->numOfTags = pCreate->numOfTags; + pMetric->numOfMeters = 0; + + int numOfCols = pCreate->numOfColumns + pCreate->numOfTags; + pMetric->schemaSize = numOfCols * sizeof(SSchema); + pMetric->schema = (int8_t *)calloc(1, pMetric->schemaSize); + if (pMetric->schema == NULL) { + free(pMetric); + mError("table:%s, no schema input", pCreate->meterId); + return TSDB_CODE_INVALID_TABLE; + } + memcpy(pMetric->schema, pCreate->schema, numOfCols * sizeof(SSchema)); + + pMetric->nextColId = 0; + for (int col = 0; col < pCreate->numOfColumns; col++) { + SSchema *tschema = (SSchema *)pMetric->schema; + tschema[col].colId = pMetric->nextColId++; + } + + if (sdbInsertRow(tsSuperTableSdb, pMetric, 0) < 0) { + mError("table:%s, update sdb error", pCreate->meterId); + return TSDB_CODE_SDB_ERROR; + } + + return 0; +} + +int32_t mgmtDropSuperTable(SDbObj *pDb, SSuperTableObj *pSuperTable) { + SChildTableObj *pMetric; + while ((pMetric = pSuperTable->pHead) != NULL) { + mgmtDropChildTable(pDb, pMetric); + } + sdbDeleteRow(tsSuperTableSdb, pMetric); +} + +SSuperTableObj* mgmtGetSuperTable(char *tableId) { + return (SSuperTableObj *)sdbGetRow(tsSuperTableSdb, tableId); +} + +int32_t mgmtFindTagCol(SSuperTableObj *pMetric, const char *tagName) { + for (int i = 0; i < pMetric->numOfTags; i++) { + SSchema *schema = (SSchema *)(pMetric->schema + (pMetric->numOfColumns + i) * sizeof(SSchema)); + if (strcasecmp(tagName, schema->name) == 0) { + return i; + } + } + + return -1; +} + +int32_t mgmtAddSuperTableTag(SSuperTableObj *pMetric, SSchema schema[], int32_t ntags) { + if (pMetric->numOfTags + ntags > TSDB_MAX_TAGS) { + return TSDB_CODE_APP_ERROR; + } + + // check if schemas have the same name + for (int i = 1; i < ntags; i++) { + for (int j = 0; j < i; j++) { + if (strcasecmp(schema[i].name, schema[j].name) == 0) { + return TSDB_CODE_APP_ERROR; + } + } + } + + for (int i = 0; i < ntags; i++) { + if (mgmtFindTagCol(pMetric, schema[i].name) >= 0) { + return TSDB_CODE_APP_ERROR; + } + } + + uint32_t size = sizeof(SMeterBatchUpdateMsg) + sizeof(SSchema) * ntags; + SMeterBatchUpdateMsg *msg = (SMeterBatchUpdateMsg *) malloc(size); + memset(msg, 0, size); + + memcpy(msg->meterId, pMetric->tableId, TSDB_TABLE_ID_LEN); + msg->type = SDB_TYPE_INSERT; + msg->cols = ntags; + memcpy(msg->data, schema, sizeof(SSchema) * ntags); + + int32_t ret = sdbBatchUpdateRow(tsSuperTableSdb, msg, size); + tfree(msg); + + if (ret < 0) { + mError("Failed to add tag column %s to table %s", schema[0].name, pMetric->tableId); + return TSDB_CODE_APP_ERROR; + } + + mTrace("Succeed to add tag column %s to table %s", schema[0].name, pMetric->tableId); + return TSDB_CODE_SUCCESS; +} + +int32_t mgmtDropSuperTableTag(SSuperTableObj *pMetric, char *tagName) { + int col = mgmtFindTagCol(pMetric, tagName); + if (col <= 0 || col >= pMetric->numOfTags) { + return TSDB_CODE_APP_ERROR; + } + + // Pack message to do batch update + uint32_t size = sizeof(SMeterBatchUpdateMsg) + sizeof(SchemaUnit); + SMeterBatchUpdateMsg *msg = (SMeterBatchUpdateMsg *) malloc(size); + memset(msg, 0, size); + + memcpy(msg->meterId, pMetric->tableId, TSDB_TABLE_ID_LEN); + msg->type = SDB_TYPE_DELETE; + msg->cols = 1; + + ((SchemaUnit *) (msg->data))->col = col; + ((SchemaUnit *) (msg->data))->pos = mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN; + ((SchemaUnit *) (msg->data))->schema = *(SSchema *) (pMetric->schema + sizeof(SSchema) * (pMetric->numOfColumns + col)); + + int32_t ret = sdbBatchUpdateRow(tsSuperTableSdb, msg, size); + tfree(msg); + + if (ret < 0) { + mError("Failed to drop tag column: %d from table: %s", col, pMetric->tableId); + return TSDB_CODE_APP_ERROR; + } + + mTrace("Succeed to drop tag column: %d from table: %s", col, pMetric->tableId); + return TSDB_CODE_SUCCESS; +} + +int32_t mgmtModifySuperTableTagNameByName(SSuperTableObj *pMetric, char *oldTagName, char *newTagName) { + int col = mgmtFindTagCol(pMetric, oldTagName); + if (col < 0) { + // Tag name does not exist + mError("Failed to modify table %s tag column, oname: %s, nname: %s", pMetric->tableId, oldTagName, newTagName); + return TSDB_CODE_INVALID_MSG_TYPE; + } + + int rowSize = 0; + uint32_t len = strlen(newTagName); + + if (col >= pMetric->numOfTags || len >= TSDB_COL_NAME_LEN || mgmtFindTagCol(pMetric, newTagName) >= 0) { + return TSDB_CODE_APP_ERROR; + } + + // update + SSchema *schema = (SSchema *) (pMetric->schema + (pMetric->numOfColumns + col) * sizeof(SSchema)); + strncpy(schema->name, newTagName, TSDB_COL_NAME_LEN); + + // Encode string + int size = 1 + sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW; + char *msg = (char *) malloc(size); + if (msg == NULL) return TSDB_CODE_APP_ERROR; + memset(msg, 0, size); + + mgmtMeterActionEncode(pMetric, msg, size, &rowSize); + + int32_t ret = sdbUpdateRow(tsSuperTableSdb, msg, rowSize, 1); + tfree(msg); + + if (ret < 0) { + mError("Failed to modify table %s tag column", pMetric->tableId); + return TSDB_CODE_APP_ERROR; + } + + mTrace("Succeed to modify table %s tag column", pMetric->tableId); + return TSDB_CODE_SUCCESS; +} + + +static int32_t mgmtFindSuperTableColumnIndex(SNormalTableObj *pMetric, char *colName) { + SSchema *schema = (SSchema *) pMetric->schema; + for (int32_t i = 0; i < pMetric->numOfColumns; i++) { + if (strcasecmp(schema[i].name, colName) == 0) { + return i; + } + } + + return -1; +} + +int32_t mgmtAddSuperTableColumn(SSuperTableObj *pMetric, SSchema schema[], int ncols) { + if (ncols <= 0) { + return TSDB_CODE_APP_ERROR; + } + + for (int i = 0; i < ncols; i++) { + if (mgmtFindSuperTableColumnIndex(pMetric, schema[i].name) > 0) { + return TSDB_CODE_APP_ERROR; + } + } + + SDbObj *pDb = mgmtGetDbByMeterId(pMetric->tableId); + if (pDb == NULL) { + mError("meter: %s not belongs to any database", pMetric->tableId); + return TSDB_CODE_APP_ERROR; + } + + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + if (pAcct == NULL) { + mError("DB: %s not belongs to andy account", pDb->name); + return TSDB_CODE_APP_ERROR; + } + + pMetric->schema = realloc(pMetric->schema, pMetric->schemaSize + sizeof(SSchema) * ncols); + + memmove(pMetric->schema + sizeof(SSchema) * (pMetric->numOfColumns + ncols), + pMetric->schema + sizeof(SSchema) * pMetric->numOfColumns, sizeof(SSchema) * pMetric->numOfTags); + memcpy(pMetric->schema + sizeof(SSchema) * pMetric->numOfColumns, schema, sizeof(SSchema) * ncols); + + SSchema *tschema = (SSchema *) (pMetric->schema + sizeof(SSchema) * pMetric->numOfColumns); + for (int i = 0; i < ncols; i++) { + tschema[i].colId = pMetric->nextColId++; + } + + pMetric->schemaSize += sizeof(SSchema) * ncols; + pMetric->numOfColumns += ncols; + pMetric->sversion++; + + pAcct->acctInfo.numOfTimeSeries += (ncols * pMetric->numOfMeters); + sdbUpdateRow(tsSuperTableSdb, pMetric, 0, 1); + + return TSDB_CODE_SUCCESS; +} + +int32_t mgmtDropSuperTableColumnByName(SSuperTableObj *pMetric, char *colName) { + int32_t col = mgmtFindSuperTableColumnIndex(pMetric, colName); + if (col < 0) { + return TSDB_CODE_APP_ERROR; + } + + SDbObj *pDb = mgmtGetDbByMeterId(pMetric->tableId); + if (pDb == NULL) { + mError("table: %s not belongs to any database", pMetric->tableId); + return TSDB_CODE_APP_ERROR; + } + + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + if (pAcct == NULL) { + mError("DB: %s not belongs to any account", pDb->name); + return TSDB_CODE_APP_ERROR; + } + + memmove(pMetric->schema + sizeof(SSchema) * col, pMetric->schema + sizeof(SSchema) * (col + 1), + sizeof(SSchema) * (pMetric->numOfColumns + pMetric->numOfTags - col - 1)); + + pMetric->schemaSize -= sizeof(SSchema); + pMetric->numOfColumns--; + pMetric->schema = realloc(pMetric->schema, pMetric->schemaSize); + pMetric->sversion++; + + pAcct->acctInfo.numOfTimeSeries -= (pMetric->numOfMeters); + sdbUpdateRow(tsSuperTableSdb, pMetric, 0, 1); + + return TSDB_CODE_SUCCESS; +} + + +int mgmtGetSuperTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { + int cols = 0; + + SDbObj *pDb = NULL; + if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); + + if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED; + + SSchema *pSchema = tsGetSchema(pMeta); + + pShow->bytes[cols] = TSDB_METER_NAME_LEN; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "name"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "created_time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 2; + pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; + strcpy(pSchema[cols].name, "columns"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 2; + pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; + strcpy(pSchema[cols].name, "tags"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "tables"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + + pShow->numOfRows = pDb->numOfMetrics; + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + + return 0; +} + +int mgmtRetrieveSuperTables(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { + int numOfRows = 0; + char * pWrite; + int cols = 0; + SSuperTableObj *pTable = NULL; + char prefix[20] = {0}; + int32_t prefixLen; + + SDbObj *pDb = NULL; + if (pConn->pDb != NULL) { + pDb = mgmtGetDb(pConn->pDb->name); + } + + if (pDb == NULL) { + return 0; + } + + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 && strcmp(pConn->pUser->user, "monitor") != 0 ) { + return 0; + } + } + + strcpy(prefix, pDb->name); + strcat(prefix, TS_PATH_DELIMITER); + prefixLen = strlen(prefix); + + SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; + char metricName[TSDB_METER_NAME_LEN] = {0}; + + while (numOfRows < rows) { + pTable = (SSuperTableObj *)pShow->pNode; + if (pTable == NULL) break; + pShow->pNode = (void *)pTable->next; + + if (strncmp(pTable->tableId, prefix, prefixLen)) { + continue; + } + + memset(metricName, 0, tListLen(metricName)); + extractTableName(pTable->tableId, metricName); + + if (pShow->payloadLen > 0 && + patternMatch(pShow->payload, metricName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH) + continue; + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + extractTableName(pTable->tableId, pWrite); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pTable->createdTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *)pWrite = pTable->numOfColumns; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *)pWrite = pTable->numOfTags; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = pTable->numOfMeters; + cols++; + + numOfRows++; + } + + pShow->numOfReads += numOfRows; + return numOfRows; +} \ No newline at end of file diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 30b59aa1a1..29d4fb03ac 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -40,43 +40,10 @@ #include "mgmtNormalTable.h" #include "mgmtStreamTable.h" +#include "taoserror.h" extern int64_t sdbVersion; -#define mgmtDestroyMeter(pTable) \ - do { \ - tfree(pTable->schema); \ - pTable->pSkipList = tSkipListDestroy((pTable)->pSkipList); \ - tfree(pTable); \ - } while (0) - -enum _Meter_Update_Action { - METER_UPDATE_TAG_NAME, - METER_UPDATE_TAG_VALUE, - METER_UPDATE_TAG_VALUE_COL0, - METER_UPDATE_NULL, - MAX_METER_UPDATE_ACTION -}; - -typedef struct { - int32_t col; - int32_t pos; - SSchema schema; -} SchemaUnit; - -typedef struct { - char meterId[TSDB_TABLE_ID_LEN + 1]; - char type; - uint32_t cols; - char data[]; -} SMeterBatchUpdateMsg; - -typedef struct { - char meterId[TSDB_TABLE_ID_LEN + 1]; - char action; - int32_t dataSize; - char data[]; -} SMeterUpdateMsg; void *meterSdb = NULL; void *(*mgmtMeterActionFp[SDB_MAX_ACTION_TYPES])(void *row, char *str, int size, int *ssize); @@ -532,32 +499,39 @@ int mgmtInitMeters() { return 0; } -STabObj *mgmtGetTable(char *meterId) { return (STabObj *)sdbGetRow(meterSdb, meterId); } +STableObj mgmtGetTable(char *tableId) { + STableObj table = {.type = TSDB_TABLE_TYPE_MAX, .obj = NULL}; -int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) { - STabObj * pTable = NULL; - STabObj * pMetric = NULL; - SVgObj * pVgroup = NULL; - int size = 0; - SAcctObj *pAcct = NULL; - - int numOfTables = sdbGetNumOfRows(meterSdb); - if (numOfTables >= tsMaxTables) { - mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables); - return TSDB_CODE_TOO_MANY_TABLES; + table.obj = mgmtGetSuperTable(tableId); + if (table.obj != NULL) { + table.type = TSDB_TABLE_TYPE_SUPER_TABLE; + return table; } - pAcct = mgmtGetAcct(pDb->cfg.acct); - assert(pAcct != NULL); - int code = mgmtCheckTableLimit(pAcct, pCreate); - if (code != 0) { - mError("table:%s, exceed the limit", pCreate->meterId); - return code; + table.obj = mgmtGetNormalTable(tableId); + if (table.obj != NULL) { + table.type = TSDB_TABLE_TYPE_NORMAL_TABLE; + return table; } - // does table exist? - pTable = mgmtGetTable(pCreate->meterId); - if (pTable) { + table.obj = mgmtGetStreamTable(tableId); + if (table.obj != NULL) { + table.type = TSDB_TABLE_TYPE_STREAM_TABLE; + return table; + } + + table.obj = mgmtGetNormalTable(tableId); + if (table.obj != NULL) { + table.type = TSDB_TABLE_TYPE_CHILD_TABLE; + return table; + } + + return table; +} + +int32_t mgmtCreateTable(SDbObj *pDb, SCreateTableMsg *pCreate) { + STableObj table = mgmtGetTable(pCreate->meterId); + if (table.obj != NULL) { if (pCreate->igExists) { return TSDB_CODE_SUCCESS; } else { @@ -565,183 +539,51 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) { } } - // Create the table object - pTable = (STabObj *)malloc(sizeof(STabObj)); - if (pTable == NULL) return TSDB_CODE_SERV_OUT_OF_MEMORY; - memset(pTable, 0, sizeof(STabObj)); - - // - ETableType tableType = TSDB_TABLE_TYPE_MAX; - char *tagData = NULL; - int32_t tagDataSize = 0; - - if (pCreate->numOfColumns == 0 && pCreate->numOfTags == 0) { - tableType = TSDB_TABLE_TYPE_CHILD_TABLE; - tagData = (char *)pCreate->schema; // it is a tag key - - SSuperTableObj *pSuperTable = mgmtGetSuperTable(tagData); - if (pSuperTable == NULL) { - mError("table:%s, corresponding super table does not exist", pCreate->meterId); - return TSDB_CODE_INVALID_TABLE; - } - - tagDataSize = mgmtGetTagsLength(pMetric, INT_MAX) + (uint32_t)TSDB_TABLE_ID_LEN; - - - /* - * for meters created according to metrics, the schema of this meter isn't needed. - * so, we don't allocate memory for it in order to save a huge amount of - * memory when a large amount of meters are created according to this super table. - */ - size = mgmtGetTagsLength(pMetric, INT_MAX) + (uint32_t)TSDB_TABLE_ID_LEN; - pTable->schema = (char *)malloc(size); - if (pTable->schema == NULL) { - mgmtDestroyMeter(pTable); - mError("table:%s, corresponding super table schema is null", pCreate->meterId); - return TSDB_CODE_INVALID_TABLE; - } - memset(pTable->schema, 0, size); - - pTable->schemaSize = size; - - pTable->numOfColumns = pMetric->numOfColumns; - pTable->sversion = pMetric->sversion; - pTable->pTagData = pTable->schema; - pTable->nextColId = pMetric->nextColId; - memcpy(pTable->pTagData, pTagData, size); - } else { - int numOfCols = pCreate->numOfColumns + pCreate->numOfTags; - size = numOfCols * sizeof(SSchema) + pCreate->sqlLen; - pTable->schema = (char *)malloc(size); - if (pTable->schema == NULL) { - mgmtDestroyMeter(pTable); - mError("table:%s, no schema input", pCreate->meterId); - return TSDB_CODE_SERV_OUT_OF_MEMORY; - } - memset(pTable->schema, 0, size); - - pTable->numOfColumns = pCreate->numOfColumns; - pTable->sversion = 0; - pTable->numOfTags = pCreate->numOfTags; - pTable->schemaSize = size; - memcpy(pTable->schema, pCreate->schema, numOfCols * sizeof(SSchema)); - - for (int k = 0; k < pCreate->numOfColumns; k++) { - SSchema *tschema = (SSchema *)pTable->schema; - tschema[k].colId = pTable->nextColId++; - } - - if (pCreate->sqlLen > 0) { - pTable->tableType = TSDB_TABLE_TYPE_STREAM_TABLE; - pTable->pSql = pTable->schema + numOfCols * sizeof(SSchema); - memcpy(pTable->pSql, (char *)(pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen); - pTable->pSql[pCreate->sqlLen - 1] = 0; - mTrace("table:%s, stream sql len:%d sql:%s", pCreate->meterId, pCreate->sqlLen, pTable->pSql); - } else { - if (pCreate->numOfTags > 0) { - pTable->tableType = TSDB_TABLE_TYPE_SUPER_TABLE; - } else { - pTable->tableType = TSDB_TABLE_TYPE_NORMAL_TABLE; - } - } - } - - pTable->createdTime = taosGetTimestampMs(); - strcpy(pTable->meterId, pCreate->meterId); - if (pthread_rwlock_init(&pTable->rwLock, NULL)) { - mError("table:%s, failed to init meter lock", pCreate->meterId); - mgmtDestroyMeter(pTable); - return TSDB_CODE_FAILED_TO_LOCK_RESOURCES; + SAcctObj *pAcct = mgmtGetAcct(pDb->cfg.acct); + assert(pAcct != NULL); + int code = mgmtCheckTableLimit(pAcct, pCreate); + if (code != 0) { + mError("table:%s, exceed the limit", pCreate->meterId); + return code; } if (mgmtCheckExpired()) { - mError("failed to create meter:%s, reason:grant expired", pTable->meterId); + mError("failed to create meter:%s, reason:grant expired", pCreate->meterId); return TSDB_CODE_GRANT_EXPIRED; } if (pCreate->numOfTags == 0) { - int grantCode = mgmtCheckTimeSeries(pTable->numOfColumns); + int grantCode = mgmtCheckTimeSeries(pCreate->numOfColumns); if (grantCode != 0) { mError("table:%s, grant expired", pCreate->meterId); return grantCode; } - } - - if (pCreate->numOfTags == 0) { // handle normal meter creation - pVgroup = pDb->pHead; - - if (pDb->vgStatus == TSDB_VG_STATUS_IN_PROGRESS) { - mgmtDestroyMeter(pTable); - //mTrace("table:%s, vgroup in creating progress", pCreate->meterId); - return TSDB_CODE_ACTION_IN_PROGRESS; - } - - if (pDb->vgStatus == TSDB_VG_STATUS_FULL) { - mgmtDestroyMeter(pTable); - mError("table:%s, vgroup is full", pCreate->meterId); - return TSDB_CODE_NO_ENOUGH_DNODES; - } - - if (pDb->vgStatus == TSDB_VG_STATUS_NO_DISK_PERMISSIONS || - pDb->vgStatus == TSDB_VG_STATUS_SERVER_NO_PACE || - pDb->vgStatus == TSDB_VG_STATUS_SERV_OUT_OF_MEMORY || - pDb->vgStatus == TSDB_VG_STATUS_INIT_FAILED ) { - mgmtDestroyMeter(pTable); - mError("table:%s, vgroup init failed, reason:%d %s", pCreate->meterId, pDb->vgStatus, taosGetVgroupStatusStr(pDb->vgStatus)); - return pDb->vgStatus; - } + SVgObj *pVgroup = mgmtGetAvailVgroup(pDb); if (pVgroup == NULL) { - pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS; - mgmtCreateVgroup(pDb); - mgmtDestroyMeter(pTable); - mTrace("table:%s, vgroup malloced, wait for create progress finished", pCreate->meterId); - return TSDB_CODE_ACTION_IN_PROGRESS; + return terrno; } - int sid = taosAllocateId(pVgroup->idPool); + int32_t sid = mgmtAllocateSid(pDb, pVgroup); if (sid < 0) { - mWarn("table:%s, vgroup:%d run out of ID, num:%d", pCreate->meterId, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool)); - pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS; - mgmtCreateVgroup(pDb); - mgmtDestroyMeter(pTable); - return TSDB_CODE_ACTION_IN_PROGRESS; + return terrno; } - pTable->gid.sid = sid; - pTable->gid.vgId = pVgroup->vgId; - pTable->uid = (((uint64_t)pTable->gid.vgId) << 40) + ((((uint64_t)pTable->gid.sid) & ((1ul << 24) - 1ul)) << 16) + - ((uint64_t)sdbVersion & ((1ul << 16) - 1ul)); - - mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%" PRIu64 " db:%s", - pTable->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pTable->uid, pDb->name); + if (pCreate->numOfColumns == 0) { + return mgmtCreateChildTable(pDb, pCreate, pVgroup->vgId, sid); + } else if (pCreate->sqlLen > 0) { + return mgmtCreateStreamTable(pDb, pCreate, pVgroup->vgId, sid); + } else { + return mgmtCreateNormalTable(pDb, pCreate, pVgroup->vgId, sid); + } } else { - pTable->uid = (((uint64_t)pTable->createdTime) << 16) + ((uint64_t)sdbVersion & ((1ul << 16) - 1ul)); + return mgmtCreateSuperTable(pDb, pCreate); } - - if (sdbInsertRow(meterSdb, pTable, 0) < 0) { - mError("table:%s, update sdb error", pCreate->meterId); - return TSDB_CODE_SDB_ERROR; - } - - // send create message to the selected vnode servers - if (pCreate->numOfTags == 0) { - mTrace("table:%s, send create table msg to dnode, vgId:%d, sid:%d, vnode:%d", - pTable->meterId, pTable->gid.vgId, pTable->gid.sid, pVgroup->vnodeGid[0].vnode); - - mgmtAddTimeSeries(pTable->numOfColumns - 1); - mgmtSendCreateMsgToVgroup(pTable, pVgroup); - } - - return 0; } -int mgmtDropMeter(SDbObj *pDb, char *meterId, int ignore) { - STabObj * pTable; - SAcctObj *pAcct; - - pTable = mgmtGetTable(meterId); - if (pTable == NULL) { +int mgmtDropTable(SDbObj *pDb, char *tableId, int ignore) { + STableObj table = mgmtGetTable(tableId); + if (table.obj == NULL) { if (ignore) { return TSDB_CODE_SUCCESS; } else { @@ -749,159 +591,76 @@ int mgmtDropMeter(SDbObj *pDb, char *meterId, int ignore) { } } - pAcct = mgmtGetAcct(pDb->cfg.acct); - // 0.log if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - return TSDB_CODE_MONITOR_DB_FORBEIDDEN; + return TSDB_CODE_MONITOR_DB_FORBIDDEN; } - if (mgmtIsNormalTable(pTable)) { - return dropMeterImp(pDb, pTable, pAcct); - } else { - // remove a metric - /* - if (pTable->numOfMeters > 0) { - assert(pTable->pSkipList != NULL && pTable->pSkipList->nSize > 0); - return TSDB_CODE_RELATED_TABLES_EXIST; - } - */ - // first delet all meters of metric - dropAllMetersOfMetric(pDb, pTable, pAcct); - - // finally delete metric - sdbDeleteRow(meterSdb, pTable); + switch (table.type) { + case TSDB_TABLE_TYPE_SUPER_TABLE: + return mgmtDropSuperTable(pDb, table.obj); + case TSDB_TABLE_TYPE_CHILD_TABLE: + return mgmtDropChildTable(pDb, table.obj); + case TSDB_TABLE_TYPE_STREAM_TABLE: + return mgmtDropStreamTable(pDb, table.obj); + case TSDB_TABLE_TYPE_NORMAL_TABLE: + return mgmtDropNormalTable(pDb, table.obj); + default: + return TSDB_CODE_INVALID_TABLE; } - - return 0; } -int mgmtAlterMeter(SDbObj *pDb, SAlterTableMsg *pAlter) { - STabObj *pTable; - - pTable = mgmtGetTable(pAlter->meterId); - if (pTable == NULL) { +int mgmtAlterTable(SDbObj *pDb, SAlterTableMsg *pAlter) { + STableObj table = mgmtGetTable(tableId); + if (table.obj == NULL) { return TSDB_CODE_INVALID_TABLE; } // 0.log - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) return TSDB_CODE_MONITOR_DB_FORBEIDDEN; - - if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { - if (!mgmtIsNormalTable(pTable) || !mgmtTableCreateFromSuperTable(pTable)) { - return TSDB_CODE_OPS_NOT_SUPPORT; - } + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + return TSDB_CODE_MONITOR_DB_FORBIDDEN; } +// if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { +// return mgmtUpdate +// if (!mgmtIsNormalTable(pTable) || !mgmtTableCreateFromSuperTable(pTable)) { +// return TSDB_CODE_OPS_NOT_SUPPORT; +// } +// } + // todo add /* mgmtMeterAddTags */ if (pAlter->type == TSDB_ALTER_TABLE_ADD_TAG_COLUMN) { - mTrace("alter table %s to add tag column:%s, type:%d", pTable->meterId, pAlter->schema[0].name, - pAlter->schema[0].type); - return mgmtMeterAddTags(pTable, pAlter->schema, 1); - } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) { - mTrace("alter table %s to drop tag column:%s", pTable->meterId, pAlter->schema[0].name); - return mgmtMeterDropTagByName(pTable, pAlter->schema[0].name); - } else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { - mTrace("alter table %s to change tag column name, old: %s, new: %s", pTable->meterId, pAlter->schema[0].name, - pAlter->schema[1].name); - return mgmtMeterModifyTagNameByName(pTable, pAlter->schema[0].name, pAlter->schema[1].name); - } else if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { - mTrace("alter table %s to modify tag value, tag name:%s", pTable->meterId, pAlter->schema[0].name); - return mgmtMeterModifyTagValueByName(pTable, pAlter->schema[0].name, pAlter->tagVal); - } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { - mTrace("alter table %s to add column:%s, type:%d", pTable->meterId, pAlter->schema[0].name, pAlter->schema[0].type); - return mgmtMeterAddColumn(pTable, pAlter->schema, 1); - } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { - mTrace("alter table %s to drop column:%s", pTable->meterId, pAlter->schema[0].name); - return mgmtMeterDropColumnByName(pTable, pAlter->schema[0].name); - } else { - return TSDB_CODE_INVALID_MSG_TYPE; - } - - return TSDB_CODE_SUCCESS; -} - -static int dropMeterImp(SDbObj *pDb, STabObj * pTable, SAcctObj *pAcct) { - SVgObj * pVgroup; - - if (pAcct != NULL) pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfColumns - 1); - - pVgroup = mgmtGetVgroup(pTable->gid.vgId); - if (pVgroup == NULL) return TSDB_CODE_OTHERS; - - mgmtRestoreTimeSeries(pTable->numOfColumns - 1); - mgmtSendRemoveMeterMsgToDnode(pTable, pVgroup); - sdbDeleteRow(meterSdb, pTable); - - if (pVgroup->numOfMeters <= 0) mgmtDropVgroup(pDb, pVgroup); - - return 0; -} - -static void dropAllMetersOfMetric(SDbObj *pDb, STabObj * pMetric, SAcctObj *pAcct) { - STabObj * pTable = NULL; - - while ((pTable = pMetric->pHead) != NULL) { - (void)dropMeterImp(pDb, pTable, pAcct); - } -} - -/* - * create key of each meter for skip list, which is generated from first tag - * column - */ -static void createKeyFromTagValue(STabObj *pMetric, STabObj *pTable, tSkipListKey *pKey) { - SSchema * pTagSchema = (SSchema *)(pMetric->schema + pMetric->numOfColumns * sizeof(SSchema)); - const int16_t KEY_COLUMN_OF_TAGS = 0; - - char *tagVal = pTable->pTagData + TSDB_TABLE_ID_LEN; // tag start position - *pKey = tSkipListCreateKey(pTagSchema[KEY_COLUMN_OF_TAGS].type, tagVal, pTagSchema[KEY_COLUMN_OF_TAGS].bytes); -} - -/* - * add a meter into a metric's skip list - */ -static void addMeterIntoMetricIndex(STabObj *pMetric, STabObj *pTable) { - const int16_t KEY_COLUMN_OF_TAGS = 0; - SSchema * pTagSchema = (SSchema *)(pMetric->schema + pMetric->numOfColumns * sizeof(SSchema)); - - if (pMetric->pSkipList == NULL) { - pMetric->pSkipList = tSkipListCreate(MAX_SKIP_LIST_LEVEL, pTagSchema[KEY_COLUMN_OF_TAGS].type, - pTagSchema[KEY_COLUMN_OF_TAGS].bytes); - } - - if (pMetric->pSkipList) { - tSkipListKey key = {0}; - createKeyFromTagValue(pMetric, pTable, &key); - tSkipListPut(pMetric->pSkipList, pTable, &key, 1); - - tSkipListDestroyKey(&key); - } -} - -static void removeMeterFromMetricIndex(STabObj *pMetric, STabObj *pTable) { - if (pMetric->pSkipList == NULL) { - return; - } - - tSkipListKey key = {0}; - createKeyFromTagValue(pMetric, pTable, &key); - tSkipListNode **pRes = NULL; - - int32_t num = tSkipListGets(pMetric->pSkipList, &key, &pRes); - for (int32_t i = 0; i < num; ++i) { - STabObj *pOneMeter = (STabObj *)pRes[i]->pData; - if (pOneMeter->gid.sid == pTable->gid.sid && pOneMeter->gid.vgId == pTable->gid.vgId) { - assert(pTable == pOneMeter); - tSkipListRemoveNode(pMetric->pSkipList, pRes[i]); + if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) { + return mgmtAddSuperTableTag(table.obj, pAlter->schema, 1); } - } + } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_TAG_COLUMN) { + if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) { + return mgmtDropSuperTableTag(table.obj, pAlter->schema[0].name); + } + } else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN) { + if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) { + return mgmtModifySuperTableTagNameByName(table.obj, pAlter->schema[0].name, pAlter->schema[1].name); + } + } else if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) { + if (table.type == TSDB_TABLE_TYPE_CHILD_TABLE) { + return mgmtModifyChildTableTagValueByName(table.obj, pAlter->schema[0].name, pAlter->tagVal); + } + } else if (pAlter->type == TSDB_ALTER_TABLE_ADD_COLUMN) { + if (table.type == TSDB_TABLE_TYPE_NORMAL_TABLE) { + return mgmtAddNormalTableColumn(table.obj, pAlter->schema, 1); + } else if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) { + return mgmtAddSuperTableColumn(table.obj, pAlter->schema, 1); + } else {} + } else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) { + if (table.type == TSDB_TABLE_TYPE_NORMAL_TABLE) { + return mgmtDropNormalTableColumnByName(table.obj, pAlter->schema[0].name); + } else if (table.type == TSDB_TABLE_TYPE_SUPER_TABLE) { + return mgmtDropSuperTableColumnByName(table.obj, pAlter->schema[0].name); + } else {} + } else {} - tSkipListDestroyKey(&key); - if (num != 0) { - free(pRes); - } + return TSDB_CODE_OPS_NOT_SUPPORT; } int mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pTable) { @@ -942,15 +701,24 @@ int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pTable) { return 0; } -void mgmtCleanUpMeters() { sdbCloseTable(meterSdb); } +void mgmtCleanUpMeters() { + mgmtCleanUpNormalTables(); + mgmtCleanUpStreamTables(); + mgmtCleanUpChildTables(); + mgmtCleanUpSuperTables(); +} -int mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { - int cols = 0; +int32_t mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { + int32_t cols = 0; SDbObj *pDb = NULL; - if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); + if (pConn->pDb != NULL) { + pDb = mgmtGetDb(pConn->pDb->name); + } - if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED; + if (pDb == NULL) { + return TSDB_CODE_DB_NOT_SELECTED; + } SSchema *pSchema = tsGetSchema(pMeta); @@ -982,15 +750,124 @@ int mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { pShow->numOfColumns = cols; pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } - // pShow->numOfRows = sdbGetNumOfRows (meterSdb); pShow->numOfRows = pDb->numOfTables; pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; return 0; } +int32_t mgmtRetrieveTables(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { + int32_t numOfRows = 0; + STabObj *pTable = NULL; + char * pWrite; + int32_t cols = 0; + int32_t prefixLen; + int32_t numOfRead = 0; + char prefix[20] = {0}; + int16_t numOfColumns; + char * tableId; + char * superTableId; + int64_t createdTime; + void * pNormalTableNode; + void * pChildTableNode; + + SDbObj *pDb = NULL; + if (pConn->pDb != NULL) { + pDb = mgmtGetDb(pConn->pDb->name); + } + + if (pDb == NULL) { + return 0; + } + + if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { + if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 && + strcmp(pConn->pUser->user, "monitor") != 0) { + return 0; + } + } + + strcpy(prefix, pDb->name); + strcat(prefix, TS_PATH_DELIMITER); + prefixLen = strlen(prefix); + + SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; + char meterName[TSDB_METER_NAME_LEN] = {0}; + + while (numOfRows < rows) { + pNormalTableNode = sdbFetchRow(tsNormalTableSdb, pShow->pNode, (void **) &pTable); + if (pTable != NULL) { + pShow->pNode = pNormalTableNode; + SNormalTableObj *pNormalTable = (SNormalTableObj *) pTable; + tableId = pNormalTable->tableId; + superTableId = NULL; + createdTime = pNormalTable->createdTime; + numOfColumns = pNormalTable->numOfColumns; + } else { + pChildTableNode = sdbFetchRow(tsChildTableSdb, pShow->pNode, (void **) &pTable); + if (pTable != NULL) { + pShow->pNode = pChildTableNode; + SChildTableObj *pChildTable = (SChildTableObj *) pTable; + tableId = pChildTable->tableId; + superTableId = NULL; + createdTime = pChildTable->createdTime; + numOfColumns = pChildTable->superTable->numOfColumns; + } else { + break; + } + } + + // not belong to current db + if (strncmp(tableId, prefix, prefixLen)) { + continue; + } + + numOfRead++; + memset(meterName, 0, tListLen(meterName)); + + // pattern compare for meter name + extractTableName(tableId, meterName); + + if (pShow->payloadLen > 0 && + patternMatch(pShow->payload, meterName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH) { + continue; + } + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strncpy(pWrite, meterName, TSDB_METER_NAME_LEN); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *) pWrite = createdTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *) pWrite = numOfColumns; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + if (pTable->pTagData) { + extractTableName(superTableId, pWrite); + } + cols++; + + numOfRows++; + } + + pShow->numOfReads += numOfRead; + const int32_t NUM_OF_COLUMNS = 4; + + mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); + + return numOfRows; +} + SSchema *mgmtGetTableSchema(STabObj *pTable) { if (pTable == NULL) { return NULL; @@ -1006,803 +883,3 @@ SSchema *mgmtGetTableSchema(STabObj *pTable) { return (SSchema *)pMetric->schema; } -static int32_t mgmtSerializeTagValue(char* pMsg, STabObj* pTable, int16_t* tagsId, int32_t numOfTags) { - int32_t offset = 0; - - for (int32_t j = 0; j < numOfTags; ++j) { - if (tagsId[j] == TSDB_TBNAME_COLUMN_INDEX) { // handle the table name tags - char name[TSDB_METER_NAME_LEN] = {0}; - extractTableName(pTable->meterId, name); - - memcpy(pMsg + offset, name, TSDB_METER_NAME_LEN); - offset += TSDB_METER_NAME_LEN; - } else { - SSchema s = {0}; - char * tag = mgmtTableGetTag(pTable, tagsId[j], &s); - - memcpy(pMsg + offset, tag, (size_t)s.bytes); - offset += s.bytes; - } - } - - return offset; -} - -/* - * serialize SVnodeSidList to byte array - */ -static char *mgmtBuildMetricMetaMsg(SConnObj *pConn, STabObj *pTable, int32_t *ovgId, SVnodeSidList **pList, SMetricMeta *pMeta, - int32_t tagLen, int16_t numOfTags, int16_t *tagsId, int32_t maxNumOfMeters, - char *pMsg) { - if (pTable->gid.vgId != *ovgId || ((*pList) != NULL && (*pList)->numOfSids >= maxNumOfMeters)) { - /* - * here we construct a new vnode group for 2 reasons - * 1. the query msg may be larger than 64k, - * 2. the following meters belong to different vnodes - */ - (*pList) = (SVnodeSidList *)pMsg; - (*pList)->numOfSids = 0; - (*pList)->index = 0; - pMeta->numOfVnodes++; - - SVgObj *pVgroup = mgmtGetVgroup(pTable->gid.vgId); - for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) { - if (pConn->usePublicIp) { - (*pList)->vpeerDesc[i].ip = pVgroup->vnodeGid[i].publicIp; - (*pList)->vpeerDesc[i].vnode = pVgroup->vnodeGid[i].vnode; - } else { - (*pList)->vpeerDesc[i].ip = pVgroup->vnodeGid[i].ip; - (*pList)->vpeerDesc[i].vnode = pVgroup->vnodeGid[i].vnode; - } - } - - pMsg += sizeof(SVnodeSidList); - (*ovgId) = pTable->gid.vgId; - } - pMeta->numOfMeters++; - (*pList)->numOfSids++; - - SMeterSidExtInfo *pSMeterTagInfo = (SMeterSidExtInfo *)pMsg; - pSMeterTagInfo->sid = htonl(pTable->gid.sid); - pSMeterTagInfo->uid = htobe64(pTable->uid); - - pMsg += sizeof(SMeterSidExtInfo); - - int32_t offset = mgmtSerializeTagValue(pMsg, pTable, tagsId, numOfTags); - assert(offset == tagLen); - - pMsg += offset; - return pMsg; -} - -// get total number of vnodes in final result set -static int32_t mgmtGetNumOfVnodesInResult(tQueryResultset *pResult) { - int32_t numOfVnodes = 0; - int32_t prevGid = -1; - - for (int32_t i = 0; i < pResult->num; ++i) { - STabObj *pTable = pResult->pRes[i]; - if (prevGid == -1) { - prevGid = pTable->gid.vgId; - numOfVnodes++; - } else if (prevGid != pTable->gid.vgId) { - prevGid = pTable->gid.vgId; - numOfVnodes++; - } - } - - return numOfVnodes; -} - -static int32_t mgmtGetMetricMetaMsgSize(tQueryResultset *pResult, int32_t tagLength, int32_t maxMetersPerQuery) { - int32_t numOfVnodes = mgmtGetNumOfVnodesInResult(pResult); - - int32_t size = (sizeof(SMeterSidExtInfo) + tagLength) * pResult->num + - ((pResult->num / maxMetersPerQuery) + 1 + numOfVnodes) * sizeof(SVnodeSidList) + sizeof(SMetricMeta) + - 1024; - - return size; -} - -static SMetricMetaElemMsg *doConvertMetricMetaMsg(SSuperTableMetaMsg *pSuperTableMetaMsg, int32_t tableIndex) { - SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg *)((char *)pSuperTableMetaMsg + pSuperTableMetaMsg->metaElem[tableIndex]); - - pElem->orderIndex = htons(pElem->orderIndex); - pElem->orderType = htons(pElem->orderType); - pElem->numOfTags = htons(pElem->numOfTags); - - pElem->numOfGroupCols = htons(pElem->numOfGroupCols); - pElem->condLen = htonl(pElem->condLen); - pElem->cond = htonl(pElem->cond); - - pElem->elemLen = htons(pElem->elemLen); - - pElem->tableCond = htonl(pElem->tableCond); - pElem->tableCondLen = htonl(pElem->tableCondLen); - - pElem->rel = htons(pElem->rel); - - for (int32_t i = 0; i < pElem->numOfTags; ++i) { - pElem->tagCols[i] = htons(pElem->tagCols[i]); - } - - pElem->groupbyTagColumnList = htonl(pElem->groupbyTagColumnList); - - SColIndexEx *groupColIds = (SColIndexEx*) (((char *)pSuperTableMetaMsg) + pElem->groupbyTagColumnList); - for (int32_t i = 0; i < pElem->numOfGroupCols; ++i) { - groupColIds[i].colId = htons(groupColIds[i].colId); - groupColIds[i].colIdx = htons(groupColIds[i].colIdx); - groupColIds[i].flag = htons(groupColIds[i].flag); - groupColIds[i].colIdxInBuf = 0; - } - - return pElem; -} - -static int32_t mgmtBuildMetricMetaRspMsg(SConnObj *pConn, SSuperTableMetaMsg *pSuperTableMetaMsg, tQueryResultset *pResult, - char **pStart, int32_t *tagLen, int32_t rspMsgSize, int32_t maxTablePerVnode, - int32_t code) { - *pStart = taosBuildRspMsgWithSize(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP, rspMsgSize); - if (*pStart == NULL) { - return 0; - } - - char * pMsg = (*pStart); - STaosRsp *pRsp = (STaosRsp *)pMsg; - - pRsp->code = code; - pMsg += sizeof(STaosRsp); - *pMsg = TSDB_IE_TYPE_META; - pMsg++; - - if (code != TSDB_CODE_SUCCESS) { - return pMsg - (*pStart); // one bit in payload - } - - int32_t msgLen = 0; - - *(int16_t *)pMsg = htons(pSuperTableMetaMsg->numOfMeters); - pMsg += sizeof(int16_t); - - for (int32_t j = 0; j < pSuperTableMetaMsg->numOfMeters; ++j) { - SVnodeSidList *pList = NULL; - int ovgId = -1; - - SMetricMeta *pMeta = (SMetricMeta *)pMsg; - - pMeta->numOfMeters = 0; - pMeta->numOfVnodes = 0; - pMeta->tagLen = htons((uint16_t)tagLen[j]); - - pMsg = (char *)pMeta + sizeof(SMetricMeta); - - SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg *)((char *)pSuperTableMetaMsg + pSuperTableMetaMsg->metaElem[j]); - - for (int32_t i = 0; i < pResult[j].num; ++i) { - STabObj *pTable = pResult[j].pRes[i]; - pMsg = mgmtBuildMetricMetaMsg(pConn, pTable, &ovgId, &pList, pMeta, tagLen[j], pElem->numOfTags, pElem->tagCols, - maxTablePerVnode, pMsg); - } - - mTrace("metric:%s metric-meta tables:%d, vnode:%d", pElem->meterId, pMeta->numOfMeters, pMeta->numOfVnodes); - - pMeta->numOfMeters = htonl(pMeta->numOfMeters); - pMeta->numOfVnodes = htonl(pMeta->numOfVnodes); - } - - msgLen = pMsg - (*pStart); - mTrace("metric-meta msg size %d", msgLen); - - return msgLen; -} - -int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SSuperTableMetaMsg *pSuperTableMetaMsg) { - /* - * naive method: Do not limit the maximum number of meters in each - * vnode(subquery), split the result according to vnodes - * - * todo: split the number of vnodes to make sure each vnode has the same - * number of tables to query, while not break the upper limit of number of vnode queries - */ - int32_t maxMetersPerVNodeForQuery = INT32_MAX; - int msgLen = 0; - int ret = TSDB_CODE_SUCCESS; - tQueryResultset *result = calloc(1, pSuperTableMetaMsg->numOfMeters * sizeof(tQueryResultset)); - int32_t * tagLen = calloc(1, sizeof(int32_t) * pSuperTableMetaMsg->numOfMeters); - - if (result == NULL || tagLen == NULL) { - tfree(result); - tfree(tagLen); - return -1; - } - - for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) { - SMetricMetaElemMsg *pElem = doConvertMetricMetaMsg(pSuperTableMetaMsg, i); - STabObj * pMetric = mgmtGetTable(pElem->meterId); - - if (!mgmtIsSuperTable(pMetric)) { - ret = TSDB_CODE_NOT_SUPER_TABLE; - break; - } - - tagLen[i] = mgmtGetReqTagsLength(pMetric, (int16_t *)pElem->tagCols, pElem->numOfTags); - } - -#if 0 - //todo: opt for join process - int64_t num = 0; - int32_t index = 0; - - for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) { - SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg*) ((char *) pSuperTableMetaMsg + pSuperTableMetaMsg->metaElem[i]); - STabObj *pMetric = mgmtGetTable(pElem->meterId); - - if (pMetric->pSkipList->nSize > num) { - index = i; - num = pMetric->pSkipList->nSize; - } - } -#endif - - if (ret == TSDB_CODE_SUCCESS) { - // todo opt performance - for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) { - ret = mgmtRetrieveMetersFromSuperTable(pSuperTableMetaMsg, i, &result[i]); - } - } - - if (ret == TSDB_CODE_SUCCESS) { - ret = mgmtDoJoin(pSuperTableMetaMsg, result); - } - - if (ret == TSDB_CODE_SUCCESS) { - for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) { - mgmtReorganizeMetersInMetricMeta(pSuperTableMetaMsg, i, &result[i]); - } - } - - if (ret == TSDB_CODE_SUCCESS) { - for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) { - msgLen += mgmtGetMetricMetaMsgSize(&result[i], tagLen[i], maxMetersPerVNodeForQuery); - } - } else { - msgLen = 512; - } - - msgLen = mgmtBuildMetricMetaRspMsg(pConn, pSuperTableMetaMsg, result, pStart, tagLen, msgLen, maxMetersPerVNodeForQuery, ret); - - for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) { - tQueryResultClean(&result[i]); - } - - free(tagLen); - free(result); - - return msgLen; -} - -int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { - int numOfRows = 0; - STabObj *pTable = NULL; - char * pWrite; - int cols = 0; - int prefixLen; - int numOfRead = 0; - char prefix[20] = {0}; - - SDbObj *pDb = NULL; - if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); - - if (pDb == NULL) return 0; - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 && strcmp(pConn->pUser->user, "monitor") != 0 ) { - return 0; - } - } - - strcpy(prefix, pDb->name); - strcat(prefix, TS_PATH_DELIMITER); - prefixLen = strlen(prefix); - - SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; - char meterName[TSDB_METER_NAME_LEN] = {0}; - - while (numOfRows < rows) { - pShow->pNode = sdbFetchRow(meterSdb, pShow->pNode, (void **)&pTable); - if (pTable == NULL) break; - - if (mgmtIsSuperTable(pTable)) continue; - - // not belong to current db - if (strncmp(pTable->meterId, prefix, prefixLen)) continue; - - numOfRead++; - memset(meterName, 0, tListLen(meterName)); - - // pattern compare for meter name - extractTableName(pTable->meterId, meterName); - - if (pShow->payloadLen > 0 && - patternMatch(pShow->payload, meterName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH) - continue; - - cols = 0; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strncpy(pWrite, meterName, TSDB_METER_NAME_LEN); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pTable->createdTime; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pTable->numOfColumns; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - if (pTable->pTagData) { - extractTableName(pTable->pTagData, pWrite); - } - cols++; - - numOfRows++; - } - - pShow->numOfReads += numOfRead; - const int32_t NUM_OF_COLUMNS = 4; - - mgmtVacuumResult(data, NUM_OF_COLUMNS, numOfRows, rows, pShow); - - return numOfRows; -} - -int mgmtGetMetricMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { - int cols = 0; - - SDbObj *pDb = NULL; - if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); - - if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED; - - SSchema *pSchema = tsGetSchema(pMeta); - - pShow->bytes[cols] = TSDB_METER_NAME_LEN; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "name"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 8; - pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "created_time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 2; - pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; - strcpy(pSchema[cols].name, "columns"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 2; - pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; - strcpy(pSchema[cols].name, "tags"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 4; - pSchema[cols].type = TSDB_DATA_TYPE_INT; - strcpy(pSchema[cols].name, "tables"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pMeta->numOfColumns = htons(cols); - pShow->numOfColumns = cols; - - pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; - - pShow->numOfRows = pDb->numOfMetrics; - pShow->pNode = pDb->pMetric; - pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - - return 0; -} - -int mgmtRetrieveMetrics(SShowObj *pShow, char *data, int rows, SConnObj *pConn) { - int numOfRows = 0; - STabObj *pMetric = NULL; - char * pWrite; - int cols = 0; - - SDbObj *pDb = NULL; - if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name); - - if (pDb == NULL) return 0; - if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) { - if (strcmp(pConn->pUser->user, "root") != 0 && strcmp(pConn->pUser->user, "_root") != 0 && strcmp(pConn->pUser->user, "monitor") != 0 ) { - return 0; - } - } - - SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; - - char metricName[TSDB_METER_NAME_LEN] = {0}; - - while (numOfRows < rows) { - pMetric = (STabObj *)pShow->pNode; - if (pMetric == NULL) break; - pShow->pNode = (void *)pMetric->next; - - memset(metricName, 0, tListLen(metricName)); - extractTableName(pMetric->meterId, metricName); - - if (pShow->payloadLen > 0 && - patternMatch(pShow->payload, metricName, TSDB_METER_NAME_LEN, &info) != TSDB_PATTERN_MATCH) - continue; - - cols = 0; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - extractTableName(pMetric->meterId, pWrite); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pMetric->createdTime; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pMetric->numOfColumns; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pMetric->numOfTags; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pMetric->numOfMeters; - cols++; - - numOfRows++; - } - - pShow->numOfReads += numOfRows; - return numOfRows; -} - -int32_t mgmtFindTagCol(STabObj *pMetric, const char *tagName) { - if (!mgmtIsSuperTable(pMetric)) return -1; - - SSchema *schema = NULL; - - for (int i = 0; i < pMetric->numOfTags; i++) { - schema = (SSchema *)(pMetric->schema + (pMetric->numOfColumns + i) * sizeof(SSchema)); - - if (strcasecmp(tagName, schema->name) == 0) return i; - } - - return -1; -} - -int32_t mgmtMeterModifyTagNameByCol(STabObj *pMetric, uint32_t col, const char *nname) { - int rowSize = 0; - assert(col >= 0); - - uint32_t len = strlen(nname); - - if (pMetric == NULL || (!mgmtIsSuperTable(pMetric)) || col >= pMetric->numOfTags || len >= TSDB_COL_NAME_LEN || - mgmtFindTagCol(pMetric, nname) >= 0) - return TSDB_CODE_APP_ERROR; - - // update - SSchema *schema = (SSchema *)(pMetric->schema + (pMetric->numOfColumns + col) * sizeof(SSchema)); - strncpy(schema->name, nname, TSDB_COL_NAME_LEN); - - // Encode string - int size = 1 + sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW; - char *msg = (char *)malloc(size); - if (msg == NULL) return TSDB_CODE_APP_ERROR; - memset(msg, 0, size); - - mgmtMeterActionEncode(pMetric, msg, size, &rowSize); - - int32_t ret = sdbUpdateRow(meterSdb, msg, rowSize, 1); - tfree(msg); - - if (ret < 0) { - mError("Failed to modify table %s tag column", pMetric->meterId); - return TSDB_CODE_APP_ERROR; - } - - mTrace("Succeed to modify table %s tag column", pMetric->meterId); - return TSDB_CODE_SUCCESS; -} - -int32_t mgmtMeterModifyTagNameByName(STabObj *pMetric, const char *oname, const char *nname) { - if (pMetric == NULL || (!mgmtIsSuperTable(pMetric))) return TSDB_CODE_APP_ERROR; - - int index = mgmtFindTagCol(pMetric, oname); - if (index < 0) { - // Tag name does not exist - mError("Failed to modify table %s tag column, oname: %s, nname: %s", pMetric->meterId, oname, nname); - return TSDB_CODE_INVALID_MSG_TYPE; - } - - return mgmtMeterModifyTagNameByCol(pMetric, index, nname); -} - -int32_t mgmtMeterModifyTagValueByCol(STabObj *pTable, int col, const char *nContent) { - int rowSize = 0; - if (pTable == NULL || nContent == NULL || (!mgmtTableCreateFromSuperTable(pTable))) return TSDB_CODE_APP_ERROR; - - STabObj *pMetric = mgmtGetTable(pTable->pTagData); - assert(pMetric != NULL); - - if (col < 0 || col > pMetric->numOfTags) return TSDB_CODE_APP_ERROR; - - SSchema *schema = (SSchema *)(pMetric->schema + (pMetric->numOfColumns + col) * sizeof(SSchema)); - - if (col == 0) { - pTable->isDirty = 1; - removeMeterFromMetricIndex(pMetric, pTable); - } - memcpy(pTable->pTagData + mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN, nContent, schema->bytes); - if (col == 0) { - addMeterIntoMetricIndex(pMetric, pTable); - } - - // Encode the string - int size = sizeof(STabObj) + TSDB_MAX_BYTES_PER_ROW + 1; - char *msg = (char *)malloc(size); - if (msg == NULL) { - mError("failed to allocate message memory while modify tag value"); - return TSDB_CODE_APP_ERROR; - } - memset(msg, 0, size); - - mgmtMeterActionEncode(pTable, msg, size, &rowSize); - - int32_t ret = sdbUpdateRow(meterSdb, msg, rowSize, 1); // Need callback function - tfree(msg); - - if (pTable->isDirty) pTable->isDirty = 0; - - if (ret < 0) { - mError("Failed to modify tag column %d of table %s", col, pTable->meterId); - return TSDB_CODE_APP_ERROR; - } - - mTrace("Succeed to modify tag column %d of table %s", col, pTable->meterId); - return TSDB_CODE_SUCCESS; -} - -int32_t mgmtMeterModifyTagValueByName(STabObj *pTable, char *tagName, char *nContent) { - if (pTable == NULL || tagName == NULL || nContent == NULL || (!mgmtTableCreateFromSuperTable(pTable))) - return TSDB_CODE_INVALID_MSG_TYPE; - - STabObj *pMetric = mgmtGetTable(pTable->pTagData); - if (pMetric == NULL) return TSDB_CODE_APP_ERROR; - - int col = mgmtFindTagCol(pMetric, tagName); - if (col < 0) return TSDB_CODE_APP_ERROR; - - return mgmtMeterModifyTagValueByCol(pTable, col, nContent); -} - -int32_t mgmtMeterAddTags(STabObj *pMetric, SSchema schema[], int ntags) { - if (pMetric == NULL || (!mgmtIsSuperTable(pMetric))) return TSDB_CODE_INVALID_TABLE; - - if (pMetric->numOfTags + ntags > TSDB_MAX_TAGS) return TSDB_CODE_APP_ERROR; - - // check if schemas have the same name - for (int i = 1; i < ntags; i++) { - for (int j = 0; j < i; j++) { - if (strcasecmp(schema[i].name, schema[j].name) == 0) { - return TSDB_CODE_APP_ERROR; - } - } - } - - for (int i = 0; i < ntags; i++) { - if (mgmtFindTagCol(pMetric, schema[i].name) >= 0) { - return TSDB_CODE_APP_ERROR; - } - } - - uint32_t size = sizeof(SMeterBatchUpdateMsg) + sizeof(SSchema) * ntags; - SMeterBatchUpdateMsg *msg = (SMeterBatchUpdateMsg *)malloc(size); - memset(msg, 0, size); - - memcpy(msg->meterId, pMetric->meterId, TSDB_TABLE_ID_LEN); - msg->type = SDB_TYPE_INSERT; - msg->cols = ntags; - memcpy(msg->data, schema, sizeof(SSchema) * ntags); - - int32_t ret = sdbBatchUpdateRow(meterSdb, msg, size); - tfree(msg); - - if (ret < 0) { - mError("Failed to add tag column %s to table %s", schema[0].name, pMetric->meterId); - return TSDB_CODE_APP_ERROR; - } - - mTrace("Succeed to add tag column %s to table %s", schema[0].name, pMetric->meterId); - return TSDB_CODE_SUCCESS; -} - -int32_t mgmtMeterDropTagByCol(STabObj *pMetric, int col) { - if (pMetric == NULL || (!mgmtIsSuperTable(pMetric)) || col <= 0 || col >= pMetric->numOfTags) return TSDB_CODE_APP_ERROR; - - // Pack message to do batch update - uint32_t size = sizeof(SMeterBatchUpdateMsg) + sizeof(SchemaUnit); - SMeterBatchUpdateMsg *msg = (SMeterBatchUpdateMsg *)malloc(size); - memset(msg, 0, size); - - memcpy(msg->meterId, pMetric->meterId, TSDB_TABLE_ID_LEN); - msg->type = SDB_TYPE_DELETE; // TODO: what should here be ? - msg->cols = 1; - - ((SchemaUnit *)(msg->data))->col = col; - ((SchemaUnit *)(msg->data))->pos = mgmtGetTagsLength(pMetric, col) + TSDB_TABLE_ID_LEN; - ((SchemaUnit *)(msg->data))->schema = *(SSchema *)(pMetric->schema + sizeof(SSchema) * (pMetric->numOfColumns + col)); - - int32_t ret = sdbBatchUpdateRow(meterSdb, msg, size); - tfree(msg); - - if (ret < 0) { - mError("Failed to drop tag column: %d from table: %s", col, pMetric->meterId); - return TSDB_CODE_APP_ERROR; - } - - mTrace("Succeed to drop tag column: %d from table: %s", col, pMetric->meterId); - return TSDB_CODE_SUCCESS; -} - -int32_t mgmtMeterDropTagByName(STabObj *pMetric, char *name) { - if (pMetric == NULL || (!mgmtIsSuperTable(pMetric))) { - mTrace("Failed to drop tag name: %s from table: %s", name, pMetric->meterId); - return TSDB_CODE_INVALID_TABLE; - } - - int col = mgmtFindTagCol(pMetric, name); - - return mgmtMeterDropTagByCol(pMetric, col); -} - -int32_t mgmtFindColumnIndex(STabObj *pTable, const char *colName) { - STabObj *pMetric = NULL; - SSchema *schema = NULL; - - if (pTable->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE || pTable->tableType == TSDB_TABLE_TYPE_SUPER_TABLE) { - schema = (SSchema *)pTable->schema; - for (int32_t i = 0; i < pTable->numOfColumns; i++) { - if (strcasecmp(schema[i].name, colName) == 0) { - return i; - } - } - - } else if (pTable->tableType == TSDB_TABLE_TYPE_CHILD_TABLE) { - pMetric = mgmtGetTable(pTable->pTagData); - if (pMetric == NULL) { - mError("MTable not belongs to any metric, meter: %s", pTable->meterId); - return -1; - } - schema = (SSchema *)pMetric->schema; - for (int32_t i = 0; i < pMetric->numOfColumns; i++) { - if (strcasecmp(schema[i].name, colName) == 0) { - return i; - } - } - } - - return -1; -} - -int32_t mgmtMeterAddColumn(STabObj *pTable, SSchema schema[], int ncols) { - SAcctObj *pAcct = NULL; - SDbObj * pDb = NULL; - - if (pTable == NULL || pTable->tableType == TSDB_TABLE_TYPE_CHILD_TABLE || pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE || ncols <= 0) - return TSDB_CODE_APP_ERROR; - - // ASSUMPTION: no two tags are the same - for (int i = 0; i < ncols; i++) - if (mgmtFindColumnIndex(pTable, schema[i].name) > 0) return TSDB_CODE_APP_ERROR; - - pDb = mgmtGetDbByMeterId(pTable->meterId); - if (pDb == NULL) { - mError("meter: %s not belongs to any database", pTable->meterId); - return TSDB_CODE_APP_ERROR; - } - - pAcct = mgmtGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("DB: %s not belongs to andy account", pDb->name); - return TSDB_CODE_APP_ERROR; - } - - pTable->schema = realloc(pTable->schema, pTable->schemaSize + sizeof(SSchema) * ncols); - - if (pTable->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE) { - memcpy(pTable->schema + pTable->schemaSize, schema, sizeof(SSchema) * ncols); - } else if (pTable->tableType == TSDB_TABLE_TYPE_SUPER_TABLE) { - memmove(pTable->schema + sizeof(SSchema) * (pTable->numOfColumns + ncols), - pTable->schema + sizeof(SSchema) * pTable->numOfColumns, sizeof(SSchema) * pTable->numOfTags); - memcpy(pTable->schema + sizeof(SSchema) * pTable->numOfColumns, schema, sizeof(SSchema) * ncols); - } - - SSchema *tschema = (SSchema *)(pTable->schema + sizeof(SSchema) * pTable->numOfColumns); - for (int i = 0; i < ncols; i++) tschema[i].colId = pTable->nextColId++; - - pTable->schemaSize += sizeof(SSchema) * ncols; - pTable->numOfColumns += ncols; - pTable->sversion++; - if (mgmtIsNormalTable(pTable)) - pAcct->acctInfo.numOfTimeSeries += ncols; - else - pAcct->acctInfo.numOfTimeSeries += (ncols * pTable->numOfMeters); - sdbUpdateRow(meterSdb, pTable, 0, 1); - - if (pTable->tableType == TSDB_TABLE_TYPE_SUPER_TABLE) { - for (STabObj *pObj = pTable->pHead; pObj != NULL; pObj = pObj->next) { - pObj->numOfColumns++; - pObj->nextColId = pTable->nextColId; - pObj->sversion = pTable->sversion; - sdbUpdateRow(meterSdb, pObj, 0, 1); - } - } - - return TSDB_CODE_SUCCESS; -} - -int32_t mgmtMeterDropColumnByName(STabObj *pTable, const char *name) { - SAcctObj *pAcct = NULL; - SDbObj * pDb = NULL; - - if (pTable == NULL || pTable->tableType == TSDB_TABLE_TYPE_CHILD_TABLE || pTable->tableType == TSDB_TABLE_TYPE_STREAM_TABLE) - return TSDB_CODE_APP_ERROR; - - int32_t index = mgmtFindColumnIndex(pTable, name); - if (index < 0) return TSDB_CODE_APP_ERROR; - - pDb = mgmtGetDbByMeterId(pTable->meterId); - if (pDb == NULL) { - mError("meter: %s not belongs to any database", pTable->meterId); - return TSDB_CODE_APP_ERROR; - } - - pAcct = mgmtGetAcct(pDb->cfg.acct); - if (pAcct == NULL) { - mError("DB: %s not belongs to any account", pDb->name); - return TSDB_CODE_APP_ERROR; - } - - if (pTable->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE) { - memmove(pTable->schema + sizeof(SSchema) * index, pTable->schema + sizeof(SSchema) * (index + 1), - sizeof(SSchema) * (pTable->numOfColumns - index - 1)); - } else if (pTable->tableType == TSDB_TABLE_TYPE_SUPER_TABLE) { - memmove(pTable->schema + sizeof(SSchema) * index, pTable->schema + sizeof(SSchema) * (index + 1), - sizeof(SSchema) * (pTable->numOfColumns + pTable->numOfTags - index - 1)); - } - pTable->schemaSize -= sizeof(SSchema); - pTable->numOfColumns--; - if (mgmtIsNormalTable(pTable)) - pAcct->acctInfo.numOfTimeSeries--; - else - pAcct->acctInfo.numOfTimeSeries -= (pTable->numOfMeters); - - pTable->schema = realloc(pTable->schema, pTable->schemaSize); - pTable->sversion++; - sdbUpdateRow(meterSdb, pTable, 0, 1); - - if (pTable->tableType == TSDB_TABLE_TYPE_SUPER_TABLE) { - for (STabObj *pObj = pTable->pHead; pObj != NULL; pObj = pObj->next) { - pObj->numOfColumns--; - pObj->sversion = pTable->sversion; - sdbUpdateRow(meterSdb, pObj, 0, 1); - } - } - - return TSDB_CODE_SUCCESS; -} diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 2f4c78f2a8..ce3175abcc 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -26,6 +26,7 @@ #include "tschemautil.h" #include "tlog.h" #include "tstatus.h" +#include "taoserror.h" void * vgSdb = NULL; int tsVgUpdateSize; @@ -128,6 +129,54 @@ int mgmtInitVgroups() { SVgObj *mgmtGetVgroup(int vgId) { return (SVgObj *)sdbGetRow(vgSdb, &vgId); } +SVgObj *mgmtGetAvailVgroup(SDbObj *pDb) { + SVgObj *pVgroup = pDb->pHead; + + if (pDb->vgStatus == TSDB_VG_STATUS_IN_PROGRESS) { + terrno = TSDB_CODE_ACTION_IN_PROGRESS; + return NULL; + } + + if (pDb->vgStatus == TSDB_VG_STATUS_FULL) { + mError("db:%s, vgroup is full", pDb->name); + terrno = TSDB_CODE_NO_ENOUGH_DNODES; + return NULL; + } + + if (pDb->vgStatus == TSDB_VG_STATUS_NO_DISK_PERMISSIONS || + pDb->vgStatus == TSDB_VG_STATUS_SERVER_NO_PACE || + pDb->vgStatus == TSDB_VG_STATUS_SERV_OUT_OF_MEMORY || + pDb->vgStatus == TSDB_VG_STATUS_INIT_FAILED ) { + mError("db:%s, vgroup init failed, reason:%d %s", pDb->name, pDb->vgStatus, taosGetVgroupStatusStr(pDb->vgStatus)); + terrno = pDb->vgStatus; + return NULL; + } + + if (pVgroup == NULL) { + pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS; + mgmtCreateVgroup(pDb); + mTrace("db:%s, vgroup malloced, wait for create progress finished", pDb->name); + terrno = TSDB_CODE_ACTION_IN_PROGRESS; + return NULL; + } + + terrno = 0; + return pVgroup; +} + +int32_t mgmtAllocateSid(SDbObj *pDb, SVgObj *pVgroup) { + int32_t sid = taosAllocateId(pVgroup->idPool); + if (sid < 0) { + mWarn("table:%s, vgroup:%d run out of ID, num:%d", pDb->name, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool)); + pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS; + mgmtCreateVgroup(pDb); + terrno = TSDB_CODE_ACTION_IN_PROGRESS; + } + + terrno = 0; + return sid; +} + void mgmtProcessVgTimer(void *handle, void *tmrId) { SDbObj *pDb = (SDbObj *)handle; if (pDb == NULL) return; @@ -178,7 +227,7 @@ int mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) { for (int i = 0; i < pDb->cfg.maxSessions; ++i) { if (pVgroup->meterList != NULL) { pTable = pVgroup->meterList[i]; - if (pTable) mgmtDropMeter(pDb, pTable->meterId, 0); + if (pTable) mgmtDropTable(pDb, pTable->meterId, 0); } } } diff --git a/src/sdb/inc/sdbint.h b/src/sdb/inc/sdbint.h index c5b4f4e4ae..d0977f2e2f 100644 --- a/src/sdb/inc/sdbint.h +++ b/src/sdb/inc/sdbint.h @@ -144,4 +144,5 @@ int sdbRetrieveRows(int fd, SSdbTable *pTable, uint64_t version); void sdbResetTable(SSdbTable *pTable); extern const int16_t sdbFileVersion; + #endif