Merge branch '3.0' into feature/stream

This commit is contained in:
Liu Jicong 2022-06-17 15:35:41 +08:00
commit abc7d06305
68 changed files with 1036 additions and 376 deletions

View File

@ -250,7 +250,7 @@ The [Taos] structure is the connection manager in [libtaos] and provides two mai
Column information is stored using [ColumnMeta].
``rust
```rust
let cols = &q.column_meta;
for col in cols {
println!("name: {}, type: {:?} , bytes: {}", col.name, col.type_, col.bytes);

View File

@ -185,6 +185,8 @@ bool fmIsUserDefinedFunc(int32_t funcId);
bool fmIsDistExecFunc(int32_t funcId);
bool fmIsForbidFillFunc(int32_t funcId);
bool fmIsForbidStreamFunc(int32_t funcId);
bool fmIsForbidWindowFunc(int32_t funcId);
bool fmIsForbidGroupByFunc(int32_t funcId);
bool fmIsIntervalInterpoFunc(int32_t funcId);
int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc, SFunctionNode** pMergeFunc);

View File

@ -102,6 +102,7 @@ typedef enum ENodeType {
QUERY_NODE_EXPLAIN_OPTIONS,
QUERY_NODE_STREAM_OPTIONS,
QUERY_NODE_LEFT_VALUE,
QUERY_NODE_COLUMN_REF,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR,

View File

@ -65,11 +65,16 @@ typedef struct SColumnNode {
char tableName[TSDB_TABLE_NAME_LEN];
char tableAlias[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
SNode* pProjectRef;
// SNode* pProjectRef;
int16_t dataBlockId;
int16_t slotId;
} SColumnNode;
typedef struct SColumnRefNode {
ENodeType type;
char colName[TSDB_COL_NAME_LEN];
} SColumnRefNode;
typedef struct STargetNode {
ENodeType type;
int16_t dataBlockId;
@ -247,6 +252,8 @@ typedef struct SSelectStmt {
bool hasIndefiniteRowsFunc;
bool hasSelectFunc;
bool hasSelectValFunc;
bool hasUniqueFunc;
bool hasTailFunc;
} SSelectStmt;
typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType;
@ -374,6 +381,9 @@ bool nodesIsComparisonOp(const SOperatorNode* pOp);
bool nodesIsJsonOp(const SOperatorNode* pOp);
bool nodesIsRegularOp(const SOperatorNode* pOp);
bool nodesExprHasColumn(SNode* pNode);
bool nodesExprsHasColumn(SNodeList* pList);
void* nodesGetValueFromNode(SValueNode* pNode);
int32_t nodesSetValueNodeValue(SValueNode* pNode, void* value);
char* nodesGetStrValueFromNode(SValueNode* pNode);

View File

@ -192,6 +192,7 @@ bool syncEnvIsStart();
const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);

View File

@ -46,7 +46,6 @@ typedef struct SRpcHandleInfo {
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not
SRpcConnInfo connInfo;
// app info
void *ahandle; // app handle set by client
void *wrapper; // wrapper handle
@ -55,6 +54,9 @@ typedef struct SRpcHandleInfo {
// resp info
void * rsp;
int32_t rspLen;
// conn info
SRpcConnInfo conn;
} SRpcHandleInfo;
typedef struct SRpcMsg {
@ -63,7 +65,6 @@ typedef struct SRpcMsg {
int32_t contLen;
int32_t code;
SRpcHandleInfo info;
SRpcConnInfo conn;
} SRpcMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *rf);

View File

@ -659,6 +659,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG TAOS_DEF_ERROR_CODE(0, 0x2656)
#define TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2657)
#define TSDB_CODE_PAR_INVALID_WINDOW_PC TAOS_DEF_ERROR_CODE(0, 0x2658)
#define TSDB_CODE_PAR_WINDOW_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x2659)
#define TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265A)
#define TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265B)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)

View File

@ -148,6 +148,10 @@ static int32_t mmStart(SMnodeMgmt *pMgmt) {
static void mmStop(SMnodeMgmt *pMgmt) {
dDebug("mnode-mgmt start to stop");
taosThreadRwlockWrlock(&pMgmt->lock);
pMgmt->stopped = 1;
taosThreadRwlockUnlock(&pMgmt->lock);
mndStop(pMgmt->pMnode);
}

View File

@ -220,9 +220,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
}
void mmStopWorker(SMnodeMgmt *pMgmt) {
taosThreadRwlockWrlock(&pMgmt->lock);
pMgmt->stopped = 1;
taosThreadRwlockUnlock(&pMgmt->lock);
while (pMgmt->refCount > 0) taosMsleep(10);
tSingleWorkerCleanup(&pMgmt->monitorWorker);

View File

@ -228,7 +228,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
vmReleaseVnode(pMgmt, pVnode);
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
code = terrno;
goto _OVER;
return 0;
}
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);

View File

@ -21,21 +21,6 @@ static void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet);
static void dmSendRsp(SRpcMsg *pMsg);
static void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
static inline int32_t dmBuildNodeMsg(SRpcMsg *pMsg, SRpcMsg *pRpc) {
SRpcConnInfo *pConnInfo = &(pRpc->info.connInfo);
// if (IsReq(pRpc)) {
// terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
// dError("failed to build msg since %s, app:%p handle:%p", terrstr(), pRpc->info.ahandle, pRpc->info.handle);
// return -1;
//}
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
memcpy(pMsg->conn.user, pConnInfo->user, TSDB_USER_LEN);
pMsg->conn.clientIp = pConnInfo->clientIp;
pMsg->conn.clientPort = pConnInfo->clientPort;
return 0;
}
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) {
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)];
if (msgFp == NULL) {
@ -116,14 +101,10 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
}
pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
if (pMsg == NULL) {
goto _OVER;
}
dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType));
if (pMsg == NULL) goto _OVER;
memcpy(pMsg, pRpc, sizeof(SRpcMsg));
if (dmBuildNodeMsg(pMsg, pRpc) != 0) {
goto _OVER;
}
dTrace("msg:%p, is created, type:%s handle:%p", pMsg, TMSG_INFO(pRpc->msgType), pMsg->info.handle);
if (InParentProc(pWrapper)) {
code = dmPutToProcCQueue(&pWrapper->proc, pMsg, DND_FUNC_REQ);

View File

@ -56,7 +56,7 @@ static int32_t mndProcessAuthReq(SRpcMsg *pReq) {
memcpy(authRsp.user, authReq.user, TSDB_USER_LEN);
int32_t code = mndRetriveAuth(pReq->info.node, &authRsp);
mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->conn.user, authRsp.spi, authRsp.encrypt,
mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->info.conn.user, authRsp.spi, authRsp.encrypt,
authRsp.user);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authRsp);

View File

@ -292,7 +292,7 @@ static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_BNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_BNODE) != 0) {
goto _OVER;
}
@ -394,7 +394,7 @@ static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_BNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_BNODE) != 0) {
goto _OVER;
}

View File

@ -521,12 +521,12 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
goto _OVER;
}
pUser = mndAcquireUser(pMnode, pReq->conn.user);
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pUser == NULL) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DB, NULL) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DB, NULL) != 0) {
goto _OVER;
}
@ -700,7 +700,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_ALTER_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_ALTER_DB, pDb) != 0) {
goto _OVER;
}
@ -980,7 +980,7 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
}
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_DROP_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_DB, pDb) != 0) {
goto _OVER;
}
@ -1127,7 +1127,7 @@ static int32_t mndProcessUseDbReq(SRpcMsg *pReq) {
mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr());
} else {
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_USE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_USE_DB, pDb) != 0) {
goto _OVER;
}
@ -1252,7 +1252,7 @@ static int32_t mndProcessCompactDbReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_COMPACT_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_COMPACT_DB, pDb) != 0) {
goto _OVER;
}

View File

@ -522,7 +522,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_DNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_DNODE) != 0) {
goto _OVER;
}
@ -623,7 +623,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
}
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
goto _OVER;
}

View File

@ -318,7 +318,7 @@ static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_FUNC) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_FUNC) != 0) {
goto _OVER;
}
@ -365,7 +365,7 @@ static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) {
}
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_FUNC) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_FUNC) != 0) {
goto _OVER;
}

View File

@ -505,6 +505,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
}
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0;
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER &&

View File

@ -414,7 +414,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_MNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_MNODE) != 0) {
goto _OVER;
}
@ -621,7 +621,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_MNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) {
goto _OVER;
}
@ -742,7 +742,7 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
pMgmt->errCode = 0;
pMgmt->transId = -1;
tsem_wait(&pMgmt->syncSem);
mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode));
mInfo("alter mnode sync result:0x%x %s", pMgmt->errCode, tstrerror(pMgmt->errCode));
terrno = pMgmt->errCode;
return pMgmt->errCode;
}

View File

@ -194,15 +194,15 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
goto CONN_OVER;
}
taosIp2String(pReq->conn.clientIp, ip);
taosIp2String(pReq->info.conn.clientIp, ip);
pUser = mndAcquireUser(pMnode, pReq->conn.user);
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pUser == NULL) {
mError("user:%s, failed to login while acquire user since %s", pReq->conn.user, terrstr());
mError("user:%s, failed to login while acquire user since %s", pReq->info.conn.user, terrstr());
goto CONN_OVER;
}
if (0 != strncmp(connReq.passwd, pUser->pass, TSDB_PASSWORD_LEN - 1)) {
mError("user:%s, failed to auth while acquire user, input:%s", pReq->conn.user, connReq.passwd);
mError("user:%s, failed to auth while acquire user, input:%s", pReq->info.conn.user, connReq.passwd);
code = TSDB_CODE_RPC_AUTH_FAILURE;
goto CONN_OVER;
}
@ -213,15 +213,16 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
pDb = mndAcquireDb(pMnode, db);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_INVALID_DB;
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->conn.user, ip, connReq.db, terrstr());
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->info.conn.user, ip, connReq.db,
terrstr());
goto CONN_OVER;
}
}
pConn = mndCreateConn(pMnode, pReq->conn.user, connReq.connType, pReq->conn.clientIp, pReq->conn.clientPort,
connReq.pid, connReq.app, connReq.startTime);
pConn = mndCreateConn(pMnode, pReq->info.conn.user, connReq.connType, pReq->info.conn.clientIp,
pReq->info.conn.clientPort, connReq.pid, connReq.app, connReq.startTime);
if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pReq->conn.user, ip, terrstr());
mError("user:%s, failed to login from %s while create connection since %s", pReq->info.conn.user, ip, terrstr());
goto CONN_OVER;
}
@ -246,7 +247,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
pReq->info.rspLen = contLen;
pReq->info.rsp = pRsp;
mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->conn.user, ip, pConn->port, pConn->id, connReq.app);
mDebug("user:%s, login from %s:%d, conn:%u, app:%s", pReq->info.conn.user, ip, pConn->port, pConn->id, connReq.app);
code = 0;
@ -348,7 +349,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
if (pHbReq->query) {
SQueryHbReqBasic *pBasic = pHbReq->query;
SRpcConnInfo connInfo = pMsg->conn;
SRpcConnInfo connInfo = pMsg->info.conn;
SConnObj *pConn = mndAcquireConn(pMnode, pBasic->connId);
if (pConn == NULL) {
@ -500,7 +501,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pUser == NULL) return 0;
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
@ -523,7 +524,7 @@ static int32_t mndProcessKillQueryReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1;
} else {
mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->conn.user);
mInfo("connId:%d, queryId:%d is killed by user:%s", killReq.connId, killReq.queryId, pReq->info.conn.user);
pConn->killId = killReq.queryId;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return 0;
@ -534,7 +535,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pReq->conn.user);
SUserObj *pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pUser == NULL) return 0;
if (!pUser->superUser) {
mndReleaseUser(pMnode, pUser);
@ -555,7 +556,7 @@ static int32_t mndProcessKillConnReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1;
} else {
mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->conn.user);
mInfo("connId:%d, is killed by user:%s", killReq.connId, pReq->info.conn.user);
pConn->killed = 1;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return TSDB_CODE_SUCCESS;
@ -645,7 +646,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
for (int32_t i = 0; i < numOfQueries; ++i) {
SQueryDesc* pQuery = taosArrayGet(pConn->pQueries, i);
SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i);
cols = 0;
char queryId[26 + VARSTR_HEADER_SIZE] = {0};
@ -698,7 +699,7 @@ static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
if (i) {
offset += snprintf(subStatus + offset, strSize - offset - 1, ",");
}
SQuerySubDesc* pDesc = taosArrayGet(pQuery->subDesc, i);
SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
}
varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);

View File

@ -294,7 +294,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_QNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_QNODE) != 0) {
goto _OVER;
}
@ -396,7 +396,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_QNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_QNODE) != 0) {
goto _OVER;
}

View File

@ -229,7 +229,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
// if (mndCheckShowAuth(pMnode, pReq->conn.user, pShow->type) != 0) return -1;
// if (mndCheckShowAuth(pMnode, pReq->info.conn.user, pShow->type) != 0) return -1;
int32_t numOfCols = pShow->pMeta->numOfColumns;
SSDataBlock *pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));

View File

@ -710,7 +710,7 @@ static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}
@ -971,7 +971,7 @@ static int32_t mndProcessDropSmaReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}

View File

@ -300,7 +300,7 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_SNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_SNODE) != 0) {
goto _OVER;
}
@ -404,7 +404,7 @@ static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_SNODE) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_SNODE) != 0) {
goto _OVER;
}

View File

@ -805,7 +805,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}
@ -1454,7 +1454,7 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}
@ -1584,7 +1584,7 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}

View File

@ -545,7 +545,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
return -1;
}
if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
if (streamObj.targetSTbName[0] && mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
mError("trans:%d, failed to create stb for stream since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
@ -602,7 +602,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}
#endif
@ -627,7 +627,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
// create stb for stream
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->conn.user) < 0) {
if (mndCreateStbForStream(pMnode, pTrans, &streamObj, pReq->info.conn.user) < 0) {
mError("trans:%d, failed to create stb for stream %s since %s", pTrans->id, createStreamReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
@ -696,7 +696,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
#if 0
// todo check auth
pUser = mndAcquireUser(pMnode, pReq->conn.user);
pUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pUser == NULL) {
goto DROP_STREAM_OVER;
}

View File

@ -59,6 +59,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
}
pMgmt->transId = 0;
tsem_post(&pMgmt->syncSem);
} else {
STrans *pTrans = mndAcquireTrans(pMnode, transId);
@ -69,7 +70,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) {
SSnapshotMeta sMeta = {0};
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, cbMeta.index, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
}
sdbWriteFile(pMnode->pSdb);
@ -89,7 +91,10 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
SSnapshotMeta sMeta = {0};
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb);
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
}
@ -122,6 +127,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
if (pMgmt->errCode != 0) {
mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
}
pMgmt->transId = 0;
tsem_post(&pMgmt->syncSem);
}
}
@ -258,13 +264,17 @@ void mndSyncStart(SMnode *pMnode) {
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
}
void mndSyncStop(SMnode *pMnode) {}
void mndSyncStop(SMnode *pMnode) {
if (pMnode->syncMgmt.transId != 0) {
pMnode->syncMgmt.transId = 0;
tsem_post(&pMnode->syncMgmt.syncSem);
}
}
bool mndIsMaster(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
ESyncState state = syncGetMyRole(pMgmt->sync);
if (state != TAOS_SYNC_STATE_LEADER) {
if (!syncIsReady(pMgmt->sync)) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
return false;
}

View File

@ -474,7 +474,7 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckDbAuth(pMnode, pReq->conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
if (mndCheckDbAuth(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) {
goto _OVER;
}

View File

@ -1369,7 +1369,7 @@ static int32_t mndProcessKillTransReq(SRpcMsg *pReq) {
mInfo("trans:%d, start to kill", killReq.transId);
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_KILL_TRANS) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_KILL_TRANS) != 0) {
goto _OVER;
}
@ -1417,7 +1417,9 @@ void mndTransPullup(SMnode *pMnode) {
}
SSnapshotMeta sMeta = {0};
if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
// if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) {
SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb);
if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) {
sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex);
}
sdbWriteFile(pMnode->pSdb);

View File

@ -354,13 +354,13 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
goto _OVER;
}
pOperUser = mndAcquireUser(pMnode, pReq->conn.user);
pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pOperUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_CREATE_USER) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_CREATE_USER) != 0) {
goto _OVER;
}
@ -460,7 +460,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
goto _OVER;
}
pOperUser = mndAcquireUser(pMnode, pReq->conn.user);
pOperUser = mndAcquireUser(pMnode, pReq->info.conn.user);
if (pOperUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto _OVER;
@ -643,7 +643,7 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq) {
goto _OVER;
}
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_DROP_USER) != 0) {
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_DROP_USER) != 0) {
goto _OVER;
}

View File

@ -1200,7 +1200,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
mInfo("vgId:%d, start to redistribute to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3);
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER;
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER;
pVgroup = mndAcquireVgroup(pMnode, req.vgId);
if (pVgroup == NULL) goto _OVER;
@ -1500,7 +1500,7 @@ static int32_t mndProcessSplitVgroupMsg(SRpcMsg *pReq) {
pDb = mndAcquireDb(pMnode, pVgroup->dbName);
if (pDb == NULL) goto _OVER;
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER;
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_SPLIT_VGROUP) != 0) goto _OVER;
code = mndSplitVgroup(pMnode, pReq, pDb, pVgroup);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
@ -1624,7 +1624,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
mDebug("start to balance vgroup");
if (mndCheckOperAuth(pMnode, pReq->conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER;
if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_BALANCE_VGROUP) != 0) goto _OVER;
while (1) {
SDnodeObj *pDnode = NULL;

View File

@ -184,7 +184,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (rsp.code == 0) {
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->conn.applyIndex, &rsp) < 0) {
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
rsp.code = terrno;
vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
}
@ -329,8 +329,9 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
SVnode *pVnode = pFsm->data;
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle);
@ -359,10 +360,11 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {

View File

@ -3507,8 +3507,7 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock)
// check for the limitation in each group
if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) {
pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
if (pProjectInfo->slimit.limit != -1 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) {
if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) {
pOperator->status = OP_EXEC_DONE;
}

View File

@ -331,7 +331,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
}
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo);
#if 0
if(pOperator->fpSet.encodeResultRow){

View File

@ -2170,25 +2170,17 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, int32_t capa
blockDataEnsureCapacity(p, capacity);
while (1) {
STupleHandle* pTupleHandle = NULL;
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {
pTupleHandle = pInfo->prefetchedTuple;
}
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pTupleHandle == NULL) {
break;
}
appendOneRowToDataBlock(p, pTupleHandle);
if (p->info.rows >= capacity) {
break;
}
}
qDebug("%s get sorted row blocks, rows:%d", GET_TASKID(pTaskInfo), p->info.rows);
return (p->info.rows > 0) ? p : NULL;
}
@ -2261,7 +2253,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
@ -2288,7 +2279,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->sample.sampleRatio = pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec();
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
pInfo->dataReaders = dataReaders;
@ -2307,17 +2297,22 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
taosArrayPush(pInfo->sortSourceParams, param);
taosMemoryFree(param);
}
pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
int32_t rowSize = pInfo->pResBlock->info.rowSize;
pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2;
pInfo->sortBufSize = pInfo->bufPageSize * 16;
pInfo->bufPageSize = getProperSortPageSize(rowSize);
// todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result
pInfo->sortBufSize = pInfo->bufPageSize * (taosArrayGetSize(dataReaders) + 1);
pInfo->hasGroupId = false;
pInfo->prefetchedTuple = NULL;
pOperator->name = "TableMergeScanOperator";
// TODO : change it
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;

View File

@ -392,10 +392,8 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
pInfo->bufPageSize = getProperSortPageSize(rowSize);
uint32_t numOfSources = taosArrayGetSize(pSortInfo);
numOfSources = TMAX(4, numOfSources);
pInfo->sortBufSize = numOfSources * pInfo->bufPageSize;
// one additional is reserved for merged result.
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1);
pOperator->fpSet =
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,

View File

@ -195,6 +195,11 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
}
static void setCurrentSourceIsDone(SSortSource* pSource, SSortHandle* pHandle) {
pSource->src.rowIndex = -1;
++pHandle->numOfCompletedSources;
}
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SSortHandle* pHandle) {
cmpParam->pSources = taosArrayGet(pSources, startIndex);
cmpParam->numOfSources = (endIndex - startIndex + 1);
@ -205,8 +210,10 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i];
// set current source is done
if (taosArrayGetSize(pSource->pageIdList) == 0) {
return TSDB_CODE_SUCCESS;
setCurrentSourceIsDone(pSource, pHandle);
continue;
}
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
@ -233,10 +240,9 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
SSortSource* pSource = cmpParam->pSources[i];
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
// set current source id done
// set current source is done
if (pSource->src.pBlock == NULL) {
pSource->src.rowIndex = -1;
++pHandle->numOfCompletedSources;
setCurrentSourceIsDone(pSource, pHandle);
}
}
}

View File

@ -44,6 +44,8 @@ extern "C" {
#define FUNC_MGT_FORBID_FILL_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(15)
#define FUNC_MGT_INTERVAL_INTERPO_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(16)
#define FUNC_MGT_FORBID_STREAM_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(17)
#define FUNC_MGT_FORBID_WINDOW_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(18)
#define FUNC_MGT_FORBID_GROUP_BY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(19)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)

View File

@ -1086,8 +1086,8 @@ static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
}
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of UNIQUE can only be columns");
if (!nodesExprHasColumn(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "The parameters of UNIQUE must contain columns");
}
pFunc->node.resType = ((SExprNode*)pPara)->resType;
@ -2114,7 +2114,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "tail",
.type = FUNCTION_TYPE_TAIL,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_WINDOW_FUNC | FUNC_MGT_FORBID_GROUP_BY_FUNC,
.translateFunc = translateTail,
.getEnvFunc = getTailFuncEnv,
.initFunc = tailFunctionSetup,
@ -2124,7 +2124,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "unique",
.type = FUNCTION_TYPE_UNIQUE,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_WINDOW_FUNC | FUNC_MGT_FORBID_GROUP_BY_FUNC,
.translateFunc = translateUnique,
.getEnvFunc = getUniqueFuncEnv,
.initFunc = uniqueFunctionSetup,

View File

@ -1384,8 +1384,9 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos,
int32_t rowIndex);
static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rIndex);
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
@ -1407,11 +1408,23 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
if (pEntryInfo->numOfRes > 0) {
setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow);
} else {
setNullSelectivityValue(pCtx, pBlock, currentRow);
}
return pEntryInfo->numOfRes;
}
void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) {
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colDataAppendNULL(pDstCol, rowIndex);
}
}
void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex) {
int32_t pageId = pTuplePos->pageId;
int32_t offset = pTuplePos->offset;
@ -4627,8 +4640,6 @@ int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
STailItem* pItem = pInfo->pItems[i];
colDataAppend(pCol, currentRow, pItem->data, false);
// setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow);
currentRow += 1;
}

View File

@ -165,6 +165,10 @@ bool fmIsIntervalInterpoFunc(int32_t funcId) { return isSpecificClassifyFunc(fun
bool fmIsForbidStreamFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_STREAM_FUNC); }
bool fmIsForbidWindowFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_WINDOW_FUNC); }
bool fmIsForbidGroupByFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_FORBID_GROUP_BY_FUNC); }
void fmFuncMgtDestroy() {
void* m = gFunMgtService.pFuncNameHashTable;
if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) {
@ -206,7 +210,7 @@ bool fmIsInvertible(int32_t funcId) {
return res;
}
//function has same input/output type
// function has same input/output type
bool fmIsSameInOutType(int32_t funcId) {
bool res = false;
switch (funcMgtBuiltins[funcId].type) {
@ -301,7 +305,7 @@ static int32_t createMergeFunction(const SFunctionNode* pSrcFunc, const SFunctio
nodesDestroyList(pParameterList);
return TSDB_CODE_OUT_OF_MEMORY;
}
//overwrite function restype set by translate function
// overwrite function restype set by translate function
if (fmIsSameInOutType(pSrcFunc->funcId)) {
(*pMergeFunc)->node.resType = pSrcFunc->node.resType;
}

View File

@ -88,6 +88,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SStreamOptions));
case QUERY_NODE_LEFT_VALUE:
return makeNode(type, sizeof(SLeftValueNode));
case QUERY_NODE_COLUMN_REF:
return makeNode(type, sizeof(SColumnDefNode));
case QUERY_NODE_SET_OPERATOR:
return makeNode(type, sizeof(SSetOperator));
case QUERY_NODE_SELECT_STMT:
@ -1463,6 +1465,26 @@ int32_t nodesCollectSpecialNodes(SSelectStmt* pSelect, ESqlClause clause, ENodeT
return TSDB_CODE_SUCCESS;
}
static EDealRes hasColumn(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
*(bool*)pContext = true;
return DEAL_RES_END;
}
return DEAL_RES_CONTINUE;
}
bool nodesExprHasColumn(SNode* pNode) {
bool hasCol = false;
nodesWalkExprPostOrder(pNode, hasColumn, &hasCol);
return hasCol;
}
bool nodesExprsHasColumn(SNodeList* pList) {
bool hasCol = false;
nodesWalkExprsPostOrder(pList, hasColumn, &hasCol);
return hasCol;
}
char* nodesGetFillModeString(EFillMode mode) {
switch (mode) {
case FILL_MODE_NONE:

View File

@ -648,6 +648,8 @@ SNode* addOrderByClause(SAstCreateContext* pCxt, SNode* pStmt, SNodeList* pOrder
CHECK_PARSER_STATUS(pCxt);
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
((SSelectStmt*)pStmt)->pOrderByList = pOrderByList;
} else {
((SSetOperator*)pStmt)->pOrderByList = pOrderByList;
}
return pStmt;
}

View File

@ -36,6 +36,7 @@ typedef struct STranslateContext {
int32_t currLevel;
ESqlClause currClause;
SSelectStmt* pCurrSelectStmt;
SSetOperator* pCurrSetOperator;
SCmdMsgInfo* pCmdMsg;
SHashObj* pDbs;
SHashObj* pTables;
@ -432,7 +433,7 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p
static void setColumnInfoByExpr(const STableNode* pTable, SExprNode* pExpr, SColumnNode** pColRef) {
SColumnNode* pCol = *pColRef;
pCol->pProjectRef = (SNode*)pExpr;
// pCol->pProjectRef = (SNode*)pExpr;
if (NULL == pExpr->pAssociation) {
pExpr->pAssociation = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
}
@ -613,17 +614,36 @@ static EDealRes translateColumnWithoutPrefix(STranslateContext* pCxt, SColumnNod
return DEAL_RES_CONTINUE;
}
static bool translateColumnUseAlias(STranslateContext* pCxt, SColumnNode** pCol) {
SNodeList* pProjectionList = pCxt->pCurrSelectStmt->pProjectionList;
static SNodeList* getProjectListFromCxt(STranslateContext* pCxt) {
if (NULL != pCxt->pCurrSelectStmt) {
return pCxt->pCurrSelectStmt->pProjectionList;
} else if (NULL != pCxt->pCurrSetOperator) {
return pCxt->pCurrSetOperator->pProjectionList;
} else {
return NULL;
}
}
static EDealRes translateColumnUseAlias(STranslateContext* pCxt, SColumnNode** pCol, bool* pFound) {
SNodeList* pProjectionList = getProjectListFromCxt(pCxt);
SNode* pNode;
FOREACH(pNode, pProjectionList) {
SExprNode* pExpr = (SExprNode*)pNode;
if (0 == strcmp((*pCol)->colName, pExpr->aliasName)) {
setColumnInfoByExpr(NULL, pExpr, pCol);
return true;
SColumnRefNode* pColRef = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
if (NULL == pColRef) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
strcpy(pColRef->colName, pExpr->aliasName);
nodesDestroyNode(*(SNode**)pCol);
*(SNode**)pCol = (SNode*)pColRef;
*pFound = true;
return DEAL_RES_CONTINUE;
}
}
return false;
*pFound = false;
return DEAL_RES_CONTINUE;
}
static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode** pCol) {
@ -638,9 +658,11 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode** pCol) {
} else {
bool found = false;
if (SQL_CLAUSE_ORDER_BY == pCxt->currClause) {
found = translateColumnUseAlias(pCxt, pCol);
res = translateColumnUseAlias(pCxt, pCol, &found);
}
if (DEAL_RES_ERROR != res && !found) {
res = translateColumnWithoutPrefix(pCxt, pCol);
}
res = (found ? DEAL_RES_CONTINUE : translateColumnWithoutPrefix(pCxt, pCol));
}
return res;
}
@ -1098,11 +1120,43 @@ static int32_t translateWindowPseudoColumnFunc(STranslateContext* pCxt, SFunctio
return TSDB_CODE_SUCCESS;
}
static int32_t translateForbidWindowFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsForbidWindowFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (NULL != pCxt->pCurrSelectStmt->pWindow) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WINDOW_NOT_ALLOWED_FUNC, pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateForbidStreamFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsForbidStreamFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (pCxt->createStream) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC, pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateForbidGroupByFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsForbidGroupByFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (NULL != pCxt->pCurrSelectStmt->pGroupByList) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC, pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}
static void setFuncClassification(SSelectStmt* pSelect, SFunctionNode* pFunc) {
if (NULL != pSelect) {
pSelect->hasAggFuncs = pSelect->hasAggFuncs ? true : fmIsAggFunc(pFunc->funcId);
pSelect->hasRepeatScanFuncs = pSelect->hasRepeatScanFuncs ? true : fmIsRepeatScanFunc(pFunc->funcId);
pSelect->hasIndefiniteRowsFunc = pSelect->hasIndefiniteRowsFunc ? true : fmIsIndefiniteRowsFunc(pFunc->funcId);
pSelect->hasUniqueFunc = pSelect->hasUniqueFunc ? true : (FUNCTION_TYPE_UNIQUE == pFunc->funcType);
pSelect->hasTailFunc = pSelect->hasTailFunc ? true : (FUNCTION_TYPE_TAIL == pFunc->funcType);
}
}
@ -1130,6 +1184,15 @@ static EDealRes translateFunction(STranslateContext* pCxt, SFunctionNode* pFunc)
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = translateWindowPseudoColumnFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = translateForbidWindowFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = translateForbidStreamFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = translateForbidGroupByFunc(pCxt, pFunc);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) {
setFuncClassification(pCxt->pCurrSelectStmt, pFunc);
}
@ -1762,11 +1825,11 @@ static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pPro
} else if (0 == pos || pos > LIST_LENGTH(pProjectionList)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_NUMBER_OF_SELECT);
} else {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
SColumnRefNode* pCol = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF);
if (NULL == pCol) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY);
}
setColumnInfoByExpr(NULL, (SExprNode*)nodesListGetNode(pProjectionList, pos - 1), &pCol);
strcpy(pCol->colName, ((SExprNode*)nodesListGetNode(pProjectionList, pos - 1))->aliasName);
((SOrderByExprNode*)pNode)->pExpr = (SNode*)pCol;
nodesDestroyNode(pExpr);
}
@ -1781,9 +1844,7 @@ static int32_t translateOrderByPosition(STranslateContext* pCxt, SNodeList* pPro
static int32_t translateOrderBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
bool other;
int32_t code = translateOrderByPosition(pCxt, pSelect->pProjectionList, pSelect->pOrderByList, &other);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
if (TSDB_CODE_SUCCESS == code) {
if (!other) {
return TSDB_CODE_SUCCESS;
}
@ -1792,6 +1853,7 @@ static int32_t translateOrderBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (TSDB_CODE_SUCCESS == code) {
code = checkExprListForGroupBy(pCxt, pSelect->pOrderByList);
}
}
return code;
}
@ -2116,6 +2178,172 @@ static int32_t rewriteTimelineFunc(STranslateContext* pCxt, SSelectStmt* pSelect
return pCxt->errCode;
}
typedef struct SRwriteUniqueCxt {
STranslateContext* pTranslateCxt;
SNode* pExpr;
} SRwriteUniqueCxt;
static EDealRes rewriteSeletcValueFunc(STranslateContext* pCxt, SNode** pNode) {
SFunctionNode* pFirst = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (NULL == pFirst) {
pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
strcpy(pFirst->functionName, "first");
TSWAP(pFirst->pParameterList, ((SFunctionNode*)*pNode)->pParameterList);
strcpy(pFirst->node.aliasName, ((SExprNode*)*pNode)->aliasName);
nodesDestroyNode(*pNode);
*pNode = (SNode*)pFirst;
pCxt->errCode = fmGetFuncInfo(pFirst, pCxt->msgBuf.buf, pCxt->msgBuf.len);
pCxt->pCurrSelectStmt->hasAggFuncs = true;
return TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR;
}
static EDealRes rewriteUniqueFunc(SNode** pNode, void* pContext) {
SRwriteUniqueCxt* pCxt = pContext;
if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
SFunctionNode* pFunc = (SFunctionNode*)*pNode;
if (FUNCTION_TYPE_UNIQUE == pFunc->funcType) {
SNode* pExpr = nodesListGetNode(pFunc->pParameterList, 0);
NODES_CLEAR_LIST(pFunc->pParameterList);
strcpy(((SExprNode*)pExpr)->aliasName, ((SExprNode*)*pNode)->aliasName);
nodesDestroyNode(*pNode);
*pNode = pExpr;
pCxt->pExpr = pExpr;
return DEAL_RES_IGNORE_CHILD;
} else if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType) {
return rewriteSeletcValueFunc(pCxt->pTranslateCxt, pNode);
}
}
return DEAL_RES_CONTINUE;
}
static SNode* createGroupingSet(SNode* pExpr) {
SGroupingSetNode* pGroupingSet = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET);
if (NULL == pGroupingSet) {
return NULL;
}
pGroupingSet->groupingSetType = GP_TYPE_NORMAL;
if (TSDB_CODE_SUCCESS != nodesListMakeStrictAppend(&pGroupingSet->pParameterList, nodesCloneNode(pExpr))) {
nodesDestroyNode((SNode*)pGroupingSet);
return NULL;
}
return (SNode*)pGroupingSet;
}
static int32_t rewriteUniqueStmt(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!pSelect->hasUniqueFunc) {
return TSDB_CODE_SUCCESS;
}
SRwriteUniqueCxt cxt = {.pTranslateCxt = pCxt, .pExpr = NULL};
nodesRewriteExprs(pSelect->pProjectionList, rewriteUniqueFunc, &cxt);
if (TSDB_CODE_SUCCESS == cxt.pTranslateCxt->errCode) {
cxt.pTranslateCxt->errCode = nodesListMakeStrictAppend(&pSelect->pGroupByList, createGroupingSet(cxt.pExpr));
}
pSelect->hasIndefiniteRowsFunc = false;
return cxt.pTranslateCxt->errCode;
}
typedef struct SRwriteTailCxt {
STranslateContext* pTranslateCxt;
int64_t limit;
int64_t offset;
} SRwriteTailCxt;
static EDealRes rewriteTailFunc(SNode** pNode, void* pContext) {
SRwriteTailCxt* pCxt = pContext;
if (QUERY_NODE_FUNCTION == nodeType(*pNode)) {
SFunctionNode* pFunc = (SFunctionNode*)*pNode;
if (FUNCTION_TYPE_TAIL == pFunc->funcType) {
pCxt->limit = ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i;
if (3 == LIST_LENGTH(pFunc->pParameterList)) {
pCxt->offset = ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 2))->datum.i;
}
SNode* pExpr = nodesListGetNode(pFunc->pParameterList, 0);
strcpy(((SExprNode*)pExpr)->aliasName, ((SExprNode*)*pNode)->aliasName);
NODES_CLEAR_LIST(pFunc->pParameterList);
nodesDestroyNode(*pNode);
*pNode = pExpr;
return DEAL_RES_IGNORE_CHILD;
}
}
return DEAL_RES_CONTINUE;
}
static int32_t createLimieNode(SRwriteTailCxt* pCxt, SLimitNode** pOutput) {
*pOutput = (SLimitNode*)nodesMakeNode(QUERY_NODE_LIMIT);
if (NULL == *pOutput) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pOutput)->limit = pCxt->limit;
(*pOutput)->offset = pCxt->offset;
return TSDB_CODE_SUCCESS;
}
static SNode* createOrderByExpr(STranslateContext* pCxt) {
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (NULL == pOrder) {
return NULL;
}
pCxt->errCode = createPrimaryKeyCol(pCxt, &pOrder->pExpr);
if (TSDB_CODE_SUCCESS != pCxt->errCode) {
nodesDestroyNode((SNode*)pOrder);
return NULL;
}
pOrder->order = ORDER_DESC;
pOrder->nullOrder = NULL_ORDER_FIRST;
return (SNode*)pOrder;
}
static int32_t rewriteTailStmt(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (!pSelect->hasTailFunc) {
return TSDB_CODE_SUCCESS;
}
SRwriteTailCxt cxt = {.pTranslateCxt = pCxt, .limit = -1, .offset = -1};
nodesRewriteExprs(pSelect->pProjectionList, rewriteTailFunc, &cxt);
int32_t code = nodesListMakeStrictAppend(&pSelect->pOrderByList, createOrderByExpr(pCxt));
if (TSDB_CODE_SUCCESS == code) {
code = createLimieNode(&cxt, &pSelect->pLimit);
}
pSelect->hasIndefiniteRowsFunc = false;
return code;
}
typedef struct SReplaceOrderByAliasCxt {
STranslateContext* pTranslateCxt;
SNodeList* pProjectionList;
} SReplaceOrderByAliasCxt;
static EDealRes replaceOrderByAliasImpl(SNode** pNode, void* pContext) {
SReplaceOrderByAliasCxt* pCxt = pContext;
if (QUERY_NODE_COLUMN_REF == nodeType(*pNode)) {
SNodeList* pProjectionList = pCxt->pProjectionList;
SNode* pProject = NULL;
FOREACH(pProject, pProjectionList) {
SExprNode* pExpr = (SExprNode*)pProject;
if (0 == strcmp(((SColumnRefNode*)*pNode)->colName, pExpr->aliasName)) {
SNode* pNew = nodesCloneNode(pProject);
if (NULL == pNew) {
pCxt->pTranslateCxt->errCode = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
nodesDestroyNode(*pNode);
*pNode = pNew;
return DEAL_RES_CONTINUE;
}
}
}
return DEAL_RES_CONTINUE;
}
static int32_t replaceOrderByAlias(STranslateContext* pCxt, SNodeList* pProjectionList, SNodeList* pOrderByList) {
SReplaceOrderByAliasCxt cxt = {.pTranslateCxt = pCxt, .pProjectionList = pProjectionList};
nodesRewriteExprsPostOrder(pOrderByList, replaceOrderByAliasImpl, &cxt);
return pCxt->errCode;
}
static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
pCxt->pCurrSelectStmt = pSelect;
int32_t code = translateFrom(pCxt, pSelect->pFromTable);
@ -2147,9 +2375,18 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (TSDB_CODE_SUCCESS == code) {
code = checkLimit(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteUniqueStmt(pCxt, pSelect);
}
// if (TSDB_CODE_SUCCESS == code) {
// code = rewriteTailStmt(pCxt, pSelect);
// }
if (TSDB_CODE_SUCCESS == code) {
code = rewriteTimelineFunc(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) {
code = replaceOrderByAlias(pCxt, pSelect->pProjectionList, pSelect->pOrderByList);
}
return code;
}
@ -2186,7 +2423,7 @@ static int32_t createCastFunc(STranslateContext* pCxt, SNode* pExpr, SDataType d
return TSDB_CODE_SUCCESS;
}
static int32_t translateSetOperatorImpl(STranslateContext* pCxt, SSetOperator* pSetOperator) {
static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pSetOperator) {
SNodeList* pLeftProjections = getProjectList(pSetOperator->pLeft);
SNodeList* pRightProjections = getProjectList(pSetOperator->pRight);
if (LIST_LENGTH(pLeftProjections) != LIST_LENGTH(pRightProjections)) {
@ -2221,6 +2458,23 @@ static uint8_t calcSetOperatorPrecision(SSetOperator* pSetOperator) {
return calcPrecision(getStmtPrecision(pSetOperator->pLeft), getStmtPrecision(pSetOperator->pRight));
}
static int32_t translateSetOperOrderBy(STranslateContext* pCxt, SSetOperator* pSetOperator) {
bool other;
int32_t code = translateOrderByPosition(pCxt, pSetOperator->pProjectionList, pSetOperator->pOrderByList, &other);
if (TSDB_CODE_SUCCESS == code) {
if (other) {
pCxt->currClause = SQL_CLAUSE_ORDER_BY;
pCxt->pCurrSelectStmt = NULL;
pCxt->pCurrSetOperator = pSetOperator;
code = translateExprList(pCxt, pSetOperator->pOrderByList);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = replaceOrderByAlias(pCxt, pSetOperator->pProjectionList, pSetOperator->pOrderByList);
}
return code;
}
static int32_t translateSetOperator(STranslateContext* pCxt, SSetOperator* pSetOperator) {
int32_t code = translateQuery(pCxt, pSetOperator->pLeft);
if (TSDB_CODE_SUCCESS == code) {
@ -2231,7 +2485,10 @@ static int32_t translateSetOperator(STranslateContext* pCxt, SSetOperator* pSetO
}
if (TSDB_CODE_SUCCESS == code) {
pSetOperator->precision = calcSetOperatorPrecision(pSetOperator);
code = translateSetOperatorImpl(pCxt, pSetOperator);
code = translateSetOperProject(pCxt, pSetOperator);
}
if (TSDB_CODE_SUCCESS == code) {
code = translateSetOperOrderBy(pCxt, pSetOperator);
}
return code;
}

View File

@ -185,9 +185,15 @@ static char* getSyntaxErrFormat(int32_t errCode) {
case TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG:
return "The REDISTRIBUTE VGROUP statement only support 1 to 3 dnodes";
case TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC:
return "%s function not allowed in fill query";
return "%s function does not supportted in fill query";
case TSDB_CODE_PAR_INVALID_WINDOW_PC:
return "_WSTARTTS, _WENDTS and _WDURATION can only be used in window queries";
return "_WSTARTTS, _WENDTS and _WDURATION can only be used in window query";
case TSDB_CODE_PAR_WINDOW_NOT_ALLOWED_FUNC:
return "%s function does not supportted in time window query";
case TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC:
return "%s function does not supportted in stream query";
case TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC:
return "%s function does not supportted in group query";
case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory";
default:

View File

@ -528,6 +528,12 @@ TEST_F(ParserInitialCTest, createStream) {
clearCreateStreamReq();
}
TEST_F(ParserInitialCTest, createStreamSemanticCheck) {
useDb("root", "test");
run("CREATE STREAM s1 AS SELECT PERCENTILE(c1, 30) FROM t1", TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC);
}
TEST_F(ParserInitialCTest, createTable) {
useDb("root", "test");

View File

@ -161,6 +161,40 @@ TEST_F(ParserSelectTest, useDefinedFunc) {
run("SELECT udf2(c1) FROM t1 GROUP BY c2");
}
TEST_F(ParserSelectTest, uniqueFunc) {
useDb("root", "test");
run("SELECT UNIQUE(c1) FROM t1");
run("SELECT UNIQUE(c2 + 10) FROM t1 WHERE c1 > 10");
run("SELECT UNIQUE(c2 + 10), ts, c2 FROM t1 WHERE c1 > 10");
}
TEST_F(ParserSelectTest, uniqueFuncSemanticCheck) {
useDb("root", "test");
run("SELECT UNIQUE(c1) FROM t1 INTERVAL(10S)", TSDB_CODE_PAR_WINDOW_NOT_ALLOWED_FUNC);
run("SELECT UNIQUE(c1) FROM t1 GROUP BY c2", TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC);
}
TEST_F(ParserSelectTest, tailFunc) {
useDb("root", "test");
run("SELECT TAIL(c1, 10) FROM t1");
run("SELECT TAIL(c2 + 10, 10, 80) FROM t1 WHERE c1 > 10");
}
TEST_F(ParserSelectTest, tailFuncSemanticCheck) {
useDb("root", "test");
run("SELECT TAIL(c1, 10) FROM t1 INTERVAL(10S)", TSDB_CODE_PAR_WINDOW_NOT_ALLOWED_FUNC);
run("SELECT TAIL(c1, 10) FROM t1 GROUP BY c2", TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC);
}
TEST_F(ParserSelectTest, groupBy) {
useDb("root", "test");
@ -328,6 +362,8 @@ TEST_F(ParserSelectTest, setOperator) {
run("(SELECT * FROM t1) UNION ALL (SELECT * FROM t1)");
run("SELECT c1 FROM (SELECT c1 FROM t1 UNION ALL SELECT c1 FROM t1)");
run("SELECT c1, c2 FROM t1 UNION ALL SELECT c1 as a, c2 as b FROM t1 ORDER BY c1");
}
TEST_F(ParserSelectTest, informationSchema) {

View File

@ -21,7 +21,7 @@
static void dumpQueryPlan(SQueryPlan* pPlan) {
char* pStr = NULL;
nodesNodeToString((SNode*)pPlan, false, &pStr, NULL);
planDebugL("Query Plan: %s", pStr);
planDebugL("QID:0x%" PRIx64 " Query Plan: %s", pPlan->queryId, pStr);
taosMemoryFree(pStr);
}

View File

@ -53,3 +53,23 @@ TEST_F(PlanBasicTest, func) {
run("SELECT TOP(c1, 60) FROM t1");
}
TEST_F(PlanBasicTest, uniqueFunc) {
useDb("root", "test");
run("SELECT UNIQUE(c1) FROM t1");
run("SELECT UNIQUE(c2 + 10) FROM t1 WHERE c1 > 10");
run("SELECT UNIQUE(c2 + 10), ts, c2 FROM t1 WHERE c1 > 10");
run("SELECT UNIQUE(c1) a FROM t1 ORDER BY a");
}
TEST_F(PlanBasicTest, tailFunc) {
useDb("root", "test");
run("SELECT TAIL(c1, 10) FROM t1");
run("SELECT TAIL(c2 + 10, 10, 80) FROM t1 WHERE c1 > 10");
}

View File

@ -27,6 +27,8 @@ TEST_F(PlanOrderByTest, basic) {
run("SELECT c1 FROM t1 ORDER BY c1");
// ORDER BY key is not in the projection list
run("SELECT c1 FROM t1 ORDER BY c2");
run("SELECT c1 + 10 AS a FROM t1 ORDER BY a");
}
TEST_F(PlanOrderByTest, expr) {
@ -41,6 +43,12 @@ TEST_F(PlanOrderByTest, nullsOrder) {
run("SELECT * FROM t1 ORDER BY c1 DESC NULLS FIRST");
}
TEST_F(PlanOrderByTest, withGroupBy) {
useDb("root", "test");
run("SELECT SUM(c1) AS a FROM t1 GROUP BY c2 ORDER BY a");
}
TEST_F(PlanOrderByTest, stable) {
useDb("root", "test");

View File

@ -23,9 +23,9 @@ class PlanSetOpTest : public PlannerTestBase {};
TEST_F(PlanSetOpTest, unionAll) {
useDb("root", "test");
// sql 1: single UNION ALL operator
// single UNION ALL operator
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 20");
// sql 2: multi UNION ALL operator
// multi UNION ALL operator
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 "
"UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 20 "
"UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 30");
@ -46,6 +46,14 @@ TEST_F(PlanSetOpTest, unionAllWithSubquery) {
run("SELECT ts FROM (SELECT ts FROM st1) UNION ALL SELECT ts FROM (SELECT ts FROM st1)");
}
TEST_F(PlanSetOpTest, unionAllWithOrderBy) {
useDb("root", "test");
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 20 ORDER BY c1");
run("SELECT c1, c2 FROM t1 WHERE c1 > 10 UNION ALL SELECT c1, c2 FROM t1 WHERE c1 > 20 ORDER BY 1");
}
TEST_F(PlanSetOpTest, union) {
useDb("root", "test");

View File

@ -29,6 +29,8 @@ extern "C" {
#define CONFIG_FILE_LEN 1024
#define MAX_CONFIG_INDEX_COUNT 512
typedef struct SRaftCfg {
SSyncCfg cfg;
TdFilePtr pFile;
@ -36,6 +38,10 @@ typedef struct SRaftCfg {
int8_t isStandBy;
int8_t snapshotEnable;
SyncIndex lastConfigIndex;
SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT];
int32_t configIndexCount;
} SRaftCfg;
SRaftCfg *raftCfgOpen(const char *path);

View File

@ -713,8 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0);
sDebug("vgId:%d sync event currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, ths->pRaftStore->currentTerm,
delBegin, delEnd);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, delBegin, delEnd);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code;
@ -880,6 +880,72 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
}
} while (0);
// fake match2
//
// condition1:
// preIndex <= my commit index
//
// operation:
// if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry
// match my-commit-index or my-commit-index + 1
// no operation on log
do {
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
(pMsg->prevLogIndex <= ths->commitIndex);
if (condition) {
sTrace("recv SyncAppendEntries, fake match2, msg-prevLogIndex:%ld, my-commitIndex:%ld", pMsg->prevLogIndex,
ths->commitIndex);
SyncIndex matchIndex = ths->commitIndex;
bool hasAppendEntries = pMsg->dataLen > 0;
if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) {
// append entry
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
ASSERT(pAppendEntry != NULL);
{
// has extra entries (> preIndex) in local log
SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex;
if (hasExtraEntries) {
// make log same, rollback deleted entries
code = syncNodeMakeLogSame(ths, pMsg);
ASSERT(code == 0);
}
}
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
ASSERT(code == 0);
// pre commit
code = syncNodePreCommit(ths, pAppendEntry);
ASSERT(code == 0);
matchIndex = pMsg->prevLogIndex + 1;
syncEntryDestory(pAppendEntry);
}
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = true;
pReply->matchIndex = matchIndex;
// send response
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
return ret;
}
} while (0);
// calculate logOK here, before will coredump, due to fake match
bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg);
@ -995,8 +1061,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex;
sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", ths->vgId,
ths->pRaftStore->currentTerm, commitBegin, commitEnd, syncUtilState2String(ths->state));
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
commitBegin, commitEnd);
}
SyncIndex beginIndex = ths->commitIndex + 1;

View File

@ -190,19 +190,23 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (gRaftDetailLog) {
char* s = snapshotSender2Str(pSender);
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu "
"sender:%s",
ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, s);
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, s);
taosMemoryFree(s);
} else {
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu",
ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm);
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm);
}
}

View File

@ -56,9 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex commitEnd = snapshot.lastApplyIndex;
pSyncNode->commitIndex = snapshot.lastApplyIndex;
sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex,
syncUtilState2String(pSyncNode->state));
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex);
}
// update commit index

View File

@ -420,6 +420,29 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
return 0;
}
int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return -1;
}
assert(rid == pSyncNode->rid);
ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];
for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex &&
(pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotIndex) {
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
}
}
sMeta->lastConfigIndex = lastIndex;
sTrace("sync get snapshot meta by index:%ld lastConfigIndex:%ld", snapshotIndex, sMeta->lastConfigIndex);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
}
const char* syncGetMyRoleStr(int64_t rid) {
const char* s = syncUtilState2String(syncGetMyRole(rid));
return s;
@ -575,8 +598,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return -1;
}
assert(rid == pSyncNode->rid);
sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
TMSG_INFO(pMsg->msgType), pMsg->msgType);
ret = syncNodePropose(pSyncNode, pMsg, isWeak);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
@ -585,8 +609,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0;
sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
TMSG_INFO(pMsg->msgType), pMsg->msgType);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SRespStub stub;
@ -832,7 +857,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// snapshot meta
// pSyncNode->sMeta.lastConfigIndex = -1;
sDebug("vgId:%d sync event currentTerm:%lu sync open", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
return pSyncNode;
}
@ -879,7 +905,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
void syncNodeClose(SSyncNode* pSyncNode) {
sDebug("vgId:%d sync event currentTerm:%lu sync close", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
int32_t ret;
assert(pSyncNode != NULL);
@ -1318,7 +1345,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i];
sDebug("vgId:%d sync event currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId,
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm);
if (gRaftDetailLog) {
;
@ -1371,9 +1399,12 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
char host[128];
uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
sDebug("vgId:%d sync event currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port,
oldSenders[j], oldSenders[j]->privateTerm);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, "
"privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j],
oldSenders[j]->privateTerm);
(pSyncNode->senders)[i] = oldSenders[j];
oldSenders[j] = NULL;
reset = true;
@ -1381,9 +1412,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
// reset replicaIndex
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
(pSyncNode->senders)[i]->replicaIndex = i;
sDebug("vgId:%d sync event currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port,
(pSyncNode->senders)[i], reset);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, "
"reset:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
}
}
}
@ -1392,9 +1425,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
sDebug("vgId:%d sync event currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i,
(pSyncNode->senders)[i]->privateTerm);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm);
}
}
@ -1402,7 +1436,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (oldSenders[i] != NULL) {
snapshotSenderDestroy(oldSenders[i]);
sDebug("vgId:%d sync event currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId,
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, oldSenders[i], i);
oldSenders[i] = NULL;
}
@ -1474,9 +1509,10 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sDebug(
"vgId:%d sync event currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
// maybe clear leader cache
@ -1514,8 +1550,11 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// reset restoreFinish
pSyncNode->restoreFinish = false;
sDebug("vgId:%d sync event currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
// state change
@ -2090,13 +2129,15 @@ const char* syncStr(ESyncState state) {
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
sDebug("vgId:%d sync event currentTerm:%lu begin leader transfer", ths->vgId, ths->pRaftStore->currentTerm);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm);
if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) {
sDebug("vgId:%d sync event currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId,
ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn,
pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
pSyncLeaderTransfer->newLeaderId.addr);
// reset elect timer now!
int32_t electMS = 1;
@ -2174,8 +2215,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
// change isStandBy to normal
if (!isDrop) {
char tmpbuf[128];
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum);
char tmpbuf[512];
char* oldStr = syncCfg2Str(&oldSyncCfg);
char* newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(ths, tmpbuf);
} else {
@ -2183,8 +2232,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
}
}
} else {
char tmpbuf[128];
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum);
char tmpbuf[512];
char* oldStr = syncCfg2Str(&oldSyncCfg);
char* newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
syncNodeBecomeFollower(ths, tmpbuf);
}
@ -2218,8 +2275,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0;
ESyncState state = flag;
sDebug("vgId:%d sync event currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId,
ths->pRaftStore->currentTerm, beginIndex, endIndex, syncUtilState2String(state));
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64
", %s",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, beginIndex,
endIndex, syncUtilState2String(state));
// execute fsm
if (ths->pFsm != NULL) {
@ -2261,14 +2320,16 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
}
// restore finish
// if only snapshot, a noop entry will be append, so syncLogLastIndex is always ok
if (pEntry->index == ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
if (ths->restoreFinish == false) {
if (ths->pFsm->FpRestoreFinishCb != NULL) {
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
}
ths->restoreFinish = true;
sDebug("vgId:%d sync event currentTerm:%lu restore finish, %s, index:%ld", ths->vgId,
ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), pEntry->index);
sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
syncUtilState2String(ths->state), pEntry->index);
}
}

View File

@ -85,16 +85,11 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
}
return pRoot;
/*
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncCfg", pRoot);
return pJson;
*/
}
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
@ -154,6 +149,16 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex);
cJSON_AddStringToObject(pRoot, "lastConfigIndex", buf64);
cJSON_AddNumberToObject(pRoot, "configIndexCount", pRaftCfg->configIndexCount);
cJSON *pIndexArr = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "configIndexArr", pIndexArr);
for (int i = 0; i < pRaftCfg->configIndexCount; ++i) {
snprintf(buf64, sizeof(buf64), "%ld", (pRaftCfg->configIndexArr)[i]);
cJSON *pIndexObj = cJSON_CreateObject();
cJSON_AddStringToObject(pIndexObj, "index", buf64);
cJSON_AddItemToArray(pIndexArr, pIndexObj);
}
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "RaftCfg", pRoot);
return pJson;
@ -161,7 +166,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
@ -177,6 +182,9 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
raftCfg.isStandBy = meta.isStandBy;
raftCfg.snapshotEnable = meta.snapshotEnable;
raftCfg.lastConfigIndex = meta.lastConfigIndex;
raftCfg.configIndexCount = 1;
memset(raftCfg.configIndexArr, 0, sizeof(raftCfg.configIndexArr));
raftCfg.configIndexArr[0] = -1;
char *s = raftCfg2Str(&raftCfg);
char buf[CONFIG_FILE_LEN] = {0};
@ -207,7 +215,24 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
cJSON *pJsonLastConfigIndex = cJSON_GetObjectItem(pJson, "lastConfigIndex");
pRaftCfg->lastConfigIndex = atoll(cJSON_GetStringValue(pJsonLastConfigIndex));
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
cJSON *pJsonConfigIndexCount = cJSON_GetObjectItem(pJson, "configIndexCount");
pRaftCfg->configIndexCount = cJSON_GetNumberValue(pJsonConfigIndexCount);
cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr");
int arraySize = cJSON_GetArraySize(pIndexArr);
assert(arraySize == pRaftCfg->configIndexCount);
memset(pRaftCfg->configIndexArr, 0, sizeof(pRaftCfg->configIndexArr));
for (int i = 0; i < arraySize; ++i) {
cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i);
assert(pIndexObj != NULL);
cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index");
assert(cJSON_IsString(pIndex));
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
}
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0);

View File

@ -163,9 +163,11 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync(pWal, true);
sDebug("vgId:%d sync event currentTerm:%lu write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d",
pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index,
syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy,
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d",
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
return code;
@ -323,10 +325,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
walFsync(pWal, true);
sDebug(
"vgId:%d sync event currentTerm:%lu old write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d",
pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index,
syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType),
pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d",
pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
return code;
}

View File

@ -46,7 +46,9 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId,
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
@ -71,9 +73,12 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType,
index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosThreadMutexUnlock(&(pObj->mutex));
return 1; // get one object
@ -90,9 +95,12 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType,
index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
taosThreadMutexUnlock(&(pObj->mutex));

View File

@ -141,20 +141,24 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm);
}
@ -283,29 +287,35 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm);
}
} else {
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm);
}
@ -339,14 +349,19 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send msg:%s",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->privateTerm, msgStr);
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"privateTerm:%lu send "
"msg:%s",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm,
msgStr);
taosMemoryFree(msgStr);
} else {
sDebug("vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq,
pSender->ack, pSender->privateTerm);
sDebug(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"privateTerm:%lu",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm);
}
syncSnapshotSendDestroy(pMsg);
@ -406,7 +421,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
@ -527,7 +542,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId;
cJSON * pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
@ -551,7 +566,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson);
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
@ -579,17 +594,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld privateTerm:%lu",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm);
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
@ -602,7 +623,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// maybe update lastconfig
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
int32_t oldReplicaNum = pSyncNode->replicaNum;
// int32_t oldReplicaNum = pSyncNode->replicaNum;
SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
// update new config myIndex
SSyncCfg newSyncCfg = pMsg->lastConfig;
@ -626,24 +648,34 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
bool isDrop;
if (IamInNew) {
sDebug(
"vgId:%d sync event currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld ",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex);
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
} else {
sDebug(
"vgId:%d sync event currentTerm:%lu do not update config by snapshot, I am not in newCfg, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in "
"newCfg, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld ",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex);
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
}
// change isStandBy to normal
if (!isDrop) {
char tmpbuf[128];
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d", oldReplicaNum, newSyncCfg.replicaNum);
char tmpbuf[512];
char *oldStr = syncCfg2Str(&oldSyncCfg);
char *newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, index:%ld, %s --> %s",
oldSyncCfg.replicaNum, newSyncCfg.replicaNum, pMsg->lastConfigIndex, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(pSyncNode, tmpbuf);
} else {
@ -662,20 +694,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1,
snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm,
logSimpleStr);
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm, logSimpleStr);
taosMemoryFree(logSimpleStr);
} else {
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1,
snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm);
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm);
}
pReceiver->pWriter = NULL;
@ -686,17 +721,21 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu",
pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
@ -711,20 +750,24 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv "
"msg:%s",
pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu",
pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
}
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
@ -744,19 +787,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr);
} else {
sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm);
}
} else {

View File

@ -168,6 +168,7 @@ char* syncUtilRaftId2Str(const SRaftId* p) {
}
const char* syncUtilState2String(ESyncState state) {
/*
if (state == TAOS_SYNC_STATE_FOLLOWER) {
return "TAOS_SYNC_STATE_FOLLOWER";
} else if (state == TAOS_SYNC_STATE_CANDIDATE) {
@ -177,6 +178,17 @@ const char* syncUtilState2String(ESyncState state) {
} else {
return "TAOS_SYNC_STATE_UNKNOWN";
}
*/
if (state == TAOS_SYNC_STATE_FOLLOWER) {
return "follower";
} else if (state == TAOS_SYNC_STATE_CANDIDATE) {
return "candidate";
} else if (state == TAOS_SYNC_STATE_LEADER) {
return "leader";
} else {
return "state_error";
}
}
bool syncUtilCanPrint(char c) {

View File

@ -27,6 +27,14 @@ SRaftCfg* createRaftCfg() {
}
pCfg->isStandBy = taosGetTimestampSec() % 100;
pCfg->configIndexCount = 5;
for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) {
(pCfg->configIndexArr)[i] = -1;
}
for (int i = 0; i < pCfg->configIndexCount; ++i) {
(pCfg->configIndexArr)[i] = i * 100;
}
return pCfg;
}
@ -100,6 +108,15 @@ void test5() {
pCfg->isStandBy += 2;
pCfg->snapshotEnable += 3;
pCfg->lastConfigIndex += 1000;
pCfg->configIndexCount = 5;
for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) {
(pCfg->configIndexArr)[i] = -1;
}
for (int i = 0; i < pCfg->configIndexCount; ++i) {
(pCfg->configIndexArr)[i] = i * 100;
}
raftCfgPersist(pCfg);
printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex);

View File

@ -282,6 +282,8 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) {
}
int32_t tfsRmdir(STfs *pTfs, const char *rname) {
ASSERT(rname[0] != 0);
char aname[TMPNAME_LEN] = "\0";
for (int32_t level = 0; level < pTfs->nlevel; level++) {
@ -289,6 +291,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) {
for (int32_t id = 0; id < pTier->ndisk; id++) {
STfsDisk *pDisk = pTier->disks[id];
snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
uInfo("====> tfs remove dir : path:%s aname:%s rname:[%s]", pDisk->path, aname, rname);
taosRemoveDir(aname);
}
}

View File

@ -310,7 +310,7 @@ static void uvHandleReq(SSvrConn* pConn) {
}
// set up conn info
SRpcConnInfo* pConnInfo = &(transMsg.info.connInfo);
SRpcConnInfo* pConnInfo = &(transMsg.info.conn);
pConnInfo->clientIp = (uint32_t)(pConn->addr.sin_addr.s_addr);
pConnInfo->clientPort = ntohs(pConn->addr.sin_port);
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));

View File

@ -946,8 +946,7 @@ int32_t taosGetFqdn(char *fqdn) {
#endif // __APPLE__
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) {
// printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
assert(0);
fprintf(stderr,"failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
return -1;
}

View File

@ -276,9 +276,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC, "Invalid topic")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid query")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED, "Consumer unchanged")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST, "Subcribe not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_OFFSET_NOT_EXIST, "Offset not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer")
// mnode-stream
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option")

View File

@ -3,6 +3,10 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode3 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode4 -c transPullupInterval -v 1
system sh/exec.sh -n dnode1 -s start
sql connect
@ -260,7 +264,7 @@ $x = 0
step92:
$x = $x + 1
sleep 1000
if $x == 10 then
if $x == 20 then
return -1
endi
sql show mnodes

View File

@ -189,6 +189,7 @@ class TDTestCase:
def check_tail_table(self , tbname , col_name , tail_rows , offset):
tail_sql = f"select tail({col_name} , {tail_rows} , {offset}) from {tbname}"
equal_sql = f"select {col_name} from (select ts , {col_name} from {tbname} order by ts desc limit {tail_rows} offset {offset}) order by ts"
#equal_sql = f"select {col_name} from {tbname} order by ts desc limit {tail_rows} offset {offset}"
tdSql.query(tail_sql)
tail_result = tdSql.queryResult
@ -293,22 +294,22 @@ class TDTestCase:
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.query("select tail(c1,3,2) from ct4 where c1 >2 ")
tdSql.checkData(0, 0, 7)
tdSql.query("select tail(c1,3,2) from ct4 where c1 >2 order by 1")
tdSql.checkData(0, 0, 5)
tdSql.checkData(1, 0, 6)
tdSql.checkData(2, 0, 5)
tdSql.checkData(2, 0, 7)
tdSql.query("select tail(c1,2,1) from ct4 where c2 between 0 and 99999")
tdSql.checkData(0, 0, 2)
tdSql.checkData(1, 0, 1)
tdSql.query("select tail(c1,2,1) from ct4 where c2 between 0 and 99999 order by 1")
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 2)
# tail with union all
tdSql.query("select tail(c1,2,1) from ct4 union all select c1 from ct1")
tdSql.checkRows(15)
tdSql.query("select tail(c1,2,1) from ct4 union all select c1 from ct2")
tdSql.query("select tail(c1,2,1) from ct4 union all select c1 from ct2 order by 1")
tdSql.checkRows(2)
tdSql.checkData(0, 0, 1)
tdSql.checkData(1, 0, 0)
tdSql.checkData(0, 0, 0)
tdSql.checkData(1, 0, 1)
tdSql.query("select tail(c2,2,1) from ct4 union all select abs(c2)/2 from ct4")
tdSql.checkRows(14)
@ -334,18 +335,18 @@ class TDTestCase:
tdSql.execute(f" insert into ttb1 values({ts_value} , {i})")
tdSql.execute(f" insert into ttb2 values({ts_value} , {i})")
tdSql.query("select tail(tb2.num,3,2) from tb1, tb2 where tb1.ts=tb2.ts ")
tdSql.query("select tail(tb2.num,3,2) from tb1, tb2 where tb1.ts=tb2.ts order by 1 desc")
tdSql.checkRows(3)
tdSql.checkData(0,0,5)
tdSql.checkData(0,0,7)
tdSql.checkData(1,0,6)
tdSql.checkData(2,0,7)
tdSql.checkData(2,0,5)
# nest query
# tdSql.query("select tail(c1,2) from (select c1 from ct1)")
tdSql.query("select c1 from (select tail(c1,2) c1 from ct4)")
tdSql.query("select c1 from (select tail(c1,2) c1 from ct4) order by 1 nulls first")
tdSql.checkRows(2)
tdSql.checkData(0, 0, 0)
tdSql.checkData(1, 0, None)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, 0)
tdSql.query("select sum(c1) from (select tail(c1,2) c1 from ct1)")
tdSql.checkRows(1)

View File

@ -93,8 +93,8 @@ class TDTestCase:
"select unique(c1) , min(c1) from t1",
"select unique(c1) , spread(c1) from t1",
"select unique(c1) , diff(c1) from t1",
"select unique(c1) , abs(c1) from t1",
"select unique(c1) , c1 from t1",
#"select unique(c1) , abs(c1) from t1", # support
#"select unique(c1) , c1 from t1",
"select unique from stb1 partition by tbname",
"select unique(123--123)==1 from stb1 partition by tbname",
"select unique(123) from stb1 partition by tbname",
@ -104,21 +104,21 @@ class TDTestCase:
"select unique(c1 ,c2 ) from stb1 partition by tbname",
"select unique(c1 ,NULL) from stb1 partition by tbname",
"select unique(,) from stb1 partition by tbname;",
"select unique(floor(c1) ab from stb1 partition by tbname)",
"select unique(c1) as int from stb1 partition by tbname",
#"select unique(floor(c1) ab from stb1 partition by tbname)", # support
#"select unique(c1) as int from stb1 partition by tbname",
"select unique('c1') from stb1 partition by tbname",
"select unique(NULL) from stb1 partition by tbname",
"select unique('') from stb1 partition by tbname",
"select unique(c%) from stb1 partition by tbname",
#"select unique(t1) from stb1 partition by tbname",
#"select unique(t1) from stb1 partition by tbname", # support
"select unique(True) from stb1 partition by tbname",
"select unique(c1) , count(c1) from stb1 partition by tbname",
"select unique(c1) , avg(c1) from stb1 partition by tbname",
"select unique(c1) , min(c1) from stb1 partition by tbname",
"select unique(c1) , spread(c1) from stb1 partition by tbname",
"select unique(c1) , diff(c1) from stb1 partition by tbname",
"select unique(c1) , abs(c1) from stb1 partition by tbname",
"select unique(c1) , c1 from stb1 partition by tbname"
#"select unique(c1) , abs(c1) from stb1 partition by tbname", # support
#"select unique(c1) , c1 from stb1 partition by tbname" # support
]
for error_sql in error_sql_lists:
@ -198,7 +198,7 @@ class TDTestCase:
unique_datas = []
for elem in unique_result:
unique_datas.append(elem[0])
unique_datas.sort(key=lambda x: (x is None, x))
tdSql.query(origin_sql)
origin_result = tdSql.queryResult
@ -212,6 +212,7 @@ class TDTestCase:
continue
else:
pre_unique.append(elem)
pre_unique.sort(key=lambda x: (x is None, x))
if pre_unique == unique_datas:
tdLog.info(" unique query check pass , unique sql is: %s" %unique_sql)
@ -266,16 +267,16 @@ class TDTestCase:
tdSql.checkRows(10)
tdSql.error("select unique(c1),tbname from ct1")
tdSql.error("select unique(c1),t1 from ct1")
#tdSql.error("select unique(c1),t1 from ct1") #support
# unique with common col
tdSql.error("select unique(c1) ,ts from ct1")
tdSql.error("select unique(c1) ,c1 from ct1")
#tdSql.error("select unique(c1) ,ts from ct1")
#tdSql.error("select unique(c1) ,c1 from ct1")
# unique with scalar function
tdSql.error("select unique(c1) ,abs(c1) from ct1")
#tdSql.error("select unique(c1) ,abs(c1) from ct1")
tdSql.error("select unique(c1) , unique(c2) from ct1")
tdSql.error("select unique(c1) , abs(c2)+2 from ct1")
#tdSql.error("select unique(c1) , abs(c2)+2 from ct1")
# unique with aggregate function
@ -288,13 +289,13 @@ class TDTestCase:
tdSql.query("select unique(c1) from ct4 where c1 is null")
tdSql.checkData(0, 0, None)
tdSql.query("select unique(c1) from ct4 where c1 >2 ")
tdSql.checkData(0, 0, 8)
tdSql.checkData(1, 0, 7)
tdSql.checkData(2, 0, 6)
tdSql.checkData(5, 0, 3)
tdSql.query("select unique(c1) from ct4 where c1 >2 order by 1")
tdSql.checkData(0, 0, 3)
tdSql.checkData(1, 0, 4)
tdSql.checkData(2, 0, 5)
tdSql.checkData(5, 0, 8)
tdSql.query("select unique(c1) from ct4 where c2 between 0 and 99999")
tdSql.query("select unique(c1) from ct4 where c2 between 0 and 99999 order by 1 desc")
tdSql.checkData(0, 0, 8)
tdSql.checkData(1, 0, 7)
tdSql.checkData(2, 0, 6)
@ -335,23 +336,23 @@ class TDTestCase:
tdSql.execute(f" insert into ttb1 values({ts_value} , {i})")
tdSql.execute(f" insert into ttb2 values({ts_value} , {i})")
tdSql.query("select unique(tb2.num) from tb1, tb2 where tb1.ts=tb2.ts ")
tdSql.query("select unique(tb2.num) from tb1, tb2 where tb1.ts=tb2.ts order by 1")
tdSql.checkRows(10)
tdSql.checkData(0,0,0)
tdSql.checkData(1,0,1)
tdSql.checkData(2,0,2)
tdSql.checkData(9,0,9)
tdSql.query("select unique(tb2.num) from tb1, tb2 where tb1.ts=tb2.ts union all select unique(tb1.num) from tb1, tb2 where tb1.ts=tb2.ts ")
tdSql.query("select unique(tb2.num) from tb1, tb2 where tb1.ts=tb2.ts union all select unique(tb1.num) from tb1, tb2 where tb1.ts=tb2.ts order by 1")
tdSql.checkRows(20)
tdSql.checkData(0,0,0)
tdSql.checkData(1,0,1)
tdSql.checkData(2,0,2)
tdSql.checkData(9,0,9)
tdSql.checkData(2,0,1)
tdSql.checkData(4,0,2)
tdSql.checkData(18,0,9)
# nest query
# tdSql.query("select unique(c1) from (select c1 from ct1)")
tdSql.query("select c1 from (select unique(c1) c1 from ct4)")
tdSql.query("select c1 from (select unique(c1) c1 from ct4) order by 1 desc nulls first")
tdSql.checkRows(10)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, 8)
@ -366,7 +367,7 @@ class TDTestCase:
tdSql.checkData(0, 0, 45)
tdSql.checkData(1, 0, 45)
tdSql.query("select 1-abs(c1) from (select unique(c1) c1 from ct4)")
tdSql.query("select 1-abs(c1) from (select unique(c1) c1 from ct4) order by 1 nulls first")
tdSql.checkRows(10)
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, -7.000000000)
@ -421,7 +422,7 @@ class TDTestCase:
f"insert into sub1_bound values ( now()+1s, 2147483648, 9223372036854775808, 32768, 128, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql.query("select unique(c2) from sub1_bound")
tdSql.query("select unique(c2) from sub1_bound order by 1 desc")
tdSql.checkRows(5)
tdSql.checkData(0,0,9223372036854775807)