Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/hzcheng_3.0
This commit is contained in:
commit
8bca9c9733
|
@ -301,6 +301,8 @@ typedef struct SSchema {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t nCols;
|
int32_t nCols;
|
||||||
int32_t sver;
|
int32_t sver;
|
||||||
|
int32_t tagVer;
|
||||||
|
int32_t colVer;
|
||||||
SSchema* pSchema;
|
SSchema* pSchema;
|
||||||
} SSchemaWrapper;
|
} SSchemaWrapper;
|
||||||
|
|
||||||
|
@ -309,6 +311,8 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
|
||||||
if (pSW == NULL) return pSW;
|
if (pSW == NULL) return pSW;
|
||||||
pSW->nCols = pSchemaWrapper->nCols;
|
pSW->nCols = pSchemaWrapper->nCols;
|
||||||
pSW->sver = pSchemaWrapper->sver;
|
pSW->sver = pSchemaWrapper->sver;
|
||||||
|
pSW->tagVer = pSchemaWrapper->tagVer;
|
||||||
|
pSW->colVer = pSchemaWrapper->colVer;
|
||||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
if (pSW->pSchema == NULL) {
|
if (pSW->pSchema == NULL) {
|
||||||
taosMemoryFree(pSW);
|
taosMemoryFree(pSW);
|
||||||
|
@ -364,6 +368,8 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeVariantI32(buf, pSW->nCols);
|
tlen += taosEncodeVariantI32(buf, pSW->nCols);
|
||||||
tlen += taosEncodeVariantI32(buf, pSW->sver);
|
tlen += taosEncodeVariantI32(buf, pSW->sver);
|
||||||
|
tlen += taosEncodeVariantI32(buf, pSW->tagVer);
|
||||||
|
tlen += taosEncodeVariantI32(buf, pSW->colVer);
|
||||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
|
tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
|
||||||
}
|
}
|
||||||
|
@ -373,6 +379,8 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
|
||||||
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
|
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
|
||||||
buf = taosDecodeVariantI32(buf, &pSW->nCols);
|
buf = taosDecodeVariantI32(buf, &pSW->nCols);
|
||||||
buf = taosDecodeVariantI32(buf, &pSW->sver);
|
buf = taosDecodeVariantI32(buf, &pSW->sver);
|
||||||
|
buf = taosDecodeVariantI32(buf, &pSW->tagVer);
|
||||||
|
buf = taosDecodeVariantI32(buf, &pSW->colVer);
|
||||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
if (pSW->pSchema == NULL) {
|
if (pSW->pSchema == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -387,6 +395,8 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp
|
||||||
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
|
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
|
||||||
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
|
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
|
||||||
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1;
|
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1;
|
||||||
|
if (tEncodeI32v(pEncoder, pSW->tagVer) < 0) return -1;
|
||||||
|
if (tEncodeI32v(pEncoder, pSW->colVer) < 0) return -1;
|
||||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1;
|
if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
@ -397,6 +407,8 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSch
|
||||||
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
|
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
|
||||||
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
||||||
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
||||||
|
if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1;
|
||||||
|
if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1;
|
||||||
|
|
||||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
if (pSW->pSchema == NULL) return -1;
|
if (pSW->pSchema == NULL) return -1;
|
||||||
|
@ -410,6 +422,8 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWra
|
||||||
static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaWrapper* pSW) {
|
static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaWrapper* pSW) {
|
||||||
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
||||||
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
||||||
|
if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1;
|
||||||
|
if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1;
|
||||||
|
|
||||||
pSW->pSchema = (SSchema*)tDecoderMalloc(pDecoder, pSW->nCols * sizeof(SSchema));
|
pSW->pSchema = (SSchema*)tDecoderMalloc(pDecoder, pSW->nCols * sizeof(SSchema));
|
||||||
if (pSW->pSchema == NULL) return -1;
|
if (pSW->pSchema == NULL) return -1;
|
||||||
|
@ -455,6 +469,7 @@ int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
int8_t alterType;
|
int8_t alterType;
|
||||||
|
int32_t verInBlock;
|
||||||
int32_t numOfFields;
|
int32_t numOfFields;
|
||||||
SArray* pFields;
|
SArray* pFields;
|
||||||
int32_t ttl;
|
int32_t ttl;
|
||||||
|
|
|
@ -142,6 +142,8 @@ void fmFuncMgtDestroy();
|
||||||
|
|
||||||
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc);
|
int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc);
|
||||||
|
|
||||||
|
bool fmIsBuiltinFunc(const char* pFunc);
|
||||||
|
|
||||||
bool fmIsAggFunc(int32_t funcId);
|
bool fmIsAggFunc(int32_t funcId);
|
||||||
bool fmIsScalarFunc(int32_t funcId);
|
bool fmIsScalarFunc(int32_t funcId);
|
||||||
bool fmIsNonstandardSQLFunc(int32_t funcId);
|
bool fmIsNonstandardSQLFunc(int32_t funcId);
|
||||||
|
|
|
@ -78,7 +78,7 @@ typedef struct SAlterDatabaseStmt {
|
||||||
|
|
||||||
typedef struct STableOptions {
|
typedef struct STableOptions {
|
||||||
ENodeType type;
|
ENodeType type;
|
||||||
char comment[TSDB_STB_COMMENT_LEN];
|
char comment[TSDB_TB_COMMENT_LEN];
|
||||||
int32_t delay;
|
int32_t delay;
|
||||||
float filesFactor;
|
float filesFactor;
|
||||||
SNodeList* pRollupFuncs;
|
SNodeList* pRollupFuncs;
|
||||||
|
@ -90,7 +90,7 @@ typedef struct SColumnDefNode {
|
||||||
ENodeType type;
|
ENodeType type;
|
||||||
char colName[TSDB_COL_NAME_LEN];
|
char colName[TSDB_COL_NAME_LEN];
|
||||||
SDataType dataType;
|
SDataType dataType;
|
||||||
char comments[TSDB_STB_COMMENT_LEN];
|
char comments[TSDB_TB_COMMENT_LEN];
|
||||||
bool sma;
|
bool sma;
|
||||||
} SColumnDefNode;
|
} SColumnDefNode;
|
||||||
|
|
||||||
|
|
|
@ -647,6 +647,8 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY TAOS_DEF_ERROR_CODE(0, 0x264A)
|
#define TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY TAOS_DEF_ERROR_CODE(0, 0x264A)
|
||||||
#define TSDB_CODE_PAR_INVALID_MODIFY_COL TAOS_DEF_ERROR_CODE(0, 0x264B)
|
#define TSDB_CODE_PAR_INVALID_MODIFY_COL TAOS_DEF_ERROR_CODE(0, 0x264B)
|
||||||
#define TSDB_CODE_PAR_INVALID_TBNAME TAOS_DEF_ERROR_CODE(0, 0x264C)
|
#define TSDB_CODE_PAR_INVALID_TBNAME TAOS_DEF_ERROR_CODE(0, 0x264C)
|
||||||
|
#define TSDB_CODE_PAR_INVALID_FUNCTION_NAME TAOS_DEF_ERROR_CODE(0, 0x264D)
|
||||||
|
#define TSDB_CODE_PAR_COMMENT_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x264E)
|
||||||
|
|
||||||
//planner
|
//planner
|
||||||
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
|
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
|
||||||
|
|
|
@ -218,8 +218,8 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_MAX_SQL_SHOW_LEN 1024
|
#define TSDB_MAX_SQL_SHOW_LEN 1024
|
||||||
#define TSDB_MAX_ALLOWED_SQL_LEN (1 * 1024 * 1024u) // sql length should be less than 1mb
|
#define TSDB_MAX_ALLOWED_SQL_LEN (1 * 1024 * 1024u) // sql length should be less than 1mb
|
||||||
|
|
||||||
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
|
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
|
||||||
#define TSDB_STB_COMMENT_LEN 1024
|
#define TSDB_TB_COMMENT_LEN 1025
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In some scenarios uint16_t (0~65535) is used to store the row len.
|
* In some scenarios uint16_t (0~65535) is used to store the row len.
|
||||||
|
|
|
@ -600,6 +600,7 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq)
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||||
if (tEncodeI8(&encoder, pReq->alterType) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->alterType) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->verInBlock) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->numOfFields) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->numOfFields) < 0) return -1;
|
||||||
for (int32_t i = 0; i < pReq->numOfFields; ++i) {
|
for (int32_t i = 0; i < pReq->numOfFields; ++i) {
|
||||||
SField *pField = taosArrayGet(pReq->pFields, i);
|
SField *pField = taosArrayGet(pReq->pFields, i);
|
||||||
|
@ -626,6 +627,7 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &pReq->alterType) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->alterType) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->verInBlock) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->numOfFields) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->numOfFields) < 0) return -1;
|
||||||
pReq->pFields = taosArrayInit(pReq->numOfFields, sizeof(SField));
|
pReq->pFields = taosArrayInit(pReq->numOfFields, sizeof(SField));
|
||||||
if (pReq->pFields == NULL) {
|
if (pReq->pFields == NULL) {
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "bmInt.h"
|
#include "bmInt.h"
|
||||||
|
|
||||||
static void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
|
void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
|
||||||
|
|
||||||
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||||
SMonBmInfo bmInfo = {0};
|
SMonBmInfo bmInfo = {0};
|
||||||
|
|
|
@ -32,7 +32,9 @@ typedef struct SDnodeMgmt {
|
||||||
SSingleWorker mgmtWorker;
|
SSingleWorker mgmtWorker;
|
||||||
ProcessCreateNodeFp processCreateNodeFp;
|
ProcessCreateNodeFp processCreateNodeFp;
|
||||||
ProcessDropNodeFp processDropNodeFp;
|
ProcessDropNodeFp processDropNodeFp;
|
||||||
IsNodeRequiredFp isNodeRequiredFp;
|
SendMonitorReportFp sendMonitorReportFp;
|
||||||
|
GetVnodeLoadsFp getVnodeLoadsFp;
|
||||||
|
GetMnodeLoadsFp getMnodeLoadsFp;
|
||||||
} SDnodeMgmt;
|
} SDnodeMgmt;
|
||||||
|
|
||||||
// dmHandle.c
|
// dmHandle.c
|
||||||
|
@ -43,11 +45,6 @@ int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// dmMonitor.c
|
|
||||||
void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo);
|
|
||||||
void dmGetMnodeLoads(SDnodeMgmt *pMgmt, SMonMloadInfo *pInfo);
|
|
||||||
void dmSendMonitorReport(SDnodeMgmt *pMgmt);
|
|
||||||
|
|
||||||
// dmWorker.c
|
// dmWorker.c
|
||||||
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||||
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt);
|
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt);
|
||||||
|
|
|
@ -72,11 +72,11 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
||||||
taosRUnLockLatch(&pMgmt->pData->latch);
|
taosRUnLockLatch(&pMgmt->pData->latch);
|
||||||
|
|
||||||
SMonVloadInfo vinfo = {0};
|
SMonVloadInfo vinfo = {0};
|
||||||
dmGetVnodeLoads(pMgmt, &vinfo);
|
(*pMgmt->getVnodeLoadsFp)(&vinfo);
|
||||||
req.pVloads = vinfo.pVloads;
|
req.pVloads = vinfo.pVloads;
|
||||||
|
|
||||||
SMonMloadInfo minfo = {0};
|
SMonMloadInfo minfo = {0};
|
||||||
dmGetMnodeLoads(pMgmt, &minfo);
|
(*pMgmt->getMnodeLoadsFp)(&minfo);
|
||||||
|
|
||||||
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
|
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
|
||||||
void *pHead = rpcMallocCont(contLen);
|
void *pHead = rpcMallocCont(contLen);
|
||||||
|
@ -115,7 +115,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
|
||||||
|
|
||||||
SServerStatusRsp statusRsp = {0};
|
SServerStatusRsp statusRsp = {0};
|
||||||
SMonMloadInfo minfo = {0};
|
SMonMloadInfo minfo = {0};
|
||||||
dmGetMnodeLoads(pMgmt, &minfo);
|
(*pMgmt->getMnodeLoadsFp)(&minfo);
|
||||||
if (minfo.isMnode && minfo.load.syncState == TAOS_SYNC_STATE_ERROR) {
|
if (minfo.isMnode && minfo.load.syncState == TAOS_SYNC_STATE_ERROR) {
|
||||||
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
|
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
|
||||||
snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
|
snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
|
||||||
|
@ -123,7 +123,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SMonVloadInfo vinfo = {0};
|
SMonVloadInfo vinfo = {0};
|
||||||
dmGetVnodeLoads(pMgmt, &vinfo);
|
(*pMgmt->getVnodeLoadsFp)(&vinfo);
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
|
||||||
SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
|
SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
|
||||||
if (pLoad->syncState == TAOS_SYNC_STATE_ERROR) {
|
if (pLoad->syncState == TAOS_SYNC_STATE_ERROR) {
|
||||||
|
|
|
@ -45,7 +45,9 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||||
pMgmt->name = pInput->name;
|
pMgmt->name = pInput->name;
|
||||||
pMgmt->processCreateNodeFp = pInput->processCreateNodeFp;
|
pMgmt->processCreateNodeFp = pInput->processCreateNodeFp;
|
||||||
pMgmt->processDropNodeFp = pInput->processDropNodeFp;
|
pMgmt->processDropNodeFp = pInput->processDropNodeFp;
|
||||||
pMgmt->isNodeRequiredFp = pInput->isNodeRequiredFp;
|
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
|
||||||
|
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
|
||||||
|
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
|
||||||
|
|
||||||
if (dmStartWorker(pMgmt) != 0) {
|
if (dmStartWorker(pMgmt) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -1,104 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
|
||||||
#include "dmInt.h"
|
|
||||||
|
|
||||||
#define dmSendLocalRecv(pMgmt, mtype, func, pInfo) \
|
|
||||||
if (!tsMultiProcess) { \
|
|
||||||
SRpcMsg rsp = {0}; \
|
|
||||||
SRpcMsg req = {.msgType = mtype}; \
|
|
||||||
SEpSet epset = {.inUse = 0, .numOfEps = 1}; \
|
|
||||||
tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); \
|
|
||||||
epset.eps[0].port = tsServerPort; \
|
|
||||||
rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \
|
|
||||||
if (rsp.code == 0 && rsp.contLen > 0) { \
|
|
||||||
func(rsp.pCont, rsp.contLen, pInfo); \
|
|
||||||
} \
|
|
||||||
rpcFreeCont(rsp.pCont); \
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dmGetMonitorBasicInfo(SDnodeMgmt *pMgmt, SMonBasicInfo *pInfo) {
|
|
||||||
pInfo->protocol = 1;
|
|
||||||
pInfo->dnode_id = pMgmt->pData->dnodeId;
|
|
||||||
pInfo->cluster_id = pMgmt->pData->clusterId;
|
|
||||||
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dmGetMonitorDnodeInfo(SDnodeMgmt *pMgmt, SMonDnodeInfo *pInfo) {
|
|
||||||
pInfo->uptime = (taosGetTimestampMs() - pMgmt->pData->rebootTime) / (86400000.0f);
|
|
||||||
pInfo->has_mnode = (*pMgmt->isNodeRequiredFp)(MNODE);
|
|
||||||
pInfo->has_qnode = (*pMgmt->isNodeRequiredFp)(QNODE);
|
|
||||||
pInfo->has_snode = (*pMgmt->isNodeRequiredFp)(SNODE);
|
|
||||||
pInfo->has_bnode = (*pMgmt->isNodeRequiredFp)(BNODE);
|
|
||||||
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
|
|
||||||
pInfo->logdir.size = tsLogSpace.size;
|
|
||||||
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
|
|
||||||
pInfo->tempdir.size = tsTempSpace.size;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void dmGetMonitorInfo(SDnodeMgmt *pMgmt, SMonDmInfo *pInfo) {
|
|
||||||
dmGetMonitorBasicInfo(pMgmt, &pInfo->basic);
|
|
||||||
dmGetMonitorDnodeInfo(pMgmt, &pInfo->dnode);
|
|
||||||
dmGetMonitorSystemInfo(&pInfo->sys);
|
|
||||||
}
|
|
||||||
|
|
||||||
void dmSendMonitorReport(SDnodeMgmt *pMgmt) {
|
|
||||||
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
|
|
||||||
dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort);
|
|
||||||
|
|
||||||
SMonDmInfo dmInfo = {0};
|
|
||||||
SMonMmInfo mmInfo = {0};
|
|
||||||
SMonVmInfo vmInfo = {0};
|
|
||||||
SMonQmInfo qmInfo = {0};
|
|
||||||
SMonSmInfo smInfo = {0};
|
|
||||||
SMonBmInfo bmInfo = {0};
|
|
||||||
|
|
||||||
dmGetMonitorInfo(pMgmt, &dmInfo);
|
|
||||||
dmSendLocalRecv(pMgmt, TDMT_MON_VM_INFO, tDeserializeSMonVmInfo, &vmInfo);
|
|
||||||
if (dmInfo.dnode.has_mnode) {
|
|
||||||
dmSendLocalRecv(pMgmt, TDMT_MON_MM_INFO, tDeserializeSMonMmInfo, &mmInfo);
|
|
||||||
}
|
|
||||||
if (dmInfo.dnode.has_qnode) {
|
|
||||||
dmSendLocalRecv(pMgmt, TDMT_MON_QM_INFO, tDeserializeSMonQmInfo, &qmInfo);
|
|
||||||
}
|
|
||||||
if (dmInfo.dnode.has_snode) {
|
|
||||||
dmSendLocalRecv(pMgmt, TDMT_MON_SM_INFO, tDeserializeSMonSmInfo, &smInfo);
|
|
||||||
}
|
|
||||||
if (dmInfo.dnode.has_bnode) {
|
|
||||||
dmSendLocalRecv(pMgmt, TDMT_MON_BM_INFO, tDeserializeSMonBmInfo, &bmInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
monSetDmInfo(&dmInfo);
|
|
||||||
monSetMmInfo(&mmInfo);
|
|
||||||
monSetVmInfo(&vmInfo);
|
|
||||||
monSetQmInfo(&qmInfo);
|
|
||||||
monSetSmInfo(&smInfo);
|
|
||||||
monSetBmInfo(&bmInfo);
|
|
||||||
tFreeSMonMmInfo(&mmInfo);
|
|
||||||
tFreeSMonVmInfo(&vmInfo);
|
|
||||||
tFreeSMonQmInfo(&qmInfo);
|
|
||||||
tFreeSMonSmInfo(&smInfo);
|
|
||||||
tFreeSMonBmInfo(&bmInfo);
|
|
||||||
monSendReport();
|
|
||||||
}
|
|
||||||
|
|
||||||
void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
|
||||||
dmSendLocalRecv(pMgmt, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
void dmGetMnodeLoads(SDnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
|
|
||||||
dmSendLocalRecv(pMgmt, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo);
|
|
||||||
}
|
|
|
@ -50,7 +50,7 @@ static void *dmMonitorThreadFp(void *param) {
|
||||||
int64_t curTime = taosGetTimestampMs();
|
int64_t curTime = taosGetTimestampMs();
|
||||||
float interval = (curTime - lastTime) / 1000.0f;
|
float interval = (curTime - lastTime) / 1000.0f;
|
||||||
if (interval >= tsMonitorInterval) {
|
if (interval >= tsMonitorInterval) {
|
||||||
dmSendMonitorReport(pMgmt);
|
(*pMgmt->sendMonitorReportFp)();
|
||||||
lastTime = curTime;
|
lastTime = curTime;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -154,6 +154,6 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dInfo("successed to write %s, deployed:%d", realfile, deployed);
|
dDebug("successed to write %s, deployed:%d", realfile, deployed);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,8 +16,13 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mmInt.h"
|
#include "mmInt.h"
|
||||||
|
|
||||||
static void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *mmInfo) {
|
void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) {
|
||||||
mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant);
|
mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->grant);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
|
||||||
|
pInfo->isMnode = 1;
|
||||||
|
mndGetLoad(pMgmt->pMnode, &pInfo->load);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||||
|
@ -45,11 +50,6 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
|
|
||||||
pInfo->isMnode = 1;
|
|
||||||
mndGetLoad(pMgmt->pMnode, &pInfo->load);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||||
SMonMloadInfo mloads = {0};
|
SMonMloadInfo mloads = {0};
|
||||||
mmGetMnodeLoads(pMgmt, &mloads);
|
mmGetMnodeLoads(pMgmt, &mloads);
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "qmInt.h"
|
#include "qmInt.h"
|
||||||
|
|
||||||
static void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
|
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
|
||||||
|
|
||||||
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||||
SMonQmInfo qmInfo = {0};
|
SMonQmInfo qmInfo = {0};
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "smInt.h"
|
#include "smInt.h"
|
||||||
|
|
||||||
static void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
|
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
|
||||||
|
|
||||||
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
|
||||||
SMonSmInfo smInfo = {0};
|
SMonSmInfo smInfo = {0};
|
||||||
|
|
|
@ -128,7 +128,7 @@ int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t
|
||||||
|
|
||||||
*numOfVnodes = vnodesNum;
|
*numOfVnodes = vnodesNum;
|
||||||
code = 0;
|
code = 0;
|
||||||
dInfo("succcessed to read file %s", file);
|
dDebug("succcessed to read file %s", file);
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
if (content != NULL) taosMemoryFree(content);
|
if (content != NULL) taosMemoryFree(content);
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "vmInt.h"
|
#include "vmInt.h"
|
||||||
|
|
||||||
static void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
||||||
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
|
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
|
||||||
if (pInfo->pVloads == NULL) return;
|
if (pInfo->pVloads == NULL) return;
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ static void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
||||||
taosRUnLockLatch(&pMgmt->latch);
|
taosRUnLockLatch(&pMgmt->latch);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
||||||
SMonVloadInfo vloads = {0};
|
SMonVloadInfo vloads = {0};
|
||||||
vmGetVnodeLoads(pMgmt, &vloads);
|
vmGetVnodeLoads(pMgmt, &vloads);
|
||||||
|
|
||||||
|
|
|
@ -13,8 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef _TD_DND_IMP_H_
|
#ifndef _TD_DND_MGMT_H_
|
||||||
#define _TD_DND_IMP_H_
|
#define _TD_DND_MGMT_H_
|
||||||
|
|
||||||
// tobe deleted
|
// tobe deleted
|
||||||
#include "uv.h"
|
#include "uv.h"
|
||||||
|
@ -165,16 +165,13 @@ SMsgCb dmGetMsgcb(SDnode *pDnode);
|
||||||
int32_t dmInitMsgHandle(SDnode *pDnode);
|
int32_t dmInitMsgHandle(SDnode *pDnode);
|
||||||
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
|
||||||
|
|
||||||
// mgmt nodes
|
// dmMonitor.c
|
||||||
SMgmtFunc dmGetMgmtFunc();
|
void dmSendMonitorReport();
|
||||||
SMgmtFunc bmGetMgmtFunc();
|
void dmGetVnodeLoads(SMonVloadInfo *pInfo);
|
||||||
SMgmtFunc qmGetMgmtFunc();
|
void dmGetMnodeLoads(SMonMloadInfo *pInfo);
|
||||||
SMgmtFunc smGetMgmtFunc();
|
|
||||||
SMgmtFunc vmGetMgmtFunc();
|
|
||||||
SMgmtFunc mmGetMgmtFunc();
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_DND_IMP_H_*/
|
#endif /*_TD_DND_MGMT_H_*/
|
|
@ -0,0 +1,45 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_DND_NODES_H_
|
||||||
|
#define _TD_DND_NODES_H_
|
||||||
|
|
||||||
|
#include "dmInt.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
SMgmtFunc dmGetMgmtFunc();
|
||||||
|
SMgmtFunc bmGetMgmtFunc();
|
||||||
|
SMgmtFunc qmGetMgmtFunc();
|
||||||
|
SMgmtFunc smGetMgmtFunc();
|
||||||
|
SMgmtFunc vmGetMgmtFunc();
|
||||||
|
SMgmtFunc mmGetMgmtFunc();
|
||||||
|
|
||||||
|
void mmGetMonitorInfo(void *pMgmt, SMonMmInfo *pInfo);
|
||||||
|
void vmGetMonitorInfo(void *pMgmt, SMonVmInfo *pInfo);
|
||||||
|
void qmGetMonitorInfo(void *pMgmt, SMonQmInfo *pInfo);
|
||||||
|
void smGetMonitorInfo(void *pMgmt, SMonSmInfo *pInfo);
|
||||||
|
void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo);
|
||||||
|
|
||||||
|
void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo);
|
||||||
|
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /*_TD_DND_NODES_H_*/
|
|
@ -168,11 +168,6 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool dmIsNodeRequired(EDndNodeType ntype) {
|
|
||||||
SDnode *pDnode = dmInstance();
|
|
||||||
return pDnode->wrappers[ntype].required;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
|
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
|
||||||
SMgmtInputOpt opt = {
|
SMgmtInputOpt opt = {
|
||||||
.path = pWrapper->path,
|
.path = pWrapper->path,
|
||||||
|
@ -180,7 +175,9 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
|
||||||
.pData = &pWrapper->pDnode->data,
|
.pData = &pWrapper->pDnode->data,
|
||||||
.processCreateNodeFp = dmProcessCreateNodeReq,
|
.processCreateNodeFp = dmProcessCreateNodeReq,
|
||||||
.processDropNodeFp = dmProcessDropNodeReq,
|
.processDropNodeFp = dmProcessDropNodeReq,
|
||||||
.isNodeRequiredFp = dmIsNodeRequired,
|
.sendMonitorReportFp = dmSendMonitorReport,
|
||||||
|
.getVnodeLoadsFp = dmGetVnodeLoads,
|
||||||
|
.getMnodeLoadsFp = dmGetMnodeLoads,
|
||||||
};
|
};
|
||||||
|
|
||||||
opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
|
opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "dmMgmt.h"
|
#include "dmMgmt.h"
|
||||||
|
#include "dmNodes.h"
|
||||||
|
|
||||||
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
|
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
|
||||||
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
||||||
|
@ -189,7 +190,7 @@ int32_t dmInitDnode(SDnode *pDnode, EDndNodeType rtype) {
|
||||||
}
|
}
|
||||||
|
|
||||||
dmReportStartup("dnode-transport", "initialized");
|
dmReportStartup("dnode-transport", "initialized");
|
||||||
dInfo("dnode is created, ptr:%p", pDnode);
|
dDebug("dnode is created, ptr:%p", pDnode);
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
|
@ -208,7 +209,7 @@ void dmCleanupDnode(SDnode *pDnode) {
|
||||||
dmCleanupClient(pDnode);
|
dmCleanupClient(pDnode);
|
||||||
dmCleanupServer(pDnode);
|
dmCleanupServer(pDnode);
|
||||||
dmClearVars(pDnode);
|
dmClearVars(pDnode);
|
||||||
dInfo("dnode is closed, ptr:%p", pDnode);
|
dDebug("dnode is closed, ptr:%p", pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
|
void dmSetStatus(SDnode *pDnode, EDndRunStatus status) {
|
||||||
|
|
|
@ -0,0 +1,170 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "dmMgmt.h"
|
||||||
|
#include "dmNodes.h"
|
||||||
|
|
||||||
|
#define dmSendLocalRecv(pDnode, mtype, func, pInfo) \
|
||||||
|
SRpcMsg rsp = {0}; \
|
||||||
|
SRpcMsg req = {.msgType = mtype}; \
|
||||||
|
SEpSet epset = {.inUse = 0, .numOfEps = 1}; \
|
||||||
|
tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); \
|
||||||
|
epset.eps[0].port = tsServerPort; \
|
||||||
|
rpcSendRecv(pDnode->trans.clientRpc, &epset, &req, &rsp); \
|
||||||
|
if (rsp.code == 0 && rsp.contLen > 0) { \
|
||||||
|
func(rsp.pCont, rsp.contLen, pInfo); \
|
||||||
|
} \
|
||||||
|
rpcFreeCont(rsp.pCont);
|
||||||
|
|
||||||
|
static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
|
||||||
|
pInfo->protocol = 1;
|
||||||
|
pInfo->dnode_id = pDnode->data.dnodeId;
|
||||||
|
pInfo->cluster_id = pDnode->data.clusterId;
|
||||||
|
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
|
||||||
|
pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / (86400000.0f);
|
||||||
|
pInfo->has_mnode = pDnode->wrappers[MNODE].required;
|
||||||
|
pInfo->has_qnode = pDnode->wrappers[QNODE].required;
|
||||||
|
pInfo->has_snode = pDnode->wrappers[SNODE].required;
|
||||||
|
pInfo->has_bnode = pDnode->wrappers[BNODE].required;
|
||||||
|
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
|
||||||
|
pInfo->logdir.size = tsLogSpace.size;
|
||||||
|
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
|
||||||
|
pInfo->tempdir.size = tsTempSpace.size;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dmGetDmMonitorInfo(SDnode *pDnode) {
|
||||||
|
SMonDmInfo dmInfo = {0};
|
||||||
|
dmGetMonitorBasicInfo(pDnode, &dmInfo.basic);
|
||||||
|
dmGetMonitorDnodeInfo(pDnode, &dmInfo.dnode);
|
||||||
|
dmGetMonitorSystemInfo(&dmInfo.sys);
|
||||||
|
monSetDmInfo(&dmInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dmGetMmMonitorInfo(SDnode *pDnode) {
|
||||||
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE];
|
||||||
|
if (dmMarkWrapper(pWrapper) == 0) {
|
||||||
|
SMonMmInfo mmInfo = {0};
|
||||||
|
if (tsMultiProcess) {
|
||||||
|
dmSendLocalRecv(pDnode, TDMT_MON_MM_INFO, tDeserializeSMonMmInfo, &mmInfo);
|
||||||
|
} else if (pWrapper->pMgmt != NULL) {
|
||||||
|
mmGetMonitorInfo(pWrapper->pMgmt, &mmInfo);
|
||||||
|
}
|
||||||
|
dmReleaseWrapper(pWrapper);
|
||||||
|
monSetMmInfo(&mmInfo);
|
||||||
|
tFreeSMonMmInfo(&mmInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dmGetVmMonitorInfo(SDnode *pDnode) {
|
||||||
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];
|
||||||
|
if (dmMarkWrapper(pWrapper) == 0) {
|
||||||
|
SMonVmInfo vmInfo = {0};
|
||||||
|
if (tsMultiProcess) {
|
||||||
|
dmSendLocalRecv(pDnode, TDMT_MON_VM_INFO, tDeserializeSMonVmInfo, &vmInfo);
|
||||||
|
} else if (pWrapper->pMgmt != NULL) {
|
||||||
|
vmGetMonitorInfo(pWrapper->pMgmt, &vmInfo);
|
||||||
|
}
|
||||||
|
dmReleaseWrapper(pWrapper);
|
||||||
|
monSetVmInfo(&vmInfo);
|
||||||
|
tFreeSMonVmInfo(&vmInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dmGetQmMonitorInfo(SDnode *pDnode) {
|
||||||
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[QNODE];
|
||||||
|
if (dmMarkWrapper(pWrapper) == 0) {
|
||||||
|
SMonQmInfo qmInfo = {0};
|
||||||
|
if (tsMultiProcess) {
|
||||||
|
dmSendLocalRecv(pDnode, TDMT_MON_QM_INFO, tDeserializeSMonQmInfo, &qmInfo);
|
||||||
|
} else if (pWrapper->pMgmt != NULL) {
|
||||||
|
qmGetMonitorInfo(pWrapper->pMgmt, &qmInfo);
|
||||||
|
}
|
||||||
|
dmReleaseWrapper(pWrapper);
|
||||||
|
monSetQmInfo(&qmInfo);
|
||||||
|
tFreeSMonQmInfo(&qmInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dmGetSmMonitorInfo(SDnode *pDnode) {
|
||||||
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[SNODE];
|
||||||
|
if (dmMarkWrapper(pWrapper) == 0) {
|
||||||
|
SMonSmInfo smInfo = {0};
|
||||||
|
if (tsMultiProcess) {
|
||||||
|
dmSendLocalRecv(pDnode, TDMT_MON_SM_INFO, tDeserializeSMonSmInfo, &smInfo);
|
||||||
|
} else if (pWrapper->pMgmt != NULL) {
|
||||||
|
smGetMonitorInfo(pWrapper->pMgmt, &smInfo);
|
||||||
|
}
|
||||||
|
dmReleaseWrapper(pWrapper);
|
||||||
|
monSetSmInfo(&smInfo);
|
||||||
|
tFreeSMonSmInfo(&smInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void dmGetBmMonitorInfo(SDnode *pDnode) {
|
||||||
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[BNODE];
|
||||||
|
if (dmMarkWrapper(pWrapper) == 0) {
|
||||||
|
SMonBmInfo bmInfo = {0};
|
||||||
|
if (tsMultiProcess) {
|
||||||
|
dmSendLocalRecv(pDnode, TDMT_MON_BM_INFO, tDeserializeSMonBmInfo, &bmInfo);
|
||||||
|
} else if (pWrapper->pMgmt != NULL) {
|
||||||
|
bmGetMonitorInfo(pWrapper->pMgmt, &bmInfo);
|
||||||
|
}
|
||||||
|
dmReleaseWrapper(pWrapper);
|
||||||
|
monSetBmInfo(&bmInfo);
|
||||||
|
tFreeSMonBmInfo(&bmInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void dmSendMonitorReport() {
|
||||||
|
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
|
||||||
|
dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort);
|
||||||
|
|
||||||
|
SDnode *pDnode = dmInstance();
|
||||||
|
dmGetDmMonitorInfo(pDnode);
|
||||||
|
dmGetMmMonitorInfo(pDnode);
|
||||||
|
dmGetVmMonitorInfo(pDnode);
|
||||||
|
dmGetQmMonitorInfo(pDnode);
|
||||||
|
dmGetSmMonitorInfo(pDnode);
|
||||||
|
dmGetBmMonitorInfo(pDnode);
|
||||||
|
monSendReport();
|
||||||
|
}
|
||||||
|
|
||||||
|
void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
|
||||||
|
SDnode *pDnode = dmInstance();
|
||||||
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];
|
||||||
|
if (dmMarkWrapper(pWrapper) == 0) {
|
||||||
|
if (tsMultiProcess) {
|
||||||
|
dmSendLocalRecv(pDnode, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo);
|
||||||
|
} else if (pWrapper->pMgmt != NULL) {
|
||||||
|
vmGetVnodeLoads(pWrapper->pMgmt, pInfo);
|
||||||
|
}
|
||||||
|
dmReleaseWrapper(pWrapper);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
|
||||||
|
SDnode *pDnode = dmInstance();
|
||||||
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[MNODE];
|
||||||
|
if (tsMultiProcess) {
|
||||||
|
dmSendLocalRecv(pDnode, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo);
|
||||||
|
} else if (pWrapper->pMgmt != NULL) {
|
||||||
|
mmGetMnodeLoads(pWrapper->pMgmt, pInfo);
|
||||||
|
}
|
||||||
|
dmReleaseWrapper(pWrapper);
|
||||||
|
}
|
|
@ -89,21 +89,23 @@ typedef enum {
|
||||||
|
|
||||||
typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
|
typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
|
||||||
typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
|
typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
|
||||||
typedef bool (*IsNodeRequiredFp)(EDndNodeType ntype);
|
typedef void (*SendMonitorReportFp)();
|
||||||
|
typedef void (*GetVnodeLoadsFp)();
|
||||||
|
typedef void (*GetMnodeLoadsFp)();
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
int64_t clusterId;
|
int64_t clusterId;
|
||||||
int64_t dnodeVer;
|
int64_t dnodeVer;
|
||||||
int64_t updateTime;
|
int64_t updateTime;
|
||||||
int64_t rebootTime;
|
int64_t rebootTime;
|
||||||
bool dropped;
|
bool dropped;
|
||||||
bool stopped;
|
bool stopped;
|
||||||
SEpSet mnodeEps;
|
SEpSet mnodeEps;
|
||||||
SArray *dnodeEps;
|
SArray *dnodeEps;
|
||||||
SHashObj *dnodeHash;
|
SHashObj *dnodeHash;
|
||||||
SRWLatch latch;
|
SRWLatch latch;
|
||||||
SMsgCb msgCb;
|
SMsgCb msgCb;
|
||||||
} SDnodeData;
|
} SDnodeData;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -113,7 +115,9 @@ typedef struct {
|
||||||
SMsgCb msgCb;
|
SMsgCb msgCb;
|
||||||
ProcessCreateNodeFp processCreateNodeFp;
|
ProcessCreateNodeFp processCreateNodeFp;
|
||||||
ProcessDropNodeFp processDropNodeFp;
|
ProcessDropNodeFp processDropNodeFp;
|
||||||
IsNodeRequiredFp isNodeRequiredFp;
|
SendMonitorReportFp sendMonitorReportFp;
|
||||||
|
GetVnodeLoadsFp getVnodeLoadsFp;
|
||||||
|
GetMnodeLoadsFp getMnodeLoadsFp;
|
||||||
} SMgmtInputOpt;
|
} SMgmtInputOpt;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -365,6 +365,8 @@ typedef struct {
|
||||||
int64_t uid;
|
int64_t uid;
|
||||||
int64_t dbUid;
|
int64_t dbUid;
|
||||||
int32_t version;
|
int32_t version;
|
||||||
|
int32_t tagVer;
|
||||||
|
int32_t colVer;
|
||||||
int32_t nextColId;
|
int32_t nextColId;
|
||||||
float xFilesFactor;
|
float xFilesFactor;
|
||||||
int32_t delay;
|
int32_t delay;
|
||||||
|
|
|
@ -88,6 +88,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
|
||||||
SDB_SET_INT64(pRaw, dataPos, pStb->uid, _OVER)
|
SDB_SET_INT64(pRaw, dataPos, pStb->uid, _OVER)
|
||||||
SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, _OVER)
|
SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pStb->version, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pStb->version, _OVER)
|
||||||
|
SDB_SET_INT32(pRaw, dataPos, pStb->tagVer, _OVER)
|
||||||
|
SDB_SET_INT32(pRaw, dataPos, pStb->colVer, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), _OVER)
|
SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), _OVER)
|
||||||
SDB_SET_INT32(pRaw, dataPos, pStb->delay, _OVER)
|
SDB_SET_INT32(pRaw, dataPos, pStb->delay, _OVER)
|
||||||
|
@ -166,6 +168,8 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pStb->uid, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pStb->uid, _OVER)
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pStb->version, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pStb->version, _OVER)
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &pStb->tagVer, _OVER)
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &pStb->colVer, _OVER)
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, _OVER)
|
||||||
int32_t xFilesFactor = 0;
|
int32_t xFilesFactor = 0;
|
||||||
SDB_GET_INT32(pRaw, dataPos, &xFilesFactor, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &xFilesFactor, _OVER)
|
||||||
|
@ -317,6 +321,8 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
|
||||||
|
|
||||||
pOld->updateTime = pNew->updateTime;
|
pOld->updateTime = pNew->updateTime;
|
||||||
pOld->version = pNew->version;
|
pOld->version = pNew->version;
|
||||||
|
pOld->tagVer = pNew->tagVer;
|
||||||
|
pOld->colVer = pNew->colVer;
|
||||||
pOld->nextColId = pNew->nextColId;
|
pOld->nextColId = pNew->nextColId;
|
||||||
pOld->ttl = pNew->ttl;
|
pOld->ttl = pNew->ttl;
|
||||||
pOld->numOfColumns = pNew->numOfColumns;
|
pOld->numOfColumns = pNew->numOfColumns;
|
||||||
|
@ -384,6 +390,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
|
||||||
req.rollup = pStb->ast1Len > 0 ? 1 : 0;
|
req.rollup = pStb->ast1Len > 0 ? 1 : 0;
|
||||||
req.schema.nCols = pStb->numOfColumns;
|
req.schema.nCols = pStb->numOfColumns;
|
||||||
req.schema.sver = pStb->version;
|
req.schema.sver = pStb->version;
|
||||||
|
req.schema.tagVer = pStb->tagVer;
|
||||||
|
req.schema.colVer = pStb->colVer;
|
||||||
req.schema.pSchema = pStb->pColumns;
|
req.schema.pSchema = pStb->pColumns;
|
||||||
req.schemaTag.nCols = pStb->numOfTags;
|
req.schemaTag.nCols = pStb->numOfTags;
|
||||||
req.schemaTag.sver = 1;
|
req.schemaTag.sver = 1;
|
||||||
|
@ -657,6 +665,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
||||||
pDst->uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
pDst->uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||||
pDst->dbUid = pDb->uid;
|
pDst->dbUid = pDb->uid;
|
||||||
pDst->version = 1;
|
pDst->version = 1;
|
||||||
|
pDst->tagVer = 1;
|
||||||
|
pDst->colVer = 1;
|
||||||
pDst->nextColId = 1;
|
pDst->nextColId = 1;
|
||||||
pDst->xFilesFactor = pCreate->xFilesFactor;
|
pDst->xFilesFactor = pCreate->xFilesFactor;
|
||||||
pDst->delay = pCreate->delay;
|
pDst->delay = pCreate->delay;
|
||||||
|
@ -949,6 +959,7 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
|
||||||
}
|
}
|
||||||
|
|
||||||
pNew->version++;
|
pNew->version++;
|
||||||
|
pNew->tagVer++;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -967,6 +978,7 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch
|
||||||
pNew->numOfTags--;
|
pNew->numOfTags--;
|
||||||
|
|
||||||
pNew->version++;
|
pNew->version++;
|
||||||
|
pNew->tagVer++;
|
||||||
mDebug("stb:%s, start to drop tag %s", pNew->name, tagName);
|
mDebug("stb:%s, start to drop tag %s", pNew->name, tagName);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1007,6 +1019,7 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF
|
||||||
memcpy(pSchema->name, newTagName, TSDB_COL_NAME_LEN);
|
memcpy(pSchema->name, newTagName, TSDB_COL_NAME_LEN);
|
||||||
|
|
||||||
pNew->version++;
|
pNew->version++;
|
||||||
|
pNew->tagVer++;
|
||||||
mDebug("stb:%s, start to modify tag %s to %s", pNew->name, oldTagName, newTagName);
|
mDebug("stb:%s, start to modify tag %s to %s", pNew->name, oldTagName, newTagName);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1036,6 +1049,7 @@ static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SFi
|
||||||
|
|
||||||
pTag->bytes = pField->bytes;
|
pTag->bytes = pField->bytes;
|
||||||
pNew->version++;
|
pNew->version++;
|
||||||
|
pNew->tagVer++;
|
||||||
|
|
||||||
mDebug("stb:%s, start to modify tag len %s to %d", pNew->name, pField->name, pField->bytes);
|
mDebug("stb:%s, start to modify tag len %s to %d", pNew->name, pField->name, pField->bytes);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1075,6 +1089,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
|
||||||
}
|
}
|
||||||
|
|
||||||
pNew->version++;
|
pNew->version++;
|
||||||
|
pNew->colVer++;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1103,6 +1118,7 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
|
||||||
pNew->numOfColumns--;
|
pNew->numOfColumns--;
|
||||||
|
|
||||||
pNew->version++;
|
pNew->version++;
|
||||||
|
pNew->colVer++;
|
||||||
mDebug("stb:%s, start to drop col %s", pNew->name, colName);
|
mDebug("stb:%s, start to drop col %s", pNew->name, colName);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1141,6 +1157,7 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const
|
||||||
|
|
||||||
pCol->bytes = pField->bytes;
|
pCol->bytes = pField->bytes;
|
||||||
pNew->version++;
|
pNew->version++;
|
||||||
|
pNew->colVer++;
|
||||||
|
|
||||||
mDebug("stb:%s, start to modify col len %s to %d", pNew->name, pField->name, pField->bytes);
|
mDebug("stb:%s, start to modify col len %s to %d", pNew->name, pField->name, pField->bytes);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1300,6 +1317,13 @@ static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (alterReq.verInBlock > 0 && alterReq.verInBlock <= pStb->version) {
|
||||||
|
mDebug("stb:%s, already exist, verInBlock:%d smaller than verInStb:%d, alter success", alterReq.name,
|
||||||
|
alterReq.verInBlock, pStb->version);
|
||||||
|
code = 0;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
pUser = mndAcquireUser(pMnode, pReq->conn.user);
|
||||||
if (pUser == NULL) {
|
if (pUser == NULL) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
|
|
@ -32,7 +32,7 @@ class MndTestStb : public ::testing::Test {
|
||||||
void* BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes, int32_t* pContLen);
|
void* BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes, int32_t* pContLen);
|
||||||
void* BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
|
void* BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
|
||||||
void* BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
|
void* BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen);
|
||||||
void* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, int32_t* pContLen);
|
void* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, int32_t* pContLen, int32_t verInBlock);
|
||||||
};
|
};
|
||||||
|
|
||||||
Testbase MndTestStb::test;
|
Testbase MndTestStb::test;
|
||||||
|
@ -271,12 +271,13 @@ void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* co
|
||||||
}
|
}
|
||||||
|
|
||||||
void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes,
|
void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes,
|
||||||
int32_t* pContLen) {
|
int32_t* pContLen, int32_t verInBlock) {
|
||||||
SMAlterStbReq req = {0};
|
SMAlterStbReq req = {0};
|
||||||
strcpy(req.name, stbname);
|
strcpy(req.name, stbname);
|
||||||
req.numOfFields = 1;
|
req.numOfFields = 1;
|
||||||
req.pFields = taosArrayInit(1, sizeof(SField));
|
req.pFields = taosArrayInit(1, sizeof(SField));
|
||||||
req.alterType = TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES;
|
req.alterType = TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES;
|
||||||
|
req.verInBlock = verInBlock;
|
||||||
|
|
||||||
SField field = {0};
|
SField field = {0};
|
||||||
field.bytes = bytes;
|
field.bytes = bytes;
|
||||||
|
@ -781,31 +782,40 @@ TEST_F(MndTestStb, 08_Alter_Stb_AlterTagBytes) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col5", 12, &contLen);
|
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col5", 12, &contLen, 0);
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
|
||||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_COLUMN_NOT_EXIST);
|
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_COLUMN_NOT_EXIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "ts", 8, &contLen);
|
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "ts", 8, &contLen, 0);
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
|
||||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_STB_OPTION);
|
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_STB_OPTION);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 8, &contLen);
|
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 8, &contLen, 0);
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
|
||||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_ROW_BYTES);
|
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_ROW_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", TSDB_MAX_BYTES_PER_ROW, &contLen);
|
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", TSDB_MAX_BYTES_PER_ROW, &contLen, 0);
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
|
||||||
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_ROW_BYTES);
|
ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_ROW_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 20, &contLen);
|
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 20, &contLen, 0);
|
||||||
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
|
||||||
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
|
|
||||||
|
test.SendShowReq(TSDB_MGMT_TABLE_STB, "user_stables", dbname);
|
||||||
|
EXPECT_EQ(test.GetShowRows(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col_not_exist", 20, &contLen, 1);
|
||||||
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
|
SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen);
|
||||||
ASSERT_EQ(pRsp->code, 0);
|
ASSERT_EQ(pRsp->code, 0);
|
||||||
|
|
||||||
|
|
|
@ -100,6 +100,10 @@ int32_t fmGetFuncInfo(SFmGetFuncInfoParam* pParam, SFunctionNode* pFunc) {
|
||||||
return getUdfInfo(pParam, pFunc);
|
return getUdfInfo(pParam, pFunc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool fmIsBuiltinFunc(const char* pFunc) {
|
||||||
|
return NULL != taosHashGet(gFunMgtService.pFuncNameHashTable, pFunc, strlen(pFunc));
|
||||||
|
}
|
||||||
|
|
||||||
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
EFuncDataRequired fmFuncDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||||
if (fmIsUserDefinedFunc(pFunc->funcId) || pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
|
if (fmIsUserDefinedFunc(pFunc->funcId) || pFunc->funcId < 0 || pFunc->funcId >= funcMgtBuiltinsNum) {
|
||||||
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
return FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
|
|
|
@ -946,6 +946,23 @@ static int32_t jsonToLogicSubplan(const SJson* pJson, void* pObj) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const char* jkLogicPlanSubplans = "Subplans";
|
||||||
|
|
||||||
|
static int32_t logicPlanToJson(const void* pObj, SJson* pJson) {
|
||||||
|
const SQueryLogicPlan* pNode = (const SQueryLogicPlan*)pObj;
|
||||||
|
return tjsonAddObject(pJson, jkLogicPlanSubplans, nodeToJson, nodesListGetNode(pNode->pTopSubplans, 0));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t jsonToLogicPlan(const SJson* pJson, void* pObj) {
|
||||||
|
SQueryLogicPlan* pNode = (SQueryLogicPlan*)pObj;
|
||||||
|
SNode* pChild = NULL;
|
||||||
|
int32_t code = jsonToNodeObject(pJson, jkLogicPlanSubplans, &pChild);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodesListMakeStrictAppend(&pNode->pTopSubplans, pChild);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static const char* jkJoinLogicPlanJoinType = "JoinType";
|
static const char* jkJoinLogicPlanJoinType = "JoinType";
|
||||||
static const char* jkJoinLogicPlanOnConditions = "OnConditions";
|
static const char* jkJoinLogicPlanOnConditions = "OnConditions";
|
||||||
|
|
||||||
|
@ -3029,7 +3046,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
case QUERY_NODE_LOGIC_SUBPLAN:
|
case QUERY_NODE_LOGIC_SUBPLAN:
|
||||||
return logicSubplanToJson(pObj, pJson);
|
return logicSubplanToJson(pObj, pJson);
|
||||||
case QUERY_NODE_LOGIC_PLAN:
|
case QUERY_NODE_LOGIC_PLAN:
|
||||||
break;
|
return logicPlanToJson(pObj, pJson);
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||||
return physiTagScanNodeToJson(pObj, pJson);
|
return physiTagScanNodeToJson(pObj, pJson);
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||||
|
@ -3126,6 +3143,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
||||||
return jsonToLogicPartitionNode(pJson, pObj);
|
return jsonToLogicPartitionNode(pJson, pObj);
|
||||||
case QUERY_NODE_LOGIC_SUBPLAN:
|
case QUERY_NODE_LOGIC_SUBPLAN:
|
||||||
return jsonToLogicSubplan(pJson, pObj);
|
return jsonToLogicSubplan(pJson, pObj);
|
||||||
|
case QUERY_NODE_LOGIC_PLAN:
|
||||||
|
return jsonToLogicPlan(pJson, pObj);
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||||
return jsonToPhysiTagScanNode(pJson, pObj);
|
return jsonToPhysiTagScanNode(pJson, pObj);
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
|
||||||
|
|
|
@ -196,6 +196,15 @@ static bool checkIndexName(SAstCreateContext* pCxt, SToken* pIndexName) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool checkComment(SAstCreateContext* pCxt, const SToken* pCommentToken, bool demand) {
|
||||||
|
if (NULL == pCommentToken) {
|
||||||
|
pCxt->errCode = demand ? TSDB_CODE_PAR_SYNTAX_ERROR : TSDB_CODE_SUCCESS;
|
||||||
|
} else if (pCommentToken->n >= (TSDB_TB_COMMENT_LEN + 2)) {
|
||||||
|
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_COMMENT_TOO_LONG);
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS == pCxt->errCode;
|
||||||
|
}
|
||||||
|
|
||||||
SNode* createRawExprNode(SAstCreateContext* pCxt, const SToken* pToken, SNode* pNode) {
|
SNode* createRawExprNode(SAstCreateContext* pCxt, const SToken* pToken, SNode* pNode) {
|
||||||
SRawExprNode* target = (SRawExprNode*)nodesMakeNode(QUERY_NODE_RAW_EXPR);
|
SRawExprNode* target = (SRawExprNode*)nodesMakeNode(QUERY_NODE_RAW_EXPR);
|
||||||
CHECK_OUT_OF_MEM(target);
|
CHECK_OUT_OF_MEM(target);
|
||||||
|
@ -823,8 +832,10 @@ SNode* createAlterTableOptions(SAstCreateContext* pCxt) {
|
||||||
SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, void* pVal) {
|
SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType type, void* pVal) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case TABLE_OPTION_COMMENT:
|
case TABLE_OPTION_COMMENT:
|
||||||
copyStringFormStringToken((SToken*)pVal, ((STableOptions*)pOptions)->comment,
|
if (checkComment(pCxt, (SToken*)pVal, true)) {
|
||||||
sizeof(((STableOptions*)pOptions)->comment));
|
copyStringFormStringToken((SToken*)pVal, ((STableOptions*)pOptions)->comment,
|
||||||
|
sizeof(((STableOptions*)pOptions)->comment));
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
case TABLE_OPTION_DELAY:
|
case TABLE_OPTION_DELAY:
|
||||||
((STableOptions*)pOptions)->delay = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
|
((STableOptions*)pOptions)->delay = taosStr2Int32(((SToken*)pVal)->z, NULL, 10);
|
||||||
|
@ -848,7 +859,7 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType
|
||||||
}
|
}
|
||||||
|
|
||||||
SNode* createColumnDefNode(SAstCreateContext* pCxt, SToken* pColName, SDataType dataType, const SToken* pComment) {
|
SNode* createColumnDefNode(SAstCreateContext* pCxt, SToken* pColName, SDataType dataType, const SToken* pComment) {
|
||||||
if (!checkColumnName(pCxt, pColName)) {
|
if (!checkColumnName(pCxt, pColName) || !checkComment(pCxt, pComment, false)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
SColumnDefNode* pCol = (SColumnDefNode*)nodesMakeNode(QUERY_NODE_COLUMN_DEF);
|
SColumnDefNode* pCol = (SColumnDefNode*)nodesMakeNode(QUERY_NODE_COLUMN_DEF);
|
||||||
|
|
|
@ -272,6 +272,10 @@ static bool isTimelineFunc(const SNode* pNode) {
|
||||||
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsTimelineFunc(((SFunctionNode*)pNode)->funcId));
|
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsTimelineFunc(((SFunctionNode*)pNode)->funcId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool isScanPseudoColumnFunc(const SNode* pNode) {
|
||||||
|
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsScanPseudoColumnFunc(((SFunctionNode*)pNode)->funcId));
|
||||||
|
}
|
||||||
|
|
||||||
static bool isDistinctOrderBy(STranslateContext* pCxt) {
|
static bool isDistinctOrderBy(STranslateContext* pCxt) {
|
||||||
return (SQL_CLAUSE_ORDER_BY == pCxt->currClause && pCxt->pCurrStmt->isDistinct);
|
return (SQL_CLAUSE_ORDER_BY == pCxt->currClause && pCxt->pCurrStmt->isDistinct);
|
||||||
}
|
}
|
||||||
|
@ -892,7 +896,7 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
|
||||||
return DEAL_RES_IGNORE_CHILD;
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
||||||
if (pCxt->selectFuncNum > 1) {
|
if (pCxt->selectFuncNum > 1) {
|
||||||
return generateDealNodeErrMsg(pCxt->pTranslateCxt, getGroupByErrorCode(pCxt->pTranslateCxt));
|
return generateDealNodeErrMsg(pCxt->pTranslateCxt, getGroupByErrorCode(pCxt->pTranslateCxt));
|
||||||
} else {
|
} else {
|
||||||
|
@ -930,7 +934,7 @@ static EDealRes rewriteColsToSelectValFuncImpl(SNode** pNode, void* pContext) {
|
||||||
if (isAggFunc(*pNode)) {
|
if (isAggFunc(*pNode)) {
|
||||||
return DEAL_RES_IGNORE_CHILD;
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
}
|
}
|
||||||
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
|
||||||
return rewriteColToSelectValFunc((STranslateContext*)pContext, NULL, pNode);
|
return rewriteColToSelectValFunc((STranslateContext*)pContext, NULL, pNode);
|
||||||
}
|
}
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
|
@ -958,7 +962,7 @@ static EDealRes doCheckAggColCoexist(SNode* pNode, void* pContext) {
|
||||||
pCxt->existAggFunc = true;
|
pCxt->existAggFunc = true;
|
||||||
return DEAL_RES_IGNORE_CHILD;
|
return DEAL_RES_IGNORE_CHILD;
|
||||||
}
|
}
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (isScanPseudoColumnFunc(pNode) || QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
pCxt->existCol = true;
|
pCxt->existCol = true;
|
||||||
}
|
}
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
|
@ -3261,6 +3265,9 @@ static int32_t readFromFile(char* pName, int32_t* len, char** buf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t translateCreateFunction(STranslateContext* pCxt, SCreateFunctionStmt* pStmt) {
|
static int32_t translateCreateFunction(STranslateContext* pCxt, SCreateFunctionStmt* pStmt) {
|
||||||
|
if (fmIsBuiltinFunc(pStmt->funcName)) {
|
||||||
|
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FUNCTION_NAME);
|
||||||
|
}
|
||||||
SCreateFuncReq req = {0};
|
SCreateFuncReq req = {0};
|
||||||
strcpy(req.name, pStmt->funcName);
|
strcpy(req.name, pStmt->funcName);
|
||||||
req.igExists = pStmt->ignoreExists;
|
req.igExists = pStmt->ignoreExists;
|
||||||
|
|
|
@ -160,6 +160,10 @@ static char* getSyntaxErrFormat(int32_t errCode) {
|
||||||
return "Only binary/nchar column length could be modified";
|
return "Only binary/nchar column length could be modified";
|
||||||
case TSDB_CODE_PAR_INVALID_TBNAME:
|
case TSDB_CODE_PAR_INVALID_TBNAME:
|
||||||
return "Invalid tbname pseudo column";
|
return "Invalid tbname pseudo column";
|
||||||
|
case TSDB_CODE_PAR_INVALID_FUNCTION_NAME:
|
||||||
|
return "Invalid function name";
|
||||||
|
case TSDB_CODE_PAR_COMMENT_TOO_LONG:
|
||||||
|
return "Comment too long";
|
||||||
case TSDB_CODE_OUT_OF_MEMORY:
|
case TSDB_CODE_OUT_OF_MEMORY:
|
||||||
return "Out of memory";
|
return "Out of memory";
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
#define SPLIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||||
|
|
||||||
typedef struct SSplitContext {
|
typedef struct SSplitContext {
|
||||||
|
int32_t queryId;
|
||||||
int32_t groupId;
|
int32_t groupId;
|
||||||
bool split;
|
bool split;
|
||||||
} SSplitContext;
|
} SSplitContext;
|
||||||
|
@ -62,6 +63,7 @@ static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode*
|
||||||
if (NULL == pSubplan) {
|
if (NULL == pSubplan) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
pSubplan->id.queryId = pCxt->queryId;
|
||||||
pSubplan->id.groupId = pCxt->groupId;
|
pSubplan->id.groupId = pCxt->groupId;
|
||||||
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
||||||
pSubplan->pNode = (SLogicNode*)nodesCloneNode(pScan);
|
pSubplan->pNode = (SLogicNode*)nodesCloneNode(pScan);
|
||||||
|
@ -242,6 +244,7 @@ static SLogicSubplan* unionCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode)
|
||||||
if (NULL == pSubplan) {
|
if (NULL == pSubplan) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
pSubplan->id.queryId = pCxt->queryId;
|
||||||
pSubplan->id.groupId = pCxt->groupId;
|
pSubplan->id.groupId = pCxt->groupId;
|
||||||
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
||||||
pSubplan->pNode = pNode;
|
pSubplan->pNode = pNode;
|
||||||
|
@ -406,7 +409,7 @@ static const SSplitRule splitRuleSet[] = {{.pName = "SuperTableScan", .splitFunc
|
||||||
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
|
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
|
||||||
|
|
||||||
static int32_t applySplitRule(SLogicSubplan* pSubplan) {
|
static int32_t applySplitRule(SLogicSubplan* pSubplan) {
|
||||||
SSplitContext cxt = {.groupId = pSubplan->id.groupId + 1, .split = false};
|
SSplitContext cxt = {.queryId = pSubplan->id.queryId, .groupId = pSubplan->id.groupId + 1, .split = false};
|
||||||
do {
|
do {
|
||||||
cxt.split = false;
|
cxt.split = false;
|
||||||
for (int32_t i = 0; i < splitRuleNum; ++i) {
|
for (int32_t i = 0; i < splitRuleNum; ++i) {
|
||||||
|
|
|
@ -32,6 +32,15 @@ TEST_F(PlanSetOpTest, unionAllSubquery) {
|
||||||
run("SELECT * FROM (SELECT c1, c2 FROM t1 UNION ALL SELECT c1, c2 FROM t1)");
|
run("SELECT * FROM (SELECT c1, c2 FROM t1 UNION ALL SELECT c1, c2 FROM t1)");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(PlanSetOpTest, unionAllWithSubquery) {
|
||||||
|
useDb("root", "test");
|
||||||
|
|
||||||
|
// child table
|
||||||
|
run("SELECT ts FROM (SELECT ts FROM st1s1) UNION ALL SELECT ts FROM (SELECT ts FROM st1s2)");
|
||||||
|
// super table
|
||||||
|
run("SELECT ts FROM (SELECT ts FROM st1) UNION ALL SELECT ts FROM (SELECT ts FROM st1)");
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(PlanSetOpTest, union) {
|
TEST_F(PlanSetOpTest, union) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
|
|
|
@ -322,6 +322,7 @@ class PlannerTestBaseImpl {
|
||||||
}
|
}
|
||||||
|
|
||||||
void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
|
void setPlanContext(SQuery* pQuery, SPlanContext* pCxt) {
|
||||||
|
pCxt->queryId = 1;
|
||||||
if (QUERY_NODE_CREATE_TOPIC_STMT == nodeType(pQuery->pRoot)) {
|
if (QUERY_NODE_CREATE_TOPIC_STMT == nodeType(pQuery->pRoot)) {
|
||||||
pCxt->pAstRoot = ((SCreateTopicStmt*)pQuery->pRoot)->pQuery;
|
pCxt->pAstRoot = ((SCreateTopicStmt*)pQuery->pRoot)->pQuery;
|
||||||
pCxt->topicQuery = true;
|
pCxt->topicQuery = true;
|
||||||
|
|
Loading…
Reference in New Issue