Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TD-27419

54liuyao 2023-12-04 09:12:53 +08:00
commit 38900d672e
47 changed files with 499 additions and 324 deletions

View File

@ -393,7 +393,7 @@ pipeline {
agent{label " Mac_catalina "}
steps {
catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') {
timeout(time: 20, unit: 'MINUTES'){
timeout(time: 30, unit: 'MINUTES'){
pre_test()
pre_test_build_mac()
}

View File

@ -31,8 +31,6 @@ extern "C" {
#endif
#define GRANT_HEART_BEAT_MIN 2
#define GRANT_ACTIVE_CODE "activeCode"
#define GRANT_C_ACTIVE_CODE "cActiveCode"
typedef enum {
TSDB_GRANT_ALL,
@ -52,11 +50,6 @@ typedef enum {
TSDB_GRANT_TABLE,
} EGrantType;
typedef struct {
int64_t grantedTime;
int64_t connGrantedTime;
} SGrantedInfo;
int32_t grantCheck(EGrantType grant);
int32_t grantAlterActiveCode(int32_t did, const char* old, const char* newer, char* out, int8_t type);

View File

@ -216,9 +216,6 @@ int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem);
SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue);
#endif
int32_t streamInit();
void streamCleanUp();
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
@ -402,7 +399,8 @@ typedef struct SHistoryTaskInfo {
int32_t tickCount;
int32_t retryTimes;
int32_t waitInterval;
int64_t haltVer; // offset in wal when halt the stream task
int64_t haltVer; // offset in wal when halt the stream task
bool operatorOpen; // false by default
} SHistoryTaskInfo;
typedef struct STaskOutputInfo {
@ -852,7 +850,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
void streamMetaStartHb(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool succ);
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready);
void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta);
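
The widened signature lets callers record a downstream-check result from just the meta handle and the task ids, which matters on failure paths where the SStreamTask object cannot be acquired at all (see the tq hunks below). A minimal call-site sketch (not from the diff), mirroring how the later hunks use it:

// Sketch, not from the diff: report a failed downstream check when only the
// ids from an RPC response are available and the task object may be gone.
static void reportCheckFailed(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  // startTs is unknown on this path, so pass 0, as the tq hunks below do
  streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false);
}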

View File

@ -558,7 +558,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_GRANT_GEN_IVLD_KEY TAOS_DEF_ERROR_CODE(0, 0x0812)
#define TSDB_CODE_GRANT_GEN_APP_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0813)
#define TSDB_CODE_GRANT_GEN_ENC_IVLD_KLEN TAOS_DEF_ERROR_CODE(0, 0x0814)
#define TSDB_CODE_GRANT_PAR_IVLD_DIST TAOS_DEF_ERROR_CODE(0, 0x0815)
// sync
// #define TSDB_CODE_SYN_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0900) // 2.x

View File

@ -305,7 +305,7 @@ typedef enum ELogicConditionType {
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
#define TSDB_SYNC_NEGOTIATION_WIN 512
#define TSDB_SYNC_SNAP_BUFFER_SIZE 2048
#define TSDB_SYNC_SNAP_BUFFER_SIZE 1024
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta

View File

@ -28,6 +28,9 @@
} \
} while (0)
extern int32_t streamTimerInit();
extern void streamTimerCleanUp();
static SDnode globalDnode = {0};
SDnode *dmInstance() { return &globalDnode; }
@ -166,6 +169,7 @@ int32_t dmInit() {
#if defined(USE_S3)
if (s3Begin() != 0) return -1;
#endif
if (streamTimerInit() != 0) return -1;
dInfo("dnode env is initialized");
return 0;
@ -194,6 +198,8 @@ void dmCleanup() {
#if defined(USE_S3)
s3End();
#endif
streamTimerCleanUp();
dInfo("dnode env is cleaned up");
taosCleanupCfg();

View File

@ -27,8 +27,6 @@ void mndCleanupCluster(SMnode *pMnode);
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len);
int64_t mndGetClusterId(SMnode *pMnode);
int64_t mndGetClusterCreateTime(SMnode *pMnode);
int32_t mndGetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo);
int32_t mndSetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo);
int64_t mndGetClusterUpTime(SMnode *pMnode);
#ifdef __cplusplus

View File

@ -192,8 +192,6 @@ typedef struct {
int64_t createdTime;
int64_t updateTime;
int32_t upTime;
int64_t grantedTime;
int64_t connGrantedTime;
} SClusterObj;
typedef struct {

View File

@ -19,7 +19,7 @@
#include "mndTrans.h"
#define CLUSTER_VER_NUMBE 1
#define CLUSTER_RESERVE_SIZE 44
#define CLUSTER_RESERVE_SIZE 60
int64_t tsExpireTime = 0;
static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster);
@ -112,19 +112,6 @@ int64_t mndGetClusterCreateTime(SMnode *pMnode) {
return createTime;
}
int32_t mndGetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo) {
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster != NULL) {
pInfo->grantedTime = pCluster->grantedTime;
pInfo->connGrantedTime = pCluster->connGrantedTime;
mndReleaseCluster(pMnode, pCluster, pIter);
return 0;
}
return -1;
}
static int32_t mndGetClusterUpTimeImp(SClusterObj *pCluster) {
#if 0
int32_t upTime = taosGetTimestampSec() - pCluster->updateTime / 1000;
@ -159,8 +146,6 @@ static SSdbRaw *mndClusterActionEncode(SClusterObj *pCluster) {
SDB_SET_INT64(pRaw, dataPos, pCluster->updateTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pCluster->upTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->grantedTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pCluster->connGrantedTime, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
terrno = 0;
@ -201,8 +186,6 @@ static SSdbRow *mndClusterActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pCluster->updateTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pCluster->name, TSDB_CLUSTER_ID_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pCluster->upTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pCluster->grantedTime, _OVER);
SDB_GET_INT64(pRaw, dataPos, &pCluster->connGrantedTime, _OVER);
SDB_GET_RESERVE(pRaw, dataPos, CLUSTER_RESERVE_SIZE, _OVER)
terrno = 0;
@ -235,8 +218,6 @@ static int32_t mndClusterActionUpdate(SSdb *pSdb, SClusterObj *pOld, SClusterObj
mTrace("cluster:%" PRId64 ", perform update action, old row:%p new row:%p, uptime from %d to %d", pOld->id, pOld,
pNew, pOld->upTime, pNew->upTime);
pOld->upTime = pNew->upTime;
pOld->grantedTime = pNew->grantedTime;
pOld->connGrantedTime = pNew->connGrantedTime;
pOld->updateTime = taosGetTimestampMs();
return 0;
}
@ -378,44 +359,3 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
mndTransDrop(pTrans);
return 0;
}
int32_t mndSetClusterGrantedInfo(SMnode *pMnode, SGrantedInfo *pInfo) {
SClusterObj clusterObj = {0};
void *pIter = NULL;
SClusterObj *pCluster = mndAcquireCluster(pMnode, &pIter);
if (pCluster != NULL) {
if (pCluster->grantedTime >= pInfo->grantedTime && pCluster->connGrantedTime >= pInfo->connGrantedTime) {
mndReleaseCluster(pMnode, pCluster, pIter);
return 0;
}
memcpy(&clusterObj, pCluster, sizeof(SClusterObj));
if (pCluster->grantedTime < pInfo->grantedTime) clusterObj.grantedTime = pInfo->grantedTime;
if (pCluster->connGrantedTime < pInfo->connGrantedTime) clusterObj.connGrantedTime = pInfo->connGrantedTime;
mndReleaseCluster(pMnode, pCluster, pIter);
}
if (clusterObj.id <= 0) {
mError("can't get cluster info while update granted info");
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, NULL, "granted-info");
if (pTrans == NULL) return -1;
SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
mndTransDrop(pTrans);
return 0;
}
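
Note that CLUSTER_RESERVE_SIZE grows from 44 to 60 in the same change that removes the two granted-time fields, which looks intended to keep the encoded sdb row length stable across versions:

/* Sketch, not from the diff: the row-size bookkeeping as I read it.
 * Two int64 fields leave the encoded row, and the reserved tail absorbs
 * exactly that many bytes:
 *   44 + 2 * sizeof(int64_t) = 44 + 16 = 60   // new CLUSTER_RESERVE_SIZE
 */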

View File

@ -790,9 +790,7 @@ static int32_t mndConfigDnode(SMnode *pMnode, SRpcMsg *pReq, SMCfgDnodeReq *pCfg
if (cfgAll) { // alter all dnodes:
if (!failRecord) failRecord = taosArrayInit(1, sizeof(int32_t));
if (failRecord) taosArrayPush(failRecord, &pDnode->id);
if (0 == cfgAllErr || cfgAllErr == TSDB_CODE_GRANT_PAR_IVLD_ACTIVE) {
cfgAllErr = terrno; // output 1st or more specific error
}
if (0 == cfgAllErr) cfgAllErr = terrno; // output 1st terrno.
}
} else {
terrno = 0; // no action for dup active code
@ -808,9 +806,7 @@ static int32_t mndConfigDnode(SMnode *pMnode, SRpcMsg *pReq, SMCfgDnodeReq *pCfg
if (cfgAll) {
if (!failRecord) failRecord = taosArrayInit(1, sizeof(int32_t));
if (failRecord) taosArrayPush(failRecord, &pDnode->id);
if (0 == cfgAllErr || cfgAllErr == TSDB_CODE_GRANT_PAR_IVLD_ACTIVE) {
cfgAllErr = terrno; // output 1st or more specific error
}
if (0 == cfgAllErr) cfgAllErr = terrno;
}
} else {
terrno = 0;
@ -1287,12 +1283,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
strcpy(dcfgReq.config, "supportvnodes");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%d", flag);
} else if (strncasecmp(cfgReq.config, GRANT_ACTIVE_CODE, 10) == 0 ||
strncasecmp(cfgReq.config, GRANT_C_ACTIVE_CODE, 11) == 0) {
if (cfgReq.dnodeId != -1) {
terrno = TSDB_CODE_INVALID_CFG;
goto _err_out;
}
} else if (strncasecmp(cfgReq.config, "activeCode", 10) == 0 || strncasecmp(cfgReq.config, "cActiveCode", 11) == 0) {
int8_t opt = strncasecmp(cfgReq.config, "a", 1) == 0 ? DND_ACTIVE_CODE : DND_CONN_ACTIVE_CODE;
int8_t index = opt == DND_ACTIVE_CODE ? 10 : 11;
if (' ' != cfgReq.config[index] && 0 != cfgReq.config[index]) {
@ -1310,11 +1301,12 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
goto _err_out;
}
strcpy(dcfgReq.config, opt == DND_ACTIVE_CODE ? GRANT_ACTIVE_CODE : GRANT_C_ACTIVE_CODE);
strcpy(dcfgReq.config, opt == DND_ACTIVE_CODE ? "activeCode" : "cActiveCode");
snprintf(dcfgReq.value, TSDB_DNODE_VALUE_LEN, "%s", cfgReq.value);
if ((terrno = mndConfigDnode(pMnode, pReq, &cfgReq, opt)) != 0) {
if (mndConfigDnode(pMnode, pReq, &cfgReq, opt) != 0) {
mError("dnode:%d, failed to config activeCode since %s", cfgReq.dnodeId, terrstr());
terrno = TSDB_CODE_INVALID_CFG;
goto _err_out;
}
tFreeSMCfgDnodeReq(&cfgReq);

View File

@ -291,12 +291,17 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
goto _OVER;
}
pObj = mndAcquireSnode(pMnode, createReq.dnodeId);
if (pObj != NULL) {
// pObj = mndAcquireSnode(pMnode, createReq.dnodeId);
// if (pObj != NULL) {
// terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
// goto _OVER;
// } else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) {
// goto _OVER;
// }
if (sdbGetSize(pMnode->pSdb, SDB_SNODE) >= 1){
terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
goto _OVER;
} else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) {
goto _OVER;
}
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
@ -314,7 +319,7 @@ _OVER:
return -1;
}
mndReleaseSnode(pMnode, pObj);
// mndReleaseSnode(pMnode, pObj);
mndReleaseDnode(pMnode, pDnode);
tFreeSMCreateQnodeReq(&createReq);
return code;

View File

@ -363,35 +363,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
return 0;
}
static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64_t watermark, char **pStr) {
if (NULL == ast) {
return TSDB_CODE_SUCCESS;
}
SNode * pAst = NULL;
int32_t code = nodesStringToNode(ast, &pAst);
SQueryPlan *pPlan = NULL;
if (TSDB_CODE_SUCCESS == code) {
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType,
.watermark = watermark,
};
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString((SNode *)pPlan, false, pStr, NULL);
}
nodesDestroyNode(pAst);
nodesDestroyNode((SNode *)pPlan);
terrno = code;
return code;
}
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
SNode * pAst = NULL;
SQueryPlan *pPlan = NULL;
@ -733,11 +704,20 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
pReq->streamId = pTask->id.streamId;
STransAction action = {0};
SEpSet epset = {0};
if (pTask->info.nodeId == SNODE_HANDLE) {
SSnodeObj *pObj = mndAcquireSnode(pMnode, pTask->info.nodeId);
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port);
} else {
SEpSet epset = {0};
if(pTask->info.nodeId == SNODE_HANDLE){
SSnodeObj *pObj = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter == NULL) {
break;
}
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pMnode->pSdb, pObj);
}
}else{
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
@ -783,13 +763,15 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { /
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
sdbCancelFetch(pMnode->pSdb, pIter);
return TSDB_CODE_MND_TOO_MANY_STREAMS;
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
return terrno;
}
if (pStream->targetStbUid == pStreamObj->targetStbUid) {
mError("Cannot write the same stable as other stream:%s", pStream->name);
sdbCancelFetch(pMnode->pSdb, pIter);
return TSDB_CODE_MND_INVALID_TARGET_TABLE;
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
return terrno;
}
}
@ -797,12 +779,12 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { /
}
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node;
int32_t code = -1;
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SStreamObj streamObj = {0};
char * sql = NULL;
int32_t sqlLen = 0;
terrno = TSDB_CODE_SUCCESS;
SCMCreateStreamReq createStreamReq = {0};
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
@ -825,7 +807,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (pStream != NULL) {
if (createStreamReq.igExists) {
mInfo("stream:%s, already exist, ignore exist is set", createStreamReq.name);
code = 0;
goto _OVER;
} else {
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
@ -848,8 +829,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
code = checkForNumOfStreams(pMnode, &streamObj);
if (code != TSDB_CODE_SUCCESS) {
if (checkForNumOfStreams(pMnode, &streamObj) < 0) {
goto _OVER;
}
@ -912,8 +892,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
saveStreamTasksInfo(&streamObj, &execInfo);
taosThreadMutexUnlock(&execInfo.lock);
code = TSDB_CODE_ACTION_IN_PROGRESS;
SName dbname = {0};
tNameFromString(&dbname, createStreamReq.sourceDB, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
@ -930,7 +908,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
if (terrno != TSDB_CODE_SUCCESS && terrno != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
}
@ -940,7 +918,8 @@ _OVER:
if (sql != NULL) {
taosMemoryFreeClear(sql);
}
return code;
return terrno;
}
int64_t mndStreamGenChkpId(SMnode *pMnode) {
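
In the drop-task hunk above, the request epset is no longer built by acquiring a single snode via pTask->info.nodeId (which holds the SNODE_HANDLE sentinel rather than a real dnode id); instead every snode registered in sdb receives the drop request. The loop is the standard sdb iteration idiom; a sketch of it factored into a hypothetical helper:

// Sketch, not from the diff: the sdbFetch/sdbRelease iteration idiom used
// above, wrapped in a hypothetical helper that collects all snode endpoints.
static void collectSnodeEps(SMnode *pMnode, SEpSet *pEpSet) {
  void      *pIter = NULL;
  SSnodeObj *pObj = NULL;
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
    if (pIter == NULL) break;                        // no more rows
    addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
    sdbRelease(pMnode->pSdb, pObj);                  // release the row ref
  }
}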

View File

@ -161,6 +161,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
if(pRebVg->oldConsumerId == -1) return 0;  // a dropped stream has no consumer; after a vnode split, all consumerIds are -1
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
return -1;
}

View File

@ -834,7 +834,7 @@ int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans) {
if (mndCheckTransConflict(pMnode, pTrans)) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1;
return terrno;
}
return 0;
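
Returning terrno instead of a bare -1 means callers can propagate the concrete conflict code; the stream hunks above adopt the same terrno-first convention. A sketch (not from the diff) of a hypothetical caller under that convention:

// Sketch, not from the diff: a hypothetical caller forwarding the concrete
// error code instead of collapsing every failure to -1.
static int32_t prepareTransSketch(SMnode *pMnode, STrans *pTrans) {
  int32_t code = mndTransCheckConflict(pMnode, pTrans);
  if (code != 0) return code;   // carries TSDB_CODE_MND_TRANS_CONFLICT
  return mndTransPrepare(pMnode, pTrans);
}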

View File

@ -223,8 +223,6 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
// tq
int tqInit();
void tqCleanUp();
STQ* tqOpen(const char* path, SVnode* pVnode);
void tqNotifyClose(STQ*);
void tqClose(STQ*);

View File

@ -17,12 +17,6 @@
#include "vnd.h"
#include "tqCommon.h"
typedef struct {
int8_t inited;
} STqMgmt;
static STqMgmt tqMgmt = {0};
// 0: not init
// 1: already inited
// 2: wait to be inited or cleaup
@ -32,36 +26,6 @@ static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
int32_t tqInit() {
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2);
if (old != 2) break;
}
if (old == 0) {
if (streamInit() < 0) {
return -1;
}
atomic_store_8(&tqMgmt.inited, 1);
}
return 0;
}
void tqCleanUp() {
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2);
if (old != 2) break;
}
if (old == 1) {
streamCleanUp();
atomic_store_8(&tqMgmt.inited, 0);
}
}
void tqDestroyTqHandle(void* data) {
STqHandle* pData = (STqHandle*)data;
qDestroyTask(pData->execHandle.task);
@ -337,7 +301,7 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
while (pIter) {
STqHandle* pHandle = *(STqHandle**)pIter;
tqInfo("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
if (ASSERT(pHandle->msg != NULL)) {
tqError("pHandle->msg should not be null");

View File

@ -72,7 +72,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
pHandle->msg->contLen = pMsg->contLen;
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
tqInfo("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
return 0;
}

View File

@ -475,6 +475,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
if (!isLeader) {
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
return code;
@ -482,6 +483,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) {
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
rsp.streamId, rsp.upstreamTaskId, vgId);
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
@ -670,7 +672,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) {
streamLaunchFillHistoryTask(pTask);
}
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
pTask->execInfo.start, true);
streamMetaReleaseTask(pMeta, pTask);
continue;
}

View File

@ -278,6 +278,15 @@ _exit:
return code;
}
static int64_t tBlockDataSize(SBlockData* pBlockData) {
int64_t nData = 0;
for (int32_t iCol = 0; iCol < pBlockData->nColData; iCol++) {
SColData* pColData = tBlockDataGetColDataByIdx(pBlockData, iCol);
nData += pColData->nData;
}
return nData;
}
static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** data) {
int32_t code = 0;
int32_t lino = 0;
@ -320,8 +329,11 @@ static int32_t tsdbSnapReadTimeSeriesData(STsdbSnapReader* reader, uint8_t** dat
code = tsdbIterMergerNext(reader->dataIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
if (reader->blockData->nRow >= 81920) {
break;
if (!(reader->blockData->nRow % 16)) {
int64_t nData = tBlockDataSize(reader->blockData);
if (nData >= 1 * 1024 * 1024) {
break;
}
}
}
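
The reader used to cut a snapshot block at a fixed 81920 rows; it now cuts once the accumulated column data reaches roughly 1 MiB, so wide and narrow tables yield similarly sized transfer chunks. Sampling the size only every 16 rows amortizes the per-column walk in tBlockDataSize. The heuristic in isolation, as a sketch:

// Sketch, not from the diff: the flush heuristic in isolation.
static bool shouldFlushBlock(SBlockData *pBlockData) {
  if (pBlockData->nRow % 16 != 0) return false;          // sample every 16 rows
  return tBlockDataSize(pBlockData) >= 1 * 1024 * 1024;  // ~1 MiB of col data
}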

View File

@ -39,12 +39,6 @@ int vnodeInit(int nthreads) {
if (walInit() < 0) {
return -1;
}
if (tqInit() < 0) {
return -1;
}
if (s3Init() < 0) {
return -1;
}
return 0;
}
@ -58,7 +52,5 @@ void vnodeCleanup() {
vnodeAsyncDestroy(&vnodeAsyncHandle[1]);
walCleanUp();
tqCleanUp();
smaCleanUp();
s3CleanUp();
}

View File

@ -899,6 +899,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
SWindowRowsSup* pRowSup = &pInfo->winSup;
pRowSup->numOfRows = 0;
pRowSup->startRowIndex = 0;
struct SColumnDataAgg* pAgg = NULL;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
@ -923,9 +924,6 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
doKeepTuple(pRowSup, tsList[j], gid);
} else if (compareVal(val, &pInfo->stateKey)) {
doKeepTuple(pRowSup, tsList[j], gid);
if (j == 0 && pRowSup->startRowIndex != 0) {
pRowSup->startRowIndex = 0;
}
} else { // a new state window started
SResultRow* pResult = NULL;

View File

@ -57,11 +57,6 @@ typedef struct {
SSDataBlock* pBlock;
} SStreamTrigger;
typedef struct SStreamGlobalEnv {
int8_t inited;
void* timer;
} SStreamGlobalEnv;
typedef struct SStreamContinueExecInfo {
SEpSet epset;
int32_t taskId;
@ -92,7 +87,7 @@ struct SStreamQueue {
int8_t status;
};
extern SStreamGlobalEnv streamEnv;
extern void* streamTimer;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
extern int32_t taskDbWrapperId;

View File

@ -16,38 +16,18 @@
#include "streamInt.h"
#include "ttimer.h"
SStreamGlobalEnv streamEnv;
void* streamTimer = NULL;
int32_t streamInit() {
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&streamEnv.inited, 0, 2);
if (old != 2) break;
int32_t streamTimerInit() {
streamTimer = taosTmrInit(1000, 100, 10000, "STREAM");
if (streamTimer == NULL) {
return -1;
}
if (old == 0) {
streamEnv.timer = taosTmrInit(1000, 100, 10000, "STREAM");
if (streamEnv.timer == NULL) {
atomic_store_8(&streamEnv.inited, 0);
return -1;
}
atomic_store_8(&streamEnv.inited, 1);
}
return 0;
}
void streamCleanUp() {
int8_t old;
while (1) {
old = atomic_val_compare_exchange_8(&streamEnv.inited, 1, 2);
if (old != 2) break;
}
if (old == 1) {
taosTmrCleanUp(streamEnv.timer);
atomic_store_8(&streamEnv.inited, 0);
}
void streamTimerCleanUp() {
taosTmrCleanUp(streamTimer);
}
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
@ -77,7 +57,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
if (pTrigger == NULL) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
nextTrigger);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer);
return;
}
@ -88,7 +68,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
nextTrigger);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer);
return;
}
@ -97,7 +77,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
if (code != TSDB_CODE_SUCCESS) {
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer);
return;
}
@ -105,7 +85,7 @@ static void streamSchedByTimer(void* param, void* tmrId) {
}
}
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer);
taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer);
}
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
@ -115,7 +95,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam);
pTask->schedInfo.pTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer);
pTask->schedInfo.pTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer);
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
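
All stream timers in this commit (schedule trigger, dispatch retry, meta heartbeat, scan-history resume, fill-history launch) now share the single streamTimer created in streamTimerInit, replacing the streamEnv state machine. A lifecycle sketch (not from the diff); the argument roles in taosTmrInit are my reading of the values used above:

// Sketch, not from the diff: lifecycle of the shared timer wheel.
static void onTimerSketch(void *param, void *tmrId) { /* fires on expiry */ }

static void timerLifecycleSketch(void) {
  void *tmr = taosTmrInit(1000, 100, 10000, "STREAM");   // as streamTimerInit does
  void *h = taosTmrStart(onTimerSketch, 200, NULL, tmr); // arm a timer (ms)
  taosTmrReset(onTimerSketch, 200, NULL, tmr, &h);       // re-arm the same handle
  taosTmrCleanUp(tmr);                                   // at process shutdown
}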

View File

@ -506,9 +506,9 @@ void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) {
waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount);
if (pTask->msgInfo.pTimer != NULL) {
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->msgInfo.pTimer);
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pTimer);
} else {
pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer);
pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamTimer);
}
}

View File

@ -256,7 +256,10 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
bool finished = false;
const char* id = pTask->id.idStr;
qSetStreamOpOpen(exec);
if (!pTask->hTaskInfo.operatorOpen) {
qSetStreamOpOpen(exec);
pTask->hTaskInfo.operatorOpen = true;
}
while (1) {
if (streamTaskShouldPause(pTask)) {

View File

@ -369,7 +369,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
metaRefMgtAdd(pMeta->vgId, pRid);
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer);
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
pMeta->pHbInfo->tickCounter = 0;
pMeta->pHbInfo->stopFlag = 0;
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
@ -1099,7 +1099,7 @@ void metaHbToMnode(void* param, void* tmrId) {
}
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
return;
}
@ -1215,7 +1215,7 @@ void metaHbToMnode(void* param, void* tmrId) {
_end:
clearHbMsg(&hbMsg, pIdList);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
}

View File

@ -67,7 +67,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
pTask->id.idStr, numOfDowns, el, p);
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
pTask->execInfo.start, true);
return TSDB_CODE_SUCCESS;
}
@ -114,7 +115,7 @@ static void doReExecScanhistory(void* param, void* tmrId) {
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer,
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer);
}
}
@ -137,9 +138,9 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration)
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref);
if (pTask->schedHistoryInfo.pTimer == NULL) {
pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer);
pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer);
} else {
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, &pTask->schedHistoryInfo.pTimer);
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
}
return TSDB_CODE_SUCCESS;
@ -469,14 +470,16 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
}
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false);
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pTask->execInfo.init,
taosGetTimestampMs(), false);
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId);
streamMetaUpdateTaskDownstreamStatus(pHTask, pHTask->execInfo.init, taosGetTimestampMs(), false);
streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init,
taosGetTimestampMs(), false);
streamMetaReleaseTask(pTask->pMeta, pHTask);
}
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
@ -485,7 +488,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%"PRId64", retry in 100ms, ref:%d ", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer);
pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamTimer);
}
}
@ -726,7 +729,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
pHTaskInfo->tickCount -= 1;
if (pHTaskInfo->tickCount > 0) {
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer);
streamMetaReleaseTask(pMeta, pTask);
return;
}
@ -754,7 +757,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer);
taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer);
streamMetaReleaseTask(pMeta, pTask);
return;
}
@ -815,7 +818,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
streamTaskInitForLaunchHTask(&pTask->hTaskInfo);
if (pTask->hTaskInfo.pTimer == NULL) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer);
pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer);
if (pTask->hTaskInfo.pTimer == NULL) {
atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr,
@ -828,7 +831,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
} else { // timer exists
ASSERT(pTask->status.timerActive >= 1);
stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr);
taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer, &pTask->hTaskInfo.pTimer);
taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer);
}
return TSDB_CODE_SUCCESS;
@ -1066,15 +1069,13 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ)
}
}
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs, int64_t endTs, bool ready) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t streamMetaUpdateTaskDownstreamStatus(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
STaskId id = {.streamId = streamId, .taskId = taskId};
streamMetaWLock(pMeta);
STaskId id = streamTaskExtractKey(pTask);
STaskStartInfo* pStartInfo = &pMeta->startInfo;
SHashObj* pDst = ready? pStartInfo->pReadyTaskSet:pStartInfo->pFailedTaskSet;
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
@ -1086,15 +1087,14 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs
pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs",
pMeta->vgId, numOfTotal, pTask->id.idStr, pTask->info.taskLevel, pStartInfo->startTs, pStartInfo->readyTs,
pMeta->vgId, numOfTotal, taskId, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0);
// print the initialization elapsed time and info
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo);
} else {
stDebug("vgId:%d recv check down results:%d, total:%d", pMeta->vgId, numOfRecv, numOfTotal);

View File

@ -327,15 +327,19 @@ int32_t walEndSnapshot(SWal *pWal) {
// iterate the files and locate the last deletion candidate
// according to retention size or file close time
SWalFileInfo *pUntil = NULL;
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if ((pWal->cfg.retentionSize > 0 && newTotSize > pWal->cfg.retentionSize) ||
(pWal->cfg.retentionPeriod == 0 ||
pWal->cfg.retentionPeriod > 0 && iter->closeTs >= 0 && iter->closeTs + pWal->cfg.retentionPeriod < ts)) {
deleteCnt++;
newTotSize -= iter->fileSize;
taosArrayPush(pWal->toDeleteFiles, iter);
pUntil = iter;
}
}
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter <= pUntil; iter++) {
deleteCnt++;
taosArrayPush(pWal->toDeleteFiles, iter);
}
// make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt);
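
As I read the interleaved hunk, the rewritten deletion is two-pass: the first loop scans every file older than pInfo and remembers the last one (pUntil) that violates the size budget or the retention period; the second loop then queues all files up to and including pUntil. Unlike the old single pass, which pushed individual offenders, this guarantees the deleted set is a contiguous prefix of the WAL, leaving no holes in the retained log. The per-file predicate as a sketch, with the implicit &&-over-|| precedence made explicit:

// Sketch, not from the diff: the retention predicate in isolation.
// retentionPeriod == 0 reads as "retain nothing by time";
// retentionPeriod < 0 disables time-based deletion.
static bool violatesRetention(SWal *pWal, SWalFileInfo *pFile, int64_t totSize, int64_t nowTs) {
  bool bySize = pWal->cfg.retentionSize > 0 && totSize > pWal->cfg.retentionSize;
  bool byTime = pWal->cfg.retentionPeriod == 0 ||
                (pWal->cfg.retentionPeriod > 0 && pFile->closeTs >= 0 &&
                 pFile->closeTs + pWal->cfg.retentionPeriod < nowTs);
  return bySize || byTime;
}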

View File

@ -247,7 +247,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists"
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_NOT_EXIST, "Mnode not there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_QNODE_ALREADY_EXIST, "Qnode already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_QNODE_NOT_EXIST, "Qnode not there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode already exists")
//TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Only one snode can be created")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_NOT_EXIST, "Snode not there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_FEW_MNODES, "The replica of mnode cannot less than 1")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_MNODES, "The replica of mnode cannot exceed 3")
@ -445,7 +446,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_DEC_IVLD_KLEN, "Invalid klen to decod
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_IVLD_KEY, "Invalid key to gen active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_APP_LIMIT, "Limited app num to gen active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_GEN_ENC_IVLD_KLEN, "Invalid klen to encode active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_PAR_IVLD_DIST, "Invalid dist to parse active code")
// sync
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout")

View File

@ -6,6 +6,7 @@
,,y,unit-test,bash test.sh
#system test
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/stream_basic.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/scalar_function.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_interval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 8-stream/at_once_session.py
@ -856,6 +857,7 @@ e
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/odbc.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/fill_with_group.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/state_window.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py
,,n,system-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/insertMix.py -N 3
@ -880,7 +882,7 @@ e
,,y,script,./test.sh -f tsim/dnode/balance2.sim
,,y,script,./test.sh -f tsim/vnode/replica3_repeat.sim
,,y,script,./test.sh -f tsim/parser/col_arithmetic_operation.sim
,,y,script,./test.sh -f tsim/trans/create_db.sim
#,,y,script,./test.sh -f tsim/trans/create_db.sim
,,y,script,./test.sh -f tsim/dnode/balance3.sim
,,y,script,./test.sh -f tsim/vnode/replica3_many.sim
,,y,script,./test.sh -f tsim/stable/metrics_idx.sim
@ -1337,7 +1339,7 @@ e
#docs-examples test
,,n,docs-examples-test,bash python.sh
,,n,docs-examples-test,bash node.sh
#,,n,docs-examples-test,bash node.sh
,,n,docs-examples-test,bash csharp.sh
,,n,docs-examples-test,bash jdbc.sh
,,n,docs-examples-test,bash go.sh

View File

@ -113,11 +113,7 @@ sql_error drop snode on dnode 2
print =============== create drop snodes
sql create snode on dnode 1
sql create snode on dnode 2
sql show snodes
if $rows != 2 then
return -1
endi
sql_error create snode on dnode 2
print =============== restart
system sh/exec.sh -n dnode1 -s stop -x SIGINT
@ -127,7 +123,7 @@ system sh/exec.sh -n dnode2 -s start
sleep 2000
sql show snodes
if $rows != 2 then
if $rows != 1 then
return -1
endi

View File

@ -90,7 +90,10 @@ class TDTestCase:
packagePath = "/usr/local/src/"
dataPath = cPath + "/../data/"
packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz"
if platform.system() == "Linux" and platform.machine() == "aarch64":
packageName = "TDengine-server-"+ BASEVERSION + "-Linux-arm64.tar.gz"
else:
packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz"
packageTPath = packageName.split("-Linux-")[0]
my_file = Path(f"{packagePath}/{packageName}")
if not my_file.exists():

View File

@ -247,10 +247,7 @@ class TDTestCase:
tdSql.error('alter all dnodes "activeCode" "' + self.str510 + '"')
tdSql.query(f'select * from information_schema.ins_dnodes')
tdSql.checkEqual(tdSql.queryResult[0][8],"")
tdSql.error('alter dnode 1 "activeCode" ""')
tdSql.error('alter dnode 1 "activeCode"')
tdSql.execute('alter all dnodes "activeCode" ""')
tdSql.execute('alter all dnodes "activeCode"')
tdSql.execute('alter dnode 1 "activeCode" ""')
tdSql.query(f'select active_code,c_active_code from information_schema.ins_dnodes')
tdSql.checkEqual(tdSql.queryResult[0][0],"")
tdSql.checkEqual(tdSql.queryResult[0][1],'')
@ -262,10 +259,6 @@ class TDTestCase:
tdSql.error('alter all dnodes "cActiveCode" "' + self.str257 + '"')
tdSql.error('alter all dnodes "cActiveCode" "' + self.str254 + '"')
tdSql.error('alter dnode 1 "cActiveCode" "' + self.str510 + '"')
tdSql.error('alter dnode 1 "cActiveCode" ""')
tdSql.error('alter dnode 1 "cActiveCode"')
tdSql.execute('alter all dnodes "cActiveCode" ""')
tdSql.execute('alter all dnodes "cActiveCode"')
tdSql.query(f'select active_code,c_active_code from information_schema.ins_dnodes')
tdSql.checkEqual(tdSql.queryResult[0][0],"")
tdSql.checkEqual(tdSql.queryResult[0][1],"")

View File

@ -2,7 +2,7 @@ import subprocess
import random
import time
import os
import platform
from util.log import *
from util.sql import *
from util.cases import *
@ -190,6 +190,8 @@ class TDTestCase:
for v in values:
dnode = random.choice(p_list)
tdSql.execute(f'alter {dnode} "{name} {v}";')
if platform.system() == "Linux" and platform.machine() == "aarch64":
continue
value = self.get_param_value_with_gdb(alias, "taosd")
if value:
tdLog.debug(f"value: {value}")

View File

@ -0,0 +1,203 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 4
self.ctbNum = 1
self.rowsPerTbl = 10
self.duraion = '1h'
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'):
if dropFlag == 1:
tsql.execute("drop database if exists %s"%(dbName))
tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration))
tdLog.debug("complete to create database %s"%(dbName))
return
def create_stable(self,tsql, paraDict):
colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
sqlString = f"create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString)
tdLog.debug("%s"%(sqlString))
tsql.execute(sqlString)
return
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
for i in range(ctbNum):
sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \
(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,(i+ctbStartIdx) % 5,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx)
tsql.execute(sqlString)
tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
return
def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
if (i < ctbNum/2):
sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, 1, j%10, j%10, j%10, j%10, j%10, j%10)
else:
sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10)
rowsBatched += 1
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
else:
sql = "insert into "
if sql != pre_insert:
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'test',
'dropFlag': 1,
'vgroups': 2,
'stbName': 'meters',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
'ctbPrefix': 't',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 10000,
'batchNum': 3000,
'startTs': 1537146000000,
'tsStep': 600000}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("create database")
self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion)
tdLog.info("create stb")
self.create_stable(tsql=tdSql, paraDict=paraDict)
tdLog.info("create child tables")
self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \
stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],\
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict["ctbStartIdx"])
self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],\
ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\
rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\
startTs=paraDict["startTs"],tsStep=paraDict["tsStep"])
return
def prepare_original_data(self):
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("flush database test", queryTimes=1)
time.sleep(2)
def test_crash_for_state_window1(self):
tdSql.execute("drop database if exists test")
self.prepareTestEnv()
tdSql.execute("alter local 'queryPolicy' '3'")
self.prepare_original_data()
tdSql.execute("insert into t0 values(now, 4,4,4,4,4,4,4,4,4)", queryTimes=1)
tdSql.execute("select bottom(c1, 1), c2 from t0 state_window(c2) order by ts", queryTimes=1)
def test_crash_for_state_window2(self):
tdSql.execute("drop database if exists test")
self.prepareTestEnv()
tdSql.execute("alter local 'queryPolicy' '3'")
self.prepare_original_data()
tdSql.execute("insert into t0 values(now, 4,NULL,4,4,4,4,4,4,4)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 4,4,4,4,4,4,4,4,4)", queryTimes=1)
tdSql.execute("select bottom(c1, 1), c2 from t0 state_window(c2) order by ts", queryTimes=1)
def test_crash_for_state_window3(self):
tdSql.execute("drop database if exists test")
self.prepareTestEnv()
tdSql.execute("alter local 'queryPolicy' '3'")
self.prepare_original_data()
tdSql.execute("insert into t0 values(now, 4,NULL,4,4,4,4,4,4,4)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 4,5,4,4,4,4,4,4,4)", queryTimes=1)
tdSql.execute("select bottom(c1, 1), c2 from t0 state_window(c2) order by ts", queryTimes=1)
def test_crash_for_state_window4(self):
tdSql.execute("drop database if exists test")
self.prepareTestEnv()
tdSql.execute("alter local 'queryPolicy' '3'")
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("flush database test", queryTimes=1)
time.sleep(2)
tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("select bottom(c1, 1), c2 from t0 state_window(c2) order by ts", queryTimes=1)
def test_crash_for_state_window5(self):
tdSql.execute("drop database if exists test")
self.prepareTestEnv()
tdSql.execute("alter local 'queryPolicy' '3'")
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 2,2,2,2,2,2,2,2,2)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("flush database test", queryTimes=1)
time.sleep(2)
tdSql.execute("insert into t0 values(now, 3,NULL,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("insert into t0 values(now, 3,3,3,3,3,3,3,3,3)", queryTimes=1)
tdSql.execute("select bottom(c1, 1), c2 from t0 state_window(c2) order by ts", queryTimes=1)
def run(self):
self.test_crash_for_state_window1()
self.test_crash_for_state_window2()
self.test_crash_for_state_window3()
self.test_crash_for_state_window4()
self.test_crash_for_state_window5()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -4,7 +4,7 @@ import taos
import sys
import time
import os
import platform
from util.log import *
from util.sql import *
from util.cases import *
@ -96,7 +96,10 @@ class TDTestCase:
packagePath = "/usr/local/src/"
dataPath = cPath + "/../data/"
packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz"
if platform.system() == "Linux" and platform.machine() == "aarch64":
packageName = "TDengine-server-"+ BASEVERSION + "-Linux-arm64.tar.gz"
else:
packageName = "TDengine-server-"+ BASEVERSION + "-Linux-x64.tar.gz"
packageTPath = packageName.split("-Linux-")[0]
my_file = Path(f"{packagePath}/{packageName}")
if not my_file.exists():

View File

@ -4,6 +4,7 @@ import sys
import time
import socket
import os
import platform
import threading
from enum import Enum
@ -184,6 +185,9 @@ class TDTestCase:
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
# On ARM64 stopping taosd takes a long time, so increase pollDelay to 300s
if platform.system() == "Linux" and platform.machine() == "aarch64":
paraDict['pollDelay'] = 300
tmqCom.initConsumerTable()
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)

View File

@ -23,7 +23,7 @@ class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
self.rowsPerTbl = 1000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
@ -49,7 +49,7 @@ class TDTestCase:
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
@ -118,7 +118,7 @@ class TDTestCase:
'ctbPrefix': 'ctb1',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
@ -188,7 +188,7 @@ class TDTestCase:
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectrowcnt / 2 >= resultList[0]:
if expectrowcnt / 2 > resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -23,7 +23,7 @@ class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
self.rowsPerTbl = 1000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
@ -49,7 +49,7 @@ class TDTestCase:
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
@ -118,7 +118,7 @@ class TDTestCase:
'ctbPrefix': 'ctb1',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
@ -189,7 +189,7 @@ class TDTestCase:
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectrowcnt / 2 >= resultList[0]:
if expectrowcnt / 2 > resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -25,7 +25,7 @@ class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
self.rowsPerTbl = 1000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
@ -51,7 +51,7 @@ class TDTestCase:
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
@ -120,7 +120,7 @@ class TDTestCase:
'ctbPrefix': 'ctb1',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 120,
@ -189,7 +189,7 @@ class TDTestCase:
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectrowcnt / 2 >= resultList[0]:
if expectrowcnt / 2 > resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -25,7 +25,7 @@ class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
self.rowsPerTbl = 1000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
@ -51,7 +51,7 @@ class TDTestCase:
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
@ -120,7 +120,7 @@ class TDTestCase:
'ctbPrefix': 'ctb1',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 120,
@ -189,7 +189,7 @@ class TDTestCase:
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectrowcnt / 2 >= resultList[0]:
if expectrowcnt / 2 > resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -27,7 +27,7 @@ class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
self.rowsPerTbl = 1000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
@ -53,7 +53,7 @@ class TDTestCase:
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
@ -122,7 +122,7 @@ class TDTestCase:
'ctbPrefix': 'ctb1',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 120,
@ -192,7 +192,7 @@ class TDTestCase:
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectrowcnt / 2 >= resultList[0]:
if expectrowcnt / 2 > resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -25,7 +25,7 @@ class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
self.rowsPerTbl = 1000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
@ -51,7 +51,7 @@ class TDTestCase:
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
@ -120,7 +120,7 @@ class TDTestCase:
'ctbPrefix': 'ctb1',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
@ -190,7 +190,7 @@ class TDTestCase:
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectrowcnt / 2 >= resultList[0]:
if expectrowcnt / 2 > resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -20,7 +20,7 @@ class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
self.rowsPerTbl = 1000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
@ -46,7 +46,7 @@ class TDTestCase:
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 30,
@ -138,7 +138,7 @@ class TDTestCase:
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
@ -217,7 +217,7 @@ class TDTestCase:
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,

View File

@ -20,7 +20,7 @@ class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
self.rowsPerTbl = 1000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
@ -46,7 +46,7 @@ class TDTestCase:
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
@ -137,7 +137,7 @@ class TDTestCase:
'ctbPrefix': 'ctb1',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
@ -207,7 +207,7 @@ class TDTestCase:
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectrowcnt / 2 >= resultList[0]:
if expectrowcnt / 2 > resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -20,7 +20,7 @@ class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
self.rowsPerTbl = 1000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
@ -46,7 +46,7 @@ class TDTestCase:
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
@ -137,7 +137,7 @@ class TDTestCase:
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'rowsPerTbl': 1000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 60,
@ -203,7 +203,7 @@ class TDTestCase:
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectrowcnt / 2 >= resultList[0]:
if expectrowcnt / 2 > resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -0,0 +1,110 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
from util.autogen import *
import random
import time
import traceback
import os
import platform
from os import path
class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), True)
# autoGen
self.autoGen = AutoGen()
def waitTransactions(self, waitSeconds):
# wait until all pending transactions are finished
for i in range(waitSeconds):
sql = "show transactions;"
rows = tdSql.query(sql)
if rows == 0:
return True
tdLog.info(f"i={i} wait for transactions to finish ...")
time.sleep(1)
return False
def getPath(self, tool="taosBenchmark"):
if (platform.system().lower() == 'windows'):
tool = tool + ".exe"
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
paths = []
for root, dirs, files in os.walk(projPath):
if ((tool) in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
paths.append(os.path.join(root, tool))
break
if (len(paths) == 0):
tdLog.exit("taosBenchmark not found!")
return
else:
tdLog.info("taosBenchmark found in %s" % paths[0])
return paths[0]
def taosBenchmark(self, param):
binPath = self.getPath()
cmd = f"{binPath} {param}"
tdLog.info(cmd)
os.system(cmd)
# run
def run(self):
# gen data
random.seed(int(time.time()))
self.taosBenchmark(" -d db -t 2 -v 2 -n 1000000 -y")
# create stream
tdSql.execute("use db")
tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
sql = "select count(*) from sta"
# loop wait max 60s to check count is ok
tdLog.info("loop wait result ...")
tdSql.checkDataLoop(0, 0, 99999, sql, loopCount=120, waitTime=0.5)
# check all data is correct
sql = "select * from sta where cnt != 20;"
tdSql.query(sql)
tdSql.checkRows(0)
# check ts interval is correct
sql = "select * from ( select diff(_wstart) as tsdif from sta ) where tsdif != 10;"
tdSql.query(sql)
tdSql.checkRows(0)
# stop
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addLinux(__file__, TDTestCase())