From 0712f87de051303fc989213cc94fab0d475f33e1 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 27 Jan 2022 16:35:15 +0800 Subject: [PATCH] fix query crash --- include/common/common.h | 12 ++ include/common/tmsg.h | 7 + source/client/src/clientImpl.c | 156 +++++++++++++++------ source/client/test/clientTests.cpp | 6 +- source/dnode/mnode/impl/inc/mndDef.h | 10 +- source/dnode/mnode/impl/src/mndSubscribe.c | 114 +++++++++++---- source/dnode/vnode/src/tq/tq.c | 17 ++- 7 files changed, 244 insertions(+), 78 deletions(-) diff --git a/include/common/common.h b/include/common/common.h index 0299a29eb4..29ef6d953f 100644 --- a/include/common/common.h +++ b/include/common/common.h @@ -126,6 +126,12 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp int32_t tlen = 0; int32_t sz = 0; tlen += taosEncodeFixedI64(buf, pRsp->consumerId); + tlen += taosEncodeFixedI64(buf, pRsp->committedOffset); + tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); + tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); + tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); + tlen += taosEncodeFixedI32(buf, pRsp->numOfTopics); + if (pRsp->numOfTopics == 0) return tlen; tlen += tEncodeSSchemaWrapper(buf, pRsp->schemas); if (pRsp->pBlockData) { sz = taosArrayGetSize(pRsp->pBlockData); @@ -141,6 +147,12 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) { int32_t sz; buf = taosDecodeFixedI64(buf, &pRsp->consumerId); + buf = taosDecodeFixedI64(buf, &pRsp->committedOffset); + buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); + buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); + buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); + buf = taosDecodeFixedI32(buf, &pRsp->numOfTopics); + if (pRsp->numOfTopics == 0) return buf; pRsp->schemas = (SSchemaWrapper*)calloc(1, sizeof(SSchemaWrapper)); if (pRsp->schemas == NULL) return NULL; buf = tDecodeSSchemaWrapper(buf, pRsp->schemas); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 289f2143ab..21d5ec021f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1655,6 +1655,10 @@ typedef struct SMqTopicBlk { typedef struct SMqConsumeRsp { int64_t consumerId; SSchemaWrapper* schemas; + int64_t committedOffset; + int64_t reqOffset; + int64_t rspOffset; + int32_t skipLogNum; int32_t numOfTopics; SArray* pBlockData; //SArray } SMqConsumeRsp; @@ -1688,6 +1692,7 @@ typedef struct SMqSubTopicEp { typedef struct SMqCMGetSubEpRsp { int64_t consumerId; + int64_t epoch; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray* topics; // SArray } SMqCMGetSubEpRsp; @@ -1736,6 +1741,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pRsp->consumerId); + tlen += taosEncodeFixedI64(buf, pRsp->epoch); tlen += taosEncodeString(buf, pRsp->cgroup); int32_t sz = taosArrayGetSize(pRsp->topics); tlen += taosEncodeFixedI32(buf, sz); @@ -1748,6 +1754,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) { buf = taosDecodeFixedI64(buf, &pRsp->consumerId); + buf = taosDecodeFixedI64(buf, &pRsp->epoch); buf = taosDecodeStringTo(buf, pRsp->cgroup); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index afac8acc7a..56f89e30c4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -326,13 +326,17 @@ int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { struct tmq_t { char groupId[256]; char clientId[256]; + SRWLatch lock; int64_t consumerId; + int64_t epoch; int64_t status; tsem_t rspSem; STscObj* pTscObj; tmq_commit_cb* commit_cb; int32_t nextTopicIdx; SArray* clientTopics; //SArray + //stat + int64_t pollCnt; }; tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) { @@ -342,6 +346,9 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err } pTmq->pTscObj = (STscObj*)conn; pTmq->status = 0; + pTmq->pollCnt = 0; + pTmq->epoch = 0; + taosInitRWLatch(&pTmq->lock); strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); pTmq->commit_cb = conf->commit_cb; @@ -621,34 +628,61 @@ struct tmq_message_t { }; int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { + SMqClientVg* pVg = (SMqClientVg*)param; SMqConsumeRsp rsp; tDecodeSMqConsumeRsp(pMsg->pData, &rsp); + if (rsp.numOfTopics == 0) { + /*printf("no data\n");*/ + return 0; + } int32_t colNum = rsp.schemas->nCols; + pVg->currentOffset = rsp.rspOffset; + /*printf("rsp offset: %ld\n", rsp.rspOffset);*/ + /*printf("-----msg begin----\n");*/ + printf("|"); for (int32_t i = 0; i < colNum; i++) { - printf("| %s |", rsp.schemas->pSchema[i].name); + printf(" %15s |", rsp.schemas->pSchema[i].name); } printf("\n"); + printf("=====================================\n"); int32_t sz = taosArrayGetSize(rsp.pBlockData); for (int32_t i = 0; i < sz; i++) { SSDataBlock* pDataBlock = taosArrayGet(rsp.pBlockData, i); int32_t rows = pDataBlock->info.rows; - for (int32_t j = 0; j < colNum; j++) { - SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, j); - for (int32_t k = 0; k < rows; k++) { - void* var = POINTER_SHIFT(pColInfoData->pData, k * pColInfoData->info.bytes); - if (j == 0) printf(" %ld ", *(int64_t*)var); - if (j == 1) printf(" %d ", *(int32_t*)var); + for (int32_t j = 0; j < rows; j++) { + printf("|"); + for (int32_t k = 0; k < colNum; k++) { + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); + void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); + switch(pColInfoData->info.type) { + case TSDB_DATA_TYPE_TIMESTAMP: + printf(" %15lu |", *(uint64_t*)var); + break; + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + printf(" %15u |", *(uint32_t*)var); + break; + } } + printf("\n"); } - /*pDataBlock->*/ } + /*printf("\n-----msg end------\n");*/ return 0; } +typedef struct SMqAskEpCbParam { + tmq_t* tmq; + int32_t wait; +} SMqAskEpCbParam; + int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { - tmq_t* tmq = (tmq_t*)param; + SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; + tmq_t* tmq = pParam->tmq; if (code != 0) { - tsem_post(&tmq->rspSem); + if (pParam->wait) { + tsem_post(&tmq->rspSem); + } return 0; } tscDebug("tmq ask ep cb called"); @@ -657,36 +691,47 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); int32_t sz = taosArrayGetSize(rsp.topics); // TODO: lock - tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); - for (int32_t i = 0; i < sz; i++) { - SMqClientTopic topic = {0}; - SMqSubTopicEp* pTopicEp = taosArrayGet(rsp.topics, i); - topic.topicName = strdup(pTopicEp->topic); - int32_t vgSz = taosArrayGetSize(pTopicEp->vgs); - topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg)); - for (int32_t j = 0; j < vgSz; j++) { - SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); - SMqClientVg clientVg = { - .pollCnt = 0, - .committedOffset = -1, - .currentOffset = -1, - .vgId = pVgEp->vgId, - .epSet = pVgEp->epSet - }; - taosArrayPush(topic.vgs, &clientVg); - set = true; + if (rsp.epoch != tmq->epoch) { + /*printf("rsp epoch %ld", rsp.epoch);*/ + /*printf("tmq epoch %ld", tmq->epoch);*/ + //TODO + if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); + tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); + for (int32_t i = 0; i < sz; i++) { + SMqClientTopic topic = {0}; + SMqSubTopicEp* pTopicEp = taosArrayGet(rsp.topics, i); + topic.topicName = strdup(pTopicEp->topic); + int32_t vgSz = taosArrayGetSize(pTopicEp->vgs); + topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg)); + for (int32_t j = 0; j < vgSz; j++) { + SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); + SMqClientVg clientVg = { + .pollCnt = 0, + .committedOffset = -1, + .currentOffset = -1, + .vgId = pVgEp->vgId, + .epSet = pVgEp->epSet + }; + taosArrayPush(topic.vgs, &clientVg); + set = true; + } + taosArrayPush(tmq->clientTopics, &topic); } - taosArrayPush(tmq->clientTopics, &topic); + tmq->epoch = rsp.epoch; + } + if (set) { + atomic_store_64(&tmq->status, 1); } - if(set) tmq->status = 1; // unlock - tsem_post(&tmq->rspSem); + /*tsem_post(&tmq->rspSem);*/ + if (pParam->wait) { + tsem_post(&tmq->rspSem); + } + free(pParam); return 0; } -tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { - - if (taosArrayGetSize(tmq->clientTopics) == 0 || tmq->status == 0) { +int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* buf = malloc(tlen); if (buf == NULL) { @@ -702,9 +747,17 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; + SMqAskEpCbParam *pParam = malloc(sizeof(SMqAskEpCbParam)); + if (pParam == NULL) { + free(buf); + goto END; + } + pParam->tmq = tmq; + pParam->wait = wait; + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestObjRefId = 0; - sendInfo->param = tmq; + sendInfo->param = pParam; sendInfo->fp = tmq_ask_ep_cb; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); @@ -712,11 +765,20 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - tsem_wait(&tmq->rspSem); - } +END: + if (wait) tsem_wait(&tmq->rspSem); + return 0; +} + +tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { + int64_t status = atomic_load_64(&tmq->status); + tmqAsyncAskEp(tmq, status == 0 || taosArrayGetSize(tmq->clientTopics)); + + if (blocking_time < 0) blocking_time = 500; if (taosArrayGetSize(tmq->clientTopics) == 0) { tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); + usleep(blocking_time * 1000); return NULL; } @@ -730,10 +792,15 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); strcpy(pReq->topic, pTopic->topicName); - int32_t nextVgIdx = pTopic->nextVgIdx; - pTopic->nextVgIdx = (nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx); - pReq->offset = pVg->currentOffset; + int32_t vgSz = taosArrayGetSize(pTopic->vgs); + if (vgSz == 0) { + free(pReq); + usleep(blocking_time * 1000); + return NULL; + } + pTopic->nextVgIdx = (pTopic->nextVgIdx + 1 % vgSz); + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); + pReq->offset = pVg->currentOffset+1; pReq->head.vgId = htonl(pVg->vgId); pReq->head.contLen = htonl(sizeof(SMqConsumeReq)); @@ -743,13 +810,16 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); sendInfo->requestObjRefId = 0; - /*sendInfo->param = &tmq_message;*/ + sendInfo->param = pVg; sendInfo->fp = tmq_poll_cb_inner; + /*printf("req offset: %ld\n", pReq->offset);*/ + int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + tmq->pollCnt++; - tsem_wait(&pRequest->body.rspSem); + usleep(blocking_time * 1000); return tmq_message; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index b73079741c..b2a6724e72 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -570,7 +570,6 @@ TEST(testCase, create_topic_Test) { //taos_close(pConn); //} -#if 0 TEST(testCase, tmq_subscribe_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -590,12 +589,11 @@ TEST(testCase, tmq_subscribe_Test) { tmq_subscribe(tmq, topic_list); while (1) { - tmq_message_t* msg = tmq_consume_poll(tmq, 0); - printf("get msg\n"); + tmq_message_t* msg = tmq_consume_poll(tmq, 1000); + //printf("get msg\n"); //if (msg == NULL) break; } } -#endif TEST(testCase, tmq_consume_Test) { } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 5ec9173fc8..891f731a42 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -658,12 +658,17 @@ typedef struct SMqConsumerObj { SRWLatch lock; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray* topics; // SArray - // SHashObj *topicHash; //SHashObj + int64_t epoch; + // stat + int64_t pollCnt; } SMqConsumerObj; static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) { int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pConsumer->consumerId); + tlen += taosEncodeFixedI64(buf, pConsumer->connId); + tlen += taosEncodeFixedI64(buf, pConsumer->epoch); + tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt); tlen += taosEncodeString(buf, pConsumer->cgroup); int32_t sz = taosArrayGetSize(pConsumer->topics); tlen += taosEncodeFixedI32(buf, sz); @@ -676,6 +681,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerO static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) { buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); + buf = taosDecodeFixedI64(buf, &pConsumer->connId); + buf = taosDecodeFixedI64(buf, &pConsumer->epoch); + buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt); buf = taosDecodeStringTo(buf, pConsumer->cgroup); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 87f66297a4..2c445a59e3 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -30,6 +30,8 @@ #define MND_SUBSCRIBE_VER_NUMBER 1 #define MND_SUBSCRIBE_RESERVE_SIZE 64 +#define MND_SUBSCRIBE_REBALANCE_MS 5000 + static char *mndMakeSubscribeKey(char *cgroup, char *topicName); static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); @@ -69,6 +71,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont; SMqCMGetSubEpRsp rsp; int64_t consumerId = be64toh(pReq->consumerId); + int64_t currentTs = taosGetTimestampMs(); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId); if (pConsumer == NULL) { @@ -79,6 +82,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { strcpy(rsp.cgroup, pReq->cgroup); rsp.consumerId = consumerId; + rsp.epoch = pConsumer->epoch; SArray *pTopics = pConsumer->topics; int32_t sz = taosArrayGetSize(pTopics); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); @@ -88,21 +92,43 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { strcpy(topicEp.topic, pConsumerTopic->name); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, pConsumerTopic->name); + ASSERT(pSub); + bool found = 0; + bool changed = 0; + for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { + if (*(int64_t*)taosArrayGet(pSub->availConsumer, j) == consumerId) { + found = 1; + break; + } + } + if (found == 0) { + taosArrayPush(pSub->availConsumer, &consumerId); + } + SSdbRaw* pRaw = mndSubActionEncode(pSub); + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + sdbWriteNotFree(pMnode->pSdb, pRaw); + int32_t assignedSz = taosArrayGetSize(pSub->assigned); topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp)); for (int32_t j = 0; j < assignedSz; j++) { SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, j); if (pCEp->consumerId == consumerId) { + pCEp->lastConsumerHbTs = currentTs; SMqSubVgEp vgEp = { .epSet = pCEp->epSet, .vgId = pCEp->vgId }; taosArrayPush(topicEp.vgs, &vgEp); + changed = 1; } } if (taosArrayGetSize(topicEp.vgs) != 0) { taosArrayPush(rsp.topics, &topicEp); } + if (changed || found) { + + } + mndReleaseSubscribe(pMnode, pSub); } int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp); void *buf = rpcMallocCont(tlen); @@ -124,9 +150,9 @@ static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { i++; } key[i] = 0; - *topic = strdup(key); + *cgroup = strdup(key); key[i] = ':'; - *cgroup = strdup(&key[i + 1]); + *topic = strdup(&key[i + 1]); return 0; } @@ -135,9 +161,37 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { SSdb *pSdb = pMnode->pSdb; SMqSubscribeObj *pSub = NULL; void *pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub); - int sz; + int64_t currentTs = taosGetTimestampMs(); + int32_t sz; while (pIter != NULL) { - if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0) { + for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) { + SMqConsumerEp* pCEp = taosArrayGet(pSub->assigned, i); + int64_t consumerId = pCEp->consumerId; + if(pCEp->lastConsumerHbTs != -1 && currentTs - pCEp->lastConsumerHbTs > MND_SUBSCRIBE_REBALANCE_MS) { + // put consumer into lostConsumer + taosArrayPush(pSub->lostConsumer, pCEp); + // put vg into unassgined + taosArrayPush(pSub->unassignedVg, pCEp); + // remove from assigned + // TODO: swap with last one, reduce size and reset i + taosArrayRemove(pSub->assigned, i); + // remove from available consumer + for (int j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { + if (*(int64_t*)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) { + taosArrayRemove(pSub->availConsumer, j); + break; + } + // TODO: acquire consumer, set status to unavail + } + SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId); + pConsumer->epoch++; + SSdbRaw* pRaw = mndConsumerActionEncode(pConsumer); + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + sdbWriteNotFree(pMnode->pSdb, pRaw); + mndReleaseConsumer(pMnode, pConsumer); + } + } + if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0 && taosArrayGetSize(pSub->availConsumer) > 0) { char *topic = NULL; char *cgroup = NULL; mndSplitSubscribeKey(pSub->key, &topic, &cgroup); @@ -146,7 +200,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { // create trans STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - for (int i = 0; i < sz; i++) { + for (int32_t i = 0; i < sz; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx); SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg); pCEp->consumerId = consumerId; @@ -155,49 +209,49 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { // build msg - SMqSetCVgReq *pReq = malloc(sizeof(SMqSetCVgReq)); - if (pReq == NULL) { + SMqSetCVgReq req = {0}; + strcpy(req.cgroup, cgroup); + strcpy(req.topicName, topic); + req.sql = pTopic->sql; + req.logicalPlan = pTopic->logicalPlan; + req.physicalPlan = pTopic->physicalPlan; + req.qmsg = pCEp->qmsg; + int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); + void *buf = malloc(tlen); + if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - strcpy(pReq->cgroup, cgroup); - strcpy(pReq->topicName, topic); - pReq->sql = strdup(pTopic->sql); - pReq->logicalPlan = strdup(pTopic->logicalPlan); - pReq->physicalPlan = strdup(pTopic->physicalPlan); - pReq->qmsg = strdup(pCEp->qmsg); - int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq); - void *reqStr = malloc(tlen); - if (reqStr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - void *abuf = reqStr; - tEncodeSMqSetCVgReq(&abuf, pReq); + SMsgHead *pMsgHead = (SMsgHead *)buf; + + pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); + pMsgHead->vgId = htonl(pCEp->vgId); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + tEncodeSMqSetCVgReq(&abuf, &req); // persist msg STransAction action = {0}; action.epSet = pCEp->epSet; - action.pCont = reqStr; + action.pCont = buf; action.contLen = tlen; action.msgType = TDMT_VND_MQ_SET_CONN; mndTransAppendRedoAction(pTrans, &action); // persist raw SSdbRaw *pRaw = mndSubActionEncode(pSub); + sdbSetRawStatus(pRaw, SDB_STATUS_READY); mndTransAppendRedolog(pTrans, pRaw); - free(pReq); tfree(topic); tfree(cgroup); } if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); } - /*mndReleaseTopic(pMnode, pTopic);*/ + mndReleaseTopic(pMnode, pTopic); mndTransDrop(pTrans); } - pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub); + pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub); } return 0; } @@ -434,10 +488,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + pConsumer->epoch = 1; pConsumer->consumerId = consumerId; strcpy(pConsumer->cgroup, consumerGroup); taosInitRWLatch(&pConsumer->lock); } else { + pConsumer->epoch++; oldSub = pConsumer->topics; } pConsumer->topics = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic)); @@ -541,6 +597,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName); + bool create = false; if (pSub == NULL) { mDebug("create new subscription, group: %s, topic %s", consumerGroup, newTopicName); pSub = tNewSubscribeObj(); @@ -549,10 +606,16 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { return -1; } char *key = mndMakeSubscribeKey(consumerGroup, newTopicName); + if (key == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } strcpy(pSub->key, key); + free(key); // set unassigned vg mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); // TODO: disable alter + create = true; } taosArrayPush(pSub->availConsumer, &consumerId); @@ -575,6 +638,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SSdbRaw *pRaw = mndSubActionEncode(pSub); sdbSetRawStatus(pRaw, SDB_STATUS_READY); mndTransAppendRedolog(pTrans, pRaw); + if (!create) mndReleaseSubscribe(pMnode, pSub); #if 0 SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); if (pGroup == NULL) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3195691a13..1ecd76e1b5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -670,15 +670,13 @@ int tqItemSSize() { int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { SMqConsumeReq* pReq = pMsg->pCont; - SRpcMsg rpcMsg; int64_t reqId = pReq->reqId; int64_t consumerId = pReq->consumerId; - int64_t reqOffset = pReq->offset; - int64_t fetchOffset = reqOffset; + int64_t fetchOffset = pReq->offset; int64_t blockingTime = pReq->blockingTime; int rspLen = 0; - SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 1, .pBlockData = NULL}; + SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL}; STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); ASSERT(pConsumer); @@ -690,6 +688,9 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { if (strcmp(pTopic->topicName, pReq->topic) != 0) { continue; } + rsp.committedOffset = pTopic->committedOffset; + rsp.reqOffset = pReq->offset; + rsp.skipLogNum = 0; if (fetchOffset == -1) { fetchOffset = pTopic->committedOffset + 1; @@ -715,6 +716,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { if (pHead->head.msgType == TDMT_VND_SUBMIT) { break; } + rsp.skipLogNum++; if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { atomic_store_8(&pTopic->buffer.output[pos].status, 0); skip = 1; @@ -745,6 +747,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { } //TODO copy rsp.schemas = pTopic->buffer.output[pos].pReadHandle->pSchemaWrapper; + rsp.rspOffset = fetchOffset; atomic_store_8(&pTopic->buffer.output[pos].status, 0); @@ -752,6 +755,8 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { taosArrayDestroy(pRes); fetchOffset++; continue; + } else { + rsp.numOfTopics++; } rsp.pBlockData = pRes; @@ -931,6 +936,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { SMemRow row; int32_t kvIdx = 0; + int32_t curRow = 0; tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter); while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { // get all wanted col of that block @@ -940,8 +946,9 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { // TODO ASSERT(pCol->colId == pColData->info.colId); void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); - memcpy(pColData->pData, val, pCol->bytes); + memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), val, pCol->bytes); } + curRow++; } return pArray; }