diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 48a677a686..7d8cee379e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2507,7 +2507,7 @@ typedef struct { int64_t offset; int64_t sliding; int64_t dstTbUid; - int32_t dstVgId; // for stream + int32_t dstVgId; SEpSet epSet; char* expr; } STableIndexInfo; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index b0fac72172..df8a0bb7f0 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -21,13 +21,13 @@ extern "C" { #endif #include "os.h" -#include "taosdef.h" #include "query.h" -#include "tname.h" -#include "tcommon.h" +#include "taosdef.h" #include "tarray.h" +#include "tcommon.h" #include "thash.h" #include "tmsg.h" +#include "tname.h" #include "transport.h" typedef struct SCatalog SCatalog; @@ -47,8 +47,8 @@ typedef enum { } AUTH_TYPE; typedef struct SUserAuthInfo { - char user[TSDB_USER_LEN]; - char dbFName[TSDB_DB_FNAME_LEN]; + char user[TSDB_USER_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; AUTH_TYPE type; } SUserAuthInfo; @@ -59,17 +59,17 @@ typedef struct SDbInfo { } SDbInfo; typedef struct SCatalogReq { - SArray *pDbVgroup; // element is db full name - SArray *pDbCfg; // element is db full name - SArray *pDbInfo; // element is db full name - SArray *pTableMeta; // element is SNAME - SArray *pTableHash; // element is SNAME - SArray *pUdf; // element is udf name - SArray *pIndex; // element is index name - SArray *pUser; // element is SUserAuthInfo - SArray *pTableIndex; // element is SNAME + SArray* pDbVgroup; // element is db full name + SArray* pDbCfg; // element is db full name + SArray* pDbInfo; // element is db full name + SArray* pTableMeta; // element is SNAME + SArray* pTableHash; // element is SNAME + SArray* pUdf; // element is udf name + SArray* pIndex; // element is index name + SArray* pUser; // element is SUserAuthInfo + SArray* pTableIndex; // element is SNAME bool qNodeRequired; // valid qnode - bool forceUpdate; + bool forceUpdate; } SCatalogReq; typedef struct SMetaRes { @@ -78,16 +78,16 @@ typedef struct SMetaRes { } SMetaRes; typedef struct SMetaData { - SArray *pDbVgroup; // pRes = SArray* - SArray *pDbCfg; // pRes = SDbCfgInfo* - SArray *pDbInfo; // pRes = SDbInfo* - SArray *pTableMeta; // pRes = STableMeta* - SArray *pTableHash; // pRes = SVgroupInfo* - SArray *pTableIndex; // pRes = SArray* - SArray *pUdfList; // pRes = SFuncInfo* - SArray *pIndex; // pRes = SIndexInfo* - SArray *pUser; // pRes = bool* - SArray *pQnodeList; // pRes = SQueryNodeAddr* + SArray* pDbVgroup; // pRes = SArray* + SArray* pDbCfg; // pRes = SDbCfgInfo* + SArray* pDbInfo; // pRes = SDbInfo* + SArray* pTableMeta; // pRes = STableMeta* + SArray* pTableHash; // pRes = SVgroupInfo* + SArray* pTableIndex; // pRes = SArray* + SArray* pUdfList; // pRes = SFuncInfo* + SArray* pIndex; // pRes = SIndexInfo* + SArray* pUser; // pRes = bool* + SArray* pQnodeList; // pRes = SQueryNodeAddr* } SMetaData; typedef struct SCatalogCfg { @@ -104,18 +104,18 @@ typedef struct SSTableMetaVersion { uint64_t dbId; uint64_t suid; int16_t sversion; - int16_t tversion; + int16_t tversion; } SSTableMetaVersion; typedef struct SDbVgVersion { char dbFName[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; - int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SDbVgVersion; typedef struct STbSVersion { - char* tbFName; + char* tbFName; int32_t sver; int32_t tver; } STbSVersion; @@ -125,15 +125,15 @@ typedef struct SUserAuthVersion { int32_t version; } SUserAuthVersion; -typedef SDbCfgRsp SDbCfgInfo; +typedef SDbCfgRsp SDbCfgInfo; typedef SUserIndexRsp SIndexInfo; typedef void (*catalogCallback)(SMetaData* pResult, void* param, int32_t code); -int32_t catalogInit(SCatalogCfg *cfg); +int32_t catalogInit(SCatalogCfg* cfg); /** - * Get a cluster's catalog handle for all later operations. + * Get a cluster's catalog handle for all later operations. * @param clusterId * @param catalogHandle (output, NO need to free it) * @return error code @@ -141,14 +141,14 @@ int32_t catalogInit(SCatalogCfg *cfg); int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle); /** - * Free a cluster's all catalog info, usually it's not necessary, until the application is closing. + * Free a cluster's all catalog info, usually it's not necessary, until the application is closing. * no current or future usage should be guaranteed by application * @param pCatalog (input, NO more usage) * @return error code */ void catalogFreeHandle(SCatalog* pCatalog); -int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum); +int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum); /** * Get a DB's all vgroup info. @@ -159,7 +159,8 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ -int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, SArray** pVgroupList); +int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, const char* pDBName, + SArray** pVgroupList); int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo); @@ -170,7 +171,7 @@ int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName); int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid); /** - * Get a table's meta data. + * Get a table's meta data. * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) @@ -178,10 +179,11 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, * @param pTableMeta(output, table meta data, NEED to free it by calller) * @return error code */ -int32_t catalogGetTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); +int32_t catalogGetTableMeta(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, + STableMeta** pTableMeta); /** - * Get a super table's meta data. + * Get a super table's meta data. * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) @@ -189,47 +191,47 @@ int32_t catalogGetTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSe * @param pTableMeta(output, table meta data, NEED to free it by calller) * @return error code */ -int32_t catalogGetSTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); - -int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp *rspMsg); +int32_t catalogGetSTableMeta(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, + STableMeta** pTableMeta); +int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg); /** - * Force refresh DB's local cached vgroup info. + * Force refresh DB's local cached vgroup info. * @param pCtg (input, got with catalogGetHandle) * @param pTrans (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param dbFName (input, db full name) * @return error code */ -int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName); +int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* dbFName); -int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables); +int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, SArray* pTables); /** - * Force refresh a table's local cached meta data. + * Force refresh a table's local cached meta data. * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param pTableName (input, table name) - * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) + * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ -int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable); +int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, + int32_t isSTable); /** - * Force refresh a table's local cached meta data and get the new one. + * Force refresh a table's local cached meta data and get the new one. * @param pCatalog (input, got with catalogGetHandle) * @param pTransporter (input, rpc object) * @param pMgmtEps (input, mnode EPs) * @param pTableName (input, table name) - * @param pTableMeta(output, table meta data, NEED to free it by calller) - * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) + * @param pTableMeta(output, table meta data, NEED to free it by calller) + * @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure) * @return error code */ -int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable); - - +int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, + const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable); /** * Get a table's actual vgroup, for stable it's all possible vgroup list. @@ -240,7 +242,8 @@ int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void *pTransporter, const * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ -int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList); +int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, + const SName* pTableName, SArray** pVgroupList); /** * Get a table's vgroup from its name's hash value. @@ -251,8 +254,8 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void *pTransporter, const * @param vgInfo (output, vgroup info) * @return error code */ -int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pName, SVgroupInfo* vgInfo); - +int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, const SName* pName, + SVgroupInfo* vgInfo); /** * Get all meta data required in pReq. @@ -261,36 +264,40 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const * @param pMgmtEps (input, mnode EPs) * @param pReq (input, reqest info) * @param pRsp (output, response data) - * @return error code + * @return error code */ -int32_t catalogGetAllMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); +int32_t catalogGetAllMeta(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, + SMetaData* pRsp); -int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId); +int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, uint64_t reqId, + const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId); -int32_t catalogGetQnodeList(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList); +int32_t catalogGetQnodeList(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList); -int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num); +int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion** stables, uint32_t* num); -int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num); +int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion** dbs, uint32_t* num); -int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_t *num); +int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_t* num); -int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg); +int32_t catalogGetDBCfg(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg); -int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo); +int32_t catalogGetIndexMeta(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* indexName, + SIndexInfo* pInfo); -int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pRes); +int32_t catalogGetTableIndex(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName, + SArray** pRes); -int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo); +int32_t catalogGetUdfInfo(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo); -int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass); +int32_t catalogChkAuth(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, + AUTH_TYPE type, bool* pass); int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth); -int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet); - -int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate); +int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet* epSet); +int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate); /** * Destroy catalog and relase all resources diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 90cbb7a2e6..1a3bfdbb04 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -62,6 +62,7 @@ typedef struct SScanLogicNode { int64_t watermark; int16_t tsColId; double filesFactor; + SArray* pSmaIndexes; } SScanLogicNode; typedef struct SJoinLogicNode { @@ -303,7 +304,7 @@ typedef struct SDownstreamSourceNode { typedef struct SExchangePhysiNode { SPhysiNode node; - int32_t srcGroupId; // group id of datasource suplans + int32_t srcGroupId; // group id of datasource suplans bool singleChannel; SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhysiNode; @@ -437,7 +438,6 @@ typedef struct SQueryPlan { int32_t numOfSubplans; SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0. SExplainInfo explainInfo; - SArray* pPlaceholderValues; } SQueryPlan; void nodesWalkPhysiPlan(SNode* pNode, FNodeWalker walker, void* pContext); diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index c2aa86e89f..33cc4a9c86 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -144,6 +144,7 @@ typedef struct SRealTableNode { SVgroupsInfo* pVgroupList; char qualDbName[TSDB_DB_NAME_LEN]; // SHOW qualDbName.TABLES double ratio; + SArray* pSmaIndexes; } SRealTableNode; typedef struct STempTableNode { diff --git a/include/util/tjson.h b/include/util/tjson.h index 84f7b81726..df433227ca 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -17,16 +17,17 @@ #define _TD_UTIL_JSON_H_ #include "os.h" +#include "tarray.h" #ifdef __cplusplus extern "C" { #endif -#define tjsonGetNumberValue(pJson, pName, val, code) \ - do { \ - uint64_t _tmp = 0; \ +#define tjsonGetNumberValue(pJson, pName, val, code) \ + do { \ + uint64_t _tmp = 0; \ code = tjsonGetBigIntValue(pJson, pName, &_tmp); \ - val = _tmp; \ + val = _tmp; \ } while (0) typedef void SJson; @@ -66,18 +67,20 @@ typedef int32_t (*FToJson)(const void* pObj, SJson* pJson); int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void* pObj); int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj); int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* pArray, int32_t itemSize, int32_t num); +int32_t tjsonAddTArray(SJson* pJson, const char* pName, FToJson func, const SArray* pArray); typedef int32_t (*FToObject)(const SJson* pJson, void* pObj); int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, void* pObj); int32_t tjsonMakeObject(const SJson* pJson, const char* pName, FToObject func, void** pObj, int32_t objSize); int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize); +int32_t tjsonToTArray(const SJson* pJson, const char* pName, FToObject func, SArray** pArray, int32_t itemSize); char* tjsonToString(const SJson* pJson); char* tjsonToUnformattedString(const SJson* pJson); -SJson* tjsonParse(const char* pStr); -bool tjsonValidateJson(const char* pJson); +SJson* tjsonParse(const char* pStr); +bool tjsonValidateJson(const char* pJson); const char* tjsonGetError(); #ifdef __cplusplus diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 502949f191..de38e3cb77 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -13,23 +13,22 @@ * along with this program. If not, see . */ -#include "trpc.h" -#include "query.h" -#include "tname.h" #include "catalogInt.h" +#include "query.h" #include "systable.h" +#include "tname.h" #include "tref.h" +#include "trpc.h" SCatalogMgmt gCtgMgmt = {0}; - int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) { - int32_t code = 0; - STableMeta *tblMeta = NULL; + int32_t code = 0; + STableMeta* tblMeta = NULL; SCtgTbMetaCtx tbCtx = {0}; tbCtx.flag = CTG_FLAG_UNKNOWN_STB; tbCtx.pName = pTableName; - + CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &tbCtx, &tblMeta)); if (NULL == tblMeta) { @@ -39,13 +38,13 @@ int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pTableName, dbFName); - + if (TSDB_SUPER_TABLE == tblMeta->tableType) { CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq)); } else { CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq)); } - + _return: taosMemoryFreeClear(tblMeta); @@ -53,7 +52,8 @@ _return: CTG_RET(code); } -int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName, SCtgDBCache** dbCache, SDBVgInfo **pInfo) { +int32_t ctgGetDBVgInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* dbFName, SCtgDBCache** dbCache, + SDBVgInfo** pInfo) { int32_t code = 0; CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache)); @@ -62,7 +62,7 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, con return TSDB_CODE_SUCCESS; } - SUseDbOutput DbOut = {0}; + SUseDbOutput DbOut = {0}; SBuildUseDBInput input = {0}; tstrncpy(input.db, dbFName, tListLen(input.db)); @@ -80,17 +80,17 @@ _return: taosMemoryFreeClear(*pInfo); *pInfo = DbOut.dbVgroup; - + CTG_RET(code); } -int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName) { - int32_t code = 0; +int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* dbFName) { + int32_t code = 0; SCtgDBCache* dbCache = NULL; CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); - SUseDbOutput DbOut = {0}; + SUseDbOutput DbOut = {0}; SBuildUseDBInput input = {0}; tstrncpy(input.db, dbFName, tListLen(input.db)); @@ -100,7 +100,7 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, ctgReleaseVgInfo(dbCache); ctgReleaseDBCache(pCtg, dbCache); } - + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.numOfTable = 0; @@ -119,18 +119,16 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, return TSDB_CODE_SUCCESS; } - - -int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput **pOutput, bool syncReq) { +int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput** pOutput, bool syncReq) { SVgroupInfo vgroupInfo = {0}; - int32_t code = 0; + int32_t code = 0; if (!CTG_FLAG_IS_SYS_DB(ctx->flag)) { CTG_ERR_RET(catalogGetTableHashVgroup(CTG_PARAMS_LIST(), ctx->pName, &vgroupInfo)); } STableMetaOutput moutput = {0}; - STableMetaOutput *output = taosMemoryCalloc(1, sizeof(STableMetaOutput)); + STableMetaOutput* output = taosMemoryCalloc(1, sizeof(STableMetaOutput)); if (NULL == output) { ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); @@ -139,7 +137,8 @@ int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput **pOut if (CTG_FLAG_IS_SYS_DB(ctx->flag)) { ctgDebug("will refresh tbmeta, supposed in information_schema, tbName:%s", tNameGetTableName(ctx->pName)); - CTG_ERR_JRET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), (char *)ctx->pName->dbname, (char *)ctx->pName->tname, output, NULL)); + CTG_ERR_JRET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), (char*)ctx->pName->dbname, (char*)ctx->pName->tname, + output, NULL)); } else if (CTG_FLAG_IS_STB(ctx->flag)) { ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(ctx->pName)); @@ -150,7 +149,8 @@ int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput **pOut CTG_ERR_JRET(ctgGetTbMetaFromVnode(CTG_PARAMS_LIST(), ctx->pName, &vgroupInfo, output, NULL)); } } else { - ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag); + ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), + ctx->flag); // if get from vnode failed or no table meta, will not try mnode CTG_ERR_JRET(ctgGetTbMetaFromVnode(CTG_PARAMS_LIST(), ctx->pName, &vgroupInfo, output, NULL)); @@ -159,7 +159,7 @@ int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput **pOut ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(ctx->pName)); taosMemoryFreeClear(output->tbMeta); - + CTG_ERR_JRET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), output->dbFName, output->tbName, output, NULL)); } else if (CTG_IS_META_BOTH(output->metaType)) { int32_t exist = 0; @@ -173,14 +173,14 @@ int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput **pOut if (CTG_IS_META_NULL(moutput.metaType)) { SET_META_TYPE_NULL(output->metaType); } - + taosMemoryFreeClear(output->tbMeta); output->tbMeta = moutput.tbMeta; moutput.tbMeta = NULL; } else { taosMemoryFreeClear(output->tbMeta); - - SET_META_TYPE_CTABLE(output->metaType); + + SET_META_TYPE_CTABLE(output->metaType); } } } @@ -192,9 +192,11 @@ int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput **pOut } if (CTG_IS_META_TABLE(output->metaType)) { - ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d", output->dbFName, output->tbName, output->tbMeta->tableType); + ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d", output->dbFName, output->tbName, + output->tbMeta->tableType); } else { - ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d, stbMetaGot:%d", output->dbFName, output->ctbName, output->ctbMeta.tableType, CTG_IS_META_BOTH(output->metaType)); + ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d, stbMetaGot:%d", output->dbFName, output->ctbName, + output->ctbMeta.tableType, CTG_IS_META_BOTH(output->metaType)); } if (pOutput) { @@ -209,7 +211,7 @@ _return: taosMemoryFreeClear(output->tbMeta); taosMemoryFreeClear(output); - + CTG_RET(code); } @@ -221,7 +223,8 @@ int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTabl CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta)); if (*pTableMeta) { - if (CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType) && ((!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) || (CTG_FLAG_IS_SYS_DB(ctx->flag)))) { + if (CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType) && + ((!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) || (CTG_FLAG_IS_SYS_DB(ctx->flag)))) { return TSDB_CODE_SUCCESS; } @@ -231,14 +234,13 @@ int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTabl if (CTG_FLAG_IS_UNKNOWN_STB(ctx->flag)) { CTG_FLAG_SET_STB(ctx->flag, ctx->tbInfo.tbType); } - + return TSDB_CODE_SUCCESS; } - int32_t ctgGetTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) { - int32_t code = 0; - STableMetaOutput *output = NULL; + int32_t code = 0; + STableMetaOutput* output = NULL; CTG_ERR_RET(ctgGetTbMetaFromCache(CTG_PARAMS_LIST(), ctx, pTableMeta)); if (*pTableMeta) { @@ -255,7 +257,7 @@ int32_t ctgGetTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) { if (CTG_IS_META_BOTH(output->metaType)) { memcpy(output->tbMeta, &output->ctbMeta, sizeof(output->ctbMeta)); - + *pTableMeta = output->tbMeta; goto _return; } @@ -275,7 +277,7 @@ int32_t ctgGetTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) { SCtgTbMetaCtx stbCtx = {0}; stbCtx.flag = ctx->flag; stbCtx.pName = &stbName; - + CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, pTableMeta)); if (NULL == *pTableMeta) { ctgDebug("stb no longer exist, dbFName:%s, tbName:%s", output->dbFName, ctx->pName->tname); @@ -314,43 +316,43 @@ _return: CTG_RET(code); } -int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp *rspMsg, bool syncOp) { - STableMetaOutput *output = taosMemoryCalloc(1, sizeof(STableMetaOutput)); +int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) { + STableMetaOutput* output = taosMemoryCalloc(1, sizeof(STableMetaOutput)); if (NULL == output) { ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } - + int32_t code = 0; strcpy(output->dbFName, rspMsg->dbFName); strcpy(output->tbName, rspMsg->tbName); output->dbId = rspMsg->dbId; - + SET_META_TYPE_TABLE(output->metaType); - + CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, rspMsg->tableType == TSDB_SUPER_TABLE, &output->tbMeta)); CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncOp)); return TSDB_CODE_SUCCESS; - + _return: taosMemoryFreeClear(output->tbMeta); taosMemoryFreeClear(output); - + CTG_RET(code); } - -int32_t ctgChkAuth(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass) { - bool inCache = false; +int32_t ctgChkAuth(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* user, const char* dbFName, + AUTH_TYPE type, bool* pass) { + bool inCache = false; int32_t code = 0; - + *pass = false; - + CTG_ERR_RET(ctgChkAuthFromCache(pCtg, user, dbFName, type, &inCache, pass)); if (inCache) { @@ -359,7 +361,7 @@ int32_t ctgChkAuth(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const c SGetUserAuthRsp authRsp = {0}; CTG_ERR_RET(ctgGetUserDbAuthFromMnode(CTG_PARAMS_LIST(), user, &authRsp, NULL)); - + if (authRsp.superAuth) { *pass = true; goto _return; @@ -383,25 +385,25 @@ _return: return TSDB_CODE_SUCCESS; } -int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SName* pTableName, SArray** pVgList) { - STableMeta *tbMeta = NULL; - int32_t code = 0; - SVgroupInfo vgroupInfo = {0}; - SCtgDBCache* dbCache = NULL; - SArray *vgList = NULL; - SDBVgInfo *vgInfo = NULL; +int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, SName* pTableName, SArray** pVgList) { + STableMeta* tbMeta = NULL; + int32_t code = 0; + SVgroupInfo vgroupInfo = {0}; + SCtgDBCache* dbCache = NULL; + SArray* vgList = NULL; + SDBVgInfo* vgInfo = NULL; SCtgTbMetaCtx ctx = {0}; ctx.pName = pTableName; ctx.flag = CTG_FLAG_UNKNOWN_STB; *pVgList = NULL; - + CTG_ERR_JRET(ctgGetTbMeta(CTG_PARAMS_LIST(), &ctx, &tbMeta)); char db[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, db); - SHashObj *vgHash = NULL; + SHashObj* vgHash = NULL; CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, db, &dbCache, &vgInfo)); if (dbCache) { @@ -416,7 +418,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, // USE HASH METHOD INSTEAD OF VGID IN TBMETA ctgError("invalid method to get none stb vgInfo, tbType:%d", tbMeta->tableType); CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); - + #if 0 int32_t vgId = tbMeta->vgId; if (taosHashGetDup(vgHash, &vgId, sizeof(vgId), &vgroupInfo) != 0) { @@ -437,7 +439,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, *pVgList = vgList; vgList = NULL; -#endif +#endif } _return: @@ -462,7 +464,7 @@ _return: CTG_RET(code); } -int32_t catalogInit(SCatalogCfg *cfg) { +int32_t catalogInit(SCatalogCfg* cfg) { if (gCtgMgmt.pCluster) { qError("catalog already initialized"); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); @@ -495,7 +497,8 @@ int32_t catalogInit(SCatalogCfg *cfg) { gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND; } - gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), + false, HASH_ENTRY_LOCK); if (NULL == gCtgMgmt.pCluster) { qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); @@ -505,7 +508,7 @@ int32_t catalogInit(SCatalogCfg *cfg) { qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR); } - + gCtgMgmt.queue.head = taosMemoryCalloc(1, sizeof(SCtgQNode)); if (NULL == gCtgMgmt.queue.head) { qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); @@ -521,7 +524,8 @@ int32_t catalogInit(SCatalogCfg *cfg) { CTG_ERR_RET(ctgStartUpdateThread()); - qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec); + qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, + gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec); return TSDB_CODE_SUCCESS; } @@ -532,19 +536,19 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { } if (NULL == gCtgMgmt.pCluster) { - qError("catalog cluster cache are not ready, clusterId:%"PRIx64, clusterId); + qError("catalog cluster cache are not ready, clusterId:%" PRIx64, clusterId); CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY); } - int32_t code = 0; - SCatalog *clusterCtg = NULL; + int32_t code = 0; + SCatalog* clusterCtg = NULL; while (true) { - SCatalog **ctg = (SCatalog **)taosHashGet(gCtgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId)); + SCatalog** ctg = (SCatalog**)taosHashGet(gCtgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId)); if (ctg && (*ctg)) { *catalogHandle = *ctg; - qDebug("got catalog handle from cache, clusterId:%"PRIx64", CTG:%p", clusterId, *ctg); + qDebug("got catalog handle from cache, clusterId:%" PRIx64 ", CTG:%p", clusterId, *ctg); return TSDB_CODE_SUCCESS; } @@ -559,7 +563,8 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB)); CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE)); - clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), + false, HASH_ENTRY_LOCK); if (NULL == clusterCtg->dbCache) { qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); @@ -571,12 +576,12 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { ctgFreeHandle(clusterCtg); continue; } - - qError("taosHashPut CTG to cache failed, clusterId:%"PRIx64, clusterId); + + qError("taosHashPut CTG to cache failed, clusterId:%" PRIx64, clusterId); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - qDebug("add CTG to cache, clusterId:%"PRIx64", CTG:%p", clusterId, clusterCtg); + qDebug("add CTG to cache, clusterId:%" PRIx64 ", CTG:%p", clusterId, clusterCtg); break; } @@ -584,13 +589,13 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { *catalogHandle = clusterCtg; CTG_CACHE_STAT_INC(clusterNum, 1); - + return TSDB_CODE_SUCCESS; _return: ctgFreeHandle(clusterCtg); - + CTG_RET(code); } @@ -600,28 +605,28 @@ void catalogFreeHandle(SCatalog* pCtg) { } if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) { - ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCtg->clusterId); + ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%" PRIx64, pCtg->clusterId); return; } CTG_CACHE_STAT_DEC(clusterNum, 1); uint64_t clusterId = pCtg->clusterId; - + ctgFreeHandle(pCtg); - - ctgInfo("handle freed, culsterId:%"PRIx64, clusterId); + + ctgInfo("handle freed, culsterId:%" PRIx64, clusterId); } -int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum) { +int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum) { CTG_API_ENTER(); if (NULL == pCtg || NULL == dbFName || NULL == version || NULL == dbId) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - SCtgDBCache *dbCache = NULL; - int32_t code = 0; + SCtgDBCache* dbCache = NULL; + int32_t code = 0; CTG_ERR_JRET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); if (NULL == dbCache) { @@ -645,7 +650,8 @@ _return: CTG_API_LEAVE(code); } -int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName, SArray** vgroupList) { +int32_t catalogGetDBVgInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* dbFName, + SArray** vgroupList) { CTG_API_ENTER(); if (NULL == pCtg || NULL == dbFName || NULL == pTrans || NULL == pMgmtEps || NULL == vgroupList) { @@ -653,10 +659,10 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, } SCtgDBCache* dbCache = NULL; - int32_t code = 0; - SArray *vgList = NULL; - SHashObj *vgHash = NULL; - SDBVgInfo *vgInfo = NULL; + int32_t code = 0; + SArray* vgList = NULL; + SHashObj* vgHash = NULL; + SDBVgInfo* vgInfo = NULL; CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName, &dbCache, &vgInfo)); if (dbCache) { vgHash = dbCache->vgInfo->vgHash; @@ -681,15 +687,14 @@ _return: taosMemoryFreeClear(vgInfo); } - CTG_API_LEAVE(code); + CTG_API_LEAVE(code); } - int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo* dbInfo) { CTG_API_ENTER(); int32_t code = 0; - + if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) { ctgFreeVgInfo(dbInfo); CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); @@ -702,12 +707,11 @@ _return: CTG_API_LEAVE(code); } - int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) { CTG_API_ENTER(); int32_t code = 0; - + if (NULL == pCtg || NULL == dbFName) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } @@ -719,17 +723,17 @@ int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) { CTG_ERR_JRET(ctgDropDbCacheEnqueue(pCtg, dbFName, dbId)); CTG_API_LEAVE(TSDB_CODE_SUCCESS); - + _return: CTG_API_LEAVE(code); } -int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet) { +int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet* epSet) { CTG_API_ENTER(); int32_t code = 0; - + if (NULL == pCtg || NULL == dbFName || NULL == epSet) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } @@ -737,7 +741,7 @@ int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, CTG_ERR_JRET(ctgUpdateVgEpsetEnqueue(pCtg, (char*)dbFName, vgId, epSet)); _return: - + CTG_API_LEAVE(code); } @@ -745,7 +749,7 @@ int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { CTG_API_ENTER(); int32_t code = 0; - + if (NULL == pCtg || NULL == pTableName) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } @@ -757,16 +761,15 @@ int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { CTG_ERR_JRET(ctgRemoveTbMetaFromCache(pCtg, pTableName, true)); _return: - + CTG_API_LEAVE(code); } - int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) { CTG_API_ENTER(); int32_t code = 0; - + if (NULL == pCtg || NULL == dbFName || NULL == stbName) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } @@ -778,23 +781,25 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, dbId, stbName, suid, true)); CTG_API_LEAVE(TSDB_CODE_SUCCESS); - + _return: CTG_API_LEAVE(code); } -int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { +int32_t catalogGetTableMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName, + STableMeta** pTableMeta) { CTG_API_ENTER(); SCtgTbMetaCtx ctx = {0}; ctx.pName = (SName*)pTableName; ctx.flag = CTG_FLAG_UNKNOWN_STB; - + CTG_API_LEAVE(ctgGetTbMeta(pCtg, pTrans, pMgmtEps, &ctx, pTableMeta)); } -int32_t catalogGetSTableMeta(SCatalog* pCtg, void * pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { +int32_t catalogGetSTableMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName, + STableMeta** pTableMeta) { CTG_API_ENTER(); SCtgTbMetaCtx ctx = {0}; @@ -804,7 +809,7 @@ int32_t catalogGetSTableMeta(SCatalog* pCtg, void * pTrans, const SEpSet* pMgmtE CTG_API_LEAVE(ctgGetTbMeta(CTG_PARAMS_LIST(), &ctx, pTableMeta)); } -int32_t catalogUpdateTableMeta(SCatalog* pCtg, STableMetaRsp *pMsg) { +int32_t catalogUpdateTableMeta(SCatalog* pCtg, STableMetaRsp* pMsg) { CTG_API_ENTER(); if (NULL == pCtg || NULL == pMsg) { @@ -813,20 +818,20 @@ int32_t catalogUpdateTableMeta(SCatalog* pCtg, STableMetaRsp *pMsg) { int32_t code = 0; CTG_ERR_JRET(ctgUpdateTbMeta(pCtg, pMsg, true)); - + _return: - + CTG_API_LEAVE(code); } -int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables) { +int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, SArray* pTables) { CTG_API_ENTER(); if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTables) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - SName name; + SName name; int32_t sver = 0; int32_t tver = 0; int32_t tbNum = taosArrayGetSize(pTables); @@ -835,7 +840,7 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm if (NULL == pTb->tbFName || 0 == pTb->tbFName[0]) { continue; } - + tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); if (CTG_IS_SYS_DBNAME(name.dbname)) { @@ -868,8 +873,7 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - -int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName) { +int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* dbFName) { CTG_API_ENTER(); if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == dbFName) { @@ -879,7 +883,8 @@ int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmt CTG_API_LEAVE(ctgRefreshDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName)); } -int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { +int32_t catalogRefreshTableMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName, + int32_t isSTable) { CTG_API_ENTER(); if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) { @@ -893,7 +898,8 @@ int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm CTG_API_LEAVE(ctgRefreshTbMeta(CTG_PARAMS_LIST(), &ctx, NULL, true)); } -int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { +int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName, + STableMeta** pTableMeta, int32_t isSTable) { CTG_API_ENTER(); SCtgTbMetaCtx ctx = {0}; @@ -903,7 +909,8 @@ int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* p CTG_API_LEAVE(ctgGetTbMeta(CTG_PARAMS_LIST(), &ctx, pTableMeta)); } -int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) { +int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName, + SArray** pVgList) { CTG_API_ENTER(); if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == pVgList) { @@ -918,8 +925,8 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pM CTG_API_LEAVE(ctgGetTbDistVgInfo(pCtg, pTrans, pMgmtEps, (SName*)pTableName, pVgList)); } - -int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) { +int32_t catalogGetTableHashVgroup(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName, + SVgroupInfo* pVgroup) { CTG_API_ENTER(); if (CTG_IS_SYS_DBNAME(pTableName->dbname)) { @@ -928,11 +935,11 @@ int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pM } SCtgDBCache* dbCache = NULL; - int32_t code = 0; - char db[TSDB_DB_FNAME_LEN] = {0}; + int32_t code = 0; + char db[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, db); - SDBVgInfo *vgInfo = NULL; + SDBVgInfo* vgInfo = NULL; CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, db, &dbCache, &vgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup)); @@ -952,8 +959,8 @@ _return: CTG_API_LEAVE(code); } - -int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) { +int32_t catalogGetAllMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SCatalogReq* pReq, + SMetaData* pRsp) { CTG_API_ENTER(); if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) { @@ -975,14 +982,14 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, ctgError("taosArrayInit %d failed", tbNum); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - + for (int32_t i = 0; i < tbNum; ++i) { - SName *name = taosArrayGet(pReq->pTableMeta, i); - STableMeta *pTableMeta = NULL; + SName* name = taosArrayGet(pReq->pTableMeta, i); + STableMeta* pTableMeta = NULL; SCtgTbMetaCtx ctx = {0}; ctx.pName = name; ctx.flag = CTG_FLAG_UNKNOWN_STB; - + CTG_ERR_JRET(ctgGetTbMeta(CTG_PARAMS_LIST(), &ctx, &pTableMeta)); if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) { @@ -1000,31 +1007,32 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, CTG_API_LEAVE(TSDB_CODE_SUCCESS); -_return: +_return: if (pRsp->pTableMeta) { int32_t aSize = taosArrayGetSize(pRsp->pTableMeta); for (int32_t i = 0; i < aSize; ++i) { - STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i); + STableMeta* pMeta = taosArrayGetP(pRsp->pTableMeta, i); taosMemoryFreeClear(pMeta); } - + taosArrayDestroy(pRsp->pTableMeta); pRsp->pTableMeta = NULL; } - + CTG_API_LEAVE(code); } -int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId) { +int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, uint64_t reqId, + const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId) { CTG_API_ENTER(); if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pReq || NULL == fp || NULL == param) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - int32_t code = 0, taskNum = 0; - SCtgJob *pJob = NULL; + int32_t code = 0, taskNum = 0; + SCtgJob* pJob = NULL; CTG_ERR_JRET(ctgInitJob(CTG_PARAMS_LIST(), &pJob, reqId, pReq, fp, param, &taskNum)); if (taskNum <= 0) { SMetaData* pMetaData = taosMemoryCalloc(1, sizeof(SMetaData)); @@ -1035,8 +1043,8 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmt CTG_ERR_JRET(ctgLaunchJob(pJob)); // NOTE: here the assignment of jobId is invalid, may over-write the true scheduler created query job. -// *jobId = pJob->refId; - + // *jobId = pJob->refId; + _return: if (pJob) { taosReleaseRef(gCtgMgmt.jobPool, pJob->refId); @@ -1045,15 +1053,15 @@ _return: taosRemoveRef(gCtgMgmt.jobPool, pJob->refId); } } - + CTG_API_LEAVE(code); } -int32_t catalogGetQnodeList(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pQnodeList) { +int32_t catalogGetQnodeList(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, SArray* pQnodeList) { CTG_API_ENTER(); - + int32_t code = 0; - if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pQnodeList) { + if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pQnodeList) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } @@ -1064,29 +1072,29 @@ _return: CTG_API_LEAVE(TSDB_CODE_SUCCESS); } -int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableMetaVersion **stables, uint32_t *num) { +int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableMetaVersion** stables, uint32_t* num) { CTG_API_ENTER(); if (NULL == pCtg || NULL == stables || NULL == num) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion))); + CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void**)stables, num, sizeof(SSTableMetaVersion))); } -int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num) { +int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion** dbs, uint32_t* num) { CTG_API_ENTER(); - + if (NULL == pCtg || NULL == dbs || NULL == num) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion))); + CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void**)dbs, num, sizeof(SDbVgVersion))); } -int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_t *num) { +int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion** users, uint32_t* num) { CTG_API_ENTER(); - + if (NULL == pCtg || NULL == users || NULL == num) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } @@ -1100,11 +1108,11 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_ } } - uint32_t i = 0; - SCtgUserAuth *pAuth = taosHashIterate(pCtg->userCache, NULL); + uint32_t i = 0; + SCtgUserAuth* pAuth = taosHashIterate(pCtg->userCache, NULL); while (pAuth != NULL) { size_t len = 0; - void *key = taosHashGetKey(pAuth, &len); + void* key = taosHashGetKey(pAuth, &len); strncpy((*users)[i].user, key, len); (*users)[i].user[len] = 0; (*users)[i].version = pAuth->version; @@ -1115,10 +1123,9 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_ CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - -int32_t catalogGetDBCfg(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) { +int32_t catalogGetDBCfg(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) { CTG_API_ENTER(); - + if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == dbFName || NULL == pDbCfg) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } @@ -1126,9 +1133,10 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, co CTG_API_LEAVE(ctgGetDBCfgFromMnode(CTG_PARAMS_LIST(), dbFName, pDbCfg, NULL)); } -int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo) { +int32_t catalogGetIndexMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* indexName, + SIndexInfo* pInfo) { CTG_API_ENTER(); - + if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == indexName || NULL == pInfo) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } @@ -1136,9 +1144,10 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL)); } -int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pRes) { +int32_t catalogGetTableIndex(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName, + SArray** pRes) { CTG_API_ENTER(); - + if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == pRes) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } @@ -1146,32 +1155,33 @@ int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEp CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), (SName*)pTableName, pRes, NULL)); } - -int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo) { +int32_t catalogGetUdfInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* funcName, + SFuncInfo* pInfo) { CTG_API_ENTER(); - + if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == funcName || NULL == pInfo) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } int32_t code = 0; CTG_ERR_JRET(ctgGetUdfInfoFromMnode(CTG_PARAMS_LIST(), funcName, pInfo, NULL)); - + _return: - + CTG_API_LEAVE(code); } -int32_t catalogChkAuth(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass) { +int32_t catalogChkAuth(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* user, const char* dbFName, + AUTH_TYPE type, bool* pass) { CTG_API_ENTER(); - + if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == user || NULL == dbFName || NULL == pass) { CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } int32_t code = 0; CTG_ERR_JRET(ctgChkAuth(CTG_PARAMS_LIST(), user, dbFName, type, pass)); - + _return: CTG_API_LEAVE(code); @@ -1187,10 +1197,9 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) { CTG_API_LEAVE(ctgUpdateUserEnqueue(pCtg, pAuth, false)); } - void catalogDestroy(void) { qInfo("start to destroy catalog"); - + if (NULL == gCtgMgmt.pCluster || atomic_load_8((int8_t*)&gCtgMgmt.exit)) { return; } @@ -1204,21 +1213,21 @@ void catalogDestroy(void) { while (CTG_IS_LOCKED(&gCtgMgmt.lock)) { taosUsleep(1); } - + CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock); - SCatalog *pCtg = NULL; - void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); + SCatalog* pCtg = NULL; + void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); while (pIter) { - pCtg = *(SCatalog **)pIter; + pCtg = *(SCatalog**)pIter; if (pCtg) { catalogFreeHandle(pCtg); } - + pIter = taosHashIterate(gCtgMgmt.pCluster, pIter); } - + taosHashCleanup(gCtgMgmt.pCluster); gCtgMgmt.pCluster = NULL; @@ -1226,6 +1235,3 @@ void catalogDestroy(void) { qInfo("catalog destroyed"); } - - - diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 2574528d15..f61a3637ed 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -525,7 +525,7 @@ int32_t ctgDumpTbIndexRes(SCtgTask* pTask) { } SMetaRes res = {.code = pTask->code, .pRes = pTask->res}; - taosArrayPush(pJob->jobRes.pTableHash, &res); + taosArrayPush(pJob->jobRes.pTableIndex, &res); return TSDB_CODE_SUCCESS; } @@ -875,7 +875,9 @@ int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf TSWAP(pTask->res, pTask->msgCtx.out); _return: - + if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) { + code = TSDB_CODE_SUCCESS; + } ctgHandleTaskEnd(pTask, code); CTG_RET(code); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index c60cebb95c..f7e65f100a 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2816,10 +2816,85 @@ static int32_t jsonToTableNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkTableIndexInfoIntervalUnit = "IntervalUnit"; +static const char* jkTableIndexInfoSlidingUnit = "SlidingUnit"; +static const char* jkTableIndexInfoInterval = "Interval"; +static const char* jkTableIndexInfoOffset = "Offset"; +static const char* jkTableIndexInfoSliding = "Sliding"; +static const char* jkTableIndexInfoDstTbUid = "DstTbUid"; +static const char* jkTableIndexInfoDstVgId = "DstVgId"; +static const char* jkTableIndexInfoEpSet = "EpSet"; +static const char* jkTableIndexInfoExpr = "Expr"; + +static int32_t tableIndexInfoToJson(const void* pObj, SJson* pJson) { + const STableIndexInfo* pNode = (const STableIndexInfo*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoIntervalUnit, pNode->intervalUnit); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoSlidingUnit, pNode->slidingUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoInterval, pNode->interval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoOffset, pNode->offset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoSliding, pNode->sliding); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoDstTbUid, pNode->dstTbUid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoDstVgId, pNode->dstVgId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkTableIndexInfoEpSet, epSetToJson, &pNode->epSet); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddStringToObject(pJson, jkTableIndexInfoExpr, pNode->expr); + } + + return code; +} + +static int32_t jsonToTableIndexInfo(const SJson* pJson, void* pObj) { + STableIndexInfo* pNode = (STableIndexInfo*)pObj; + + int32_t code = tjsonGetTinyIntValue(pJson, jkTableIndexInfoIntervalUnit, &pNode->intervalUnit); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkTableIndexInfoSlidingUnit, &pNode->slidingUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkTableIndexInfoInterval, &pNode->interval); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkTableIndexInfoOffset, &pNode->offset); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkTableIndexInfoSliding, &pNode->sliding); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkTableIndexInfoDstTbUid, &pNode->dstTbUid); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkTableIndexInfoDstVgId, &pNode->dstVgId); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, jkTableIndexInfoEpSet, jsonToEpSet, &pNode->epSet); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonDupStringValue(pJson, jkTableIndexInfoExpr, &pNode->expr); + } + + return code; +} + static const char* jkRealTableMetaSize = "MetaSize"; static const char* jkRealTableMeta = "Meta"; static const char* jkRealTableVgroupsInfoSize = "VgroupsInfoSize"; static const char* jkRealTableVgroupsInfo = "VgroupsInfo"; +static const char* jkRealTableSmaIndexes = "SmaIndexes"; static int32_t realTableNodeToJson(const void* pObj, SJson* pJson) { const SRealTableNode* pNode = (const SRealTableNode*)pObj; @@ -2837,6 +2912,9 @@ static int32_t realTableNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkRealTableVgroupsInfo, vgroupsInfoToJson, pNode->pVgroupList); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddTArray(pJson, jkRealTableSmaIndexes, tableIndexInfoToJson, pNode->pSmaIndexes); + } return code; } @@ -2858,6 +2936,10 @@ static int32_t jsonToRealTableNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonMakeObject(pJson, jkRealTableVgroupsInfo, jsonToVgroupsInfo, (void**)&pNode->pVgroupList, objSize); } + if (TSDB_CODE_SUCCESS == code) { + code = + tjsonToTArray(pJson, jkRealTableSmaIndexes, jsonToTableIndexInfo, &pNode->pSmaIndexes, sizeof(STableIndexInfo)); + } return code; } @@ -4214,6 +4296,7 @@ int32_t nodesStringToList(const char* pStr, SNodeList** pList) { tjsonDelete(pJson); if (TSDB_CODE_SUCCESS != code) { nodesDestroyList(*pList); + *pList = NULL; terrno = code; return code; } diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 0351023f5b..fb67a35368 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -46,6 +46,7 @@ typedef struct SParseMetaCache { SHashObj* pDbInfo; // key is tbFName, element is SDbInfo* SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass SHashObj* pUdf; // key is funcName, element is SFuncInfo* + SHashObj* pTableIndex; // key is tbFName, element is SArray* } SParseMetaCache; int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...); @@ -58,7 +59,7 @@ int32_t getNumOfColumns(const STableMeta* pTableMeta); int32_t getNumOfTags(const STableMeta* pTableMeta); STableComInfo getTableInfo(const STableMeta* pTableMeta); STableMeta* tableMetaDup(const STableMeta* pTableMeta); -int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag **ppTag, SMsgBuf* pMsgBuf); +int32_t parseJsontoTagData(const char* json, SArray* pTagVals, STag** ppTag, SMsgBuf* pMsgBuf); int32_t trimString(const char* src, int32_t len, char* dst, int32_t dlen); @@ -75,6 +76,7 @@ int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pD SParseMetaCache* pMetaCache); int32_t reserveUserAuthInCacheExt(const char* pUser, const SName* pName, AUTH_TYPE type, SParseMetaCache* pMetaCache); int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache); +int32_t reserveTableIndexInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache); int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta); int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo); int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup); @@ -84,6 +86,7 @@ int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDb int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDbFName, AUTH_TYPE type, bool* pPass); int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo); +int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes); #ifdef __cplusplus } diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index 26d4b69fa0..cc77b96f5f 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -91,6 +91,7 @@ abort_parse: typedef struct SCollectMetaKeyCxt { SParseContext* pParseCxt; SParseMetaCache* pMetaCache; + SNode* pStmt; } SCollectMetaKeyCxt; static void destroyCollectMetaKeyCxt(SCollectMetaKeyCxt* pCxt) { @@ -114,6 +115,14 @@ static EDealRes collectMetaKeyFromFunction(SCollectMetaKeyFromExprCxt* pCxt, SFu return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR; } +static bool needGetTableIndex(SNode* pStmt) { + if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { + SSelectStmt* pSelect = (SSelectStmt*)pStmt; + return (NULL != pSelect->pWindow && QUERY_NODE_INTERVAL_WINDOW == nodeType(pSelect->pWindow)); + } + return false; +} + static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, SRealTableNode* pRealTable, AUTH_TYPE authType) { int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, @@ -129,6 +138,10 @@ static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, SRealTa if (TSDB_CODE_SUCCESS == code) { code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pCxt->pMetaCache); } + if (TSDB_CODE_SUCCESS == code && needGetTableIndex(pCxt->pStmt)) { + code = reserveTableIndexInCache(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, + pCxt->pMetaCache); + } return code; } @@ -379,6 +392,7 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p } static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { + pCxt->pStmt = pStmt; switch (nodeType(pStmt)) { case QUERY_NODE_SET_OPERATOR: return collectMetaKeyFromSetOperator(pCxt, (SSetOperator*)pStmt); @@ -449,7 +463,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { } int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery) { - SCollectMetaKeyCxt cxt = {.pParseCxt = pParseCxt, .pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache))}; + SCollectMetaKeyCxt cxt = { + .pParseCxt = pParseCxt, .pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache)), .pStmt = pQuery->pRoot}; if (NULL == cxt.pMetaCache) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 538db81588..178cc2595a 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -41,6 +41,7 @@ typedef struct STranslateContext { SHashObj* pTables; SExplainOptions* pExplainOpt; SParseMetaCache* pMetaCache; + bool createStream; } STranslateContext; typedef struct SFullDatabaseName { @@ -255,6 +256,23 @@ static int32_t getUdfInfo(STranslateContext* pCxt, SFunctionNode* pFunc) { return code; } +static int32_t getTableIndex(STranslateContext* pCxt, const SName* pName, SArray** pIndexes) { + SParseContext* pParCxt = pCxt->pParseCxt; + int32_t code = collectUseDatabase(pName, pCxt->pDbs); + if (TSDB_CODE_SUCCESS == code) { + code = collectUseTable(pName, pCxt->pTables); + } + if (pParCxt->async) { + code = getTableIndexFromCache(pCxt->pMetaCache, pName, pIndexes); + } else { + code = catalogGetTableIndex(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pIndexes); + } + if (TSDB_CODE_SUCCESS != code) { + parserError("getTableIndex error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname); + } + return code; +} + static int32_t initTranslateContext(SParseContext* pParseCxt, SParseMetaCache* pMetaCache, STranslateContext* pCxt) { pCxt->pParseCxt = pParseCxt; pCxt->errCode = TSDB_CODE_SUCCESS; @@ -329,6 +347,10 @@ static bool isIndefiniteRowsFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsIndefiniteRowsFunc(((SFunctionNode*)pNode)->funcId)); } +static bool isVectorFunc(const SNode* pNode) { + return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsVectorFunc(((SFunctionNode*)pNode)->funcId)); +} + static bool isDistinctOrderBy(STranslateContext* pCxt) { return (SQL_CLAUSE_ORDER_BY == pCxt->currClause && pCxt->pCurrSelectStmt->isDistinct); } @@ -1364,6 +1386,17 @@ static bool isSingleTable(SRealTableNode* pRealTable) { return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType); } +static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) { + if (pCxt->createStream) { + return TSDB_CODE_SUCCESS; + } + if (NULL != pCxt->pCurrSelectStmt && NULL != pCxt->pCurrSelectStmt->pWindow && + QUERY_NODE_INTERVAL_WINDOW == nodeType(pCxt->pCurrSelectStmt->pWindow)) { + return getTableIndex(pCxt, pName, &pRealTable->pSmaIndexes); + } + return TSDB_CODE_SUCCESS; +} + static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pTable)) { @@ -1380,6 +1413,9 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName); } code = setTableVgroupList(pCxt, &name, pRealTable); + if (TSDB_CODE_SUCCESS == code) { + code = setTableIndex(pCxt, &name, pRealTable); + } } pRealTable->table.precision = pRealTable->pMeta->tableInfo.precision; pRealTable->table.singleTable = isSingleTable(pRealTable); @@ -1803,7 +1839,7 @@ static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char uni return -1; } -static int32_t checkIntervalWindow(STranslateContext* pCxt, SNode* pWhere, SIntervalWindowNode* pInterval) { +static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* pInterval) { uint8_t precision = ((SColumnNode*)pInterval->pCol)->node.resType.precision; SValueNode* pInter = (SValueNode*)pInterval->pInterval; @@ -1845,7 +1881,15 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SNode* pWhere, SInte } } - return translateFill(pCxt, pWhere, pInterval); + return TSDB_CODE_SUCCESS; +} + +static int32_t translateIntervalWindow(STranslateContext* pCxt, SSelectStmt* pSelect, SIntervalWindowNode* pInterval) { + int32_t code = checkIntervalWindow(pCxt, pInterval); + if (TSDB_CODE_SUCCESS == code) { + code = translateFill(pCxt, pSelect->pWhere, pInterval); + } + return code; } static EDealRes checkStateExpr(SNode* pNode, void* pContext) { @@ -1867,13 +1911,13 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static int32_t checkStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) { +static int32_t translateStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) { nodesWalkExprPostOrder(pState->pExpr, checkStateExpr, pCxt); // todo check for "function not support for state_window" return pCxt->errCode; } -static int32_t checkSessionWindow(STranslateContext* pCxt, SSessionWindowNode* pSession) { +static int32_t translateSessionWindow(STranslateContext* pCxt, SSessionWindowNode* pSession) { if ('y' == pSession->pGap->unit || 'n' == pSession->pGap->unit || 0 == pSession->pGap->datum.i) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SESSION_GAP); } @@ -1884,14 +1928,14 @@ static int32_t checkSessionWindow(STranslateContext* pCxt, SSessionWindowNode* p return TSDB_CODE_SUCCESS; } -static int32_t checkWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { +static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { switch (nodeType(pSelect->pWindow)) { case QUERY_NODE_STATE_WINDOW: - return checkStateWindow(pCxt, (SStateWindowNode*)pSelect->pWindow); + return translateStateWindow(pCxt, (SStateWindowNode*)pSelect->pWindow); case QUERY_NODE_SESSION_WINDOW: - return checkSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow); + return translateSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow); case QUERY_NODE_INTERVAL_WINDOW: - return checkIntervalWindow(pCxt, pSelect->pWhere, (SIntervalWindowNode*)pSelect->pWindow); + return translateIntervalWindow(pCxt, pSelect, (SIntervalWindowNode*)pSelect->pWindow); default: break; } @@ -1905,7 +1949,7 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { pCxt->currClause = SQL_CLAUSE_WINDOW; int32_t code = translateExpr(pCxt, &pSelect->pWindow); if (TSDB_CODE_SUCCESS == code) { - code = checkWindow(pCxt, pSelect); + code = translateSpecificWindow(pCxt, pSelect); } return code; } @@ -2724,7 +2768,8 @@ typedef struct SSampleAstInfo { STableMeta* pRollupTableMeta; } SSampleAstInfo; -static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, char** pAst, int32_t* pLen) { +static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, char** pAst, int32_t* pLen, char** pExpr, + int32_t* pExprLen) { SSelectStmt* pSelect = nodesMakeNode(QUERY_NODE_SELECT_STMT); if (NULL == pSelect) { return TSDB_CODE_OUT_OF_MEMORY; @@ -2769,10 +2814,14 @@ static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, ch ((SColumnNode*)pInterval->pCol)->colId = PRIMARYKEY_TIMESTAMP_COL_ID; strcpy(((SColumnNode*)pInterval->pCol)->colName, PK_TS_COL_INTERNAL_NAME); + pCxt->createStream = true; int32_t code = translateQuery(pCxt, (SNode*)pSelect); if (TSDB_CODE_SUCCESS == code) { code = nodesNodeToString(pSelect, false, pAst, pLen); } + if (TSDB_CODE_SUCCESS == code && NULL != pExpr) { + code = nodesListToString(pSelect->pProjectionList, false, pExpr, pExprLen); + } nodesDestroyNode(pSelect); return code; } @@ -2894,7 +2943,7 @@ static int32_t getRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, SR SSampleAstInfo info = {0}; int32_t code = buildSampleAstInfoByTable(pCxt, pStmt, pRetension, precision, &info); if (TSDB_CODE_SUCCESS == code) { - code = buildSampleAst(pCxt, &info, pAst, pLen); + code = buildSampleAst(pCxt, &info, pAst, pLen, NULL, NULL); } clearSampleAstInfo(&info); return code; @@ -3152,10 +3201,6 @@ static int32_t getSmaIndexSql(STranslateContext* pCxt, char** pSql, int32_t* pLe return TSDB_CODE_SUCCESS; } -static int32_t getSmaIndexExpr(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pExpr, int32_t* pLen) { - return nodesListToString(pStmt->pOptions->pFuncs, false, pExpr, pLen); -} - static int32_t buildSampleAstInfoByIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SSampleAstInfo* pInfo) { pInfo->pDbName = pCxt->pParseCxt->db; pInfo->pTableName = pStmt->tableName; @@ -3171,11 +3216,12 @@ static int32_t buildSampleAstInfoByIndex(STranslateContext* pCxt, SCreateIndexSt return TSDB_CODE_SUCCESS; } -static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pAst, int32_t* pLen) { +static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pAst, int32_t* pLen, + char** pExpr, int32_t* pExprLen) { SSampleAstInfo info = {0}; int32_t code = buildSampleAstInfoByIndex(pCxt, pStmt, &info); if (TSDB_CODE_SUCCESS == code) { - code = buildSampleAst(pCxt, &info, pAst, pLen); + code = buildSampleAst(pCxt, &info, pAst, pLen, pExpr, pExprLen); } clearSampleAstInfo(&info); return code; @@ -3201,10 +3247,7 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen); } if (TSDB_CODE_SUCCESS == code) { - code = getSmaIndexExpr(pCxt, pStmt, &pReq->expr, &pReq->exprLen); - } - if (TSDB_CODE_SUCCESS == code) { - code = getSmaIndexAst(pCxt, pStmt, &pReq->ast, &pReq->astLen); + code = getSmaIndexAst(pCxt, pStmt, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen); } return code; @@ -3487,6 +3530,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* tNameExtractFullName(&name, pReq->targetStbFullName); } + pCxt->createStream = true; int32_t code = translateQuery(pCxt, pStmt->pQuery); if (TSDB_CODE_SUCCESS == code) { getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB); diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 29443876a6..1e5a6681ee 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -542,6 +542,9 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog if (TSDB_CODE_SUCCESS == code) { code = buildUdfReq(pMetaCache->pUdf, &pCatalogReq->pUdf); } + if (TSDB_CODE_SUCCESS == code) { + code = buildTableReq(pMetaCache->pTableIndex, &pCatalogReq->pTableIndex); + } return code; } @@ -628,6 +631,9 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet if (TSDB_CODE_SUCCESS == code) { code = putUdfToCache(pCatalogReq->pUdf, pMetaData->pUdfList, pMetaCache->pUdf); } + if (TSDB_CODE_SUCCESS == code) { + code = putTableDataToCache(pCatalogReq->pTableIndex, pMetaData->pTableIndex, pMetaCache->pTableIndex); + } return code; } @@ -806,3 +812,43 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun } return code; } + +static void destroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); } + +static SArray* smaIndexesDup(SArray* pSrc) { + SArray* pDst = taosArrayDup(pSrc); + if (NULL == pDst) { + return NULL; + } + int32_t size = taosArrayGetSize(pDst); + for (int32_t i = 0; i < size; ++i) { + ((STableIndexInfo*)taosArrayGet(pDst, i))->expr = NULL; + } + for (int32_t i = 0; i < size; ++i) { + STableIndexInfo* pIndex = taosArrayGet(pDst, i); + pIndex->expr = taosMemoryStrDup(((STableIndexInfo*)taosArrayGet(pSrc, i))->expr); + if (NULL == pIndex->expr) { + taosArrayDestroyEx(pDst, destroySmaIndex); + return NULL; + } + } + return pDst; +} + +int32_t reserveTableIndexInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) { + return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableIndex); +} + +int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes) { + char fullName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pName, fullName); + SArray* pSmaIndexes = NULL; + int32_t code = getMetaDataFromHash(fullName, strlen(fullName), pMetaCache->pTableIndex, (void**)&pSmaIndexes); + if (TSDB_CODE_SUCCESS == code && NULL != pSmaIndexes) { + *pIndexes = smaIndexesDup(pSmaIndexes); + if (NULL == *pIndexes) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + return code; +} diff --git a/source/libs/parser/test/mockCatalog.cpp b/source/libs/parser/test/mockCatalog.cpp index c7f0990132..0b2aacf5ef 100644 --- a/source/libs/parser/test/mockCatalog.cpp +++ b/source/libs/parser/test/mockCatalog.cpp @@ -214,6 +214,11 @@ int32_t __catalogRefreshGetTableMeta(SCatalog* pCatalog, void* pTransporter, con int32_t __catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { return 0; } +int32_t __catalogGetTableIndex(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pName, + SArray** pRes) { + return g_mockCatalogService->catalogGetTableIndex(pName, pRes); +} + void initMetaDataEnv() { g_mockCatalogService.reset(new MockCatalogService()); @@ -230,6 +235,7 @@ void initMetaDataEnv() { stub.set(catalogGetUdfInfo, __catalogGetUdfInfo); stub.set(catalogRefreshGetTableMeta, __catalogRefreshGetTableMeta); stub.set(catalogRemoveTableMeta, __catalogRemoveTableMeta); + stub.set(catalogGetTableIndex, __catalogGetTableIndex); // { // AddrAny any("libcatalog.so"); // std::map result; diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 0c37c875c0..57d47b8a48 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -149,6 +149,20 @@ class MockCatalogServiceImpl { return TSDB_CODE_SUCCESS; } + int32_t catalogGetTableIndex(const SName* pTableName, SArray** pIndexes) const { + char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; + tNameExtractFullName(pTableName, tbFName); + auto it = index_.find(tbFName); + if (index_.end() == it) { + return TSDB_CODE_SUCCESS; + } + *pIndexes = taosArrayInit(it->second.size(), sizeof(STableIndexInfo)); + for (const auto& index : it->second) { + taosArrayPush(*pIndexes, &index); + } + return TSDB_CODE_SUCCESS; + } + int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const { int32_t code = getAllTableMeta(pCatalogReq->pTableMeta, &pMetaData->pTableMeta); if (TSDB_CODE_SUCCESS == code) { @@ -169,6 +183,9 @@ class MockCatalogServiceImpl { if (TSDB_CODE_SUCCESS == code) { code = getAllUdf(pCatalogReq->pUdf, &pMetaData->pUdfList); } + if (TSDB_CODE_SUCCESS == code) { + code = getAllTableIndex(pCatalogReq->pTableIndex, &pMetaData->pTableIndex); + } return code; } @@ -176,7 +193,7 @@ class MockCatalogServiceImpl { int32_t numOfColumns, int32_t numOfTags) { builder_ = TableBuilder::createTableBuilder(tableType, numOfColumns, numOfTags); meta_[db][tbname] = builder_->table(); - meta_[db][tbname]->schema->uid = id_++; + meta_[db][tbname]->schema->uid = getNextId(); return *(builder_.get()); } @@ -187,14 +204,11 @@ class MockCatalogServiceImpl { } meta_[db][tbname].reset(new MockTableMeta()); meta_[db][tbname]->schema = table.release(); - meta_[db][tbname]->schema->uid = id_++; + meta_[db][tbname]->schema->uid = getNextId(); meta_[db][tbname]->schema->tableType = TSDB_CHILD_TABLE; SVgroupInfo vgroup = {vgid, 0, 0, {0}, 0}; - addEpIntoEpSet(&vgroup.epSet, "dnode_1", 6030); - addEpIntoEpSet(&vgroup.epSet, "dnode_2", 6030); - addEpIntoEpSet(&vgroup.epSet, "dnode_3", 6030); - vgroup.epSet.inUse = 0; + genEpSet(&vgroup.epSet); meta_[db][tbname]->vgs.emplace_back(vgroup); // super table @@ -268,10 +282,39 @@ class MockCatalogServiceImpl { udf_.insert(std::make_pair(func, info)); } + void createSmaIndex(const SMCreateSmaReq* pReq) { + STableIndexInfo info; + info.intervalUnit = pReq->intervalUnit; + info.slidingUnit = pReq->slidingUnit; + info.interval = pReq->interval; + info.offset = pReq->offset; + info.sliding = pReq->sliding; + info.dstTbUid = getNextId(); + info.dstVgId = pReq->dstVgId; + genEpSet(&info.epSet); + info.expr = strdup(pReq->expr); + auto it = index_.find(pReq->stb); + if (index_.end() == it) { + index_.insert(std::make_pair(std::string(pReq->stb), std::vector{info})); + } else { + it->second.push_back(info); + } + } + private: typedef std::map> TableMetaCache; typedef std::map DbMetaCache; typedef std::map> UdfMetaCache; + typedef std::map> IndexMetaCache; + + uint64_t getNextId() { return id_++; } + + void genEpSet(SEpSet* pEpSet) { + addEpIntoEpSet(pEpSet, "dnode_1", 6030); + addEpIntoEpSet(pEpSet, "dnode_2", 6030); + addEpIntoEpSet(pEpSet, "dnode_3", 6030); + pEpSet->inUse = 0; + } std::string toDbname(const std::string& dbFullName) const { std::string::size_type n = dbFullName.find("."); @@ -463,10 +506,24 @@ class MockCatalogServiceImpl { return TSDB_CODE_SUCCESS; } + int32_t getAllTableIndex(SArray* pTableIndex, SArray** pTableIndexData) const { + if (NULL != pTableIndex) { + int32_t num = taosArrayGetSize(pTableIndex); + *pTableIndexData = taosArrayInit(num, sizeof(SMetaRes)); + for (int32_t i = 0; i < num; ++i) { + SMetaRes res = {0}; + res.code = catalogGetTableIndex((const SName*)taosArrayGet(pTableIndex, i), (SArray**)(&res.pRes)); + taosArrayPush(*pTableIndexData, &res); + } + } + return TSDB_CODE_SUCCESS; + } + uint64_t id_; std::unique_ptr builder_; DbMetaCache meta_; UdfMetaCache udf_; + IndexMetaCache index_; }; MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {} @@ -490,6 +547,8 @@ void MockCatalogService::createFunction(const std::string& func, int8_t funcType impl_->createFunction(func, funcType, outputType, outputLen, bufSize); } +void MockCatalogService::createSmaIndex(const SMCreateSmaReq* pReq) { impl_->createSmaIndex(pReq); } + int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const { return impl_->catalogGetTableMeta(pTableName, pTableMeta); } @@ -510,6 +569,10 @@ int32_t MockCatalogService::catalogGetUdfInfo(const std::string& funcName, SFunc return impl_->catalogGetUdfInfo(funcName, pInfo); } +int32_t MockCatalogService::catalogGetTableIndex(const SName* pTableName, SArray** pIndexes) const { + return impl_->catalogGetTableIndex(pTableName, pIndexes); +} + int32_t MockCatalogService::catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const { return impl_->catalogGetAllMeta(pCatalogReq, pMetaData); } diff --git a/source/libs/parser/test/mockCatalogService.h b/source/libs/parser/test/mockCatalogService.h index 133a355c59..c4ab091b7a 100644 --- a/source/libs/parser/test/mockCatalogService.h +++ b/source/libs/parser/test/mockCatalogService.h @@ -57,12 +57,14 @@ class MockCatalogService { void createSubTable(const std::string& db, const std::string& stbname, const std::string& tbname, int16_t vgid); void showTables() const; void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize); + void createSmaIndex(const SMCreateSmaReq* pReq); int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const; int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const; int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const; int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const; int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const; + int32_t catalogGetTableIndex(const SName* pTableName, SArray** pIndexes) const; int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const; private: diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index 1a8c7657df..e0728c8654 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -36,11 +36,13 @@ extern "C" { #define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__) int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...); -int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList); +int32_t createColumnByRewriteExprs(SNodeList* pExprs, SNodeList** pList); +int32_t createColumnByRewriteExpr(SNode* pExpr, SNodeList** pList); +int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew); -int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode); -int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode); -int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan); +int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan); +int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan); +int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan); int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan); int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index a9f3909af6..d67b194192 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -220,6 +220,7 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT } TSWAP(pScan->pVgroupList, pRealTable->pVgroupList); + TSWAP(pScan->pSmaIndexes, pRealTable->pSmaIndexes); pScan->tableId = pRealTable->pMeta->uid; pScan->stableId = pRealTable->pMeta->suid; pScan->tableType = pRealTable->pMeta->tableType; @@ -272,10 +273,10 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect // set output if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pScan->pScanCols, &pScan->node.pTargets); + code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pScan->pScanPseudoCols, &pScan->node.pTargets); + code = createColumnByRewriteExprs(pScan->pScanPseudoCols, &pScan->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { @@ -441,10 +442,10 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, // set the output if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pGroupKeys) { - code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets); + code = createColumnByRewriteExprs(pAgg->pGroupKeys, &pAgg->node.pTargets); } if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) { - code = createColumnByRewriteExps(pAgg->pAggFuncs, &pAgg->node.pTargets); + code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { @@ -475,7 +476,7 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt // set the output if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pIdfRowsFunc->pVectorFuncs, &pIdfRowsFunc->node.pTargets); + code = createColumnByRewriteExprs(pIdfRowsFunc->pVectorFuncs, &pIdfRowsFunc->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { @@ -505,7 +506,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm } if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pWindow->pFuncs, &pWindow->node.pTargets); + code = createColumnByRewriteExprs(pWindow->pFuncs, &pWindow->node.pTargets); } pSelect->hasAggFuncs = false; @@ -776,7 +777,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe // set the output if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets); + code = createColumnByRewriteExprs(pAgg->pGroupKeys, &pAgg->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { @@ -918,7 +919,7 @@ static int32_t createSetOpAggLogicNode(SLogicPlanContext* pCxt, SSetOperator* pS // set the output if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets); + code = createColumnByRewriteExprs(pAgg->pGroupKeys, &pAgg->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { @@ -1040,7 +1041,7 @@ static int32_t createDeleteScanLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* p // set output if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pScan->pScanCols, &pScan->node.pTargets); + code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { @@ -1064,7 +1065,7 @@ static int32_t createDeleteAggLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* pD } // set the output if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pAgg->pAggFuncs, &pAgg->node.pTargets); + code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { @@ -1138,11 +1139,40 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi return TSDB_CODE_FAILED; } -int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode) { - SLogicPlanContext cxt = {.pPlanCxt = pCxt}; - int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, pLogicNode); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - return TSDB_CODE_SUCCESS; +static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) { + pNode->pParent = pParent; + SNode* pChild; + FOREACH(pChild, pNode->pChildren) { doSetLogicNodeParent((SLogicNode*)pChild, pNode); } +} + +static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); } + +int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) { + SLogicPlanContext cxt = {.pPlanCxt = pCxt}; + + SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); + if (NULL == pSubplan) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pSubplan->id.queryId = pCxt->queryId; + pSubplan->id.groupId = 1; + pSubplan->id.subplanId = 1; + + int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, &pSubplan->pNode); + if (TSDB_CODE_SUCCESS == code) { + setLogicNodeParent(pSubplan->pNode); + if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pSubplan->pNode)) { + pSubplan->subplanType = SUBPLAN_TYPE_MODIFY; + } else { + pSubplan->subplanType = SUBPLAN_TYPE_SCAN; + } + } + + if (TSDB_CODE_SUCCESS == code) { + *pLogicSubplan = pSubplan; + } else { + nodesDestroyNode(pSubplan); + } + + return code; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 8346aca76f..bffe520d6d 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -32,8 +32,7 @@ typedef struct SOptimizeContext { bool optimized; } SOptimizeContext; -typedef int32_t (*FMatch)(SOptimizeContext* pCxt, SLogicNode* pLogicNode); -typedef int32_t (*FOptimize)(SOptimizeContext* pCxt, SLogicNode* pLogicNode); +typedef int32_t (*FOptimize)(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan); typedef struct SOptimizeRule { char* pName; @@ -109,7 +108,6 @@ static bool osdMayBeOptimized(SLogicNode* pNode) { } if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) { return true; - // return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType); } return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys); } @@ -231,9 +229,9 @@ static void setScanWindowInfo(SScanLogicNode* pScan) { } } -static int32_t osdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { +static int32_t osdOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { SOsdInfo info = {0}; - int32_t code = osdMatch(pCxt, pLogicNode, &info); + int32_t code = osdMatch(pCxt, pLogicSubplan->pNode, &info); if (TSDB_CODE_SUCCESS == code && info.pScan) { setScanWindowInfo((SScanLogicNode*)info.pScan); } @@ -635,8 +633,8 @@ static int32_t cpdPushCondition(SOptimizeContext* pCxt, SLogicNode* pLogicNode) return code; } -static int32_t cpdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { - return cpdPushCondition(pCxt, pLogicNode); +static int32_t cpdOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + return cpdPushCondition(pCxt, pLogicSubplan->pNode); } static bool opkIsPrimaryKeyOrderBy(SNodeList* pSortKeys) { @@ -745,26 +743,292 @@ static int32_t opkOptimizeImpl(SOptimizeContext* pCxt, SSortLogicNode* pSort) { return code; } -static int32_t opkOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { - SSortLogicNode* pSort = (SSortLogicNode*)optFindPossibleNode(pLogicNode, opkSortMayBeOptimized); +static int32_t opkOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + SSortLogicNode* pSort = (SSortLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, opkSortMayBeOptimized); if (NULL == pSort) { return TSDB_CODE_SUCCESS; } return opkOptimizeImpl(pCxt, pSort); } -static const SOptimizeRule optimizeRuleSet[] = {{.pName = "OptimizeScanData", .optimizeFunc = osdOptimize}, - {.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize}, - {.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize}}; +static bool smaOptMayBeOptimized(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || NULL == pNode->pParent || + QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) || + WINDOW_TYPE_INTERVAL != ((SWindowLogicNode*)pNode->pParent)->winType) { + return false; + } + + SScanLogicNode* pScan = (SScanLogicNode*)pNode; + if (0 == pScan->interval || NULL == pScan->pSmaIndexes || NULL != pScan->node.pConditions) { + return false; + } + + return true; +} + +static int32_t smaOptCreateMerge(SLogicNode* pChild, SNodeList* pMergeKeys, SNodeList* pTargets, SLogicNode** pOutput) { + SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE); + if (NULL == pMerge) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pMerge->node.precision = pChild->precision; + pMerge->numOfChannels = 2; + pMerge->pMergeKeys = pMergeKeys; + pMerge->node.pTargets = pTargets; + pMerge->pInputs = nodesCloneList(pChild->pTargets); + if (NULL == pMerge->pInputs) { + nodesDestroyNode(pMerge); + return TSDB_CODE_OUT_OF_MEMORY; + } + + *pOutput = (SLogicNode*)pMerge; + return TSDB_CODE_SUCCESS; +} + +static int32_t smaOptRecombinationNode(SLogicSubplan* pLogicSubplan, SLogicNode* pInterval, SLogicNode* pMerge, + SLogicNode* pSmaScan) { + int32_t code = nodesListMakeAppend(&pMerge->pChildren, pInterval); + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeAppend(&pMerge->pChildren, pSmaScan); + } + if (TSDB_CODE_SUCCESS == code) { + code = replaceLogicNode(pLogicSubplan, pInterval, pMerge); + pSmaScan->pParent = pMerge; + pInterval->pParent = pMerge; + } + return code; +} + +static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList* pCols, + SLogicNode** pOutput) { + SScanLogicNode* pSmaScan = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN); + if (NULL == pSmaScan) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pSmaScan->pScanCols = pCols; + pSmaScan->tableType = TSDB_SUPER_TABLE; + pSmaScan->tableId = pIndex->dstTbUid; + pSmaScan->stableId = pIndex->dstTbUid; + pSmaScan->scanType = SCAN_TYPE_TABLE; + pSmaScan->scanSeq[0] = pScan->scanSeq[0]; + pSmaScan->scanSeq[1] = pScan->scanSeq[1]; + pSmaScan->scanRange = pScan->scanRange; + pSmaScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD; + + pSmaScan->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo)); + if (NULL == pSmaScan->pVgroupList) { + nodesDestroyNode(pSmaScan); + return TSDB_CODE_OUT_OF_MEMORY; + } + pSmaScan->pVgroupList->numOfVgroups = 1; + pSmaScan->pVgroupList->vgroups[0].vgId = pIndex->dstVgId; + memcpy(&(pSmaScan->pVgroupList->vgroups[0].epSet), &pIndex->epSet, sizeof(SEpSet)); + + *pOutput = (SLogicNode*)pSmaScan; + return TSDB_CODE_SUCCESS; +} + +static bool smaOptEqualInterval(SWindowLogicNode* pWindow, STableIndexInfo* pIndex) { + if (pWindow->interval != pIndex->interval || pWindow->intervalUnit != pIndex->intervalUnit || + pWindow->offset != pIndex->offset || pWindow->sliding != pIndex->sliding || + pWindow->slidingUnit != pIndex->slidingUnit) { + return false; + } + // todo time range + return true; +} + +// #define SMA_TABLE_NAME "#sma_table" +// #define SMA_COL_NAME_PREFIX "#sma_col_" + +static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId) { + SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + return NULL; + } + pCol->tableId = tableId; + pCol->tableType = TSDB_SUPER_TABLE; + pCol->colId = colId; + pCol->colType = COLUMN_TYPE_COLUMN; + snprintf(pCol->colName, sizeof(pCol->colName), "#sma_col_%d", pCol->colId); + // strcpy(pCol->tableName, SMA_TABLE_NAME); + // strcpy(pCol->tableAlias, SMA_TABLE_NAME); + pCol->node.resType = ((SExprNode*)pFunc)->resType; + strcpy(pCol->node.aliasName, ((SExprNode*)pFunc)->aliasName); + return (SNode*)pCol; +} + +static int32_t smaOptFindSmaFunc(SNode* pQueryFunc, SNodeList* pSmaFuncs) { + int32_t index = 0; + SNode* pSmaFunc = NULL; + FOREACH(pSmaFunc, pSmaFuncs) { + if (nodesEqualNode(pQueryFunc, pSmaFunc)) { + return index; + } + ++index; + } + return -1; +} + +static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeList* pSmaFuncs, SNodeList** pOutput, + int32_t* pWStrartIndex) { + SNodeList* pCols = NULL; + SNode* pFunc = NULL; + int32_t code = TSDB_CODE_SUCCESS; + int32_t index = 0; + *pWStrartIndex = -1; + FOREACH(pFunc, pFuncs) { + if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) { + *pWStrartIndex = index; + } + int32_t smaFuncIndex = smaOptFindSmaFunc(pFunc, pSmaFuncs); + if (smaFuncIndex < 0) { + break; + } else { + code = nodesListMakeStrictAppend(&pCols, smaOptCreateSmaCol(pFunc, tableId, smaFuncIndex + 2)); + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + ++index; + } + + if (TSDB_CODE_SUCCESS == code) { + *pOutput = pCols; + } else { + nodesDestroyList(pCols); + } + + return code; +} + +static int32_t smaOptCouldApplyIndex(SWindowLogicNode* pWindow, STableIndexInfo* pIndex, SNodeList** pCols, + int32_t* pWStrartIndex) { + if (!smaOptEqualInterval(pWindow, pIndex)) { + return TSDB_CODE_SUCCESS; + } + SNodeList* pSmaFuncs = NULL; + int32_t code = nodesStringToList(pIndex->expr, &pSmaFuncs); + if (TSDB_CODE_SUCCESS == code) { + code = smaOptCreateSmaCols(pWindow->pFuncs, pIndex->dstTbUid, pSmaFuncs, pCols, pWStrartIndex); + } + nodesDestroyList(pSmaFuncs); + return code; +} + +static SNode* smaOptCreateWStartTs() { + SFunctionNode* pWStart = nodesMakeNode(QUERY_NODE_FUNCTION); + if (NULL == pWStart) { + return NULL; + } + strcpy(pWStart->functionName, "_wstartts"); + snprintf(pWStart->node.aliasName, sizeof(pWStart->node.aliasName), "%s.%p", pWStart->functionName, pWStart); + if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pWStart, NULL, 0)) { + nodesDestroyNode(pWStart); + return NULL; + } + return (SNode*)pWStart; +} + +static int32_t smaOptCreateMergeKey(SNode* pCol, SNodeList** pMergeKeys) { + SOrderByExprNode* pMergeKey = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR); + if (NULL == pMergeKey) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pMergeKey->pExpr = nodesCloneNode(pCol); + if (NULL == pMergeKey->pExpr) { + nodesDestroyNode(pMergeKey); + return TSDB_CODE_OUT_OF_MEMORY; + } + pMergeKey->order = ORDER_ASC; + pMergeKey->nullOrder = NULL_ORDER_FIRST; + return nodesListMakeStrictAppend(pMergeKeys, pMergeKey); +} + +static int32_t smaOptRewriteInterval(SWindowLogicNode* pInterval, int32_t wstrartIndex, SNodeList** pMergeKeys) { + if (wstrartIndex < 0) { + SNode* pWStart = smaOptCreateWStartTs(); + if (NULL == pWStart) { + return TSDB_CODE_OUT_OF_MEMORY; + } + int32_t code = createColumnByRewriteExpr(pWStart, &pInterval->node.pTargets); + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode(pWStart); + return code; + } + wstrartIndex = LIST_LENGTH(pInterval->node.pTargets) - 1; + } + return smaOptCreateMergeKey(nodesListGetNode(pInterval->node.pTargets, wstrartIndex), pMergeKeys); +} + +static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex, + SNodeList* pSmaCols, int32_t wstrartIndex) { + SWindowLogicNode* pInterval = (SWindowLogicNode*)pScan->node.pParent; + SNodeList* pMergeTargets = nodesCloneList(pInterval->node.pTargets); + if (NULL == pMergeTargets) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SLogicNode* pSmaScan = NULL; + SLogicNode* pMerge = NULL; + SNodeList* pMergeKeys = NULL; + int32_t code = smaOptRewriteInterval(pInterval, wstrartIndex, &pMergeKeys); + if (TSDB_CODE_SUCCESS == code) { + code = smaOptCreateSmaScan(pScan, pIndex, pSmaCols, &pSmaScan); + } + if (TSDB_CODE_SUCCESS == code) { + code = smaOptCreateMerge(pScan->node.pParent, pMergeKeys, pMergeTargets, &pMerge); + } + if (TSDB_CODE_SUCCESS == code) { + code = smaOptRecombinationNode(pLogicSubplan, pScan->node.pParent, pMerge, pSmaScan); + } + return code; +} + +static void smaOptDestroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); } + +static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t nindexes = taosArrayGetSize(pScan->pSmaIndexes); + for (int32_t i = 0; i < nindexes; ++i) { + STableIndexInfo* pIndex = taosArrayGet(pScan->pSmaIndexes, i); + SNodeList* pSmaCols = NULL; + int32_t wstrartIndex = -1; + code = smaOptCouldApplyIndex((SWindowLogicNode*)pScan->node.pParent, pIndex, &pSmaCols, &wstrartIndex); + if (TSDB_CODE_SUCCESS == code && NULL != pSmaCols) { + code = smaOptApplyIndex(pLogicSubplan, pScan, pIndex, pSmaCols, wstrartIndex); + taosArrayDestroyEx(pScan->pSmaIndexes, smaOptDestroySmaIndex); + pScan->pSmaIndexes = NULL; + break; + } + } + return code; +} + +static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) { + SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, smaOptMayBeOptimized); + if (NULL == pScan) { + return TSDB_CODE_SUCCESS; + } + return smaOptimizeImpl(pCxt, pLogicSubplan, pScan); +} + +// clang-format off +static const SOptimizeRule optimizeRuleSet[] = { + {.pName = "OptimizeScanData", .optimizeFunc = osdOptimize}, + {.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize}, + {.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize}, + {.pName = "SmaIndex", .optimizeFunc = smaOptimize} +}; +// clang-format on static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule)); -static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) { +static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { SOptimizeContext cxt = {.pPlanCxt = pCxt, .optimized = false}; do { cxt.optimized = false; for (int32_t i = 0; i < optimizeRuleNum; ++i) { - int32_t code = optimizeRuleSet[i].optimizeFunc(&cxt, pLogicNode); + int32_t code = optimizeRuleSet[i].optimizeFunc(&cxt, pLogicSubplan); if (TSDB_CODE_SUCCESS != code) { return code; } @@ -773,4 +1037,6 @@ static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) { return TSDB_CODE_SUCCESS; } -int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) { return applyOptimizeRule(pCxt, pLogicNode); } +int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { + return applyOptimizeRule(pCxt, pLogicSubplan); +} diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 21555e66a2..f33856c4ce 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -80,29 +80,12 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE return TSDB_CODE_SUCCESS; } -static int32_t splReplaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew) { - if (NULL == pOld->pParent) { - pSubplan->pNode = (SLogicNode*)pNew; - return TSDB_CODE_SUCCESS; - } - - SNode* pNode; - FOREACH(pNode, pOld->pParent->pChildren) { - if (nodesEqualNode(pNode, pOld)) { - REPLACE_NODE(pNew); - pNew->pParent = pOld->pParent; - return TSDB_CODE_SUCCESS; - } - } - return TSDB_CODE_PLAN_INTERNAL_ERROR; -} - static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode, ESubplanType subplanType) { SExchangeLogicNode* pExchange = NULL; int32_t code = splCreateExchangeNode(pCxt, pSplitNode, &pExchange); if (TSDB_CODE_SUCCESS == code) { - code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange); + code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange); } if (TSDB_CODE_SUCCESS == code) { pSubplan->subplanType = subplanType; @@ -282,7 +265,7 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic code = stbSplAppendWStart(pPartWin->pFuncs, &index); } if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pPartWin->pFuncs, &pPartWin->node.pTargets); + code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets); } if (TSDB_CODE_SUCCESS == code) { nodesDestroyNode(pMergeWindow->pTspk); @@ -328,7 +311,7 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla if (NULL == pSubplan) { code = nodesListMakeAppend(&pSplitNode->pChildren, pMerge); } else { - code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge); + code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge); } } if (TSDB_CODE_SUCCESS != code) { @@ -442,7 +425,7 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { pPartAgg->pGroupKeys = pGroupKeys; - code = createColumnByRewriteExps(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets); + code = createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets); } if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) { pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets); @@ -457,7 +440,7 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs); } if (TSDB_CODE_SUCCESS == code) { - code = createColumnByRewriteExps(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets); + code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets); } nodesDestroyList(pFunc); @@ -897,12 +880,56 @@ static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) return code; } +typedef struct SSmaIndexSplitInfo { + SMergeLogicNode* pMerge; + SLogicSubplan* pSubplan; +} SSmaIndexSplitInfo; + +static SLogicNode* smaIdxSplMatchByNode(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) { + return pNode; + } + SNode* pChild; + FOREACH(pChild, pNode->pChildren) { + SLogicNode* pSplitNode = smaIdxSplMatchByNode((SLogicNode*)pChild); + if (NULL != pSplitNode) { + return pSplitNode; + } + } + return NULL; +} + +static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SSmaIndexSplitInfo* pInfo) { + SLogicNode* pSplitNode = smaIdxSplMatchByNode(pSubplan->pNode); + if (NULL != pSplitNode) { + pInfo->pMerge = (SMergeLogicNode*)pSplitNode; + pInfo->pSubplan = pSubplan; + } + return NULL != pSplitNode; +} + +static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { + SSmaIndexSplitInfo info = {0}; + if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)smaIdxSplFindSplitNode, &info)) { + return TSDB_CODE_SUCCESS; + } + + int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pMerge); + if (TSDB_CODE_SUCCESS == code) { + info.pMerge->srcGroupId = pCxt->groupId; + } + ++(pCxt->groupId); + pCxt->split = true; + return code; +} + // clang-format off static const SSplitRule splitRuleSet[] = { {.pName = "SuperTableSplit", .splitFunc = stableSplit}, {.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit}, {.pName = "UnionAllSplit", .splitFunc = unionAllSplit}, - {.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit} + {.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit}, + {.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit} }; // clang-format on @@ -936,14 +963,6 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) { return TSDB_CODE_SUCCESS; } -static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) { - pNode->pParent = pParent; - SNode* pChild; - FOREACH(pChild, pNode->pChildren) { doSetLogicNodeParent((SLogicNode*)pChild, pNode); } -} - -static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); } - static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { TSWAP(((SScanLogicNode*)pNode)->pVgroupList, pSubplan->pVgroupList); @@ -954,37 +973,10 @@ static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) { FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); } } -int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan) { - SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); - if (NULL == pSubplan) { - return TSDB_CODE_OUT_OF_MEMORY; +int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { + if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicSubplan->pNode)) { + setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan); + return TSDB_CODE_SUCCESS; } - - pSubplan->pNode = nodesCloneNode(pLogicNode); - if (NULL == pSubplan->pNode) { - nodesDestroyNode(pSubplan); - return TSDB_CODE_OUT_OF_MEMORY; - } - - pSubplan->id.queryId = pCxt->queryId; - pSubplan->id.groupId = 1; - setLogicNodeParent(pSubplan->pNode); - - int32_t code = TSDB_CODE_SUCCESS; - if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicNode)) { - pSubplan->subplanType = SUBPLAN_TYPE_MODIFY; - TSWAP(((SVnodeModifyLogicNode*)pLogicNode)->pDataBlocks, ((SVnodeModifyLogicNode*)pSubplan->pNode)->pDataBlocks); - setVgroupsInfo(pSubplan->pNode, pSubplan); - } else { - pSubplan->subplanType = SUBPLAN_TYPE_SCAN; - code = applySplitRule(pCxt, pSubplan); - } - - if (TSDB_CODE_SUCCESS == code) { - *pLogicSubplan = pSubplan; - } else { - nodesDestroyNode(pSubplan); - } - - return code; -} \ No newline at end of file + return applySplitRule(pCxt, pLogicSubplan); +} diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 63d31912f0..232a49ee3a 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -69,7 +69,7 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList) { +int32_t createColumnByRewriteExprs(SNodeList* pExprs, SNodeList** pList) { SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)}; if (NULL == cxt.pList) { return TSDB_CODE_OUT_OF_MEMORY; @@ -85,3 +85,37 @@ int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList) { } return cxt.errCode; } + +int32_t createColumnByRewriteExpr(SNode* pExpr, SNodeList** pList) { + SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)}; + if (NULL == cxt.pList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + nodesWalkExpr(pExpr, doCreateColumn, &cxt); + if (TSDB_CODE_SUCCESS != cxt.errCode) { + nodesDestroyList(cxt.pList); + return cxt.errCode; + } + if (NULL == *pList) { + *pList = cxt.pList; + } + return cxt.errCode; +} + +int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew) { + if (NULL == pOld->pParent) { + pSubplan->pNode = (SLogicNode*)pNew; + return TSDB_CODE_SUCCESS; + } + + SNode* pNode; + FOREACH(pNode, pOld->pParent->pChildren) { + if (nodesEqualNode(pNode, pOld)) { + REPLACE_NODE(pNew); + pNew->pParent = pOld->pParent; + return TSDB_CODE_SUCCESS; + } + } + return TSDB_CODE_PLAN_INTERNAL_ERROR; +} diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 1921b16388..83657d27d0 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -26,16 +26,15 @@ static void dumpQueryPlan(SQueryPlan* pPlan) { } int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) { - SLogicNode* pLogicNode = NULL; SLogicSubplan* pLogicSubplan = NULL; SQueryLogicPlan* pLogicPlan = NULL; - int32_t code = createLogicPlan(pCxt, &pLogicNode); + int32_t code = createLogicPlan(pCxt, &pLogicSubplan); if (TSDB_CODE_SUCCESS == code) { - code = optimizeLogicPlan(pCxt, pLogicNode); + code = optimizeLogicPlan(pCxt, pLogicSubplan); } if (TSDB_CODE_SUCCESS == code) { - code = splitLogicPlan(pCxt, pLogicNode, &pLogicSubplan); + code = splitLogicPlan(pCxt, pLogicSubplan); } if (TSDB_CODE_SUCCESS == code) { code = scaleOutLogicPlan(pCxt, pLogicSubplan, &pLogicPlan); @@ -47,7 +46,6 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo dumpQueryPlan(*pPlan); } - nodesDestroyNode(pLogicNode); nodesDestroyNode(pLogicSubplan); nodesDestroyNode(pLogicPlan); terrno = code; diff --git a/source/libs/planner/test/planOtherTest.cpp b/source/libs/planner/test/planOtherTest.cpp index d5d37fda64..85f8b7d9f6 100644 --- a/source/libs/planner/test/planOtherTest.cpp +++ b/source/libs/planner/test/planOtherTest.cpp @@ -42,7 +42,9 @@ TEST_F(PlanOtherTest, createStreamUseSTable) { TEST_F(PlanOtherTest, createSmaIndex) { useDb("root", "test"); - run("create sma index index1 on t1 function(max(c1), min(c3 + 10), sum(c4)) interval(10s)"); + run("CREATE SMA INDEX idx1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)"); + + run("SELECT SUM(c4) FROM t1 INTERVAL(10s)"); } TEST_F(PlanOtherTest, explain) { diff --git a/source/libs/planner/test/planTestUtil.cpp b/source/libs/planner/test/planTestUtil.cpp index f5c8b58e43..57d7cb6608 100644 --- a/source/libs/planner/test/planTestUtil.cpp +++ b/source/libs/planner/test/planTestUtil.cpp @@ -14,12 +14,14 @@ */ #include "planTestUtil.h" + #include #include #include #include "cmdnodes.h" +#include "mockCatalogService.h" #include "parser.h" #include "planInt.h" @@ -104,13 +106,12 @@ class PlannerTestBaseImpl { SPlanContext cxt = {0}; setPlanContext(pQuery, &cxt); - SLogicNode* pLogicNode = nullptr; - doCreateLogicPlan(&cxt, &pLogicNode); - - doOptimizeLogicPlan(&cxt, pLogicNode); - SLogicSubplan* pLogicSubplan = nullptr; - doSplitLogicPlan(&cxt, pLogicNode, &pLogicSubplan); + doCreateLogicPlan(&cxt, &pLogicSubplan); + + doOptimizeLogicPlan(&cxt, pLogicSubplan); + + doSplitLogicPlan(&cxt, pLogicSubplan); SQueryLogicPlan* pLogicPlan = nullptr; doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan); @@ -164,13 +165,12 @@ class PlannerTestBaseImpl { SPlanContext cxt = {0}; setPlanContext(stmtEnv_.pQuery_, &cxt); - SLogicNode* pLogicNode = nullptr; - doCreateLogicPlan(&cxt, &pLogicNode); - - doOptimizeLogicPlan(&cxt, pLogicNode); - SLogicSubplan* pLogicSubplan = nullptr; - doSplitLogicPlan(&cxt, pLogicNode, &pLogicSubplan); + doCreateLogicPlan(&cxt, &pLogicSubplan); + + doOptimizeLogicPlan(&cxt, pLogicSubplan); + + doSplitLogicPlan(&cxt, pLogicSubplan); SQueryLogicPlan* pLogicPlan = nullptr; doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan); @@ -324,19 +324,19 @@ class PlannerTestBaseImpl { res_.ast_ = toString(pQuery->pRoot); } - void doCreateLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode) { - DO_WITH_THROW(createLogicPlan, pCxt, pLogicNode); - res_.rawLogicPlan_ = toString((SNode*)(*pLogicNode)); + void doCreateLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) { + DO_WITH_THROW(createLogicPlan, pCxt, pLogicSubplan); + res_.rawLogicPlan_ = toString((SNode*)(*pLogicSubplan)); } - void doOptimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) { - DO_WITH_THROW(optimizeLogicPlan, pCxt, pLogicNode); - res_.optimizedLogicPlan_ = toString((SNode*)pLogicNode); + void doOptimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { + DO_WITH_THROW(optimizeLogicPlan, pCxt, pLogicSubplan); + res_.optimizedLogicPlan_ = toString((SNode*)pLogicSubplan); } - void doSplitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan) { - DO_WITH_THROW(splitLogicPlan, pCxt, pLogicNode, pLogicSubplan); - res_.splitLogicPlan_ = toString((SNode*)(*pLogicSubplan)); + void doSplitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) { + DO_WITH_THROW(splitLogicPlan, pCxt, pLogicSubplan); + res_.splitLogicPlan_ = toString((SNode*)(pLogicSubplan)); } void doScaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan) { @@ -363,6 +363,7 @@ class PlannerTestBaseImpl { } else if (QUERY_NODE_CREATE_INDEX_STMT == nodeType(pQuery->pRoot)) { SMCreateSmaReq req = {0}; tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req); + g_mockCatalogService->createSmaIndex(&req); nodesStringToNode(req.ast, &pCxt->pAstRoot); pCxt->streamQuery = true; } else if (QUERY_NODE_CREATE_STREAM_STMT == nodeType(pQuery->pRoot)) { diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index b15c188f04..45a2ffec77 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -14,6 +14,7 @@ */ #define _DEFAULT_SOURCE + #include "tjson.h" #include "cJSON.h" #include "taoserror.h" @@ -138,6 +139,23 @@ int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* return TSDB_CODE_SUCCESS; } +int32_t tjsonAddTArray(SJson* pJson, const char* pName, FToJson func, const SArray* pArray) { + int32_t num = taosArrayGetSize(pArray); + if (num > 0) { + SJson* pJsonArray = tjsonAddArrayToObject(pJson, pName); + if (NULL == pJsonArray) { + return TSDB_CODE_OUT_OF_MEMORY; + } + for (int32_t i = 0; i < num; ++i) { + int32_t code = tjsonAddItem(pJsonArray, func, taosArrayGet(pArray, i)); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + } + return TSDB_CODE_SUCCESS; +} + char* tjsonToString(const SJson* pJson) { return cJSON_Print((cJSON*)pJson); } char* tjsonToUnformattedString(const SJson* pJson) { return cJSON_PrintUnformatted((cJSON*)pJson); } @@ -184,7 +202,7 @@ int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal return TSDB_CODE_FAILED; } #ifdef WINDOWS - sscanf(p,"%lld",pVal); + sscanf(p, "%lld", pVal); #else // sscanf(p,"%ld",pVal); *pVal = taosStr2Int64(p, NULL, 10); @@ -219,7 +237,7 @@ int32_t tjsonGetUBigIntValue(const SJson* pJson, const char* pName, uint64_t* pV return TSDB_CODE_FAILED; } #ifdef WINDOWS - sscanf(p,"%llu",pVal); + sscanf(p, "%llu", pVal); #else // sscanf(p,"%ld",pVal); *pVal = taosStr2UInt64(p, NULL, 10); @@ -299,24 +317,43 @@ int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void return TSDB_CODE_SUCCESS; } +int32_t tjsonToTArray(const SJson* pJson, const char* pName, FToObject func, SArray** pArray, int32_t itemSize) { + const cJSON* jArray = tjsonGetObjectItem(pJson, pName); + int32_t size = tjsonGetArraySize(jArray); + if (size > 0) { + *pArray = taosArrayInit(size, itemSize); + if (NULL == *pArray) { + return TSDB_CODE_OUT_OF_MEMORY; + } + taosArraySetSize(*pArray, size); + for (int32_t i = 0; i < size; ++i) { + int32_t code = func(tjsonGetArrayItem(jArray, i), taosArrayGet(*pArray, i)); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + } + } + return TSDB_CODE_SUCCESS; +} + SJson* tjsonParse(const char* pStr) { return cJSON_Parse(pStr); } -bool tjsonValidateJson(const char *jIn) { - if (!jIn){ +bool tjsonValidateJson(const char* jIn) { + if (!jIn) { return false; } // set json real data - cJSON *root = cJSON_Parse(jIn); - if (root == NULL){ + cJSON* root = cJSON_Parse(jIn); + if (root == NULL) { return false; } - if(!cJSON_IsObject(root)){ + if (!cJSON_IsObject(root)) { return false; } int size = cJSON_GetArraySize(root); - for(int i = 0; i < size; i++) { + for (int i = 0; i < size; i++) { cJSON* item = cJSON_GetArrayItem(root, i); if (!item) { return false;