rename from meter to table
This commit is contained in:
parent
fd424455ff
commit
d97941aacf
|
@ -30,10 +30,10 @@ extern "C" {
|
|||
#include "tsdb.h"
|
||||
|
||||
#define UTIL_METER_IS_SUPERTABLE(metaInfo) \
|
||||
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC))
|
||||
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_TABLE_TYPE_SUPER_TABLE))
|
||||
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_SUPERTABLE(metaInfo)))
|
||||
#define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \
|
||||
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_MTABLE))
|
||||
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_TABLE_TYPE_CREATE_FROM_STABLE))
|
||||
|
||||
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
|
||||
|
||||
|
|
|
@ -79,8 +79,8 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type
|
|||
static int32_t tscMaxLengthOfTagsFields(SSqlObj *pSql) {
|
||||
SMeterMeta *pMeta = tscGetMeterMetaInfo(&pSql->cmd, 0, 0)->pMeterMeta;
|
||||
|
||||
if (pMeta->meterType == TSDB_METER_METRIC || pMeta->meterType == TSDB_METER_OTABLE ||
|
||||
pMeta->meterType == TSDB_METER_STABLE) {
|
||||
if (pMeta->meterType == TSDB_TABLE_TYPE_SUPER_TABLE || pMeta->meterType == TSDB_TABLE_TYPE_NORMAL_TABLE ||
|
||||
pMeta->meterType == TSDB_TABLE_TYPE_STREAM_TABLE) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -2991,7 +2991,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
|
|||
int32_t tagLen = 0;
|
||||
SSchema *pTagsSchema = tsGetTagSchema(pMeta);
|
||||
|
||||
if (pMeta->meterType == TSDB_METER_MTABLE) {
|
||||
if (pMeta->meterType == TSDB_TABLE_TYPE_CREATE_FROM_STABLE) {
|
||||
for (int32_t i = 0; i < pMeta->numOfTags; ++i) {
|
||||
tagLen += pTagsSchema[i].bytes;
|
||||
}
|
||||
|
@ -3106,7 +3106,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
|
|||
int32_t tagLen = 0;
|
||||
SSchema *pTagsSchema = tsGetTagSchema(pMeta);
|
||||
|
||||
if (pMeta->meterType == TSDB_METER_MTABLE) {
|
||||
if (pMeta->meterType == TSDB_TABLE_TYPE_CREATE_FROM_STABLE) {
|
||||
for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
|
||||
tagLen += pTagsSchema[j].bytes;
|
||||
}
|
||||
|
|
|
@ -324,20 +324,17 @@ void mgmtCleanUpVgroups();
|
|||
|
||||
// meter API
|
||||
int mgmtInitMeters();
|
||||
STabObj *mgmtGetMeter(char *meterId);
|
||||
STabObj *mgmtGetMeterInfo(char *src, char *tags[]);
|
||||
STabObj *mgmtGetTable(char *meterId);
|
||||
STabObj *mgmtGetTableInfo(char *src, char *tags[]);
|
||||
int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SMetricMetaMsg *pInfo);
|
||||
int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate);
|
||||
int mgmtDropMeter(SDbObj *pDb, char *meterId, int ignore);
|
||||
int mgmtAlterMeter(SDbObj *pDb, SAlterTableMsg *pAlter);
|
||||
int mgmtGetMeterMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
|
||||
int mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
|
||||
int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
|
||||
void mgmtCleanUpMeters();
|
||||
SSchema *mgmtGetMeterSchema(STabObj *pMeter); // get schema for a meter
|
||||
SSchema *mgmtGetTableSchema(STabObj *pMeter); // get schema for a meter
|
||||
|
||||
bool mgmtMeterCreateFromMetric(STabObj *pMeterObj);
|
||||
bool mgmtIsMetric(STabObj *pMeterObj);
|
||||
bool mgmtIsNormalMeter(STabObj *pMeterObj);
|
||||
|
||||
// dnode API
|
||||
int mgmtInitDnodes();
|
||||
|
|
|
@ -181,11 +181,14 @@ enum _mgmt_table {
|
|||
|
||||
#define TSDB_KILL_MSG_LEN 30
|
||||
|
||||
#define TSDB_METER_METRIC 0 // metric
|
||||
#define TSDB_METER_MTABLE 1 // table created from metric
|
||||
#define TSDB_METER_OTABLE 2 // ordinary table
|
||||
#define TSDB_METER_STABLE 3 // table created from stream computing
|
||||
#define TSDB_MAX_METER_TYPES 4
|
||||
enum {
|
||||
TSDB_TABLE_TYPE_SUPER_TABLE = 0, // super table
|
||||
TSDB_TABLE_TYPE_CREATE_FROM_STABLE = 1, // table created from super table
|
||||
TSDB_TABLE_TYPE_NORMAL_TABLE = 2, // ordinary table
|
||||
TSDB_TABLE_TYPE_STREAM_TABLE = 3, // table created from stream computing
|
||||
TSDB_TABLE_TYPE_MAX = 4
|
||||
} ETableType;
|
||||
|
||||
|
||||
#define TSDB_VN_READ_ACCCESS ((char)0x1)
|
||||
#define TSDB_VN_WRITE_ACCCESS ((char)0x2)
|
||||
|
|
|
@ -2,22 +2,20 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
|||
PROJECT(TDengine)
|
||||
|
||||
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/detail/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/detail/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
|
||||
|
||||
INCLUDE_DIRECTORIES(inc)
|
||||
AUX_SOURCE_DIRECTORY(src SRC)
|
||||
ADD_LIBRARY(mnode ${SRC})
|
||||
TARGET_LINK_LIBRARIES(mnode trpc tutil sdb pthread)
|
||||
|
||||
IF (TD_CLUSTER)
|
||||
TARGET_LINK_LIBRARIES(mnode mcluster)
|
||||
ELSEIF (TD_LITE)
|
||||
TARGET_LINK_LIBRARIES(mnode mlite)
|
||||
ENDIF ()
|
||||
ADD_LIBRARY(mnode ${SRC})
|
||||
#TARGET_LINK_LIBRARIES(mnode trpc tutil sdb pthread)
|
||||
|
||||
#IF (TD_CLUSTER)
|
||||
# TARGET_LINK_LIBRARIES(mnode mcluster)
|
||||
#ENDIF ()
|
||||
ENDIF ()
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include "tast.h"
|
||||
|
||||
#ifndef TBASE_MNODE_UTIL_H
|
||||
#define TBASE_MNODE_UTIL_H
|
||||
|
||||
bool mgmtTableCreateFromSuperTable(STabObj *pTableObj);
|
||||
bool mgmtIsSuperTable(STabObj *pTableObj);
|
||||
bool mgmtIsNormalTable(STabObj *pTableObj);
|
||||
|
||||
typedef struct SSyntaxTreeFilterSupporter {
|
||||
SSchema* pTagSchema;
|
||||
int32_t numOfTags;
|
||||
int32_t optr;
|
||||
} SSyntaxTreeFilterSupporter;
|
||||
|
||||
char* mgmtTableGetTag(STabObj* pMeter, int32_t col, SSchema* pTagColSchema);
|
||||
int32_t mgmtGetTagsLength(STabObj* pMetric, int32_t col);
|
||||
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
|
||||
int32_t mgmtCheckDBParams(SCreateDbMsg *pCreate);
|
||||
|
||||
#endif
|
|
@ -17,8 +17,12 @@
|
|||
|
||||
#include "tast.h"
|
||||
|
||||
#ifndef TBASE_MGMTUTIL_H
|
||||
#define TBASE_MGMTUTIL_H
|
||||
#ifndef TBASE_MNODE_UTIL_H
|
||||
#define TBASE_MNODE_UTIL_H
|
||||
|
||||
bool mgmtTableCreateFromSuperTable(STabObj *pTableObj);
|
||||
bool mgmtIsSuperTable(STabObj *pTableObj);
|
||||
bool mgmtIsNormalTable(STabObj *pTableObj);
|
||||
|
||||
typedef struct SSyntaxTreeFilterSupporter {
|
||||
SSchema* pTagSchema;
|
||||
|
@ -26,17 +30,18 @@ typedef struct SSyntaxTreeFilterSupporter {
|
|||
int32_t optr;
|
||||
} SSyntaxTreeFilterSupporter;
|
||||
|
||||
char* mgmtMeterGetTag(STabObj* pMeter, int32_t col, SSchema* pTagColSchema);
|
||||
int32_t mgmtFindTagCol(STabObj * pMetric, const char * tagName);
|
||||
|
||||
char* mgmtTableGetTag(STabObj* pMeter, int32_t col, SSchema* pTagColSchema);
|
||||
int32_t mgmtGetTagsLength(STabObj* pMetric, int32_t col);
|
||||
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
|
||||
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
|
||||
int32_t mgmtCheckDBParams(SCreateDbMsg *pCreate);
|
||||
|
||||
int32_t mgmtFindTagCol(STabObj * pMetric, const char * tagName);
|
||||
|
||||
int32_t mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pInfo, int32_t tableIndex, tQueryResultset* pRes);
|
||||
int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes);
|
||||
void mgmtReorganizeMetersInMetricMeta(SMetricMetaMsg* pInfo, int32_t index, tQueryResultset* pRes);
|
||||
|
||||
bool tSkipListNodeFilterCallback(const void *pNode, void *param);
|
||||
|
||||
#endif //TBASE_MGMTUTIL_H
|
||||
|
||||
#endif
|
||||
|
|
|
@ -206,7 +206,7 @@ char *mgmtBuildCreateMeterIe(STabObj *pMeter, char *pMsg, int vnode) {
|
|||
*/
|
||||
pCreateMeter->sversion = htonl(pMeter->sversion);
|
||||
pCreateMeter->numOfColumns = htons(pMeter->numOfColumns);
|
||||
SSchema *pSchema = mgmtGetMeterSchema(pMeter);
|
||||
SSchema *pSchema = mgmtGetTableSchema(pMeter);
|
||||
|
||||
for (int i = 0; i < pMeter->numOfColumns; ++i) {
|
||||
pCreateMeter->schema[i].type = pSchema[i].type;
|
||||
|
|
|
@ -109,7 +109,7 @@ void mgmtMeterActionInit() {
|
|||
}
|
||||
|
||||
static int32_t mgmtGetReqTagsLength(STabObj *pMetric, int16_t *cols, int32_t numOfCols) {
|
||||
assert(mgmtIsMetric(pMetric) && numOfCols >= 0 && numOfCols <= TSDB_MAX_TAGS + 1);
|
||||
assert(mgmtIsSuperTable(pMetric) && numOfCols >= 0 && numOfCols <= TSDB_MAX_TAGS + 1);
|
||||
|
||||
int32_t len = 0;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
|
@ -142,7 +142,7 @@ void *mgmtMeterActionReset(void *row, char *str, int size, int *ssize) {
|
|||
pMeter->schema = (char *)realloc(pMeter->schema, pMeter->schemaSize);
|
||||
memcpy(pMeter->schema, str + tsize, pMeter->schemaSize);
|
||||
|
||||
if (mgmtMeterCreateFromMetric(pMeter)) {
|
||||
if (mgmtTableCreateFromSuperTable(pMeter)) {
|
||||
pMeter->pTagData = pMeter->schema;
|
||||
}
|
||||
|
||||
|
@ -164,7 +164,7 @@ void *mgmtMeterActionInsert(void *row, char *str, int size, int *ssize) {
|
|||
|
||||
pMeter = (STabObj *)row;
|
||||
|
||||
if (mgmtIsNormalMeter(pMeter)) {
|
||||
if (mgmtIsNormalTable(pMeter)) {
|
||||
pVgroup = mgmtGetVgroup(pMeter->gid.vgId);
|
||||
if (pVgroup == NULL) {
|
||||
mError("id:%s not in vgroup:%d", pMeter->meterId, pMeter->gid.vgId);
|
||||
|
@ -185,17 +185,17 @@ void *mgmtMeterActionInsert(void *row, char *str, int size, int *ssize) {
|
|||
}
|
||||
}
|
||||
|
||||
if (mgmtMeterCreateFromMetric(pMeter)) {
|
||||
if (mgmtTableCreateFromSuperTable(pMeter)) {
|
||||
pMeter->pTagData = (char *)pMeter->schema;
|
||||
pMetric = mgmtGetMeter(pMeter->pTagData);
|
||||
pMetric = mgmtGetTable(pMeter->pTagData);
|
||||
assert(pMetric != NULL);
|
||||
}
|
||||
|
||||
if (pMeter->meterType == TSDB_METER_STABLE) {
|
||||
if (pMeter->meterType == TSDB_TABLE_TYPE_STREAM_TABLE) {
|
||||
pMeter->pSql = (char *)pMeter->schema + sizeof(SSchema) * pMeter->numOfColumns;
|
||||
}
|
||||
|
||||
if (mgmtIsNormalMeter(pMeter)) {
|
||||
if (mgmtIsNormalTable(pMeter)) {
|
||||
if (pMetric) mgmtAddMeterIntoMetric(pMetric, pMeter);
|
||||
|
||||
if (!sdbMaster) {
|
||||
|
@ -233,7 +233,7 @@ void *mgmtMeterActionDelete(void *row, char *str, int size, int *ssize) {
|
|||
|
||||
pMeter = (STabObj *)row;
|
||||
|
||||
if (mgmtIsNormalMeter(pMeter)) {
|
||||
if (mgmtIsNormalTable(pMeter)) {
|
||||
if (pMeter->gid.vgId == 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -251,13 +251,13 @@ void *mgmtMeterActionDelete(void *row, char *str, int size, int *ssize) {
|
|||
}
|
||||
}
|
||||
|
||||
if (mgmtMeterCreateFromMetric(pMeter)) {
|
||||
if (mgmtTableCreateFromSuperTable(pMeter)) {
|
||||
pMeter->pTagData = (char *)pMeter->schema;
|
||||
pMetric = mgmtGetMeter(pMeter->pTagData);
|
||||
pMetric = mgmtGetTable(pMeter->pTagData);
|
||||
assert(pMetric != NULL);
|
||||
}
|
||||
|
||||
if (mgmtIsNormalMeter(pMeter)) {
|
||||
if (mgmtIsNormalTable(pMeter)) {
|
||||
if (pMetric) mgmtRemoveMeterFromMetric(pMetric, pMeter);
|
||||
|
||||
pVgroup->meterList[pMeter->gid.sid] = NULL;
|
||||
|
@ -285,7 +285,7 @@ void *mgmtMeterActionUpdate(void *row, char *str, int size, int *ssize) {
|
|||
STabObj *pNew = (STabObj *)str;
|
||||
|
||||
if (pNew->isDirty) {
|
||||
pMetric = mgmtGetMeter(pMeter->pTagData);
|
||||
pMetric = mgmtGetTable(pMeter->pTagData);
|
||||
removeMeterFromMetricIndex(pMetric, pMeter);
|
||||
}
|
||||
mgmtMeterActionReset(pMeter, str, size, NULL);
|
||||
|
@ -353,7 +353,7 @@ void *mgmtMeterActionBatchUpdate(void *row, char *str, int size, int *ssize) {
|
|||
STabObj * pMeter = (STabObj *)row;
|
||||
SMeterBatchUpdateMsg *msg = (SMeterBatchUpdateMsg *)str;
|
||||
|
||||
if (mgmtIsMetric(pMeter)) {
|
||||
if (mgmtIsSuperTable(pMeter)) {
|
||||
if (msg->type == SDB_TYPE_INSERT) { // Insert schema
|
||||
uint32_t total_cols = pMeter->numOfColumns + pMeter->numOfTags;
|
||||
pMeter->schema = realloc(pMeter->schema, (total_cols + msg->cols) * sizeof(SSchema));
|
||||
|
@ -378,7 +378,7 @@ void *mgmtMeterActionBatchUpdate(void *row, char *str, int size, int *ssize) {
|
|||
|
||||
return pMeter->pHead;
|
||||
|
||||
} else if (mgmtMeterCreateFromMetric(pMeter)) {
|
||||
} else if (mgmtTableCreateFromSuperTable(pMeter)) {
|
||||
if (msg->type == SDB_TYPE_INSERT) {
|
||||
SSchema *schemas = (SSchema *)msg->data;
|
||||
int total_size = 0;
|
||||
|
@ -452,7 +452,7 @@ int mgmtInitMeters() {
|
|||
while (1) {
|
||||
pNode = sdbFetchRow(meterSdb, pNode, (void **)&pMeter);
|
||||
if (pMeter == NULL) break;
|
||||
if (mgmtIsMetric(pMeter)) pMeter->numOfMeters = 0;
|
||||
if (mgmtIsSuperTable(pMeter)) pMeter->numOfMeters = 0;
|
||||
}
|
||||
|
||||
pNode = NULL;
|
||||
|
@ -470,7 +470,7 @@ int mgmtInitMeters() {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (mgmtIsNormalMeter(pMeter)) {
|
||||
if (mgmtIsNormalTable(pMeter)) {
|
||||
pVgroup = mgmtGetVgroup(pMeter->gid.vgId);
|
||||
|
||||
if (pVgroup == NULL) {
|
||||
|
@ -501,13 +501,13 @@ int mgmtInitMeters() {
|
|||
pVgroup->meterList[pMeter->gid.sid] = pMeter;
|
||||
taosIdPoolMarkStatus(pVgroup->idPool, pMeter->gid.sid, 1);
|
||||
|
||||
if (pMeter->meterType == TSDB_METER_STABLE) {
|
||||
if (pMeter->meterType == TSDB_TABLE_TYPE_STREAM_TABLE) {
|
||||
pMeter->pSql = (char *)pMeter->schema + sizeof(SSchema) * pMeter->numOfColumns;
|
||||
}
|
||||
|
||||
if (mgmtMeterCreateFromMetric(pMeter)) {
|
||||
if (mgmtTableCreateFromSuperTable(pMeter)) {
|
||||
pMeter->pTagData = (char *)pMeter->schema; // + sizeof(SSchema)*pMeter->numOfColumns;
|
||||
pMetric = mgmtGetMeter(pMeter->pTagData);
|
||||
pMetric = mgmtGetTable(pMeter->pTagData);
|
||||
if (pMetric) mgmtAddMeterIntoMetric(pMetric, pMeter);
|
||||
}
|
||||
|
||||
|
@ -524,7 +524,7 @@ int mgmtInitMeters() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
STabObj *mgmtGetMeter(char *meterId) { return (STabObj *)sdbGetRow(meterSdb, meterId); }
|
||||
STabObj *mgmtGetTable(char *meterId) { return (STabObj *)sdbGetRow(meterSdb, meterId); }
|
||||
|
||||
int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
||||
STabObj * pMeter = NULL;
|
||||
|
@ -548,7 +548,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
}
|
||||
|
||||
// does table exist?
|
||||
pMeter = mgmtGetMeter(pCreate->meterId);
|
||||
pMeter = mgmtGetTable(pCreate->meterId);
|
||||
if (pMeter) {
|
||||
if (pCreate->igExists) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -563,9 +563,9 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
memset(pMeter, 0, sizeof(STabObj));
|
||||
|
||||
if (pCreate->numOfColumns == 0 && pCreate->numOfTags == 0) { // MTABLE
|
||||
pMeter->meterType = TSDB_METER_MTABLE;
|
||||
pMeter->meterType = TSDB_TABLE_TYPE_CREATE_FROM_STABLE;
|
||||
char *pTagData = (char *)pCreate->schema; // it is a tag key
|
||||
pMetric = mgmtGetMeter(pTagData);
|
||||
pMetric = mgmtGetTable(pTagData);
|
||||
if (pMetric == NULL) {
|
||||
mError("table:%s, corresponding super table does not exist", pCreate->meterId);
|
||||
return TSDB_CODE_INVALID_TABLE;
|
||||
|
@ -615,16 +615,16 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
}
|
||||
|
||||
if (pCreate->sqlLen > 0) {
|
||||
pMeter->meterType = TSDB_METER_STABLE;
|
||||
pMeter->meterType = TSDB_TABLE_TYPE_STREAM_TABLE;
|
||||
pMeter->pSql = pMeter->schema + numOfCols * sizeof(SSchema);
|
||||
memcpy(pMeter->pSql, (char *)(pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen);
|
||||
pMeter->pSql[pCreate->sqlLen - 1] = 0;
|
||||
mTrace("table:%s, stream sql len:%d sql:%s", pCreate->meterId, pCreate->sqlLen, pMeter->pSql);
|
||||
} else {
|
||||
if (pCreate->numOfTags > 0) {
|
||||
pMeter->meterType = TSDB_METER_METRIC;
|
||||
pMeter->meterType = TSDB_TABLE_TYPE_SUPER_TABLE;
|
||||
} else {
|
||||
pMeter->meterType = TSDB_METER_OTABLE;
|
||||
pMeter->meterType = TSDB_TABLE_TYPE_NORMAL_TABLE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -723,7 +723,7 @@ int mgmtDropMeter(SDbObj *pDb, char *meterId, int ignore) {
|
|||
STabObj * pMeter;
|
||||
SAcctObj *pAcct;
|
||||
|
||||
pMeter = mgmtGetMeter(meterId);
|
||||
pMeter = mgmtGetTable(meterId);
|
||||
if (pMeter == NULL) {
|
||||
if (ignore) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -739,7 +739,7 @@ int mgmtDropMeter(SDbObj *pDb, char *meterId, int ignore) {
|
|||
return TSDB_CODE_MONITOR_DB_FORBEIDDEN;
|
||||
}
|
||||
|
||||
if (mgmtIsNormalMeter(pMeter)) {
|
||||
if (mgmtIsNormalTable(pMeter)) {
|
||||
return dropMeterImp(pDb, pMeter, pAcct);
|
||||
} else {
|
||||
// remove a metric
|
||||
|
@ -762,7 +762,7 @@ int mgmtDropMeter(SDbObj *pDb, char *meterId, int ignore) {
|
|||
int mgmtAlterMeter(SDbObj *pDb, SAlterTableMsg *pAlter) {
|
||||
STabObj *pMeter;
|
||||
|
||||
pMeter = mgmtGetMeter(pAlter->meterId);
|
||||
pMeter = mgmtGetTable(pAlter->meterId);
|
||||
if (pMeter == NULL) {
|
||||
return TSDB_CODE_INVALID_TABLE;
|
||||
}
|
||||
|
@ -771,7 +771,7 @@ int mgmtAlterMeter(SDbObj *pDb, SAlterTableMsg *pAlter) {
|
|||
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) return TSDB_CODE_MONITOR_DB_FORBEIDDEN;
|
||||
|
||||
if (pAlter->type == TSDB_ALTER_TABLE_UPDATE_TAG_VAL) {
|
||||
if (!mgmtIsNormalMeter(pMeter) || !mgmtMeterCreateFromMetric(pMeter)) {
|
||||
if (!mgmtIsNormalTable(pMeter) || !mgmtTableCreateFromSuperTable(pMeter)) {
|
||||
return TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
}
|
||||
}
|
||||
|
@ -927,7 +927,7 @@ int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pMeter) {
|
|||
|
||||
void mgmtCleanUpMeters() { sdbCloseTable(meterSdb); }
|
||||
|
||||
int mgmtGetMeterMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
||||
int mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
||||
int cols = 0;
|
||||
|
||||
SDbObj *pDb = NULL;
|
||||
|
@ -974,16 +974,16 @@ int mgmtGetMeterMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
SSchema *mgmtGetMeterSchema(STabObj *pMeter) {
|
||||
SSchema *mgmtGetTableSchema(STabObj *pMeter) {
|
||||
if (pMeter == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (!mgmtMeterCreateFromMetric(pMeter)) {
|
||||
if (!mgmtTableCreateFromSuperTable(pMeter)) {
|
||||
return (SSchema *)pMeter->schema;
|
||||
}
|
||||
|
||||
STabObj *pMetric = mgmtGetMeter(pMeter->pTagData);
|
||||
STabObj *pMetric = mgmtGetTable(pMeter->pTagData);
|
||||
assert(pMetric != NULL);
|
||||
|
||||
return (SSchema *)pMetric->schema;
|
||||
|
@ -1001,7 +1001,7 @@ static int32_t mgmtSerializeTagValue(char* pMsg, STabObj* pMeter, int16_t* tagsI
|
|||
offset += TSDB_METER_NAME_LEN;
|
||||
} else {
|
||||
SSchema s = {0};
|
||||
char * tag = mgmtMeterGetTag(pMeter, tagsId[j], &s);
|
||||
char * tag = mgmtTableGetTag(pMeter, tagsId[j], &s);
|
||||
|
||||
memcpy(pMsg + offset, tag, (size_t)s.bytes);
|
||||
offset += s.bytes;
|
||||
|
@ -1201,9 +1201,9 @@ int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SMetricMetaMsg *pMetr
|
|||
|
||||
for (int32_t i = 0; i < pMetricMetaMsg->numOfMeters; ++i) {
|
||||
SMetricMetaElemMsg *pElem = doConvertMetricMetaMsg(pMetricMetaMsg, i);
|
||||
STabObj * pMetric = mgmtGetMeter(pElem->meterId);
|
||||
STabObj * pMetric = mgmtGetTable(pElem->meterId);
|
||||
|
||||
if (!mgmtIsMetric(pMetric)) {
|
||||
if (!mgmtIsSuperTable(pMetric)) {
|
||||
ret = TSDB_CODE_NOT_SUPER_TABLE;
|
||||
break;
|
||||
}
|
||||
|
@ -1218,7 +1218,7 @@ int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SMetricMetaMsg *pMetr
|
|||
|
||||
for (int32_t i = 0; i < pMetricMetaMsg->numOfMeters; ++i) {
|
||||
SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg*) ((char *) pMetricMetaMsg + pMetricMetaMsg->metaElem[i]);
|
||||
STabObj *pMetric = mgmtGetMeter(pElem->meterId);
|
||||
STabObj *pMetric = mgmtGetTable(pElem->meterId);
|
||||
|
||||
if (pMetric->pSkipList->nSize > num) {
|
||||
index = i;
|
||||
|
@ -1294,7 +1294,7 @@ int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
|
|||
pShow->pNode = sdbFetchRow(meterSdb, pShow->pNode, (void **)&pMeter);
|
||||
if (pMeter == NULL) break;
|
||||
|
||||
if (mgmtIsMetric(pMeter)) continue;
|
||||
if (mgmtIsSuperTable(pMeter)) continue;
|
||||
|
||||
// not belong to current db
|
||||
if (strncmp(pMeter->meterId, prefix, prefixLen)) continue;
|
||||
|
@ -1455,7 +1455,7 @@ int mgmtRetrieveMetrics(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
|
|||
}
|
||||
|
||||
int32_t mgmtFindTagCol(STabObj *pMetric, const char *tagName) {
|
||||
if (!mgmtIsMetric(pMetric)) return -1;
|
||||
if (!mgmtIsSuperTable(pMetric)) return -1;
|
||||
|
||||
SSchema *schema = NULL;
|
||||
|
||||
|
@ -1474,7 +1474,7 @@ int32_t mgmtMeterModifyTagNameByCol(STabObj *pMetric, uint32_t col, const char *
|
|||
|
||||
uint32_t len = strlen(nname);
|
||||
|
||||
if (pMetric == NULL || (!mgmtIsMetric(pMetric)) || col >= pMetric->numOfTags || len >= TSDB_COL_NAME_LEN ||
|
||||
if (pMetric == NULL || (!mgmtIsSuperTable(pMetric)) || col >= pMetric->numOfTags || len >= TSDB_COL_NAME_LEN ||
|
||||
mgmtFindTagCol(pMetric, nname) >= 0)
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
|
||||
|
@ -1503,7 +1503,7 @@ int32_t mgmtMeterModifyTagNameByCol(STabObj *pMetric, uint32_t col, const char *
|
|||
}
|
||||
|
||||
int32_t mgmtMeterModifyTagNameByName(STabObj *pMetric, const char *oname, const char *nname) {
|
||||
if (pMetric == NULL || (!mgmtIsMetric(pMetric))) return TSDB_CODE_APP_ERROR;
|
||||
if (pMetric == NULL || (!mgmtIsSuperTable(pMetric))) return TSDB_CODE_APP_ERROR;
|
||||
|
||||
int index = mgmtFindTagCol(pMetric, oname);
|
||||
if (index < 0) {
|
||||
|
@ -1517,9 +1517,9 @@ int32_t mgmtMeterModifyTagNameByName(STabObj *pMetric, const char *oname, const
|
|||
|
||||
int32_t mgmtMeterModifyTagValueByCol(STabObj *pMeter, int col, const char *nContent) {
|
||||
int rowSize = 0;
|
||||
if (pMeter == NULL || nContent == NULL || (!mgmtMeterCreateFromMetric(pMeter))) return TSDB_CODE_APP_ERROR;
|
||||
if (pMeter == NULL || nContent == NULL || (!mgmtTableCreateFromSuperTable(pMeter))) return TSDB_CODE_APP_ERROR;
|
||||
|
||||
STabObj *pMetric = mgmtGetMeter(pMeter->pTagData);
|
||||
STabObj *pMetric = mgmtGetTable(pMeter->pTagData);
|
||||
assert(pMetric != NULL);
|
||||
|
||||
if (col < 0 || col > pMetric->numOfTags) return TSDB_CODE_APP_ERROR;
|
||||
|
@ -1561,10 +1561,10 @@ int32_t mgmtMeterModifyTagValueByCol(STabObj *pMeter, int col, const char *nCont
|
|||
}
|
||||
|
||||
int32_t mgmtMeterModifyTagValueByName(STabObj *pMeter, char *tagName, char *nContent) {
|
||||
if (pMeter == NULL || tagName == NULL || nContent == NULL || (!mgmtMeterCreateFromMetric(pMeter)))
|
||||
if (pMeter == NULL || tagName == NULL || nContent == NULL || (!mgmtTableCreateFromSuperTable(pMeter)))
|
||||
return TSDB_CODE_INVALID_MSG_TYPE;
|
||||
|
||||
STabObj *pMetric = mgmtGetMeter(pMeter->pTagData);
|
||||
STabObj *pMetric = mgmtGetTable(pMeter->pTagData);
|
||||
if (pMetric == NULL) return TSDB_CODE_APP_ERROR;
|
||||
|
||||
int col = mgmtFindTagCol(pMetric, tagName);
|
||||
|
@ -1574,7 +1574,7 @@ int32_t mgmtMeterModifyTagValueByName(STabObj *pMeter, char *tagName, char *nCon
|
|||
}
|
||||
|
||||
int32_t mgmtMeterAddTags(STabObj *pMetric, SSchema schema[], int ntags) {
|
||||
if (pMetric == NULL || (!mgmtIsMetric(pMetric))) return TSDB_CODE_INVALID_TABLE;
|
||||
if (pMetric == NULL || (!mgmtIsSuperTable(pMetric))) return TSDB_CODE_INVALID_TABLE;
|
||||
|
||||
if (pMetric->numOfTags + ntags > TSDB_MAX_TAGS) return TSDB_CODE_APP_ERROR;
|
||||
|
||||
|
@ -1615,7 +1615,7 @@ int32_t mgmtMeterAddTags(STabObj *pMetric, SSchema schema[], int ntags) {
|
|||
}
|
||||
|
||||
int32_t mgmtMeterDropTagByCol(STabObj *pMetric, int col) {
|
||||
if (pMetric == NULL || (!mgmtIsMetric(pMetric)) || col <= 0 || col >= pMetric->numOfTags) return TSDB_CODE_APP_ERROR;
|
||||
if (pMetric == NULL || (!mgmtIsSuperTable(pMetric)) || col <= 0 || col >= pMetric->numOfTags) return TSDB_CODE_APP_ERROR;
|
||||
|
||||
// Pack message to do batch update
|
||||
uint32_t size = sizeof(SMeterBatchUpdateMsg) + sizeof(SchemaUnit);
|
||||
|
@ -1643,7 +1643,7 @@ int32_t mgmtMeterDropTagByCol(STabObj *pMetric, int col) {
|
|||
}
|
||||
|
||||
int32_t mgmtMeterDropTagByName(STabObj *pMetric, char *name) {
|
||||
if (pMetric == NULL || (!mgmtIsMetric(pMetric))) {
|
||||
if (pMetric == NULL || (!mgmtIsSuperTable(pMetric))) {
|
||||
mTrace("Failed to drop tag name: %s from table: %s", name, pMetric->meterId);
|
||||
return TSDB_CODE_INVALID_TABLE;
|
||||
}
|
||||
|
@ -1657,7 +1657,7 @@ int32_t mgmtFindColumnIndex(STabObj *pMeter, const char *colName) {
|
|||
STabObj *pMetric = NULL;
|
||||
SSchema *schema = NULL;
|
||||
|
||||
if (pMeter->meterType == TSDB_METER_OTABLE || pMeter->meterType == TSDB_METER_METRIC) {
|
||||
if (pMeter->meterType == TSDB_TABLE_TYPE_NORMAL_TABLE || pMeter->meterType == TSDB_TABLE_TYPE_SUPER_TABLE) {
|
||||
schema = (SSchema *)pMeter->schema;
|
||||
for (int32_t i = 0; i < pMeter->numOfColumns; i++) {
|
||||
if (strcasecmp(schema[i].name, colName) == 0) {
|
||||
|
@ -1665,8 +1665,8 @@ int32_t mgmtFindColumnIndex(STabObj *pMeter, const char *colName) {
|
|||
}
|
||||
}
|
||||
|
||||
} else if (pMeter->meterType == TSDB_METER_MTABLE) {
|
||||
pMetric = mgmtGetMeter(pMeter->pTagData);
|
||||
} else if (pMeter->meterType == TSDB_TABLE_TYPE_CREATE_FROM_STABLE) {
|
||||
pMetric = mgmtGetTable(pMeter->pTagData);
|
||||
if (pMetric == NULL) {
|
||||
mError("MTable not belongs to any metric, meter: %s", pMeter->meterId);
|
||||
return -1;
|
||||
|
@ -1686,7 +1686,7 @@ int32_t mgmtMeterAddColumn(STabObj *pMeter, SSchema schema[], int ncols) {
|
|||
SAcctObj *pAcct = NULL;
|
||||
SDbObj * pDb = NULL;
|
||||
|
||||
if (pMeter == NULL || pMeter->meterType == TSDB_METER_MTABLE || pMeter->meterType == TSDB_METER_STABLE || ncols <= 0)
|
||||
if (pMeter == NULL || pMeter->meterType == TSDB_TABLE_TYPE_CREATE_FROM_STABLE || pMeter->meterType == TSDB_TABLE_TYPE_STREAM_TABLE || ncols <= 0)
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
|
||||
// ASSUMPTION: no two tags are the same
|
||||
|
@ -1707,9 +1707,9 @@ int32_t mgmtMeterAddColumn(STabObj *pMeter, SSchema schema[], int ncols) {
|
|||
|
||||
pMeter->schema = realloc(pMeter->schema, pMeter->schemaSize + sizeof(SSchema) * ncols);
|
||||
|
||||
if (pMeter->meterType == TSDB_METER_OTABLE) {
|
||||
if (pMeter->meterType == TSDB_TABLE_TYPE_NORMAL_TABLE) {
|
||||
memcpy(pMeter->schema + pMeter->schemaSize, schema, sizeof(SSchema) * ncols);
|
||||
} else if (pMeter->meterType == TSDB_METER_METRIC) {
|
||||
} else if (pMeter->meterType == TSDB_TABLE_TYPE_SUPER_TABLE) {
|
||||
memmove(pMeter->schema + sizeof(SSchema) * (pMeter->numOfColumns + ncols),
|
||||
pMeter->schema + sizeof(SSchema) * pMeter->numOfColumns, sizeof(SSchema) * pMeter->numOfTags);
|
||||
memcpy(pMeter->schema + sizeof(SSchema) * pMeter->numOfColumns, schema, sizeof(SSchema) * ncols);
|
||||
|
@ -1721,13 +1721,13 @@ int32_t mgmtMeterAddColumn(STabObj *pMeter, SSchema schema[], int ncols) {
|
|||
pMeter->schemaSize += sizeof(SSchema) * ncols;
|
||||
pMeter->numOfColumns += ncols;
|
||||
pMeter->sversion++;
|
||||
if (mgmtIsNormalMeter(pMeter))
|
||||
if (mgmtIsNormalTable(pMeter))
|
||||
pAcct->acctInfo.numOfTimeSeries += ncols;
|
||||
else
|
||||
pAcct->acctInfo.numOfTimeSeries += (ncols * pMeter->numOfMeters);
|
||||
sdbUpdateRow(meterSdb, pMeter, 0, 1);
|
||||
|
||||
if (pMeter->meterType == TSDB_METER_METRIC) {
|
||||
if (pMeter->meterType == TSDB_TABLE_TYPE_SUPER_TABLE) {
|
||||
for (STabObj *pObj = pMeter->pHead; pObj != NULL; pObj = pObj->next) {
|
||||
pObj->numOfColumns++;
|
||||
pObj->nextColId = pMeter->nextColId;
|
||||
|
@ -1743,7 +1743,7 @@ int32_t mgmtMeterDropColumnByName(STabObj *pMeter, const char *name) {
|
|||
SAcctObj *pAcct = NULL;
|
||||
SDbObj * pDb = NULL;
|
||||
|
||||
if (pMeter == NULL || pMeter->meterType == TSDB_METER_MTABLE || pMeter->meterType == TSDB_METER_STABLE)
|
||||
if (pMeter == NULL || pMeter->meterType == TSDB_TABLE_TYPE_CREATE_FROM_STABLE || pMeter->meterType == TSDB_TABLE_TYPE_STREAM_TABLE)
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
|
||||
int32_t index = mgmtFindColumnIndex(pMeter, name);
|
||||
|
@ -1761,16 +1761,16 @@ int32_t mgmtMeterDropColumnByName(STabObj *pMeter, const char *name) {
|
|||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
|
||||
if (pMeter->meterType == TSDB_METER_OTABLE) {
|
||||
if (pMeter->meterType == TSDB_TABLE_TYPE_NORMAL_TABLE) {
|
||||
memmove(pMeter->schema + sizeof(SSchema) * index, pMeter->schema + sizeof(SSchema) * (index + 1),
|
||||
sizeof(SSchema) * (pMeter->numOfColumns - index - 1));
|
||||
} else if (pMeter->meterType == TSDB_METER_METRIC) {
|
||||
} else if (pMeter->meterType == TSDB_TABLE_TYPE_SUPER_TABLE) {
|
||||
memmove(pMeter->schema + sizeof(SSchema) * index, pMeter->schema + sizeof(SSchema) * (index + 1),
|
||||
sizeof(SSchema) * (pMeter->numOfColumns + pMeter->numOfTags - index - 1));
|
||||
}
|
||||
pMeter->schemaSize -= sizeof(SSchema);
|
||||
pMeter->numOfColumns--;
|
||||
if (mgmtIsNormalMeter(pMeter))
|
||||
if (mgmtIsNormalTable(pMeter))
|
||||
pAcct->acctInfo.numOfTimeSeries--;
|
||||
else
|
||||
pAcct->acctInfo.numOfTimeSeries -= (pMeter->numOfMeters);
|
||||
|
@ -1779,7 +1779,7 @@ int32_t mgmtMeterDropColumnByName(STabObj *pMeter, const char *name) {
|
|||
pMeter->sversion++;
|
||||
sdbUpdateRow(meterSdb, pMeter, 0, 1);
|
||||
|
||||
if (pMeter->meterType == TSDB_METER_METRIC) {
|
||||
if (pMeter->meterType == TSDB_TABLE_TYPE_SUPER_TABLE) {
|
||||
for (STabObj *pObj = pMeter->pHead; pObj != NULL; pObj = pObj->next) {
|
||||
pObj->numOfColumns--;
|
||||
pObj->sversion = pMeter->sversion;
|
||||
|
|
|
@ -157,7 +157,7 @@ bool mgmtCheckMeterMetaMsgType(char *pMsg) {
|
|||
SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg;
|
||||
|
||||
int16_t autoCreate = htons(pInfo->createFlag);
|
||||
STabObj *pMeterObj = mgmtGetMeter(pInfo->meterId);
|
||||
STabObj *pMeterObj = mgmtGetTable(pInfo->meterId);
|
||||
|
||||
// If table does not exists and autoCreate flag is set, we add the handler into another task queue, namely tranQueue
|
||||
bool addIntoTranQueue = (pMeterObj == NULL && autoCreate == 1);
|
||||
|
@ -199,7 +199,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
goto _exit_code;
|
||||
}
|
||||
|
||||
pMeterObj = mgmtGetMeter(pInfo->meterId);
|
||||
pMeterObj = mgmtGetTable(pInfo->meterId);
|
||||
|
||||
// on demand create table from super table if meter does not exists
|
||||
if (pMeterObj == NULL && pInfo->createFlag == 1) {
|
||||
|
@ -243,7 +243,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
goto _exit_code;
|
||||
}
|
||||
|
||||
pMeterObj = mgmtGetMeter(pInfo->meterId);
|
||||
pMeterObj = mgmtGetTable(pInfo->meterId);
|
||||
}
|
||||
|
||||
if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
|
||||
|
@ -279,10 +279,10 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
pMsg += sizeof(SMeterMeta);
|
||||
pSchema = (SSchema *)pMsg; // schema locates at the end of SMeterMeta struct
|
||||
|
||||
if (mgmtMeterCreateFromMetric(pMeterObj)) {
|
||||
if (mgmtTableCreateFromSuperTable(pMeterObj)) {
|
||||
assert(pMeterObj->numOfTags == 0);
|
||||
|
||||
STabObj *pMetric = mgmtGetMeter(pMeterObj->pTagData);
|
||||
STabObj *pMetric = mgmtGetTable(pMeterObj->pTagData);
|
||||
uint32_t numOfTotalCols = (uint32_t)pMetric->numOfTags + pMetric->numOfColumns;
|
||||
|
||||
pMeta->numOfTags = pMetric->numOfTags; // update the numOfTags info
|
||||
|
@ -302,7 +302,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
pMsg += numOfTotalCols * sizeof(SSchema);
|
||||
}
|
||||
|
||||
if (mgmtIsNormalMeter(pMeterObj)) {
|
||||
if (mgmtIsNormalTable(pMeterObj)) {
|
||||
pVgroup = mgmtGetVgroup(pMeterObj->gid.vgId);
|
||||
if (pVgroup == NULL) {
|
||||
pRsp->code = TSDB_CODE_INVALID_TABLE;
|
||||
|
@ -397,7 +397,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
}
|
||||
|
||||
// get meter schema, and fill into resp payload
|
||||
pMeterObj = mgmtGetMeter(tblName);
|
||||
pMeterObj = mgmtGetTable(tblName);
|
||||
pDbObj = mgmtGetDbByMeterId(tblName);
|
||||
|
||||
if (pMeterObj == NULL || (pDbObj == NULL)) {
|
||||
|
@ -419,10 +419,10 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
pCurMeter += sizeof(SMultiMeterMeta);
|
||||
pSchema = (SSchema *)pCurMeter; // schema locates at the end of SMeterMeta struct
|
||||
|
||||
if (mgmtMeterCreateFromMetric(pMeterObj)) {
|
||||
if (mgmtTableCreateFromSuperTable(pMeterObj)) {
|
||||
assert(pMeterObj->numOfTags == 0);
|
||||
|
||||
STabObj *pMetric = mgmtGetMeter(pMeterObj->pTagData);
|
||||
STabObj *pMetric = mgmtGetTable(pMeterObj->pTagData);
|
||||
uint32_t numOfTotalCols = (uint32_t)pMetric->numOfTags + pMetric->numOfColumns;
|
||||
|
||||
pMeta->meta.numOfTags = pMetric->numOfTags; // update the numOfTags info
|
||||
|
@ -442,7 +442,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
pCurMeter += numOfTotalCols * sizeof(SSchema);
|
||||
}
|
||||
|
||||
if (mgmtIsNormalMeter(pMeterObj)) {
|
||||
if (mgmtIsNormalTable(pMeterObj)) {
|
||||
pVgroup = mgmtGetVgroup(pMeterObj->gid.vgId);
|
||||
if (pVgroup == NULL) {
|
||||
pRsp->code = TSDB_CODE_INVALID_TABLE;
|
||||
|
@ -509,7 +509,7 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
}
|
||||
|
||||
SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg *)(((char *)pMetricMetaMsg) + pMetricMetaMsg->metaElem[0]);
|
||||
pMetric = mgmtGetMeter(pElem->meterId);
|
||||
pMetric = mgmtGetTable(pElem->meterId);
|
||||
|
||||
SDbObj *pDb = NULL;
|
||||
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
|
||||
|
@ -880,7 +880,7 @@ int mgmtProcessUseDbMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
}
|
||||
|
||||
int (*mgmtGetMetaFp[])(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) = {
|
||||
mgmtGetAcctMeta, mgmtGetUserMeta, mgmtGetDbMeta, mgmtGetMeterMeta, mgmtGetDnodeMeta,
|
||||
mgmtGetAcctMeta, mgmtGetUserMeta, mgmtGetDbMeta, mgmtGetTableMeta, mgmtGetDnodeMeta,
|
||||
mgmtGetMnodeMeta, mgmtGetVgroupMeta, mgmtGetMetricMeta, mgmtGetModuleMeta, mgmtGetQueryMeta,
|
||||
mgmtGetStreamMeta, mgmtGetConfigMeta, mgmtGetConnsMeta, mgmtGetScoresMeta, grantGetGrantsMeta,
|
||||
mgmtGetVnodeMeta,
|
||||
|
@ -1076,7 +1076,7 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
} else if (code != TSDB_CODE_SUCCESS) {
|
||||
if (code == TSDB_CODE_TABLE_ALREADY_EXIST) { // table already created when the second attempt to create table
|
||||
|
||||
STabObj* pMeter = mgmtGetMeter(pCreate->meterId);
|
||||
STabObj* pMeter = mgmtGetTable(pCreate->meterId);
|
||||
assert(pMeter != NULL);
|
||||
|
||||
mWarn("table:%s, table already created, failed to create table, ts:%" PRId64 ", code:%d", pCreate->meterId,
|
||||
|
|
|
@ -84,8 +84,8 @@ static int32_t tabObjResultComparator(const void* p1, const void* p2, void* para
|
|||
schema.type = TSDB_DATA_TYPE_BINARY;
|
||||
schema.bytes = TSDB_METER_ID_LEN;
|
||||
} else {
|
||||
f1 = mgmtMeterGetTag(pNode1, colIdx, NULL);
|
||||
f2 = mgmtMeterGetTag(pNode2, colIdx, &schema);
|
||||
f1 = mgmtTableGetTag(pNode1, colIdx, NULL);
|
||||
f2 = mgmtTableGetTag(pNode2, colIdx, &schema);
|
||||
assert(schema.type == pOrderDesc->pTagSchema->pSchema[colIdx].type);
|
||||
}
|
||||
|
||||
|
@ -134,7 +134,7 @@ void mgmtReorganizeMetersInMetricMeta(SMetricMetaMsg* pMetricMetaMsg, int32_t ta
|
|||
|
||||
SMetricMetaElemMsg* pElem = (SMetricMetaElemMsg*)((char*)pMetricMetaMsg + pMetricMetaMsg->metaElem[tableIndex]);
|
||||
|
||||
STabObj* pMetric = mgmtGetMeter(pElem->meterId);
|
||||
STabObj* pMetric = mgmtGetTable(pElem->meterId);
|
||||
SSchema* pTagSchema = (SSchema*)(pMetric->schema + pMetric->numOfColumns * sizeof(SSchema));
|
||||
|
||||
/*
|
||||
|
@ -181,7 +181,7 @@ static void mgmtRetrieveByMeterName(tQueryResultset* pRes, char* str, STabObj* p
|
|||
pRes->num = 0;
|
||||
|
||||
for (pToken = strsep(&str, sep); pToken != NULL; pToken = strsep(&str, sep)) {
|
||||
STabObj* pMeterObj = mgmtGetMeter(pToken);
|
||||
STabObj* pMeterObj = mgmtGetTable(pToken);
|
||||
if (pMeterObj == NULL) {
|
||||
mWarn("metric:%s error in metric query expression, invalid meter id:%s", pMetric->meterId, pToken);
|
||||
continue;
|
||||
|
@ -193,7 +193,7 @@ static void mgmtRetrieveByMeterName(tQueryResultset* pRes, char* str, STabObj* p
|
|||
}
|
||||
|
||||
/* not a table created from metric, ignore */
|
||||
if (pMeterObj->meterType != TSDB_METER_MTABLE) {
|
||||
if (pMeterObj->meterType != TSDB_TABLE_TYPE_CREATE_FROM_STABLE) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -201,7 +201,7 @@ static void mgmtRetrieveByMeterName(tQueryResultset* pRes, char* str, STabObj* p
|
|||
* queried meter not belongs to this metric, ignore, metric does not have
|
||||
* uid, so compare according to meterid
|
||||
*/
|
||||
STabObj* parentMetric = mgmtGetMeter(pMeterObj->pTagData);
|
||||
STabObj* parentMetric = mgmtGetTable(pMeterObj->pTagData);
|
||||
if (strncasecmp(parentMetric->meterId, pMetric->meterId, TSDB_METER_ID_LEN) != 0 ||
|
||||
(parentMetric->uid != pMetric->uid)) {
|
||||
continue;
|
||||
|
@ -256,7 +256,7 @@ UNUSED_FUNC static bool mgmtJoinFilterCallback(tSkipListNode* pNode, void* param
|
|||
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
|
||||
|
||||
SSchema s = {0};
|
||||
char* v = mgmtMeterGetTag((STabObj*)pNode->pData, pSupporter->colIndex, &s);
|
||||
char* v = mgmtTableGetTag((STabObj*)pNode->pData, pSupporter->colIndex, &s);
|
||||
|
||||
for (int32_t i = 0; i < pSupporter->size; ++i) {
|
||||
int32_t ret = doCompare(v, pSupporter->val[i], pSupporter->type, s.bytes);
|
||||
|
@ -288,7 +288,7 @@ static void orderResult(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes, i
|
|||
tOrderDescriptor* descriptor =
|
||||
(tOrderDescriptor*)calloc(1, sizeof(tOrderDescriptor) + sizeof(int32_t) * 1); // only one column for join
|
||||
|
||||
STabObj* pMetric = mgmtGetMeter(pElem->meterId);
|
||||
STabObj* pMetric = mgmtGetTable(pElem->meterId);
|
||||
SSchema* pTagSchema = (SSchema*)(pMetric->schema + pMetric->numOfColumns * sizeof(SSchema));
|
||||
|
||||
descriptor->pTagSchema = tCreateTagSchema(pTagSchema, pMetric->numOfTags);
|
||||
|
@ -311,8 +311,8 @@ static int32_t mgmtCheckForDuplicateTagValue(tQueryResultset* pRes, int32_t inde
|
|||
STabObj* pObj1 = pRes[index].pRes[k - 1];
|
||||
STabObj* pObj2 = pRes[index].pRes[k];
|
||||
|
||||
char* val1 = mgmtMeterGetTag(pObj1, tagCol, &s);
|
||||
char* val2 = mgmtMeterGetTag(pObj2, tagCol, NULL);
|
||||
char* val1 = mgmtTableGetTag(pObj1, tagCol, &s);
|
||||
char* val2 = mgmtTableGetTag(pObj2, tagCol, NULL);
|
||||
|
||||
if (doCompare(val1, val2, s.type, s.bytes) == 0) {
|
||||
return TSDB_CODE_DUPLICATE_TAGS;
|
||||
|
@ -354,8 +354,8 @@ int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes) {
|
|||
strcpy(right, cond + TSDB_METER_ID_LEN + sizeof(int16_t));
|
||||
int16_t rightTagColIndex = *(int16_t*)(cond + TSDB_METER_ID_LEN * 2 + sizeof(int16_t));
|
||||
|
||||
STabObj* pLeftMetric = mgmtGetMeter(left);
|
||||
STabObj* pRightMetric = mgmtGetMeter(right);
|
||||
STabObj* pLeftMetric = mgmtGetTable(left);
|
||||
STabObj* pRightMetric = mgmtGetTable(right);
|
||||
|
||||
// decide the pRes belongs to
|
||||
int32_t leftIndex = 0;
|
||||
|
@ -363,7 +363,7 @@ int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes) {
|
|||
|
||||
for (int32_t i = 0; i < pMetricMetaMsg->numOfMeters; ++i) {
|
||||
STabObj* pObj = (STabObj*)pRes[i].pRes[0];
|
||||
STabObj* pMetric1 = mgmtGetMeter(pObj->pTagData);
|
||||
STabObj* pMetric1 = mgmtGetTable(pObj->pTagData);
|
||||
if (pMetric1 == pLeftMetric) {
|
||||
leftIndex = i;
|
||||
} else if (pMetric1 == pRightMetric) {
|
||||
|
@ -391,8 +391,8 @@ int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes) {
|
|||
STabObj* pLeftObj = pRes[leftIndex].pRes[i];
|
||||
STabObj* pRightObj = pRes[rightIndex].pRes[j];
|
||||
|
||||
char* v1 = mgmtMeterGetTag(pLeftObj, leftTagColIndex, &s);
|
||||
char* v2 = mgmtMeterGetTag(pRightObj, rightTagColIndex, NULL);
|
||||
char* v1 = mgmtTableGetTag(pLeftObj, leftTagColIndex, &s);
|
||||
char* v2 = mgmtTableGetTag(pRightObj, rightTagColIndex, NULL);
|
||||
|
||||
int32_t ret = doCompare(v1, v2, s.type, s.bytes);
|
||||
if (ret == 0) { // qualified
|
||||
|
@ -729,7 +729,7 @@ static int32_t mgmtFilterMeterByIndex(STabObj* pMetric, tQueryResultset* pRes, c
|
|||
|
||||
int mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pMsg, int32_t tableIndex, tQueryResultset* pRes) {
|
||||
SMetricMetaElemMsg* pElem = (SMetricMetaElemMsg*)((char*)pMsg + pMsg->metaElem[tableIndex]);
|
||||
STabObj* pMetric = mgmtGetMeter(pElem->meterId);
|
||||
STabObj* pMetric = mgmtGetTable(pElem->meterId);
|
||||
char* pCond = NULL;
|
||||
char* tmpTableNameCond = NULL;
|
||||
|
||||
|
|
|
@ -21,32 +21,18 @@
|
|||
#include "textbuffer.h"
|
||||
#include "tschemautil.h"
|
||||
#include "tsqlfunction.h"
|
||||
#include "vnodeTagMgmt.h"
|
||||
|
||||
extern int cksumsize;
|
||||
|
||||
uint64_t mgmtGetCheckSum(FILE* fp, int offset) {
|
||||
uint64_t checksum = 0;
|
||||
uint64_t data;
|
||||
int bytes;
|
||||
|
||||
while (1) {
|
||||
data = 0;
|
||||
bytes = fread(&data, sizeof(data), 1, fp);
|
||||
|
||||
if (bytes != sizeof(data)) break;
|
||||
|
||||
checksum += data;
|
||||
}
|
||||
|
||||
return checksum;
|
||||
bool mgmtTableCreateFromSuperTable(STabObj* pTableObj) {
|
||||
return pTableObj->meterType == TSDB_TABLE_TYPE_CREATE_FROM_STABLE;
|
||||
}
|
||||
|
||||
bool mgmtMeterCreateFromMetric(STabObj* pMeterObj) { return pMeterObj->meterType == TSDB_METER_MTABLE; }
|
||||
bool mgmtIsSuperTable(STabObj* pTableObj) {
|
||||
return pTableObj->meterType == TSDB_TABLE_TYPE_SUPER_TABLE;
|
||||
}
|
||||
|
||||
bool mgmtIsMetric(STabObj* pMeterObj) { return pMeterObj->meterType == TSDB_METER_METRIC; }
|
||||
|
||||
bool mgmtIsNormalMeter(STabObj* pMeterObj) { return !mgmtIsMetric(pMeterObj); }
|
||||
bool mgmtIsNormalTable(STabObj* pTableObj) {
|
||||
return !mgmtIsSuperTable(pTableObj);
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO: the tag offset value should be kept in memory to avoid dynamically calculating the value
|
||||
|
@ -56,30 +42,30 @@ bool mgmtIsNormalMeter(STabObj* pMeterObj) { return !mgmtIsMetric(pMeterObj); }
|
|||
* @param pTagColSchema
|
||||
* @return
|
||||
*/
|
||||
char* mgmtMeterGetTag(STabObj* pMeter, int32_t col, SSchema* pTagColSchema) {
|
||||
if (!mgmtMeterCreateFromMetric(pMeter)) {
|
||||
char* mgmtTableGetTag(STabObj* pTable, int32_t col, SSchema* pTagColSchema) {
|
||||
if (!mgmtTableCreateFromSuperTable(pTable)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STabObj* pMetric = mgmtGetMeter(pMeter->pTagData);
|
||||
int32_t offset = mgmtGetTagsLength(pMetric, col) + TSDB_METER_ID_LEN;
|
||||
STabObj* pSuperTable = mgmtGetTable(pTable->pTagData);
|
||||
int32_t offset = mgmtGetTagsLength(pSuperTable, col) + TSDB_METER_ID_LEN;
|
||||
assert(offset > 0);
|
||||
|
||||
if (pTagColSchema != NULL) {
|
||||
*pTagColSchema = ((SSchema*)pMetric->schema)[pMetric->numOfColumns + col];
|
||||
*pTagColSchema = ((SSchema*)pSuperTable->schema)[pSuperTable->numOfColumns + col];
|
||||
}
|
||||
|
||||
return (pMeter->pTagData + offset);
|
||||
return (pTable->pTagData + offset);
|
||||
}
|
||||
|
||||
int32_t mgmtGetTagsLength(STabObj* pMetric, int32_t col) { // length before column col
|
||||
assert(mgmtIsMetric(pMetric) && col >= 0);
|
||||
int32_t mgmtGetTagsLength(STabObj* pSuperTable, int32_t col) { // length before column col
|
||||
assert(mgmtIsSuperTable(pSuperTable) && col >= 0);
|
||||
|
||||
int32_t len = 0;
|
||||
int32_t tagColumnIndexOffset = pMetric->numOfColumns;
|
||||
int32_t tagColumnIndexOffset = pSuperTable->numOfColumns;
|
||||
|
||||
for (int32_t i = 0; i < pMetric->numOfTags && i < col; ++i) {
|
||||
len += ((SSchema*)pMetric->schema)[tagColumnIndexOffset + i].bytes;
|
||||
for (int32_t i = 0; i < pSuperTable->numOfTags && i < col; ++i) {
|
||||
len += ((SSchema*)pSuperTable->schema)[tagColumnIndexOffset + i].bytes;
|
||||
}
|
||||
|
||||
return len;
|
||||
|
|
|
@ -241,7 +241,7 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
|||
SVgObj *pVgroup = NULL;
|
||||
STabObj *pMeter = NULL;
|
||||
if (pShow->payloadLen > 0 ) {
|
||||
pMeter = mgmtGetMeter(pShow->payload);
|
||||
pMeter = mgmtGetTable(pShow->payload);
|
||||
if (NULL == pMeter) {
|
||||
return TSDB_CODE_INVALID_METER_ID;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue