diff --git a/example/src/tmq.c b/example/src/tmq.c index 094fd94bfc..35c3e655d6 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -160,7 +160,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { } while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1000); if (tmqmessage) { msg_process(tmqmessage); tmq_message_destroy(tmqmessage); diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 37d20cdb97..a04e2afc94 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -49,6 +49,11 @@ enum { TMQ_CONF__RESET_OFFSET__NONE = -3, }; +enum { + TMQ_MSG_TYPE__POLL_RSP = 0, + TMQ_MSG_TYPE__EP_RSP, +}; + typedef struct { uint32_t numOfTables; SArray* pGroupList; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ebd4563f8c..b60f09bdf4 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1901,6 +1901,7 @@ struct tmq_message_t { SMqConsumeRsp consumeRsp; SMqCMGetSubEpRsp getEpRsp; }; + void* extra; }; static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } @@ -1955,7 +1956,6 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pRsp->consumerId); - tlen += taosEncodeFixedI32(buf, pRsp->epoch); tlen += taosEncodeString(buf, pRsp->cgroup); int32_t sz = taosArrayGetSize(pRsp->topics); tlen += taosEncodeFixedI32(buf, sz); @@ -1968,7 +1968,6 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) { buf = taosDecodeFixedI64(buf, &pRsp->consumerId); - buf = taosDecodeFixedI32(buf, &pRsp->epoch); buf = taosDecodeStringTo(buf, pRsp->cgroup); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f31130acf4..acb96013c9 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -69,16 +69,10 @@ struct tmq_t { SArray* clientTopics; // SArray STaosQueue* mqueue; // queue of tmq_message_t STaosQall* qall; - SRWLatch pollLock; // stat int64_t pollCnt; }; -enum { - TMQ_MSG_TYPE__POLL_RSP = 0, - TMQ_MSG_TYPE__EP_RSP, -}; - enum { TMQ_VG_STATUS__IDLE = 0, TMQ_VG_STATUS__WAIT, @@ -123,10 +117,10 @@ typedef struct { } SMqAskEpCbParam; typedef struct { - tmq_t* tmq; - SMqClientVg* pVg; - tmq_message_t* rspMsg; - tsem_t rspSem; + tmq_t* tmq; + SMqClientVg* pVg; + int32_t epoch; + tsem_t rspSem; } SMqPollCbParam; typedef struct { @@ -249,7 +243,6 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; - taosInitRWLatch(&pTmq->pollLock); // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); @@ -632,31 +625,50 @@ void tmqShowMsg(tmq_message_t* tmq_message) { } int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { + printf("recv poll\n"); SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqClientVg* pVg = pParam->pVg; + tmq_t* tmq = pParam->tmq; if (code != 0) { printf("msg discard\n"); + if (pParam->epoch == tmq->epoch) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + } return 0; } - SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp)); - if (pRsp == NULL) { - taosWUnLockLatch(&pParam->tmq->pollLock); - return -1; - } - tDecodeSMqConsumeRsp(pMsg->pData, pRsp); - /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ - if (pRsp->numOfTopics == 0) { - /*printf("no data\n");*/ - free(pRsp); - taosWUnLockLatch(&pParam->tmq->pollLock); + int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; + int32_t tmqEpoch = atomic_load_32(&tmq->epoch); + if (msgEpoch < tmqEpoch) { + printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); return 0; } - pParam->rspMsg = (tmq_message_t*)pRsp; - pVg->currentOffset = pRsp->rspOffset; + + if (msgEpoch != tmqEpoch) { + printf("mismatch rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch); + } + + /*SMqConsumeRsp* pRsp = calloc(1, sizeof(SMqConsumeRsp));*/ + tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); + if (pRsp == NULL) { + return -1; + } + memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); + tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); + /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ + if (pRsp->consumeRsp.numOfTopics == 0) { + /*printf("no data\n");*/ + taosFreeQitem(pRsp); + return 0; + } + pRsp->extra = pParam->pVg; + taosWriteQitem(tmq->mqueue, pRsp); + printf("poll in queue\n"); + /*pParam->rspMsg = (tmq_message_t*)pRsp;*/ + /*pVg->currentOffset = pRsp->consumeRsp.rspOffset;*/ + /*printf("rsp offset: %ld\n", rsp.rspOffset);*/ /*printf("-----msg begin----\n");*/ - taosWUnLockLatch(&pParam->tmq->pollLock); /*printf("\n-----msg end------\n");*/ return 0; } @@ -715,6 +727,10 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { tDeleteSMqCMGetSubEpRsp(&rsp); } else { tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqCMGetSubEpRsp(pMsg->pData, &pRsp->getEpRsp); taosWriteQitem(tmq->mqueue, pRsp); @@ -723,6 +739,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { } int32_t tmqAskEp(tmq_t* tmq, bool sync) { + printf("ask ep sync %d\n", sync); int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* buf = malloc(tlen); if (buf == NULL) { @@ -842,6 +859,7 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { } int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { + printf("call poll\n"); for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { @@ -864,6 +882,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { } param->tmq = tmq; param->pVg = pVg; + param->epoch = tmq->epoch; SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, @@ -877,6 +896,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { sendInfo->fp = tmqPollCb; int64_t transporterId = 0; + printf("send poll\n"); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++; tmq->pollCnt++; @@ -885,17 +905,10 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { return 0; } -void tmqFetchLeftRes(tmq_t* tmq, tmq_message_t** pRspMsg) { - taosGetQitem(tmq->qall, (void**)pRspMsg); - if (pRspMsg == NULL) { - taosReadAllQitems(tmq->mqueue, tmq->qall); - taosGetQitem(tmq->qall, (void**)pRspMsg); - } -} - // return int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__EP_RSP) { + printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch); if (rspMsg->head.epoch > atomic_load_32(&tmq->epoch)) { tmqUpdateEp(tmq, rspMsg->head.epoch, &rspMsg->getEpRsp); tmqClearUnhandleMsg(tmq); @@ -904,12 +917,45 @@ int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) { *pReset = false; } } else { - *pReset = false; return -1; } return 0; } +tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { + while (1) { + tmq_message_t* rspMsg = NULL; + taosGetQitem(tmq->qall, (void**)&rspMsg); + if (rspMsg == NULL) { + break; + } + + if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { + printf("handle poll rsp %d\n", rspMsg->head.mqMsgType); + if (rspMsg->head.epoch == atomic_load_32(&tmq->epoch)) { + printf("epoch match\n"); + SMqClientVg* pVg = rspMsg->extra; + pVg->currentOffset = rspMsg->consumeRsp.rspOffset; + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + return rspMsg; + } else { + printf("epoch mismatch\n"); + taosFreeQitem(rspMsg); + } + } else { + printf("handle ep rsp %d\n", rspMsg->head.mqMsgType); + bool reset = false; + tmqHandleRes(tmq, rspMsg, &reset); + taosFreeQitem(rspMsg); + if (pollIfReset && reset) { + printf("reset and repoll\n"); + tmqPollImpl(tmq, blockingTime); + } + } + } + return NULL; +} + tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* rspMsg = NULL; int64_t startTime = taosGetTimestampMs(); @@ -918,43 +964,21 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { int64_t status = atomic_load_64(&tmq->status); tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); - tmqFetchLeftRes(tmq, &rspMsg); - taosGetQitem(tmq->qall, (void**)&rspMsg); if (rspMsg == NULL) { taosReadAllQitems(tmq->mqueue, tmq->qall); } - - while (1) { - taosGetQitem(tmq->qall, (void**)&rspMsg); - if (rspMsg == NULL) break; - if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { - return rspMsg; - } - bool reset = false; - tmqHandleRes(tmq, rspMsg, &reset); - taosFreeQitem(rspMsg); - } + tmqHandleAllRsp(tmq, blocking_time, false); tmqPollImpl(tmq, blocking_time); while (1) { taosReadAllQitems(tmq->mqueue, tmq->qall); - while (1) { - taosGetQitem(tmq->qall, (void**)&rspMsg); - if (rspMsg == NULL) break; - - if (rspMsg->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { - return rspMsg; - } else { - bool reset = false; - tmqHandleRes(tmq, rspMsg, &reset); - taosFreeQitem(rspMsg); - if (reset) tmqPollImpl(tmq, blocking_time); - } - } + tmqHandleAllRsp(tmq, blocking_time, true); int64_t endTime = taosGetTimestampMs(); if (endTime - startTime > blocking_time) { + printf("cycle end\n"); + usleep(1000 * 1000); return NULL; } } @@ -1092,9 +1116,9 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; - SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; + SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; tDeleteSMqConsumeRsp(pRsp); - free(tmq_message); + taosFreeQitem(tmq_message); } tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 2ea157fea4..60533b979c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -270,9 +270,8 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { strcpy(rsp.cgroup, pReq->cgroup); rsp.consumerId = consumerId; - rsp.epoch = pConsumer->epoch; - if (epoch != rsp.epoch) { - mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch); + if (epoch != pConsumer->epoch) { + mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, pConsumer->epoch); SArray *pTopics = pConsumer->currentTopics; int32_t sz = taosArrayGetSize(pTopics); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); @@ -308,13 +307,16 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { mndReleaseSubscribe(pMnode, pSub); } } - int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp); + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqCMGetSubEpRsp(NULL, &rsp); void *buf = rpcMallocCont(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - void *abuf = buf; + ((SMqRspHead *)buf)->mqMsgType = TMQ_MSG_TYPE__EP_RSP; + ((SMqRspHead *)buf)->epoch = pConsumer->epoch; + + void *abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); tDeleteSMqCMGetSubEpRsp(&rsp); mndReleaseConsumer(pMnode, pConsumer); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 9faabc3874..9822550ee5 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -438,6 +438,33 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo } #endif +static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { + SSdb *pSdb = pMnode->pSdb; + SDbObj *pDb = mndAcquireDb(pMnode, dbName); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + return -1; + } + + int32_t numOfTopics = 0; + void *pIter = NULL; + while (1) { + SMqTopicObj *pTopic = NULL; + pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); + if (pIter == NULL) break; + + if (pTopic->dbUid == pDb->uid) { + numOfTopics++; + } + + sdbRelease(pSdb, pTopic); + } + + *pNumOfTopics = numOfTopics; + mndReleaseDb(pMnode, pDb); + return 0; +} + static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7c8c96fb54..5f5a434ec9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -220,7 +220,11 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { fetchOffset = pReq->currentOffset + 1; } - SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; + SMqConsumeRsp rsp = { + .consumerId = consumerId, + .numOfTopics = 0, + .pBlockData = NULL, + }; STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); if (pConsumer == NULL) { @@ -296,14 +300,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { } } - int32_t tlen = tEncodeSMqConsumeRsp(NULL, &rsp); + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp); void* buf = rpcMallocCont(tlen); if (buf == NULL) { pMsg->code = -1; return -1; } + ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + ((SMqRspHead*)buf)->epoch = pReq->epoch; - void* abuf = buf; + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqConsumeRsp(&abuf, &rsp); rsp.pBlockData = NULL; pMsg->pCont = buf;