fix:tmqVnodeSplit-column.py failed in arm64 because of rebalance after unsubscribe while split vnode
This commit is contained in:
parent
02575e0f28
commit
4f168b4b2d
|
@ -328,35 +328,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64_t watermark, char **pStr) {
|
|
||||||
if (NULL == ast) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
SNode * pAst = NULL;
|
|
||||||
int32_t code = nodesStringToNode(ast, &pAst);
|
|
||||||
|
|
||||||
SQueryPlan *pPlan = NULL;
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
SPlanContext cxt = {
|
|
||||||
.pAstRoot = pAst,
|
|
||||||
.topicQuery = false,
|
|
||||||
.streamQuery = true,
|
|
||||||
.triggerType = (triggerType == STREAM_TRIGGER_MAX_DELAY) ? STREAM_TRIGGER_WINDOW_CLOSE : triggerType,
|
|
||||||
.watermark = watermark,
|
|
||||||
};
|
|
||||||
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
|
||||||
code = nodesNodeToString((SNode *)pPlan, false, pStr, NULL);
|
|
||||||
}
|
|
||||||
nodesDestroyNode(pAst);
|
|
||||||
nodesDestroyNode((SNode *)pPlan);
|
|
||||||
terrno = code;
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
|
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
|
||||||
SNode * pAst = NULL;
|
SNode * pAst = NULL;
|
||||||
SQueryPlan *pPlan = NULL;
|
SQueryPlan *pPlan = NULL;
|
||||||
|
@ -777,6 +748,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
SDbObj * pDb = NULL;
|
SDbObj * pDb = NULL;
|
||||||
SCMCreateStreamReq createStreamReq = {0};
|
SCMCreateStreamReq createStreamReq = {0};
|
||||||
SStreamObj streamObj = {0};
|
SStreamObj streamObj = {0};
|
||||||
|
char* sql = NULL;
|
||||||
|
|
||||||
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
|
if (tDeserializeSCMCreateStreamReq(pReq->pCont, pReq->contLen, &createStreamReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
@ -808,7 +780,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* sql = NULL;
|
|
||||||
int32_t sqlLen = 0;
|
int32_t sqlLen = 0;
|
||||||
if(createStreamReq.sql != NULL){
|
if(createStreamReq.sql != NULL){
|
||||||
sqlLen = strlen(createStreamReq.sql);
|
sqlLen = strlen(createStreamReq.sql);
|
||||||
|
|
|
@ -161,6 +161,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj
|
||||||
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
|
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
|
||||||
const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
|
const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
|
||||||
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
|
||||||
|
if(pRebVg->oldConsumerId == -1) return 0; //drop stream, no consumer, while split vnode,all consumerId is -1
|
||||||
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -337,7 +337,7 @@ int32_t tqProcessPollPush(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
STqHandle* pHandle = *(STqHandle**)pIter;
|
STqHandle* pHandle = *(STqHandle**)pIter;
|
||||||
tqInfo("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
|
tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
|
||||||
|
|
||||||
if (ASSERT(pHandle->msg != NULL)) {
|
if (ASSERT(pHandle->msg != NULL)) {
|
||||||
tqError("pHandle->msg should not be null");
|
tqError("pHandle->msg should not be null");
|
||||||
|
|
|
@ -72,7 +72,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
|
||||||
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
||||||
pHandle->msg->contLen = pMsg->contLen;
|
pHandle->msg->contLen = pMsg->contLen;
|
||||||
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
|
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
|
||||||
tqInfo("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
|
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
|
||||||
pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -188,7 +188,7 @@ class TDTestCase:
|
||||||
expectRows = 1
|
expectRows = 1
|
||||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
if expectrowcnt / 2 >= resultList[0]:
|
if expectrowcnt / 2 > resultList[0]:
|
||||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0]))
|
||||||
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
tdLog.exit("%d tmq consume rows error!"%consumerId)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue