other: merge 3.0 branch.
This commit is contained in:
commit
6a245710c9
|
@ -2305,6 +2305,24 @@ static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char indexFName[TSDB_INDEX_FNAME_LEN];
|
||||||
|
} SUserIndexReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq);
|
||||||
|
int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
|
char tblFName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
char colName[TSDB_COL_NAME_LEN];
|
||||||
|
char indexType[TSDB_INDEX_TYPE_LEN];
|
||||||
|
char indexExts[TSDB_INDEX_EXTS_LEN];
|
||||||
|
} SUserIndexRsp;
|
||||||
|
|
||||||
|
int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp);
|
||||||
|
int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t mqMsgType;
|
int8_t mqMsgType;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
|
@ -156,6 +156,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "mnode-drop-stream", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "mnode-drop-stream", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL)
|
||||||
|
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
TD_NEW_MSG_SEG(TDMT_VND_MSG)
|
||||||
|
|
|
@ -78,6 +78,7 @@ typedef struct SDbVgVersion {
|
||||||
} SDbVgVersion;
|
} SDbVgVersion;
|
||||||
|
|
||||||
typedef SDbCfgRsp SDbCfgInfo;
|
typedef SDbCfgRsp SDbCfgInfo;
|
||||||
|
typedef SUserIndexRsp SIndexInfo;
|
||||||
|
|
||||||
int32_t catalogInit(SCatalogCfg *cfg);
|
int32_t catalogInit(SCatalogCfg *cfg);
|
||||||
|
|
||||||
|
@ -221,6 +222,8 @@ int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *n
|
||||||
|
|
||||||
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg);
|
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg);
|
||||||
|
|
||||||
|
int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destroy catalog and relase all resources
|
* Destroy catalog and relase all resources
|
||||||
|
|
|
@ -238,6 +238,7 @@ typedef struct SSelectStmt {
|
||||||
SNode* pSlimit;
|
SNode* pSlimit;
|
||||||
char stmtName[TSDB_TABLE_NAME_LEN];
|
char stmtName[TSDB_TABLE_NAME_LEN];
|
||||||
uint8_t precision;
|
uint8_t precision;
|
||||||
|
bool isEmptyResult;
|
||||||
} SSelectStmt;
|
} SSelectStmt;
|
||||||
|
|
||||||
typedef enum ESetOperatorType {
|
typedef enum ESetOperatorType {
|
||||||
|
|
|
@ -44,8 +44,15 @@ typedef struct SCmdMsgInfo {
|
||||||
void* pExtension; // todo remove it soon
|
void* pExtension; // todo remove it soon
|
||||||
} SCmdMsgInfo;
|
} SCmdMsgInfo;
|
||||||
|
|
||||||
|
typedef enum EQueryExecMode {
|
||||||
|
QUERY_EXEC_MODE_LOCAL = 1,
|
||||||
|
QUERY_EXEC_MODE_RPC,
|
||||||
|
QUERY_EXEC_MODE_SCHEDULE,
|
||||||
|
QUERY_EXEC_MODE_EMPTY_RESULT
|
||||||
|
} EQueryExecMode;
|
||||||
|
|
||||||
typedef struct SQuery {
|
typedef struct SQuery {
|
||||||
bool directRpc;
|
EQueryExecMode execMode;
|
||||||
bool haveResultSet;
|
bool haveResultSet;
|
||||||
SNode* pRoot;
|
SNode* pRoot;
|
||||||
int32_t numOfResCols;
|
int32_t numOfResCols;
|
||||||
|
@ -55,7 +62,6 @@ typedef struct SQuery {
|
||||||
SArray* pDbList;
|
SArray* pDbList;
|
||||||
SArray* pTableList;
|
SArray* pTableList;
|
||||||
bool showRewrite;
|
bool showRewrite;
|
||||||
bool localCmd;
|
|
||||||
} SQuery;
|
} SQuery;
|
||||||
|
|
||||||
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
|
int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery);
|
||||||
|
|
|
@ -217,6 +217,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_MND_INVALID_DB_OPTION TAOS_DEF_ERROR_CODE(0, 0x0385)
|
#define TSDB_CODE_MND_INVALID_DB_OPTION TAOS_DEF_ERROR_CODE(0, 0x0385)
|
||||||
#define TSDB_CODE_MND_INVALID_DB_ACCT TAOS_DEF_ERROR_CODE(0, 0x0386)
|
#define TSDB_CODE_MND_INVALID_DB_ACCT TAOS_DEF_ERROR_CODE(0, 0x0386)
|
||||||
#define TSDB_CODE_MND_DB_OPTION_UNCHANGED TAOS_DEF_ERROR_CODE(0, 0x0387)
|
#define TSDB_CODE_MND_DB_OPTION_UNCHANGED TAOS_DEF_ERROR_CODE(0, 0x0387)
|
||||||
|
#define TSDB_CODE_MND_DB_INDEX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0388)
|
||||||
|
|
||||||
// mnode-vgroup
|
// mnode-vgroup
|
||||||
#define TSDB_CODE_MND_VGROUP_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0390)
|
#define TSDB_CODE_MND_VGROUP_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0390)
|
||||||
|
|
|
@ -111,6 +111,9 @@ extern const int32_t TYPE_BYTES[15];
|
||||||
#define TSDB_INS_TABLE_USER_USERS "user_users"
|
#define TSDB_INS_TABLE_USER_USERS "user_users"
|
||||||
#define TSDB_INS_TABLE_VGROUPS "vgroups"
|
#define TSDB_INS_TABLE_VGROUPS "vgroups"
|
||||||
|
|
||||||
|
#define TSDB_INDEX_TYPE_SMA "SMA"
|
||||||
|
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
|
||||||
|
|
||||||
#define TSDB_INS_USER_STABLES_DBNAME_COLID 2
|
#define TSDB_INS_USER_STABLES_DBNAME_COLID 2
|
||||||
|
|
||||||
#define TSDB_TICK_PER_SECOND(precision) \
|
#define TSDB_TICK_PER_SECOND(precision) \
|
||||||
|
@ -215,6 +218,9 @@ typedef enum ELogicConditionType {
|
||||||
#define TSDB_FUNC_MAX_RETRIEVE 1024
|
#define TSDB_FUNC_MAX_RETRIEVE 1024
|
||||||
|
|
||||||
#define TSDB_INDEX_NAME_LEN 65 // 64 + 1 '\0'
|
#define TSDB_INDEX_NAME_LEN 65 // 64 + 1 '\0'
|
||||||
|
#define TSDB_INDEX_TYPE_LEN 10
|
||||||
|
#define TSDB_INDEX_EXTS_LEN 256
|
||||||
|
#define TSDB_INDEX_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_INDEX_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||||
#define TSDB_TYPE_STR_MAX_LEN 32
|
#define TSDB_TYPE_STR_MAX_LEN 32
|
||||||
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
#define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
|
||||||
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN
|
||||||
|
|
|
@ -281,22 +281,35 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
|
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
SQuery* pQuery = NULL;
|
SQuery* pQuery = NULL;
|
||||||
int32_t code = 0;
|
|
||||||
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
|
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
|
||||||
|
|
||||||
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
|
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
|
||||||
CHECK_CODE_GOTO(parseSql(pRequest, false, &pQuery), _return);
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = parseSql(pRequest, false, &pQuery);
|
||||||
if (pQuery->localCmd) {
|
}
|
||||||
CHECK_CODE_GOTO(execLocalCmd(pRequest, pQuery), _return);
|
|
||||||
} else if (pQuery->directRpc) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
|
switch (pQuery->execMode) {
|
||||||
} else {
|
case QUERY_EXEC_MODE_LOCAL:
|
||||||
CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList), _return);
|
code = execLocalCmd(pRequest, pQuery);
|
||||||
CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return);
|
break;
|
||||||
|
case QUERY_EXEC_MODE_RPC:
|
||||||
|
code = execDdlQuery(pRequest, pQuery);
|
||||||
|
break;
|
||||||
|
case QUERY_EXEC_MODE_SCHEDULE:
|
||||||
|
code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case QUERY_EXEC_MODE_EMPTY_RESULT:
|
||||||
|
pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
|
||||||
taosArrayDestroy(pNodeList);
|
taosArrayDestroy(pNodeList);
|
||||||
qDestroyQuery(pQuery);
|
qDestroyQuery(pQuery);
|
||||||
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
|
if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
|
||||||
|
|
|
@ -2033,6 +2033,64 @@ int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq) {
|
||||||
|
SCoder encoder = {0};
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pReq->indexFName) < 0) return -1;
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq) {
|
||||||
|
SCoder decoder = {0};
|
||||||
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pReq->indexFName) < 0) return -1;
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
tCoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp) {
|
||||||
|
SCoder encoder = {0};
|
||||||
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pRsp->tblFName) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pRsp->colName) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pRsp->indexType) < 0) return -1;
|
||||||
|
if (tEncodeCStr(&encoder, pRsp->indexExts) < 0) return -1;
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tCoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp) {
|
||||||
|
SCoder decoder = {0};
|
||||||
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pRsp->tblFName) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pRsp->colName) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pRsp->indexType) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(&decoder, pRsp->indexExts) < 0) return -1;
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
tCoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
|
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
|
||||||
SCoder encoder = {0};
|
SCoder encoder = {0};
|
||||||
|
|
|
@ -179,6 +179,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
|
dndSetMsgHandle(pWrapper, TDMT_MND_GET_INDEX, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
|
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
|
|
@ -26,6 +26,7 @@ int32_t mndInitSma(SMnode *pMnode);
|
||||||
void mndCleanupSma(SMnode *pMnode);
|
void mndCleanupSma(SMnode *pMnode);
|
||||||
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName);
|
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName);
|
||||||
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma);
|
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma);
|
||||||
|
int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
#include "mndUser.h"
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
|
#include "mndSma.h"
|
||||||
|
|
||||||
#define TSDB_DB_VER_NUMBER 1
|
#define TSDB_DB_VER_NUMBER 1
|
||||||
#define TSDB_DB_RESERVE_SIZE 64
|
#define TSDB_DB_RESERVE_SIZE 64
|
||||||
|
@ -39,7 +40,8 @@ static int32_t mndProcessSyncDbReq(SNodeMsg *pReq);
|
||||||
static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
|
static int32_t mndProcessCompactDbReq(SNodeMsg *pReq);
|
||||||
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextDb(SMnode *pMnode, void *pIter);
|
||||||
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq);
|
static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq);
|
||||||
|
static int32_t mndProcessGetIndexReq(SNodeMsg *pReq);
|
||||||
|
|
||||||
int32_t mndInitDb(SMnode *pMnode) {
|
int32_t mndInitDb(SMnode *pMnode) {
|
||||||
SSdbTable table = {.sdbType = SDB_DB,
|
SSdbTable table = {.sdbType = SDB_DB,
|
||||||
|
@ -57,6 +59,7 @@ int32_t mndInitDb(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_SYNC_DB, mndProcessSyncDbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_SYNC_DB, mndProcessSyncDbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetIndexReq);
|
||||||
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DB, mndCancelGetNextDb);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_DB, mndCancelGetNextDb);
|
||||||
|
@ -1529,3 +1532,50 @@ static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
sdbCancelFetch(pSdb, pIter);
|
sdbCancelFetch(pSdb, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndProcessGetIndexReq(SNodeMsg *pReq) {
|
||||||
|
SUserIndexReq indexReq = {0};
|
||||||
|
SMnode *pMnode = pReq->pNode;
|
||||||
|
int32_t code = -1;
|
||||||
|
SUserIndexRsp rsp = {0};
|
||||||
|
bool exist = false;
|
||||||
|
|
||||||
|
if (tDeserializeSUserIndexReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &indexReq) != 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = mndProcessGetSmaReq(pMnode, &indexReq, &rsp, &exist);
|
||||||
|
if (code) {
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!exist) {
|
||||||
|
//TODO GET INDEX FROM FULLTEXT
|
||||||
|
code = -1;
|
||||||
|
terrno = TSDB_CODE_MND_DB_INDEX_NOT_EXIST;
|
||||||
|
} else {
|
||||||
|
int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp);
|
||||||
|
void *pRsp = rpcMallocCont(contLen);
|
||||||
|
if (pRsp == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
code = -1;
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
tSerializeSUserIndexRsp(pRsp, contLen, &rsp);
|
||||||
|
|
||||||
|
pReq->pRsp = pRsp;
|
||||||
|
pReq->rspLen = contLen;
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
_OVER:
|
||||||
|
if (code != 0) {
|
||||||
|
mError("failed to get index %s since %s", indexReq.indexFName, terrstr());
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
|
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
|
|
||||||
//!!!! Note: only APPEND columns in below tables, NO insert !!!!
|
//!!!! Note: only APPEND columns in below tables, NO insert !!!!
|
||||||
static const SInfosTableSchema dnodesSchema[] = {
|
static const SInfosTableSchema dnodesSchema[] = {
|
||||||
|
@ -89,11 +90,11 @@ static const SInfosTableSchema userFuncSchema[] = {
|
||||||
{.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
};
|
};
|
||||||
static const SInfosTableSchema userIdxSchema[] = {
|
static const SInfosTableSchema userIdxSchema[] = {
|
||||||
{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "index_database", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "index_database", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "index_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "index_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "column_name", .bytes = 64, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "column_name", .bytes = SYSTABLE_SCH_COL_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "index_type", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "index_type", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "index_extensions", .bytes = 256, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "index_extensions", .bytes = 256, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
};
|
};
|
||||||
|
|
|
@ -687,6 +687,39 @@ _OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) {
|
||||||
|
int32_t code = -1;
|
||||||
|
SSmaObj *pSma = NULL;
|
||||||
|
|
||||||
|
pSma = mndAcquireSma(pMnode, indexReq->indexFName);
|
||||||
|
if (pSma == NULL) {
|
||||||
|
*exist = false;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(rsp->dbFName, pSma->db, sizeof(pSma->db));
|
||||||
|
memcpy(rsp->tblFName, pSma->stb, sizeof(pSma->stb));
|
||||||
|
strcpy(rsp->indexType, TSDB_INDEX_TYPE_SMA);
|
||||||
|
|
||||||
|
SNodeList *pList = NULL;
|
||||||
|
int32_t extOffset = 0;
|
||||||
|
code = nodesStringToList(pSma->expr, &pList);
|
||||||
|
if (0 == code) {
|
||||||
|
SNode *node = NULL;
|
||||||
|
FOREACH(node, pList) {
|
||||||
|
SFunctionNode *pFunc = (SFunctionNode *)node;
|
||||||
|
extOffset += snprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s", (extOffset ? ",":""), pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
*exist = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndReleaseSma(pMnode, pSma);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp) {
|
static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp) {
|
||||||
mndTransProcessRsp(pRsp);
|
mndTransProcessRsp(pRsp);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -606,6 +606,43 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *indexName, SIndexInfo *out) {
|
||||||
|
char *msg = NULL;
|
||||||
|
int32_t msgLen = 0;
|
||||||
|
|
||||||
|
ctgDebug("try to get index from mnode, indexName:%s", indexName);
|
||||||
|
|
||||||
|
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)]((void *)indexName, &msg, 0, &msgLen);
|
||||||
|
if (code) {
|
||||||
|
ctgError("Build get index msg failed, code:%x, db:%s", code, indexName);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.msgType = TDMT_MND_GET_INDEX,
|
||||||
|
.pCont = msg,
|
||||||
|
.contLen = msgLen,
|
||||||
|
};
|
||||||
|
|
||||||
|
SRpcMsg rpcRsp = {0};
|
||||||
|
|
||||||
|
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
|
||||||
|
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
|
||||||
|
ctgError("error rsp for get index, error:%s, indexName:%s", tstrerror(rpcRsp.code), indexName);
|
||||||
|
CTG_ERR_RET(rpcRsp.code);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)](out, rpcRsp.pCont, rpcRsp.contLen);
|
||||||
|
if (code) {
|
||||||
|
ctgError("Process get index rsp failed, code:%x, indexName:%s", code, indexName);
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
ctgDebug("Got index from mnode, indexName:%s", indexName);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
|
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
|
||||||
if (NULL == pCtg->dbCache) {
|
if (NULL == pCtg->dbCache) {
|
||||||
|
@ -1776,7 +1813,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
|
||||||
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
|
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool inCache = false;
|
bool inCache = false;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
uint64_t dbId = 0;
|
uint64_t dbId = 0;
|
||||||
|
@ -2764,6 +2801,17 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
|
||||||
CTG_API_LEAVE(ctgGetDBCfgFromMnode(pCtg, pRpc, pMgmtEps, dbFName, pDbCfg));
|
CTG_API_LEAVE(ctgGetDBCfgFromMnode(pCtg, pRpc, pMgmtEps, dbFName, pDbCfg));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo) {
|
||||||
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == indexName || NULL == pInfo) {
|
||||||
|
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_API_LEAVE(ctgGetIndexInfoFromMnode(pCtg, pRpc, pMgmtEps, indexName, pInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void catalogDestroy(void) {
|
void catalogDestroy(void) {
|
||||||
qInfo("start to destroy catalog");
|
qInfo("start to destroy catalog");
|
||||||
|
|
||||||
|
|
|
@ -3131,6 +3131,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
|
||||||
|
|
||||||
int8_t* rowRes = NULL;
|
int8_t* rowRes = NULL;
|
||||||
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
|
bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols);
|
||||||
|
filterFreeInfo(filter);
|
||||||
|
|
||||||
SSDataBlock* px = createOneDataBlock(pBlock);
|
SSDataBlock* px = createOneDataBlock(pBlock);
|
||||||
blockDataEnsureCapacity(px, pBlock->info.rows);
|
blockDataEnsureCapacity(px, pBlock->info.rows);
|
||||||
|
@ -3140,19 +3141,25 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) {
|
||||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||||
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
|
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
if (keep) {
|
||||||
|
colDataAssign(pDst, pSrc, pBlock->info.rows);
|
||||||
|
numOfRow = pBlock->info.rows;
|
||||||
|
} else if (NULL != rowRes) {
|
||||||
|
numOfRow = 0;
|
||||||
|
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
||||||
|
if (rowRes[j] == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
numOfRow = 0;
|
if (colDataIsNull_s(pSrc, j)) {
|
||||||
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
|
colDataAppendNULL(pDst, numOfRow);
|
||||||
if (rowRes[j] == 0) {
|
} else {
|
||||||
continue;
|
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
|
||||||
|
}
|
||||||
|
numOfRow += 1;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
if (colDataIsNull_s(pSrc, j)) {
|
numOfRow = 0;
|
||||||
colDataAppendNULL(pDst, numOfRow);
|
|
||||||
} else {
|
|
||||||
colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false);
|
|
||||||
}
|
|
||||||
numOfRow += 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
*pSrc = *pDst;
|
*pSrc = *pDst;
|
||||||
|
|
|
@ -597,6 +597,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
|
||||||
|
|
||||||
int8_t* rowRes = NULL;
|
int8_t* rowRes = NULL;
|
||||||
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
|
bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols);
|
||||||
|
filterFreeInfo(filter);
|
||||||
|
|
||||||
SSDataBlock* px = createOneDataBlock(pInfo->pRes);
|
SSDataBlock* px = createOneDataBlock(pInfo->pRes);
|
||||||
blockDataEnsureCapacity(px, pInfo->pRes->info.rows);
|
blockDataEnsureCapacity(px, pInfo->pRes->info.rows);
|
||||||
|
@ -607,14 +608,21 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
|
||||||
SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i);
|
SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i);
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i);
|
SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i);
|
||||||
|
|
||||||
numOfRow = 0;
|
if (keep) {
|
||||||
for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) {
|
colDataAssign(pDest, pSrc, pInfo->pRes->info.rows);
|
||||||
if (rowRes[j] == 0) {
|
numOfRow = pInfo->pRes->info.rows;
|
||||||
continue;
|
} else if (NULL != rowRes) {
|
||||||
|
numOfRow = 0;
|
||||||
|
for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) {
|
||||||
|
if (rowRes[j] == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false);
|
||||||
|
numOfRow += 1;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false);
|
numOfRow = 0;
|
||||||
numOfRow += 1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -529,4 +529,4 @@ int32_t tDeserializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInf
|
||||||
void tFreeSMonVloadInfo(SMonVloadInfo *pInfo) {
|
void tFreeSMonVloadInfo(SMonVloadInfo *pInfo) {
|
||||||
taosArrayDestroy(pInfo->pVloads);
|
taosArrayDestroy(pInfo->pVloads);
|
||||||
pInfo->pVloads = NULL;
|
pInfo->pVloads = NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -149,22 +149,37 @@ static int32_t rewriteConditionForFromTable(SCalcConstContext* pCxt, SNode* pTab
|
||||||
return pCxt->code;
|
return pCxt->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void rewriteConstCondition(SSelectStmt* pSelect, SNode** pCond) {
|
||||||
|
if (QUERY_NODE_VALUE != nodeType(*pCond)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (((SValueNode*)*pCond)->datum.b) {
|
||||||
|
nodesDestroyNode(*pCond);
|
||||||
|
*pCond = NULL;
|
||||||
|
} else {
|
||||||
|
pSelect->isEmptyResult = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t calcConstFromTable(SCalcConstContext* pCxt, SSelectStmt* pSelect) {
|
static int32_t calcConstFromTable(SCalcConstContext* pCxt, SSelectStmt* pSelect) {
|
||||||
nodesRewriteExprPostOrder(&pSelect->pFromTable, calcConst, pCxt);
|
pCxt->code = rewriteConditionForFromTable(pCxt, pSelect->pFromTable);
|
||||||
if (TSDB_CODE_SUCCESS == pCxt->code) {
|
if (TSDB_CODE_SUCCESS == pCxt->code) {
|
||||||
pCxt->code = rewriteConditionForFromTable(pCxt, pSelect->pFromTable);
|
nodesRewriteExprPostOrder(&pSelect->pFromTable, calcConst, pCxt);
|
||||||
}
|
}
|
||||||
return pCxt->code;
|
return pCxt->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t calcConstCondition(SCalcConstContext* pCxt, SNode** pCond) {
|
static int32_t calcConstCondition(SCalcConstContext* pCxt, SSelectStmt* pSelect, SNode** pCond) {
|
||||||
if (NULL == *pCond) {
|
if (NULL == *pCond) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
nodesRewriteExprPostOrder(pCond, calcConst, pCxt);
|
pCxt->code = rewriteCondition(pCxt, pCond);
|
||||||
if (TSDB_CODE_SUCCESS == pCxt->code) {
|
if (TSDB_CODE_SUCCESS == pCxt->code) {
|
||||||
pCxt->code = rewriteCondition(pCxt, pCond);
|
nodesRewriteExprPostOrder(pCond, calcConst, pCxt);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == pCxt->code) {
|
||||||
|
rewriteConstCondition(pSelect, pCond);
|
||||||
}
|
}
|
||||||
return pCxt->code;
|
return pCxt->code;
|
||||||
}
|
}
|
||||||
|
@ -176,7 +191,7 @@ static int32_t calcConstSelect(SSelectStmt* pSelect) {
|
||||||
cxt.code = calcConstFromTable(&cxt, pSelect);
|
cxt.code = calcConstFromTable(&cxt, pSelect);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == cxt.code) {
|
if (TSDB_CODE_SUCCESS == cxt.code) {
|
||||||
cxt.code = calcConstCondition(&cxt, &pSelect->pWhere);
|
cxt.code = calcConstCondition(&cxt, pSelect, &pSelect->pWhere);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == cxt.code) {
|
if (TSDB_CODE_SUCCESS == cxt.code) {
|
||||||
nodesRewriteExprsPostOrder(pSelect->pPartitionByList, calcConst, &cxt);
|
nodesRewriteExprsPostOrder(pSelect->pPartitionByList, calcConst, &cxt);
|
||||||
|
@ -188,7 +203,7 @@ static int32_t calcConstSelect(SSelectStmt* pSelect) {
|
||||||
nodesRewriteExprsPostOrder(pSelect->pGroupByList, calcConst, &cxt);
|
nodesRewriteExprsPostOrder(pSelect->pGroupByList, calcConst, &cxt);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == cxt.code) {
|
if (TSDB_CODE_SUCCESS == cxt.code) {
|
||||||
cxt.code = calcConstCondition(&cxt, &pSelect->pHaving);
|
cxt.code = calcConstCondition(&cxt, pSelect, &pSelect->pHaving);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == cxt.code) {
|
if (TSDB_CODE_SUCCESS == cxt.code) {
|
||||||
nodesRewriteExprsPostOrder(pSelect->pOrderByList, calcConst, &cxt);
|
nodesRewriteExprsPostOrder(pSelect->pOrderByList, calcConst, &cxt);
|
||||||
|
@ -208,6 +223,22 @@ static int32_t calcConstQuery(SNode* pStmt) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery) {
|
static bool isEmptyResultQuery(SNode* pStmt) {
|
||||||
return calcConstQuery(pQuery->pRoot);
|
switch (nodeType(pStmt)) {
|
||||||
|
case QUERY_NODE_SELECT_STMT:
|
||||||
|
return ((SSelectStmt*)pStmt)->isEmptyResult;
|
||||||
|
case QUERY_NODE_EXPLAIN_STMT:
|
||||||
|
return isEmptyResultQuery(((SExplainStmt*)pStmt)->pQuery);
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery) {
|
||||||
|
int32_t code = calcConstQuery(pQuery->pRoot);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pQuery->execMode = isEmptyResultQuery(pQuery->pRoot) ? QUERY_EXEC_MODE_EMPTY_RESULT : pQuery->execMode;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1084,7 +1084,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
|
||||||
if (NULL == *pQuery) {
|
if (NULL == *pQuery) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
(*pQuery)->directRpc = false;
|
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||||
(*pQuery)->haveResultSet = false;
|
(*pQuery)->haveResultSet = false;
|
||||||
(*pQuery)->msgType = TDMT_VND_SUBMIT;
|
(*pQuery)->msgType = TDMT_VND_SUBMIT;
|
||||||
(*pQuery)->pRoot = (SNode*)context.pOutput;
|
(*pQuery)->pRoot = (SNode*)context.pOutput;
|
||||||
|
|
|
@ -2923,21 +2923,23 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
|
||||||
switch (nodeType(pQuery->pRoot)) {
|
switch (nodeType(pQuery->pRoot)) {
|
||||||
case QUERY_NODE_SELECT_STMT:
|
case QUERY_NODE_SELECT_STMT:
|
||||||
case QUERY_NODE_EXPLAIN_STMT:
|
case QUERY_NODE_EXPLAIN_STMT:
|
||||||
|
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||||
pQuery->haveResultSet = true;
|
pQuery->haveResultSet = true;
|
||||||
pQuery->msgType = TDMT_VND_QUERY;
|
pQuery->msgType = TDMT_VND_QUERY;
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_VNODE_MODIF_STMT:
|
case QUERY_NODE_VNODE_MODIF_STMT:
|
||||||
|
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||||
pQuery->msgType = TDMT_VND_CREATE_TABLE;
|
pQuery->msgType = TDMT_VND_CREATE_TABLE;
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_DESCRIBE_STMT:
|
case QUERY_NODE_DESCRIBE_STMT:
|
||||||
pQuery->localCmd = true;
|
pQuery->execMode = QUERY_EXEC_MODE_LOCAL;
|
||||||
pQuery->haveResultSet = true;
|
pQuery->haveResultSet = true;
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
|
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
|
||||||
pQuery->localCmd = true;
|
pQuery->execMode = QUERY_EXEC_MODE_LOCAL;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
pQuery->directRpc = true;
|
pQuery->execMode = QUERY_EXEC_MODE_RPC;
|
||||||
if (NULL != pCxt->pCmdMsg) {
|
if (NULL != pCxt->pCmdMsg) {
|
||||||
TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg, SCmdMsgInfo*);
|
TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg, SCmdMsgInfo*);
|
||||||
pQuery->msgType = pQuery->pCmdMsg->msgType;
|
pQuery->msgType = pQuery->pCmdMsg->msgType;
|
||||||
|
|
|
@ -139,6 +139,25 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
||||||
|
if (NULL == msg || NULL == msgLen) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
SUserIndexReq indexReq = {0};
|
||||||
|
strcpy(indexReq.indexFName, input);
|
||||||
|
|
||||||
|
int32_t bufLen = tSerializeSUserIndexReq(NULL, 0, &indexReq);
|
||||||
|
void *pBuf = rpcMallocCont(bufLen);
|
||||||
|
tSerializeSUserIndexReq(pBuf, bufLen, &indexReq);
|
||||||
|
|
||||||
|
*msg = pBuf;
|
||||||
|
*msgLen = bufLen;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
|
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
SUseDbOutput *pOut = output;
|
SUseDbOutput *pOut = output;
|
||||||
SUseDbRsp usedbRsp = {0};
|
SUseDbRsp usedbRsp = {0};
|
||||||
|
@ -343,6 +362,22 @@ int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t queryProcessGetIndexRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
|
SUserIndexRsp out = {0};
|
||||||
|
|
||||||
|
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||||
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tDeserializeSUserIndexRsp(msg, msgSize, &out) != 0) {
|
||||||
|
qError("tDeserializeSUserIndexRsp failed, msgSize:%d", msgSize);
|
||||||
|
return TSDB_CODE_INVALID_MSG;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(output, &out, sizeof(out));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
void initQueryModuleMsgHandle() {
|
void initQueryModuleMsgHandle() {
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||||
|
@ -350,12 +385,14 @@ void initQueryModuleMsgHandle() {
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
|
||||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
|
||||||
|
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg;
|
||||||
|
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
|
||||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
|
||||||
|
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp;
|
||||||
}
|
}
|
||||||
|
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
|
@ -1806,6 +1806,8 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
|
||||||
qError("convert value to type[%d] failed", type);
|
qError("convert value to type[%d] failed", type);
|
||||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
memcpy(fi->data, out.columnData->pData, out.columnData->info.bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3640,6 +3642,10 @@ int32_t fltGetDataFromSlotId(void *param, int32_t id, void **data) {
|
||||||
|
|
||||||
|
|
||||||
int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param) {
|
int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param) {
|
||||||
|
if (NULL == info) {
|
||||||
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
return fltSetColFieldDataImpl(info, param, fltGetDataFromSlotId, false);
|
return fltSetColFieldDataImpl(info, param, fltGetDataFromSlotId, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3693,6 +3699,10 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) {
|
||||||
|
if (NULL == info) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (info->scalarMode) {
|
if (info->scalarMode) {
|
||||||
SScalarParam output = {0};
|
SScalarParam output = {0};
|
||||||
|
|
||||||
|
|
|
@ -224,6 +224,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB, "Invalid database name
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, "Invalid database options")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, "Invalid database options")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_ACCT, "Invalid database account")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_ACCT, "Invalid database account")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_OPTION_UNCHANGED, "Database options not changed")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_OPTION_UNCHANGED, "Database options not changed")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_INDEX_NOT_EXIST, "Index not exist")
|
||||||
|
|
||||||
// mnode-vgroup
|
// mnode-vgroup
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, "Vgroup already in dnode")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, "Vgroup already in dnode")
|
||||||
|
|
|
@ -121,4 +121,4 @@ if $rows != 0 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
Loading…
Reference in New Issue