Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact

This commit is contained in:
Hongze Cheng 2022-06-17 06:41:12 +00:00
commit 189f99eb74
49 changed files with 691 additions and 243 deletions

View File

@ -185,6 +185,8 @@ bool fmIsUserDefinedFunc(int32_t funcId);
bool fmIsDistExecFunc(int32_t funcId);
bool fmIsForbidFillFunc(int32_t funcId);
bool fmIsForbidStreamFunc(int32_t funcId);
bool fmIsForbidWindowFunc(int32_t funcId);
bool fmIsForbidGroupByFunc(int32_t funcId);
bool fmIsIntervalInterpoFunc(int32_t funcId);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);

View File

@ -102,6 +102,7 @@ typedef enum ENodeType {
QUERY_NODE_EXPLAIN_OPTIONS,
QUERY_NODE_STREAM_OPTIONS,
QUERY_NODE_LEFT_VALUE,
QUERY_NODE_COLUMN_REF,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR,

View File

@ -65,11 +65,16 @@ typedef struct SColumnNode {
char tableName[TSDB_TABLE_NAME_LEN];
char tableAlias[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
SNode* pProjectRef;
int16_t dataBlockId;
int16_t slotId;
// SNode* pProjectRef;
int16_t dataBlockId;
int16_t slotId;
} SColumnNode;
typedef struct SColumnRefNode {
ENodeType type;
char colName[TSDB_COL_NAME_LEN];
} SColumnRefNode;
typedef struct STargetNode {
ENodeType type;
int16_t dataBlockId;
@ -247,6 +252,8 @@ typedef struct SSelectStmt {
bool hasIndefiniteRowsFunc;
bool hasSelectFunc;
bool hasSelectValFunc;
bool hasUniqueFunc;
bool hasTailFunc;
} SSelectStmt;
typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType;
@ -374,6 +381,9 @@ bool nodesIsComparisonOp(const SOperatorNode* pOp);
bool nodesIsJsonOp(const SOperatorNode* pOp);
bool nodesIsRegularOp(const SOperatorNode* pOp);
bool nodesExprHasColumn(SNode* pNode);
bool nodesExprsHasColumn(SNodeList* pList);
void* nodesGetValueFromNode(SValueNode* pNode);
int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value);
char* nodesGetStrValueFromNode(SValueNode* pNode);

View File

@ -192,6 +192,7 @@ bool syncEnvIsStart();
const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);

View File

@ -46,7 +46,6 @@ typedef struct SRpcHandleInfo {
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not
SRpcConnInfo connInfo;
// app info
void *ahandle; // app handle set by client
void *wrapper; // wrapper handle
@ -55,6 +54,9 @@ typedef struct SRpcHandleInfo {
// resp info
void * rsp;
int32_t rspLen;
// conn info
SRpcConnInfo conn;
} SRpcHandleInfo;
typedef struct SRpcMsg {
@ -63,7 +65,6 @@ typedef struct SRpcMsg {
int32_t contLen;
int32_t code;
SRpcHandleInfo info;
SRpcConnInfo conn;
} SRpcMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);

View File

@ -659,6 +659,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG TAOS_DEF_ERROR_CODE(0, 0x2656)
#define TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2657)
#define TSDB_CODE_PAR_INVALID_WINDOW_PC TAOS_DEF_ERROR_CODE(0, 0x2658)
#define TSDB_CODE_PAR_WINDOW_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2659)
#define TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265A)
#define TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265B)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)

View File

@ -21,21 +21,6 @@ static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet);
static void dmSendRsp(SRpcMsg *pMsg);
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo *pConnInfo = &(pRpc->info.connInfo);
// if (IsReq(pRpc)) {
// terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
// dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle);
// return -1;
//}
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
memcpy(pMsg->conn.user, pConnInfo->user, TSDB_USER_LEN);
pMsg->conn.clientIp = pConnInfo->clientIp;
pMsg->conn.clientPort = pConnInfo->clientPort;
return 0;
}
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
if (msgFp == NULL) {
@ -116,14 +101,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
}
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) {
goto _OVER;
}
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
if (pMsg == NULL) goto _OVER;
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
if (dmBuildNodeMsg(pMsg, pRpc) != 0) {
goto _OVER;
}
dTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle);
if (InParentProc(pWrapper)) {
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);

View File

@ -56,7 +56,7 @@ static int32_t mndProcessAuthReq(SRpcMsg *pReq) {
memcpy(authRsp.user, authReq.user, TSDB_USER_LEN);
int32_t code = mndRetriveAuth(pReq->info.node, &authRsp);
mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->conn.user, authRsp.spi, authRsp.encrypt,
mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->info.conn.user, authRsp.spi, authRsp.encrypt,
authRsp.user);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authRsp);

View File

@ -292,7 +292,7 @@ static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_BNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_BNODE) != 0) {
goto _OVER;
}
@ -394,7 +394,7 @@ static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_BNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_BNODE) != 0) {
goto _OVER;
}

View File

@ -521,12 +521,12 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
goto _OVER;
}
pUser = mndAcquireUser(pMnode, pReq->conn.user);
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pUser == NULL) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DB, NULL) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DB, NULL) != 0) {
goto _OVER;
}
@ -700,7 +700,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_ALTER_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_ALTER_DB, pDb) != 0) {
goto _OVER;
}
@ -980,7 +980,7 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
}
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_DROP_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_DB, pDb) != 0) {
goto _OVER;
}
@ -1127,7 +1127,7 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) {
mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr());
} else {
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_USE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_USE_DB, pDb) != 0) {
goto _OVER;
}
@ -1252,7 +1252,7 @@ static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_COMPACT_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB, pDb) != 0) {
goto _OVER;
}

View File

@ -522,7 +522,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE) != 0) {
goto _OVER;
}
@ -623,7 +623,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
}
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
goto _OVER;
}

View File

@ -318,7 +318,7 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_FUNC) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_FUNC) != 0) {
goto _OVER;
}
@ -365,7 +365,7 @@ static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) {
}
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_FUNC) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_FUNC) != 0) {
goto _OVER;
}

View File

@ -414,7 +414,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_MNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) {
goto _OVER;
}
@ -621,7 +621,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
goto _OVER;
}

View File

@ -133,7 +133,7 @@ static void mndFreeConn(SConnObj *pConn) {
taosWLockLatch(&pConn->queryLock);
taosArrayDestroyEx(pConn->pQueries, tFreeClientHbQueryDesc);
taosWUnLockLatch(&pConn->queryLock);
mTrace("conn:%u, is destroyed, data:%p", pConn->id, pConn);
}
@ -194,15 +194,15 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
goto CONN_OVER;
}
taosIp2String(pReq->conn.clientIp, ip);
taosIp2String(pReq->info.conn.clientIp, ip);
pUser = mndAcquireUser(pMnode, pReq->conn.user);
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pUser == NULL) {
mError("user:%s, failed to login while acquire user since %s", pReq->conn.user, terrstr());
mError("user:%s, failed to login while acquire user since %s", pReq->info.conn.user, terrstr());
goto CONN_OVER;
}
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
mError("user:%s, failed to auth while acquire user, input:%s", pReq->conn.user, connReq.passwd);
mError("user:%s, failed to auth while acquire user, input:%s", pReq->info.conn.user, connReq.passwd);
code = TSDB_CODE_RPC_AUTH_FAILURE;
goto CONN_OVER;
}
@ -213,15 +213,16 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
pDb = mndAcquireDb(pMnode, db);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_INVALID_DB;
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->conn.user, ip, connReq.db, terrstr());
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
terrstr());
goto CONN_OVER;
}
}
pConn = mndCreateConn(pMnode, pReq->conn.user, connReq.connType, pReq->conn.clientIp, pReq->conn.clientPort,
connReq.pid, connReq.app, connReq.startTime);
pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pReq->conn.user, ip, terrstr());
mError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr());
goto CONN_OVER;
}
@ -246,7 +247,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
pReq->info.rspLen = contLen;
pReq->info.rsp = pRsp;
mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->conn.user, ip, pConn->port, pConn->id, connReq.app);
mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);
code = 0;
@ -348,7 +349,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
if (pHbReq->query) {
SQueryHbReqBasic *pBasic = pHbReq->query;
SRpcConnInfo connInfo = pMsg->conn;
SRpcConnInfo connInfo = pMsg->info.conn;
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
if (pConn == NULL) {
@ -361,7 +362,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
mDebug("user:%s, conn:%u is freed and create a new conn:%u", connInfo.user, pBasic->connId, pConn->id);
}
}
SQueryHbRspBasic *rspBasic = taosMemoryCalloc(1, sizeof(SQueryHbRspBasic));
if (rspBasic == NULL) {
mndReleaseConn(pMnode, pConn);
@ -386,7 +387,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
mndGetMnodeEpSet(pMnode, &rspBasic->epSet);
mndCreateQnodeList(pMnode, &rspBasic->pQnodeList, -1);
mndReleaseConn(pMnode, pConn);
hbRsp.query = rspBasic;
@ -500,7 +501,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pUser == NULL) return 0;
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
@ -523,7 +524,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1;
} else {
mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->conn.user);
mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->info.conn.user);
pConn->killId = killReq.queryId;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return 0;
@ -534,7 +535,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pUser == NULL) return 0;
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
@ -555,7 +556,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1;
} else {
mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->conn.user);
mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->info.conn.user);
pConn->killed = 1;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return TSDB_CODE_SUCCESS;
@ -563,12 +564,12 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
}
static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
SConnObj *pConn = NULL;
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
SConnObj *pConn = NULL;
if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
@ -619,12 +620,12 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
}
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
SConnObj *pConn = NULL;
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
int32_t cols = 0;
SConnObj *pConn = NULL;
if (pShow->pIter == NULL) {
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
pShow->pIter = taosCacheCreateIter(pMgmt->cache);
@ -645,7 +646,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
for (int32_t i = 0; i < numOfQueries; ++i) {
SQueryDesc* pQuery = taosArrayGet(pConn->pQueries, i);
SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i);
cols = 0;
char queryId[26 + VARSTR_HEADER_SIZE] = {0};
@ -691,14 +692,14 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pQuery->subPlanNum, false);
char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t strSize = sizeof(subStatus);
int32_t offset = VARSTR_HEADER_SIZE;
for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) {
if (i) {
offset += snprintf(subStatus + offset, strSize - offset - 1, ",");
}
SQuerySubDesc* pDesc = taosArrayGet(pQuery->subDesc, i);
SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
}
varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
@ -712,7 +713,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
numOfRows++;
}
taosRUnLockLatch(&pConn->queryLock);
}

View File

@ -294,7 +294,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_QNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_QNODE) != 0) {
goto _OVER;
}
@ -396,7 +396,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_QNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_QNODE) != 0) {
goto _OVER;
}

View File

@ -229,7 +229,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
// if (mndCheckShowAuth(pMnode, pReq->conn.user, pShow->type) != 0) return -1;
// if (mndCheckShowAuth(pMnode, pReq->info.conn.user, pShow->type) != 0) return -1;
int32_t numOfCols = pShow->pMeta->numOfColumns;
SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));

View File

@ -710,7 +710,7 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}
@ -971,7 +971,7 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}

View File

@ -300,7 +300,7 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_SNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_SNODE) != 0) {
goto _OVER;
}
@ -404,7 +404,7 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_SNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_SNODE) != 0) {
goto _OVER;
}

View File

@ -805,7 +805,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}
@ -1454,7 +1454,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}
@ -1584,7 +1584,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}

View File

@ -545,7 +545,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
return -1;
}
if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
mError("trans:%d, failed to create stb for stream since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
@ -602,7 +602,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}
#endif
@ -627,7 +627,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
// create stb for stream
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
@ -696,7 +696,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
#if 0
// todo check auth
pUser = mndAcquireUser(pMnode, pReq->conn.user);
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pUser == NULL) {
goto DROP_STREAM_OVER;
}

View File

@ -70,7 +70,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) {
SSnapshotMeta sMeta = {0};
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, cbMeta.index, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
}
sdbWriteFile(pMnode->pSdb);
@ -90,7 +91,10 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
SSnapshotMeta sMeta = {0};
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb);
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
}

View File

@ -474,7 +474,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}

View File

@ -22,8 +22,8 @@
#include "mndSync.h"
#include "mndUser.h"
#define TRANS_VER_NUMBER 1
#define TRANS_ARRAY_SIZE 8
#define TRANS_VER_NUMBER 1
#define TRANS_ARRAY_SIZE 8
#define TRANS_RESERVE_SIZE 64
static SSdbRaw *mndTransActionEncode(STrans *pTrans);
@ -1369,7 +1369,7 @@ static int32_t mndProcessKillTransReq(SRpcMsg *pReq) {
mInfo("trans:%d, start to kill", killReq.transId);
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_KILL_TRANS) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_KILL_TRANS) != 0) {
goto _OVER;
}
@ -1417,7 +1417,9 @@ void mndTransPullup(SMnode *pMnode) {
}
SSnapshotMeta sMeta = {0};
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb);
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
}
sdbWriteFile(pMnode->pSdb);

View File

@ -354,13 +354,13 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
goto _OVER;
}
pOperUser = mndAcquireUser(pMnode, pReq->conn.user);
pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pOperUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_USER) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_USER) != 0) {
goto _OVER;
}
@ -460,7 +460,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
goto _OVER;
}
pOperUser = mndAcquireUser(pMnode, pReq->conn.user);
pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pOperUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto _OVER;
@ -643,7 +643,7 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_USER) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_USER) != 0) {
goto _OVER;
}

View File

@ -1200,7 +1200,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
mInfo("vgId:%d, start to redistribute to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER;
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER;
pVgroup = mndAcquireVgroup(pMnode, req.vgId);
if (pVgroup == NULL) goto _OVER;
@ -1500,7 +1500,7 @@ static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb == NULL) goto _OVER;
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER;
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER;
code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
@ -1624,7 +1624,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
mDebug("start to balance vgroup");
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER;
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER;
while (1) {
SDnodeObj *pDnode = NULL;

View File

@ -184,7 +184,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (rsp.code == 0) {
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->conn.applyIndex, &rsp) < 0) {
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
rsp.code = terrno;
vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
}
@ -329,8 +329,9 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
SVnode *pVnode = pFsm->data;
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle);
@ -359,10 +360,11 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {

View File

@ -2171,19 +2171,12 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
blockDataEnsureCapacity(p, capacity);
while (1) {
STupleHandle* pTupleHandle = NULL;
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
}
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) {
break;
}
appendOneRowToDataBlock(p, pTupleHandle);
if (p->info.rows >= capacity) {
break;
}
@ -2260,7 +2253,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
@ -2283,17 +2275,16 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
pInfo->dataReaders = dataReaders;
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColList;
pInfo->curTWinIdx = 0;
pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
pInfo->dataReaders = dataReaders;
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColList;
pInfo->curTWinIdx = 0;
pInfo->pResBlock = createResDataBlock(pDescNode);
@ -2306,22 +2297,27 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
taosArrayPush(pInfo->sortSourceParams, param);
taosMemoryFree(param);
}
pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
int32_t rowSize = pInfo->pResBlock->info.rowSize;
int32_t blockMetaSize = (int32_t)blockDataGetSerialMetaSize(pInfo->pResBlock->info.numOfCols);
pInfo->bufPageSize = (rowSize * 2 + blockMetaSize) < 1024 ? 1024 : (rowSize * 2 + blockMetaSize);
pInfo->sortBufSize = pInfo->bufPageSize * 16;
pInfo->hasGroupId = false;
pInfo->bufPageSize = getProperSortPageSize(rowSize);
// todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result
pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1);
pInfo->hasGroupId = false;
pInfo->prefetchedTuple = NULL;
pOperator->name = "TableMergeScanOperator";
pOperator->name = "TableMergeScanOperator";
// TODO : change it
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 1024);
pOperator->fpSet =

View File

@ -44,6 +44,8 @@ extern "C" {
#define FUNC_MGT_FORBID_FILL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(15)
#define FUNC_MGT_INTERVAL_INTERPO_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(16)
#define FUNC_MGT_FORBID_STREAM_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(17)
#define FUNC_MGT_FORBID_WINDOW_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(18)
#define FUNC_MGT_FORBID_GROUP_BY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(19)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)

View File

@ -1017,7 +1017,7 @@ static int32_t translateDerivative(SFunctionNode* pFunc, char* pErrBuf, int32_t
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
SNode* pParamNode2 = nodesListGetNode(pFunc->pParameterList, 2);
SNode* pParamNode2 = nodesListGetNode(pFunc->pParameterList, 2);
SValueNode* pValue2 = (SValueNode*)pParamNode2;
pValue2->notReserved = true;
@ -1086,8 +1086,8 @@ static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
}
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of UNIQUE can only be columns");
if (!nodesExprHasColumn(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of UNIQUE must contain columns");
}
pFunc->node.resType = ((SExprNode*)pPara)->resType;
@ -2114,7 +2114,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "tail",
.type = FUNCTION_TYPE_TAIL,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_WINDOW_FUNC | FUNC_MGT_FORBID_GROUP_BY_FUNC,
.translateFunc = translateTail,
.getEnvFunc = getTailFuncEnv,
.initFunc = tailFunctionSetup,
@ -2124,7 +2124,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "unique",
.type = FUNCTION_TYPE_UNIQUE,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_WINDOW_FUNC | FUNC_MGT_FORBID_GROUP_BY_FUNC,
.translateFunc = translateUnique,
.getEnvFunc = getUniqueFuncEnv,
.initFunc = uniqueFunctionSetup,

View File

@ -165,6 +165,10 @@ bool fmIsIntervalInterpoFunc(int32_t funcId) { return isSpecificClassifyFunc(fun
bool fmIsForbidStreamFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_STREAM_FUNC); }
bool fmIsForbidWindowFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_WINDOW_FUNC); }
bool fmIsForbidGroupByFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_GROUP_BY_FUNC); }
void fmFuncMgtDestroy() {
void* m = gFunMgtService.pFuncNameHashTable;
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {
@ -206,7 +210,7 @@ bool fmIsInvertible(int32_t funcId) {
return res;
}
//function has same input/output type
// function has same input/output type
bool fmIsSameInOutType(int32_t funcId) {
bool res = false;
switch (funcMgtBuiltins[funcId].type) {
@ -301,7 +305,7 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
nodesDestroyList(pParameterList);
return TSDB_CODE_OUT_OF_MEMORY;
}
//overwrite function restype set by translate function
// overwrite function restype set by translate function
if (fmIsSameInOutType(pSrcFunc->funcId)) {
(*pMergeFunc)->node.resType = pSrcFunc->node.resType;
}

View File

@ -88,6 +88,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SStreamOptions));
case QUERY_NODE_LEFT_VALUE:
return makeNode(type, sizeof(SLeftValueNode));
case QUERY_NODE_COLUMN_REF:
return makeNode(type, sizeof(SColumnDefNode));
case QUERY_NODE_SET_OPERATOR:
return makeNode(type, sizeof(SSetOperator));
case QUERY_NODE_SELECT_STMT:
@ -1463,6 +1465,26 @@ int32_t nodesCollectSpecialNodes(SSelectStmt* pSelect, ESqlClause clause, ENodeT
return TSDB_CODE_SUCCESS;
}
static EDealRes hasColumn(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
*(bool*)pContext = true;
return DEAL_RES_END;
}
return DEAL_RES_CONTINUE;
}
bool nodesExprHasColumn(SNode* pNode) {
bool hasCol = false;
nodesWalkExprPostOrder(pNode, hasColumn, &hasCol);
return hasCol;
}
bool nodesExprsHasColumn(SNodeList* pList) {
bool hasCol = false;
nodesWalkExprsPostOrder(pList, hasColumn, &hasCol);
return hasCol;
}
char* nodesGetFillModeString(EFillMode mode) {
switch (mode) {
case FILL_MODE_NONE:

View File

@ -648,6 +648,8 @@ SNode* addOrderByClause(SAstCreateContext* pCxt, SNode* pStmt, SNodeList* pOrder
CHECK_PARSER_STATUS(pCxt);
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
((SSelectStmt*)pStmt)->pOrderByList = pOrderByList;
} else {
((SSetOperator*)pStmt)->pOrderByList = pOrderByList;
}
return pStmt;
}

View File

@ -36,6 +36,7 @@ typedef struct STranslateContext {
int32_t currLevel;
ESqlClause currClause;
SSelectStmt* pCurrSelectStmt;
SSetOperator* pCurrSetOperator;
SCmdMsgInfo* pCmdMsg;
SHashObj* pDbs;
SHashObj* pTables;
@ -432,7 +433,7 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p
static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SColumnNode** pColRef) {
SColumnNode* pCol = *pColRef;
pCol->pProjectRef = (SNode*)pExpr;
// pCol->pProjectRef = (SNode*)pExpr;
if (NULL == pExpr->pAssociation) {
pExpr->pAssociation = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
}
@ -613,17 +614,36 @@ static EDealRes translateColumnWithoutPrefix(STranslateContext* pCxt, SColumnNod
return DEAL_RES_CONTINUE;
}
static bool translateColumnUseAlias(STranslateContext* pCxt, SColumnNode** pCol) {
SNodeList* pProjectionList = pCxt->pCurrSelectStmt->pProjectionList;
static SNodeList* getProjectListFromCxt(STranslateContext* pCxt) {
if (NULL != pCxt->pCurrSelectStmt) {
return pCxt->pCurrSelectStmt->pProjectionList;
} else if (NULL != pCxt->pCurrSetOperator) {
return pCxt->pCurrSetOperator->pProjectionList;
} else {
return NULL;
}
}
static EDealRes translateColumnUseAlias(STranslateContext* pCxt, SColumnNode** pCol, bool* pFound) {
SNodeList* pProjectionList = getProjectListFromCxt(pCxt);
SNode* pNode;
FOREACH(pNode, pProjectionList) {
SExprNode* pExpr = (SExprNode*)pNode;
if (0 == strcmp((*pCol)->colName, pExpr->aliasName)) {
setColumnInfoByExpr(NULL, pExpr, pCol);
return true;
SColumnRefNode* pColRef = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
if (NULL == pColRef) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
strcpy(pColRef->colName, pExpr->aliasName);
nodesDestroyNode(*(SNode**)pCol);
*(SNode**)pCol = (SNode*)pColRef;
*pFound = true;
return DEAL_RES_CONTINUE;
}
}
return false;
*pFound = false;
return DEAL_RES_CONTINUE;
}
static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode** pCol) {
@ -638,9 +658,11 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode** pCol) {
} else {
bool found = false;
if (SQL_CLAUSE_ORDER_BY == pCxt->currClause) {
found = translateColumnUseAlias(pCxt, pCol);
res = translateColumnUseAlias(pCxt, pCol, &found);
}
if (DEAL_RES_ERROR != res && !found) {
res = translateColumnWithoutPrefix(pCxt, pCol);
}
res = (found ? DEAL_RES_CONTINUE : translateColumnWithoutPrefix(pCxt, pCol));
}
return res;
}
@ -1098,11 +1120,43 @@ static int32_t translateWindowPseudoColumnFunc(STranslateContext* pCxt, SFunctio
return TSDB_CODE_SUCCESS;
}
static int32_t translateForbidWindowFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsForbidWindowFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (NULL != pCxt->pCurrSelectStmt->pWindow) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WINDOW_NOT_ALLOWED_FUNC, pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateForbidStreamFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsForbidStreamFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (pCxt->createStream) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC, pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateForbidGroupByFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsForbidGroupByFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (NULL != pCxt->pCurrSelectStmt->pGroupByList) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC, pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
static void setFuncClassification(SSelectStmt* pSelect, SFunctionNode* pFunc) {
if (NULL != pSelect) {
pSelect->hasAggFuncs = pSelect->hasAggFuncs ? true : fmIsAggFunc(pFunc->funcId);
pSelect->hasRepeatScanFuncs = pSelect->hasRepeatScanFuncs ? true : fmIsRepeatScanFunc(pFunc->funcId);
pSelect->hasIndefiniteRowsFunc = pSelect->hasIndefiniteRowsFunc ? true : fmIsIndefiniteRowsFunc(pFunc->funcId);
pSelect->hasUniqueFunc = pSelect->hasUniqueFunc ? true : (FUNCTION_TYPE_UNIQUE == pFunc->funcType);
pSelect->hasTailFunc = pSelect->hasTailFunc ? true : (FUNCTION_TYPE_TAIL == pFunc->funcType);
}
}
@ -1130,6 +1184,15 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = translateWindowPseudoColumnFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = translateForbidWindowFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = translateForbidStreamFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = translateForbidGroupByFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
setFuncClassification(pCxt->pCurrSelectStmt, pFunc);
}
@ -1762,11 +1825,11 @@ static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pPro
} else if (0 == pos || pos > LIST_LENGTH(pProjectionList)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_NUMBER_OF_SELECT);
} else {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
if (NULL == pCol) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY);
}
setColumnInfoByExpr(NULL, (SExprNode*)nodesListGetNode(pProjectionList, pos - 1), &pCol);
strcpy(pCol->colName, ((SExprNode*)nodesListGetNode(pProjectionList, pos - 1))->aliasName);
((SOrderByExprNode*)pNode)->pExpr = (SNode*)pCol;
nodesDestroyNode(pExpr);
}
@ -1781,16 +1844,15 @@ static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pPro
static int32_t translateOrderBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
bool other;
int32_t code = translateOrderByPosition(pCxt, pSelect->pProjectionList, pSelect->pOrderByList, &other);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (!other) {
return TSDB_CODE_SUCCESS;
}
pCxt->currClause = SQL_CLAUSE_ORDER_BY;
code = translateExprList(pCxt, pSelect->pOrderByList);
if (TSDB_CODE_SUCCESS == code) {
code = checkExprListForGroupBy(pCxt, pSelect->pOrderByList);
if (!other) {
return TSDB_CODE_SUCCESS;
}
pCxt->currClause = SQL_CLAUSE_ORDER_BY;
code = translateExprList(pCxt, pSelect->pOrderByList);
if (TSDB_CODE_SUCCESS == code) {
code = checkExprListForGroupBy(pCxt, pSelect->pOrderByList);
}
}
return code;
}
@ -2116,6 +2178,172 @@ static int32_t rewriteTimelineFunc(STranslateContext* pCxt, SSelectStmt* pSelect
return pCxt->errCode;
}
typedef struct SRwriteUniqueCxt {
STranslateContext* pTranslateCxt;
SNode* pExpr;
} SRwriteUniqueCxt;
static EDealRes rewriteSeletcValueFunc(STranslateContext* pCxt, SNode** pNode) {
SFunctionNode* pFirst = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFirst) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
strcpy(pFirst->functionName, "first");
TSWAP(pFirst->pParameterList, ((SFunctionNode*)*pNode)->pParameterList);
strcpy(pFirst->node.aliasName, ((SExprNode*)*pNode)->aliasName);
nodesDestroyNode(*pNode);
*pNode = (SNode*)pFirst;
pCxt->errCode = fmGetFuncInfo(pFirst, pCxt->msgBuf.buf, pCxt->msgBuf.len);
pCxt->pCurrSelectStmt->hasAggFuncs = true;
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR;
}
static EDealRes rewriteUniqueFunc(SNode** pNode, void* pContext) {
SRwriteUniqueCxt* pCxt = pContext;
if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
SFunctionNode* pFunc = (SFunctionNode*)*pNode;
if (FUNCTION_TYPE_UNIQUE == pFunc->funcType) {
SNode* pExpr = nodesListGetNode(pFunc->pParameterList, 0);
NODES_CLEAR_LIST(pFunc->pParameterList);
strcpy(((SExprNode*)pExpr)->aliasName, ((SExprNode*)*pNode)->aliasName);
nodesDestroyNode(*pNode);
*pNode = pExpr;
pCxt->pExpr = pExpr;
return DEAL_RES_IGNORE_CHILD;
} else if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType) {
return rewriteSeletcValueFunc(pCxt->pTranslateCxt, pNode);
}
}
return DEAL_RES_CONTINUE;
}
static SNode* createGroupingSet(SNode* pExpr) {
SGroupingSetNode* pGroupingSet = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET);
if (NULL == pGroupingSet) {
return NULL;
}
pGroupingSet->groupingSetType = GP_TYPE_NORMAL;
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pGroupingSet->pParameterList, nodesCloneNode(pExpr))) {
nodesDestroyNode((SNode*)pGroupingSet);
return NULL;
}
return (SNode*)pGroupingSet;
}
static int32_t rewriteUniqueStmt(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!pSelect->hasUniqueFunc) {
return TSDB_CODE_SUCCESS;
}
SRwriteUniqueCxt cxt = {.pTranslateCxt = pCxt, .pExpr = NULL};
nodesRewriteExprs(pSelect->pProjectionList, rewriteUniqueFunc, &cxt);
if (TSDB_CODE_SUCCESS == cxt.pTranslateCxt->errCode) {
cxt.pTranslateCxt->errCode = nodesListMakeStrictAppend(&pSelect->pGroupByList, createGroupingSet(cxt.pExpr));
}
pSelect->hasIndefiniteRowsFunc = false;
return cxt.pTranslateCxt->errCode;
}
typedef struct SRwriteTailCxt {
STranslateContext* pTranslateCxt;
int64_t limit;
int64_t offset;
} SRwriteTailCxt;
static EDealRes rewriteTailFunc(SNode** pNode, void* pContext) {
SRwriteTailCxt* pCxt = pContext;
if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
SFunctionNode* pFunc = (SFunctionNode*)*pNode;
if (FUNCTION_TYPE_TAIL == pFunc->funcType) {
pCxt->limit = ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i;
if (3 == LIST_LENGTH(pFunc->pParameterList)) {
pCxt->offset = ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 2))->datum.i;
}
SNode* pExpr = nodesListGetNode(pFunc->pParameterList, 0);
strcpy(((SExprNode*)pExpr)->aliasName, ((SExprNode*)*pNode)->aliasName);
NODES_CLEAR_LIST(pFunc->pParameterList);
nodesDestroyNode(*pNode);
*pNode = pExpr;
return DEAL_RES_IGNORE_CHILD;
}
}
return DEAL_RES_CONTINUE;
}
static int32_t createLimieNode(SRwriteTailCxt* pCxt, SLimitNode** pOutput) {
*pOutput = (SLimitNode*)nodesMakeNode(QUERY_NODE_LIMIT);
if (NULL == *pOutput) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pOutput)->limit = pCxt->limit;
(*pOutput)->offset = pCxt->offset;
return TSDB_CODE_SUCCESS;
}
static SNode* createOrderByExpr(STranslateContext* pCxt) {
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (NULL == pOrder) {
return NULL;
}
pCxt->errCode = createPrimaryKeyCol(pCxt, &pOrder->pExpr);
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
nodesDestroyNode((SNode*)pOrder);
return NULL;
}
pOrder->order = ORDER_DESC;
pOrder->nullOrder = NULL_ORDER_FIRST;
return (SNode*)pOrder;
}
static int32_t rewriteTailStmt(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!pSelect->hasTailFunc) {
return TSDB_CODE_SUCCESS;
}
SRwriteTailCxt cxt = {.pTranslateCxt = pCxt, .limit = -1, .offset = -1};
nodesRewriteExprs(pSelect->pProjectionList, rewriteTailFunc, &cxt);
int32_t code = nodesListMakeStrictAppend(&pSelect->pOrderByList, createOrderByExpr(pCxt));
if (TSDB_CODE_SUCCESS == code) {
code = createLimieNode(&cxt, &pSelect->pLimit);
}
pSelect->hasIndefiniteRowsFunc = false;
return code;
}
typedef struct SReplaceOrderByAliasCxt {
STranslateContext* pTranslateCxt;
SNodeList* pProjectionList;
} SReplaceOrderByAliasCxt;
static EDealRes replaceOrderByAliasImpl(SNode** pNode, void* pContext) {
SReplaceOrderByAliasCxt* pCxt = pContext;
if (QUERY_NODE_COLUMN_REF == nodeType(*pNode)) {
SNodeList* pProjectionList = pCxt->pProjectionList;
SNode* pProject = NULL;
FOREACH(pProject, pProjectionList) {
SExprNode* pExpr = (SExprNode*)pProject;
if (0 == strcmp(((SColumnRefNode*)*pNode)->colName, pExpr->aliasName)) {
SNode* pNew = nodesCloneNode(pProject);
if (NULL == pNew) {
pCxt->pTranslateCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
nodesDestroyNode(*pNode);
*pNode = pNew;
return DEAL_RES_CONTINUE;
}
}
}
return DEAL_RES_CONTINUE;
}
static int32_t replaceOrderByAlias(STranslateContext* pCxt, SNodeList* pProjectionList, SNodeList* pOrderByList) {
SReplaceOrderByAliasCxt cxt = {.pTranslateCxt = pCxt, .pProjectionList = pProjectionList};
nodesRewriteExprsPostOrder(pOrderByList, replaceOrderByAliasImpl, &cxt);
return pCxt->errCode;
}
static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
pCxt->pCurrSelectStmt = pSelect;
int32_t code = translateFrom(pCxt, pSelect->pFromTable);
@ -2147,9 +2375,18 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (TSDB_CODE_SUCCESS == code) {
code = checkLimit(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteUniqueStmt(pCxt, pSelect);
}
// if (TSDB_CODE_SUCCESS == code) {
// code = rewriteTailStmt(pCxt, pSelect);
// }
if (TSDB_CODE_SUCCESS == code) {
code = rewriteTimelineFunc(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) {
code = replaceOrderByAlias(pCxt, pSelect->pProjectionList, pSelect->pOrderByList);
}
return code;
}
@ -2186,7 +2423,7 @@ static int32_t createCastFunc(STranslateContext* pCxt, SNode* pExpr, SDataType d
return TSDB_CODE_SUCCESS;
}
static int32_t translateSetOperatorImpl(STranslateContext* pCxt, SSetOperator* pSetOperator) {
static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pSetOperator) {
SNodeList* pLeftProjections = getProjectList(pSetOperator->pLeft);
SNodeList* pRightProjections = getProjectList(pSetOperator->pRight);
if (LIST_LENGTH(pLeftProjections) != LIST_LENGTH(pRightProjections)) {
@ -2221,6 +2458,23 @@ static uint8_t calcSetOperatorPrecision(SSetOperator* pSetOperator) {
return calcPrecision(getStmtPrecision(pSetOperator->pLeft), getStmtPrecision(pSetOperator->pRight));
}
static int32_t translateSetOperOrderBy(STranslateContext* pCxt, SSetOperator* pSetOperator) {
bool other;
int32_t code = translateOrderByPosition(pCxt, pSetOperator->pProjectionList, pSetOperator->pOrderByList, &other);
if (TSDB_CODE_SUCCESS == code) {
if (other) {
pCxt->currClause = SQL_CLAUSE_ORDER_BY;
pCxt->pCurrSelectStmt = NULL;
pCxt->pCurrSetOperator = pSetOperator;
code = translateExprList(pCxt, pSetOperator->pOrderByList);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = replaceOrderByAlias(pCxt, pSetOperator->pProjectionList, pSetOperator->pOrderByList);
}
return code;
}
static int32_t translateSetOperator(STranslateContext* pCxt, SSetOperator* pSetOperator) {
int32_t code = translateQuery(pCxt, pSetOperator->pLeft);
if (TSDB_CODE_SUCCESS == code) {
@ -2231,7 +2485,10 @@ static int32_t translateSetOperator(STranslateContext* pCxt, SSetOperator* pSetO
}
if (TSDB_CODE_SUCCESS == code) {
pSetOperator->precision = calcSetOperatorPrecision(pSetOperator);
code = translateSetOperatorImpl(pCxt, pSetOperator);
code = translateSetOperProject(pCxt, pSetOperator);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateSetOperOrderBy(pCxt, pSetOperator);
}
return code;
}

View File

@ -185,9 +185,15 @@ static char* getSyntaxErrFormat(int32_t errCode) {
case TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG:
return "The REDISTRIBUTE VGROUP statement only support 1 to 3 dnodes";
case TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC:
return "%s function not allowed in fill query";
return "%s function does not supportted in fill query";
case TSDB_CODE_PAR_INVALID_WINDOW_PC:
return "_WSTARTTS, _WENDTS and _WDURATION can only be used in window queries";
return "_WSTARTTS, _WENDTS and _WDURATION can only be used in window query";
case TSDB_CODE_PAR_WINDOW_NOT_ALLOWED_FUNC:
return "%s function does not supportted in time window query";
case TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC:
return "%s function does not supportted in stream query";
case TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC:
return "%s function does not supportted in group query";
case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory";
default:

View File

@ -528,6 +528,12 @@ TEST_F(ParserInitialCTest, createStream) {
clearCreateStreamReq();
}
TEST_F(ParserInitialCTest, createStreamSemanticCheck) {
useDb("root", "test");
run("CREATE STREAM s1 AS SELECT PERCENTILE(c1, 30) FROM t1", TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC);
}
TEST_F(ParserInitialCTest, createTable) {
useDb("root", "test");

View File

@ -161,6 +161,40 @@ TEST_F(ParserSelectTest, useDefinedFunc) {
run("SELECT udf2(c1) FROM t1 GROUP BY c2");
}
TEST_F(ParserSelectTest, uniqueFunc) {
useDb("root", "test");
run("SELECT UNIQUE(c1) FROM t1");
run("SELECT UNIQUE(c2 + 10) FROM t1 WHERE c1 > 10");
run("SELECT UNIQUE(c2 + 10), ts, c2 FROM t1 WHERE c1 > 10");
}
TEST_F(ParserSelectTest, uniqueFuncSemanticCheck) {
useDb("root", "test");
run("SELECT UNIQUE(c1) FROM t1 INTERVAL(10S)", TSDB_CODE_PAR_WINDOW_NOT_ALLOWED_FUNC);
run("SELECT UNIQUE(c1) FROM t1 GROUP BY c2", TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC);
}
TEST_F(ParserSelectTest, tailFunc) {
useDb("root", "test");
run("SELECT TAIL(c1, 10) FROM t1");
run("SELECT TAIL(c2 + 10, 10, 80) FROM t1 WHERE c1 > 10");
}
TEST_F(ParserSelectTest, tailFuncSemanticCheck) {
useDb("root", "test");
run("SELECT TAIL(c1, 10) FROM t1 INTERVAL(10S)", TSDB_CODE_PAR_WINDOW_NOT_ALLOWED_FUNC);
run("SELECT TAIL(c1, 10) FROM t1 GROUP BY c2", TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC);
}
TEST_F(ParserSelectTest, groupBy) {
useDb("root", "test");
@ -328,6 +362,8 @@ TEST_F(ParserSelectTest, setOperator) {
run("(SELECT * FROM t1) UNION ALL (SELECT * FROM t1)");
run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)");
run("SELECT c1, c2 FROM t1 UNION ALL SELECT c1 as a, c2 as b FROM t1 ORDER BY c1");
}
TEST_F(ParserSelectTest, informationSchema) {

View File

@ -21,7 +21,7 @@
static void dumpQueryPlan(SQueryPlan* pPlan) {
char* pStr = NULL;
nodesNodeToString((SNode*)pPlan, false, &pStr, NULL);
planDebugL("Query Plan: %s", pStr);
planDebugL("QID:0x%" PRIx64 " Query Plan: %s", pPlan->queryId, pStr);
taosMemoryFree(pStr);
}

View File

@ -53,3 +53,23 @@ TEST_F(PlanBasicTest, func) {
run("SELECT TOP(c1, 60) FROM t1");
}
TEST_F(PlanBasicTest, uniqueFunc) {
useDb("root", "test");
run("SELECT UNIQUE(c1) FROM t1");
run("SELECT UNIQUE(c2 + 10) FROM t1 WHERE c1 > 10");
run("SELECT UNIQUE(c2 + 10), ts, c2 FROM t1 WHERE c1 > 10");
run("SELECT UNIQUE(c1) a FROM t1 ORDER BY a");
}
TEST_F(PlanBasicTest, tailFunc) {
useDb("root", "test");
run("SELECT TAIL(c1, 10) FROM t1");
run("SELECT TAIL(c2 + 10, 10, 80) FROM t1 WHERE c1 > 10");
}

View File

@ -27,6 +27,8 @@ TEST_F(PlanOrderByTest, basic) {
run("SELECT c1 FROM t1 ORDER BY c1");
// ORDER BY key is not in the projection list
run("SELECT c1 FROM t1 ORDER BY c2");
run("SELECT c1 + 10 AS a FROM t1 ORDER BY a");
}
TEST_F(PlanOrderByTest, expr) {
@ -41,6 +43,12 @@ TEST_F(PlanOrderByTest, nullsOrder) {
run("SELECT * FROM t1 ORDER BY c1 DESC NULLS FIRST");
}
TEST_F(PlanOrderByTest, withGroupBy) {
useDb("root", "test");
run("SELECT SUM(c1) AS a FROM t1 GROUP BY c2 ORDER BY a");
}
TEST_F(PlanOrderByTest, stable) {
useDb("root", "test");

View File

@ -23,9 +23,9 @@ class PlanSetOpTest : public PlannerTestBase {};
TEST_F(PlanSetOpTest, unionAll) {
useDb("root", "test");
// sql 1: single UNION ALL operator
// single UNION ALL operator
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 20");
// sql 2: multi UNION ALL operator
// multi UNION ALL operator
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 "
"UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 20 "
"UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 30");
@ -46,6 +46,14 @@ TEST_F(PlanSetOpTest, unionAllWithSubquery) {
run("SELECT ts FROM (SELECT ts FROM st1) UNION ALL SELECT ts FROM (SELECT ts FROM st1)");
}
TEST_F(PlanSetOpTest, unionAllWithOrderBy) {
useDb("root", "test");
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 20 ORDER BY c1");
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 20 ORDER BY 1");
}
TEST_F(PlanSetOpTest, union) {
useDb("root", "test");

View File

@ -29,6 +29,8 @@ extern "C" {
#define CONFIG_FILE_LEN 1024
#define MAX_CONFIG_INDEX_COUNT 512
typedef struct SRaftCfg {
SSyncCfg cfg;
TdFilePtr pFile;
@ -36,6 +38,10 @@ typedef struct SRaftCfg {
int8_t isStandBy;
int8_t snapshotEnable;
SyncIndex lastConfigIndex;
SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT];
int32_t configIndexCount;
} SRaftCfg;
SRaftCfg *raftCfgOpen(const char *path);

View File

@ -420,6 +420,29 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
return 0;
}
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return -1;
}
assert(rid == pSyncNode->rid);
ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];
for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex &&
(pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotIndex) {
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
}
}
sMeta->lastConfigIndex = lastIndex;
sTrace("sync get snapshot meta by index:%ld lastConfigIndex:%ld", snapshotIndex, sMeta->lastConfigIndex);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
}
const char* syncGetMyRoleStr(int64_t rid) {
const char* s = syncUtilState2String(syncGetMyRole(rid));
return s;
@ -2197,8 +2220,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
char* newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, oldStr, newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
@ -2214,8 +2237,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
char* newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, oldStr, newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
@ -2297,6 +2320,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
}
// restore finish
// if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
if (ths->restoreFinish == false) {
if (ths->pFsm->FpRestoreFinishCb != NULL) {

View File

@ -85,16 +85,11 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
}
return pRoot;
/*
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncCfg", pRoot);
return pJson;
*/
}
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
@ -154,6 +149,16 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex);
cJSON_AddStringToObject(pRoot, "lastConfigIndex", buf64);
cJSON_AddNumberToObject(pRoot, "configIndexCount", pRaftCfg->configIndexCount);
cJSON *pIndexArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "configIndexArr", pIndexArr);
for (int i = 0; i < pRaftCfg->configIndexCount; ++i) {
snprintf(buf64, sizeof(buf64), "%ld", (pRaftCfg->configIndexArr)[i]);
cJSON *pIndexObj = cJSON_CreateObject();
cJSON_AddStringToObject(pIndexObj, "index", buf64);
cJSON_AddItemToArray(pIndexArr, pIndexObj);
}
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "RaftCfg", pRoot);
return pJson;
@ -161,7 +166,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
@ -177,6 +182,9 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
raftCfg.isStandBy = meta.isStandBy;
raftCfg.snapshotEnable = meta.snapshotEnable;
raftCfg.lastConfigIndex = meta.lastConfigIndex;
raftCfg.configIndexCount = 1;
memset(raftCfg.configIndexArr, 0, sizeof(raftCfg.configIndexArr));
raftCfg.configIndexArr[0] = -1;
char *s = raftCfg2Str(&raftCfg);
char buf[CONFIG_FILE_LEN] = {0};
@ -207,7 +215,24 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex");
pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex));
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
cJSON *pJsonConfigIndexCount = cJSON_GetObjectItem(pJson, "configIndexCount");
pRaftCfg->configIndexCount = cJSON_GetNumberValue(pJsonConfigIndexCount);
cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr");
int arraySize = cJSON_GetArraySize(pIndexArr);
assert(arraySize == pRaftCfg->configIndexCount);
memset(pRaftCfg->configIndexArr, 0, sizeof(pRaftCfg->configIndexArr));
for (int i = 0; i < arraySize; ++i) {
cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i);
assert(pIndexObj != NULL);
cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index");
assert(cJSON_IsString(pIndex));
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
}
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0);

View File

@ -410,20 +410,18 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
}
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
// assert(walCommit(pWal, index) == 0);
int32_t code = walCommit(pWal, index);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t linuxErr = errno;
const char* linuxErrMsg = strerror(errno);
sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
linuxErrMsg); ASSERT(0);
}
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
// assert(walCommit(pWal, index) == 0);
int32_t code = walCommit(pWal, index);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t linuxErr = errno;
const char* linuxErrMsg = strerror(errno);
sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
ASSERT(0);
}
return 0;
}

View File

@ -421,7 +421,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
@ -542,7 +542,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId;
cJSON * pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
@ -566,7 +566,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
@ -671,8 +671,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
char *newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, oldStr, newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, index:%ld, %s --> %s",
oldSyncCfg.replicaNum, newSyncCfg.replicaNum, pMsg->lastConfigIndex, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);

View File

@ -27,6 +27,14 @@ SRaftCfg* createRaftCfg() {
}
pCfg->isStandBy = taosGetTimestampSec() % 100;
pCfg->configIndexCount = 5;
for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) {
(pCfg->configIndexArr)[i] = -1;
}
for (int i = 0; i < pCfg->configIndexCount; ++i) {
(pCfg->configIndexArr)[i] = i * 100;
}
return pCfg;
}
@ -100,6 +108,15 @@ void test5() {
pCfg->isStandBy += 2;
pCfg->snapshotEnable += 3;
pCfg->lastConfigIndex += 1000;
pCfg->configIndexCount = 5;
for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) {
(pCfg->configIndexArr)[i] = -1;
}
for (int i = 0; i < pCfg->configIndexCount; ++i) {
(pCfg->configIndexArr)[i] = i * 100;
}
raftCfgPersist(pCfg);
printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex);
@ -118,6 +135,6 @@ int main() {
test3();
test4();
test5();
return 0;
}

