Merge branch '3.0' into enh/TD-30554-3.0
commit 50d163f10d
@@ -432,7 +432,7 @@ The charset that takes effect is UTF-8.
 | Applicable | Server Only |
 | Meaning | Maximum number of threads to commit |
 | Value Range | 0-1024 |
-| Default Value | |
+| Default Value | 4 |

 ## Log Parameters

@@ -430,7 +430,7 @@ charset 的有效值是 UTF-8。
 | 适用范围 | 仅服务端适用 |
 | 含义 | 设置写入线程的最大数量 |
 | 取值范围 | 0-1024 |
-| 缺省值 | |
+| 缺省值 | 4 |

 ## 日志相关

@@ -86,6 +86,7 @@ extern int32_t tsNumOfQnodeFetchThreads;
 extern int32_t tsNumOfSnodeStreamThreads;
 extern int32_t tsNumOfSnodeWriteThreads;
 extern int64_t tsRpcQueueMemoryAllowed;
+extern int32_t tsRetentionSpeedLimitMB;

 // sync raft
 extern int32_t tsElectInterval;

@@ -1362,6 +1362,8 @@ void blockDataEmpty(SSDataBlock* pDataBlock) {
     return;
   }

+  taosMemoryFreeClear(pDataBlock->pBlockAgg);
+
   size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
   for (int32_t i = 0; i < numOfCols; ++i) {
     SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);

@@ -74,6 +74,7 @@ int32_t tsNumOfSnodeStreamThreads = 4;
 int32_t tsNumOfSnodeWriteThreads = 1;
 int32_t tsMaxStreamBackendCache = 128; // M
 int32_t tsPQSortMemThreshold = 16; // M
+int32_t tsRetentionSpeedLimitMB = 0; // unlimited

 // sync raft
 int32_t tsElectInterval = 25 * 1000;

@@ -668,6 +669,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
   if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
   if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
   if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
+  if (cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;

   if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
   if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;

@@ -1118,6 +1120,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
   tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;

   tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
+  tsRetentionSpeedLimitMB = cfgGetItem(pCfg, "retentionSpeedLimitMB")->i32;
   tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
   tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
   tsRatioOfVnodeStreamThreads = cfgGetItem(pCfg, "ratioOfVnodeStreamThreads")->fval;

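Taken together, the three tglobal hunks above wire the new server-only option retentionSpeedLimitMB (range 0-1024, default 0 meaning unlimited) through the usual steps: an extern declaration in the header, a default definition beside the other globals, registration via cfgAddInt32 in taosAddServerCfg, and a read-back via cfgGetItem in taosSetServerCfg. The sketch below condenses that pattern for a hypothetical option name; it reuses only the calls visible in the diff and assumes the SConfig declarations from the tree, so it is illustrative rather than compilable on its own.

// Hypothetical option "fooSpeedLimitMB", wired the same way as retentionSpeedLimitMB.
extern int32_t tsFooSpeedLimitMB;   // header: declaration shared across modules

int32_t tsFooSpeedLimitMB = 0;      // source: default value, 0 means unlimited

static int32_t addFooCfg(SConfig *pCfg) {
  // register range 0..1024, server scope, not changeable at runtime
  if (cfgAddInt32(pCfg, "fooSpeedLimitMB", tsFooSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
  return 0;
}

static int32_t setFooCfg(SConfig *pCfg) {
  tsFooSpeedLimitMB = cfgGetItem(pCfg, "fooSpeedLimitMB")->i32;  // load the configured value
  return 0;
}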
@@ -103,8 +103,8 @@ typedef enum {
   TRN_CONFLICT_GLOBAL = 1,
   TRN_CONFLICT_DB = 2,
   TRN_CONFLICT_DB_INSIDE = 3,
-  TRN_CONFLICT_TOPIC = 4,
-  TRN_CONFLICT_TOPIC_INSIDE = 5,
+  // TRN_CONFLICT_TOPIC = 4,
+  // TRN_CONFLICT_TOPIC_INSIDE = 5,
   TRN_CONFLICT_ARBGROUP = 6,
 } ETrnConflct;

@@ -91,7 +91,7 @@ void mndSendConsumerMsg(SMnode *pMnode, int64_t consumerId, uint16_t msgType, SR
   }
 }

-static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *pMnode, const char *pUser,
+static int32_t validateTopics(const SArray *pTopicList, SMnode *pMnode, const char *pUser,
                               bool enableReplay) {
   SMqTopicObj *pTopic = NULL;
   int32_t code = 0;

@@ -135,11 +135,6 @@ static int32_t validateTopics(STrans *pTrans, const SArray *pTopicList, SMnode *
       }
     }

-    mndTransSetDbName(pTrans, pOneTopic, NULL);
-    if (mndTransCheckConflict(pMnode, pTrans) != 0) {
-      code = -1;
-      goto FAILED;
-    }
     mndReleaseTopic(pMnode, pTopic);
   }

@@ -177,12 +172,12 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
     goto END;
   }

-  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "recover-csm");
+  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "recover-csm");
   if (pTrans == NULL) {
     code = -1;
     goto END;
   }
-  code = validateTopics(pTrans, pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false);
+  code = validateTopics(pConsumer->assignedTopics, pMnode, pMsg->info.conn.user, false);
   if (code != 0) {
     goto END;
   }

@@ -675,13 +670,13 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
     goto _over;
   }

-  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "subscribe");
+  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
   if (pTrans == NULL) {
     code = TSDB_CODE_OUT_OF_MEMORY;
     goto _over;
   }

-  code = validateTopics(pTrans, subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay);
+  code = validateTopics(subscribe.topicNames, pMnode, pMsg->info.conn.user, subscribe.enableReplay);
   if (code != TSDB_CODE_SUCCESS) {
     goto _over;
   }

@@ -618,13 +618,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
   char cgroup[TSDB_CGROUP_LEN] = {0};
   mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);

-  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "tmq-reb");
+  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
   if (pTrans == NULL) {
     code = TSDB_CODE_OUT_OF_MEMORY;
     goto END;
   }

-  mndTransSetDbName(pTrans, topic, cgroup);
+  mndTransSetDbName(pTrans, pOutput->pSub->dbName, cgroup);
   code = mndTransCheckConflict(pMnode, pTrans);
   if (code != 0) {
     goto END;

@@ -908,33 +908,37 @@ END:
 }

 static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){
-  // iter all vnode to delete handle
-  int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
-  for (int32_t i = 0; i < sz; i++) {
-    SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
+  void* pIter = NULL;
+  SVgObj* pVgObj = NULL;
+  while (1) {
+    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void**)&pVgObj);
+    if (pIter == NULL) {
+      break;
+    }
+
+    if (!mndVgroupInDb(pVgObj, pSub->dbUid)) {
+      sdbRelease(pMnode->pSdb, pVgObj);
+      continue;
+    }
     SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
     if(pReq == NULL){
       terrno = TSDB_CODE_OUT_OF_MEMORY;
+      sdbRelease(pMnode->pSdb, pVgObj);
       return -1;
     }
-    pReq->head.vgId = htonl(pVgEp->vgId);
-    pReq->vgId = pVgEp->vgId;
+    pReq->head.vgId = htonl(pVgObj->vgId);
+    pReq->vgId = pVgObj->vgId;
     pReq->consumerId = -1;
     memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);

-    SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId);
-    if (pVgObj == NULL) {
-      taosMemoryFree(pReq);
-      terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
-      return -1;
-    }
     STransAction action = {0};
     action.epSet = mndGetVgroupEpset(pMnode, pVgObj);;
     action.pCont = pReq;
     action.contLen = sizeof(SMqVDeleteReq);
     action.msgType = TDMT_VND_TMQ_DELETE_SUB;
+    action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;

-    mndReleaseVgroup(pMnode, pVgObj);
+    sdbRelease(pMnode->pSdb, pVgObj);
     if (mndTransAppendRedoAction(pTrans, &action) != 0) {
       taosMemoryFree(pReq);
       return -1;

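For context on the rewrite above: rather than walking pSub->unassignedVgs, sendDeleteSubToVnode now appears to iterate every vgroup in the sdb, keep only those that belong to the subscription's database via mndVgroupInDb, and mark TSDB_CODE_MND_VGROUP_NOT_EXIST as an acceptable reply so a vgroup that has disappeared in the meantime does not fail the whole transaction. A minimal sketch of that iteration skeleton, using only calls that appear in the hunk (the wrapper name is illustrative and the per-vgroup work is elided):

// Illustrative skeleton only; the real per-vgroup work (building the SMqVDeleteReq
// and appending the redo action) is shown in the hunk above.
static void visitVgroupsOfDb(SMnode *pMnode, int64_t dbUid) {
  void   *pIter = NULL;
  SVgObj *pVgObj = NULL;
  while (1) {
    pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgObj);
    if (pIter == NULL) break;              // no more vgroups
    if (!mndVgroupInDb(pVgObj, dbUid)) {   // skip vgroups of other databases
      sdbRelease(pMnode->pSdb, pVgObj);
      continue;
    }
    // ... per-vgroup work goes here ...
    sdbRelease(pMnode->pSdb, pVgObj);      // always release the fetched object
  }
}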
@@ -996,7 +1000,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
     goto end;
   }

-  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC_INSIDE, pMsg, "drop-cgroup");
+  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "drop-cgroup");
   if (pTrans == NULL) {
     mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
     code = -1;

@@ -1004,7 +1008,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
   }

   mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
-  mndTransSetDbName(pTrans, dropReq.topic, dropReq.cgroup);
+  mndTransSetDbName(pTrans, pSub->dbName, dropReq.cgroup);
   code = mndTransCheckConflict(pMnode, pTrans);
   if (code != 0) {
     goto end;

@@ -422,14 +422,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
   SQueryPlan *pPlan = NULL;
   SMqTopicObj topicObj = {0};

-  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "create-topic");
+  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq, "create-topic");
   if (pTrans == NULL) {
     mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
     code = -1;
     goto _OUT;
   }

-  mndTransSetDbName(pTrans, pCreate->name, NULL);
+  mndTransSetDbName(pTrans, pDb->name, NULL);
   code = mndTransCheckConflict(pMnode, pTrans);
   if (code != 0) {
     goto _OUT;

@@ -779,14 +779,14 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
     }
   }

-  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_TOPIC, pReq, "drop-topic");
+  pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-topic");
   if (pTrans == NULL) {
     mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
     code = -1;
     goto end;
   }

-  mndTransSetDbName(pTrans, pTopic->name, NULL);
+  mndTransSetDbName(pTrans, pTopic->db, NULL);
   code = mndTransCheckConflict(pMnode, pTrans);
   if (code != 0) {
     goto end;

@@ -836,26 +836,26 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
     }
   }

-  if (pNew->conflict == TRN_CONFLICT_TOPIC) {
-    if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
-    if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
-      if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
-    }
-  }
-  if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
-    if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
-    if (pTrans->conflict == TRN_CONFLICT_TOPIC) {
-      if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
-    }
-    if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
-      if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0)
-        conflict = true;
-    }
-  }
+  // if (pNew->conflict == TRN_CONFLICT_TOPIC) {
+  //   if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
+  //   if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
+  //     if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
+  //   }
+  // }
+  // if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
+  //   if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
+  //   if (pTrans->conflict == TRN_CONFLICT_TOPIC) {
+  //     if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
+  //   }
+  //   if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
+  //     if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0)
+  //       conflict = true;
+  //   }
+  // }
   if (pNew->conflict == TRN_CONFLICT_ARBGROUP) {
     if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
     if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) {
-      void *pIter = taosHashIterate(pNew->arbGroupIds, NULL);
+      pIter = taosHashIterate(pNew->arbGroupIds, NULL);
       while (pIter != NULL) {
         int32_t groupId = *(int32_t *)pIter;
         if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) {

@@ -50,6 +50,9 @@ void tqDestroyTqHandle(void* data) {
   if (pData->block != NULL) {
     blockDataDestroy(pData->block);
   }
+  if (pData->pRef) {
+    walCloseRef(pData->pRef->pWal, pData->pRef->refId);
+  }
 }

 static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {

@@ -571,9 +574,6 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
       taosMsleep(10);
       continue;
     }
-    if (pHandle->pRef) {
-      walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
-    }

     tqUnregisterPushHandle(pTq, pHandle);

@@ -658,12 +658,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
     taosRLockLatch(&pTq->lock);
     ret = tqMetaGetHandle(pTq, req.subKey);
     taosRUnLockLatch(&pTq->lock);
-
     if (ret < 0) {
       break;
     }
   }
-
   if (pHandle == NULL) {
     if (req.oldConsumerId != -1) {
       tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,

@@ -708,7 +706,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
     }
   }

 end:
   tDecoderClear(&dc);
   return ret;
 }

@@ -352,7 +352,6 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){

   memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
   handle->consumerId = req->newConsumerId;
-  handle->epoch = -1;

   handle->execHandle.subType = req->subType;
   handle->fetchMeta = req->withMeta;

@@ -371,7 +370,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
   if(buildHandle(pTq, handle) < 0){
     return -1;
   }
-  tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
+  tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, handle->consumerId, vgId, handle->snapshotVer);
   return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
 }

@@ -87,7 +87,7 @@ int tqUnregisterPushHandle(STQ* pTq, void *handle) {
   int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
   tqInfo("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);

-  if(pHandle->msg != NULL) {
+  if(ret == 0 && pHandle->msg != NULL) {
     // tqPushDataRsp(pHandle, vgId);
     tqPushEmptyDataRsp(pHandle, vgId);

@@ -38,6 +38,34 @@ static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
   return TARRAY2_APPEND(&rtner->fopArr, op);
 }

+static int64_t tsdbCopyFileWithLimitedSpeed(TdFilePtr from, TdFilePtr to, int64_t size, uint32_t limitMB) {
+  int64_t total = 0;
+  int64_t interval = 1000;  // 1s
+  int64_t limit = limitMB ? limitMB * 1024 * 1024 : INT64_MAX;
+  int64_t offset = 0;
+  int64_t remain = size;
+
+  while (remain > 0) {
+    int64_t n;
+    int64_t last = taosGetTimestampMs();
+    if ((n = taosFSendFile(to, from, &offset, TMIN(limit, remain))) < 0) {
+      return -1;
+    }
+
+    total += n;
+    remain -= n;
+
+    if (remain > 0) {
+      int64_t elapsed = taosGetTimestampMs() - last;
+      if (elapsed < interval) {
+        taosMsleep(interval - elapsed);
+      }
+    }
+  }
+
+  return total;
+}
+
 static int32_t tsdbDoCopyFileLC(SRTNer *rtner, const STFileObj *from, const STFile *to) {
   int32_t code = 0;
   int32_t lino = 0;

@@ -98,7 +126,8 @@ static int32_t tsdbDoCopyFile(SRTNer *rtner, const STFileObj *from, const STFile
   if (fdTo == NULL) code = terrno;
   TSDB_CHECK_CODE(code, lino, _exit);

-  int64_t n = taosFSendFile(fdTo, fdFrom, 0, tsdbLogicToFileSize(from->f->size, rtner->szPage));
+  int64_t n = tsdbCopyFileWithLimitedSpeed(fdFrom, fdTo, tsdbLogicToFileSize(from->f->size, rtner->szPage),
+                                           tsRetentionSpeedLimitMB);
   if (n < 0) {
     code = TAOS_SYSTEM_ERROR(errno);
     TSDB_CHECK_CODE(code, lino, _exit);

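The new tsdbCopyFileWithLimitedSpeed above paces the copy: each loop iteration sends at most limitMB megabytes via taosFSendFile and then, if data remains, sleeps out the rest of a one-second interval, so sustained throughput stays near limitMB MB/s; limitMB = 0 lifts the cap entirely, matching the retentionSpeedLimitMB default, and the second hunk shows tsdbDoCopyFile passing tsRetentionSpeedLimitMB in. For example, with the limit set to 100 the retention copy of a 1 GiB data file should take at least ten seconds. Below is a self-contained POSIX C sketch of the same pacing idea using plain buffered I/O instead of the TDengine file helpers; the function names and buffer size are illustrative, not from the tree.

#include <stdint.h>
#include <stdio.h>
#include <time.h>

static int64_t now_ms(void) {
  struct timespec ts;
  clock_gettime(CLOCK_MONOTONIC, &ts);
  return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}

// Copy up to `size` bytes from `src` to `dst`, sending at most `limit_mb` MB per
// one-second interval (0 == unlimited). Returns bytes copied, or -1 on write error.
int64_t copy_rate_limited(FILE *src, FILE *dst, int64_t size, uint32_t limit_mb) {
  int64_t limit = limit_mb ? (int64_t)limit_mb * 1024 * 1024 : INT64_MAX;
  int64_t total = 0;
  char    buf[1 << 16];

  while (total < size) {
    int64_t start = now_ms();
    int64_t sent = 0;
    while (sent < limit && total < size) {   // at most `limit` bytes per interval
      int64_t want = (int64_t)sizeof(buf);
      if (want > limit - sent) want = limit - sent;
      if (want > size - total) want = size - total;
      size_t n = fread(buf, 1, (size_t)want, src);
      if (n == 0) return total;              // EOF or read error: stop early
      if (fwrite(buf, 1, n, dst) != n) return -1;
      sent += (int64_t)n;
      total += (int64_t)n;
    }
    int64_t elapsed = now_ms() - start;      // sleep out the rest of the second
    if (total < size && elapsed < 1000) {
      struct timespec pause = {(time_t)((1000 - elapsed) / 1000), (long)(((1000 - elapsed) % 1000) * 1000000)};
      nanosleep(&pause, NULL);
    }
  }
  return total;
}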
@@ -2178,9 +2178,25 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
     return code;
   }
   if (group == NULL || groupByTbname) {
-    for (int32_t i = 0; i < numOfTables; i++) {
-      STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
-      info->groupId = groupByTbname ? info->uid : 0;
+    if (tsCountAlwaysReturnValue && QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanNode) && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
+      pTableListInfo->remainGroups =
+          taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
+      if (pTableListInfo->remainGroups == NULL) {
+        return TSDB_CODE_OUT_OF_MEMORY;
+      }
+
+      for (int i = 0; i < numOfTables; i++) {
+        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
+        info->groupId = info->uid;
+
+        taosHashPut(pTableListInfo->remainGroups, &(info->groupId), sizeof(info->groupId), &(info->uid),
+                    sizeof(info->uid));
+      }
+    } else {
+      for (int32_t i = 0; i < numOfTables; i++) {
+        STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
+        info->groupId = groupByTbname ? info->uid : 0;
+      }
     }

     pTableListInfo->oneTableForEachGroup = groupByTbname;

@@ -2193,8 +2209,6 @@
     pTableListInfo->numOfOuputGroups = numOfTables;
   } else if (groupByTbname && pScanNode->groupOrderScan) {
     pTableListInfo->numOfOuputGroups = numOfTables;
-  } else if (groupByTbname && tsCountAlwaysReturnValue && ((STableScanPhysiNode*)pScanNode)->needCountEmptyTable) {
-    pTableListInfo->numOfOuputGroups = numOfTables;
   } else {
     pTableListInfo->numOfOuputGroups = 1;
   }

@@ -725,7 +725,7 @@ void markGroupProcessed(STableScanInfo* pInfo, uint64_t groupId) {
   if (pInfo->countState == TABLE_COUNT_STATE_END) {
     return;
   }
-  if (pInfo->base.pTableListInfo->oneTableForEachGroup || pInfo->base.pTableListInfo->groupOffset) {
+  if (pInfo->base.pTableListInfo->groupOffset) {
     pInfo->countState = TABLE_COUNT_STATE_PROCESSED;
   } else {
     taosHashRemove(pInfo->base.pTableListInfo->remainGroups, &groupId, sizeof(groupId));

@@ -890,7 +890,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {

   if (pTableScanInfo->countState < TABLE_COUNT_STATE_END) {
     STableListInfo* pTableListInfo = pTableScanInfo->base.pTableListInfo;
-    if (pTableListInfo->oneTableForEachGroup || pTableListInfo->groupOffset) { // group by tbname, group by tag + sort
+    if (pTableListInfo->groupOffset) { // group by tbname, group by tag + sort
       if (pTableScanInfo->countState < TABLE_COUNT_STATE_PROCESSED) {
         pTableScanInfo->countState = TABLE_COUNT_STATE_PROCESSED;
         STableKeyInfo* pStart =

|
||||||
last_number=vgroup_numbers-1
|
last_number=vgroup_numbers-1
|
||||||
while count < count_number:
|
while count < count_number:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
count+=1
|
||||||
|
print("check vgroup count :", count)
|
||||||
tdSql.query(f"show {db_name}.vgroups;")
|
tdSql.query(f"show {db_name}.vgroups;")
|
||||||
if count == 0 :
|
if tdSql.getRows() != vgroup_numbers :
|
||||||
if tdSql.checkRows(vgroup_numbers) :
|
continue
|
||||||
tdLog.success(f"{db_name} has {vgroup_numbers} vgroups" )
|
|
||||||
else:
|
|
||||||
tdLog.exit(f"vgroup number of {db_name} is not correct")
|
|
||||||
if self.db_replica == 1 :
|
if self.db_replica == 1 :
|
||||||
if tdSql.queryResult[0][4] == 'leader' and tdSql.queryResult[last_number][4] == 'leader':
|
if tdSql.queryResult[0][4] == 'leader' and tdSql.queryResult[last_number][4] == 'leader':
|
||||||
tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';")
|
tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';")
|
||||||
print("db replica :",tdSql.queryResult[0][0])
|
print("db replica :",tdSql.queryResult[0][0])
|
||||||
if tdSql.queryResult[0][0] == db_replica:
|
if tdSql.queryResult[0][0] == db_replica:
|
||||||
ready_time= (count + 1)
|
tdLog.success(f"all vgroups with replica {self.db_replica} of {db_name} are leaders in {count} s")
|
||||||
tdLog.success(f"all vgroups with replica {self.db_replica} of {db_name} are leaders in {count + 1} s")
|
|
||||||
return True
|
return True
|
||||||
count+=1
|
|
||||||
elif self.db_replica == 3 :
|
elif self.db_replica == 3 :
|
||||||
vgroup_status_first=[tdSql.queryResult[0][4],tdSql.queryResult[0][6],tdSql.queryResult[0][8]]
|
vgroup_status_first=[tdSql.queryResult[0][4],tdSql.queryResult[0][6],tdSql.queryResult[0][8]]
|
||||||
|
|
||||||
|
@ -261,10 +259,8 @@ class ClusterComCheck:
|
||||||
tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';")
|
tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';")
|
||||||
print("db replica :",tdSql.queryResult[0][0])
|
print("db replica :",tdSql.queryResult[0][0])
|
||||||
if tdSql.queryResult[0][0] == db_replica:
|
if tdSql.queryResult[0][0] == db_replica:
|
||||||
ready_time= (count + 1)
|
tdLog.success(f"elections of {db_name}.vgroups with replica {self.db_replica} are ready in {count} s")
|
||||||
tdLog.success(f"elections of {db_name}.vgroups with replica {self.db_replica} are ready in {ready_time} s")
|
|
||||||
return True
|
return True
|
||||||
count+=1
|
|
||||||
else:
|
else:
|
||||||
tdLog.debug(tdSql.queryResult)
|
tdLog.debug(tdSql.queryResult)
|
||||||
tdLog.notice(f"elections of {db_name} all vgroups with replica {self.db_replica} are failed in {count} s ")
|
tdLog.notice(f"elections of {db_name} all vgroups with replica {self.db_replica} are failed in {count} s ")
|
||||||
|
|
|
@@ -219,7 +219,7 @@ class TDTestCase:
 expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
 topicList = topicName1
 ifcheckdata = 0
-ifManualCommit = 1
+ifManualCommit = 0
 keyList = 'group.id:cgrp1,\
 enable.auto.commit:false,\
 auto.commit.interval.ms:6000,\

@@ -157,7 +157,7 @@ class TDTestCase:
 tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))

 if self.snapshot == 0:
-if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows < expectrowcnt)):
+if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows <= expectrowcnt)):
 tdLog.exit("tmq consume rows error with snapshot = 0!")

 tdLog.info("wait subscriptions exit ....")

@@ -249,7 +249,7 @@ class TDTestCase:
 tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))

 if self.snapshot == 0:
-if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows < expectrowcnt)):
+if not ((totalConsumeRows > expectrowcnt / 2) and (totalConsumeRows <= expectrowcnt)):
 tdLog.exit("tmq consume rows error with snapshot = 0!")

 tdLog.info("wait subscriptions exit ....")

@@ -200,12 +200,11 @@ class TDTestCase:

 # tmqCom.checkFileContent(consumerId, queryString)

-time.sleep(2)
 for i in range(len(topicNameList)):
 tdSql.query("drop topic %s"%topicNameList[i])

-if deleteWal == True:
-clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240)
+clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240)
 tdLog.printNoPrefix("======== test case 1 end ...... ")

 def run(self):

@@ -199,13 +199,11 @@ class TDTestCase:
 tdLog.exit("%d tmq consume rows error!"%consumerId)

 # tmqCom.checkFileContent(consumerId, queryString)
-
-time.sleep(2)
+clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240)
+time.sleep(3)
 for i in range(len(topicNameList)):
 tdSql.query("drop topic %s"%topicNameList[i])

-if deleteWal == True:
-clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240)
 tdLog.printNoPrefix("======== test case 1 end ...... ")

 def run(self):