diff --git a/source/dnode/mnode/impl/inc/mndOffset.h b/source/dnode/mnode/impl/inc/mndOffset.h index e18eec973f..556bfe6a53 100644 --- a/source/dnode/mnode/impl/inc/mndOffset.h +++ b/source/dnode/mnode/impl/inc/mndOffset.h @@ -31,7 +31,7 @@ void mndReleaseOffset(SMnode *pMnode, SMqOffsetObj *pOffset); SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset); SSdbRow *mndOffsetActionDecode(SSdbRaw *pRaw); -int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs); +int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs); static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, const char *topicName, int32_t vgId) { return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName); diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index 31c16e1e1d..dad912b4e6 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -130,8 +130,7 @@ OFFSET_DECODE_OVER: return pRow; } -int32_t mndCreateOffset(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs) { - int32_t code = 0; +int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicName, const SArray *vgs) { int32_t sz = taosArrayGetSize(vgs); for (int32_t i = 0; i < sz; i++) { SMqConsumerEp *pConsumerEp = taosArrayGet(vgs, i); @@ -170,13 +169,22 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) { if (mndMakePartitionKey(key, pOffset->cgroup, pOffset->topicName, pOffset->vgId) < 0) { return -1; } + bool create = false; SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key); - ASSERT(pOffsetObj); + if (pOffsetObj == NULL) { + pOffsetObj = taosMemoryMalloc(sizeof(SMqOffset)); + memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN); + create = true; + } pOffsetObj->offset = pOffset->offset; SSdbRaw *pOffsetRaw = mndOffsetActionEncode(pOffsetObj); sdbSetRawStatus(pOffsetRaw, SDB_STATUS_READY); mndTransAppendRedolog(pTrans, pOffsetRaw); - mndReleaseOffset(pMnode, pOffsetObj); + if (create) { + taosMemoryFree(pOffsetObj); + } else { + mndReleaseOffset(pMnode, pOffsetObj); + } } if (mndTransPrepare(pMnode, pTrans) != 0) { @@ -201,7 +209,7 @@ static int32_t mndOffsetActionDelete(SSdb *pSdb, SMqOffsetObj *pOffset) { static int32_t mndOffsetActionUpdate(SSdb *pSdb, SMqOffsetObj *pOldOffset, SMqOffsetObj *pNewOffset) { mTrace("offset:%s, perform update action", pOldOffset->key); - pOldOffset->offset = pNewOffset->offset; + atomic_store_64(&pOldOffset->offset, pNewOffset->offset); return 0; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7b2ec83230..f708d4ffc1 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -813,6 +813,20 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter; if (pEpInSub->consumerId == -1) continue; ASSERT(pEpInSub->consumerId > 0); + + // push until equal minVg + while (taosArrayGetSize(pEpInSub->vgs) < minVgCnt) { + // iter hash and find one vg + pRemovedIter = taosHashIterate(pHash, pRemovedIter); + ASSERT(pRemovedIter); + pRebVg = (SMqRebOutputVg *)pRemovedIter; + // push + taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp); + pRebVg->newConsumerId = pEpInSub->consumerId; + taosArrayPush(pOutput->rebVgs, pRebVg); + } + +#if 0 /*int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);*/ if (imbCnt < imbConsumerNum) { imbCnt++; @@ -840,13 +854,25 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR taosArrayPush(pOutput->rebVgs, pRebVg); } } +#endif } // 7. handle unassigned vg if (taosHashGetSize(pOutput->pSub->consumerHash) != 1) { - // if has consumer, vg should be all assigned - pRemovedIter = taosHashIterate(pHash, pRemovedIter); - ASSERT(pRemovedIter == NULL); + // if has consumer, assign all left vg + while (1) { + pRemovedIter = taosHashIterate(pHash, pRemovedIter); + if (pRemovedIter == NULL) break; + pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); + ASSERT(pIter); + pRebVg = (SMqRebOutputVg *)pRemovedIter; + SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter; + if (pEpInSub->consumerId == -1) continue; + ASSERT(pEpInSub->consumerId > 0); + taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp); + pRebVg->newConsumerId = pEpInSub->consumerId; + taosArrayPush(pOutput->rebVgs, pRebVg); + } } else { // if all consumer is removed, put all vg into unassigned int64_t unexistKey = -1; @@ -992,7 +1018,9 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { // TODO replace assert with error check ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0); - ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0); + // if add more consumer to balanced subscribe, + // possibly no vg is changed + /*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/ ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0); if (rebInput.pTopic) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 16c5a4752e..c7c8054120 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -355,6 +355,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset); // TODO destroy + taosArrayDestroy(rsp.blockData); + taosArrayDestroy(rsp.blockDataLen); return 0; } @@ -599,19 +601,20 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { taosHashPut(pTq->tqMetaNew, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); return 0; } else { - if (req.newConsumerId != -1) { - /*taosWLockLatch(&pExec->lock);*/ - ASSERT(pExec->consumerId == req.oldConsumerId); - // TODO handle qmsg and exec modification - atomic_store_64(&pExec->consumerId, req.newConsumerId); - atomic_add_fetch_32(&pExec->epoch, 1); - /*taosWUnLockLatch(&pExec->lock);*/ - return 0; - } else { - // TODO - /*taosHashRemove(pTq->tqMetaNew, req.subKey, strlen(req.subKey));*/ - return 0; - } + /*if (req.newConsumerId != -1) {*/ + /*taosWLockLatch(&pExec->lock);*/ + ASSERT(pExec->consumerId == req.oldConsumerId); + // TODO handle qmsg and exec modification + atomic_store_32(&pExec->epoch, -1); + atomic_store_64(&pExec->consumerId, req.newConsumerId); + atomic_add_fetch_32(&pExec->epoch, 1); + /*taosWUnLockLatch(&pExec->lock);*/ + return 0; + /*} else {*/ + // TODO + /*taosHashRemove(pTq->tqMetaNew, req.subKey, strlen(req.subKey));*/ + /*return 0;*/ + /*}*/ } }