Merge pull request #23860 from taosdata/fix/TD-27455
fix:tmqVnodeSplit-column.py failed in arm64 because of rebalance after unsubscribe while split vnode
This commit is contained in:
commit
7e63943c4e
|
@ -291,12 +291,17 @@ static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
pObj = mndAcquireSnode(pMnode, createReq.dnodeId);
|
// pObj = mndAcquireSnode(pMnode, createReq.dnodeId);
|
||||||
if (pObj != NULL) {
|
// if (pObj != NULL) {
|
||||||
|
// terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
|
||||||
|
// goto _OVER;
|
||||||
|
// } else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) {
|
||||||
|
// goto _OVER;
|
||||||
|
// }
|
||||||
|
|
||||||
|
if (sdbGetSize(pMnode->pSdb, SDB_SNODE) >= 1){
|
||||||
terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
|
terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
} else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) {
|
|
||||||
goto _OVER;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
|
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
|
||||||
|
@ -314,7 +319,7 @@ _OVER:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndReleaseSnode(pMnode, pObj);
|
// mndReleaseSnode(pMnode, pObj);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
tFreeSMCreateQnodeReq(&createReq);
|
tFreeSMCreateQnodeReq(&createReq);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -363,35 +363,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64_t watermark, char **pStr) {
|
|
||||||
if (NULL == ast) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
SNode * pAst = NULL;
|
|
||||||
int32_t code = nodesStringToNode(ast, &pAst);
|
|
||||||
|
|
||||||
SQueryPlan *pPlan = NULL;
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
SPlanContext cxt = {
|
|
||||||
.pAstRoot = pAst,
|
|
||||||
.topicQuery = false,
|
|
||||||
.streamQuery = true,
|
|
||||||
.triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType,
|
|
||||||
.watermark = watermark,
|
|
||||||
};
|
|
||||||
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = nodesNodeToString((SNode *)pPlan, false, pStr, NULL);
|
|
||||||
}
|
|
||||||
nodesDestroyNode(pAst);
|
|
||||||
nodesDestroyNode((SNode *)pPlan);
|
|
||||||
terrno = code;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
|
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
|
||||||
SNode * pAst = NULL;
|
SNode * pAst = NULL;
|
||||||
SQueryPlan *pPlan = NULL;
|
SQueryPlan *pPlan = NULL;
|
||||||
|
@ -733,11 +704,20 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
|
||||||
pReq->streamId = pTask->id.streamId;
|
pReq->streamId = pTask->id.streamId;
|
||||||
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
SEpSet epset = {0};
|
SEpSet epset = {0};
|
||||||
if (pTask->info.nodeId == SNODE_HANDLE) {
|
if(pTask->info.nodeId == SNODE_HANDLE){
|
||||||
SSnodeObj *pObj = mndAcquireSnode(pMnode, pTask->info.nodeId);
|
SSnodeObj *pObj = NULL;
|
||||||
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port);
|
void *pIter = NULL;
|
||||||
} else {
|
while (1) {
|
||||||
|
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
|
||||||
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port);
|
||||||
|
sdbRelease(pMnode->pSdb, pObj);
|
||||||
|
}
|
||||||
|
}else{
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||||
epset = mndGetVgroupEpset(pMnode, pVgObj);
|
epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
mndReleaseVgroup(pMnode, pVgObj);
|
||||||
|
|
|
@ -161,6 +161,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj
|
||||||
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
|
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
|
||||||
const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
|
const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
|
||||||
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
||||||
|
if(pRebVg->oldConsumerId == -1) return 0; //drop stream, no consumer, while split vnode,all consumerId is -1
|
||||||
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -337,7 +337,7 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
STqHandle* pHandle = *(STqHandle**)pIter;
|
STqHandle* pHandle = *(STqHandle**)pIter;
|
||||||
tqInfo("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
|
tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
|
||||||
|
|
||||||
if (ASSERT(pHandle->msg != NULL)) {
|
if (ASSERT(pHandle->msg != NULL)) {
|
||||||
tqError("pHandle->msg should not be null");
|
tqError("pHandle->msg should not be null");
|
||||||
|
|
|
@ -72,7 +72,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
|
||||||
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
||||||
pHandle->msg->contLen = pMsg->contLen;
|
pHandle->msg->contLen = pMsg->contLen;
|
||||||
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
|
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
|
||||||
tqInfo("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
|
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
|
||||||
pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -247,7 +247,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists"
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_NOT_EXIST, "Mnode not there")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_NOT_EXIST, "Mnode not there")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_QNODE_ALREADY_EXIST, "Qnode already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_QNODE_ALREADY_EXIST, "Qnode already exists")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_QNODE_NOT_EXIST, "Qnode not there")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_QNODE_NOT_EXIST, "Qnode not there")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode already exists")
|
//TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode already exists")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode can only be created 1")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_NOT_EXIST, "Snode not there")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_NOT_EXIST, "Snode not there")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_FEW_MNODES, "The replica of mnode cannot less than 1")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_FEW_MNODES, "The replica of mnode cannot less than 1")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_MNODES, "The replica of mnode cannot exceed 3")
|
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_MNODES, "The replica of mnode cannot exceed 3")
|
||||||
|
|
|
@ -113,11 +113,7 @@ sql_error drop snode on dnode 2
|
||||||
|
|
||||||
print =============== create drop snodes
|
print =============== create drop snodes
|
||||||
sql create snode on dnode 1
|
sql create snode on dnode 1
|
||||||
sql create snode on dnode 2
|
sql_error create snode on dnode 2
|
||||||
sql show snodes
|
|
||||||
if $rows != 2 then
|
|
||||||
return -1
|
|
||||||
endi
|
|
||||||
|
|
||||||
print =============== restart
|
print =============== restart
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
@ -127,7 +123,7 @@ system sh/exec.sh -n dnode2 -s start
|
||||||
|
|
||||||
sleep 2000
|
sleep 2000
|
||||||
sql show snodes
|
sql show snodes
|
||||||
if $rows != 2 then
|
if $rows != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -49,7 +49,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -118,7 +118,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
|
|
@ -23,7 +23,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -49,7 +49,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -118,7 +118,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -189,7 +189,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -51,7 +51,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -120,7 +120,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 120,
|
'pollDelay': 120,
|
||||||
|
@ -189,7 +189,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -51,7 +51,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -120,7 +120,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 120,
|
'pollDelay': 120,
|
||||||
|
@ -189,7 +189,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -53,7 +53,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -122,7 +122,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 120,
|
'pollDelay': 120,
|
||||||
|
@ -192,7 +192,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -51,7 +51,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -120,7 +120,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -190,7 +190,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -46,7 +46,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 30,
|
'pollDelay': 30,
|
||||||
|
@ -138,7 +138,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 10,
|
'pollDelay': 10,
|
||||||
|
@ -217,7 +217,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 10,
|
'pollDelay': 10,
|
||||||
|
|
|
@ -20,7 +20,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -46,7 +46,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -137,7 +137,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb1',
|
'ctbPrefix': 'ctb1',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -207,7 +207,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ class TDTestCase:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 1
|
self.vgroups = 1
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
self.rowsPerTbl = 10000
|
self.rowsPerTbl = 1000
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
self.replicaVar = int(replicaVar)
|
self.replicaVar = int(replicaVar)
|
||||||
|
@ -46,7 +46,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -137,7 +137,7 @@ class TDTestCase:
|
||||||
'ctbPrefix': 'ctb',
|
'ctbPrefix': 'ctb',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
'ctbNum': 10,
|
'ctbNum': 10,
|
||||||
'rowsPerTbl': 10000,
|
'rowsPerTbl': 1000,
|
||||||
'batchNum': 10,
|
'batchNum': 10,
|
||||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||||
'pollDelay': 60,
|
'pollDelay': 60,
|
||||||
|
@ -203,7 +203,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue