Merge branch '2.0' of https://github.com/taosdata/TDengine into 2.0
This commit is contained in:
commit
ded484b5cd
|
@ -45,13 +45,15 @@ mkdir build && cd build
|
|||
cmake .. && cmake --build .
|
||||
```
|
||||
|
||||
if compiling on an ARM processor(aarch64 or aarch32), you need add one parameter:
|
||||
To compile on an ARM processor (aarch64 or aarch32), please add option CPUTYPE as below:
|
||||
|
||||
```cmd
|
||||
aarch64:
|
||||
```cmd
|
||||
cmake .. -DCPUTYPE=aarch64 && cmake --build .
|
||||
```
|
||||
|
||||
aarch32:
|
||||
```cmd
|
||||
cmake .. -DCPUTYPE=aarch32 && cmake --build .
|
||||
```
|
||||
|
||||
|
|
|
@ -73,7 +73,7 @@ sed -i "2c$debver" ${pkg_dir}/DEBIAN/control
|
|||
if [ "$verMode" == "cluster" ]; then
|
||||
debname="TDengine-server-"${tdengine_ver}-${osType}-${cpuType}
|
||||
elif [ "$verMode" == "lite" ]; then
|
||||
debname="TDengine-server-edge"-${tdengine_ver}-${osType}-${cpuType}
|
||||
debname="TDengine-server"-${tdengine_ver}-${osType}-${cpuType}
|
||||
else
|
||||
echo "unknow verMode, nor cluster or lite"
|
||||
exit 1
|
||||
|
|
|
@ -66,7 +66,7 @@ cp_rpm_package ${pkg_dir}/RPMS
|
|||
if [ "$verMode" == "cluster" ]; then
|
||||
rpmname="TDengine-server-"${tdengine_ver}-${osType}-${cpuType}
|
||||
elif [ "$verMode" == "lite" ]; then
|
||||
rpmname="TDengine-server-edge"-${tdengine_ver}-${osType}-${cpuType}
|
||||
rpmname="TDengine-server"-${tdengine_ver}-${osType}-${cpuType}
|
||||
else
|
||||
echo "unknow verMode, nor cluster or lite"
|
||||
exit 1
|
||||
|
|
|
@ -111,7 +111,7 @@ cd ${release_dir}
|
|||
if [ "$verMode" == "cluster" ]; then
|
||||
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
|
||||
elif [ "$verMode" == "lite" ]; then
|
||||
pkg_name=${install_dir}-edge-${version}-${osType}-${cpuType}
|
||||
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
|
||||
else
|
||||
echo "unknow verMode, nor cluster or lite"
|
||||
exit 1
|
||||
|
|
|
@ -111,7 +111,7 @@ cd ${release_dir}
|
|||
if [ "$verMode" == "cluster" ]; then
|
||||
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
|
||||
elif [ "$verMode" == "lite" ]; then
|
||||
pkg_name=${install_dir}-edge-${version}-${osType}-${cpuType}
|
||||
pkg_name=${install_dir}-${version}-${osType}-${cpuType}
|
||||
else
|
||||
echo "unknow verMode, nor cluster or lite"
|
||||
exit 1
|
||||
|
@ -128,4 +128,4 @@ fi
|
|||
|
||||
tar -zcv -f "$(basename ${pkg_name}).tar.gz" $(basename ${install_dir}) --remove-files || :
|
||||
|
||||
cd ${curr_dir}
|
||||
cd ${curr_dir}
|
||||
|
|
|
@ -30,10 +30,10 @@ extern "C" {
|
|||
#include "tsdb.h"
|
||||
|
||||
#define UTIL_METER_IS_SUPERTABLE(metaInfo) \
|
||||
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_METRIC))
|
||||
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->tableType == TSDB_TABLE_TYPE_SUPER_TABLE))
|
||||
#define UTIL_METER_IS_NOMRAL_METER(metaInfo) (!(UTIL_METER_IS_SUPERTABLE(metaInfo)))
|
||||
#define UTIL_METER_IS_CREATE_FROM_METRIC(metaInfo) \
|
||||
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->meterType == TSDB_METER_MTABLE))
|
||||
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->tableType == TSDB_TABLE_TYPE_CREATE_FROM_STABLE))
|
||||
|
||||
#define TSDB_COL_IS_TAG(f) (((f)&TSDB_COL_TAG) != 0)
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ typedef struct SMeterMetaInfo {
|
|||
* 2. keep the vnode index for multi-vnode insertion
|
||||
*/
|
||||
int32_t vnodeIndex;
|
||||
char name[TSDB_METER_ID_LEN + 1]; // table(super table) name
|
||||
char name[TSDB_TABLE_ID_LEN + 1]; // table(super table) name
|
||||
int16_t numOfTags; // total required tags in query, including groupby tags
|
||||
int16_t tagColumnIndex[TSDB_MAX_TAGS]; // clause + tag projection
|
||||
} SMeterMetaInfo;
|
||||
|
@ -119,7 +119,7 @@ typedef struct SCond {
|
|||
} SCond;
|
||||
|
||||
typedef struct SJoinNode {
|
||||
char meterId[TSDB_METER_ID_LEN];
|
||||
char meterId[TSDB_TABLE_ID_LEN];
|
||||
uint64_t uid;
|
||||
int16_t tagCol;
|
||||
} SJoinNode;
|
||||
|
@ -154,7 +154,7 @@ typedef struct SParamInfo {
|
|||
} SParamInfo;
|
||||
|
||||
typedef struct STableDataBlocks {
|
||||
char meterId[TSDB_METER_ID_LEN];
|
||||
char meterId[TSDB_TABLE_ID_LEN];
|
||||
int8_t tsSource; // where does the UNIX timestamp come from, server or client
|
||||
bool ordered; // if current rows are ordered or not
|
||||
int64_t vgid; // virtual group id
|
||||
|
@ -302,7 +302,7 @@ typedef struct _tsc_obj {
|
|||
char user[TSDB_USER_LEN];
|
||||
char pass[TSDB_KEY_LEN];
|
||||
char acctId[TSDB_DB_NAME_LEN];
|
||||
char db[TSDB_METER_ID_LEN];
|
||||
char db[TSDB_TABLE_ID_LEN];
|
||||
char sversion[TSDB_VERSION_LEN];
|
||||
char writeAuth : 1;
|
||||
char superAuth : 1;
|
||||
|
|
|
@ -24,8 +24,6 @@ taos_fetch_row_a
|
|||
taos_subscribe
|
||||
taos_consume
|
||||
taos_unsubscribe
|
||||
taos_subfields_count
|
||||
taos_fetch_subfields
|
||||
taos_open_stream
|
||||
taos_close_stream
|
||||
taos_fetch_block
|
||||
|
|
|
@ -3194,7 +3194,7 @@ static void diff_function(SQLFunctionCtx *pCtx) {
|
|||
GET_RES_INFO(pCtx)->numOfRes += forwardStep;
|
||||
|
||||
pCtx->aOutputBuf += forwardStep * pCtx->outputBytes;
|
||||
pCtx->ptsOutputBuf += forwardStep * TSDB_KEYSIZE;
|
||||
pCtx->ptsOutputBuf = (char*)pCtx->ptsOutputBuf + forwardStep * TSDB_KEYSIZE;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -79,8 +79,8 @@ static int32_t getToStringLength(const char *pData, int32_t length, int32_t type
|
|||
static int32_t tscMaxLengthOfTagsFields(SSqlObj *pSql) {
|
||||
SMeterMeta *pMeta = tscGetMeterMetaInfo(&pSql->cmd, 0, 0)->pMeterMeta;
|
||||
|
||||
if (pMeta->meterType == TSDB_METER_METRIC || pMeta->meterType == TSDB_METER_OTABLE ||
|
||||
pMeta->meterType == TSDB_METER_STABLE) {
|
||||
if (pMeta->tableType == TSDB_TABLE_TYPE_SUPER_TABLE || pMeta->tableType == TSDB_TABLE_TYPE_NORMAL_TABLE ||
|
||||
pMeta->tableType == TSDB_TABLE_TYPE_STREAM_TABLE) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -776,7 +776,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
|
|||
SMeterMetaInfo *pSTableMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, STABLE_INDEX);
|
||||
setMeterID(pSTableMeterMetaInfo, &sToken, pSql);
|
||||
|
||||
strncpy(pTag->name, pSTableMeterMetaInfo->name, TSDB_METER_ID_LEN);
|
||||
strncpy(pTag->name, pSTableMeterMetaInfo->name, TSDB_TABLE_ID_LEN);
|
||||
code = tscGetMeterMeta(pSql, pSTableMeterMetaInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -950,7 +950,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
int validateTableName(char *tblName, int len) {
|
||||
char buf[TSDB_METER_ID_LEN] = {0};
|
||||
char buf[TSDB_TABLE_ID_LEN] = {0};
|
||||
strncpy(buf, tblName, len);
|
||||
|
||||
SSQLToken token = {.n = len, .type = TK_ID, .z = buf};
|
||||
|
@ -1544,7 +1544,7 @@ void tscProcessMultiVnodesInsertFromFile(SSqlObj *pSql) {
|
|||
continue;
|
||||
}
|
||||
|
||||
strncpy(pMeterMetaInfo->name, pDataBlock->meterId, TSDB_METER_ID_LEN);
|
||||
strncpy(pMeterMetaInfo->name, pDataBlock->meterId, TSDB_TABLE_ID_LEN);
|
||||
memset(pDataBlock->pData, 0, pDataBlock->nAllocSize);
|
||||
|
||||
int32_t ret = tscGetMeterMeta(pSql, pMeterMetaInfo);
|
||||
|
|
|
@ -1072,11 +1072,11 @@ int32_t setObjFullName(char* fullName, const char* account, SSQLToken* pDB, SSQL
|
|||
*xlen = totalLen;
|
||||
}
|
||||
|
||||
if (totalLen < TSDB_METER_ID_LEN) {
|
||||
if (totalLen < TSDB_TABLE_ID_LEN) {
|
||||
fullName[totalLen] = 0;
|
||||
}
|
||||
|
||||
return (totalLen <= TSDB_METER_ID_LEN) ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_SQL;
|
||||
return (totalLen <= TSDB_TABLE_ID_LEN) ? TSDB_CODE_SUCCESS : TSDB_CODE_INVALID_SQL;
|
||||
}
|
||||
|
||||
static void extractColumnNameFromString(tSQLExprItem* pItem) {
|
||||
|
@ -1901,7 +1901,7 @@ int32_t getMeterIndex(SSQLToken* pTableToken, SQueryInfo* pQueryInfo, SColumnInd
|
|||
}
|
||||
|
||||
pIndex->tableIndex = COLUMN_INDEX_INITIAL_VAL;
|
||||
char tableName[TSDB_METER_ID_LEN + 1] = {0};
|
||||
char tableName[TSDB_TABLE_ID_LEN + 1] = {0};
|
||||
|
||||
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
|
||||
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
|
||||
|
@ -3365,7 +3365,7 @@ static int32_t setTableCondForMetricQuery(SQueryInfo* pQueryInfo, const char* ac
|
|||
SStringBuilder sb1 = {0};
|
||||
taosStringBuilderAppendStringLen(&sb1, QUERY_COND_REL_PREFIX_IN, QUERY_COND_REL_PREFIX_IN_LEN);
|
||||
|
||||
char db[TSDB_METER_ID_LEN] = {0};
|
||||
char db[TSDB_TABLE_ID_LEN] = {0};
|
||||
|
||||
// remove the duplicated input table names
|
||||
int32_t num = 0;
|
||||
|
@ -3389,7 +3389,7 @@ static int32_t setTableCondForMetricQuery(SQueryInfo* pQueryInfo, const char* ac
|
|||
taosStringBuilderAppendStringLen(&sb1, TBNAME_LIST_SEP, 1);
|
||||
}
|
||||
|
||||
char idBuf[TSDB_METER_ID_LEN + 1] = {0};
|
||||
char idBuf[TSDB_TABLE_ID_LEN + 1] = {0};
|
||||
int32_t xlen = strlen(segments[i]);
|
||||
SSQLToken t = {.z = segments[i], .n = xlen, .type = TK_STRING};
|
||||
|
||||
|
@ -5300,7 +5300,7 @@ int32_t doCheckForCreateFromStable(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
}
|
||||
|
||||
// get meter meta from mnode
|
||||
strncpy(pCreateTable->usingInfo.tagdata.name, pStableMeterMetaInfo->name, TSDB_METER_ID_LEN);
|
||||
strncpy(pCreateTable->usingInfo.tagdata.name, pStableMeterMetaInfo->name, TSDB_TABLE_ID_LEN);
|
||||
tVariantList* pList = pInfo->pCreateTableInfo->usingInfo.pTagVals;
|
||||
|
||||
int32_t code = tscGetMeterMeta(pSql, pStableMeterMetaInfo);
|
||||
|
|
|
@ -2685,7 +2685,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
// fill head info
|
||||
SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
|
||||
memset(pMgmt->db, 0, TSDB_METER_ID_LEN); // server don't need the db
|
||||
memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN); // server don't need the db
|
||||
|
||||
SMultiMeterInfoMsg *pInfoMsg = (SMultiMeterInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
|
||||
pInfoMsg->numOfMeters = htonl((int32_t)pCmd->count);
|
||||
|
@ -2709,7 +2709,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
|
||||
const int32_t defaultSize =
|
||||
minMsgSize() + sizeof(SMetricMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
|
||||
minMsgSize() + sizeof(SSuperTableMetaMsg) + sizeof(SMgmtHead) + sizeof(int16_t) * TSDB_MAX_TAGS;
|
||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
|
||||
int32_t n = 0;
|
||||
|
@ -2722,7 +2722,7 @@ static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
|
|||
tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
|
||||
}
|
||||
|
||||
int32_t joinCondLen = (TSDB_METER_ID_LEN + sizeof(int16_t)) * 2;
|
||||
int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
|
||||
int32_t elemSize = sizeof(SMetricMetaElemMsg) * pQueryInfo->numOfTables;
|
||||
|
||||
int32_t len = tagLen + joinCondLen + elemSize + defaultSize;
|
||||
|
@ -2731,7 +2731,7 @@ static int32_t tscEstimateMetricMetaMsgSize(SSqlCmd *pCmd) {
|
|||
}
|
||||
|
||||
int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||
SMetricMetaMsg *pMetaMsg;
|
||||
SSuperTableMetaMsg *pMetaMsg;
|
||||
char * pMsg, *pStart;
|
||||
int msgLen = 0;
|
||||
int tableIndex = 0;
|
||||
|
@ -2757,25 +2757,25 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
pMsg += sizeof(SMgmtHead);
|
||||
|
||||
pMetaMsg = (SMetricMetaMsg *)pMsg;
|
||||
pMetaMsg = (SSuperTableMetaMsg *)pMsg;
|
||||
pMetaMsg->numOfMeters = htonl(pQueryInfo->numOfTables);
|
||||
|
||||
pMsg += sizeof(SMetricMetaMsg);
|
||||
pMsg += sizeof(SSuperTableMetaMsg);
|
||||
|
||||
int32_t offset = pMsg - (char *)pMetaMsg;
|
||||
pMetaMsg->join = htonl(offset);
|
||||
|
||||
// todo refactor
|
||||
pMetaMsg->joinCondLen = htonl((TSDB_METER_ID_LEN + sizeof(int16_t)) * 2);
|
||||
pMetaMsg->joinCondLen = htonl((TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2);
|
||||
|
||||
memcpy(pMsg, pTagCond->joinInfo.left.meterId, TSDB_METER_ID_LEN);
|
||||
pMsg += TSDB_METER_ID_LEN;
|
||||
memcpy(pMsg, pTagCond->joinInfo.left.meterId, TSDB_TABLE_ID_LEN);
|
||||
pMsg += TSDB_TABLE_ID_LEN;
|
||||
|
||||
*(int16_t *)pMsg = pTagCond->joinInfo.left.tagCol;
|
||||
pMsg += sizeof(int16_t);
|
||||
|
||||
memcpy(pMsg, pTagCond->joinInfo.right.meterId, TSDB_METER_ID_LEN);
|
||||
pMsg += TSDB_METER_ID_LEN;
|
||||
memcpy(pMsg, pTagCond->joinInfo.right.meterId, TSDB_TABLE_ID_LEN);
|
||||
pMsg += TSDB_TABLE_ID_LEN;
|
||||
|
||||
*(int16_t *)pMsg = pTagCond->joinInfo.right.tagCol;
|
||||
pMsg += sizeof(int16_t);
|
||||
|
@ -2991,7 +2991,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
|
|||
int32_t tagLen = 0;
|
||||
SSchema *pTagsSchema = tsGetTagSchema(pMeta);
|
||||
|
||||
if (pMeta->meterType == TSDB_METER_MTABLE) {
|
||||
if (pMeta->tableType == TSDB_TABLE_TYPE_CREATE_FROM_STABLE) {
|
||||
for (int32_t i = 0; i < pMeta->numOfTags; ++i) {
|
||||
tagLen += pTagsSchema[i].bytes;
|
||||
}
|
||||
|
@ -3106,7 +3106,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
|
|||
int32_t tagLen = 0;
|
||||
SSchema *pTagsSchema = tsGetTagSchema(pMeta);
|
||||
|
||||
if (pMeta->meterType == TSDB_METER_MTABLE) {
|
||||
if (pMeta->tableType == TSDB_TABLE_TYPE_CREATE_FROM_STABLE) {
|
||||
for (int32_t j = 0; j < pMeta->numOfTags; ++j) {
|
||||
tagLen += pTagsSchema[j].bytes;
|
||||
}
|
||||
|
@ -3304,7 +3304,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
int tscProcessConnectRsp(SSqlObj *pSql) {
|
||||
char temp[TSDB_METER_ID_LEN * 2];
|
||||
char temp[TSDB_TABLE_ID_LEN * 2];
|
||||
SConnectRsp *pConnect;
|
||||
|
||||
STscObj *pObj = pSql->pTscObj;
|
||||
|
|
|
@ -1047,7 +1047,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
|
|||
}
|
||||
|
||||
char *nextStr;
|
||||
char tblName[TSDB_METER_ID_LEN];
|
||||
char tblName[TSDB_TABLE_ID_LEN];
|
||||
int payloadLen = 0;
|
||||
char *pMsg = pCmd->payload;
|
||||
while (1) {
|
||||
|
|
|
@ -657,7 +657,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
|
|||
dataBuf->size = startOffset;
|
||||
dataBuf->tsSource = -1;
|
||||
|
||||
strncpy(dataBuf->meterId, name, TSDB_METER_ID_LEN);
|
||||
strncpy(dataBuf->meterId, name, TSDB_TABLE_ID_LEN);
|
||||
|
||||
/*
|
||||
* The metermeta may be released since the metermeta cache are completed clean by other thread
|
||||
|
@ -1810,7 +1810,7 @@ SMeterMetaInfo* tscAddMeterMetaInfo(SQueryInfo* pQueryInfo, const char* name, SM
|
|||
assert(pMeterMetaInfo != NULL);
|
||||
|
||||
if (name != NULL) {
|
||||
assert(strlen(name) <= TSDB_METER_ID_LEN);
|
||||
assert(strlen(name) <= TSDB_TABLE_ID_LEN);
|
||||
strcpy(pMeterMetaInfo->name, name);
|
||||
}
|
||||
|
||||
|
|
|
@ -312,7 +312,7 @@ int vnodeProcessCreateMeterMsg(char *pMsg, int msgLen) {
|
|||
pObj->vnode = pCreate->vnode;
|
||||
pObj->sid = pCreate->sid;
|
||||
pObj->uid = pCreate->uid;
|
||||
memcpy(pObj->meterId, pCreate->meterId, TSDB_METER_ID_LEN);
|
||||
memcpy(pObj->meterId, pCreate->meterId, TSDB_TABLE_ID_LEN);
|
||||
pObj->numOfColumns = pCreate->numOfColumns;
|
||||
pObj->timeStamp = pCreate->timeStamp;
|
||||
pObj->sversion = htonl(pCreate->sversion);
|
||||
|
@ -377,7 +377,7 @@ int vnodeProcessRemoveMeterRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) {
|
|||
pObj = vnodeList[pRemove->vnode].meterList[pRemove->sid];
|
||||
if (pObj == NULL) goto _remove_over;
|
||||
|
||||
if (memcmp(pObj->meterId, pRemove->meterId, TSDB_METER_ID_LEN) != 0) {
|
||||
if (memcmp(pObj->meterId, pRemove->meterId, TSDB_TABLE_ID_LEN) != 0) {
|
||||
dWarn("vid:%d sid:%d id:%s, remove ID:%s, meter ID not matched", pObj->vnode, pObj->sid, pObj->meterId,
|
||||
pRemove->meterId);
|
||||
goto _remove_over;
|
||||
|
|
|
@ -95,7 +95,7 @@ typedef struct {
|
|||
} SMeterGid;
|
||||
|
||||
typedef struct _tab_obj {
|
||||
char meterId[TSDB_METER_ID_LEN + 1];
|
||||
char meterId[TSDB_TABLE_ID_LEN + 1];
|
||||
uint64_t uid;
|
||||
SMeterGid gid;
|
||||
|
||||
|
@ -106,7 +106,7 @@ typedef struct _tab_obj {
|
|||
int32_t numOfColumns;
|
||||
int32_t schemaSize;
|
||||
short nextColId;
|
||||
char meterType : 4;
|
||||
char tableType : 4;
|
||||
char status : 3;
|
||||
char isDirty : 1; // if the table change tag column 1 value
|
||||
char reserved[15];
|
||||
|
@ -116,7 +116,7 @@ typedef struct _tab_obj {
|
|||
tSkipList * pSkipList;
|
||||
struct _tab_obj *pHead; // for metric, a link list for all meters created
|
||||
// according to this metric
|
||||
char *pTagData; // TSDB_METER_ID_LEN(metric_name)+
|
||||
char *pTagData; // TSDB_TABLE_ID_LEN(metric_name)+
|
||||
// tags_value1/tags_value2/tags_value3
|
||||
struct _tab_obj *prev, *next;
|
||||
char * pSql; // pointer to SQL, for SC, null-terminated string
|
||||
|
@ -262,8 +262,8 @@ extern SDnodeObj dnodeObj;
|
|||
// dnodeInt API
|
||||
int mgmtInitDnodeInt();
|
||||
void mgmtCleanUpDnodeInt();
|
||||
int mgmtSendCreateMsgToVgroup(STabObj *pMeter, SVgObj *pVgroup);
|
||||
int mgmtSendRemoveMeterMsgToDnode(STabObj *pMeter, SVgObj *pVgroup);
|
||||
int mgmtSendCreateMsgToVgroup(STabObj *pTable, SVgObj *pVgroup);
|
||||
int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup);
|
||||
int mgmtSendVPeersMsg(SVgObj *pVgroup);
|
||||
int mgmtSendFreeVnodeMsg(SVgObj *pVgroup);
|
||||
int mgmtSendOneFreeVnodeMsg(SVnodeGid *pVnodeGid);
|
||||
|
@ -284,8 +284,8 @@ int mgmtRetrieveUsers(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
|
|||
void mgmtCleanUpUsers();
|
||||
|
||||
// metric API
|
||||
int mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pMeter);
|
||||
int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pMeter);
|
||||
int mgmtAddMeterIntoMetric(STabObj *pMetric, STabObj *pTable);
|
||||
int mgmtRemoveMeterFromMetric(STabObj *pMetric, STabObj *pTable);
|
||||
int mgmtGetMetricMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
|
||||
int mgmtRetrieveMetrics(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
|
||||
|
||||
|
@ -324,20 +324,17 @@ void mgmtCleanUpVgroups();
|
|||
|
||||
// meter API
|
||||
int mgmtInitMeters();
|
||||
STabObj *mgmtGetMeter(char *meterId);
|
||||
STabObj *mgmtGetMeterInfo(char *src, char *tags[]);
|
||||
int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SMetricMetaMsg *pInfo);
|
||||
STabObj *mgmtGetTable(char *meterId);
|
||||
STabObj *mgmtGetTableInfo(char *src, char *tags[]);
|
||||
int mgmtRetrieveMetricMeta(SConnObj *pConn, char **pStart, SSuperTableMetaMsg *pInfo);
|
||||
int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate);
|
||||
int mgmtDropMeter(SDbObj *pDb, char *meterId, int ignore);
|
||||
int mgmtAlterMeter(SDbObj *pDb, SAlterTableMsg *pAlter);
|
||||
int mgmtGetMeterMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
|
||||
int mgmtGetTableMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
|
||||
int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
|
||||
void mgmtCleanUpMeters();
|
||||
SSchema *mgmtGetMeterSchema(STabObj *pMeter); // get schema for a meter
|
||||
SSchema *mgmtGetTableSchema(STabObj *pTable); // get schema for a meter
|
||||
|
||||
bool mgmtMeterCreateFromMetric(STabObj *pMeterObj);
|
||||
bool mgmtIsMetric(STabObj *pMeterObj);
|
||||
bool mgmtIsNormalMeter(STabObj *pMeterObj);
|
||||
|
||||
// dnode API
|
||||
int mgmtInitDnodes();
|
||||
|
@ -376,16 +373,6 @@ int grantRetrieveGrants(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
|
|||
int mgmtGetVnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
|
||||
int mgmtRetrieveVnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn);
|
||||
|
||||
// dnode balance api
|
||||
int mgmtInitBalance();
|
||||
void mgmtCleanupBalance();
|
||||
int mgmtAllocVnodes(SVgObj *pVgroup);
|
||||
int mgmtSetDnodeShellRemoving(SDnodeObj *pDnode);
|
||||
void mgmtSetDnodeUnRemove(SDnodeObj *pDnode);
|
||||
void mgmtStartBalanceTimer(int64_t mseconds);
|
||||
void mgmtSetDnodeOfflineOnSdbChanged();
|
||||
void mgmtUpdateVgroupState(SVgObj *pVgroup, int lbStatus, int srcIp);
|
||||
bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode);
|
||||
|
||||
void mgmtSetModuleInDnode(SDnodeObj *pDnode, int moduleType);
|
||||
int mgmtUnSetModuleInDnode(SDnodeObj *pDnode, int moduleType);
|
|
@ -181,11 +181,14 @@ enum _mgmt_table {
|
|||
|
||||
#define TSDB_KILL_MSG_LEN 30
|
||||
|
||||
#define TSDB_METER_METRIC 0 // metric
|
||||
#define TSDB_METER_MTABLE 1 // table created from metric
|
||||
#define TSDB_METER_OTABLE 2 // ordinary table
|
||||
#define TSDB_METER_STABLE 3 // table created from stream computing
|
||||
#define TSDB_MAX_METER_TYPES 4
|
||||
enum {
|
||||
TSDB_TABLE_TYPE_SUPER_TABLE = 0, // super table
|
||||
TSDB_TABLE_TYPE_CREATE_FROM_STABLE = 1, // table created from super table
|
||||
TSDB_TABLE_TYPE_NORMAL_TABLE = 2, // ordinary table
|
||||
TSDB_TABLE_TYPE_STREAM_TABLE = 3, // table created from stream computing
|
||||
TSDB_TABLE_TYPE_MAX = 4
|
||||
} ETableType;
|
||||
|
||||
|
||||
#define TSDB_VN_READ_ACCCESS ((char)0x1)
|
||||
#define TSDB_VN_WRITE_ACCCESS ((char)0x2)
|
||||
|
@ -298,7 +301,7 @@ typedef struct {
|
|||
uint64_t uid;
|
||||
char spi;
|
||||
char encrypt;
|
||||
char meterId[TSDB_METER_ID_LEN];
|
||||
char meterId[TSDB_TABLE_ID_LEN];
|
||||
char secret[TSDB_KEY_LEN];
|
||||
char cipheringKey[TSDB_KEY_LEN];
|
||||
uint64_t timeStamp;
|
||||
|
@ -311,7 +314,7 @@ typedef struct {
|
|||
} SCreateMsg;
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_METER_ID_LEN];
|
||||
char db[TSDB_TABLE_ID_LEN];
|
||||
uint8_t ignoreNotExists;
|
||||
} SDropDbMsg, SUseDbMsg;
|
||||
|
||||
|
@ -324,7 +327,7 @@ typedef struct {
|
|||
} SShowTableMsg;
|
||||
|
||||
typedef struct {
|
||||
char meterId[TSDB_METER_ID_LEN];
|
||||
char meterId[TSDB_TABLE_ID_LEN];
|
||||
char igExists;
|
||||
|
||||
short numOfTags;
|
||||
|
@ -338,12 +341,12 @@ typedef struct {
|
|||
} SCreateTableMsg;
|
||||
|
||||
typedef struct {
|
||||
char meterId[TSDB_METER_ID_LEN];
|
||||
char meterId[TSDB_TABLE_ID_LEN];
|
||||
char igNotExists;
|
||||
} SDropTableMsg;
|
||||
|
||||
typedef struct {
|
||||
char meterId[TSDB_METER_ID_LEN];
|
||||
char meterId[TSDB_TABLE_ID_LEN];
|
||||
short type; /* operation type */
|
||||
char tagVal[TSDB_MAX_BYTES_PER_ROW];
|
||||
short numOfCols; /* number of schema */
|
||||
|
@ -352,7 +355,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
char clientVersion[TSDB_VERSION_LEN];
|
||||
char db[TSDB_METER_ID_LEN];
|
||||
char db[TSDB_TABLE_ID_LEN];
|
||||
} SConnectMsg;
|
||||
|
||||
typedef struct {
|
||||
|
@ -383,7 +386,7 @@ typedef struct {
|
|||
} SCreateUserMsg, SAlterUserMsg;
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_METER_ID_LEN];
|
||||
char db[TSDB_TABLE_ID_LEN];
|
||||
} SMgmtHead;
|
||||
|
||||
typedef struct {
|
||||
|
@ -397,7 +400,7 @@ typedef struct {
|
|||
short vnode;
|
||||
int32_t sid;
|
||||
uint64_t uid;
|
||||
char meterId[TSDB_METER_ID_LEN];
|
||||
char meterId[TSDB_TABLE_ID_LEN];
|
||||
} SRemoveMeterMsg;
|
||||
|
||||
typedef struct {
|
||||
|
@ -601,7 +604,7 @@ typedef struct {
|
|||
* the message is too large, so it may will overwrite the cfg information in meterobj.v*
|
||||
* recover to origin codes
|
||||
*/
|
||||
//char db[TSDB_METER_ID_LEN+2]; // 8bytes align
|
||||
//char db[TSDB_TABLE_ID_LEN+2]; // 8bytes align
|
||||
char db[TSDB_DB_NAME_LEN];
|
||||
uint32_t vgId;
|
||||
int32_t maxSessions;
|
||||
|
@ -692,7 +695,7 @@ typedef struct {
|
|||
} SVPeersMsg;
|
||||
|
||||
typedef struct {
|
||||
char meterId[TSDB_METER_ID_LEN];
|
||||
char meterId[TSDB_TABLE_ID_LEN];
|
||||
short createFlag;
|
||||
char tags[];
|
||||
} SMeterInfoMsg;
|
||||
|
@ -705,7 +708,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
int16_t elemLen;
|
||||
|
||||
char meterId[TSDB_METER_ID_LEN];
|
||||
char meterId[TSDB_TABLE_ID_LEN];
|
||||
int16_t orderIndex;
|
||||
int16_t orderType; // used in group by xx order by xxx
|
||||
|
||||
|
@ -729,7 +732,7 @@ typedef struct {
|
|||
int32_t join;
|
||||
int32_t joinCondLen; // for join condition
|
||||
int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM];
|
||||
} SMetricMetaMsg;
|
||||
} SSuperTableMetaMsg;
|
||||
|
||||
typedef struct {
|
||||
SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT];
|
||||
|
@ -748,7 +751,7 @@ typedef struct {
|
|||
typedef struct SMeterMeta {
|
||||
uint8_t numOfTags : 6;
|
||||
uint8_t precision : 2;
|
||||
uint8_t meterType : 4;
|
||||
uint8_t tableType : 4;
|
||||
uint8_t index : 4; // used locally
|
||||
|
||||
int16_t numOfColumns;
|
||||
|
@ -764,12 +767,12 @@ typedef struct SMeterMeta {
|
|||
} SMeterMeta;
|
||||
|
||||
typedef struct SMultiMeterMeta {
|
||||
char meterId[TSDB_METER_ID_LEN]; // note: This field must be at the front
|
||||
char meterId[TSDB_TABLE_ID_LEN]; // note: This field must be at the front
|
||||
SMeterMeta meta;
|
||||
} SMultiMeterMeta;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_METER_ID_LEN];
|
||||
char name[TSDB_TABLE_ID_LEN];
|
||||
char data[TSDB_MAX_TAGS_LEN];
|
||||
} STagData;
|
||||
|
||||
|
|
|
@ -86,7 +86,7 @@ extern "C" {
|
|||
#define TS_PATH_DELIMITER_LEN 1
|
||||
|
||||
#define TSDB_METER_ID_LEN_MARGIN 10
|
||||
#define TSDB_METER_ID_LEN (TSDB_DB_NAME_LEN+TSDB_METER_NAME_LEN+2*TS_PATH_DELIMITER_LEN+TSDB_USERID_LEN+TSDB_METER_ID_LEN_MARGIN) //TSDB_DB_NAME_LEN+TSDB_METER_NAME_LEN+2*strlen(TS_PATH_DELIMITER)+strlen(USERID)
|
||||
#define TSDB_TABLE_ID_LEN (TSDB_DB_NAME_LEN+TSDB_METER_NAME_LEN+2*TS_PATH_DELIMITER_LEN+TSDB_USERID_LEN+TSDB_METER_ID_LEN_MARGIN) //TSDB_DB_NAME_LEN+TSDB_METER_NAME_LEN+2*strlen(TS_PATH_DELIMITER)+strlen(USERID)
|
||||
#define TSDB_UNI_LEN 24
|
||||
#define TSDB_USER_LEN TSDB_UNI_LEN
|
||||
#define TSDB_ACCT_LEN TSDB_UNI_LEN
|
||||
|
|
|
@ -2,22 +2,20 @@ CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
|||
PROJECT(TDengine)
|
||||
|
||||
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/mnode/detail/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/vnode/detail/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/client/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/dnode/inc)
|
||||
|
||||
INCLUDE_DIRECTORIES(inc)
|
||||
AUX_SOURCE_DIRECTORY(src SRC)
|
||||
ADD_LIBRARY(mnode ${SRC})
|
||||
TARGET_LINK_LIBRARIES(mnode trpc tutil sdb pthread)
|
||||
|
||||
IF (TD_CLUSTER)
|
||||
TARGET_LINK_LIBRARIES(mnode mcluster)
|
||||
ELSEIF (TD_LITE)
|
||||
TARGET_LINK_LIBRARIES(mnode mlite)
|
||||
ENDIF ()
|
||||
ADD_LIBRARY(mnode ${SRC})
|
||||
#TARGET_LINK_LIBRARIES(mnode trpc tutil sdb pthread)
|
||||
|
||||
#IF (TD_CLUSTER)
|
||||
# TARGET_LINK_LIBRARIES(mnode mcluster)
|
||||
#ENDIF ()
|
||||
ENDIF ()
|
||||
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
|
||||
int32_t mgmtCreateAcct(char *name, char *pass, SAcctCfg *pCfg);
|
||||
int32_t mgmtUpdateAcct(SAcctObj *pAcct);
|
||||
|
|
|
@ -13,60 +13,32 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_MGMTBALANCE_H
|
||||
#define TDENGINE_MGMTBALANCE_H
|
||||
#ifndef TDENGINE_MGMT_BALANCE_H
|
||||
#define TDENGINE_MGMT_BALANCE_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "os.h"
|
||||
|
||||
#include "dnodeSystem.h"
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "tglobalcfg.h"
|
||||
#include "vnodeStatus.h"
|
||||
#include "ttime.h"
|
||||
|
||||
void mgmtCreateDnodeOrderList();
|
||||
|
||||
void mgmtReleaseDnodeOrderList();
|
||||
|
||||
void mgmtMakeDnodeOrderList();
|
||||
|
||||
void mgmtCalcSystemScore();
|
||||
|
||||
float mgmtTryCalcDnodeScore(SDnodeObj *pDnode, int extraVnode);
|
||||
|
||||
bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode);
|
||||
|
||||
bool mgmtCheckDnodeInRemoveState(SDnodeObj *pDnode);
|
||||
|
||||
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType);
|
||||
|
||||
void mgmtMonitorDnodeModule();
|
||||
|
||||
void mgmtSetModuleInDnode(SDnodeObj *pDnode, int moduleType);
|
||||
|
||||
int mgmtUnSetModuleInDnode(SDnodeObj *pDnode, int moduleType);
|
||||
|
||||
void mgmtMonitorVgroups();
|
||||
|
||||
void mgmtMonitorDnodes();
|
||||
|
||||
void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode);
|
||||
|
||||
extern void * dnodeSdb;
|
||||
extern void * vgSdb;
|
||||
extern void * balanceTimer;
|
||||
extern int mgmtOrderedDnodesSize;
|
||||
extern int mgmtOrderedDnodesMallocSize;
|
||||
extern SDnodeObj **mgmtOrderedDnodes;
|
||||
extern uint32_t mgmtAccessSquence;
|
||||
extern SMgmtIpList mgmtIpList;
|
||||
extern void (*mgmtStartBalanceTimer)(int64_t mseconds);
|
||||
extern int32_t (*mgmtInitBalance)();
|
||||
extern void (*mgmtCleanupBalance)();
|
||||
extern int32_t (*mgmtAllocVnodes)(SVgObj *pVgroup);
|
||||
extern bool (*mgmtCheckModuleInDnode)(SDnodeObj *pDnode, int moduleType);
|
||||
extern char* (*mgmtGetVnodeStatus)(SVgObj *pVgroup, SVnodeGid *pVnode);
|
||||
extern bool (*mgmtCheckVnodeReady)(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode);
|
||||
extern void (*mgmtUpdateDnodeState)(SDnodeObj *pDnode, int lbStatus);
|
||||
extern void (*mgmtUpdateVgroupState)(SVgObj *pVgroup, int lbStatus, int srcIp);
|
||||
extern bool (*mgmtAddVnode)(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif // TDENGINE_MGMTBALANCE_H
|
||||
#endif
|
||||
|
|
|
@ -20,7 +20,7 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
|
||||
int mgmtGetQueryMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn);
|
||||
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TBASE_MNODE_SUPER_TABLE_QUERY_H
|
||||
#define TBASE_MNODE_SUPER_TABLE_QUERY_H
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
#include "mnode.h"
|
||||
|
||||
int32_t mgmtRetrieveMetersFromSuperTable(SSuperTableMetaMsg* pInfo, int32_t tableIndex, tQueryResultset* pRes);
|
||||
int32_t mgmtDoJoin(SSuperTableMetaMsg* pSuperTableMetaMsg, tQueryResultset* pRes);
|
||||
void mgmtReorganizeMetersInMetricMeta(SSuperTableMetaMsg* pInfo, int32_t index, tQueryResultset* pRes);
|
||||
|
||||
|
||||
#endif
|
|
@ -0,0 +1,26 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TBASE_MNODE_TABLE_H
|
||||
#define TBASE_MNODE_TABLE_H
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
#include "mnode.h"
|
||||
|
||||
int32_t mgmtFindTagCol(STabObj * pTable, const char * tagName);
|
||||
|
||||
#endif
|
|
@ -11,32 +11,22 @@
|
|||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/#include <stdio.h>
|
||||
*/
|
||||
|
||||
#ifndef TBASE_MNODE_UTIL_H
|
||||
#define TBASE_MNODE_UTIL_H
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdint.h>
|
||||
#include "mnode.h"
|
||||
|
||||
#include "tast.h"
|
||||
|
||||
#ifndef TBASE_MGMTUTIL_H
|
||||
#define TBASE_MGMTUTIL_H
|
||||
|
||||
typedef struct SSyntaxTreeFilterSupporter {
|
||||
SSchema* pTagSchema;
|
||||
int32_t numOfTags;
|
||||
int32_t optr;
|
||||
} SSyntaxTreeFilterSupporter;
|
||||
|
||||
char* mgmtMeterGetTag(STabObj* pMeter, int32_t col, SSchema* pTagColSchema);
|
||||
int32_t mgmtFindTagCol(STabObj * pMetric, const char * tagName);
|
||||
|
||||
int32_t mgmtGetTagsLength(STabObj* pMetric, int32_t col);
|
||||
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
|
||||
bool mgmtTableCreateFromSuperTable(STabObj *pTableObj);
|
||||
bool mgmtIsSuperTable(STabObj *pTableObj);
|
||||
bool mgmtIsNormalTable(STabObj *pTableObj);
|
||||
char* mgmtTableGetTag(STabObj* pTable, int32_t col, SSchema* pTagColSchema);
|
||||
int32_t mgmtGetTagsLength(STabObj* pSuperTable, int32_t col);
|
||||
bool mgmtCheckIsMonitorDB(char *db, char *monitordb);
|
||||
int32_t mgmtCheckDBParams(SCreateDbMsg *pCreate);
|
||||
|
||||
int32_t mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pInfo, int32_t tableIndex, tQueryResultset* pRes);
|
||||
int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes);
|
||||
void mgmtReorganizeMetersInMetricMeta(SMetricMetaMsg* pInfo, int32_t index, tQueryResultset* pRes);
|
||||
|
||||
bool tSkipListNodeFilterCallback(const void *pNode, void *param);
|
||||
|
||||
#endif //TBASE_MGMTUTIL_H
|
||||
#endif
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "mgmtAcct.h"
|
||||
#include "tschemautil.h"
|
||||
|
||||
|
|
|
@ -18,13 +18,16 @@
|
|||
#include "vnodeStatus.h"
|
||||
#include "dnodeModule.h"
|
||||
|
||||
void mgmtStartBalanceTimer(int64_t mseconds) {}
|
||||
void mgmtStartBalanceTimerImp(int64_t mseconds) {}
|
||||
void (*mgmtStartBalanceTimer)(int64_t mseconds) = mgmtStartBalanceTimerImp;
|
||||
|
||||
int mgmtInitBalance() { return 0; }
|
||||
int32_t mgmtInitBalanceImp() { return 0; }
|
||||
int32_t (*mgmtInitBalance)() = mgmtInitBalanceImp;
|
||||
|
||||
void mgmtCleanupBalance() {}
|
||||
void mgmtCleanupBalanceImp() {}
|
||||
void (*mgmtCleanupBalance)() = mgmtCleanupBalanceImp;
|
||||
|
||||
int mgmtAllocVnodes(SVgObj *pVgroup) {
|
||||
int32_t mgmtAllocVnodesImp(SVgObj *pVgroup) {
|
||||
int selectedVnode = -1;
|
||||
SDnodeObj *pDnode = &dnodeObj;
|
||||
int lastAllocVode = pDnode->lastAllocVnode;
|
||||
|
@ -48,17 +51,26 @@ int mgmtAllocVnodes(SVgObj *pVgroup) {
|
|||
return 0;
|
||||
}
|
||||
}
|
||||
int32_t (*mgmtAllocVnodes)(SVgObj *pVgroup) = mgmtAllocVnodesImp;
|
||||
|
||||
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType) {
|
||||
bool mgmtCheckModuleInDnodeImp(SDnodeObj *pDnode, int moduleType) {
|
||||
return tsModule[moduleType].num != 0;
|
||||
}
|
||||
bool (*mgmtCheckModuleInDnode)(SDnodeObj *pDnode, int moduleType) = mgmtCheckModuleInDnodeImp;
|
||||
|
||||
char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) { return "master"; }
|
||||
char *mgmtGetVnodeStatusImp(SVgObj *pVgroup, SVnodeGid *pVnode) { return "master"; }
|
||||
char *(*mgmtGetVnodeStatus)(SVgObj *pVgroup, SVnodeGid *pVnode) = mgmtGetVnodeStatusImp;
|
||||
|
||||
bool mgmtCheckVnodeReady(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) { return true; }
|
||||
bool mgmtCheckVnodeReadyImp(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) { return true; }
|
||||
bool (*mgmtCheckVnodeReady)(SDnodeObj *pDnode, SVgObj *pVgroup, SVnodeGid *pVnode) = mgmtCheckVnodeReadyImp;
|
||||
|
||||
void mgmtUpdateDnodeState(SDnodeObj *pDnode, int lbStatus) {}
|
||||
|
||||
void mgmtUpdateVgroupState(SVgObj *pVgroup, int lbStatus, int srcIp) {}
|
||||
void mgmtUpdateDnodeStateImp(SDnodeObj *pDnode, int lbStatus) {}
|
||||
void (*mgmtUpdateDnodeState)(SDnodeObj *pDnode, int lbStatus) = mgmtUpdateDnodeStateImp;
|
||||
|
||||
void mgmtUpdateVgroupStateImp(SVgObj *pVgroup, int lbStatus, int srcIp) {}
|
||||
void (*mgmtUpdateVgroupState)(SVgObj *pVgroup, int lbStatus, int srcIp) = mgmtUpdateVgroupStateImp;
|
||||
|
||||
bool mgmtAddVnodeImp(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { return false; }
|
||||
bool (*mgmtAddVnode)(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) = mgmtAddVnodeImp;
|
||||
|
||||
bool mgmtAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { return false; }
|
|
@ -14,15 +14,13 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "os.h"
|
||||
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tschemautil.h"
|
||||
|
||||
typedef struct {
|
||||
char user[TSDB_METER_ID_LEN];
|
||||
char user[TSDB_TABLE_ID_LEN];
|
||||
uint64_t stime;
|
||||
uint32_t ip;
|
||||
uint16_t port;
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "mgmtGrant.h"
|
||||
#include "mgmtBalance.h"
|
||||
#include "mgmtUtil.h"
|
||||
|
@ -116,7 +116,7 @@ int mgmtInitDbs() {
|
|||
SDbObj *mgmtGetDb(char *db) { return (SDbObj *)sdbGetRow(dbSdb, db); }
|
||||
|
||||
SDbObj *mgmtGetDbByMeterId(char *meterId) {
|
||||
char db[TSDB_METER_ID_LEN], *pos;
|
||||
char db[TSDB_TABLE_ID_LEN], *pos;
|
||||
|
||||
pos = strstr(meterId, TS_PATH_DELIMITER);
|
||||
pos = strstr(pos + 1, TS_PATH_DELIMITER);
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
#include "os.h"
|
||||
|
||||
#include "dnodeSystem.h"
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "tschemautil.h"
|
||||
#include "vnodeStatus.h"
|
||||
#include "dnodeModule.h"
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "vnodeStatus.h"
|
||||
#include "dnodeModule.h"
|
||||
|
||||
|
|
|
@ -17,14 +17,14 @@
|
|||
#include "os.h"
|
||||
|
||||
#include "dnodeSystem.h"
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "mgmtBalance.h"
|
||||
#include "tutil.h"
|
||||
|
||||
void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *pObj);
|
||||
int mgmtSendVPeersMsg(SVgObj *pVgroup);
|
||||
char *mgmtBuildVpeersIe(char *pMsg, SVgObj *pVgroup, int vnode);
|
||||
char *mgmtBuildCreateMeterIe(STabObj *pMeter, char *pMsg, int vnode);
|
||||
char *mgmtBuildCreateMeterIe(STabObj *pTable, char *pMsg, int vnode);
|
||||
|
||||
/*
|
||||
* functions for communicate between dnode and mnode
|
||||
|
@ -39,7 +39,7 @@ int taosSendMsgToDnode(SDnodeObj *pObj, char *msg, int msgLen);
|
|||
int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
|
||||
char * pMsg, *pStart;
|
||||
int msgLen = 0;
|
||||
STabObj * pMeter = NULL;
|
||||
STabObj * pTable = NULL;
|
||||
SMeterCfgMsg *pCfg = (SMeterCfgMsg *)cont;
|
||||
SVgObj * pVgroup;
|
||||
|
||||
|
@ -63,13 +63,13 @@ int mgmtProcessMeterCfgMsg(char *cont, int contLen, SDnodeObj *pObj) {
|
|||
int vgId = pObj->vload[vnode].vgId;
|
||||
|
||||
pVgroup = mgmtGetVgroup(vgId);
|
||||
if (pVgroup) pMeter = pVgroup->meterList[sid];
|
||||
if (pVgroup) pTable = pVgroup->meterList[sid];
|
||||
}
|
||||
|
||||
if (pMeter) {
|
||||
if (pTable) {
|
||||
*pMsg = 0; // code
|
||||
pMsg++;
|
||||
pMsg = mgmtBuildCreateMeterIe(pMeter, pMsg, vnode);
|
||||
pMsg = mgmtBuildCreateMeterIe(pTable, pMsg, vnode);
|
||||
} else {
|
||||
mTrace("dnode:%s, vnode:%d sid:%d, meter not there", taosIpStr(pObj->privateIp), vnode, sid);
|
||||
*pMsg = TSDB_CODE_INVALID_METER_ID;
|
||||
|
@ -187,48 +187,48 @@ void mgmtProcessMsgFromDnode(char *content, int msgLen, int msgType, SDnodeObj *
|
|||
}
|
||||
}
|
||||
|
||||
char *mgmtBuildCreateMeterIe(STabObj *pMeter, char *pMsg, int vnode) {
|
||||
char *mgmtBuildCreateMeterIe(STabObj *pTable, char *pMsg, int vnode) {
|
||||
SCreateMsg *pCreateMeter;
|
||||
|
||||
pCreateMeter = (SCreateMsg *)pMsg;
|
||||
pCreateMeter->vnode = htons(vnode);
|
||||
pCreateMeter->sid = htonl(pMeter->gid.sid);
|
||||
pCreateMeter->uid = pMeter->uid;
|
||||
memcpy(pCreateMeter->meterId, pMeter->meterId, TSDB_METER_ID_LEN);
|
||||
pCreateMeter->sid = htonl(pTable->gid.sid);
|
||||
pCreateMeter->uid = pTable->uid;
|
||||
memcpy(pCreateMeter->meterId, pTable->meterId, TSDB_TABLE_ID_LEN);
|
||||
|
||||
// pCreateMeter->lastCreate = htobe64(pVgroup->lastCreate);
|
||||
pCreateMeter->timeStamp = htobe64(pMeter->createdTime);
|
||||
pCreateMeter->timeStamp = htobe64(pTable->createdTime);
|
||||
/*
|
||||
pCreateMeter->spi = pSec->spi;
|
||||
pCreateMeter->encrypt = pSec->encrypt;
|
||||
memcpy(pCreateMeter->cipheringKey, pSec->cipheringKey, TSDB_KEY_LEN);
|
||||
memcpy(pCreateMeter->secret, pSec->secret, TSDB_KEY_LEN);
|
||||
*/
|
||||
pCreateMeter->sversion = htonl(pMeter->sversion);
|
||||
pCreateMeter->numOfColumns = htons(pMeter->numOfColumns);
|
||||
SSchema *pSchema = mgmtGetMeterSchema(pMeter);
|
||||
pCreateMeter->sversion = htonl(pTable->sversion);
|
||||
pCreateMeter->numOfColumns = htons(pTable->numOfColumns);
|
||||
SSchema *pSchema = mgmtGetTableSchema(pTable);
|
||||
|
||||
for (int i = 0; i < pMeter->numOfColumns; ++i) {
|
||||
for (int i = 0; i < pTable->numOfColumns; ++i) {
|
||||
pCreateMeter->schema[i].type = pSchema[i].type;
|
||||
/* strcpy(pCreateMeter->schema[i].name, pSchema[i].name); */
|
||||
pCreateMeter->schema[i].bytes = htons(pSchema[i].bytes);
|
||||
pCreateMeter->schema[i].colId = htons(pSchema[i].colId);
|
||||
}
|
||||
|
||||
pMsg = ((char *)(pCreateMeter->schema)) + pMeter->numOfColumns * sizeof(SMColumn);
|
||||
pMsg = ((char *)(pCreateMeter->schema)) + pTable->numOfColumns * sizeof(SMColumn);
|
||||
pCreateMeter->sqlLen = 0;
|
||||
|
||||
if (pMeter->pSql) {
|
||||
int len = strlen(pMeter->pSql) + 1;
|
||||
if (pTable->pSql) {
|
||||
int len = strlen(pTable->pSql) + 1;
|
||||
pCreateMeter->sqlLen = htons(len);
|
||||
strcpy(pMsg, pMeter->pSql);
|
||||
strcpy(pMsg, pTable->pSql);
|
||||
pMsg += len;
|
||||
}
|
||||
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
int mgmtSendCreateMsgToVgroup(STabObj *pMeter, SVgObj *pVgroup) {
|
||||
int mgmtSendCreateMsgToVgroup(STabObj *pTable, SVgObj *pVgroup) {
|
||||
char * pMsg, *pStart;
|
||||
int i, msgLen = 0;
|
||||
SDnodeObj *pObj;
|
||||
|
@ -244,7 +244,7 @@ int mgmtSendCreateMsgToVgroup(STabObj *pMeter, SVgObj *pVgroup) {
|
|||
|
||||
pStart = taosBuildReqMsgToDnodeWithSize(pObj, TSDB_MSG_TYPE_CREATE, 64000);
|
||||
if (pStart == NULL) continue;
|
||||
pMsg = mgmtBuildCreateMeterIe(pMeter, pStart, pVgroup->vnodeGid[i].vnode);
|
||||
pMsg = mgmtBuildCreateMeterIe(pTable, pStart, pVgroup->vnodeGid[i].vnode);
|
||||
msgLen = pMsg - pStart;
|
||||
|
||||
taosSendMsgToDnode(pObj, pStart, msgLen);
|
||||
|
@ -255,7 +255,7 @@ int mgmtSendCreateMsgToVgroup(STabObj *pMeter, SVgObj *pVgroup) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int mgmtSendRemoveMeterMsgToDnode(STabObj *pMeter, SVgObj *pVgroup) {
|
||||
int mgmtSendRemoveMeterMsgToDnode(STabObj *pTable, SVgObj *pVgroup) {
|
||||
SRemoveMeterMsg *pRemove;
|
||||
char * pMsg, *pStart;
|
||||
int i, msgLen = 0;
|
||||
|
@ -277,8 +277,8 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pMeter, SVgObj *pVgroup) {
|
|||
|
||||
pRemove = (SRemoveMeterMsg *)pMsg;
|
||||
pRemove->vnode = htons(pVgroup->vnodeGid[i].vnode);
|
||||
pRemove->sid = htonl(pMeter->gid.sid);
|
||||
memcpy(pRemove->meterId, pMeter->meterId, TSDB_METER_ID_LEN);
|
||||
pRemove->sid = htonl(pTable->gid.sid);
|
||||
memcpy(pRemove->meterId, pTable->meterId, TSDB_TABLE_ID_LEN);
|
||||
|
||||
pMsg += sizeof(SRemoveMeterMsg);
|
||||
msgLen = pMsg - pStart;
|
||||
|
@ -287,7 +287,7 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pMeter, SVgObj *pVgroup) {
|
|||
|
||||
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].ip);
|
||||
mTrace("dnode:%s vid:%d, send remove meter msg, sid:%d status:%d", ipstr, pVgroup->vnodeGid[i].vnode,
|
||||
pMeter->gid.sid, pObj->status);
|
||||
pTable->gid.sid, pObj->status);
|
||||
}
|
||||
|
||||
pVgroup->lastRemove = timeStamp;
|
||||
|
@ -295,7 +295,7 @@ int mgmtSendRemoveMeterMsgToDnode(STabObj *pMeter, SVgObj *pVgroup) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int mgmtSendAlterStreamMsgToDnode(STabObj *pMeter, SVgObj *pVgroup) {
|
||||
int mgmtSendAlterStreamMsgToDnode(STabObj *pTable, SVgObj *pVgroup) {
|
||||
SAlterStreamMsg *pAlter;
|
||||
char * pMsg, *pStart;
|
||||
int i, msgLen = 0;
|
||||
|
@ -313,9 +313,9 @@ int mgmtSendAlterStreamMsgToDnode(STabObj *pMeter, SVgObj *pVgroup) {
|
|||
|
||||
pAlter = (SAlterStreamMsg *)pMsg;
|
||||
pAlter->vnode = htons(pVgroup->vnodeGid[i].vnode);
|
||||
pAlter->sid = htonl(pMeter->gid.sid);
|
||||
pAlter->uid = pMeter->uid;
|
||||
pAlter->status = pMeter->status;
|
||||
pAlter->sid = htonl(pTable->gid.sid);
|
||||
pAlter->uid = pTable->uid;
|
||||
pAlter->status = pTable->status;
|
||||
|
||||
pMsg += sizeof(SAlterStreamMsg);
|
||||
msgLen = pMsg - pStart;
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
#include <endian.h>
|
||||
|
||||
#include "dnodeSystem.h"
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "tsched.h"
|
||||
#include "tutil.h"
|
||||
#include "vnode.h"
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "mgmtAcct.h"
|
||||
#include "mgmtGrant.h"
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
|
||||
int mgmtGetMnodeMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) { return TSDB_CODE_OPS_NOT_SUPPORT; }
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "mgmtProfile.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tschemautil.h"
|
||||
|
@ -24,7 +24,7 @@
|
|||
typedef struct {
|
||||
uint32_t ip;
|
||||
uint16_t port;
|
||||
char user[TSDB_METER_ID_LEN];
|
||||
char user[TSDB_TABLE_ID_LEN];
|
||||
} SCDesc;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#include "os.h"
|
||||
|
||||
#include "dnodeSystem.h"
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "mgmtGrant.h"
|
||||
#include "mgmtProfile.h"
|
||||
#include "taosmsg.h"
|
||||
|
@ -118,7 +118,7 @@ static void mgmtSetSchemaFromMeters(SSchema *pSchema, STabObj *pMeterObj, uint32
|
|||
static uint32_t mgmtSetMeterTagValue(char *pTags, STabObj *pMetric, STabObj *pMeterObj) {
|
||||
SSchema *pTagSchema = (SSchema *)(pMetric->schema + pMetric->numOfColumns * sizeof(SSchema));
|
||||
|
||||
char *tagVal = pMeterObj->pTagData + TSDB_METER_ID_LEN; // tag start position
|
||||
char *tagVal = pMeterObj->pTagData + TSDB_TABLE_ID_LEN; // tag start position
|
||||
|
||||
uint32_t tagsLen = 0;
|
||||
for (int32_t i = 0; i < pMetric->numOfTags; ++i) {
|
||||
|
@ -157,7 +157,7 @@ bool mgmtCheckMeterMetaMsgType(char *pMsg) {
|
|||
SMeterInfoMsg *pInfo = (SMeterInfoMsg *)pMsg;
|
||||
|
||||
int16_t autoCreate = htons(pInfo->createFlag);
|
||||
STabObj *pMeterObj = mgmtGetMeter(pInfo->meterId);
|
||||
STabObj *pMeterObj = mgmtGetTable(pInfo->meterId);
|
||||
|
||||
// If table does not exists and autoCreate flag is set, we add the handler into another task queue, namely tranQueue
|
||||
bool addIntoTranQueue = (pMeterObj == NULL && autoCreate == 1);
|
||||
|
@ -199,7 +199,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
goto _exit_code;
|
||||
}
|
||||
|
||||
pMeterObj = mgmtGetMeter(pInfo->meterId);
|
||||
pMeterObj = mgmtGetTable(pInfo->meterId);
|
||||
|
||||
// on demand create table from super table if meter does not exists
|
||||
if (pMeterObj == NULL && pInfo->createFlag == 1) {
|
||||
|
@ -224,8 +224,8 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
|
||||
int32_t code = mgmtCreateMeter(pDb, pCreateMsg);
|
||||
|
||||
char stableName[TSDB_METER_ID_LEN] = {0};
|
||||
strncpy(stableName, pInfo->tags, TSDB_METER_ID_LEN);
|
||||
char stableName[TSDB_TABLE_ID_LEN] = {0};
|
||||
strncpy(stableName, pInfo->tags, TSDB_TABLE_ID_LEN);
|
||||
mTrace("meter:%s is automatically created by %s from %s, code:%d", pCreateMsg->meterId, pConn->pUser->user,
|
||||
stableName, code);
|
||||
|
||||
|
@ -243,7 +243,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
goto _exit_code;
|
||||
}
|
||||
|
||||
pMeterObj = mgmtGetMeter(pInfo->meterId);
|
||||
pMeterObj = mgmtGetTable(pInfo->meterId);
|
||||
}
|
||||
|
||||
if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
|
||||
|
@ -274,15 +274,15 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
|
||||
pMeta->numOfTags = pMeterObj->numOfTags;
|
||||
pMeta->numOfColumns = htons(pMeterObj->numOfColumns);
|
||||
pMeta->meterType = pMeterObj->meterType;
|
||||
pMeta->tableType = pMeterObj->tableType;
|
||||
|
||||
pMsg += sizeof(SMeterMeta);
|
||||
pSchema = (SSchema *)pMsg; // schema locates at the end of SMeterMeta struct
|
||||
|
||||
if (mgmtMeterCreateFromMetric(pMeterObj)) {
|
||||
if (mgmtTableCreateFromSuperTable(pMeterObj)) {
|
||||
assert(pMeterObj->numOfTags == 0);
|
||||
|
||||
STabObj *pMetric = mgmtGetMeter(pMeterObj->pTagData);
|
||||
STabObj *pMetric = mgmtGetTable(pMeterObj->pTagData);
|
||||
uint32_t numOfTotalCols = (uint32_t)pMetric->numOfTags + pMetric->numOfColumns;
|
||||
|
||||
pMeta->numOfTags = pMetric->numOfTags; // update the numOfTags info
|
||||
|
@ -302,7 +302,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
pMsg += numOfTotalCols * sizeof(SSchema);
|
||||
}
|
||||
|
||||
if (mgmtIsNormalMeter(pMeterObj)) {
|
||||
if (mgmtIsNormalTable(pMeterObj)) {
|
||||
pVgroup = mgmtGetVgroup(pMeterObj->gid.vgId);
|
||||
if (pVgroup == NULL) {
|
||||
pRsp->code = TSDB_CODE_INVALID_TABLE;
|
||||
|
@ -361,7 +361,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
}
|
||||
|
||||
int32_t totalNum = 0;
|
||||
char tblName[TSDB_METER_ID_LEN];
|
||||
char tblName[TSDB_TABLE_ID_LEN];
|
||||
char* nextStr;
|
||||
|
||||
char* pCurMeter = pStart + sizeof(STaosRsp) + sizeof(SMultiMeterInfoMsg) + 1; // 1: ie type byte
|
||||
|
@ -397,7 +397,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
}
|
||||
|
||||
// get meter schema, and fill into resp payload
|
||||
pMeterObj = mgmtGetMeter(tblName);
|
||||
pMeterObj = mgmtGetTable(tblName);
|
||||
pDbObj = mgmtGetDbByMeterId(tblName);
|
||||
|
||||
if (pMeterObj == NULL || (pDbObj == NULL)) {
|
||||
|
@ -414,15 +414,15 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
pMeta->meta.precision = pDbObj->cfg.precision;
|
||||
pMeta->meta.numOfTags = pMeterObj->numOfTags;
|
||||
pMeta->meta.numOfColumns = htons(pMeterObj->numOfColumns);
|
||||
pMeta->meta.meterType = pMeterObj->meterType;
|
||||
pMeta->meta.tableType = pMeterObj->tableType;
|
||||
|
||||
pCurMeter += sizeof(SMultiMeterMeta);
|
||||
pSchema = (SSchema *)pCurMeter; // schema locates at the end of SMeterMeta struct
|
||||
|
||||
if (mgmtMeterCreateFromMetric(pMeterObj)) {
|
||||
if (mgmtTableCreateFromSuperTable(pMeterObj)) {
|
||||
assert(pMeterObj->numOfTags == 0);
|
||||
|
||||
STabObj *pMetric = mgmtGetMeter(pMeterObj->pTagData);
|
||||
STabObj *pMetric = mgmtGetTable(pMeterObj->pTagData);
|
||||
uint32_t numOfTotalCols = (uint32_t)pMetric->numOfTags + pMetric->numOfColumns;
|
||||
|
||||
pMeta->meta.numOfTags = pMetric->numOfTags; // update the numOfTags info
|
||||
|
@ -442,7 +442,7 @@ int mgmtProcessMultiMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
pCurMeter += numOfTotalCols * sizeof(SSchema);
|
||||
}
|
||||
|
||||
if (mgmtIsNormalMeter(pMeterObj)) {
|
||||
if (mgmtIsNormalTable(pMeterObj)) {
|
||||
pVgroup = mgmtGetVgroup(pMeterObj->gid.vgId);
|
||||
if (pVgroup == NULL) {
|
||||
pRsp->code = TSDB_CODE_INVALID_TABLE;
|
||||
|
@ -494,22 +494,22 @@ _exit_code:
|
|||
}
|
||||
|
||||
int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
||||
SMetricMetaMsg *pMetricMetaMsg = (SMetricMetaMsg *)pMsg;
|
||||
SSuperTableMetaMsg *pSuperTableMetaMsg = (SSuperTableMetaMsg *)pMsg;
|
||||
STabObj * pMetric;
|
||||
STaosRsp * pRsp;
|
||||
char * pStart;
|
||||
|
||||
pMetricMetaMsg->numOfMeters = htonl(pMetricMetaMsg->numOfMeters);
|
||||
pSuperTableMetaMsg->numOfMeters = htonl(pSuperTableMetaMsg->numOfMeters);
|
||||
|
||||
pMetricMetaMsg->join = htonl(pMetricMetaMsg->join);
|
||||
pMetricMetaMsg->joinCondLen = htonl(pMetricMetaMsg->joinCondLen);
|
||||
pSuperTableMetaMsg->join = htonl(pSuperTableMetaMsg->join);
|
||||
pSuperTableMetaMsg->joinCondLen = htonl(pSuperTableMetaMsg->joinCondLen);
|
||||
|
||||
for (int32_t i = 0; i < pMetricMetaMsg->numOfMeters; ++i) {
|
||||
pMetricMetaMsg->metaElem[i] = htonl(pMetricMetaMsg->metaElem[i]);
|
||||
for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) {
|
||||
pSuperTableMetaMsg->metaElem[i] = htonl(pSuperTableMetaMsg->metaElem[i]);
|
||||
}
|
||||
|
||||
SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg *)(((char *)pMetricMetaMsg) + pMetricMetaMsg->metaElem[0]);
|
||||
pMetric = mgmtGetMeter(pElem->meterId);
|
||||
SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg *)(((char *)pSuperTableMetaMsg) + pSuperTableMetaMsg->metaElem[0]);
|
||||
pMetric = mgmtGetTable(pElem->meterId);
|
||||
|
||||
SDbObj *pDb = NULL;
|
||||
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
|
||||
|
@ -531,7 +531,7 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
|
||||
msgLen = pMsg - pStart;
|
||||
} else {
|
||||
msgLen = mgmtRetrieveMetricMeta(pConn, &pStart, pMetricMetaMsg);
|
||||
msgLen = mgmtRetrieveMetricMeta(pConn, &pStart, pSuperTableMetaMsg);
|
||||
if (msgLen <= 0) {
|
||||
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
|
||||
return 0;
|
||||
|
@ -880,7 +880,7 @@ int mgmtProcessUseDbMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
}
|
||||
|
||||
int (*mgmtGetMetaFp[])(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) = {
|
||||
mgmtGetAcctMeta, mgmtGetUserMeta, mgmtGetDbMeta, mgmtGetMeterMeta, mgmtGetDnodeMeta,
|
||||
mgmtGetAcctMeta, mgmtGetUserMeta, mgmtGetDbMeta, mgmtGetTableMeta, mgmtGetDnodeMeta,
|
||||
mgmtGetMnodeMeta, mgmtGetVgroupMeta, mgmtGetMetricMeta, mgmtGetModuleMeta, mgmtGetQueryMeta,
|
||||
mgmtGetStreamMeta, mgmtGetConfigMeta, mgmtGetConnsMeta, mgmtGetScoresMeta, grantGetGrantsMeta,
|
||||
mgmtGetVnodeMeta,
|
||||
|
@ -1076,11 +1076,11 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
} else if (code != TSDB_CODE_SUCCESS) {
|
||||
if (code == TSDB_CODE_TABLE_ALREADY_EXIST) { // table already created when the second attempt to create table
|
||||
|
||||
STabObj* pMeter = mgmtGetMeter(pCreate->meterId);
|
||||
assert(pMeter != NULL);
|
||||
STabObj* pTable = mgmtGetTable(pCreate->meterId);
|
||||
assert(pTable != NULL);
|
||||
|
||||
mWarn("table:%s, table already created, failed to create table, ts:%" PRId64 ", code:%d", pCreate->meterId,
|
||||
pMeter->createdTime, code);
|
||||
pTable->createdTime, code);
|
||||
} else { // other errors
|
||||
mError("table:%s, failed to create table, code:%d", pCreate->meterId, code);
|
||||
}
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include <arpa/inet.h>
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
|
||||
int mgmtCheckRedirectMsg(SConnObj *pConn, int msgType) { return 0; }
|
||||
|
||||
|
|
|
@ -16,13 +16,19 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "mgmtUtil.h"
|
||||
#include "textbuffer.h"
|
||||
#include "tschemautil.h"
|
||||
#include "tsqlfunction.h"
|
||||
#include "vnodeTagMgmt.h"
|
||||
|
||||
typedef struct SSyntaxTreeFilterSupporter {
|
||||
SSchema* pTagSchema;
|
||||
int32_t numOfTags;
|
||||
int32_t optr;
|
||||
} SSyntaxTreeFilterSupporter;
|
||||
|
||||
typedef struct SJoinSupporter {
|
||||
void** val;
|
||||
void** pTabObjs;
|
||||
|
@ -41,6 +47,7 @@ typedef struct SMeterNameFilterSupporter {
|
|||
} SMeterNameFilterSupporter;
|
||||
|
||||
static void tansformQueryResult(tQueryResultset* pRes);
|
||||
static bool tSkipListNodeFilterCallback(const void *pNode, void *param);
|
||||
|
||||
static int32_t tabObjVGIDComparator(const void* pLeft, const void* pRight) {
|
||||
STabObj* p1 = *(STabObj**)pLeft;
|
||||
|
@ -82,10 +89,10 @@ static int32_t tabObjResultComparator(const void* p1, const void* p2, void* para
|
|||
f1 = pNode1->meterId;
|
||||
f2 = pNode2->meterId;
|
||||
schema.type = TSDB_DATA_TYPE_BINARY;
|
||||
schema.bytes = TSDB_METER_ID_LEN;
|
||||
schema.bytes = TSDB_TABLE_ID_LEN;
|
||||
} else {
|
||||
f1 = mgmtMeterGetTag(pNode1, colIdx, NULL);
|
||||
f2 = mgmtMeterGetTag(pNode2, colIdx, &schema);
|
||||
f1 = mgmtTableGetTag(pNode1, colIdx, NULL);
|
||||
f2 = mgmtTableGetTag(pNode2, colIdx, &schema);
|
||||
assert(schema.type == pOrderDesc->pTagSchema->pSchema[colIdx].type);
|
||||
}
|
||||
|
||||
|
@ -104,15 +111,15 @@ static int32_t tabObjResultComparator(const void* p1, const void* p2, void* para
|
|||
* update the tag order index according to the tags column index. The tags column index needs to be checked one-by-one,
|
||||
* since the normal columns may be passed to server for handling the group by on status column.
|
||||
*
|
||||
* @param pMetricMetaMsg
|
||||
* @param pSuperTableMetaMsg
|
||||
* @param tableIndex
|
||||
* @param pOrderIndexInfo
|
||||
* @param numOfTags
|
||||
*/
|
||||
static void mgmtUpdateOrderTagColIndex(SMetricMetaMsg* pMetricMetaMsg, int32_t tableIndex, tOrderIdx* pOrderIndexInfo,
|
||||
static void mgmtUpdateOrderTagColIndex(SSuperTableMetaMsg* pSuperTableMetaMsg, int32_t tableIndex, tOrderIdx* pOrderIndexInfo,
|
||||
int32_t numOfTags) {
|
||||
SMetricMetaElemMsg* pElem = (SMetricMetaElemMsg*)((char*)pMetricMetaMsg + pMetricMetaMsg->metaElem[tableIndex]);
|
||||
SColIndexEx* groupColumnList = (SColIndexEx*)((char*)pMetricMetaMsg + pElem->groupbyTagColumnList);
|
||||
SMetricMetaElemMsg* pElem = (SMetricMetaElemMsg*)((char*)pSuperTableMetaMsg + pSuperTableMetaMsg->metaElem[tableIndex]);
|
||||
SColIndexEx* groupColumnList = (SColIndexEx*)((char*)pSuperTableMetaMsg + pElem->groupbyTagColumnList);
|
||||
|
||||
int32_t numOfGroupbyTags = 0;
|
||||
for (int32_t i = 0; i < pElem->numOfGroupCols; ++i) {
|
||||
|
@ -127,14 +134,14 @@ static void mgmtUpdateOrderTagColIndex(SMetricMetaMsg* pMetricMetaMsg, int32_t t
|
|||
}
|
||||
|
||||
// todo merge sort function with losertree used
|
||||
void mgmtReorganizeMetersInMetricMeta(SMetricMetaMsg* pMetricMetaMsg, int32_t tableIndex, tQueryResultset* pRes) {
|
||||
void mgmtReorganizeMetersInMetricMeta(SSuperTableMetaMsg* pSuperTableMetaMsg, int32_t tableIndex, tQueryResultset* pRes) {
|
||||
if (pRes->num <= 0) { // no result, no need to pagination
|
||||
return;
|
||||
}
|
||||
|
||||
SMetricMetaElemMsg* pElem = (SMetricMetaElemMsg*)((char*)pMetricMetaMsg + pMetricMetaMsg->metaElem[tableIndex]);
|
||||
SMetricMetaElemMsg* pElem = (SMetricMetaElemMsg*)((char*)pSuperTableMetaMsg + pSuperTableMetaMsg->metaElem[tableIndex]);
|
||||
|
||||
STabObj* pMetric = mgmtGetMeter(pElem->meterId);
|
||||
STabObj* pMetric = mgmtGetTable(pElem->meterId);
|
||||
SSchema* pTagSchema = (SSchema*)(pMetric->schema + pMetric->numOfColumns * sizeof(SSchema));
|
||||
|
||||
/*
|
||||
|
@ -149,7 +156,7 @@ void mgmtReorganizeMetersInMetricMeta(SMetricMetaMsg* pMetricMetaMsg, int32_t ta
|
|||
int32_t* startPos = NULL;
|
||||
int32_t numOfSubset = 1;
|
||||
|
||||
mgmtUpdateOrderTagColIndex(pMetricMetaMsg, tableIndex, &descriptor->orderIdx, pMetric->numOfTags);
|
||||
mgmtUpdateOrderTagColIndex(pSuperTableMetaMsg, tableIndex, &descriptor->orderIdx, pMetric->numOfTags);
|
||||
if (descriptor->orderIdx.numOfOrderedCols > 0) {
|
||||
tQSortEx(pRes->pRes, POINTER_BYTES, 0, pRes->num - 1, descriptor, tabObjResultComparator);
|
||||
startPos = calculateSubGroup(pRes->pRes, pRes->num, &numOfSubset, descriptor, tabObjResultComparator);
|
||||
|
@ -181,7 +188,7 @@ static void mgmtRetrieveByMeterName(tQueryResultset* pRes, char* str, STabObj* p
|
|||
pRes->num = 0;
|
||||
|
||||
for (pToken = strsep(&str, sep); pToken != NULL; pToken = strsep(&str, sep)) {
|
||||
STabObj* pMeterObj = mgmtGetMeter(pToken);
|
||||
STabObj* pMeterObj = mgmtGetTable(pToken);
|
||||
if (pMeterObj == NULL) {
|
||||
mWarn("metric:%s error in metric query expression, invalid meter id:%s", pMetric->meterId, pToken);
|
||||
continue;
|
||||
|
@ -193,7 +200,7 @@ static void mgmtRetrieveByMeterName(tQueryResultset* pRes, char* str, STabObj* p
|
|||
}
|
||||
|
||||
/* not a table created from metric, ignore */
|
||||
if (pMeterObj->meterType != TSDB_METER_MTABLE) {
|
||||
if (pMeterObj->tableType != TSDB_TABLE_TYPE_CREATE_FROM_STABLE) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -201,8 +208,8 @@ static void mgmtRetrieveByMeterName(tQueryResultset* pRes, char* str, STabObj* p
|
|||
* queried meter not belongs to this metric, ignore, metric does not have
|
||||
* uid, so compare according to meterid
|
||||
*/
|
||||
STabObj* parentMetric = mgmtGetMeter(pMeterObj->pTagData);
|
||||
if (strncasecmp(parentMetric->meterId, pMetric->meterId, TSDB_METER_ID_LEN) != 0 ||
|
||||
STabObj* parentMetric = mgmtGetTable(pMeterObj->pTagData);
|
||||
if (strncasecmp(parentMetric->meterId, pMetric->meterId, TSDB_TABLE_ID_LEN) != 0 ||
|
||||
(parentMetric->uid != pMetric->uid)) {
|
||||
continue;
|
||||
}
|
||||
|
@ -214,13 +221,13 @@ static void mgmtRetrieveByMeterName(tQueryResultset* pRes, char* str, STabObj* p
|
|||
static bool mgmtTablenameFilterCallback(tSkipListNode* pNode, void* param) {
|
||||
SMeterNameFilterSupporter* pSupporter = (SMeterNameFilterSupporter*)param;
|
||||
|
||||
char name[TSDB_METER_ID_LEN] = {0};
|
||||
char name[TSDB_TABLE_ID_LEN] = {0};
|
||||
|
||||
// pattern compare for meter name
|
||||
STabObj* pMeterObj = (STabObj*)pNode->pData;
|
||||
extractTableName(pMeterObj->meterId, name);
|
||||
|
||||
return patternMatch(pSupporter->pattern, name, TSDB_METER_ID_LEN, &pSupporter->info) == TSDB_PATTERN_MATCH;
|
||||
return patternMatch(pSupporter->pattern, name, TSDB_TABLE_ID_LEN, &pSupporter->info) == TSDB_PATTERN_MATCH;
|
||||
}
|
||||
|
||||
static void mgmtRetrieveFromLikeOptr(tQueryResultset* pRes, const char* str, STabObj* pMetric) {
|
||||
|
@ -256,7 +263,7 @@ UNUSED_FUNC static bool mgmtJoinFilterCallback(tSkipListNode* pNode, void* param
|
|||
SJoinSupporter* pSupporter = (SJoinSupporter*)param;
|
||||
|
||||
SSchema s = {0};
|
||||
char* v = mgmtMeterGetTag((STabObj*)pNode->pData, pSupporter->colIndex, &s);
|
||||
char* v = mgmtTableGetTag((STabObj*)pNode->pData, pSupporter->colIndex, &s);
|
||||
|
||||
for (int32_t i = 0; i < pSupporter->size; ++i) {
|
||||
int32_t ret = doCompare(v, pSupporter->val[i], pSupporter->type, s.bytes);
|
||||
|
@ -282,13 +289,13 @@ UNUSED_FUNC static bool mgmtJoinFilterCallback(tSkipListNode* pNode, void* param
|
|||
return false;
|
||||
}
|
||||
|
||||
static void orderResult(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes, int16_t colIndex, int32_t tableIndex) {
|
||||
SMetricMetaElemMsg* pElem = (SMetricMetaElemMsg*)((char*)pMetricMetaMsg + pMetricMetaMsg->metaElem[tableIndex]);
|
||||
static void orderResult(SSuperTableMetaMsg* pSuperTableMetaMsg, tQueryResultset* pRes, int16_t colIndex, int32_t tableIndex) {
|
||||
SMetricMetaElemMsg* pElem = (SMetricMetaElemMsg*)((char*)pSuperTableMetaMsg + pSuperTableMetaMsg->metaElem[tableIndex]);
|
||||
|
||||
tOrderDescriptor* descriptor =
|
||||
(tOrderDescriptor*)calloc(1, sizeof(tOrderDescriptor) + sizeof(int32_t) * 1); // only one column for join
|
||||
|
||||
STabObj* pMetric = mgmtGetMeter(pElem->meterId);
|
||||
STabObj* pMetric = mgmtGetTable(pElem->meterId);
|
||||
SSchema* pTagSchema = (SSchema*)(pMetric->schema + pMetric->numOfColumns * sizeof(SSchema));
|
||||
|
||||
descriptor->pTagSchema = tCreateTagSchema(pTagSchema, pMetric->numOfTags);
|
||||
|
@ -311,8 +318,8 @@ static int32_t mgmtCheckForDuplicateTagValue(tQueryResultset* pRes, int32_t inde
|
|||
STabObj* pObj1 = pRes[index].pRes[k - 1];
|
||||
STabObj* pObj2 = pRes[index].pRes[k];
|
||||
|
||||
char* val1 = mgmtMeterGetTag(pObj1, tagCol, &s);
|
||||
char* val2 = mgmtMeterGetTag(pObj2, tagCol, NULL);
|
||||
char* val1 = mgmtTableGetTag(pObj1, tagCol, &s);
|
||||
char* val2 = mgmtTableGetTag(pObj2, tagCol, NULL);
|
||||
|
||||
if (doCompare(val1, val2, s.type, s.bytes) == 0) {
|
||||
return TSDB_CODE_DUPLICATE_TAGS;
|
||||
|
@ -322,13 +329,13 @@ static int32_t mgmtCheckForDuplicateTagValue(tQueryResultset* pRes, int32_t inde
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes) {
|
||||
if (pMetricMetaMsg->numOfMeters == 1) {
|
||||
int32_t mgmtDoJoin(SSuperTableMetaMsg* pSuperTableMetaMsg, tQueryResultset* pRes) {
|
||||
if (pSuperTableMetaMsg->numOfMeters == 1) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool allEmpty = false;
|
||||
for (int32_t i = 0; i < pMetricMetaMsg->numOfMeters; ++i) {
|
||||
for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) {
|
||||
if (pRes[i].num == 0) { // all results are empty if one of them is empty
|
||||
allEmpty = true;
|
||||
break;
|
||||
|
@ -336,7 +343,7 @@ int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes) {
|
|||
}
|
||||
|
||||
if (allEmpty) {
|
||||
for (int32_t i = 0; i < pMetricMetaMsg->numOfMeters; ++i) {
|
||||
for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) {
|
||||
pRes[i].num = 0;
|
||||
tfree(pRes[i].pRes);
|
||||
}
|
||||
|
@ -344,26 +351,26 @@ int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
char* cond = (char*)pMetricMetaMsg + pMetricMetaMsg->join;
|
||||
char* cond = (char*)pSuperTableMetaMsg + pSuperTableMetaMsg->join;
|
||||
|
||||
char left[TSDB_METER_ID_LEN + 1] = {0};
|
||||
char left[TSDB_TABLE_ID_LEN + 1] = {0};
|
||||
strcpy(left, cond);
|
||||
int16_t leftTagColIndex = *(int16_t*)(cond + TSDB_METER_ID_LEN);
|
||||
int16_t leftTagColIndex = *(int16_t*)(cond + TSDB_TABLE_ID_LEN);
|
||||
|
||||
char right[TSDB_METER_ID_LEN + 1] = {0};
|
||||
strcpy(right, cond + TSDB_METER_ID_LEN + sizeof(int16_t));
|
||||
int16_t rightTagColIndex = *(int16_t*)(cond + TSDB_METER_ID_LEN * 2 + sizeof(int16_t));
|
||||
char right[TSDB_TABLE_ID_LEN + 1] = {0};
|
||||
strcpy(right, cond + TSDB_TABLE_ID_LEN + sizeof(int16_t));
|
||||
int16_t rightTagColIndex = *(int16_t*)(cond + TSDB_TABLE_ID_LEN * 2 + sizeof(int16_t));
|
||||
|
||||
STabObj* pLeftMetric = mgmtGetMeter(left);
|
||||
STabObj* pRightMetric = mgmtGetMeter(right);
|
||||
STabObj* pLeftMetric = mgmtGetTable(left);
|
||||
STabObj* pRightMetric = mgmtGetTable(right);
|
||||
|
||||
// decide the pRes belongs to
|
||||
int32_t leftIndex = 0;
|
||||
int32_t rightIndex = 0;
|
||||
|
||||
for (int32_t i = 0; i < pMetricMetaMsg->numOfMeters; ++i) {
|
||||
for (int32_t i = 0; i < pSuperTableMetaMsg->numOfMeters; ++i) {
|
||||
STabObj* pObj = (STabObj*)pRes[i].pRes[0];
|
||||
STabObj* pMetric1 = mgmtGetMeter(pObj->pTagData);
|
||||
STabObj* pMetric1 = mgmtGetTable(pObj->pTagData);
|
||||
if (pMetric1 == pLeftMetric) {
|
||||
leftIndex = i;
|
||||
} else if (pMetric1 == pRightMetric) {
|
||||
|
@ -371,8 +378,8 @@ int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes) {
|
|||
}
|
||||
}
|
||||
|
||||
orderResult(pMetricMetaMsg, &pRes[leftIndex], leftTagColIndex, leftIndex);
|
||||
orderResult(pMetricMetaMsg, &pRes[rightIndex], rightTagColIndex, rightIndex);
|
||||
orderResult(pSuperTableMetaMsg, &pRes[leftIndex], leftTagColIndex, leftIndex);
|
||||
orderResult(pSuperTableMetaMsg, &pRes[rightIndex], rightTagColIndex, rightIndex);
|
||||
|
||||
int32_t i = 0;
|
||||
int32_t j = 0;
|
||||
|
@ -391,8 +398,8 @@ int32_t mgmtDoJoin(SMetricMetaMsg* pMetricMetaMsg, tQueryResultset* pRes) {
|
|||
STabObj* pLeftObj = pRes[leftIndex].pRes[i];
|
||||
STabObj* pRightObj = pRes[rightIndex].pRes[j];
|
||||
|
||||
char* v1 = mgmtMeterGetTag(pLeftObj, leftTagColIndex, &s);
|
||||
char* v2 = mgmtMeterGetTag(pRightObj, rightTagColIndex, NULL);
|
||||
char* v1 = mgmtTableGetTag(pLeftObj, leftTagColIndex, &s);
|
||||
char* v2 = mgmtTableGetTag(pRightObj, rightTagColIndex, NULL);
|
||||
|
||||
int32_t ret = doCompare(v1, v2, s.type, s.bytes);
|
||||
if (ret == 0) { // qualified
|
||||
|
@ -727,9 +734,9 @@ static int32_t mgmtFilterMeterByIndex(STabObj* pMetric, tQueryResultset* pRes, c
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pMsg, int32_t tableIndex, tQueryResultset* pRes) {
|
||||
int32_t mgmtRetrieveMetersFromSuperTable(SSuperTableMetaMsg* pMsg, int32_t tableIndex, tQueryResultset* pRes) {
|
||||
SMetricMetaElemMsg* pElem = (SMetricMetaElemMsg*)((char*)pMsg + pMsg->metaElem[tableIndex]);
|
||||
STabObj* pMetric = mgmtGetMeter(pElem->meterId);
|
||||
STabObj* pMetric = mgmtGetTable(pElem->meterId);
|
||||
char* pCond = NULL;
|
||||
char* tmpTableNameCond = NULL;
|
||||
|
||||
|
@ -806,11 +813,11 @@ int mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pMsg, int32_t tableIndex, tQuer
|
|||
}
|
||||
|
||||
// todo refactor!!!!!
|
||||
static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, int32_t len, char* param) {
|
||||
static char* getTagValueFromMeter(STabObj* pTable, int32_t offset, int32_t len, char* param) {
|
||||
if (offset == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
extractTableName(pMeter->meterId, param);
|
||||
extractTableName(pTable->meterId, param);
|
||||
} else {
|
||||
char* tags = pMeter->pTagData + offset + TSDB_METER_ID_LEN; // tag start position
|
||||
char* tags = pTable->pTagData + offset + TSDB_TABLE_ID_LEN; // tag start position
|
||||
memcpy(param, tags, len); // make sure the value is null-terminated string
|
||||
}
|
||||
|
||||
|
@ -820,11 +827,11 @@ static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, int32_t len,
|
|||
bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
|
||||
|
||||
tQueryInfo* pInfo = (tQueryInfo*)param;
|
||||
STabObj* pMeter = (STabObj*)(((tSkipListNode*)pNode)->pData);
|
||||
STabObj* pTable = (STabObj*)(((tSkipListNode*)pNode)->pData);
|
||||
|
||||
char buf[TSDB_MAX_TAGS_LEN] = {0};
|
||||
|
||||
char* val = getTagValueFromMeter(pMeter, pInfo->offset, pInfo->sch.bytes, buf);
|
||||
char* val = getTagValueFromMeter(pTable, pInfo->offset, pInfo->sch.bytes, buf);
|
||||
int8_t type = pInfo->sch.type;
|
||||
|
||||
int32_t ret = 0;
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#include "os.h"
|
||||
|
||||
#include "dnodeSystem.h"
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "tsdb.h"
|
||||
#include "mgmtSystem.h"
|
||||
#include "dnodeModule.h"
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
#include <arpa/inet.h>
|
||||
|
||||
#include "dnodeSystem.h"
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
|
||||
extern void *mgmtTmr;
|
||||
extern void *mgmtStatusTimer;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -16,7 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "mgmtGrant.h"
|
||||
#include "mgmtAcct.h"
|
||||
#include "tschemautil.h"
|
||||
|
|
|
@ -15,71 +15,54 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "mgmtUtil.h"
|
||||
#include "textbuffer.h"
|
||||
#include "tschemautil.h"
|
||||
#include "tsqlfunction.h"
|
||||
#include "vnodeTagMgmt.h"
|
||||
|
||||
extern int cksumsize;
|
||||
|
||||
uint64_t mgmtGetCheckSum(FILE* fp, int offset) {
|
||||
uint64_t checksum = 0;
|
||||
uint64_t data;
|
||||
int bytes;
|
||||
|
||||
while (1) {
|
||||
data = 0;
|
||||
bytes = fread(&data, sizeof(data), 1, fp);
|
||||
|
||||
if (bytes != sizeof(data)) break;
|
||||
|
||||
checksum += data;
|
||||
}
|
||||
|
||||
return checksum;
|
||||
bool mgmtTableCreateFromSuperTable(STabObj* pTableObj) {
|
||||
return pTableObj->tableType == TSDB_TABLE_TYPE_CREATE_FROM_STABLE;
|
||||
}
|
||||
|
||||
bool mgmtMeterCreateFromMetric(STabObj* pMeterObj) { return pMeterObj->meterType == TSDB_METER_MTABLE; }
|
||||
bool mgmtIsSuperTable(STabObj* pTableObj) {
|
||||
return pTableObj->tableType == TSDB_TABLE_TYPE_SUPER_TABLE;
|
||||
}
|
||||
|
||||
bool mgmtIsMetric(STabObj* pMeterObj) { return pMeterObj->meterType == TSDB_METER_METRIC; }
|
||||
|
||||
bool mgmtIsNormalMeter(STabObj* pMeterObj) { return !mgmtIsMetric(pMeterObj); }
|
||||
bool mgmtIsNormalTable(STabObj* pTableObj) {
|
||||
return !mgmtIsSuperTable(pTableObj);
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO: the tag offset value should be kept in memory to avoid dynamically calculating the value
|
||||
*
|
||||
* @param pMeter
|
||||
* @param pTable
|
||||
* @param col
|
||||
* @param pTagColSchema
|
||||
* @return
|
||||
*/
|
||||
char* mgmtMeterGetTag(STabObj* pMeter, int32_t col, SSchema* pTagColSchema) {
|
||||
if (!mgmtMeterCreateFromMetric(pMeter)) {
|
||||
char* mgmtTableGetTag(STabObj* pTable, int32_t col, SSchema* pTagColSchema) {
|
||||
if (!mgmtTableCreateFromSuperTable(pTable)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STabObj* pMetric = mgmtGetMeter(pMeter->pTagData);
|
||||
int32_t offset = mgmtGetTagsLength(pMetric, col) + TSDB_METER_ID_LEN;
|
||||
STabObj* pSuperTable = mgmtGetTable(pTable->pTagData);
|
||||
int32_t offset = mgmtGetTagsLength(pSuperTable, col) + TSDB_TABLE_ID_LEN;
|
||||
assert(offset > 0);
|
||||
|
||||
if (pTagColSchema != NULL) {
|
||||
*pTagColSchema = ((SSchema*)pMetric->schema)[pMetric->numOfColumns + col];
|
||||
*pTagColSchema = ((SSchema*)pSuperTable->schema)[pSuperTable->numOfColumns + col];
|
||||
}
|
||||
|
||||
return (pMeter->pTagData + offset);
|
||||
return (pTable->pTagData + offset);
|
||||
}
|
||||
|
||||
int32_t mgmtGetTagsLength(STabObj* pMetric, int32_t col) { // length before column col
|
||||
assert(mgmtIsMetric(pMetric) && col >= 0);
|
||||
int32_t mgmtGetTagsLength(STabObj* pSuperTable, int32_t col) { // length before column col
|
||||
assert(mgmtIsSuperTable(pSuperTable) && col >= 0);
|
||||
|
||||
int32_t len = 0;
|
||||
int32_t tagColumnIndexOffset = pMetric->numOfColumns;
|
||||
int32_t tagColumnIndexOffset = pSuperTable->numOfColumns;
|
||||
|
||||
for (int32_t i = 0; i < pMetric->numOfTags && i < col; ++i) {
|
||||
len += ((SSchema*)pMetric->schema)[tagColumnIndexOffset + i].bytes;
|
||||
for (int32_t i = 0; i < pSuperTable->numOfTags && i < col; ++i) {
|
||||
len += ((SSchema*)pSuperTable->schema)[tagColumnIndexOffset + i].bytes;
|
||||
}
|
||||
|
||||
return len;
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
|
||||
#include "mgmt.h"
|
||||
#include "mnode.h"
|
||||
#include "tschemautil.h"
|
||||
#include "tlog.h"
|
||||
#include "vnodeStatus.h"
|
||||
|
@ -168,13 +168,13 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
|
|||
}
|
||||
|
||||
int mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
|
||||
STabObj *pMeter;
|
||||
STabObj *pTable;
|
||||
|
||||
if (pVgroup->numOfMeters > 0) {
|
||||
for (int i = 0; i < pDb->cfg.maxSessions; ++i) {
|
||||
if (pVgroup->meterList != NULL) {
|
||||
pMeter = pVgroup->meterList[i];
|
||||
if (pMeter) mgmtDropMeter(pDb, pMeter->meterId, 0);
|
||||
pTable = pVgroup->meterList[i];
|
||||
if (pTable) mgmtDropMeter(pDb, pTable->meterId, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -239,14 +239,14 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
|||
|
||||
int maxReplica = 0;
|
||||
SVgObj *pVgroup = NULL;
|
||||
STabObj *pMeter = NULL;
|
||||
STabObj *pTable = NULL;
|
||||
if (pShow->payloadLen > 0 ) {
|
||||
pMeter = mgmtGetMeter(pShow->payload);
|
||||
if (NULL == pMeter) {
|
||||
pTable = mgmtGetTable(pShow->payload);
|
||||
if (NULL == pTable) {
|
||||
return TSDB_CODE_INVALID_METER_ID;
|
||||
}
|
||||
|
||||
pVgroup = mgmtGetVgroup(pMeter->gid.vgId);
|
||||
pVgroup = mgmtGetVgroup(pTable->gid.vgId);
|
||||
if (NULL == pVgroup) return TSDB_CODE_INVALID_METER_ID;
|
||||
|
||||
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
|
||||
|
@ -292,7 +292,7 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
|||
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
|
||||
if (NULL == pMeter) {
|
||||
if (NULL == pTable) {
|
||||
pShow->numOfRows = pDb->numOfVgroups;
|
||||
pShow->pNode = pDb->pHead;
|
||||
} else {
|
||||
|
|
|
@ -188,7 +188,7 @@ void dnodeBuildMonitorSql(char *sql, int cmd) {
|
|||
snprintf(sql, SQL_LENGTH,
|
||||
"create table if not exists %s.slowquery(ts timestamp, username "
|
||||
"binary(%d), created_time timestamp, time bigint, sql binary(%d))",
|
||||
tsMonitorDbName, TSDB_METER_ID_LEN, TSDB_SHOW_SQL_LEN);
|
||||
tsMonitorDbName, TSDB_TABLE_ID_LEN, TSDB_SHOW_SQL_LEN);
|
||||
} else if (cmd == MONITOR_CMD_CREATE_TB_LOG) {
|
||||
snprintf(sql, SQL_LENGTH,
|
||||
"create table if not exists %s.log(ts timestamp, level tinyint, "
|
||||
|
|
|
@ -722,7 +722,7 @@ int taosSetSecurityInfo(int chann, int sid, char *id, int spi, int encrypt, char
|
|||
pConn->encrypt = encrypt;
|
||||
memcpy(pConn->secret, pConn->secret, TSDB_KEY_LEN);
|
||||
memcpy(pConn->cipheringKey, ckey, TSDB_KEY_LEN);
|
||||
memcpy(pConn->meterId, id, TSDB_METER_ID_LEN);
|
||||
memcpy(pConn->meterId, id, TSDB_TABLE_ID_LEN);
|
||||
*/
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
#define MAX_STR_LEN 40
|
||||
|
||||
typedef struct _str_node_t {
|
||||
char string[TSDB_METER_ID_LEN];
|
||||
char string[TSDB_TABLE_ID_LEN];
|
||||
int hash;
|
||||
struct _str_node_t *prev;
|
||||
struct _str_node_t *next;
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
CMAKE_MINIMUM_REQUIRED(VERSION 2.8)
|
||||
PROJECT(TDengine)
|
||||
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_OS_DIR}/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||
|
||||
IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
|
||||
AUX_SOURCE_DIRECTORY(src SRC)
|
||||
|
@ -34,6 +35,7 @@ ELSEIF (TD_WINDOWS_64)
|
|||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/iconv)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/regex)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
|
||||
LIST(APPEND SRC ./src/hash.c)
|
||||
LIST(APPEND SRC ./src/ihash.c)
|
||||
LIST(APPEND SRC ./src/lz4.c)
|
||||
LIST(APPEND SRC ./src/shash.c)
|
||||
|
@ -68,6 +70,7 @@ ELSEIF (TD_WINDOWS_64)
|
|||
TARGET_LINK_LIBRARIES(tutil iconv regex pthread os winmm IPHLPAPI ws2_32)
|
||||
ELSEIF(TD_DARWIN_64)
|
||||
ADD_DEFINITIONS(-DUSE_LIBICONV)
|
||||
LIST(APPEND SRC ./src/hash.c)
|
||||
LIST(APPEND SRC ./src/ihash.c)
|
||||
LIST(APPEND SRC ./src/lz4.c)
|
||||
LIST(APPEND SRC ./src/shash.c)
|
||||
|
|
|
@ -16,11 +16,13 @@
|
|||
#ifndef TDENGINE_TSTATUS_H
|
||||
#define TDENGINE_TSTATUS_H
|
||||
|
||||
#include "taoserror.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
#include "taoserror.h"
|
||||
|
||||
enum _TSDB_VG_STATUS {
|
||||
TSDB_VG_STATUS_READY = TSDB_CODE_SUCCESS,
|
|
@ -80,7 +80,12 @@ short tsNumOfVnodesPerCore = 8;
|
|||
short tsNumOfTotalVnodes = 0;
|
||||
short tsCheckHeaderFile = 0;
|
||||
|
||||
#ifdef _TD_ARM_32_
|
||||
int tsSessionsPerVnode = 100;
|
||||
#else
|
||||
int tsSessionsPerVnode = 1000;
|
||||
#endif
|
||||
|
||||
int tsCacheBlockSize = 16384; // 256 columns
|
||||
int tsAverageCacheBlocks = TSDB_DEFAULT_AVG_BLOCKS;
|
||||
/**
|
||||
|
|
|
@ -14,8 +14,7 @@
|
|||
*/
|
||||
|
||||
#include "taosmsg.h"
|
||||
#include "tsdb.h"
|
||||
#include "vnodeStatus.h"
|
||||
#include "tstatus.h"
|
||||
|
||||
const char* taosGetVgroupStatusStr(int32_t vgroupStatus) {
|
||||
switch (vgroupStatus) {
|
|
@ -24,7 +24,43 @@
|
|||
#include "ttime.h"
|
||||
#include "tutil.h"
|
||||
|
||||
/*
|
||||
* mktime64 - Converts date to seconds.
|
||||
* Converts Gregorian date to seconds since 1970-01-01 00:00:00.
|
||||
* Assumes input in normal date format, i.e. 1980-12-31 23:59:59
|
||||
* => year=1980, mon=12, day=31, hour=23, min=59, sec=59.
|
||||
*
|
||||
* [For the Julian calendar (which was used in Russia before 1917,
|
||||
* Britain & colonies before 1752, anywhere else before 1582,
|
||||
* and is still in use by some communities) leave out the
|
||||
* -year/100+year/400 terms, and add 10.]
|
||||
*
|
||||
* This algorithm was first published by Gauss (I think).
|
||||
*
|
||||
* A leap second can be indicated by calling this function with sec as
|
||||
* 60 (allowable under ISO 8601). The leap second is treated the same
|
||||
* as the following second since they don't exist in UNIX time.
|
||||
*
|
||||
* An encoding of midnight at the end of the day as 24:00:00 - ie. midnight
|
||||
* tomorrow - (allowable under ISO 8601) is supported.
|
||||
*/
|
||||
int64_t user_mktime64(const unsigned int year0, const unsigned int mon0,
|
||||
const unsigned int day, const unsigned int hour,
|
||||
const unsigned int min, const unsigned int sec)
|
||||
{
|
||||
unsigned int mon = mon0, year = year0;
|
||||
|
||||
/* 1..12 -> 11,12,1..10 */
|
||||
if (0 >= (int) (mon -= 2)) {
|
||||
mon += 12; /* Puts Feb last since it has leap day */
|
||||
year -= 1;
|
||||
}
|
||||
|
||||
int64_t res = (((((int64_t) (year/4 - year/100 + year/400 + 367*mon/12 + day) +
|
||||
year*365 - 719499)*24 + hour)*60 + min)*60 + sec);
|
||||
|
||||
return (res + timezone);
|
||||
}
|
||||
// ==== mktime() kernel code =================//
|
||||
static int64_t m_deltaUtc = 0;
|
||||
void deltaToUtcInitOnce() {
|
||||
|
@ -293,7 +329,8 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
|
|||
|
||||
/* mktime will be affected by TZ, set by using taos_options */
|
||||
//int64_t seconds = mktime(&tm);
|
||||
int64_t seconds = (int64_t)user_mktime(&tm);
|
||||
//int64_t seconds = (int64_t)user_mktime(&tm);
|
||||
int64_t seconds = user_mktime64(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
|
||||
|
||||
int64_t fraction = 0;
|
||||
|
||||
|
|
|
@ -976,11 +976,21 @@ void assignVal(char *val, const char *src, int32_t len, int32_t type) {
|
|||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
#ifdef _TD_ARM_32_
|
||||
float fv = GET_FLOAT_VAL(src);
|
||||
SET_FLOAT_VAL_ALIGN(val, &fv);
|
||||
#else
|
||||
*((float *)val) = GET_FLOAT_VAL(src);
|
||||
#endif
|
||||
break;
|
||||
};
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
#ifdef _TD_ARM_32_
|
||||
double dv = GET_DOUBLE_VAL(src);
|
||||
SET_DOUBLE_VAL_ALIGN(val, &dv);
|
||||
#else
|
||||
*((double *)val) = GET_DOUBLE_VAL(src);
|
||||
#endif
|
||||
break;
|
||||
};
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
char version[64] = "1.6.5.3";
|
||||
char version[64] = "1.6.5.4";
|
||||
char compatible_version[64] = "1.6.1.0";
|
||||
char gitinfo[128] = "700305490a82228ec1b0244afb838bdbb9de9793";
|
||||
char gitinfoOfInternal[128] = "";
|
||||
char buildinfo[512] = "Built by at 2020-01-17 13:22";
|
||||
char gitinfo[128] = "3264067e97300c84caa61ac909d548c9ca56de6b";
|
||||
char gitinfoOfInternal[128] = "da88f4a2474737d1f9c76adcf0ff7fd0975e7342";
|
||||
char buildinfo[512] = "Built by root at 2020-02-05 14:38";
|
||||
|
||||
void libtaos_edge_1_6_5_1_Linux_x64() {};
|
||||
void libtaos_1_6_5_4_Linux_x64() {};
|
||||
|
|
|
@ -158,7 +158,7 @@ typedef struct SColumn {
|
|||
|
||||
typedef struct _meter_obj {
|
||||
uint64_t uid;
|
||||
char meterId[TSDB_METER_ID_LEN];
|
||||
char meterId[TSDB_TABLE_ID_LEN];
|
||||
int sid;
|
||||
short vnode;
|
||||
short numOfColumns;
|
||||
|
|
|
@ -253,7 +253,7 @@ typedef struct SMeterQuerySupportObj {
|
|||
typedef struct _qinfo {
|
||||
uint64_t signature;
|
||||
int32_t refCount; // QInfo reference count, when the value is 0, it can be released safely
|
||||
char user[TSDB_METER_ID_LEN + 1];
|
||||
char user[TSDB_TABLE_ID_LEN + 1];
|
||||
char sql[TSDB_SHOW_SQL_LEN];
|
||||
uint8_t stream;
|
||||
uint16_t port;
|
||||
|
|
|
@ -492,7 +492,7 @@ void *vnodeCommitMultiToFile(SVnodeObj *pVnode, int ssid, int esid) {
|
|||
SMeterObj * pObj = NULL;
|
||||
SCompInfo compInfo = {0};
|
||||
SCompHeader * pHeader;
|
||||
SMeterInfo * meterInfo = NULL, *pMeter = NULL;
|
||||
SMeterInfo * meterInfo = NULL, *pTable = NULL;
|
||||
SQuery query;
|
||||
SColumnInfoEx colList[TSDB_MAX_COLUMNS] = {0};
|
||||
SSqlFunctionExpr pExprs[TSDB_MAX_COLUMNS] = {0};
|
||||
|
@ -617,7 +617,7 @@ _again:
|
|||
continue;
|
||||
}
|
||||
|
||||
pMeter = meterInfo + sid;
|
||||
pTable = meterInfo + sid;
|
||||
pHeader = ((SCompHeader *)tmem) + sid;
|
||||
|
||||
if (pVnode->hfd > 0) {
|
||||
|
@ -633,18 +633,18 @@ _again:
|
|||
goto _over;
|
||||
} else {
|
||||
if (pObj->uid == compInfo.uid) {
|
||||
pMeter->oldNumOfBlocks = compInfo.numOfBlocks;
|
||||
pMeter->oldCompBlockOffset = pHeader->compInfoOffset + sizeof(SCompInfo);
|
||||
pMeter->last = compInfo.last;
|
||||
pTable->oldNumOfBlocks = compInfo.numOfBlocks;
|
||||
pTable->oldCompBlockOffset = pHeader->compInfoOffset + sizeof(SCompInfo);
|
||||
pTable->last = compInfo.last;
|
||||
if (compInfo.numOfBlocks > maxOldBlocks) maxOldBlocks = compInfo.numOfBlocks;
|
||||
if (pMeter->last) {
|
||||
if (pTable->last) {
|
||||
lseek(pVnode->hfd, sizeof(SCompBlock) * (compInfo.numOfBlocks - 1), SEEK_CUR);
|
||||
read(pVnode->hfd, &pMeter->lastBlock, sizeof(SCompBlock));
|
||||
read(pVnode->hfd, &pTable->lastBlock, sizeof(SCompBlock));
|
||||
}
|
||||
} else {
|
||||
dTrace("vid:%d sid:%d id:%s, uid:%" PRIu64 " is not matched with old:%" PRIu64 ", old data will be thrown away",
|
||||
vnode, sid, pObj->meterId, pObj->uid, compInfo.uid);
|
||||
pMeter->oldNumOfBlocks = 0;
|
||||
pTable->oldNumOfBlocks = 0;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -669,8 +669,8 @@ _again:
|
|||
pObj->pointsPerFileBlock * pObj->schema[col - 1].bytes + EXTRA_BYTES + sizeof(TSCKSUM));
|
||||
}
|
||||
|
||||
pMeter = meterInfo + sid;
|
||||
pMeter->tempHeadOffset = headLen;
|
||||
pTable = meterInfo + sid;
|
||||
pTable->tempHeadOffset = headLen;
|
||||
|
||||
memset(&query, 0, sizeof(query));
|
||||
query.colList = colList;
|
||||
|
@ -690,27 +690,27 @@ _again:
|
|||
pointsReadLast = 0;
|
||||
|
||||
// last block is at last file
|
||||
if (pMeter->last) {
|
||||
if ((pMeter->lastBlock.sversion != pObj->sversion) || (query.over)) {
|
||||
if (pTable->last) {
|
||||
if ((pTable->lastBlock.sversion != pObj->sversion) || (query.over)) {
|
||||
// TODO : Check the correctness of this code. write the last block to
|
||||
// .data file
|
||||
pCompBlock = (SCompBlock *)(hmem + headLen);
|
||||
assert(dmem - (char *)pCompBlock >= sizeof(SCompBlock));
|
||||
*pCompBlock = pMeter->lastBlock;
|
||||
if (pMeter->lastBlock.sversion != pObj->sversion) {
|
||||
*pCompBlock = pTable->lastBlock;
|
||||
if (pTable->lastBlock.sversion != pObj->sversion) {
|
||||
pCompBlock->last = 0;
|
||||
pCompBlock->offset = lseek(pVnode->dfd, 0, SEEK_END);
|
||||
pMeter->last = 0;
|
||||
lseek(pVnode->lfd, pMeter->lastBlock.offset, SEEK_SET);
|
||||
tsendfile(pVnode->dfd, pVnode->lfd, NULL, pMeter->lastBlock.len);
|
||||
pVnode->dfSize = pCompBlock->offset + pMeter->lastBlock.len;
|
||||
pTable->last = 0;
|
||||
lseek(pVnode->lfd, pTable->lastBlock.offset, SEEK_SET);
|
||||
tsendfile(pVnode->dfd, pVnode->lfd, NULL, pTable->lastBlock.len);
|
||||
pVnode->dfSize = pCompBlock->offset + pTable->lastBlock.len;
|
||||
} else {
|
||||
if (ssid == 0) {
|
||||
assert(pCompBlock->last && pVnode->tfd != -1);
|
||||
pCompBlock->offset = lseek(pVnode->tfd, 0, SEEK_END);
|
||||
lseek(pVnode->lfd, pMeter->lastBlock.offset, SEEK_SET);
|
||||
tsendfile(pVnode->tfd, pVnode->lfd, NULL, pMeter->lastBlock.len);
|
||||
pVnode->lfSize = pCompBlock->offset + pMeter->lastBlock.len;
|
||||
lseek(pVnode->lfd, pTable->lastBlock.offset, SEEK_SET);
|
||||
tsendfile(pVnode->tfd, pVnode->lfd, NULL, pTable->lastBlock.len);
|
||||
pVnode->lfSize = pCompBlock->offset + pTable->lastBlock.len;
|
||||
} else {
|
||||
assert(pVnode->tfd == -1);
|
||||
}
|
||||
|
@ -718,12 +718,12 @@ _again:
|
|||
}
|
||||
|
||||
headLen += sizeof(SCompBlock);
|
||||
pMeter->newNumOfBlocks++;
|
||||
pTable->newNumOfBlocks++;
|
||||
} else {
|
||||
// read last block into memory
|
||||
if (vnodeReadLastBlockToMem(pObj, &pMeter->lastBlock, data) < 0) goto _over;
|
||||
pMeter->last = 0;
|
||||
pointsReadLast = pMeter->lastBlock.numOfPoints;
|
||||
if (vnodeReadLastBlockToMem(pObj, &pTable->lastBlock, data) < 0) goto _over;
|
||||
pTable->last = 0;
|
||||
pointsReadLast = pTable->lastBlock.numOfPoints;
|
||||
query.over = 0;
|
||||
headInfo.totalStorage -= (pointsReadLast * pObj->bytesPerPoint);
|
||||
|
||||
|
@ -731,8 +731,8 @@ _again:
|
|||
pObj->vnode, pObj->sid, pObj->meterId, pointsReadLast);
|
||||
}
|
||||
|
||||
pMeter->changed = 1;
|
||||
pMeter->oldNumOfBlocks--;
|
||||
pTable->changed = 1;
|
||||
pTable->oldNumOfBlocks--;
|
||||
}
|
||||
|
||||
while (query.over == 0) {
|
||||
|
@ -753,17 +753,17 @@ _again:
|
|||
pCompBlock->last = 1;
|
||||
if (vnodeWriteBlockToFile(pObj, pCompBlock, data, cdata, pointsRead) < 0) goto _over;
|
||||
if (pCompBlock->keyLast > pObj->lastKeyOnFile) pObj->lastKeyOnFile = pCompBlock->keyLast;
|
||||
pMeter->last = pCompBlock->last;
|
||||
pTable->last = pCompBlock->last;
|
||||
|
||||
// write block info into header buffer
|
||||
headLen += sizeof(SCompBlock);
|
||||
pMeter->newNumOfBlocks++;
|
||||
pMeter->committedPoints += (pointsRead - pointsReadLast);
|
||||
pTable->newNumOfBlocks++;
|
||||
pTable->committedPoints += (pointsRead - pointsReadLast);
|
||||
|
||||
dTrace("vid:%d sid:%d id:%s, pointsRead:%d, pointsReadLast:%d lastKey:%" PRId64 ", "
|
||||
"slot:%d pos:%d newNumOfBlocks:%d headLen:%d",
|
||||
pObj->vnode, pObj->sid, pObj->meterId, pointsRead, pointsReadLast, pObj->lastKeyOnFile, query.slot, query.pos,
|
||||
pMeter->newNumOfBlocks, headLen);
|
||||
pTable->newNumOfBlocks, headLen);
|
||||
|
||||
if (pointsRead < pObj->pointsPerFileBlock || query.keyIsMet) break;
|
||||
|
||||
|
@ -772,12 +772,12 @@ _again:
|
|||
}
|
||||
|
||||
dTrace("vid:%d sid:%d id:%s, %d points are committed, lastKey:%" PRId64 " slot:%d pos:%d newNumOfBlocks:%d",
|
||||
pObj->vnode, pObj->sid, pObj->meterId, pMeter->committedPoints, pObj->lastKeyOnFile, query.slot, query.pos,
|
||||
pMeter->newNumOfBlocks);
|
||||
pObj->vnode, pObj->sid, pObj->meterId, pTable->committedPoints, pObj->lastKeyOnFile, query.slot, query.pos,
|
||||
pTable->newNumOfBlocks);
|
||||
|
||||
if (pMeter->committedPoints > 0) {
|
||||
pMeter->commitSlot = query.slot;
|
||||
pMeter->commitPos = query.pos;
|
||||
if (pTable->committedPoints > 0) {
|
||||
pTable->commitSlot = query.slot;
|
||||
pTable->commitPos = query.pos;
|
||||
}
|
||||
|
||||
TSKEY nextKey = 0;
|
||||
|
@ -805,19 +805,19 @@ _again:
|
|||
continue;
|
||||
}
|
||||
|
||||
pMeter = meterInfo + sid;
|
||||
pMeter->compInfoOffset = compInfoOffset;
|
||||
pMeter->finalNumOfBlocks = pMeter->oldNumOfBlocks + pMeter->newNumOfBlocks;
|
||||
pTable = meterInfo + sid;
|
||||
pTable->compInfoOffset = compInfoOffset;
|
||||
pTable->finalNumOfBlocks = pTable->oldNumOfBlocks + pTable->newNumOfBlocks;
|
||||
|
||||
if (pMeter->finalNumOfBlocks > 0) {
|
||||
pHeader->compInfoOffset = pMeter->compInfoOffset;
|
||||
compInfoOffset += sizeof(SCompInfo) + pMeter->finalNumOfBlocks * sizeof(SCompBlock) + sizeof(TSCKSUM);
|
||||
if (pTable->finalNumOfBlocks > 0) {
|
||||
pHeader->compInfoOffset = pTable->compInfoOffset;
|
||||
compInfoOffset += sizeof(SCompInfo) + pTable->finalNumOfBlocks * sizeof(SCompBlock) + sizeof(TSCKSUM);
|
||||
} else {
|
||||
pHeader->compInfoOffset = 0;
|
||||
}
|
||||
|
||||
dTrace("vid:%d sid:%d id:%s, oldBlocks:%d numOfBlocks:%d compInfoOffset:%d", pObj->vnode, pObj->sid, pObj->meterId,
|
||||
pMeter->oldNumOfBlocks, pMeter->finalNumOfBlocks, compInfoOffset);
|
||||
pTable->oldNumOfBlocks, pTable->finalNumOfBlocks, compInfoOffset);
|
||||
}
|
||||
|
||||
// write the comp header into new file
|
||||
|
@ -838,16 +838,16 @@ _again:
|
|||
pObj = (SMeterObj *)(pVnode->meterList[sid]);
|
||||
if (pObj == NULL) continue;
|
||||
|
||||
pMeter = meterInfo + sid;
|
||||
if (pMeter->finalNumOfBlocks <= 0) continue;
|
||||
pTable = meterInfo + sid;
|
||||
if (pTable->finalNumOfBlocks <= 0) continue;
|
||||
|
||||
compInfo.last = pMeter->last;
|
||||
compInfo.last = pTable->last;
|
||||
compInfo.uid = pObj->uid;
|
||||
compInfo.numOfBlocks = pMeter->finalNumOfBlocks;
|
||||
/* compInfo.compBlockLen = pMeter->finalCompBlockLen; */
|
||||
compInfo.numOfBlocks = pTable->finalNumOfBlocks;
|
||||
/* compInfo.compBlockLen = pTable->finalCompBlockLen; */
|
||||
compInfo.delimiter = TSDB_VNODE_DELIMITER;
|
||||
taosCalcChecksumAppend(0, (uint8_t *)(&compInfo), sizeof(SCompInfo));
|
||||
lseek(pVnode->nfd, pMeter->compInfoOffset, SEEK_SET);
|
||||
lseek(pVnode->nfd, pTable->compInfoOffset, SEEK_SET);
|
||||
if (twrite(pVnode->nfd, &compInfo, sizeof(compInfo)) <= 0) {
|
||||
dError("vid:%d sid:%d id:%s, failed to write:%s, reason:%s", vnode, sid, pObj->meterId, pVnode->nfn,
|
||||
strerror(errno));
|
||||
|
@ -857,23 +857,23 @@ _again:
|
|||
|
||||
// write the old comp blocks
|
||||
chksum = 0;
|
||||
if (pVnode->hfd && pMeter->oldNumOfBlocks) {
|
||||
lseek(pVnode->hfd, pMeter->oldCompBlockOffset, SEEK_SET);
|
||||
if (pMeter->changed) {
|
||||
int compBlockLen = pMeter->oldNumOfBlocks * sizeof(SCompBlock);
|
||||
if (pVnode->hfd && pTable->oldNumOfBlocks) {
|
||||
lseek(pVnode->hfd, pTable->oldCompBlockOffset, SEEK_SET);
|
||||
if (pTable->changed) {
|
||||
int compBlockLen = pTable->oldNumOfBlocks * sizeof(SCompBlock);
|
||||
read(pVnode->hfd, pOldCompBlocks, compBlockLen);
|
||||
twrite(pVnode->nfd, pOldCompBlocks, compBlockLen);
|
||||
chksum = taosCalcChecksum(0, pOldCompBlocks, compBlockLen);
|
||||
} else {
|
||||
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pMeter->oldNumOfBlocks * sizeof(SCompBlock));
|
||||
tsendfile(pVnode->nfd, pVnode->hfd, NULL, pTable->oldNumOfBlocks * sizeof(SCompBlock));
|
||||
read(pVnode->hfd, &chksum, sizeof(TSCKSUM));
|
||||
}
|
||||
}
|
||||
|
||||
if (pMeter->newNumOfBlocks) {
|
||||
chksum = taosCalcChecksum(chksum, (uint8_t *)(hmem + pMeter->tempHeadOffset),
|
||||
pMeter->newNumOfBlocks * sizeof(SCompBlock));
|
||||
if (twrite(pVnode->nfd, hmem + pMeter->tempHeadOffset, pMeter->newNumOfBlocks * sizeof(SCompBlock)) <= 0) {
|
||||
if (pTable->newNumOfBlocks) {
|
||||
chksum = taosCalcChecksum(chksum, (uint8_t *)(hmem + pTable->tempHeadOffset),
|
||||
pTable->newNumOfBlocks * sizeof(SCompBlock));
|
||||
if (twrite(pVnode->nfd, hmem + pTable->tempHeadOffset, pTable->newNumOfBlocks * sizeof(SCompBlock)) <= 0) {
|
||||
dError("vid:%d sid:%d id:%s, failed to write:%s, reason:%s", vnode, sid, pObj->meterId, pVnode->nfn,
|
||||
strerror(errno));
|
||||
vnodeRecoverFromPeer(pVnode, pVnode->commitFileId);
|
||||
|
@ -891,11 +891,11 @@ _again:
|
|||
pObj = (SMeterObj *)(pVnode->meterList[sid]);
|
||||
if (pObj == NULL) continue;
|
||||
|
||||
pMeter = meterInfo + sid;
|
||||
if (pMeter->finalNumOfBlocks <= 0) continue;
|
||||
pTable = meterInfo + sid;
|
||||
if (pTable->finalNumOfBlocks <= 0) continue;
|
||||
|
||||
if (pMeter->committedPoints > 0) {
|
||||
vnodeUpdateCommitInfo(pObj, pMeter->commitSlot, pMeter->commitPos, pMeter->commitCount);
|
||||
if (pTable->committedPoints > 0) {
|
||||
vnodeUpdateCommitInfo(pObj, pTable->commitSlot, pTable->commitPos, pTable->commitCount);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1236,12 +1236,12 @@ _error_merge:
|
|||
} \
|
||||
}
|
||||
|
||||
int isCacheEnd(SBlockIter iter, SMeterObj *pMeter) {
|
||||
SCacheInfo *pInfo = (SCacheInfo *)(pMeter->pCache);
|
||||
int isCacheEnd(SBlockIter iter, SMeterObj *pTable) {
|
||||
SCacheInfo *pInfo = (SCacheInfo *)(pTable->pCache);
|
||||
int slot = 0;
|
||||
int pos = 0;
|
||||
|
||||
if (pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints == pMeter->pointsPerBlock) {
|
||||
if (pInfo->cacheBlocks[pInfo->currentSlot]->numOfPoints == pTable->pointsPerBlock) {
|
||||
slot = (pInfo->currentSlot + 1) % (pInfo->maxBlocks);
|
||||
pos = 0;
|
||||
} else {
|
||||
|
|
|
@ -4506,7 +4506,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
|
|||
}
|
||||
|
||||
// get one queried meter
|
||||
SMeterObj *pMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pSidSet->pSids[0]->sid);
|
||||
SMeterObj *pTable = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pSidSet->pSids[0]->sid);
|
||||
|
||||
pRuntimeEnv->pTSBuf = param;
|
||||
pRuntimeEnv->cur.vnodeIndex = -1;
|
||||
|
@ -4517,18 +4517,18 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
|
|||
tsBufSetTraverseOrder(pRuntimeEnv->pTSBuf, order);
|
||||
}
|
||||
|
||||
int32_t ret = setupQueryRuntimeEnv(pMeter, pQuery, &pSupporter->runtimeEnv, pTagSchema, TSQL_SO_ASC, true);
|
||||
int32_t ret = setupQueryRuntimeEnv(pTable, pQuery, &pSupporter->runtimeEnv, pTagSchema, TSQL_SO_ASC, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
ret = allocateRuntimeEnvBuf(pRuntimeEnv, pMeter);
|
||||
ret = allocateRuntimeEnvBuf(pRuntimeEnv, pTable);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
tSidSetSort(pSupporter->pSidSet);
|
||||
vnodeRecordAllFiles(pQInfo, pMeter->vnode);
|
||||
vnodeRecordAllFiles(pQInfo, pTable->vnode);
|
||||
|
||||
if ((ret = allocateOutputBufForGroup(pSupporter, pQuery, true)) != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
|
@ -4595,12 +4595,12 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
|
|||
} else {
|
||||
int32_t num = 0;
|
||||
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {
|
||||
SMeterObj *pMeter = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pSidSet->pSids[i]->sid);
|
||||
atomic_fetch_sub_32(&(pMeter->numOfQueries), 1);
|
||||
SMeterObj *pTable = getMeterObj(pSupporter->pMetersHashTable, pSupporter->pSidSet->pSids[i]->sid);
|
||||
atomic_fetch_sub_32(&(pTable->numOfQueries), 1);
|
||||
|
||||
if (pMeter->numOfQueries > 0) {
|
||||
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pMeter->vnode, pMeter->sid,
|
||||
pMeter->meterId, pMeter->numOfQueries);
|
||||
if (pTable->numOfQueries > 0) {
|
||||
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d", pQInfo, pTable->vnode, pTable->sid,
|
||||
pTable->meterId, pTable->numOfQueries);
|
||||
num++;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -186,15 +186,15 @@ void vnodeUpdateStreamRole(SVnodeObj *pVnode) {
|
|||
|
||||
// Callback function called from client
|
||||
void vnodeCloseStreamCallback(void *param) {
|
||||
SMeterObj *pMeter = (SMeterObj *)param;
|
||||
SMeterObj *pTable = (SMeterObj *)param;
|
||||
SVnodeObj *pVnode = NULL;
|
||||
|
||||
if (pMeter == NULL || pMeter->sqlLen == 0) return;
|
||||
pVnode = vnodeList + pMeter->vnode;
|
||||
if (pTable == NULL || pTable->sqlLen == 0) return;
|
||||
pVnode = vnodeList + pTable->vnode;
|
||||
|
||||
pMeter->sqlLen = 0;
|
||||
pMeter->pSql = NULL;
|
||||
pMeter->pStream = NULL;
|
||||
pTable->sqlLen = 0;
|
||||
pTable->pSql = NULL;
|
||||
pTable->pStream = NULL;
|
||||
|
||||
pVnode->numOfStreams--;
|
||||
|
||||
|
@ -203,5 +203,5 @@ void vnodeCloseStreamCallback(void *param) {
|
|||
pVnode->dbConn = NULL;
|
||||
}
|
||||
|
||||
vnodeSaveMeterObjToFile(pMeter);
|
||||
vnodeSaveMeterObjToFile(pTable);
|
||||
}
|
|
@ -527,7 +527,7 @@ bool vnodeIsProjectionQuery(SSqlFunctionExpr* pExpr, int32_t numOfOutput) {
|
|||
}
|
||||
|
||||
/*
|
||||
* the pMeter->state may be changed by vnodeIsSafeToDeleteMeter and import/update processor, the check of
|
||||
* the pTable->state may be changed by vnodeIsSafeToDeleteMeter and import/update processor, the check of
|
||||
* the state will not always be correct.
|
||||
*
|
||||
* The import/update/deleting is actually blocked by current query processing if the check of meter state is
|
||||
|
@ -548,30 +548,30 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
for (int32_t i = 0; i < pQueryMsg->numOfSids; ++i) {
|
||||
SMeterObj* pMeter = pVnode->meterList[pSids[i]->sid];
|
||||
SMeterObj* pTable = pVnode->meterList[pSids[i]->sid];
|
||||
|
||||
/*
|
||||
* If table is missing or is in dropping status, config it from management node, and ignore it
|
||||
* during query processing. The error code of TSDB_CODE_NOT_ACTIVE_TABLE will never return to client.
|
||||
* The missing table needs to be removed from pSids list
|
||||
*/
|
||||
if (pMeter == NULL || vnodeIsMeterState(pMeter, TSDB_METER_STATE_DROPPING)) {
|
||||
if (pTable == NULL || vnodeIsMeterState(pTable, TSDB_METER_STATE_DROPPING)) {
|
||||
dWarn("qmsg:%p, vid:%d sid:%d, not there or will be dropped, ignore this table in query", pQueryMsg,
|
||||
pQueryMsg->vnode, pSids[i]->sid);
|
||||
|
||||
vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid);
|
||||
continue;
|
||||
} else if (pMeter->uid != pSids[i]->uid || pMeter->sid != pSids[i]->sid) {
|
||||
} else if (pTable->uid != pSids[i]->uid || pTable->sid != pSids[i]->sid) {
|
||||
code = TSDB_CODE_TABLE_ID_MISMATCH;
|
||||
dError("qmsg:%p, vid:%d sid:%d id:%s uid:%" PRIu64 ", id mismatch. sid:%d uid:%" PRId64 " in msg", pQueryMsg,
|
||||
pQueryMsg->vnode, pMeter->sid, pMeter->meterId, pMeter->uid, pSids[i]->sid, pSids[i]->uid);
|
||||
pQueryMsg->vnode, pTable->sid, pTable->meterId, pTable->uid, pSids[i]->sid, pSids[i]->uid);
|
||||
|
||||
vnodeSendMeterCfgMsg(pQueryMsg->vnode, pSids[i]->sid);
|
||||
continue;
|
||||
} else if (pMeter->state > TSDB_METER_STATE_INSERTING) { //update or import
|
||||
} else if (pTable->state > TSDB_METER_STATE_INSERTING) { //update or import
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
dTrace("qmsg:%p, vid:%d sid:%d id:%s, it is in state:%s, wait!", pQueryMsg, pQueryMsg->vnode, pSids[i]->sid,
|
||||
pMeter->meterId, taosGetTableStatusStr(pMeter->state));
|
||||
pTable->meterId, taosGetTableStatusStr(pTable->state));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -579,15 +579,15 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
|
|||
* vnodeIsSafeToDeleteMeter will wait for this function complete, and then it can
|
||||
* check if the numOfQueries is 0 or not.
|
||||
*/
|
||||
pMeterObjList[(*numOfIncTables)++] = pMeter;
|
||||
atomic_fetch_add_32(&pMeter->numOfQueries, 1);
|
||||
pMeterObjList[(*numOfIncTables)++] = pTable;
|
||||
atomic_fetch_add_32(&pTable->numOfQueries, 1);
|
||||
|
||||
pSids[index++] = pSids[i];
|
||||
|
||||
// output for meter more than one query executed
|
||||
if (pMeter->numOfQueries > 1) {
|
||||
dTrace("qmsg:%p, vid:%d sid:%d id:%s, inc query ref, numOfQueries:%d", pQueryMsg, pMeter->vnode, pMeter->sid,
|
||||
pMeter->meterId, pMeter->numOfQueries);
|
||||
if (pTable->numOfQueries > 1) {
|
||||
dTrace("qmsg:%p, vid:%d sid:%d id:%s, inc query ref, numOfQueries:%d", pQueryMsg, pTable->vnode, pTable->sid,
|
||||
pTable->meterId, pTable->numOfQueries);
|
||||
num++;
|
||||
}
|
||||
}
|
||||
|
@ -605,14 +605,14 @@ void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList,
|
|||
int32_t num = 0;
|
||||
|
||||
for (int32_t i = 0; i < numOfIncTables; ++i) {
|
||||
SMeterObj* pMeter = pMeterObjList[i];
|
||||
SMeterObj* pTable = pMeterObjList[i];
|
||||
|
||||
if (pMeter != NULL) { // here, do not need to lock to perform operations
|
||||
atomic_fetch_sub_32(&pMeter->numOfQueries, 1);
|
||||
if (pTable != NULL) { // here, do not need to lock to perform operations
|
||||
atomic_fetch_sub_32(&pTable->numOfQueries, 1);
|
||||
|
||||
if (pMeter->numOfQueries > 0) {
|
||||
dTrace("qmsg:%p, vid:%d sid:%d id:%s dec query ref, numOfQueries:%d", pQueryMsg, pMeter->vnode, pMeter->sid,
|
||||
pMeter->meterId, pMeter->numOfQueries);
|
||||
if (pTable->numOfQueries > 0) {
|
||||
dTrace("qmsg:%p, vid:%d sid:%d id:%s dec query ref, numOfQueries:%d", pQueryMsg, pTable->vnode, pTable->sid,
|
||||
pTable->meterId, pTable->numOfQueries);
|
||||
num++;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue