diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 9fc25be2e7..3fbde36e89 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1877,6 +1877,23 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p return 0; } +static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* offset, int64_t sver, int64_t ever, int64_t consumerId){ + if (!pVg->seekUpdated) { + tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId); + pVg->offsetInfo.currentOffset = *offset; + } else { + tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); + } + + // update the status + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + + // update the valid wal version range + pVg->offsetInfo.walVerBegin = sver; + pVg->offsetInfo.walVerEnd = ever; + pVg->receivedInfoFromVnode = true; +} + static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems); @@ -1925,22 +1942,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->epSet = *pollRspWrapper->pEpset; } - // update the local offset value only for the returned values, only when the local offset is NOT updated - // by tmq_offset_seek function - if (!pVg->seekUpdated) { - tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId); - pVg->offsetInfo.currentOffset = pDataRsp->rspOffset; - } else { - tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId); - } - - // update the status - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - - // update the valid wal version range - pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver; - pVg->offsetInfo.walVerEnd = pDataRsp->head.walever; - pVg->receivedInfoFromVnode = true; + updateVgInfo(pVg, &pDataRsp->rspOffset, pDataRsp->head.walsver, pDataRsp->head.walever, tmq->consumerId); char buf[TSDB_OFFSET_LEN]; tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); @@ -1990,11 +1992,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } - if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate - pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset; - } - - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); taosFreeQitem(pollRspWrapper); @@ -2022,18 +2020,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return NULL; } - // update the local offset value only for the returned values, only when the local offset is NOT updated - // by tmq_offset_seek function - if (!pVg->seekUpdated) { - if(pollRspWrapper->taosxRsp.rspOffset.type != 0) { // if offset is validate - tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId); - pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset; - } - } else { - tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId); - } - - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + updateVgInfo(pVg, &pollRspWrapper->taosxRsp.rspOffset, pollRspWrapper->taosxRsp.head.walsver, pollRspWrapper->taosxRsp.head.walever, tmq->consumerId); if (pollRspWrapper->taosxRsp.blockNum == 0) { tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64, @@ -2615,6 +2602,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->vgId = pClientVg->vgId; + tscInfo("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, + pAssignment->vgId, pAssignment->currentOffset); } if (needFetch) { @@ -2690,7 +2679,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset); tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, - tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); + tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); } @@ -2718,17 +2707,17 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; - pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; +// pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; - char offsetBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); +// char offsetBuf[TSDB_OFFSET_LEN] = {0}; +// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); - tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf); + tscInfo("vgId:%d offset is old to:%"PRId64, p->vgId, p->currentOffset); pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerEnd = p->end; - pOffsetInfo->currentOffset.version = p->currentOffset; - pOffsetInfo->committedOffset.version = p->currentOffset; +// pOffsetInfo->currentOffset.version = p->currentOffset; +// pOffsetInfo->committedOffset.version = p->currentOffset; } } } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7ecd994b5a..48de21199b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -275,7 +275,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosArrayPush(pOutput->newConsumers, &consumerId); - mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId); + mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, pSubKey, consumerId); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a293858207..2d757fff08 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -560,48 +560,32 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, &req); - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey); - if (pOffset != NULL) { - if (pOffset->val.type != TMQ_OFFSET__LOG) { - tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s use snapshot, no valid wal info", consumerId, vgId, req.subKey); - terrno = TSDB_CODE_INVALID_PARA; - tDeleteMqDataRsp(&dataRsp); - return -1; - } - - dataRsp.rspOffset.type = TMQ_OFFSET__LOG; - dataRsp.rspOffset.version = pOffset->val.version; - } else { - if (req.useSnapshot == true) { - tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey); - terrno = TSDB_CODE_INVALID_PARA; - tDeleteMqDataRsp(&dataRsp); - return -1; - } - - dataRsp.rspOffset.type = TMQ_OFFSET__LOG; - - if (reqOffset.type == TMQ_OFFSET__LOG) { - int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader); - if (currentVer == -1) { // not start to read data from wal yet, return req offset directly - dataRsp.rspOffset.version = reqOffset.version; - } else { - dataRsp.rspOffset.version = currentVer; // return current consume offset value - } - } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { - dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position - } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { - dataRsp.rspOffset.version = ever; - } else { - tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey, - reqOffset.type); - terrno = TSDB_CODE_INVALID_PARA; - tDeleteMqDataRsp(&dataRsp); - return -1; - } + if (req.useSnapshot == true) { + tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey); + terrno = TSDB_CODE_INVALID_PARA; + tDeleteMqDataRsp(&dataRsp); + return -1; } + dataRsp.rspOffset.type = TMQ_OFFSET__LOG; + + if (reqOffset.type == TMQ_OFFSET__LOG) { + dataRsp.rspOffset.version = reqOffset.version; + } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { + dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position + } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { + dataRsp.rspOffset.version = ever; + } else { + tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s invalid offset type:%d", consumerId, vgId, req.subKey, + reqOffset.type); + terrno = TSDB_CODE_INVALID_PARA; + tDeleteMqDataRsp(&dataRsp); + return -1; + } + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); + tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever); + tDeleteMqDataRsp(&dataRsp); return 0; }