View File

@ -310,7 +310,7 @@ static void uvHandleReq(SSvrConn* pConn) {
}
// set up conn info
SRpcConnInfo* pConnInfo = &(transMsg.info.connInfo);
SRpcConnInfo* pConnInfo = &(transMsg.info.conn);
pConnInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr);
pConnInfo->clientPort = ntohs(pConn->addr.sin_port);
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));

View File

@ -189,6 +189,7 @@ class TDTestCase:
def check_tail_table(self , tbname , col_name , tail_rows , offset):
tail_sql = f"select tail({col_name} , {tail_rows} , {offset}) from {tbname}"
equal_sql = f"select {col_name} from (select ts , {col_name} from {tbname} order by ts desc limit {tail_rows} offset {offset}) order by ts"
#equal_sql = f"select {col_name} from {tbname} order by ts desc limit {tail_rows} offset {offset}"
tdSql.query(tail_sql)
tail_result = tdSql.queryResult
@ -293,22 +294,22 @@ class TDTestCase:
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.query("select tail(c1,3,2) from ct4 where c1 >2 ")
tdSql.checkData(0, 0, 7)
tdSql.query("select tail(c1,3,2) from ct4 where c1 >2 order by 1")
tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, 6)
tdSql.checkData(2, 0, 5)
tdSql.checkData(2, 0, 7)
tdSql.query("select tail(c1,2,1) from ct4 where c2 between 0 and 99999")
tdSql.checkData(0, 0, 2)
tdSql.checkData(1, 0, 1)
tdSql.query("select tail(c1,2,1) from ct4 where c2 between 0 and 99999 order by 1")
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 2)
# tail with union all
tdSql.query("select tail(c1,2,1) from ct4 union all select c1 from ct1")
tdSql.checkRows(15)
tdSql.query("select tail(c1,2,1) from ct4 union all select c1 from ct2")
tdSql.query("select tail(c1,2,1) from ct4 union all select c1 from ct2 order by 1")
tdSql.checkRows(2)
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 0)
tdSql.checkData(0, 0, 0)
tdSql.checkData(1, 0, 1)
tdSql.query("select tail(c2,2,1) from ct4 union all select abs(c2)/2 from ct4")
tdSql.checkRows(14)
@ -334,18 +335,18 @@ class TDTestCase:
tdSql.execute(f" insert into ttb1 values({ts_value} , {i})")
tdSql.execute(f" insert into ttb2 values({ts_value} , {i})")
tdSql.query("select tail(tb2.num,3,2) from tb1, tb2 where tb1.ts=tb2.ts ")
tdSql.query("select tail(tb2.num,3,2) from tb1, tb2 where tb1.ts=tb2.ts order by 1 desc")
tdSql.checkRows(3)
tdSql.checkData(0,0,5)
tdSql.checkData(0,0,7)
tdSql.checkData(1,0,6)
tdSql.checkData(2,0,7)
tdSql.checkData(2,0,5)
# nest query
# tdSql.query("select tail(c1,2) from (select c1 from ct1)")
tdSql.query("select c1 from (select tail(c1,2) c1 from ct4)")
tdSql.query("select c1 from (select tail(c1,2) c1 from ct4) order by 1 nulls first")
tdSql.checkRows(2)
tdSql.checkData(0, 0, 0)
tdSql.checkData(1, 0, None)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, 0)
tdSql.query("select sum(c1) from (select tail(c1,2) c1 from ct1)")
tdSql.checkRows(1)

View File

@ -93,8 +93,8 @@ class TDTestCase:
"select unique(c1) , min(c1) from t1",
"select unique(c1) , spread(c1) from t1",
"select unique(c1) , diff(c1) from t1",
"select unique(c1) , abs(c1) from t1",
"select unique(c1) , c1 from t1",
#"select unique(c1) , abs(c1) from t1", # support
#"select unique(c1) , c1 from t1",
"select unique from stb1 partition by tbname",
"select unique(123--123)==1 from stb1 partition by tbname",
"select unique(123) from stb1 partition by tbname",
@ -104,21 +104,21 @@ class TDTestCase:
"select unique(c1 ,c2 ) from stb1 partition by tbname",
"select unique(c1 ,NULL) from stb1 partition by tbname",
"select unique(,) from stb1 partition by tbname;",
"select unique(floor(c1) ab from stb1 partition by tbname)",
"select unique(c1) as int from stb1 partition by tbname",
#"select unique(floor(c1) ab from stb1 partition by tbname)", # support
#"select unique(c1) as int from stb1 partition by tbname",
"select unique('c1') from stb1 partition by tbname",
"select unique(NULL) from stb1 partition by tbname",
"select unique('') from stb1 partition by tbname",
"select unique(c%) from stb1 partition by tbname",
#"select unique(t1) from stb1 partition by tbname",
#"select unique(t1) from stb1 partition by tbname", # support
"select unique(True) from stb1 partition by tbname",
"select unique(c1) , count(c1) from stb1 partition by tbname",
"select unique(c1) , avg(c1) from stb1 partition by tbname",
"select unique(c1) , min(c1) from stb1 partition by tbname",
"select unique(c1) , spread(c1) from stb1 partition by tbname",
"select unique(c1) , diff(c1) from stb1 partition by tbname",
"select unique(c1) , abs(c1) from stb1 partition by tbname",
"select unique(c1) , c1 from stb1 partition by tbname"
#"select unique(c1) , abs(c1) from stb1 partition by tbname", # support
#"select unique(c1) , c1 from stb1 partition by tbname" # support
]
for error_sql in error_sql_lists:
@ -198,7 +198,7 @@ class TDTestCase:
unique_datas = []
for elem in unique_result:
unique_datas.append(elem[0])
unique_datas.sort(key=lambda x: (x is None, x))
tdSql.query(origin_sql)
origin_result = tdSql.queryResult
@ -212,6 +212,7 @@ class TDTestCase:
continue
else:
pre_unique.append(elem)
pre_unique.sort(key=lambda x: (x is None, x))
if pre_unique == unique_datas:
tdLog.info(" unique query check pass , unique sql is: %s" %unique_sql)
@ -266,16 +267,16 @@ class TDTestCase:
tdSql.checkRows(10)
tdSql.error("select unique(c1),tbname from ct1")
tdSql.error("select unique(c1),t1 from ct1")
#tdSql.error("select unique(c1),t1 from ct1") #support
# unique with common col
tdSql.error("select unique(c1) ,ts from ct1")
tdSql.error("select unique(c1) ,c1 from ct1")
#tdSql.error("select unique(c1) ,ts from ct1")
#tdSql.error("select unique(c1) ,c1 from ct1")
# unique with scalar function
tdSql.error("select unique(c1) ,abs(c1) from ct1")
#tdSql.error("select unique(c1) ,abs(c1) from ct1")
tdSql.error("select unique(c1) , unique(c2) from ct1")
tdSql.error("select unique(c1) , abs(c2)+2 from ct1")
#tdSql.error("select unique(c1) , abs(c2)+2 from ct1")
# unique with aggregate function
@ -288,13 +289,13 @@ class TDTestCase:
tdSql.query("select unique(c1) from ct4 where c1 is null")
tdSql.checkData(0, 0, None)
tdSql.query("select unique(c1) from ct4 where c1 >2 ")
tdSql.checkData(0, 0, 8)
tdSql.checkData(1, 0, 7)
tdSql.checkData(2, 0, 6)
tdSql.checkData(5, 0, 3)
tdSql.query("select unique(c1) from ct4 where c1 >2 order by 1")
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 4)
tdSql.checkData(2, 0, 5)
tdSql.checkData(5, 0, 8)
tdSql.query("select unique(c1) from ct4 where c2 between 0 and 99999")
tdSql.query("select unique(c1) from ct4 where c2 between 0 and 99999 order by 1 desc")
tdSql.checkData(0, 0, 8)
tdSql.checkData(1, 0, 7)
tdSql.checkData(2, 0, 6)
@ -335,23 +336,23 @@ class TDTestCase:
tdSql.execute(f" insert into ttb1 values({ts_value} , {i})")
tdSql.execute(f" insert into ttb2 values({ts_value} , {i})")
tdSql.query("select unique(tb2.num) from tb1, tb2 where tb1.ts=tb2.ts ")
tdSql.query("select unique(tb2.num) from tb1, tb2 where tb1.ts=tb2.ts order by 1")
tdSql.checkRows(10)
tdSql.checkData(0,0,0)
tdSql.checkData(1,0,1)
tdSql.checkData(2,0,2)
tdSql.checkData(9,0,9)
tdSql.query("select unique(tb2.num) from tb1, tb2 where tb1.ts=tb2.ts union all select unique(tb1.num) from tb1, tb2 where tb1.ts=tb2.ts ")
tdSql.query("select unique(tb2.num) from tb1, tb2 where tb1.ts=tb2.ts union all select unique(tb1.num) from tb1, tb2 where tb1.ts=tb2.ts order by 1")
tdSql.checkRows(20)
tdSql.checkData(0,0,0)
tdSql.checkData(1,0,1)
tdSql.checkData(2,0,2)
tdSql.checkData(9,0,9)
tdSql.checkData(2,0,1)
tdSql.checkData(4,0,2)
tdSql.checkData(18,0,9)
# nest query
# tdSql.query("select unique(c1) from (select c1 from ct1)")
tdSql.query("select c1 from (select unique(c1) c1 from ct4)")
tdSql.query("select c1 from (select unique(c1) c1 from ct4) order by 1 desc nulls first")
tdSql.checkRows(10)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, 8)
@ -366,7 +367,7 @@ class TDTestCase:
tdSql.checkData(0, 0, 45)
tdSql.checkData(1, 0, 45)
tdSql.query("select 1-abs(c1) from (select unique(c1) c1 from ct4)")
tdSql.query("select 1-abs(c1) from (select unique(c1) c1 from ct4) order by 1 nulls first")
tdSql.checkRows(10)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, -7.000000000)
@ -421,7 +422,7 @@ class TDTestCase:
f"insert into sub1_bound values ( now()+1s, 2147483648, 9223372036854775808, 32768, 128, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.query("select unique(c2) from sub1_bound")
tdSql.query("select unique(c2) from sub1_bound order by 1 desc")
tdSql.checkRows(5)
tdSql.checkData(0,0,9223372036854775807)