fix:pHandle not load if taosd restart

This commit is contained in:
wangmm0220 2023-07-28 11:48:53 +08:00
parent 5fa580960e
commit 2f6c565392
3 changed files with 17 additions and 9 deletions

View File

@ -94,7 +94,7 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
bool mndRebTryStart() { bool mndRebTryStart() {
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
mInfo("tq timer, rebalance counter old val:%d", old); mDebug("tq timer, rebalance counter old val:%d", old);
return old == 0; return old == 0;
} }
@ -116,7 +116,7 @@ void mndRebCntDec() {
int32_t newVal = val - 1; int32_t newVal = val - 1;
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal); int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
if (oldVal == val) { if (oldVal == val) {
mInfo("rebalance trans end, rebalance counter:%d", newVal); mDebug("rebalance trans end, rebalance counter:%d", newVal);
break; break;
} }
} }

View File

@ -522,10 +522,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
// 1. find handle // 1. find handle
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) { if (pHandle == NULL) {
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey); do{
terrno = TSDB_CODE_INVALID_MSG; if (tqMetaGetHandle(pTq, req.subKey) == 0){
taosWUnLockLatch(&pTq->lock); pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
return -1; if(pHandle != NULL){
break;
}
}
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_MSG;
taosWUnLockLatch(&pTq->lock);
return -1;
}while(0);
} }
// 2. check re-balance status // 2. check re-balance status

View File

@ -338,7 +338,7 @@ static int buildHandle(STQ* pTq, STqHandle* handle){
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
return -1; return -1;
} }
tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid); tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid);
handle->execHandle.pTqReader = tqReaderOpen(pVnode); handle->execHandle.pTqReader = tqReaderOpen(pVnode);
tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL); tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL);
taosArrayDestroy(tbUidList); taosArrayDestroy(tbUidList);
@ -356,7 +356,7 @@ static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
if(buildHandle(pTq, handle) < 0){ if(buildHandle(pTq, handle) < 0){
return -1; return -1;
} }
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId); tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
} }
@ -384,7 +384,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
if(buildHandle(pTq, handle) < 0){ if(buildHandle(pTq, handle) < 0){
return -1; return -1;
} }
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId); tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle)); return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
} }