From 99d6fda66a516b7db4864d10153524384d7ad386 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 8 Apr 2022 18:34:11 +0800 Subject: [PATCH 1/7] feature/qnode --- include/common/tmsg.h | 18 +++++++ include/common/tmsgdef.h | 1 + include/util/taoserror.h | 1 + include/util/tdef.h | 6 +++ source/common/src/tmsg.c | 58 +++++++++++++++++++++ source/dnode/mgmt/mm/mmHandle.c | 1 + source/dnode/mnode/impl/src/mndDb.c | 46 ++++++++++++++++ source/dnode/mnode/impl/src/mndInfoSchema.c | 11 ++-- source/dnode/mnode/impl/src/mndSma.c | 34 ++++++++++++ source/util/src/terror.c | 1 + 10 files changed, 172 insertions(+), 5 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 4efb68d490..3fd5f31c24 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2293,6 +2293,24 @@ static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) { return buf; } +typedef struct { + char indexFName[TSDB_INDEX_FNAME_LEN]; +} SUserIndexReq; + +int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq); +int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq); + +typedef struct { + char dbFName[TSDB_DB_FNAME_LEN]; + char tblFName[TSDB_TABLE_FNAME_LEN]; + char colName[TSDB_COL_NAME_LEN]; + char indexType[TSDB_INDEX_TYPE_LEN]; + char indexExts[TSDB_INDEX_EXTS_LEN]; +} SUserIndexRsp; + +int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp); +int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp); + typedef struct { int8_t mqMsgType; int32_t code; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 63d1fc5014..4314612c1f 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -156,6 +156,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "mnode-drop-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_DB_CFG, "mnode-get-db-cfg", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "mnode-get-index", NULL, NULL) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ed90628bda..ec9812fa04 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -217,6 +217,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_INVALID_DB_OPTION TAOS_DEF_ERROR_CODE(0, 0x0385) #define TSDB_CODE_MND_INVALID_DB_ACCT TAOS_DEF_ERROR_CODE(0, 0x0386) #define TSDB_CODE_MND_DB_OPTION_UNCHANGED TAOS_DEF_ERROR_CODE(0, 0x0387) +#define TSDB_CODE_MND_DB_INDEX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0388) // mnode-vgroup #define TSDB_CODE_MND_VGROUP_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0390) diff --git a/include/util/tdef.h b/include/util/tdef.h index 2a634e2327..98fbe94d66 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -109,6 +109,9 @@ extern const int32_t TYPE_BYTES[15]; #define TSDB_INS_TABLE_USER_USERS "user_users" #define TSDB_INS_TABLE_VGROUPS "vgroups" +#define TSDB_INDEX_TYPE_SMA "SMA" +#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT" + #define TSDB_INS_USER_STABLES_DBNAME_COLID 2 #define TSDB_TICK_PER_SECOND(precision) \ @@ -213,6 +216,9 @@ typedef enum ELogicConditionType { #define TSDB_FUNC_MAX_RETRIEVE 1024 #define TSDB_INDEX_NAME_LEN 65 // 64 + 1 '\0' +#define TSDB_INDEX_TYPE_LEN 10 +#define TSDB_INDEX_EXTS_LEN 256 +#define TSDB_INDEX_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_INDEX_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 57a8953253..a22b5c0dd4 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2033,6 +2033,64 @@ int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp) { return 0; } +int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->indexFName) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->indexFName) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->tblFName) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->colName) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->indexType) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->indexExts) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->tblFName) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->colName) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->indexType) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->indexExts) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) { SCoder encoder = {0}; diff --git a/source/dnode/mgmt/mm/mmHandle.c b/source/dnode/mgmt/mm/mmHandle.c index e5495c7e1d..ffeeeecd17 100644 --- a/source/dnode/mgmt/mm/mmHandle.c +++ b/source/dnode/mgmt/mm/mmHandle.c @@ -147,6 +147,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, mmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_GET_DB_CFG, mmProcessReadMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MND_GET_INDEX, mmProcessReadMsg, DEFAULT_HANDLE); // Requests handled by VNODE dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index ff2ea162fb..ef992b4549 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -58,6 +58,7 @@ int32_t mndInitDb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_SYNC_DB, mndProcessSyncDbReq); mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_DB, mndProcessCompactDbReq); mndSetMsgHandle(pMnode, TDMT_MND_GET_DB_CFG, mndProcessGetDbCfgReq); + mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetIndexReq); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_DB, mndGetDbMeta); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_DB, mndRetrieveDbs); @@ -1661,3 +1662,48 @@ static void mndCancelGetNextDb(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +static int32_t mndProcessGetIndexReq(SNodeMsg *pReq) { + SUserIndexReq indexReq = {0}; + SMnode *pMnode = pReq->pNode; + int32_t code = -1; + SUserIndexRsp rsp = {0}; + bool exist = false; + + if (tDeserializeSUserIndexReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &indexReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + code = mndProcessGetSmaReq(pMnode, &indexReq, &rsp, &exist); + if (code) { + goto _OVER; + } + + if (!exist) { + //TODO GET INDEX FROM FULLTEXT + } else { + int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp); + void *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = -1; + goto _OVER; + } + + tSerializeSUserIndexRsp(pRsp, contLen, &rsp); + + pReq->pRsp = pRsp; + pReq->rspLen = contLen; + + code = 0; + } + +_OVER: + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + mError("failed to get index %s since %s", indexReq.indexFName, terrstr()); + } + + return code; +} + diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index aadd914439..71ee0baaae 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -18,6 +18,7 @@ #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) +#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE) //!!!! Note: only APPEND columns in below tables, NO insert !!!! static const SInfosTableSchema dnodesSchema[] = { @@ -79,11 +80,11 @@ static const SInfosTableSchema userFuncSchema[] = { {.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY}, }; static const SInfosTableSchema userIdxSchema[] = { - {.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "index_database", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "index_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "column_name", .bytes = 64, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "index_database", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "index_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "column_name", .bytes = SYSTABLE_SCH_COL_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "index_type", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY}, {.name = "index_extensions", .bytes = 256, .type = TSDB_DATA_TYPE_BINARY}, }; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 94114a96bf..7badd3031d 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -689,6 +689,40 @@ _OVER: return code; } +static int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) { + int32_t code = -1; + SSmaObj *pSma = NULL; + + pSma = mndAcquireSma(pMnode, indexReq.indexFName); + if (pSma == NULL) { + //TODO TRY TO GET INDEX FROM FULLTEXT + *exist = false; + return 0; + } + + memcpy(rsp->dbFName, pSma->db, sizeof(pSma->db)); + memcpy(rsp->tblFName, pSma->stb, sizeof(pSma->stb)); + strcpy(rsp->indexType, TSDB_INDEX_TYPE_SMA); + + SNodeList *pList = NULL; + int32_t extOffset = 0; + code = nodesStringToList(pSma->expr, &pList); + if (0 == code) { + SNode *node = NULL; + FOREACH(node, pList) { + SFunctionNode *pFunc = (SFunctionNode *)node; + extOffset += snprintf(rsp->indexExts + extOffset, sizeof(rsp->indexExts) - extOffset - 1, "%s%s", (extOffset ? ",":""), pFunc->functionName); + } + + *exist = true; + } + + mndReleaseSma(pMnode, pSma); + + return code; +} + + static int32_t mndProcessVDropSmaRsp(SNodeMsg *pRsp) { mndTransProcessRsp(pRsp); return 0; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6d4de2f575..e02310aab3 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -224,6 +224,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB, "Invalid database name TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_OPTION, "Invalid database options") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_ACCT, "Invalid database account") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_OPTION_UNCHANGED, "Database options not changed") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_INDEX_NOT_EXIST, "Index not exist") // mnode-vgroup TAOS_DEFINE_ERROR(TSDB_CODE_MND_VGROUP_ALREADY_IN_DNODE, "Vgroup already in dnode") From abf827ed63ca7283be67b52e47be6ec5295ea093 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 8 Apr 2022 18:37:42 +0800 Subject: [PATCH 2/7] feature/qnode --- source/dnode/mnode/impl/inc/mndSma.h | 1 + source/dnode/mnode/impl/src/mndDb.c | 2 ++ source/dnode/mnode/impl/src/mndSma.c | 5 ++--- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndSma.h b/source/dnode/mnode/impl/inc/mndSma.h index 4a80f619d3..91c6e24e28 100644 --- a/source/dnode/mnode/impl/inc/mndSma.h +++ b/source/dnode/mnode/impl/inc/mndSma.h @@ -26,6 +26,7 @@ int32_t mndInitSma(SMnode *pMnode); void mndCleanupSma(SMnode *pMnode); SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName); void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma); +int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index ef992b4549..bd12897827 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -22,6 +22,7 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" +#include "mndSma.h" #define TSDB_DB_VER_NUMBER 1 #define TSDB_DB_RESERVE_SIZE 64 @@ -41,6 +42,7 @@ static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMe static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextDb(SMnode *pMnode, void *pIter); static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq); +static int32_t mndProcessGetIndexReq(SNodeMsg *pReq); int32_t mndInitDb(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_DB, diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 7badd3031d..e5e591b4d2 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -689,13 +689,12 @@ _OVER: return code; } -static int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) { +int32_t mndProcessGetSmaReq(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp *rsp, bool *exist) { int32_t code = -1; SSmaObj *pSma = NULL; - pSma = mndAcquireSma(pMnode, indexReq.indexFName); + pSma = mndAcquireSma(pMnode, indexReq->indexFName); if (pSma == NULL) { - //TODO TRY TO GET INDEX FROM FULLTEXT *exist = false; return 0; } From e9f7bf866aed2f464ab139af2b8d238264a49028 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 9 Apr 2022 09:39:01 +0800 Subject: [PATCH 3/7] feature/qnode --- include/libs/catalog/catalog.h | 3 ++ source/dnode/mnode/impl/src/mndDb.c | 8 +++-- source/libs/catalog/src/catalog.c | 50 ++++++++++++++++++++++++++++- source/libs/qcom/src/querymsg.c | 37 +++++++++++++++++++++ 4 files changed, 94 insertions(+), 4 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index a0b342fca2..5abda69aa8 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -78,6 +78,7 @@ typedef struct SDbVgVersion { } SDbVgVersion; typedef SDbCfgRsp SDbCfgInfo; +typedef SUserIndexRsp SIndexInfo; int32_t catalogInit(SCatalogCfg *cfg); @@ -221,6 +222,8 @@ int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *n int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg); +int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); + /** * Destroy catalog and relase all resources diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index bd12897827..fd739dc657 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -41,8 +41,8 @@ static int32_t mndProcessCompactDbReq(SNodeMsg *pReq); static int32_t mndGetDbMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveDbs(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextDb(SMnode *pMnode, void *pIter); -static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq); -static int32_t mndProcessGetIndexReq(SNodeMsg *pReq); +static int32_t mndProcessGetDbCfgReq(SNodeMsg *pReq); +static int32_t mndProcessGetIndexReq(SNodeMsg *pReq); int32_t mndInitDb(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_DB, @@ -1684,6 +1684,8 @@ static int32_t mndProcessGetIndexReq(SNodeMsg *pReq) { if (!exist) { //TODO GET INDEX FROM FULLTEXT + code = -1; + terrno = TSDB_CODE_MND_DB_INDEX_NOT_EXIST; } else { int32_t contLen = tSerializeSUserIndexRsp(NULL, 0, &rsp); void *pRsp = rpcMallocCont(contLen); @@ -1702,7 +1704,7 @@ static int32_t mndProcessGetIndexReq(SNodeMsg *pReq) { } _OVER: - if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0) { mError("failed to get index %s since %s", indexReq.indexFName, terrstr()); } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 6ed09ce7eb..195b7cda4b 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -606,6 +606,43 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, return TSDB_CODE_SUCCESS; } +int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char *indexName, SIndexInfo *out) { + char *msg = NULL; + int32_t msgLen = 0; + + ctgDebug("try to get index from mnode, indexName:%s", indexName); + + int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)]((void *)indexName, &msg, 0, &msgLen); + if (code) { + ctgError("Build get index msg failed, code:%x, db:%s", code, indexName); + CTG_ERR_RET(code); + } + + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_GET_INDEX, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + + rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp); + if (TSDB_CODE_SUCCESS != rpcRsp.code) { + ctgError("error rsp for get index, error:%s, indexName:%s", tstrerror(rpcRsp.code), indexName); + CTG_ERR_RET(rpcRsp.code); + } + + code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)](out, rpcRsp.pCont, rpcRsp.contLen); + if (code) { + ctgError("Process get index rsp failed, code:%x, indexName:%s", code, indexName); + CTG_ERR_RET(code); + } + + ctgDebug("Got index from mnode, indexName:%s", indexName); + + return TSDB_CODE_SUCCESS; +} + int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) { if (NULL == pCtg->dbCache) { @@ -1776,7 +1813,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - + bool inCache = false; int32_t code = 0; uint64_t dbId = 0; @@ -2764,6 +2801,17 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons CTG_API_LEAVE(ctgGetDBCfgFromMnode(pCtg, pRpc, pMgmtEps, dbFName, pDbCfg)); } +int32_t catalogGetIndexInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == indexName || NULL == pInfo) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + CTG_API_LEAVE(ctgGetIndexInfoFromMnode(pCtg, pRpc, pMgmtEps, indexName, pInfo)); +} + + void catalogDestroy(void) { qInfo("start to destroy catalog"); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index d211e780b0..932107dfc1 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -139,6 +139,25 @@ int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t return TSDB_CODE_SUCCESS; } +int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) { + if (NULL == msg || NULL == msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + SUserIndexReq indexReq = {0}; + strcpy(indexReq.indexFName, input); + + int32_t bufLen = tSerializeSUserIndexReq(NULL, 0, &indexReq); + void *pBuf = rpcMallocCont(bufLen); + tSerializeSUserIndexReq(pBuf, bufLen, &indexReq); + + *msg = pBuf; + *msgLen = bufLen; + + return TSDB_CODE_SUCCESS; +} + + int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { SUseDbOutput *pOut = output; SUseDbRsp usedbRsp = {0}; @@ -343,6 +362,22 @@ int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) { return TSDB_CODE_SUCCESS; } +int32_t queryProcessGetIndexRsp(void *output, char *msg, int32_t msgSize) { + SUserIndexRsp out = {0}; + + if (NULL == output || NULL == msg || msgSize <= 0) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + if (tDeserializeSUserIndexRsp(msg, msgSize, &out) != 0) { + qError("tDeserializeSUserIndexRsp failed, msgSize:%d", msgSize); + return TSDB_CODE_INVALID_MSG; + } + + memcpy(output, &out, sizeof(out)); + + return TSDB_CODE_SUCCESS; +} void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; @@ -350,12 +385,14 @@ void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg; + queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp; } #pragma GCC diagnostic pop From a80610560dfb3f9b4f097d0522fb5efd8ec26139 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 9 Apr 2022 10:09:17 +0800 Subject: [PATCH 4/7] constant condition optimize --- include/libs/nodes/querynodes.h | 1 + include/libs/parser/parser.h | 10 ++++-- source/client/src/clientImpl.c | 37 ++++++++++++------- source/libs/parser/src/parCalcConst.c | 49 +++++++++++++++++++++----- source/libs/parser/src/parInsert.c | 2 +- source/libs/parser/src/parTranslater.c | 8 +++-- 6 files changed, 80 insertions(+), 27 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 6b6d11c2fe..f052c9daa7 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -238,6 +238,7 @@ typedef struct SSelectStmt { SNode* pSlimit; char stmtName[TSDB_TABLE_NAME_LEN]; uint8_t precision; + bool isEmptyResult; } SSelectStmt; typedef enum ESetOperatorType { diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index f10185c44f..fef624288a 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -44,8 +44,15 @@ typedef struct SCmdMsgInfo { void* pExtension; // todo remove it soon } SCmdMsgInfo; +typedef enum EQueryExecMode { + QUERY_EXEC_MODE_LOCAL = 1, + QUERY_EXEC_MODE_RPC, + QUERY_EXEC_MODE_SCHEDULE, + QUERY_EXEC_MODE_EMPTY_RESULT +} EQueryExecMode; + typedef struct SQuery { - bool directRpc; + EQueryExecMode execMode; bool haveResultSet; SNode* pRoot; int32_t numOfResCols; @@ -55,7 +62,6 @@ typedef struct SQuery { SArray* pDbList; SArray* pTableList; bool showRewrite; - bool localCmd; } SQuery; int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c3a17b6073..9938a2e1b9 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -281,22 +281,35 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* pRequest = NULL; SQuery* pQuery = NULL; - int32_t code = 0; SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); - CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); - CHECK_CODE_GOTO(parseSql(pRequest, false, &pQuery), _return); - - if (pQuery->localCmd) { - CHECK_CODE_GOTO(execLocalCmd(pRequest, pQuery), _return); - } else if (pQuery->directRpc) { - CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return); - } else { - CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList), _return); - CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return); + int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); + if (TSDB_CODE_SUCCESS == code) { + code = parseSql(pRequest, false, &pQuery); + } + + if (TSDB_CODE_SUCCESS == code) { + switch (pQuery->execMode) { + case QUERY_EXEC_MODE_LOCAL: + code = execLocalCmd(pRequest, pQuery); + break; + case QUERY_EXEC_MODE_RPC: + code = execDdlQuery(pRequest, pQuery); + break; + case QUERY_EXEC_MODE_SCHEDULE: + code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList); + if (TSDB_CODE_SUCCESS == code) { + code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList); + } + break; + case QUERY_EXEC_MODE_EMPTY_RESULT: + pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT; + break; + default: + break; + } } -_return: taosArrayDestroy(pNodeList); qDestroyQuery(pQuery); if (NULL != pRequest && TSDB_CODE_SUCCESS != code) { diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c index 0b4f79d3ff..048830b80b 100644 --- a/source/libs/parser/src/parCalcConst.c +++ b/source/libs/parser/src/parCalcConst.c @@ -149,22 +149,37 @@ static int32_t rewriteConditionForFromTable(SCalcConstContext* pCxt, SNode* pTab return pCxt->code; } +static void rewriteConstCondition(SSelectStmt* pSelect, SNode** pCond) { + if (QUERY_NODE_VALUE != nodeType(*pCond)) { + return; + } + if (((SValueNode*)*pCond)->datum.b) { + nodesDestroyNode(*pCond); + *pCond = NULL; + } else { + pSelect->isEmptyResult = true; + } +} + static int32_t calcConstFromTable(SCalcConstContext* pCxt, SSelectStmt* pSelect) { - nodesRewriteExprPostOrder(&pSelect->pFromTable, calcConst, pCxt); + pCxt->code = rewriteConditionForFromTable(pCxt, pSelect->pFromTable); if (TSDB_CODE_SUCCESS == pCxt->code) { - pCxt->code = rewriteConditionForFromTable(pCxt, pSelect->pFromTable); + nodesRewriteExprPostOrder(&pSelect->pFromTable, calcConst, pCxt); } return pCxt->code; } -static int32_t calcConstCondition(SCalcConstContext* pCxt, SNode** pCond) { +static int32_t calcConstCondition(SCalcConstContext* pCxt, SSelectStmt* pSelect, SNode** pCond) { if (NULL == *pCond) { return TSDB_CODE_SUCCESS; } - nodesRewriteExprPostOrder(pCond, calcConst, pCxt); + pCxt->code = rewriteCondition(pCxt, pCond); if (TSDB_CODE_SUCCESS == pCxt->code) { - pCxt->code = rewriteCondition(pCxt, pCond); + nodesRewriteExprPostOrder(pCond, calcConst, pCxt); + } + if (TSDB_CODE_SUCCESS == pCxt->code) { + rewriteConstCondition(pSelect, pCond); } return pCxt->code; } @@ -176,7 +191,7 @@ static int32_t calcConstSelect(SSelectStmt* pSelect) { cxt.code = calcConstFromTable(&cxt, pSelect); } if (TSDB_CODE_SUCCESS == cxt.code) { - cxt.code = calcConstCondition(&cxt, &pSelect->pWhere); + cxt.code = calcConstCondition(&cxt, pSelect, &pSelect->pWhere); } if (TSDB_CODE_SUCCESS == cxt.code) { nodesRewriteExprsPostOrder(pSelect->pPartitionByList, calcConst, &cxt); @@ -188,7 +203,7 @@ static int32_t calcConstSelect(SSelectStmt* pSelect) { nodesRewriteExprsPostOrder(pSelect->pGroupByList, calcConst, &cxt); } if (TSDB_CODE_SUCCESS == cxt.code) { - cxt.code = calcConstCondition(&cxt, &pSelect->pHaving); + cxt.code = calcConstCondition(&cxt, pSelect, &pSelect->pHaving); } if (TSDB_CODE_SUCCESS == cxt.code) { nodesRewriteExprsPostOrder(pSelect->pOrderByList, calcConst, &cxt); @@ -208,6 +223,22 @@ static int32_t calcConstQuery(SNode* pStmt) { return TSDB_CODE_SUCCESS; } -int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery) { - return calcConstQuery(pQuery->pRoot); +static bool isEmptyResultQuery(SNode* pStmt) { + switch (nodeType(pStmt)) { + case QUERY_NODE_SELECT_STMT: + return ((SSelectStmt*)pStmt)->isEmptyResult; + case QUERY_NODE_EXPLAIN_STMT: + return isEmptyResultQuery(((SExplainStmt*)pStmt)->pQuery); + default: + break; + } + return false; +} + +int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery) { + int32_t code = calcConstQuery(pQuery->pRoot); + if (TSDB_CODE_SUCCESS == code) { + pQuery->execMode = isEmptyResultQuery(pQuery->pRoot) ? QUERY_EXEC_MODE_EMPTY_RESULT : pQuery->execMode; + } + return code; } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index d9cf0b39b5..9d945039b8 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1084,7 +1084,7 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { if (NULL == *pQuery) { return TSDB_CODE_OUT_OF_MEMORY; } - (*pQuery)->directRpc = false; + (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE; (*pQuery)->haveResultSet = false; (*pQuery)->msgType = TDMT_VND_SUBMIT; (*pQuery)->pRoot = (SNode*)context.pOutput; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ebe5b639c4..a286918ce5 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2923,21 +2923,23 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { switch (nodeType(pQuery->pRoot)) { case QUERY_NODE_SELECT_STMT: case QUERY_NODE_EXPLAIN_STMT: + pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->haveResultSet = true; pQuery->msgType = TDMT_VND_QUERY; break; case QUERY_NODE_VNODE_MODIF_STMT: + pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->msgType = TDMT_VND_CREATE_TABLE; break; case QUERY_NODE_DESCRIBE_STMT: - pQuery->localCmd = true; + pQuery->execMode = QUERY_EXEC_MODE_LOCAL; pQuery->haveResultSet = true; break; case QUERY_NODE_RESET_QUERY_CACHE_STMT: - pQuery->localCmd = true; + pQuery->execMode = QUERY_EXEC_MODE_LOCAL; break; default: - pQuery->directRpc = true; + pQuery->execMode = QUERY_EXEC_MODE_RPC; if (NULL != pCxt->pCmdMsg) { TSWAP(pQuery->pCmdMsg, pCxt->pCmdMsg, SCmdMsgInfo*); pQuery->msgType = pQuery->pCmdMsg->msgType; From 9df566b21096dda6dbd29c860a0a7a6db3a46cf4 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 9 Apr 2022 10:42:28 +0800 Subject: [PATCH 5/7] topic ci bugfix --- tests/test/c/tmqSim.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 7372745cb8..236d1e2eed 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -226,7 +226,7 @@ void loop_consume(tmq_t* tmq) { int32_t totalRows = 0; int32_t skipLogNum = 0; while (running) { - tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 4000); + tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 8000); if (tmqMsg) { totalMsgs++; From 119bdf8c7cea96f3b0f2c61635a9adb51f41203b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 9 Apr 2022 13:16:32 +0800 Subject: [PATCH 6/7] compile failed --- source/libs/monitor/src/monMsg.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/monitor/src/monMsg.c b/source/libs/monitor/src/monMsg.c index 94645f9da1..3aafcf071d 100644 --- a/source/libs/monitor/src/monMsg.c +++ b/source/libs/monitor/src/monMsg.c @@ -267,6 +267,7 @@ int32_t tEncodeSMonDiskDesc(SCoder *encoder, const SMonDiskDesc *pDesc) { if (tEncodeI64(encoder, pDesc->size.total) < 0) return -1; if (tEncodeI64(encoder, pDesc->size.used) < 0) return -1; if (tEncodeI64(encoder, pDesc->size.avail) < 0) return -1; + return 0; } static int32_t tDecodeSMonDiskDesc(SCoder *decoder, SMonDiskDesc *pDesc) { @@ -528,4 +529,4 @@ int32_t tDeserializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInf void tFreeSMonVloadInfo(SMonVloadInfo *pInfo) { taosArrayDestroy(pInfo->pVloads); pInfo->pVloads = NULL; -} \ No newline at end of file +} From 257b7297573c258e14f97e281b02d8d8026f1099 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 9 Apr 2022 13:36:36 +0800 Subject: [PATCH 7/7] feature/qnode --- source/libs/executor/src/executorimpl.c | 29 +++++++++++++++---------- source/libs/executor/src/scanoperator.c | 22 +++++++++++++------ source/libs/scalar/src/filter.c | 11 ++++++++++ tests/script/tsim/query/stddev.sim | 2 +- 4 files changed, 45 insertions(+), 19 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e85cfb76fd..5ba77d4a09 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3297,6 +3297,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { int8_t* rowRes = NULL; bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); + filterFreeInfo(filter); SSDataBlock* px = createOneDataBlock(pBlock); blockDataEnsureCapacity(px, pBlock->info.rows); @@ -3306,19 +3307,25 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i); + if (keep) { + colDataAssign(pDst, pSrc, pBlock->info.rows); + numOfRow = pBlock->info.rows; + } else if (NULL != rowRes) { + numOfRow = 0; + for (int32_t j = 0; j < pBlock->info.rows; ++j) { + if (rowRes[j] == 0) { + continue; + } - numOfRow = 0; - for (int32_t j = 0; j < pBlock->info.rows; ++j) { - if (rowRes[j] == 0) { - continue; + if (colDataIsNull_s(pSrc, j)) { + colDataAppendNULL(pDst, numOfRow); + } else { + colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false); + } + numOfRow += 1; } - - if (colDataIsNull_s(pSrc, j)) { - colDataAppendNULL(pDst, numOfRow); - } else { - colDataAppend(pDst, numOfRow, colDataGetData(pSrc, j), false); - } - numOfRow += 1; + } else { + numOfRow = 0; } *pSrc = *pDst; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c67f833699..7362f1da31 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -596,6 +596,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { int8_t* rowRes = NULL; bool keep = filterExecute(filter, pInfo->pRes, &rowRes, NULL, param1.numOfCols); + filterFreeInfo(filter); SSDataBlock* px = createOneDataBlock(pInfo->pRes); blockDataEnsureCapacity(px, pInfo->pRes->info.rows); @@ -606,14 +607,21 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { SColumnInfoData* pDest = taosArrayGet(px->pDataBlock, i); SColumnInfoData* pSrc = taosArrayGet(pInfo->pRes->pDataBlock, i); - numOfRow = 0; - for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) { - if (rowRes[j] == 0) { - continue; + if (keep) { + colDataAssign(pDest, pSrc, pInfo->pRes->info.rows); + numOfRow = pInfo->pRes->info.rows; + } else if (NULL != rowRes) { + numOfRow = 0; + for (int32_t j = 0; j < pInfo->pRes->info.rows; ++j) { + if (rowRes[j] == 0) { + continue; + } + + colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false); + numOfRow += 1; } - - colDataAppend(pDest, numOfRow, colDataGetData(pSrc, j), false); - numOfRow += 1; + } else { + numOfRow = 0; } } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 16236fc05b..c780ed5725 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -1797,6 +1797,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) { } else { SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))}; out.columnData->info.type = type; + out.columnData->info.bytes = tDataTypes[type].bytes; // todo refactor the convert int32_t code = doConvertDataType(var, &out); @@ -1804,6 +1805,8 @@ int32_t fltInitValFieldData(SFilterInfo *info) { qError("convert value to type[%d] failed", type); return TSDB_CODE_TSC_INVALID_OPERATION; } + + memcpy(fi->data, out.columnData->pData, out.columnData->info.bytes); } } @@ -3638,6 +3641,10 @@ int32_t fltGetDataFromSlotId(void *param, int32_t id, void **data) { int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param) { + if (NULL == info) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + return fltSetColFieldDataImpl(info, param, fltGetDataFromSlotId, false); } @@ -3691,6 +3698,10 @@ _return: } bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols) { + if (NULL == info) { + return false; + } + if (info->scalarMode) { SScalarParam output = {0}; diff --git a/tests/script/tsim/query/stddev.sim b/tests/script/tsim/query/stddev.sim index 01cde31966..adab1c354d 100644 --- a/tests/script/tsim/query/stddev.sim +++ b/tests/script/tsim/query/stddev.sim @@ -121,4 +121,4 @@ if $rows != 0 then return -1 endi -system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file +system sh/exec.sh -n dnode1 -s stop -x SIGINT