Merge branch 'feature/3_liaohj' of github.com:taosdata/tdengine into feature/3_liaohj
This commit is contained in:
commit
a6c6b2e461
|
@ -224,7 +224,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
.pVgEp = pVgEp,
|
.pVgEp = pVgEp,
|
||||||
};
|
};
|
||||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||||
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRId64, sub, pVgEp->vgId, consumerId);
|
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, sub, pVgEp->vgId, consumerId);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pConsumerEp->vgs);
|
taosArrayDestroy(pConsumerEp->vgs);
|
||||||
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||||
|
@ -296,7 +296,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
.pVgEp = pVgEp,
|
.pVgEp = pVgEp,
|
||||||
};
|
};
|
||||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||||
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
|
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:%" PRIx64 ",(first scan)", sub, pVgEp->vgId,
|
||||||
pConsumerEp->consumerId);
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
imbCnt++;
|
imbCnt++;
|
||||||
|
@ -311,7 +311,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
.pVgEp = pVgEp,
|
.pVgEp = pVgEp,
|
||||||
};
|
};
|
||||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||||
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:%" PRId64 ",(first scan)", sub, pVgEp->vgId,
|
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:%" PRIx64 ",(first scan)", sub, pVgEp->vgId,
|
||||||
pConsumerEp->consumerId);
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -329,7 +329,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
||||||
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
||||||
taosArrayPush(pOutput->newConsumers, &consumerId);
|
taosArrayPush(pOutput->newConsumers, &consumerId);
|
||||||
mInfo("sub:%s mq rebalance add new consumer:%" PRId64, sub, consumerId);
|
mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, sub, consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -357,7 +357,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||||
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
|
mInfo("mq rebalance: add vgId:%d to consumer:%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
|
||||||
pConsumerEp->consumerId);
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -387,12 +387,12 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||||
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
|
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
|
||||||
mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 " (second scan)", pRebVg->pVgEp->vgId,
|
mInfo("mq rebalance: skip vg %d for same consumer:%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId,
|
||||||
pConsumerEp->consumerId);
|
pConsumerEp->consumerId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||||
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
|
mInfo("mq rebalance: add vgId:%d to consumer:%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
|
||||||
pConsumerEp->consumerId);
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1017,7 +1017,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
|
||||||
|
|
||||||
mDebug("mnd show subscriptions: topic %s, consumer %" PRId64 " cgroup %s vgid %d", varDataVal(topic),
|
mDebug("mnd show subscriptions: topic %s, consumer:%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
|
||||||
pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);
|
pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);
|
||||||
|
|
||||||
// offset
|
// offset
|
||||||
|
|
|
@ -503,7 +503,7 @@ static TransCbFp mndTransGetCbFp(ETrnFunc ftype) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
|
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
|
||||||
mInfo("trans:%d, perform insert action, row:%p stage:%s, startFunc:%d, callfunc:1", pTrans->id, pTrans, mndTransStr(pTrans->stage),
|
mInfo("trans:%d, perform insert action, row:%p stage:%s, callfunc:1, startFunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage),
|
||||||
pTrans->startFunc);
|
pTrans->startFunc);
|
||||||
|
|
||||||
if (pTrans->startFunc > 0) {
|
if (pTrans->startFunc > 0) {
|
||||||
|
@ -547,8 +547,8 @@ static void mndTransDropData(STrans *pTrans) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
|
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
|
||||||
mInfo("trans:%d, perform delete action, row:%p stage:%s callfunc:%d, stopFunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage),
|
mInfo("trans:%d, perform delete action, row:%p stage:%s callfunc:%d, stopFunc:%d", pTrans->id, pTrans,
|
||||||
pTrans->stopFunc, callFunc);
|
mndTransStr(pTrans->stage), callFunc, pTrans->stopFunc);
|
||||||
|
|
||||||
if (pTrans->stopFunc > 0 && callFunc) {
|
if (pTrans->stopFunc > 0 && callFunc) {
|
||||||
TransCbFp fp = mndTransGetCbFp(pTrans->stopFunc);
|
TransCbFp fp = mndTransGetCbFp(pTrans->stopFunc);
|
||||||
|
|
Loading…
Reference in New Issue