diff --git a/source/common/src/systable.c b/source/common/src/systable.c index db0cc78de6..3ef64eab4c 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -290,7 +290,7 @@ static const SSysDbTableSchema subscriptionSchema[] = { {.name = "topic_name", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_group", .bytes = TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, - {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, }; static const SSysDbTableSchema vnodesSchema[] = { @@ -350,7 +350,7 @@ static const SSysDbTableSchema connectionsSchema[] = { static const SSysDbTableSchema consumerSchema[] = { - {.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, + {.name = "consumer_id", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "consumer_group", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY, .sysInfo = false}, diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 82b714e6eb..bf2b91c354 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -137,12 +137,12 @@ typedef enum { } EDndReason; typedef enum { - CONSUMER_UPDATE__TOUCH = 1, // rebalance req do not need change consume topic - CONSUMER_UPDATE__ADD, - CONSUMER_UPDATE__REMOVE, - CONSUMER_UPDATE__LOST, - CONSUMER_UPDATE__RECOVER, - CONSUMER_UPDATE__REBALANCE, // subscribe req need change consume topic + CONSUMER_UPDATE_REB_MODIFY_NOTOPIC = 1, // topic do not need modified after rebalance + CONSUMER_UPDATE_REB_MODIFY_TOPIC, // topic need modified after rebalance + CONSUMER_UPDATE_REB_MODIFY_REMOVE, // topic need removed after rebalance + CONSUMER_UPDATE_TIMER_LOST, + CONSUMER_UPDATE_RECOVER, + CONSUMER_UPDATE_SUB_MODIFY, // modify after subscribe req } ECsmUpdateType; typedef struct { @@ -547,13 +547,13 @@ typedef struct { // data for display int32_t pid; SEpSet ep; - int64_t upTime; + int64_t createTime; int64_t subscribeTime; int64_t rebalanceTime; } SMqConsumerObj; SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]); -void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer); +void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer, bool delete); int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer); void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 117c1082a5..764cf5348e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -122,7 +122,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { } SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE__LOST; + pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; mndReleaseConsumer(pMnode, pConsumer); @@ -139,13 +139,11 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) { goto FAIL; } - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); mndTransDrop(pTrans); return 0; FAIL: - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); mndTransDrop(pTrans); return -1; } @@ -162,14 +160,14 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { mInfo("receive consumer recover msg, consumer:0x%" PRIx64 " status:%d(%s)", pRecoverMsg->consumerId, pConsumer->status, mndConsumerStatusName(pConsumer->status)); - if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) { + if (pConsumer->status != MQ_CONSUMER_STATUS__LOST) { mndReleaseConsumer(pMnode, pConsumer); terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; return -1; } SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE__RECOVER; + pConsumerNew->updateType = CONSUMER_UPDATE_RECOVER; mndReleaseConsumer(pMnode, pConsumer); @@ -181,13 +179,13 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); + mndTransDrop(pTrans); return 0; FAIL: - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); + mndTransDrop(pTrans); return -1; } @@ -212,7 +210,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { } SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumer->consumerId, pConsumer->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE__LOST; +// pConsumerNew->updateType = CONSUMER_UPDATE_TIMER_LOST; mndReleaseConsumer(pMnode, pConsumer); @@ -223,14 +221,14 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); + mndTransDrop(pTrans); return 0; FAIL: - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); + mndTransDrop(pTrans); return -1; } @@ -297,8 +295,8 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1); int32_t status = atomic_load_32(&pConsumer->status); - mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", uptime:%" PRId64 ", hbstatus:%d", - pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime, + mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d", + pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime, hbStatus); if (status == MQ_CONSUMER_STATUS__READY) { @@ -411,7 +409,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { int32_t status = atomic_load_32(&pConsumer->status); - if (status == MQ_CONSUMER_STATUS__LOST_REBD) { + if (status == MQ_CONSUMER_STATUS__LOST) { mInfo("try to recover consumer:0x%" PRIx64 "", consumerId); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); @@ -462,7 +460,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 1. check consumer status int32_t status = atomic_load_32(&pConsumer->status); - if (status == MQ_CONSUMER_STATUS__LOST_REBD) { + if (status == MQ_CONSUMER_STATUS__LOST) { mInfo("try to recover consumer:0x%" PRIx64, consumerId); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); @@ -675,8 +673,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId)); - // set the update type - pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE; +// pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY; // use insert logic taosArrayDestroy(pConsumerNew->assignedTopics); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); @@ -706,11 +703,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } // set the update type - pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE; + pConsumerNew->updateType = CONSUMER_UPDATE_SUB_MODIFY; taosArrayDestroy(pConsumerNew->assignedTopics); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); - int32_t oldTopicNum = (pExistedConsumer->currentTopics) ? taosArrayGetSize(pExistedConsumer->currentTopics) : 0; + int32_t oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics); int32_t i = 0, j = 0; while (i < oldTopicNum || j < newTopicNum) { @@ -765,10 +762,7 @@ _over: mndReleaseConsumer(pMnode, pExistedConsumer); } - if (pConsumerNew) { - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); - } + tDeleteSMqConsumerObj(pConsumerNew, true); // TODO: replace with destroy subscribe msg taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); @@ -868,17 +862,17 @@ CM_DECODE_OVER: } static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) { - mDebug("consumer:0x%" PRIx64 " cgroup:%s status:%d(%s) epoch:%d load from sdb, perform insert action", + mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d", pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, mndConsumerStatusName(pConsumer->status), pConsumer->epoch); - pConsumer->subscribeTime = pConsumer->upTime; + pConsumer->subscribeTime = taosGetTimestampMs(); return 0; } static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { - mDebug("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status, + mInfo("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId, pConsumer->status, mndConsumerStatusName(pConsumer->status)); - tDeleteSMqConsumerObj(pConsumer); + tDeleteSMqConsumerObj(pConsumer, false); return 0; } @@ -889,7 +883,6 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { if (status == MQ_CONSUMER_STATUS_REBALANCE) { pConsumer->status = MQ_CONSUMER_STATUS__READY; } else if (status == MQ_CONSUMER_STATUS__LOST) { - ASSERT(taosArrayGetSize(pConsumer->currentTopics) == 0); pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; } } @@ -904,7 +897,7 @@ static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic taosArrayRemove(pConsumer->rebNewTopics, i); taosMemoryFree(p); - mDebug("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId, + mInfo("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebNewTopics)); break; } @@ -920,7 +913,7 @@ static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTo taosArrayRemove(pConsumer->rebRemovedTopics, i); taosMemoryFree(p); - mDebug("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d", + mInfo("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d", pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics)); break; } @@ -935,7 +928,7 @@ static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pT taosArrayRemove(pConsumer->currentTopics, i); taosMemoryFree(topic); - mDebug("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d", + mInfo("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d", pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics)); break; } @@ -958,47 +951,46 @@ static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char* } static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { - mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64, - pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime); + mInfo("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", createTime:%" PRId64, + pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->createTime); taosWLockLatch(&pOldConsumer->lock); - if (pNewConsumer->updateType == CONSUMER_UPDATE__REBALANCE) { + if (pNewConsumer->updateType == CONSUMER_UPDATE_SUB_MODIFY) { TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics); TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics); TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics); - pOldConsumer->subscribeTime = pNewConsumer->upTime; + pOldConsumer->subscribeTime = taosGetTimestampMs(); pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; - } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) { + mInfo("consumer:0x%" PRIx64 " sub update, modify existed consumer",pOldConsumer->consumerId); + } else if (pNewConsumer->updateType == CONSUMER_UPDATE_TIMER_LOST) { int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); for (int32_t i = 0; i < sz; i++) { char *topic = taosStrdup(taosArrayGetP(pOldConsumer->currentTopics, i)); taosArrayPush(pOldConsumer->rebRemovedTopics, &topic); } - pOldConsumer->rebalanceTime = pNewConsumer->upTime; - int32_t prevStatus = pOldConsumer->status; pOldConsumer->status = MQ_CONSUMER_STATUS__LOST; - mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d", + mInfo("consumer:0x%" PRIx64 " timer update, timer lost. state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d", pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status), pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); - } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) { + } else if (pNewConsumer->updateType == CONSUMER_UPDATE_RECOVER) { int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { char *topic = taosStrdup(taosArrayGetP(pOldConsumer->assignedTopics, i)); taosArrayPush(pOldConsumer->rebNewTopics, &topic); } - pOldConsumer->rebalanceTime = pNewConsumer->upTime; pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; - } else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) { + mInfo("consumer:0x%" PRIx64 " timer update, timer recover",pOldConsumer->consumerId); + } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_NOTOPIC) { atomic_add_fetch_32(&pOldConsumer->epoch, 1); - pOldConsumer->rebalanceTime = pNewConsumer->upTime; - - } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) { + pOldConsumer->rebalanceTime = taosGetTimestampMs(); + mInfo("consumer:0x%" PRIx64 " reb update, only rebalance time", pOldConsumer->consumerId); + } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_TOPIC) { char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); // check if exist in current topic @@ -1007,6 +999,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, // add to current topic bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic); if (existing) { + mError("consumer:0x%" PRIx64 "new topic:%s should not in currentTopics", pOldConsumer->consumerId, pNewTopic); taosMemoryFree(pNewTopic); } else { // added into current topic list taosArrayPush(pOldConsumer->currentTopics, &pNewTopic); @@ -1018,17 +1011,17 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, updateConsumerStatus(pOldConsumer); // the re-balance is triggered when the new consumer is launched. - pOldConsumer->rebalanceTime = pNewConsumer->upTime; + pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); - mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 + mInfo("consumer:0x%" PRIx64 " reb update add, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d, newTopics:%d, removeTopics:%d", pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics), (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); - } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) { + } else if (pNewConsumer->updateType == CONSUMER_UPDATE_REB_MODIFY_REMOVE) { char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); // remove from removed topic @@ -1041,10 +1034,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, int32_t status = pOldConsumer->status; updateConsumerStatus(pOldConsumer); - pOldConsumer->rebalanceTime = pNewConsumer->upTime; + pOldConsumer->rebalanceTime = taosGetTimestampMs(); atomic_add_fetch_32(&pOldConsumer->epoch, 1); - mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 + mInfo("consumer:0x%" PRIx64 " reb update remove, state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d, newTopics:%d, removeTopics:%d", pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime, @@ -1107,8 +1100,12 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * int32_t cols = 0; // consumer id + char consumerIdHex[32] = {0}; + sprintf(varDataVal(consumerIdHex), "0x%"PRIx64, pConsumer->consumerId); + varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false); + colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false); // consumer group char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; @@ -1149,7 +1146,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * // up time pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false); + colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumer->createTime, false); // subscribe time pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 6dab018236..3660ab62af 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -214,7 +214,7 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) { return (void *)buf; } -SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) { +SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char* cgroup) { SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj)); if (pConsumer == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -245,16 +245,20 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L return NULL; } - pConsumer->upTime = taosGetTimestampMs(); + pConsumer->createTime = taosGetTimestampMs(); return pConsumer; } -void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) { +void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer, bool delete) { + if(pConsumer == NULL) return; taosArrayDestroyP(pConsumer->currentTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebNewTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->rebRemovedTopics, (FDelete)taosMemoryFree); taosArrayDestroyP(pConsumer->assignedTopics, (FDelete)taosMemoryFree); + if(delete){ + taosMemoryFree(pConsumer); + } } int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { @@ -269,7 +273,7 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) { tlen += taosEncodeFixedI32(buf, pConsumer->pid); tlen += taosEncodeSEpSet(buf, &pConsumer->ep); - tlen += taosEncodeFixedI64(buf, pConsumer->upTime); + tlen += taosEncodeFixedI64(buf, pConsumer->createTime); tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime); tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime); @@ -335,7 +339,7 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) { buf = taosDecodeFixedI32(buf, &pConsumer->pid); buf = taosDecodeSEpSet(buf, &pConsumer->ep); - buf = taosDecodeFixedI64(buf, &pConsumer->upTime); + buf = taosDecodeFixedI64(buf, &pConsumer->createTime); buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime); buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 74421afa33..9b5c32c94d 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -146,10 +146,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg) { -// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { -// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; -// return -1; -// } + if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { + terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; + return -1; + } void *buf; int32_t tlen; @@ -282,17 +282,17 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { } } -static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){ - for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){ - SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i); - SMqRebOutputVg outputVg = { - .oldConsumerId = pConsumerEp->consumerId, - .newConsumerId = pConsumerEp->consumerId, - .pVgEp = pVgEp, - }; - taosArrayPush(pOutput->rebVgs, &outputVg); - } -} +//static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){ +// for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){ +// SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i); +// SMqRebOutputVg outputVg = { +// .oldConsumerId = pConsumerEp->consumerId, +// .newConsumerId = pConsumerEp->consumerId, +// .pVgEp = pVgEp, +// }; +// taosArrayPush(pOutput->rebVgs, &outputVg); +// } +//} static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt, int32_t imbConsumerNum) { @@ -343,7 +343,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas } } } - putNoTransferToOutput(pOutput, pConsumerEp); +// putNoTransferToOutput(pOutput, pConsumerEp); } } @@ -513,50 +513,44 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu return -1; } + char topic[TSDB_TOPIC_FNAME_LEN] = {0}; + char cgroup[TSDB_CGROUP_LEN] = {0}; + mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); + // 3. commit log: consumer to update status and epoch // 3.1 set touched consumer int32_t consumerNum = taosArrayGetSize(pOutput->modifyConsumers); for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->modifyConsumers, i); - SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH; - mndReleaseConsumer(pMnode, pConsumerOld); + SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); + pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_NOTOPIC; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); mndTransDrop(pTrans); return -1; } - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); } // 3.2 set new consumer consumerNum = taosArrayGetSize(pOutput->newConsumers); for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->newConsumers, i); + SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); + pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_TOPIC; - SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE__ADD; - char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); - char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); - taosArrayPush(pConsumerNew->rebNewTopics, &topic); - mndReleaseConsumer(pMnode, pConsumerOld); + char* topicTmp = taosStrdup(topic); + taosArrayPush(pConsumerNew->rebNewTopics, &topicTmp); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); mndTransDrop(pTrans); return -1; } - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); } // 3.3 set removed consumer @@ -564,24 +558,19 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu for (int32_t i = 0; i < consumerNum; i++) { int64_t consumerId = *(int64_t *)taosArrayGet(pOutput->removedConsumers, i); - SMqConsumerObj *pConsumerOld = mndAcquireConsumer(pMnode, consumerId); - SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(pConsumerOld->consumerId, pConsumerOld->cgroup); - pConsumerNew->updateType = CONSUMER_UPDATE__REMOVE; - char *topic = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); - char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true); - taosArrayPush(pConsumerNew->rebRemovedTopics, &topic); - mndReleaseConsumer(pMnode, pConsumerOld); + SMqConsumerObj *pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup); + pConsumerNew->updateType = CONSUMER_UPDATE_REB_MODIFY_REMOVE; + + char* topicTmp = taosStrdup(topic); + taosArrayPush(pConsumerNew->rebRemovedTopics, &topicTmp); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); mndTransDrop(pTrans); return -1; } - tDeleteSMqConsumerObj(pConsumerNew); - taosMemoryFree(pConsumerNew); + tDeleteSMqConsumerObj(pConsumerNew, true); } // 4. TODO commit log: modification log @@ -1080,8 +1069,12 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); // consumer id + char consumerIdHex[32] = {0}; + sprintf(varDataVal(consumerIdHex), "0x%"PRIx64, pConsumerEp->consumerId); + varDataSetLen(consumerIdHex, strlen(varDataVal(consumerIdHex))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false); + colDataSetVal(pColInfo, numOfRows, (const char *)consumerIdHex, false); mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);