This commit is contained in:
Liu Jicong 2022-04-01 09:57:55 +08:00
parent b06db715bc
commit e916f83fd7
5 changed files with 37 additions and 27 deletions

View File

@ -882,11 +882,13 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
// TODO: alloc mem // TODO: alloc mem
/*pRsp->*/ /*pRsp->*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
#if 0
if (pRsp->msg.numOfTopics == 0) { if (pRsp->msg.numOfTopics == 0) {
/*printf("no data\n");*/ /*printf("no data\n");*/
taosFreeQitem(pRsp); taosFreeQitem(pRsp);
goto WRITE_QUEUE_FAIL; goto WRITE_QUEUE_FAIL;
} }
#endif
tscError("tmq recv poll: vg %d, req offset %ld, rsp offset %ld", pParam->pVg->vgId, pRsp->msg.reqOffset, tscError("tmq recv poll: vg %d, req offset %ld, rsp offset %ld", pParam->pVg->vgId, pRsp->msg.reqOffset,
pRsp->msg.rspOffset); pRsp->msg.rspOffset);
@ -938,7 +940,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
for (int32_t k = 0; k < vgNumCur; k++) { for (int32_t k = 0; k < vgNumCur; k++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k); SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId); sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
/*printf("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey);*/ printf("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t)); taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
} }
break; break;
@ -952,12 +954,12 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
int64_t offset = pVgEp->offset; int64_t offset = pVgEp->offset;
/*printf("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset);*/ printf("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset);
if (pOffset != NULL) { if (pOffset != NULL) {
offset = *pOffset; offset = *pOffset;
/*printf("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey);*/ printf("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey);
} }
/*printf("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset);*/ printf("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset);
SMqClientVg clientVg = { SMqClientVg clientVg = {
.pollCnt = 0, .pollCnt = 0,
.currentOffset = offset, .currentOffset = offset,
@ -1260,13 +1262,13 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
} }
// return // return
int32_t tmqHandleRes(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) { int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) {
if (rspHead->mqMsgType == TMQ_MSG_TYPE__EP_RSP) { if (rspHead->mqMsgType == TMQ_MSG_TYPE__EP_RSP) {
/*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/ /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
if (rspHead->epoch > atomic_load_32(&tmq->epoch)) { if (rspHead->epoch > atomic_load_32(&tmq->epoch)) {
SMqCMGetSubEpRsp* rspMsg = (SMqCMGetSubEpRsp*)rspHead; SMqCMGetSubEpRsp* rspMsg = (SMqCMGetSubEpRsp*)rspHead;
tmqUpdateEp(tmq, rspHead->epoch, rspMsg); tmqUpdateEp(tmq, rspHead->epoch, rspMsg);
tmqClearUnhandleMsg(tmq); /*tmqClearUnhandleMsg(tmq);*/
*pReset = true; *pReset = true;
} else { } else {
*pReset = false; *pReset = false;
@ -1297,6 +1299,11 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = rspMsg->msg.rspOffset; pVg->currentOffset = rspMsg->msg.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (rspMsg->msg.numOfTopics == 0) {
taosFreeQitem(rspMsg);
rspHead = NULL;
continue;
}
return rspMsg; return rspMsg;
} else { } else {
/*printf("epoch mismatch\n");*/ /*printf("epoch mismatch\n");*/
@ -1305,7 +1312,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
} else { } else {
/*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/ /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
bool reset = false; bool reset = false;
tmqHandleRes(tmq, rspHead, &reset); tmqHandleNoPollRsp(tmq, rspHead, &reset);
taosFreeQitem(rspHead); taosFreeQitem(rspHead);
if (pollIfReset && reset) { if (pollIfReset && reset) {
printf("reset and repoll\n"); printf("reset and repoll\n");

View File

@ -620,13 +620,13 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) { static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
if (pSub->consumers) { if (pSub->consumers) {
taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer); //taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer);
// taosArrayDestroy(pSub->consumers); // taosArrayDestroy(pSub->consumers);
pSub->consumers = NULL; pSub->consumers = NULL;
} }
if (pSub->unassignedVg) { if (pSub->unassignedVg) {
taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp); //taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
// taosArrayDestroy(pSub->unassignedVg); // taosArrayDestroy(pSub->unassignedVg);
pSub->unassignedVg = NULL; pSub->unassignedVg = NULL;
} }

View File

@ -238,6 +238,7 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
if (consumerId == pSubConsumer->consumerId) { if (consumerId == pSubConsumer->consumerId) {
int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo); int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
mInfo("topic %s has %d vg", topicName, pConsumer->epoch);
SMqSubTopicEp topicEp; SMqSubTopicEp topicEp;
strcpy(topicEp.topic, topicName); strcpy(topicEp.topic, topicName);
topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
@ -419,7 +420,6 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
int32_t vgNum = pSub->vgNum; int32_t vgNum = pSub->vgNum;
int32_t vgEachConsumer = vgNum / consumerNum; int32_t vgEachConsumer = vgNum / consumerNum;
int32_t imbalanceVg = vgNum % consumerNum; int32_t imbalanceVg = vgNum % consumerNum;
int32_t imbalanceSolved = 0;
// iterate all consumers, set unassignedVgStash // iterate all consumers, set unassignedVgStash
for (int32_t i = 0; i < consumerNum; i++) { for (int32_t i = 0; i < consumerNum; i++) {
@ -446,19 +446,24 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb || if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
(vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) || (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
(vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) { (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) {
SMqConsumerObj* pNewRebConsumer = taosMemoryMalloc(sizeof(SMqConsumerObj));
ASSERT(pNewRebConsumer);
memcpy(pNewRebConsumer, pRebConsumer, sizeof(SMqConsumerObj));
pNewRebConsumer->currentTopics = taosArrayDup(pRebConsumer->currentTopics);
pNewRebConsumer->recentRemovedTopics = taosArrayDup(pRebConsumer->recentRemovedTopics);
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) { if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {
pRebConsumer->epoch++; pNewRebConsumer->epoch++;
} }
if (vgThisConsumerAfterRb != 0) { if (vgThisConsumerAfterRb != 0) {
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); atomic_store_32(&pNewRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
} else { } else {
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE); atomic_store_32(&pNewRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
} }
mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pRebConsumer->consumerId, status, mInfo("mq consumer:%" PRId64 ", status change from %d to %d", pNewRebConsumer->consumerId, status,
pRebConsumer->status); pNewRebConsumer->status);
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer); SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pNewRebConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pConsumerRaw); mndTransAppendRedolog(pTrans, pConsumerRaw);
} }
@ -469,7 +474,6 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
if (taosArrayGetSize(pSub->unassignedVg) != 0) { if (taosArrayGetSize(pSub->unassignedVg) != 0) {
for (int32_t i = 0; i < consumerNum; i++) { for (int32_t i = 0; i < consumerNum; i++) {
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
int32_t vgThisConsumerAfterRb; int32_t vgThisConsumerAfterRb;
if (i < imbalanceVg) if (i < imbalanceVg)
vgThisConsumerAfterRb = vgEachConsumer + 1; vgThisConsumerAfterRb = vgEachConsumer + 1;

View File

@ -264,7 +264,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
fetchOffset = pReq->currentOffset + 1; fetchOffset = pReq->currentOffset + 1;
} }
/*printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);*/ printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);
SMqPollRsp rsp = { SMqPollRsp rsp = {
/*.consumerId = consumerId,*/ /*.consumerId = consumerId,*/
@ -299,8 +299,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// response to user // response to user
break; break;
} }
/*printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType,
* pReq->epoch);*/ pReq->epoch);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/ /*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
@ -353,9 +353,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->pCont = buf; pMsg->pCont = buf;
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
/*printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);
* pHead->msgType,*/
/*pReq->epoch);*/
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
taosMemoryFree(pHead); taosMemoryFree(pHead);
return 0; return 0;
@ -377,6 +375,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch; ((SMqRspHead*)buf)->epoch = pReq->epoch;
rsp.rspOffset = fetchOffset - 1;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRsp(&abuf, &rsp); tEncodeSMqPollRsp(&abuf, &rsp);
@ -385,7 +384,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->contLen = tlen; pMsg->contLen = tlen;
pMsg->code = 0; pMsg->code = 0;
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
/*printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch);*/ printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch);
/*}*/ /*}*/
return 0; return 0;
@ -452,7 +451,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
ASSERT(pTopic->buffer.output[i].task); ASSERT(pTopic->buffer.output[i].task);
} }
printf("set topic %s to consumer %ld\n", pTopic->topicName, req.consumerId); printf("set topic %s to consumer %ld on vg %d\n", pTopic->topicName, req.consumerId, pTq->pVnode->vgId);
taosArrayPush(pConsumer->topics, pTopic); taosArrayPush(pConsumer->topics, pTopic);
tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.consumerId); tqHandleCommit(pTq->tqMeta, req.consumerId);

View File

@ -35,7 +35,7 @@ sql connect
$dbNamme = d0 $dbNamme = d0
print =============== create database , vgroup 1 print =============== create database , vgroup 1
sql create database $dbNamme vgroups 1 sql create database $dbNamme vgroups 10
sql show databases sql show databases
print $data00 $data01 $data02 print $data00 $data01 $data02
if $rows != 2 then if $rows != 2 then