From bdfeb329230f95708c1246d9cecf14b2e3adb3cd Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 5 Jul 2023 16:04:34 +0800 Subject: [PATCH 1/3] fix:bugs in tmq_get_topic_assignment --- source/client/src/clientTmq.c | 52 ++++++++++++++++++---------------- source/dnode/vnode/src/tq/tq.c | 10 ++----- 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 83550aa15d..accaa8fa1d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2583,6 +2583,8 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->vgId = pClientVg->vgId; + tscInfo("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, + pAssignment->vgId, pAssignment->currentOffset); } if (needFetch) { @@ -2673,34 +2675,36 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a int32_t num = taosArrayGetSize(pCommon->pList); for(int32_t i = 0; i < num; ++i) { (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i); + tscInfo("consumer:0x%" PRIx64 " get assignment from server:%d->%" PRId64, tmq->consumerId, + (*assignment)[i].vgId, (*assignment)[i].currentOffset); } *numOfAssignment = num; } - for (int32_t j = 0; j < (*numOfAssignment); ++j) { - tmq_topic_assignment* p = &(*assignment)[j]; - - for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) { - SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - if (pClientVg->vgId != p->vgId) { - continue; - } - - SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; - - pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; - - char offsetBuf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); - - tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf); - - pOffsetInfo->walVerBegin = p->begin; - pOffsetInfo->walVerEnd = p->end; - pOffsetInfo->currentOffset.version = p->currentOffset; - pOffsetInfo->committedOffset.version = p->currentOffset; - } - } +// for (int32_t j = 0; j < (*numOfAssignment); ++j) { +// tmq_topic_assignment* p = &(*assignment)[j]; +// +// for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) { +// SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); +// if (pClientVg->vgId != p->vgId) { +// continue; +// } +// +// SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; +// +// pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; +// +// char offsetBuf[80] = {0}; +// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); +// +// tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf); +// +// pOffsetInfo->walVerBegin = p->begin; +// pOffsetInfo->walVerEnd = p->end; +// pOffsetInfo->currentOffset.version = p->currentOffset; +// pOffsetInfo->committedOffset.version = p->currentOffset; +// } +// } destroyCommonInfo(pCommon); return code; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a293858207..0989b980ea 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -571,6 +571,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { dataRsp.rspOffset.type = TMQ_OFFSET__LOG; dataRsp.rspOffset.version = pOffset->val.version; + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from stroe:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); } else { if (req.useSnapshot == true) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey); @@ -581,14 +582,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { dataRsp.rspOffset.type = TMQ_OFFSET__LOG; - if (reqOffset.type == TMQ_OFFSET__LOG) { - int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader); - if (currentVer == -1) { // not start to read data from wal yet, return req offset directly - dataRsp.rspOffset.version = reqOffset.version; - } else { - dataRsp.rspOffset.version = currentVer; // return current consume offset value - } - } else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { + if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEST) { dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { dataRsp.rspOffset.version = ever; From 34a62c3f3845ef8bd72b417039da7edfe0d28f49 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 5 Jul 2023 22:55:41 +0800 Subject: [PATCH 2/3] fix:vginfo assigment not in same time,we should get vginfo by vg,not all vg --- source/client/src/clientTmq.c | 14 ++++++++------ source/dnode/mnode/impl/src/mndSubscribe.c | 2 +- source/dnode/vnode/src/tq/tq.c | 3 ++- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index accaa8fa1d..62cb5d2bb2 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2565,15 +2565,15 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } bool needFetch = false; - + int32_t index = 0; for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); if (!pClientVg->receivedInfoFromVnode) { needFetch = true; - break; + continue; } - tmq_topic_assignment* pAssignment = &(*assignment)[j]; + tmq_topic_assignment* pAssignment = &(*assignment)[index++]; if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) { pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version; } else { @@ -2603,7 +2603,9 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a terrno = TSDB_CODE_OUT_OF_MEMORY; for (int32_t i = 0; i < (*numOfAssignment); ++i) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - + if (pClientVg->receivedInfoFromVnode) { + continue; + } SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam)); if (pParam == NULL) { destroyCommonInfo(pCommon); @@ -2674,11 +2676,11 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } else { int32_t num = taosArrayGetSize(pCommon->pList); for(int32_t i = 0; i < num; ++i) { - (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i); + (*assignment)[index++] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i); tscInfo("consumer:0x%" PRIx64 " get assignment from server:%d->%" PRId64, tmq->consumerId, (*assignment)[i].vgId, (*assignment)[i].currentOffset); } - *numOfAssignment = num; + *numOfAssignment = index; } // for (int32_t j = 0; j < (*numOfAssignment); ++j) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7ecd994b5a..48de21199b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -275,7 +275,7 @@ static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pI taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); taosArrayPush(pOutput->newConsumers, &consumerId); - mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId); + mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, pSubKey, consumerId); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0989b980ea..baba735d6c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -571,7 +571,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { dataRsp.rspOffset.type = TMQ_OFFSET__LOG; dataRsp.rspOffset.version = pOffset->val.version; - tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from stroe:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from store:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); } else { if (req.useSnapshot == true) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s snapshot not support wal info", consumerId, vgId, req.subKey); @@ -593,6 +593,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { tDeleteMqDataRsp(&dataRsp); return -1; } + tqInfo("consumer:0x%" PRIx64 " vgId:%d subkey:%s get assignment from init:%"PRId64, consumerId, vgId, req.subKey, dataRsp.rspOffset.version); } tqDoSendDataRsp(&pMsg->info, &dataRsp, req.epoch, req.consumerId, TMQ_MSG_TYPE__WALINFO_RSP, sver, ever); From 9e6e59adc2b27eb0f8564ccdfa82bbf3858ac5fb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 6 Jul 2023 16:47:48 +0800 Subject: [PATCH 3/3] fix:race condition for pTq->pStore->pHash --- source/dnode/vnode/src/tq/tqOffset.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 0a9905b544..11bb737225 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -104,7 +104,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { pStore->needCommit = 0; pTq->pOffsetStore = pStore; - pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); + pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); if (pStore->pHash == NULL) { taosMemoryFree(pStore); return NULL;