diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 9e60f8b04d..54e929c9a4 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1363,6 +1363,7 @@ CREATE_MSG_FAIL: typedef struct SVgroupSaveInfo { STqOffsetVal offset; int64_t numOfRows; + int32_t vgStatus; } SVgroupSaveInfo; static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap, @@ -1398,7 +1399,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .currentOffset = offsetNew, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet, - .vgStatus = TMQ_VG_STATUS__IDLE, + .vgStatus = pInfo != NULL ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, .emptyBlockReceiveTs = 0, .numOfRows = numOfRows, @@ -1457,7 +1458,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows}; + SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows, .vgStatus = pVgCur->vgStatus}; taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8e35183ebb..0080feadbe 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1079,11 +1079,11 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); - tqDebug("vgId:%d start set submit for subscribe", vgId); taosWLockLatch(&pTq->lock); for(size_t i = 0; i < taosArrayGetSize(pTq->pPushArray); i++){ STqHandle* pHandle = (STqHandle*)taosArrayGetP(pTq->pPushArray, i); + tqDebug("vgId:%d start set submit for pHandle:%p", vgId, pHandle); if(ASSERT(pHandle->msg != NULL)){ tqError("pHandle->msg should not be null"); break; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 3f92414c34..663dc8bbb9 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -181,17 +181,16 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); // lock taosWLockLatch(&pTq->lock); - if(ASSERT(pHandle->msg == NULL)){ - tqError("pHandle->msg should be null"); - taosWUnLockLatch(&pTq->lock); - goto end; +// tqDebug("data is over, register to handle:%p, msg:%p", pHandle, pHandle->msg); + if(pHandle->msg == NULL){ + pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); } - pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); + memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; - tqError("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen); + tqDebug("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen); taosArrayPush(pTq->pPushArray, &pHandle); taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp);