Merge branch 'fix/TD-30365' of https://github.com/taosdata/TDengine into fix/TD-30365
This commit is contained in:
commit
a644814674
|
@ -912,6 +912,11 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran
|
||||||
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
||||||
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId);
|
||||||
|
if (pVgObj == NULL) {
|
||||||
|
mError("sendDeleteSubToVnode %s failed since vg %d doesn't exist", pSub->key, pVgEp->vgId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
|
SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq));
|
||||||
if(pReq == NULL){
|
if(pReq == NULL){
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -922,17 +927,12 @@ static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STran
|
||||||
pReq->consumerId = -1;
|
pReq->consumerId = -1;
|
||||||
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
|
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId);
|
|
||||||
if (pVgObj == NULL) {
|
|
||||||
taosMemoryFree(pReq);
|
|
||||||
terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);;
|
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);;
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
action.contLen = sizeof(SMqVDeleteReq);
|
action.contLen = sizeof(SMqVDeleteReq);
|
||||||
action.msgType = TDMT_VND_TMQ_DELETE_SUB;
|
action.msgType = TDMT_VND_TMQ_DELETE_SUB;
|
||||||
|
action.acceptableCode = TSDB_CODE_MND_VGROUP_NOT_EXIST;
|
||||||
|
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
mndReleaseVgroup(pMnode, pVgObj);
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
|
|
@ -649,21 +649,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
|
tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
|
||||||
req.oldConsumerId, req.newConsumerId);
|
req.oldConsumerId, req.newConsumerId);
|
||||||
|
|
||||||
STqHandle* pHandle = NULL;
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||||
while (1) {
|
|
||||||
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
|
||||||
if (pHandle) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
taosRLockLatch(&pTq->lock);
|
|
||||||
ret = tqMetaGetHandle(pTq, req.subKey);
|
|
||||||
taosRUnLockLatch(&pTq->lock);
|
|
||||||
|
|
||||||
if (ret < 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
if (req.oldConsumerId != -1) {
|
if (req.oldConsumerId != -1) {
|
||||||
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
|
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
|
||||||
|
@ -698,10 +684,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
} else {
|
} else {
|
||||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||||
req.newConsumerId);
|
req.newConsumerId);
|
||||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
|
||||||
atomic_store_32(&pHandle->epoch, 0);
|
|
||||||
tqUnregisterPushHandle(pTq, pHandle);
|
tqUnregisterPushHandle(pTq, pHandle);
|
||||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
taosHashRemove(pTq->pHandle, pHandle->subKey, strlen(pHandle->subKey));
|
||||||
|
|
||||||
|
// update handle to avoid req->qmsg changed if spilt vnode is failed
|
||||||
|
STqHandle handle = {0};
|
||||||
|
ret = tqCreateHandle(pTq, &req, &handle);
|
||||||
|
if (ret < 0) {
|
||||||
|
tqDestroyTqHandle(&handle);
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
|
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -351,7 +351,6 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
|
||||||
|
|
||||||
memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
handle->consumerId = req->newConsumerId;
|
handle->consumerId = req->newConsumerId;
|
||||||
handle->epoch = -1;
|
|
||||||
|
|
||||||
handle->execHandle.subType = req->subType;
|
handle->execHandle.subType = req->subType;
|
||||||
handle->fetchMeta = req->withMeta;
|
handle->fetchMeta = req->withMeta;
|
||||||
|
|
|
@ -87,7 +87,7 @@ int tqUnregisterPushHandle(STQ* pTq, void *handle) {
|
||||||
int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
|
int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
|
||||||
tqInfo("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
|
tqInfo("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
|
||||||
|
|
||||||
if(pHandle->msg != NULL) {
|
if(ret == 0 && pHandle->msg != NULL) {
|
||||||
// tqPushDataRsp(pHandle, vgId);
|
// tqPushDataRsp(pHandle, vgId);
|
||||||
tqPushEmptyDataRsp(pHandle, vgId);
|
tqPushEmptyDataRsp(pHandle, vgId);
|
||||||
|
|
||||||
|
|
|
@ -1314,6 +1314,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6
|
||||||
pTableScanInfo->tableEndIndex = -1;
|
pTableScanInfo->tableEndIndex = -1;
|
||||||
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
|
||||||
pTableScanInfo->base.dataReader = NULL;
|
pTableScanInfo->base.dataReader = NULL;
|
||||||
|
pTableScanInfo->scanMode = TABLE_SCAN__BLOCK_ORDER;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
|
static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs,
|
||||||
|
|
|
@ -237,21 +237,19 @@ class ClusterComCheck:
|
||||||
last_number=vgroup_numbers-1
|
last_number=vgroup_numbers-1
|
||||||
while count < count_number:
|
while count < count_number:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
count+=1
|
||||||
|
print("check vgroup count :", count)
|
||||||
tdSql.query(f"show {db_name}.vgroups;")
|
tdSql.query(f"show {db_name}.vgroups;")
|
||||||
if count == 0 :
|
if tdSql.getRows() != vgroup_numbers :
|
||||||
if tdSql.checkRows(vgroup_numbers) :
|
continue
|
||||||
tdLog.success(f"{db_name} has {vgroup_numbers} vgroups" )
|
|
||||||
else:
|
|
||||||
tdLog.exit(f"vgroup number of {db_name} is not correct")
|
|
||||||
if self.db_replica == 1 :
|
if self.db_replica == 1 :
|
||||||
if tdSql.queryResult[0][4] == 'leader' and tdSql.queryResult[last_number][4] == 'leader':
|
if tdSql.queryResult[0][4] == 'leader' and tdSql.queryResult[last_number][4] == 'leader':
|
||||||
tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';")
|
tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';")
|
||||||
print("db replica :",tdSql.queryResult[0][0])
|
print("db replica :",tdSql.queryResult[0][0])
|
||||||
if tdSql.queryResult[0][0] == db_replica:
|
if tdSql.queryResult[0][0] == db_replica:
|
||||||
ready_time= (count + 1)
|
tdLog.success(f"all vgroups with replica {self.db_replica} of {db_name} are leaders in {count} s")
|
||||||
tdLog.success(f"all vgroups with replica {self.db_replica} of {db_name} are leaders in {count + 1} s")
|
|
||||||
return True
|
return True
|
||||||
count+=1
|
|
||||||
elif self.db_replica == 3 :
|
elif self.db_replica == 3 :
|
||||||
vgroup_status_first=[tdSql.queryResult[0][4],tdSql.queryResult[0][6],tdSql.queryResult[0][8]]
|
vgroup_status_first=[tdSql.queryResult[0][4],tdSql.queryResult[0][6],tdSql.queryResult[0][8]]
|
||||||
|
|
||||||
|
@ -261,10 +259,8 @@ class ClusterComCheck:
|
||||||
tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';")
|
tdSql.query(f"select `replica` from information_schema.ins_databases where `name`='{db_name}';")
|
||||||
print("db replica :",tdSql.queryResult[0][0])
|
print("db replica :",tdSql.queryResult[0][0])
|
||||||
if tdSql.queryResult[0][0] == db_replica:
|
if tdSql.queryResult[0][0] == db_replica:
|
||||||
ready_time= (count + 1)
|
tdLog.success(f"elections of {db_name}.vgroups with replica {self.db_replica} are ready in {count} s")
|
||||||
tdLog.success(f"elections of {db_name}.vgroups with replica {self.db_replica} are ready in {ready_time} s")
|
|
||||||
return True
|
return True
|
||||||
count+=1
|
|
||||||
else:
|
else:
|
||||||
tdLog.debug(tdSql.queryResult)
|
tdLog.debug(tdSql.queryResult)
|
||||||
tdLog.notice(f"elections of {db_name} all vgroups with replica {self.db_replica} are failed in {count} s ")
|
tdLog.notice(f"elections of {db_name} all vgroups with replica {self.db_replica} are failed in {count} s ")
|
||||||
|
|
|
@ -200,12 +200,11 @@ class TDTestCase:
|
||||||
|
|
||||||
# tmqCom.checkFileContent(consumerId, queryString)
|
# tmqCom.checkFileContent(consumerId, queryString)
|
||||||
|
|
||||||
time.sleep(2)
|
|
||||||
for i in range(len(topicNameList)):
|
for i in range(len(topicNameList)):
|
||||||
tdSql.query("drop topic %s"%topicNameList[i])
|
tdSql.query("drop topic %s"%topicNameList[i])
|
||||||
|
|
||||||
if deleteWal == True:
|
clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240)
|
||||||
clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240)
|
|
||||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
|
|
@ -199,13 +199,11 @@ class TDTestCase:
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
# tmqCom.checkFileContent(consumerId, queryString)
|
# tmqCom.checkFileContent(consumerId, queryString)
|
||||||
|
clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240)
|
||||||
|
|
||||||
time.sleep(2)
|
time.sleep(3)
|
||||||
for i in range(len(topicNameList)):
|
for i in range(len(topicNameList)):
|
||||||
tdSql.query("drop topic %s"%topicNameList[i])
|
tdSql.query("drop topic %s"%topicNameList[i])
|
||||||
|
|
||||||
if deleteWal == True:
|
|
||||||
clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240)
|
|
||||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
|
Loading…
Reference in New Issue