Merge pull request #13703 from taosdata/feature/3.0_wxy

feat: sma index optimize
This commit is contained in:
Xiaoyu Wang 2022-06-11 09:51:06 +08:00 committed by GitHub
commit 9ec3c78621
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 1065 additions and 422 deletions

View File

@ -2507,7 +2507,7 @@ typedef struct {
int64_t offset;
int64_t sliding;
int64_t dstTbUid;
int32_t dstVgId; // for stream
int32_t dstVgId;
SEpSet epSet;
char* expr;
} STableIndexInfo;

View File

@ -21,13 +21,13 @@ extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
#include "query.h"
#include "tname.h"
#include "tcommon.h"
#include "taosdef.h"
#include "tarray.h"
#include "tcommon.h"
#include "thash.h"
#include "tmsg.h"
#include "tname.h"
#include "transport.h"
typedef struct SCatalog SCatalog;
@ -159,7 +159,8 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, SArray** pVgroupList);
int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, const char* pDBName,
SArray** pVgroupList);
int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo);
@ -178,7 +179,8 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId,
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t catalogGetTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogGetTableMeta(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, const SName* pTableName,
STableMeta** pTableMeta);
/**
* Get a super table's meta data.
@ -189,11 +191,11 @@ int32_t catalogGetTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSe
* @param pTableMeta(output, table meta data, NEED to free it by calller)
* @return error code
*/
int32_t catalogGetSTableMeta(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogGetSTableMeta(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, const SName* pTableName,
STableMeta** pTableMeta);
int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg);
/**
* Force refresh DB's local cached vgroup info.
* @param pCtg (input, got with catalogGetHandle)
@ -215,7 +217,8 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable);
int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, const SName* pTableName,
int32_t isSTable);
/**
* Force refresh a table's local cached meta data and get the new one.
@ -227,9 +230,8 @@ int32_t catalogRefreshTableMeta(SCatalog* pCatalog, void *pTransporter, const SE
* @param isSTable (input, is super table or not, 1:supposed to be stable, 0: supposed not to be stable, -1:not sure)
* @return error code
*/
int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable);
int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps,
const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable);
/**
* Get a table's actual vgroup, for stable it's all possible vgroup list.
@ -240,7 +242,8 @@ int32_t catalogRefreshGetTableMeta(SCatalog* pCatalog, void *pTransporter, const
* @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller)
* @return error code
*/
int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList);
int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps,
const SName* pTableName, SArray** pVgroupList);
/**
* Get a table's vgroup from its name's hash value.
@ -251,8 +254,8 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, void *pTransporter, const
* @param vgInfo (output, vgroup info)
* @return error code
*/
int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pName, SVgroupInfo* vgInfo);
int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, const SName* pName,
SVgroupInfo* vgInfo);
/**
* Get all meta data required in pReq.
@ -263,9 +266,11 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const
* @param pRsp (output, response data)
* @return error code
*/
int32_t catalogGetAllMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetAllMeta(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq,
SMetaData* pRsp);
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId);
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, uint64_t reqId,
const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId);
int32_t catalogGetQnodeList(SCatalog* pCatalog, void* pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList);
@ -277,13 +282,16 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_
int32_t catalogGetDBCfg(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg);
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
int32_t catalogGetIndexMeta(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* indexName,
SIndexInfo* pInfo);
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pRes);
int32_t catalogGetTableIndex(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName,
SArray** pRes);
int32_t catalogGetUdfInfo(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo);
int32_t catalogChkAuth(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);
int32_t catalogChkAuth(SCatalog* pCtg, void* pRpc, const SEpSet* pMgmtEps, const char* user, const char* dbFName,
AUTH_TYPE type, bool* pass);
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
@ -291,7 +299,6 @@ int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId,
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate);
/**
* Destroy catalog and relase all resources
*/

View File

@ -62,6 +62,7 @@ typedef struct SScanLogicNode {
int64_t watermark;
int16_t tsColId;
double filesFactor;
SArray* pSmaIndexes;
} SScanLogicNode;
typedef struct SJoinLogicNode {
@ -437,7 +438,6 @@ typedef struct SQueryPlan {
int32_t numOfSubplans;
SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0.
SExplainInfo explainInfo;
SArray* pPlaceholderValues;
} SQueryPlan;
void nodesWalkPhysiPlan(SNode* pNode, FNodeWalker walker, void* pContext);

View File

@ -144,6 +144,7 @@ typedef struct SRealTableNode {
SVgroupsInfo* pVgroupList;
char qualDbName[TSDB_DB_NAME_LEN]; // SHOW qualDbName.TABLES
double ratio;
SArray* pSmaIndexes;
} SRealTableNode;
typedef struct STempTableNode {

View File

@ -17,6 +17,7 @@
#define _TD_UTIL_JSON_H_
#include "os.h"
#include "tarray.h"
#ifdef __cplusplus
extern "C" {
@ -66,12 +67,14 @@ typedef int32_t (*FToJson)(const void* pObj, SJson* pJson);
int32_t tjsonAddObject(SJson* pJson, const char* pName, FToJson func, const void* pObj);
int32_t tjsonAddItem(SJson* pJson, FToJson func, const void* pObj);
int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void* pArray, int32_t itemSize, int32_t num);
int32_t tjsonAddTArray(SJson* pJson, const char* pName, FToJson func, const SArray* pArray);
typedef int32_t (*FToObject)(const SJson* pJson, void* pObj);
int32_t tjsonToObject(const SJson* pJson, const char* pName, FToObject func, void* pObj);
int32_t tjsonMakeObject(const SJson* pJson, const char* pName, FToObject func, void** pObj, int32_t objSize);
int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void* pArray, int32_t itemSize);
int32_t tjsonToTArray(const SJson* pJson, const char* pName, FToObject func, SArray** pArray, int32_t itemSize);
char* tjsonToString(const SJson* pJson);
char* tjsonToUnformattedString(const SJson* pJson);

View File

@ -13,16 +13,15 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "trpc.h"
#include "query.h"
#include "tname.h"
#include "catalogInt.h"
#include "query.h"
#include "systable.h"
#include "tname.h"
#include "tref.h"
#include "trpc.h"
SCatalogMgmt gCtgMgmt = {0};
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) {
int32_t code = 0;
STableMeta* tblMeta = NULL;
@ -53,7 +52,8 @@ _return:
CTG_RET(code);
}
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* dbFName, SCtgDBCache** dbCache,
SDBVgInfo** pInfo) {
int32_t code = 0;
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache));
@ -119,8 +119,6 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
return TSDB_CODE_SUCCESS;
}
int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput** pOutput, bool syncReq) {
SVgroupInfo vgroupInfo = {0};
int32_t code = 0;
@ -139,7 +137,8 @@ int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput **pOut
if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
ctgDebug("will refresh tbmeta, supposed in information_schema, tbName:%s", tNameGetTableName(ctx->pName));
CTG_ERR_JRET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), (char *)ctx->pName->dbname, (char *)ctx->pName->tname, output, NULL));
CTG_ERR_JRET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), (char*)ctx->pName->dbname, (char*)ctx->pName->tname,
output, NULL));
} else if (CTG_FLAG_IS_STB(ctx->flag)) {
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(ctx->pName));
@ -150,7 +149,8 @@ int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput **pOut
CTG_ERR_JRET(ctgGetTbMetaFromVnode(CTG_PARAMS_LIST(), ctx->pName, &vgroupInfo, output, NULL));
}
} else {
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName),
ctx->flag);
// if get from vnode failed or no table meta, will not try mnode
CTG_ERR_JRET(ctgGetTbMetaFromVnode(CTG_PARAMS_LIST(), ctx->pName, &vgroupInfo, output, NULL));
@ -192,9 +192,11 @@ int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput **pOut
}
if (CTG_IS_META_TABLE(output->metaType)) {
ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d", output->dbFName, output->tbName, output->tbMeta->tableType);
ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d", output->dbFName, output->tbName,
output->tbMeta->tableType);
} else {
ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d, stbMetaGot:%d", output->dbFName, output->ctbName, output->ctbMeta.tableType, CTG_IS_META_BOTH(output->metaType));
ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d, stbMetaGot:%d", output->dbFName, output->ctbName,
output->ctbMeta.tableType, CTG_IS_META_BOTH(output->metaType));
}
if (pOutput) {
@ -221,7 +223,8 @@ int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTabl
CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta));
if (*pTableMeta) {
if (CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType) && ((!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) || (CTG_FLAG_IS_SYS_DB(ctx->flag)))) {
if (CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType) &&
((!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) || (CTG_FLAG_IS_SYS_DB(ctx->flag)))) {
return TSDB_CODE_SUCCESS;
}
@ -235,7 +238,6 @@ int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTabl
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) {
int32_t code = 0;
STableMetaOutput* output = NULL;
@ -344,8 +346,8 @@ _return:
CTG_RET(code);
}
int32_t ctgChkAuth(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass) {
int32_t ctgChkAuth(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* user, const char* dbFName,
AUTH_TYPE type, bool* pass) {
bool inCache = false;
int32_t code = 0;
@ -495,7 +497,8 @@ int32_t catalogInit(SCatalogCfg *cfg) {
gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
}
gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT),
false, HASH_ENTRY_LOCK);
if (NULL == gCtgMgmt.pCluster) {
qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
@ -521,7 +524,8 @@ int32_t catalogInit(SCatalogCfg *cfg) {
CTG_ERR_RET(ctgStartUpdateThread());
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec);
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum,
gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec);
return TSDB_CODE_SUCCESS;
}
@ -559,7 +563,8 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB));
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
false, HASH_ENTRY_LOCK);
if (NULL == clusterCtg->dbCache) {
qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
@ -645,7 +650,8 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName, SArray** vgroupList) {
int32_t catalogGetDBVgInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* dbFName,
SArray** vgroupList) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == dbFName || NULL == pTrans || NULL == pMgmtEps || NULL == vgroupList) {
@ -684,7 +690,6 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo* dbInfo) {
CTG_API_ENTER();
@ -702,7 +707,6 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
CTG_API_ENTER();
@ -761,7 +765,6 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) {
CTG_API_ENTER();
@ -784,7 +787,8 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
int32_t catalogGetTableMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName,
STableMeta** pTableMeta) {
CTG_API_ENTER();
SCtgTbMetaCtx ctx = {0};
@ -794,7 +798,8 @@ int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
CTG_API_LEAVE(ctgGetTbMeta(pCtg, pTrans, pMgmtEps, &ctx, pTableMeta));
}
int32_t catalogGetSTableMeta(SCatalog* pCtg, void * pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
int32_t catalogGetSTableMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName,
STableMeta** pTableMeta) {
CTG_API_ENTER();
SCtgTbMetaCtx ctx = {0};
@ -868,7 +873,6 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* dbFName) {
CTG_API_ENTER();
@ -879,7 +883,8 @@ int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmt
CTG_API_LEAVE(ctgRefreshDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName));
}
int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
int32_t catalogRefreshTableMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName,
int32_t isSTable) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) {
@ -893,7 +898,8 @@ int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
CTG_API_LEAVE(ctgRefreshTbMeta(CTG_PARAMS_LIST(), &ctx, NULL, true));
}
int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName,
STableMeta** pTableMeta, int32_t isSTable) {
CTG_API_ENTER();
SCtgTbMetaCtx ctx = {0};
@ -903,7 +909,8 @@ int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* p
CTG_API_LEAVE(ctgGetTbMeta(CTG_PARAMS_LIST(), &ctx, pTableMeta));
}
int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) {
int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName,
SArray** pVgList) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == pVgList) {
@ -918,8 +925,8 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pM
CTG_API_LEAVE(ctgGetTbDistVgInfo(pCtg, pTrans, pMgmtEps, (SName*)pTableName, pVgList));
}
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
int32_t catalogGetTableHashVgroup(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName,
SVgroupInfo* pVgroup) {
CTG_API_ENTER();
if (CTG_IS_SYS_DBNAME(pTableName->dbname)) {
@ -952,8 +959,8 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
int32_t catalogGetAllMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SCatalogReq* pReq,
SMetaData* pRsp) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
@ -1016,7 +1023,8 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId) {
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, uint64_t reqId,
const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pReq || NULL == fp || NULL == param) {
@ -1115,7 +1123,6 @@ int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogGetDBCfg(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) {
CTG_API_ENTER();
@ -1126,7 +1133,8 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, co
CTG_API_LEAVE(ctgGetDBCfgFromMnode(CTG_PARAMS_LIST(), dbFName, pDbCfg, NULL));
}
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo) {
int32_t catalogGetIndexMeta(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* indexName,
SIndexInfo* pInfo) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == indexName || NULL == pInfo) {
@ -1136,7 +1144,8 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL));
}
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pRes) {
int32_t catalogGetTableIndex(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pTableName,
SArray** pRes) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == pRes) {
@ -1146,8 +1155,8 @@ int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEp
CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), (SName*)pTableName, pRes, NULL));
}
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo) {
int32_t catalogGetUdfInfo(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* funcName,
SFuncInfo* pInfo) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == funcName || NULL == pInfo) {
@ -1162,7 +1171,8 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogChkAuth(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass) {
int32_t catalogChkAuth(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const char* user, const char* dbFName,
AUTH_TYPE type, bool* pass) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == user || NULL == dbFName || NULL == pass) {
@ -1187,7 +1197,6 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) {
CTG_API_LEAVE(ctgUpdateUserEnqueue(pCtg, pAuth, false));
}
void catalogDestroy(void) {
qInfo("start to destroy catalog");
@ -1226,6 +1235,3 @@ void catalogDestroy(void) {
qInfo("catalog destroyed");
}

View File

@ -525,7 +525,7 @@ int32_t ctgDumpTbIndexRes(SCtgTask* pTask) {
}
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pTableHash, &res);
taosArrayPush(pJob->jobRes.pTableIndex, &res);
return TSDB_CODE_SUCCESS;
}
@ -875,7 +875,9 @@ int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf
TSWAP(pTask->res, pTask->msgCtx.out);
_return:
if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) {
code = TSDB_CODE_SUCCESS;
}
ctgHandleTaskEnd(pTask, code);
CTG_RET(code);

View File

@ -2816,10 +2816,85 @@ static int32_t jsonToTableNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkTableIndexInfoIntervalUnit = "IntervalUnit";
static const char* jkTableIndexInfoSlidingUnit = "SlidingUnit";
static const char* jkTableIndexInfoInterval = "Interval";
static const char* jkTableIndexInfoOffset = "Offset";
static const char* jkTableIndexInfoSliding = "Sliding";
static const char* jkTableIndexInfoDstTbUid = "DstTbUid";
static const char* jkTableIndexInfoDstVgId = "DstVgId";
static const char* jkTableIndexInfoEpSet = "EpSet";
static const char* jkTableIndexInfoExpr = "Expr";
static int32_t tableIndexInfoToJson(const void* pObj, SJson* pJson) {
const STableIndexInfo* pNode = (const STableIndexInfo*)pObj;
int32_t code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoIntervalUnit, pNode->intervalUnit);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoSlidingUnit, pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoInterval, pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoOffset, pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoSliding, pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoDstTbUid, pNode->dstTbUid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableIndexInfoDstVgId, pNode->dstVgId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkTableIndexInfoEpSet, epSetToJson, &pNode->epSet);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddStringToObject(pJson, jkTableIndexInfoExpr, pNode->expr);
}
return code;
}
static int32_t jsonToTableIndexInfo(const SJson* pJson, void* pObj) {
STableIndexInfo* pNode = (STableIndexInfo*)pObj;
int32_t code = tjsonGetTinyIntValue(pJson, jkTableIndexInfoIntervalUnit, &pNode->intervalUnit);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkTableIndexInfoSlidingUnit, &pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkTableIndexInfoInterval, &pNode->interval);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkTableIndexInfoOffset, &pNode->offset);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkTableIndexInfoSliding, &pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkTableIndexInfoDstTbUid, &pNode->dstTbUid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkTableIndexInfoDstVgId, &pNode->dstVgId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkTableIndexInfoEpSet, jsonToEpSet, &pNode->epSet);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonDupStringValue(pJson, jkTableIndexInfoExpr, &pNode->expr);
}
return code;
}
static const char* jkRealTableMetaSize = "MetaSize";
static const char* jkRealTableMeta = "Meta";
static const char* jkRealTableVgroupsInfoSize = "VgroupsInfoSize";
static const char* jkRealTableVgroupsInfo = "VgroupsInfo";
static const char* jkRealTableSmaIndexes = "SmaIndexes";
static int32_t realTableNodeToJson(const void* pObj, SJson* pJson) {
const SRealTableNode* pNode = (const SRealTableNode*)pObj;
@ -2837,6 +2912,9 @@ static int32_t realTableNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkRealTableVgroupsInfo, vgroupsInfoToJson, pNode->pVgroupList);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddTArray(pJson, jkRealTableSmaIndexes, tableIndexInfoToJson, pNode->pSmaIndexes);
}
return code;
}
@ -2858,6 +2936,10 @@ static int32_t jsonToRealTableNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonMakeObject(pJson, jkRealTableVgroupsInfo, jsonToVgroupsInfo, (void**)&pNode->pVgroupList, objSize);
}
if (TSDB_CODE_SUCCESS == code) {
code =
tjsonToTArray(pJson, jkRealTableSmaIndexes, jsonToTableIndexInfo, &pNode->pSmaIndexes, sizeof(STableIndexInfo));
}
return code;
}
@ -4214,6 +4296,7 @@ int32_t nodesStringToList(const char* pStr, SNodeList** pList) {
tjsonDelete(pJson);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyList(*pList);
*pList = NULL;
terrno = code;
return code;
}

View File

@ -46,6 +46,7 @@ typedef struct SParseMetaCache {
SHashObj* pDbInfo; // key is tbFName, element is SDbInfo*
SHashObj* pUserAuth; // key is SUserAuthInfo serialized string, element is bool indicating whether or not to pass
SHashObj* pUdf; // key is funcName, element is SFuncInfo*
SHashObj* pTableIndex; // key is tbFName, element is SArray<STableIndexInfo>*
} SParseMetaCache;
int32_t generateSyntaxErrMsg(SMsgBuf* pBuf, int32_t errCode, ...);
@ -75,6 +76,7 @@ int32_t reserveUserAuthInCache(int32_t acctId, const char* pUser, const char* pD
SParseMetaCache* pMetaCache);
int32_t reserveUserAuthInCacheExt(const char* pUser, const SName* pName, AUTH_TYPE type, SParseMetaCache* pMetaCache);
int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache);
int32_t reserveTableIndexInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache);
int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta);
int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo);
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup);
@ -84,6 +86,7 @@ int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDb
int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDbFName, AUTH_TYPE type,
bool* pPass);
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo);
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes);
#ifdef __cplusplus
}

View File

@ -91,6 +91,7 @@ abort_parse:
typedef struct SCollectMetaKeyCxt {
SParseContext* pParseCxt;
SParseMetaCache* pMetaCache;
SNode* pStmt;
} SCollectMetaKeyCxt;
static void destroyCollectMetaKeyCxt(SCollectMetaKeyCxt* pCxt) {
@ -114,6 +115,14 @@ static EDealRes collectMetaKeyFromFunction(SCollectMetaKeyFromExprCxt* pCxt, SFu
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_CONTINUE : DEAL_RES_ERROR;
}
static bool needGetTableIndex(SNode* pStmt) {
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt;
return (NULL != pSelect->pWindow && QUERY_NODE_INTERVAL_WINDOW == nodeType(pSelect->pWindow));
}
return false;
}
static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, SRealTableNode* pRealTable,
AUTH_TYPE authType) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName,
@ -129,6 +138,10 @@ static int32_t collectMetaKeyFromRealTableImpl(SCollectMetaKeyCxt* pCxt, SRealTa
if (TSDB_CODE_SUCCESS == code) {
code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code && needGetTableIndex(pCxt->pStmt)) {
code = reserveTableIndexInCache(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName,
pCxt->pMetaCache);
}
return code;
}
@ -379,6 +392,7 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p
}
static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
pCxt->pStmt = pStmt;
switch (nodeType(pStmt)) {
case QUERY_NODE_SET_OPERATOR:
return collectMetaKeyFromSetOperator(pCxt, (SSetOperator*)pStmt);
@ -449,7 +463,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
}
int32_t collectMetaKey(SParseContext* pParseCxt, SQuery* pQuery) {
SCollectMetaKeyCxt cxt = {.pParseCxt = pParseCxt, .pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache))};
SCollectMetaKeyCxt cxt = {
.pParseCxt = pParseCxt, .pMetaCache = taosMemoryCalloc(1, sizeof(SParseMetaCache)), .pStmt = pQuery->pRoot};
if (NULL == cxt.pMetaCache) {
return TSDB_CODE_OUT_OF_MEMORY;
}

View File

@ -41,6 +41,7 @@ typedef struct STranslateContext {
SHashObj* pTables;
SExplainOptions* pExplainOpt;
SParseMetaCache* pMetaCache;
bool createStream;
} STranslateContext;
typedef struct SFullDatabaseName {
@ -255,6 +256,23 @@ static int32_t getUdfInfo(STranslateContext* pCxt, SFunctionNode* pFunc) {
return code;
}
static int32_t getTableIndex(STranslateContext* pCxt, const SName* pName, SArray** pIndexes) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseDatabase(pName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
code = collectUseTable(pName, pCxt->pTables);
}
if (pParCxt->async) {
code = getTableIndexFromCache(pCxt->pMetaCache, pName, pIndexes);
} else {
code = catalogGetTableIndex(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pIndexes);
}
if (TSDB_CODE_SUCCESS != code) {
parserError("getTableIndex error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname);
}
return code;
}
static int32_t initTranslateContext(SParseContext* pParseCxt, SParseMetaCache* pMetaCache, STranslateContext* pCxt) {
pCxt->pParseCxt = pParseCxt;
pCxt->errCode = TSDB_CODE_SUCCESS;
@ -329,6 +347,10 @@ static bool isIndefiniteRowsFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsIndefiniteRowsFunc(((SFunctionNode*)pNode)->funcId));
}
static bool isVectorFunc(const SNode* pNode) {
return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsVectorFunc(((SFunctionNode*)pNode)->funcId));
}
static bool isDistinctOrderBy(STranslateContext* pCxt) {
return (SQL_CLAUSE_ORDER_BY == pCxt->currClause && pCxt->pCurrSelectStmt->isDistinct);
}
@ -1364,6 +1386,17 @@ static bool isSingleTable(SRealTableNode* pRealTable) {
return (TSDB_CHILD_TABLE == tableType || TSDB_NORMAL_TABLE == tableType);
}
static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
if (pCxt->createStream) {
return TSDB_CODE_SUCCESS;
}
if (NULL != pCxt->pCurrSelectStmt && NULL != pCxt->pCurrSelectStmt->pWindow &&
QUERY_NODE_INTERVAL_WINDOW == nodeType(pCxt->pCurrSelectStmt->pWindow)) {
return getTableIndex(pCxt, pName, &pRealTable->pSmaIndexes);
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(pTable)) {
@ -1380,6 +1413,9 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName);
}
code = setTableVgroupList(pCxt, &name, pRealTable);
if (TSDB_CODE_SUCCESS == code) {
code = setTableIndex(pCxt, &name, pRealTable);
}
}
pRealTable->table.precision = pRealTable->pMeta->tableInfo.precision;
pRealTable->table.singleTable = isSingleTable(pRealTable);
@ -1803,7 +1839,7 @@ static int64_t getMonthsFromTimeVal(int64_t val, int32_t fromPrecision, char uni
return -1;
}
static int32_t checkIntervalWindow(STranslateContext* pCxt, SNode* pWhere, SIntervalWindowNode* pInterval) {
static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* pInterval) {
uint8_t precision = ((SColumnNode*)pInterval->pCol)->node.resType.precision;
SValueNode* pInter = (SValueNode*)pInterval->pInterval;
@ -1845,7 +1881,15 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SNode* pWhere, SInte
}
}
return translateFill(pCxt, pWhere, pInterval);
return TSDB_CODE_SUCCESS;
}
static int32_t translateIntervalWindow(STranslateContext* pCxt, SSelectStmt* pSelect, SIntervalWindowNode* pInterval) {
int32_t code = checkIntervalWindow(pCxt, pInterval);
if (TSDB_CODE_SUCCESS == code) {
code = translateFill(pCxt, pSelect->pWhere, pInterval);
}
return code;
}
static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
@ -1867,13 +1911,13 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static int32_t checkStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) {
static int32_t translateStateWindow(STranslateContext* pCxt, SStateWindowNode* pState) {
nodesWalkExprPostOrder(pState->pExpr, checkStateExpr, pCxt);
// todo check for "function not support for state_window"
return pCxt->errCode;
}
static int32_t checkSessionWindow(STranslateContext* pCxt, SSessionWindowNode* pSession) {
static int32_t translateSessionWindow(STranslateContext* pCxt, SSessionWindowNode* pSession) {
if ('y' == pSession->pGap->unit || 'n' == pSession->pGap->unit || 0 == pSession->pGap->datum.i) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_SESSION_GAP);
}
@ -1884,14 +1928,14 @@ static int32_t checkSessionWindow(STranslateContext* pCxt, SSessionWindowNode* p
return TSDB_CODE_SUCCESS;
}
static int32_t checkWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
static int32_t translateSpecificWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
switch (nodeType(pSelect->pWindow)) {
case QUERY_NODE_STATE_WINDOW:
return checkStateWindow(pCxt, (SStateWindowNode*)pSelect->pWindow);
return translateStateWindow(pCxt, (SStateWindowNode*)pSelect->pWindow);
case QUERY_NODE_SESSION_WINDOW:
return checkSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow);
return translateSessionWindow(pCxt, (SSessionWindowNode*)pSelect->pWindow);
case QUERY_NODE_INTERVAL_WINDOW:
return checkIntervalWindow(pCxt, pSelect->pWhere, (SIntervalWindowNode*)pSelect->pWindow);
return translateIntervalWindow(pCxt, pSelect, (SIntervalWindowNode*)pSelect->pWindow);
default:
break;
}
@ -1905,7 +1949,7 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
pCxt->currClause = SQL_CLAUSE_WINDOW;
int32_t code = translateExpr(pCxt, &pSelect->pWindow);
if (TSDB_CODE_SUCCESS == code) {
code = checkWindow(pCxt, pSelect);
code = translateSpecificWindow(pCxt, pSelect);
}
return code;
}
@ -2724,7 +2768,8 @@ typedef struct SSampleAstInfo {
STableMeta* pRollupTableMeta;
} SSampleAstInfo;
static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, char** pAst, int32_t* pLen) {
static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, char** pAst, int32_t* pLen, char** pExpr,
int32_t* pExprLen) {
SSelectStmt* pSelect = nodesMakeNode(QUERY_NODE_SELECT_STMT);
if (NULL == pSelect) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -2769,10 +2814,14 @@ static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, ch
((SColumnNode*)pInterval->pCol)->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
strcpy(((SColumnNode*)pInterval->pCol)->colName, PK_TS_COL_INTERNAL_NAME);
pCxt->createStream = true;
int32_t code = translateQuery(pCxt, (SNode*)pSelect);
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pSelect, false, pAst, pLen);
}
if (TSDB_CODE_SUCCESS == code && NULL != pExpr) {
code = nodesListToString(pSelect->pProjectionList, false, pExpr, pExprLen);
}
nodesDestroyNode(pSelect);
return code;
}
@ -2894,7 +2943,7 @@ static int32_t getRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, SR
SSampleAstInfo info = {0};
int32_t code = buildSampleAstInfoByTable(pCxt, pStmt, pRetension, precision, &info);
if (TSDB_CODE_SUCCESS == code) {
code = buildSampleAst(pCxt, &info, pAst, pLen);
code = buildSampleAst(pCxt, &info, pAst, pLen, NULL, NULL);
}
clearSampleAstInfo(&info);
return code;
@ -3152,10 +3201,6 @@ static int32_t getSmaIndexSql(STranslateContext* pCxt, char** pSql, int32_t* pLe
return TSDB_CODE_SUCCESS;
}
static int32_t getSmaIndexExpr(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pExpr, int32_t* pLen) {
return nodesListToString(pStmt->pOptions->pFuncs, false, pExpr, pLen);
}
static int32_t buildSampleAstInfoByIndex(STranslateContext* pCxt, SCreateIndexStmt* pStmt, SSampleAstInfo* pInfo) {
pInfo->pDbName = pCxt->pParseCxt->db;
pInfo->pTableName = pStmt->tableName;
@ -3171,11 +3216,12 @@ static int32_t buildSampleAstInfoByIndex(STranslateContext* pCxt, SCreateIndexSt
return TSDB_CODE_SUCCESS;
}
static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pAst, int32_t* pLen) {
static int32_t getSmaIndexAst(STranslateContext* pCxt, SCreateIndexStmt* pStmt, char** pAst, int32_t* pLen,
char** pExpr, int32_t* pExprLen) {
SSampleAstInfo info = {0};
int32_t code = buildSampleAstInfoByIndex(pCxt, pStmt, &info);
if (TSDB_CODE_SUCCESS == code) {
code = buildSampleAst(pCxt, &info, pAst, pLen);
code = buildSampleAst(pCxt, &info, pAst, pLen, pExpr, pExprLen);
}
clearSampleAstInfo(&info);
return code;
@ -3201,10 +3247,7 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm
code = getSmaIndexSql(pCxt, &pReq->sql, &pReq->sqlLen);
}
if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexExpr(pCxt, pStmt, &pReq->expr, &pReq->exprLen);
}
if (TSDB_CODE_SUCCESS == code) {
code = getSmaIndexAst(pCxt, pStmt, &pReq->ast, &pReq->astLen);
code = getSmaIndexAst(pCxt, pStmt, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen);
}
return code;
@ -3487,6 +3530,7 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
tNameExtractFullName(&name, pReq->targetStbFullName);
}
pCxt->createStream = true;
int32_t code = translateQuery(pCxt, pStmt->pQuery);
if (TSDB_CODE_SUCCESS == code) {
getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB);

View File

@ -542,6 +542,9 @@ int32_t buildCatalogReq(const SParseMetaCache* pMetaCache, SCatalogReq* pCatalog
if (TSDB_CODE_SUCCESS == code) {
code = buildUdfReq(pMetaCache->pUdf, &pCatalogReq->pUdf);
}
if (TSDB_CODE_SUCCESS == code) {
code = buildTableReq(pMetaCache->pTableIndex, &pCatalogReq->pTableIndex);
}
return code;
}
@ -628,6 +631,9 @@ int32_t putMetaDataToCache(const SCatalogReq* pCatalogReq, const SMetaData* pMet
if (TSDB_CODE_SUCCESS == code) {
code = putUdfToCache(pCatalogReq->pUdf, pMetaData->pUdfList, pMetaCache->pUdf);
}
if (TSDB_CODE_SUCCESS == code) {
code = putTableDataToCache(pCatalogReq->pTableIndex, pMetaData->pTableIndex, pMetaCache->pTableIndex);
}
return code;
}
@ -806,3 +812,43 @@ int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFun
}
return code;
}
static void destroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); }
static SArray* smaIndexesDup(SArray* pSrc) {
SArray* pDst = taosArrayDup(pSrc);
if (NULL == pDst) {
return NULL;
}
int32_t size = taosArrayGetSize(pDst);
for (int32_t i = 0; i < size; ++i) {
((STableIndexInfo*)taosArrayGet(pDst, i))->expr = NULL;
}
for (int32_t i = 0; i < size; ++i) {
STableIndexInfo* pIndex = taosArrayGet(pDst, i);
pIndex->expr = taosMemoryStrDup(((STableIndexInfo*)taosArrayGet(pSrc, i))->expr);
if (NULL == pIndex->expr) {
taosArrayDestroyEx(pDst, destroySmaIndex);
return NULL;
}
}
return pDst;
}
int32_t reserveTableIndexInCache(int32_t acctId, const char* pDb, const char* pTable, SParseMetaCache* pMetaCache) {
return reserveTableReqInCache(acctId, pDb, pTable, &pMetaCache->pTableIndex);
}
int32_t getTableIndexFromCache(SParseMetaCache* pMetaCache, const SName* pName, SArray** pIndexes) {
char fullName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pName, fullName);
SArray* pSmaIndexes = NULL;
int32_t code = getMetaDataFromHash(fullName, strlen(fullName), pMetaCache->pTableIndex, (void**)&pSmaIndexes);
if (TSDB_CODE_SUCCESS == code && NULL != pSmaIndexes) {
*pIndexes = smaIndexesDup(pSmaIndexes);
if (NULL == *pIndexes) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
return code;
}

View File

@ -214,6 +214,11 @@ int32_t __catalogRefreshGetTableMeta(SCatalog* pCatalog, void* pTransporter, con
int32_t __catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { return 0; }
int32_t __catalogGetTableIndex(SCatalog* pCtg, void* pTrans, const SEpSet* pMgmtEps, const SName* pName,
SArray** pRes) {
return g_mockCatalogService->catalogGetTableIndex(pName, pRes);
}
void initMetaDataEnv() {
g_mockCatalogService.reset(new MockCatalogService());
@ -230,6 +235,7 @@ void initMetaDataEnv() {
stub.set(catalogGetUdfInfo, __catalogGetUdfInfo);
stub.set(catalogRefreshGetTableMeta, __catalogRefreshGetTableMeta);
stub.set(catalogRemoveTableMeta, __catalogRemoveTableMeta);
stub.set(catalogGetTableIndex, __catalogGetTableIndex);
// {
// AddrAny any("libcatalog.so");
// std::map<std::string,void*> result;

View File

@ -149,6 +149,20 @@ class MockCatalogServiceImpl {
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetTableIndex(const SName* pTableName, SArray** pIndexes) const {
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(pTableName, tbFName);
auto it = index_.find(tbFName);
if (index_.end() == it) {
return TSDB_CODE_SUCCESS;
}
*pIndexes = taosArrayInit(it->second.size(), sizeof(STableIndexInfo));
for (const auto& index : it->second) {
taosArrayPush(*pIndexes, &index);
}
return TSDB_CODE_SUCCESS;
}
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const {
int32_t code = getAllTableMeta(pCatalogReq->pTableMeta, &pMetaData->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
@ -169,6 +183,9 @@ class MockCatalogServiceImpl {
if (TSDB_CODE_SUCCESS == code) {
code = getAllUdf(pCatalogReq->pUdf, &pMetaData->pUdfList);
}
if (TSDB_CODE_SUCCESS == code) {
code = getAllTableIndex(pCatalogReq->pTableIndex, &pMetaData->pTableIndex);
}
return code;
}
@ -176,7 +193,7 @@ class MockCatalogServiceImpl {
int32_t numOfColumns, int32_t numOfTags) {
builder_ = TableBuilder::createTableBuilder(tableType, numOfColumns, numOfTags);
meta_[db][tbname] = builder_->table();
meta_[db][tbname]->schema->uid = id_++;
meta_[db][tbname]->schema->uid = getNextId();
return *(builder_.get());
}
@ -187,14 +204,11 @@ class MockCatalogServiceImpl {
}
meta_[db][tbname].reset(new MockTableMeta());
meta_[db][tbname]->schema = table.release();
meta_[db][tbname]->schema->uid = id_++;
meta_[db][tbname]->schema->uid = getNextId();
meta_[db][tbname]->schema->tableType = TSDB_CHILD_TABLE;
SVgroupInfo vgroup = {vgid, 0, 0, {0}, 0};
addEpIntoEpSet(&vgroup.epSet, "dnode_1", 6030);
addEpIntoEpSet(&vgroup.epSet, "dnode_2", 6030);
addEpIntoEpSet(&vgroup.epSet, "dnode_3", 6030);
vgroup.epSet.inUse = 0;
genEpSet(&vgroup.epSet);
meta_[db][tbname]->vgs.emplace_back(vgroup);
// super table
@ -268,10 +282,39 @@ class MockCatalogServiceImpl {
udf_.insert(std::make_pair(func, info));
}
void createSmaIndex(const SMCreateSmaReq* pReq) {
STableIndexInfo info;
info.intervalUnit = pReq->intervalUnit;
info.slidingUnit = pReq->slidingUnit;
info.interval = pReq->interval;
info.offset = pReq->offset;
info.sliding = pReq->sliding;
info.dstTbUid = getNextId();
info.dstVgId = pReq->dstVgId;
genEpSet(&info.epSet);
info.expr = strdup(pReq->expr);
auto it = index_.find(pReq->stb);
if (index_.end() == it) {
index_.insert(std::make_pair(std::string(pReq->stb), std::vector<STableIndexInfo>{info}));
} else {
it->second.push_back(info);
}
}
private:
typedef std::map<std::string, std::shared_ptr<MockTableMeta>> TableMetaCache;
typedef std::map<std::string, TableMetaCache> DbMetaCache;
typedef std::map<std::string, std::shared_ptr<SFuncInfo>> UdfMetaCache;
typedef std::map<std::string, std::vector<STableIndexInfo>> IndexMetaCache;
uint64_t getNextId() { return id_++; }
void genEpSet(SEpSet* pEpSet) {
addEpIntoEpSet(pEpSet, "dnode_1", 6030);
addEpIntoEpSet(pEpSet, "dnode_2", 6030);
addEpIntoEpSet(pEpSet, "dnode_3", 6030);
pEpSet->inUse = 0;
}
std::string toDbname(const std::string& dbFullName) const {
std::string::size_type n = dbFullName.find(".");
@ -463,10 +506,24 @@ class MockCatalogServiceImpl {
return TSDB_CODE_SUCCESS;
}
int32_t getAllTableIndex(SArray* pTableIndex, SArray** pTableIndexData) const {
if (NULL != pTableIndex) {
int32_t num = taosArrayGetSize(pTableIndex);
*pTableIndexData = taosArrayInit(num, sizeof(SMetaRes));
for (int32_t i = 0; i < num; ++i) {
SMetaRes res = {0};
res.code = catalogGetTableIndex((const SName*)taosArrayGet(pTableIndex, i), (SArray**)(&res.pRes));
taosArrayPush(*pTableIndexData, &res);
}
}
return TSDB_CODE_SUCCESS;
}
uint64_t id_;
std::unique_ptr<TableBuilder> builder_;
DbMetaCache meta_;
UdfMetaCache udf_;
IndexMetaCache index_;
};
MockCatalogService::MockCatalogService() : impl_(new MockCatalogServiceImpl()) {}
@ -490,6 +547,8 @@ void MockCatalogService::createFunction(const std::string& func, int8_t funcType
impl_->createFunction(func, funcType, outputType, outputLen, bufSize);
}
void MockCatalogService::createSmaIndex(const SMCreateSmaReq* pReq) { impl_->createSmaIndex(pReq); }
int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const {
return impl_->catalogGetTableMeta(pTableName, pTableMeta);
}
@ -510,6 +569,10 @@ int32_t MockCatalogService::catalogGetUdfInfo(const std::string& funcName, SFunc
return impl_->catalogGetUdfInfo(funcName, pInfo);
}
int32_t MockCatalogService::catalogGetTableIndex(const SName* pTableName, SArray** pIndexes) const {
return impl_->catalogGetTableIndex(pTableName, pIndexes);
}
int32_t MockCatalogService::catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const {
return impl_->catalogGetAllMeta(pCatalogReq, pMetaData);
}

View File

@ -57,12 +57,14 @@ class MockCatalogService {
void createSubTable(const std::string& db, const std::string& stbname, const std::string& tbname, int16_t vgid);
void showTables() const;
void createFunction(const std::string& func, int8_t funcType, int8_t outputType, int32_t outputLen, int32_t bufSize);
void createSmaIndex(const SMCreateSmaReq* pReq);
int32_t catalogGetTableMeta(const SName* pTableName, STableMeta** pTableMeta) const;
int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const;
int32_t catalogGetTableDistVgInfo(const SName* pTableName, SArray** pVgList) const;
int32_t catalogGetDBVgInfo(const char* pDbFName, SArray** pVgList) const;
int32_t catalogGetUdfInfo(const std::string& funcName, SFuncInfo* pInfo) const;
int32_t catalogGetTableIndex(const SName* pTableName, SArray** pIndexes) const;
int32_t catalogGetAllMeta(const SCatalogReq* pCatalogReq, SMetaData* pMetaData) const;
private:

View File

@ -36,11 +36,13 @@ extern "C" {
#define planTrace(param, ...) qTrace("PLAN: " param, __VA_ARGS__)
int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...);
int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList);
int32_t createColumnByRewriteExprs(SNodeList* pExprs, SNodeList** pList);
int32_t createColumnByRewriteExpr(SNode* pExpr, SNodeList** pList);
int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew);
int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode);
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode);
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan);
int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan);
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan);
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan);
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan);
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList);

View File

@ -220,6 +220,7 @@ static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealT
}
TSWAP(pScan->pVgroupList, pRealTable->pVgroupList);
TSWAP(pScan->pSmaIndexes, pRealTable->pSmaIndexes);
pScan->tableId = pRealTable->pMeta->uid;
pScan->stableId = pRealTable->pMeta->suid;
pScan->tableType = pRealTable->pMeta->tableType;
@ -272,10 +273,10 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
// set output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pScan->pScanCols, &pScan->node.pTargets);
code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pScan->pScanPseudoCols, &pScan->node.pTargets);
code = createColumnByRewriteExprs(pScan->pScanPseudoCols, &pScan->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
@ -441,10 +442,10 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
// set the output
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pGroupKeys) {
code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets);
code = createColumnByRewriteExprs(pAgg->pGroupKeys, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) {
code = createColumnByRewriteExps(pAgg->pAggFuncs, &pAgg->node.pTargets);
code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
@ -475,7 +476,7 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pIdfRowsFunc->pVectorFuncs, &pIdfRowsFunc->node.pTargets);
code = createColumnByRewriteExprs(pIdfRowsFunc->pVectorFuncs, &pIdfRowsFunc->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
@ -505,7 +506,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pWindow->pFuncs, &pWindow->node.pTargets);
code = createColumnByRewriteExprs(pWindow->pFuncs, &pWindow->node.pTargets);
}
pSelect->hasAggFuncs = false;
@ -776,7 +777,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets);
code = createColumnByRewriteExprs(pAgg->pGroupKeys, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
@ -918,7 +919,7 @@ static int32_t createSetOpAggLogicNode(SLogicPlanContext* pCxt, SSetOperator* pS
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pAgg->pGroupKeys, &pAgg->node.pTargets);
code = createColumnByRewriteExprs(pAgg->pGroupKeys, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
@ -1040,7 +1041,7 @@ static int32_t createDeleteScanLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* p
// set output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pScan->pScanCols, &pScan->node.pTargets);
code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
@ -1064,7 +1065,7 @@ static int32_t createDeleteAggLogicNode(SLogicPlanContext* pCxt, SDeleteStmt* pD
}
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pAgg->pAggFuncs, &pAgg->node.pTargets);
code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
@ -1138,11 +1139,40 @@ static int32_t createQueryLogicNode(SLogicPlanContext* pCxt, SNode* pStmt, SLogi
return TSDB_CODE_FAILED;
}
int32_t createLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode) {
static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) {
pNode->pParent = pParent;
SNode* pChild;
FOREACH(pChild, pNode->pChildren) { doSetLogicNodeParent((SLogicNode*)pChild, pNode); }
}
static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); }
int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) {
SLogicPlanContext cxt = {.pPlanCxt = pCxt};
int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, pLogicNode);
if (TSDB_CODE_SUCCESS != code) {
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (NULL == pSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubplan->id.queryId = pCxt->queryId;
pSubplan->id.groupId = 1;
pSubplan->id.subplanId = 1;
int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, &pSubplan->pNode);
if (TSDB_CODE_SUCCESS == code) {
setLogicNodeParent(pSubplan->pNode);
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pSubplan->pNode)) {
pSubplan->subplanType = SUBPLAN_TYPE_MODIFY;
} else {
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
}
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicSubplan = pSubplan;
} else {
nodesDestroyNode(pSubplan);
}
return code;
}
return TSDB_CODE_SUCCESS;
}

View File

@ -32,8 +32,7 @@ typedef struct SOptimizeContext {
bool optimized;
} SOptimizeContext;
typedef int32_t (*FMatch)(SOptimizeContext* pCxt, SLogicNode* pLogicNode);
typedef int32_t (*FOptimize)(SOptimizeContext* pCxt, SLogicNode* pLogicNode);
typedef int32_t (*FOptimize)(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan);
typedef struct SOptimizeRule {
char* pName;
@ -109,7 +108,6 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
}
if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) {
return true;
// return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType);
}
return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys);
}
@ -231,9 +229,9 @@ static void setScanWindowInfo(SScanLogicNode* pScan) {
}
}
static int32_t osdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
static int32_t osdOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SOsdInfo info = {0};
int32_t code = osdMatch(pCxt, pLogicNode, &info);
int32_t code = osdMatch(pCxt, pLogicSubplan->pNode, &info);
if (TSDB_CODE_SUCCESS == code && info.pScan) {
setScanWindowInfo((SScanLogicNode*)info.pScan);
}
@ -635,8 +633,8 @@ static int32_t cpdPushCondition(SOptimizeContext* pCxt, SLogicNode* pLogicNode)
return code;
}
static int32_t cpdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
return cpdPushCondition(pCxt, pLogicNode);
static int32_t cpdOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
return cpdPushCondition(pCxt, pLogicSubplan->pNode);
}
static bool opkIsPrimaryKeyOrderBy(SNodeList* pSortKeys) {
@ -745,26 +743,292 @@ static int32_t opkOptimizeImpl(SOptimizeContext* pCxt, SSortLogicNode* pSort) {
return code;
}
static int32_t opkOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
SSortLogicNode* pSort = (SSortLogicNode*)optFindPossibleNode(pLogicNode, opkSortMayBeOptimized);
static int32_t opkOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SSortLogicNode* pSort = (SSortLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, opkSortMayBeOptimized);
if (NULL == pSort) {
return TSDB_CODE_SUCCESS;
}
return opkOptimizeImpl(pCxt, pSort);
}
static const SOptimizeRule optimizeRuleSet[] = {{.pName = "OptimizeScanData", .optimizeFunc = osdOptimize},
static bool smaOptMayBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || NULL == pNode->pParent ||
QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) ||
WINDOW_TYPE_INTERVAL != ((SWindowLogicNode*)pNode->pParent)->winType) {
return false;
}
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
if (0 == pScan->interval || NULL == pScan->pSmaIndexes || NULL != pScan->node.pConditions) {
return false;
}
return true;
}
static int32_t smaOptCreateMerge(SLogicNode* pChild, SNodeList* pMergeKeys, SNodeList* pTargets, SLogicNode** pOutput) {
SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMerge->node.precision = pChild->precision;
pMerge->numOfChannels = 2;
pMerge->pMergeKeys = pMergeKeys;
pMerge->node.pTargets = pTargets;
pMerge->pInputs = nodesCloneList(pChild->pTargets);
if (NULL == pMerge->pInputs) {
nodesDestroyNode(pMerge);
return TSDB_CODE_OUT_OF_MEMORY;
}
*pOutput = (SLogicNode*)pMerge;
return TSDB_CODE_SUCCESS;
}
static int32_t smaOptRecombinationNode(SLogicSubplan* pLogicSubplan, SLogicNode* pInterval, SLogicNode* pMerge,
SLogicNode* pSmaScan) {
int32_t code = nodesListMakeAppend(&pMerge->pChildren, pInterval);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeAppend(&pMerge->pChildren, pSmaScan);
}
if (TSDB_CODE_SUCCESS == code) {
code = replaceLogicNode(pLogicSubplan, pInterval, pMerge);
pSmaScan->pParent = pMerge;
pInterval->pParent = pMerge;
}
return code;
}
static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList* pCols,
SLogicNode** pOutput) {
SScanLogicNode* pSmaScan = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN);
if (NULL == pSmaScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSmaScan->pScanCols = pCols;
pSmaScan->tableType = TSDB_SUPER_TABLE;
pSmaScan->tableId = pIndex->dstTbUid;
pSmaScan->stableId = pIndex->dstTbUid;
pSmaScan->scanType = SCAN_TYPE_TABLE;
pSmaScan->scanSeq[0] = pScan->scanSeq[0];
pSmaScan->scanSeq[1] = pScan->scanSeq[1];
pSmaScan->scanRange = pScan->scanRange;
pSmaScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
pSmaScan->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
if (NULL == pSmaScan->pVgroupList) {
nodesDestroyNode(pSmaScan);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSmaScan->pVgroupList->numOfVgroups = 1;
pSmaScan->pVgroupList->vgroups[0].vgId = pIndex->dstVgId;
memcpy(&(pSmaScan->pVgroupList->vgroups[0].epSet), &pIndex->epSet, sizeof(SEpSet));
*pOutput = (SLogicNode*)pSmaScan;
return TSDB_CODE_SUCCESS;
}
static bool smaOptEqualInterval(SWindowLogicNode* pWindow, STableIndexInfo* pIndex) {
if (pWindow->interval != pIndex->interval || pWindow->intervalUnit != pIndex->intervalUnit ||
pWindow->offset != pIndex->offset || pWindow->sliding != pIndex->sliding ||
pWindow->slidingUnit != pIndex->slidingUnit) {
return false;
}
// todo time range
return true;
}
// #define SMA_TABLE_NAME "#sma_table"
// #define SMA_COL_NAME_PREFIX "#sma_col_"
static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId) {
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return NULL;
}
pCol->tableId = tableId;
pCol->tableType = TSDB_SUPER_TABLE;
pCol->colId = colId;
pCol->colType = COLUMN_TYPE_COLUMN;
snprintf(pCol->colName, sizeof(pCol->colName), "#sma_col_%d", pCol->colId);
// strcpy(pCol->tableName, SMA_TABLE_NAME);
// strcpy(pCol->tableAlias, SMA_TABLE_NAME);
pCol->node.resType = ((SExprNode*)pFunc)->resType;
strcpy(pCol->node.aliasName, ((SExprNode*)pFunc)->aliasName);
return (SNode*)pCol;
}
static int32_t smaOptFindSmaFunc(SNode* pQueryFunc, SNodeList* pSmaFuncs) {
int32_t index = 0;
SNode* pSmaFunc = NULL;
FOREACH(pSmaFunc, pSmaFuncs) {
if (nodesEqualNode(pQueryFunc, pSmaFunc)) {
return index;
}
++index;
}
return -1;
}
static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeList* pSmaFuncs, SNodeList** pOutput,
int32_t* pWStrartIndex) {
SNodeList* pCols = NULL;
SNode* pFunc = NULL;
int32_t code = TSDB_CODE_SUCCESS;
int32_t index = 0;
*pWStrartIndex = -1;
FOREACH(pFunc, pFuncs) {
if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) {
*pWStrartIndex = index;
}
int32_t smaFuncIndex = smaOptFindSmaFunc(pFunc, pSmaFuncs);
if (smaFuncIndex < 0) {
break;
} else {
code = nodesListMakeStrictAppend(&pCols, smaOptCreateSmaCol(pFunc, tableId, smaFuncIndex + 2));
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
++index;
}
if (TSDB_CODE_SUCCESS == code) {
*pOutput = pCols;
} else {
nodesDestroyList(pCols);
}
return code;
}
static int32_t smaOptCouldApplyIndex(SWindowLogicNode* pWindow, STableIndexInfo* pIndex, SNodeList** pCols,
int32_t* pWStrartIndex) {
if (!smaOptEqualInterval(pWindow, pIndex)) {
return TSDB_CODE_SUCCESS;
}
SNodeList* pSmaFuncs = NULL;
int32_t code = nodesStringToList(pIndex->expr, &pSmaFuncs);
if (TSDB_CODE_SUCCESS == code) {
code = smaOptCreateSmaCols(pWindow->pFuncs, pIndex->dstTbUid, pSmaFuncs, pCols, pWStrartIndex);
}
nodesDestroyList(pSmaFuncs);
return code;
}
static SNode* smaOptCreateWStartTs() {
SFunctionNode* pWStart = nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pWStart) {
return NULL;
}
strcpy(pWStart->functionName, "_wstartts");
snprintf(pWStart->node.aliasName, sizeof(pWStart->node.aliasName), "%s.%p", pWStart->functionName, pWStart);
if (TSDB_CODE_SUCCESS != fmGetFuncInfo(pWStart, NULL, 0)) {
nodesDestroyNode(pWStart);
return NULL;
}
return (SNode*)pWStart;
}
static int32_t smaOptCreateMergeKey(SNode* pCol, SNodeList** pMergeKeys) {
SOrderByExprNode* pMergeKey = nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (NULL == pMergeKey) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMergeKey->pExpr = nodesCloneNode(pCol);
if (NULL == pMergeKey->pExpr) {
nodesDestroyNode(pMergeKey);
return TSDB_CODE_OUT_OF_MEMORY;
}
pMergeKey->order = ORDER_ASC;
pMergeKey->nullOrder = NULL_ORDER_FIRST;
return nodesListMakeStrictAppend(pMergeKeys, pMergeKey);
}
static int32_t smaOptRewriteInterval(SWindowLogicNode* pInterval, int32_t wstrartIndex, SNodeList** pMergeKeys) {
if (wstrartIndex < 0) {
SNode* pWStart = smaOptCreateWStartTs();
if (NULL == pWStart) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = createColumnByRewriteExpr(pWStart, &pInterval->node.pTargets);
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode(pWStart);
return code;
}
wstrartIndex = LIST_LENGTH(pInterval->node.pTargets) - 1;
}
return smaOptCreateMergeKey(nodesListGetNode(pInterval->node.pTargets, wstrartIndex), pMergeKeys);
}
static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex,
SNodeList* pSmaCols, int32_t wstrartIndex) {
SWindowLogicNode* pInterval = (SWindowLogicNode*)pScan->node.pParent;
SNodeList* pMergeTargets = nodesCloneList(pInterval->node.pTargets);
if (NULL == pMergeTargets) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SLogicNode* pSmaScan = NULL;
SLogicNode* pMerge = NULL;
SNodeList* pMergeKeys = NULL;
int32_t code = smaOptRewriteInterval(pInterval, wstrartIndex, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = smaOptCreateSmaScan(pScan, pIndex, pSmaCols, &pSmaScan);
}
if (TSDB_CODE_SUCCESS == code) {
code = smaOptCreateMerge(pScan->node.pParent, pMergeKeys, pMergeTargets, &pMerge);
}
if (TSDB_CODE_SUCCESS == code) {
code = smaOptRecombinationNode(pLogicSubplan, pScan->node.pParent, pMerge, pSmaScan);
}
return code;
}
static void smaOptDestroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); }
static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t nindexes = taosArrayGetSize(pScan->pSmaIndexes);
for (int32_t i = 0; i < nindexes; ++i) {
STableIndexInfo* pIndex = taosArrayGet(pScan->pSmaIndexes, i);
SNodeList* pSmaCols = NULL;
int32_t wstrartIndex = -1;
code = smaOptCouldApplyIndex((SWindowLogicNode*)pScan->node.pParent, pIndex, &pSmaCols, &wstrartIndex);
if (TSDB_CODE_SUCCESS == code && NULL != pSmaCols) {
code = smaOptApplyIndex(pLogicSubplan, pScan, pIndex, pSmaCols, wstrartIndex);
taosArrayDestroyEx(pScan->pSmaIndexes, smaOptDestroySmaIndex);
pScan->pSmaIndexes = NULL;
break;
}
}
return code;
}
static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, smaOptMayBeOptimized);
if (NULL == pScan) {
return TSDB_CODE_SUCCESS;
}
return smaOptimizeImpl(pCxt, pLogicSubplan, pScan);
}
// clang-format off
static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "OptimizeScanData", .optimizeFunc = osdOptimize},
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize}};
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
{.pName = "SmaIndex", .optimizeFunc = smaOptimize}
};
// clang-format on
static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule));
static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) {
static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
SOptimizeContext cxt = {.pPlanCxt = pCxt, .optimized = false};
do {
cxt.optimized = false;
for (int32_t i = 0; i < optimizeRuleNum; ++i) {
int32_t code = optimizeRuleSet[i].optimizeFunc(&cxt, pLogicNode);
int32_t code = optimizeRuleSet[i].optimizeFunc(&cxt, pLogicSubplan);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
@ -773,4 +1037,6 @@ static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) {
return TSDB_CODE_SUCCESS;
}
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) { return applyOptimizeRule(pCxt, pLogicNode); }
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
return applyOptimizeRule(pCxt, pLogicSubplan);
}

View File

@ -80,29 +80,12 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE
return TSDB_CODE_SUCCESS;
}
static int32_t splReplaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew) {
if (NULL == pOld->pParent) {
pSubplan->pNode = (SLogicNode*)pNew;
return TSDB_CODE_SUCCESS;
}
SNode* pNode;
FOREACH(pNode, pOld->pParent->pChildren) {
if (nodesEqualNode(pNode, pOld)) {
REPLACE_NODE(pNew);
pNew->pParent = pOld->pParent;
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
ESubplanType subplanType) {
SExchangeLogicNode* pExchange = NULL;
int32_t code = splCreateExchangeNode(pCxt, pSplitNode, &pExchange);
if (TSDB_CODE_SUCCESS == code) {
code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange);
code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange);
}
if (TSDB_CODE_SUCCESS == code) {
pSubplan->subplanType = subplanType;
@ -282,7 +265,7 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
code = stbSplAppendWStart(pPartWin->pFuncs, &index);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pPartWin->pFuncs, &pPartWin->node.pTargets);
code = createColumnByRewriteExprs(pPartWin->pFuncs, &pPartWin->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
nodesDestroyNode(pMergeWindow->pTspk);
@ -328,7 +311,7 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
if (NULL == pSubplan) {
code = nodesListMakeAppend(&pSplitNode->pChildren, pMerge);
} else {
code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
}
}
if (TSDB_CODE_SUCCESS != code) {
@ -442,7 +425,7 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
pPartAgg->pGroupKeys = pGroupKeys;
code = createColumnByRewriteExps(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
code = createColumnByRewriteExprs(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets);
@ -457,7 +440,7 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO
code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
code = createColumnByRewriteExprs(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
}
nodesDestroyList(pFunc);
@ -897,12 +880,56 @@ static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan)
return code;
}
typedef struct SSmaIndexSplitInfo {
SMergeLogicNode* pMerge;
SLogicSubplan* pSubplan;
} SSmaIndexSplitInfo;
static SLogicNode* smaIdxSplMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = smaIdxSplMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
}
return NULL;
}
static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SSmaIndexSplitInfo* pInfo) {
SLogicNode* pSplitNode = smaIdxSplMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pMerge = (SMergeLogicNode*)pSplitNode;
pInfo->pSubplan = pSubplan;
}
return NULL != pSplitNode;
}
static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SSmaIndexSplitInfo info = {0};
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)smaIdxSplFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pMerge);
if (TSDB_CODE_SUCCESS == code) {
info.pMerge->srcGroupId = pCxt->groupId;
}
++(pCxt->groupId);
pCxt->split = true;
return code;
}
// clang-format off
static const SSplitRule splitRuleSet[] = {
{.pName = "SuperTableSplit", .splitFunc = stableSplit},
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit}
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit},
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}
};
// clang-format on
@ -936,14 +963,6 @@ static int32_t applySplitRule(SPlanContext* pCxt, SLogicSubplan* pSubplan) {
return TSDB_CODE_SUCCESS;
}
static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) {
pNode->pParent = pParent;
SNode* pChild;
FOREACH(pChild, pNode->pChildren) { doSetLogicNodeParent((SLogicNode*)pChild, pNode); }
}
static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); }
static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
TSWAP(((SScanLogicNode*)pNode)->pVgroupList, pSubplan->pVgroupList);
@ -954,37 +973,10 @@ static void setVgroupsInfo(SLogicNode* pNode, SLogicSubplan* pSubplan) {
FOREACH(pChild, pNode->pChildren) { setVgroupsInfo((SLogicNode*)pChild, pSubplan); }
}
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan) {
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
if (NULL == pSubplan) {
return TSDB_CODE_OUT_OF_MEMORY;
int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicSubplan->pNode)) {
setVgroupsInfo(pLogicSubplan->pNode, pLogicSubplan);
return TSDB_CODE_SUCCESS;
}
pSubplan->pNode = nodesCloneNode(pLogicNode);
if (NULL == pSubplan->pNode) {
nodesDestroyNode(pSubplan);
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubplan->id.queryId = pCxt->queryId;
pSubplan->id.groupId = 1;
setLogicNodeParent(pSubplan->pNode);
int32_t code = TSDB_CODE_SUCCESS;
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY == nodeType(pLogicNode)) {
pSubplan->subplanType = SUBPLAN_TYPE_MODIFY;
TSWAP(((SVnodeModifyLogicNode*)pLogicNode)->pDataBlocks, ((SVnodeModifyLogicNode*)pSubplan->pNode)->pDataBlocks);
setVgroupsInfo(pSubplan->pNode, pSubplan);
} else {
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
code = applySplitRule(pCxt, pSubplan);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicSubplan = pSubplan;
} else {
nodesDestroyNode(pSubplan);
}
return code;
return applySplitRule(pCxt, pLogicSubplan);
}

View File

@ -69,7 +69,7 @@ static EDealRes doCreateColumn(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList) {
int32_t createColumnByRewriteExprs(SNodeList* pExprs, SNodeList** pList) {
SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)};
if (NULL == cxt.pList) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -85,3 +85,37 @@ int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList) {
}
return cxt.errCode;
}
int32_t createColumnByRewriteExpr(SNode* pExpr, SNodeList** pList) {
SCreateColumnCxt cxt = {.errCode = TSDB_CODE_SUCCESS, .pList = (NULL == *pList ? nodesMakeList() : *pList)};
if (NULL == cxt.pList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
nodesWalkExpr(pExpr, doCreateColumn, &cxt);
if (TSDB_CODE_SUCCESS != cxt.errCode) {
nodesDestroyList(cxt.pList);
return cxt.errCode;
}
if (NULL == *pList) {
*pList = cxt.pList;
}
return cxt.errCode;
}
int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew) {
if (NULL == pOld->pParent) {
pSubplan->pNode = (SLogicNode*)pNew;
return TSDB_CODE_SUCCESS;
}
SNode* pNode;
FOREACH(pNode, pOld->pParent->pChildren) {
if (nodesEqualNode(pNode, pOld)) {
REPLACE_NODE(pNew);
pNew->pParent = pOld->pParent;
return TSDB_CODE_SUCCESS;
}
}
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}

View File

@ -26,16 +26,15 @@ static void dumpQueryPlan(SQueryPlan* pPlan) {
}
int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList) {
SLogicNode* pLogicNode = NULL;
SLogicSubplan* pLogicSubplan = NULL;
SQueryLogicPlan* pLogicPlan = NULL;
int32_t code = createLogicPlan(pCxt, &pLogicNode);
int32_t code = createLogicPlan(pCxt, &pLogicSubplan);
if (TSDB_CODE_SUCCESS == code) {
code = optimizeLogicPlan(pCxt, pLogicNode);
code = optimizeLogicPlan(pCxt, pLogicSubplan);
}
if (TSDB_CODE_SUCCESS == code) {
code = splitLogicPlan(pCxt, pLogicNode, &pLogicSubplan);
code = splitLogicPlan(pCxt, pLogicSubplan);
}
if (TSDB_CODE_SUCCESS == code) {
code = scaleOutLogicPlan(pCxt, pLogicSubplan, &pLogicPlan);
@ -47,7 +46,6 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
dumpQueryPlan(*pPlan);
}
nodesDestroyNode(pLogicNode);
nodesDestroyNode(pLogicSubplan);
nodesDestroyNode(pLogicPlan);
terrno = code;

View File

@ -42,7 +42,9 @@ TEST_F(PlanOtherTest, createStreamUseSTable) {
TEST_F(PlanOtherTest, createSmaIndex) {
useDb("root", "test");
run("create sma index index1 on t1 function(max(c1), min(c3 + 10), sum(c4)) interval(10s)");
run("CREATE SMA INDEX idx1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)");
run("SELECT SUM(c4) FROM t1 INTERVAL(10s)");
}
TEST_F(PlanOtherTest, explain) {

View File

@ -14,12 +14,14 @@
*/
#include "planTestUtil.h"
#include <getopt.h>
#include <algorithm>
#include <array>
#include "cmdnodes.h"
#include "mockCatalogService.h"
#include "parser.h"
#include "planInt.h"
@ -104,13 +106,12 @@ class PlannerTestBaseImpl {
SPlanContext cxt = {0};
setPlanContext(pQuery, &cxt);
SLogicNode* pLogicNode = nullptr;
doCreateLogicPlan(&cxt, &pLogicNode);
doOptimizeLogicPlan(&cxt, pLogicNode);
SLogicSubplan* pLogicSubplan = nullptr;
doSplitLogicPlan(&cxt, pLogicNode, &pLogicSubplan);
doCreateLogicPlan(&cxt, &pLogicSubplan);
doOptimizeLogicPlan(&cxt, pLogicSubplan);
doSplitLogicPlan(&cxt, pLogicSubplan);
SQueryLogicPlan* pLogicPlan = nullptr;
doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan);
@ -164,13 +165,12 @@ class PlannerTestBaseImpl {
SPlanContext cxt = {0};
setPlanContext(stmtEnv_.pQuery_, &cxt);
SLogicNode* pLogicNode = nullptr;
doCreateLogicPlan(&cxt, &pLogicNode);
doOptimizeLogicPlan(&cxt, pLogicNode);
SLogicSubplan* pLogicSubplan = nullptr;
doSplitLogicPlan(&cxt, pLogicNode, &pLogicSubplan);
doCreateLogicPlan(&cxt, &pLogicSubplan);
doOptimizeLogicPlan(&cxt, pLogicSubplan);
doSplitLogicPlan(&cxt, pLogicSubplan);
SQueryLogicPlan* pLogicPlan = nullptr;
doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan);
@ -324,19 +324,19 @@ class PlannerTestBaseImpl {
res_.ast_ = toString(pQuery->pRoot);
}
void doCreateLogicPlan(SPlanContext* pCxt, SLogicNode** pLogicNode) {
DO_WITH_THROW(createLogicPlan, pCxt, pLogicNode);
res_.rawLogicPlan_ = toString((SNode*)(*pLogicNode));
void doCreateLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) {
DO_WITH_THROW(createLogicPlan, pCxt, pLogicSubplan);
res_.rawLogicPlan_ = toString((SNode*)(*pLogicSubplan));
}
void doOptimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) {
DO_WITH_THROW(optimizeLogicPlan, pCxt, pLogicNode);
res_.optimizedLogicPlan_ = toString((SNode*)pLogicNode);
void doOptimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
DO_WITH_THROW(optimizeLogicPlan, pCxt, pLogicSubplan);
res_.optimizedLogicPlan_ = toString((SNode*)pLogicSubplan);
}
void doSplitLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode, SLogicSubplan** pLogicSubplan) {
DO_WITH_THROW(splitLogicPlan, pCxt, pLogicNode, pLogicSubplan);
res_.splitLogicPlan_ = toString((SNode*)(*pLogicSubplan));
void doSplitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan) {
DO_WITH_THROW(splitLogicPlan, pCxt, pLogicSubplan);
res_.splitLogicPlan_ = toString((SNode*)(pLogicSubplan));
}
void doScaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan) {
@ -363,6 +363,7 @@ class PlannerTestBaseImpl {
} else if (QUERY_NODE_CREATE_INDEX_STMT == nodeType(pQuery->pRoot)) {
SMCreateSmaReq req = {0};
tDeserializeSMCreateSmaReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req);
g_mockCatalogService->createSmaIndex(&req);
nodesStringToNode(req.ast, &pCxt->pAstRoot);
pCxt->streamQuery = true;
} else if (QUERY_NODE_CREATE_STREAM_STMT == nodeType(pQuery->pRoot)) {

View File

@ -14,6 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include "tjson.h"
#include "cJSON.h"
#include "taoserror.h"
@ -138,6 +139,23 @@ int32_t tjsonAddArray(SJson* pJson, const char* pName, FToJson func, const void*
return TSDB_CODE_SUCCESS;
}
int32_t tjsonAddTArray(SJson* pJson, const char* pName, FToJson func, const SArray* pArray) {
int32_t num = taosArrayGetSize(pArray);
if (num > 0) {
SJson* pJsonArray = tjsonAddArrayToObject(pJson, pName);
if (NULL == pJsonArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < num; ++i) {
int32_t code = tjsonAddItem(pJsonArray, func, taosArrayGet(pArray, i));
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
}
return TSDB_CODE_SUCCESS;
}
char* tjsonToString(const SJson* pJson) { return cJSON_Print((cJSON*)pJson); }
char* tjsonToUnformattedString(const SJson* pJson) { return cJSON_PrintUnformatted((cJSON*)pJson); }
@ -299,6 +317,25 @@ int32_t tjsonToArray(const SJson* pJson, const char* pName, FToObject func, void
return TSDB_CODE_SUCCESS;
}
int32_t tjsonToTArray(const SJson* pJson, const char* pName, FToObject func, SArray** pArray, int32_t itemSize) {
const cJSON* jArray = tjsonGetObjectItem(pJson, pName);
int32_t size = tjsonGetArraySize(jArray);
if (size > 0) {
*pArray = taosArrayInit(size, itemSize);
if (NULL == *pArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArraySetSize(*pArray, size);
for (int32_t i = 0; i < size; ++i) {
int32_t code = func(tjsonGetArrayItem(jArray, i), taosArrayGet(*pArray, i));
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
}
return TSDB_CODE_SUCCESS;
}
SJson* tjsonParse(const char* pStr) { return cJSON_Parse(pStr); }
bool tjsonValidateJson(const char* jIn) {