fix:pHandle->msg is not null if rebalance
This commit is contained in:
parent
41bec8560a
commit
d1e5d6e0f9
|
@ -1363,6 +1363,7 @@ CREATE_MSG_FAIL:
|
||||||
typedef struct SVgroupSaveInfo {
|
typedef struct SVgroupSaveInfo {
|
||||||
STqOffsetVal offset;
|
STqOffsetVal offset;
|
||||||
int64_t numOfRows;
|
int64_t numOfRows;
|
||||||
|
int32_t vgStatus;
|
||||||
} SVgroupSaveInfo;
|
} SVgroupSaveInfo;
|
||||||
|
|
||||||
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
|
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
|
||||||
|
@ -1398,7 +1399,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
||||||
.currentOffset = offsetNew,
|
.currentOffset = offsetNew,
|
||||||
.vgId = pVgEp->vgId,
|
.vgId = pVgEp->vgId,
|
||||||
.epSet = pVgEp->epSet,
|
.epSet = pVgEp->epSet,
|
||||||
.vgStatus = TMQ_VG_STATUS__IDLE,
|
.vgStatus = pInfo != NULL ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
|
||||||
.vgSkipCnt = 0,
|
.vgSkipCnt = 0,
|
||||||
.emptyBlockReceiveTs = 0,
|
.emptyBlockReceiveTs = 0,
|
||||||
.numOfRows = numOfRows,
|
.numOfRows = numOfRows,
|
||||||
|
@ -1457,7 +1458,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
||||||
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
|
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
|
||||||
vgKey, buf);
|
vgKey, buf);
|
||||||
|
|
||||||
SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows};
|
SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows, .vgStatus = pVgCur->vgStatus};
|
||||||
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
|
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1079,11 +1079,11 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||||
|
|
||||||
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
|
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
tqDebug("vgId:%d start set submit for subscribe", vgId);
|
|
||||||
|
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++){
|
for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++){
|
||||||
STqHandle* pHandle = (STqHandle*)taosArrayGetP(pTq->pPushArray, i);
|
STqHandle* pHandle = (STqHandle*)taosArrayGetP(pTq->pPushArray, i);
|
||||||
|
tqDebug("vgId:%d start set submit for pHandle:%p", vgId, pHandle);
|
||||||
if(ASSERT(pHandle->msg != NULL)){
|
if(ASSERT(pHandle->msg != NULL)){
|
||||||
tqError("pHandle->msg should not be null");
|
tqError("pHandle->msg should not be null");
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -181,17 +181,16 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
// code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
// code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
// lock
|
// lock
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
if(ASSERT(pHandle->msg == NULL)){
|
// tqDebug("data is over, register to handle:%p, msg:%p", pHandle, pHandle->msg);
|
||||||
tqError("pHandle->msg should be null");
|
if(pHandle->msg == NULL){
|
||||||
taosWUnLockLatch(&pTq->lock);
|
pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
||||||
goto end;
|
|
||||||
}
|
}
|
||||||
pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
|
||||||
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
||||||
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
|
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
|
||||||
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
||||||
pHandle->msg->contLen = pMsg->contLen;
|
pHandle->msg->contLen = pMsg->contLen;
|
||||||
tqError("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
tqDebug("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
||||||
taosArrayPush(pTq->pPushArray, &pHandle);
|
taosArrayPush(pTq->pPushArray, &pHandle);
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
|
|
Loading…
Reference in New Issue