Merge pull request #11940 from taosdata/feature/tq
enh: more info in perf schema
This commit is contained in:
commit
1ff3057d37
|
@ -205,7 +205,6 @@ SArray* taosArrayDup(const SArray* pSrc);
|
||||||
*/
|
*/
|
||||||
SArray* taosArrayDeepCopy(const SArray* pSrc, FCopy deepCopy);
|
SArray* taosArrayDeepCopy(const SArray* pSrc, FCopy deepCopy);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* clear the array (remove all element)
|
* clear the array (remove all element)
|
||||||
* @param pArray
|
* @param pArray
|
||||||
|
@ -272,6 +271,8 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par
|
||||||
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode);
|
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode);
|
||||||
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz);
|
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz);
|
||||||
|
|
||||||
|
char* taosShowStrArray(const SArray* pArray);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -131,6 +131,7 @@ extern const int32_t TYPE_BYTES[15];
|
||||||
#define TSDB_PERFS_TABLE_TOPICS "topics"
|
#define TSDB_PERFS_TABLE_TOPICS "topics"
|
||||||
#define TSDB_PERFS_TABLE_CONSUMERS "consumers"
|
#define TSDB_PERFS_TABLE_CONSUMERS "consumers"
|
||||||
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
|
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
|
||||||
|
#define TSDB_PERFS_TABLE_OFFSETS "offsets"
|
||||||
|
|
||||||
#define TSDB_INDEX_TYPE_SMA "SMA"
|
#define TSDB_INDEX_TYPE_SMA "SMA"
|
||||||
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
|
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
|
||||||
|
|
|
@ -245,7 +245,6 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
|
||||||
ASSERT(pSchema != NULL && numOfCols > 0);
|
ASSERT(pSchema != NULL && numOfCols > 0);
|
||||||
|
|
||||||
pResInfo->numOfCols = numOfCols;
|
pResInfo->numOfCols = numOfCols;
|
||||||
// TODO handle memory leak
|
|
||||||
if (pResInfo->fields != NULL) {
|
if (pResInfo->fields != NULL) {
|
||||||
taosMemoryFree(pResInfo->fields);
|
taosMemoryFree(pResInfo->fields);
|
||||||
}
|
}
|
||||||
|
|
|
@ -666,7 +666,6 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
code = param.rspErr;
|
code = param.rspErr;
|
||||||
if (code != 0) goto FAIL;
|
if (code != 0) goto FAIL;
|
||||||
|
|
||||||
// TODO: add max retry cnt
|
|
||||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
|
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
|
||||||
tscDebug("not ready, retry");
|
tscDebug("not ready, retry");
|
||||||
taosMsleep(500);
|
taosMsleep(500);
|
||||||
|
@ -683,7 +682,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
code = 0;
|
code = 0;
|
||||||
FAIL:
|
FAIL:
|
||||||
if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
|
if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
|
||||||
if (code != 0) {
|
if (code != 0 && buf) {
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
@ -1265,6 +1264,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
|
||||||
return (TAOS_RES*)rspObj;
|
return (TAOS_RES*)rspObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// in no topic status also need process delayed task
|
||||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
|
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1285,6 +1285,9 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
tsem_timewait(&tmq->rspSem, leftTime * 1000);
|
tsem_timewait(&tmq->rspSem, leftTime * 1000);
|
||||||
|
} else {
|
||||||
|
// use tsem_timewait instead of tsem_wait to avoid unexpected stuck
|
||||||
|
tsem_timewait(&tmq->rspSem, 500 * 1000);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,10 +23,8 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
// MQ_CONSUMER_STATUS__INIT = 1,
|
|
||||||
MQ_CONSUMER_STATUS__MODIFY = 1,
|
MQ_CONSUMER_STATUS__MODIFY = 1,
|
||||||
MQ_CONSUMER_STATUS__MODIFY_IN_REB,
|
MQ_CONSUMER_STATUS__MODIFY_IN_REB,
|
||||||
// MQ_CONSUMER_STATUS__IDLE,
|
|
||||||
MQ_CONSUMER_STATUS__READY,
|
MQ_CONSUMER_STATUS__READY,
|
||||||
MQ_CONSUMER_STATUS__LOST,
|
MQ_CONSUMER_STATUS__LOST,
|
||||||
MQ_CONSUMER_STATUS__LOST_IN_REB,
|
MQ_CONSUMER_STATUS__LOST_IN_REB,
|
||||||
|
|
|
@ -469,6 +469,7 @@ enum {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
char cgroup[TSDB_CGROUP_LEN];
|
||||||
|
char appId[TSDB_CGROUP_LEN];
|
||||||
int8_t updateType; // used only for update
|
int8_t updateType; // used only for update
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
|
@ -479,6 +480,14 @@ typedef struct {
|
||||||
SArray* currentTopics; // SArray<char*>
|
SArray* currentTopics; // SArray<char*>
|
||||||
SArray* rebNewTopics; // SArray<char*>
|
SArray* rebNewTopics; // SArray<char*>
|
||||||
SArray* rebRemovedTopics; // SArray<char*>
|
SArray* rebRemovedTopics; // SArray<char*>
|
||||||
|
|
||||||
|
// data for display
|
||||||
|
int32_t pid;
|
||||||
|
SEpSet ep;
|
||||||
|
int64_t upTime;
|
||||||
|
int64_t subscribeTime;
|
||||||
|
int64_t rebalanceTime;
|
||||||
|
|
||||||
} SMqConsumerObj;
|
} SMqConsumerObj;
|
||||||
|
|
||||||
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
|
||||||
|
|
|
@ -37,6 +37,8 @@
|
||||||
|
|
||||||
static int8_t mqInRebFlag = 0;
|
static int8_t mqInRebFlag = 0;
|
||||||
|
|
||||||
|
static const char *mndConsumerStatusName(int status);
|
||||||
|
|
||||||
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer);
|
||||||
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
|
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer);
|
||||||
|
@ -62,6 +64,10 @@ int32_t mndInitConsumer(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_ASK_EP, mndProcessAskEpReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_ASK_EP, mndProcessAskEpReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_CONSUMER_LOST, mndProcessConsumerLostMsg);
|
||||||
|
|
||||||
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndRetrieveConsumer);
|
||||||
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONSUMERS, mndCancelGetNextConsumer);
|
||||||
|
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -366,7 +372,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
|
||||||
if (pConsumerOld == NULL) {
|
if (pConsumerOld == NULL) {
|
||||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
||||||
/*pConsumerNew->waitingRebTopics = newSub;*/
|
|
||||||
pConsumerNew->rebNewTopics = newSub;
|
pConsumerNew->rebNewTopics = newSub;
|
||||||
subscribe.topicNames = NULL;
|
subscribe.topicNames = NULL;
|
||||||
|
|
||||||
|
@ -389,7 +394,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
|
||||||
goto SUBSCRIBE_OVER;
|
goto SUBSCRIBE_OVER;
|
||||||
}
|
}
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
||||||
/*pConsumerOld->waitingRebTopics = newSub;*/
|
|
||||||
|
|
||||||
int32_t oldTopicNum = 0;
|
int32_t oldTopicNum = 0;
|
||||||
if (pConsumerOld->currentTopics) {
|
if (pConsumerOld->currentTopics) {
|
||||||
|
@ -532,6 +536,7 @@ CM_DECODE_OVER:
|
||||||
|
|
||||||
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
||||||
mTrace("consumer:%" PRId64 ", perform insert action", pConsumer->consumerId);
|
mTrace("consumer:%" PRId64 ", perform insert action", pConsumer->consumerId);
|
||||||
|
pConsumer->subscribeTime = pConsumer->upTime;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -557,6 +562,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
|
pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics;
|
||||||
pNewConsumer->rebRemovedTopics = tmp;
|
pNewConsumer->rebRemovedTopics = tmp;
|
||||||
|
|
||||||
|
pOldConsumer->subscribeTime = pNewConsumer->upTime;
|
||||||
|
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
|
||||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
|
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
|
||||||
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
|
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
|
||||||
|
@ -565,9 +572,15 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
char *topic = strdup(taosArrayGetP(pOldConsumer->currentTopics, i));
|
char *topic = strdup(taosArrayGetP(pOldConsumer->currentTopics, i));
|
||||||
taosArrayPush(pNewConsumer->rebRemovedTopics, &topic);
|
taosArrayPush(pNewConsumer->rebRemovedTopics, &topic);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
||||||
|
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST;
|
||||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) {
|
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) {
|
||||||
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||||
|
|
||||||
|
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
||||||
|
|
||||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
|
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
|
||||||
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);
|
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);
|
||||||
ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);
|
ASSERT(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);
|
||||||
|
@ -612,6 +625,9 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
||||||
|
|
||||||
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||||
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
|
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
|
||||||
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);
|
ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);
|
||||||
|
@ -668,6 +684,9 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
||||||
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
|
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOldConsumer->rebalanceTime = pNewConsumer->upTime;
|
||||||
|
|
||||||
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
atomic_add_fetch_32(&pOldConsumer->epoch, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -688,3 +707,104 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
sdbRelease(pSdb, pConsumer);
|
sdbRelease(pSdb, pConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndRetrieveConsumer(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
||||||
|
SMnode *pMnode = pReq->pNode;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
int32_t numOfRows = 0;
|
||||||
|
SMqConsumerObj *pConsumer = NULL;
|
||||||
|
|
||||||
|
while (numOfRows < rowsCapacity) {
|
||||||
|
pShow->pIter = sdbFetch(pSdb, SDB_CONSUMER, pShow->pIter, (void **)&pConsumer);
|
||||||
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
|
SColumnInfoData *pColInfo;
|
||||||
|
int32_t cols = 0;
|
||||||
|
|
||||||
|
taosRLockLatch(&pConsumer->lock);
|
||||||
|
|
||||||
|
// consumer id
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->consumerId, false);
|
||||||
|
|
||||||
|
// group id
|
||||||
|
char groupId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
tstrncpy(varDataVal(groupId), pConsumer->cgroup, TSDB_CGROUP_LEN);
|
||||||
|
varDataSetLen(groupId, strlen(varDataVal(groupId)));
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)groupId, false);
|
||||||
|
|
||||||
|
// app id
|
||||||
|
char appId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
tstrncpy(varDataVal(appId), pConsumer->appId, TSDB_CGROUP_LEN);
|
||||||
|
varDataSetLen(appId, strlen(varDataVal(appId)));
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)appId, false);
|
||||||
|
|
||||||
|
// status
|
||||||
|
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20);
|
||||||
|
varDataSetLen(status, strlen(varDataVal(status)));
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)status, false);
|
||||||
|
|
||||||
|
// subscribed topics
|
||||||
|
char topics[TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
char *showStr = taosShowStrArray(pConsumer->currentTopics);
|
||||||
|
tstrncpy(varDataVal(topics), showStr, TSDB_SHOW_LIST_LEN);
|
||||||
|
taosMemoryFree(showStr);
|
||||||
|
varDataSetLen(topics, strlen(varDataVal(topics)));
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)topics, false);
|
||||||
|
|
||||||
|
// pid
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true);
|
||||||
|
|
||||||
|
// end point
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);
|
||||||
|
|
||||||
|
// up time
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->upTime, false);
|
||||||
|
|
||||||
|
// subscribe time
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->subscribeTime, false);
|
||||||
|
|
||||||
|
// rebalance time
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->rebalanceTime, pConsumer->rebalanceTime == 0);
|
||||||
|
|
||||||
|
taosRUnLockLatch(&pConsumer->lock);
|
||||||
|
sdbRelease(pSdb, pConsumer);
|
||||||
|
|
||||||
|
numOfRows++;
|
||||||
|
}
|
||||||
|
|
||||||
|
pShow->numOfRows += numOfRows;
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
static const char *mndConsumerStatusName(int status) {
|
||||||
|
switch (status) {
|
||||||
|
case MQ_CONSUMER_STATUS__READY:
|
||||||
|
return "ready";
|
||||||
|
case MQ_CONSUMER_STATUS__LOST:
|
||||||
|
case MQ_CONSUMER_STATUS__LOST_REBD:
|
||||||
|
case MQ_CONSUMER_STATUS__LOST_IN_REB:
|
||||||
|
return "lost";
|
||||||
|
case MQ_CONSUMER_STATUS__MODIFY:
|
||||||
|
case MQ_CONSUMER_STATUS__MODIFY_IN_REB:
|
||||||
|
return "rebalancing";
|
||||||
|
default:
|
||||||
|
return "unknown";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -43,6 +43,8 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pConsumer->upTime = taosGetTimestampMs();
|
||||||
|
|
||||||
return pConsumer;
|
return pConsumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,6 +69,12 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
|
||||||
tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
|
tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
|
||||||
tlen += taosEncodeFixedI32(buf, pConsumer->status);
|
tlen += taosEncodeFixedI32(buf, pConsumer->status);
|
||||||
|
|
||||||
|
tlen += taosEncodeFixedI32(buf, pConsumer->pid);
|
||||||
|
tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
|
||||||
|
tlen += taosEncodeFixedI64(buf, pConsumer->upTime);
|
||||||
|
tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
|
||||||
|
tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);
|
||||||
|
|
||||||
// current topics
|
// current topics
|
||||||
if (pConsumer->currentTopics) {
|
if (pConsumer->currentTopics) {
|
||||||
sz = taosArrayGetSize(pConsumer->currentTopics);
|
sz = taosArrayGetSize(pConsumer->currentTopics);
|
||||||
|
@ -114,6 +122,12 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
|
||||||
buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
|
buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
|
||||||
buf = taosDecodeFixedI32(buf, &pConsumer->status);
|
buf = taosDecodeFixedI32(buf, &pConsumer->status);
|
||||||
|
|
||||||
|
buf = taosDecodeFixedI32(buf, &pConsumer->pid);
|
||||||
|
buf = taosDecodeSEpSet(buf, &pConsumer->ep);
|
||||||
|
buf = taosDecodeFixedI64(buf, &pConsumer->upTime);
|
||||||
|
buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
|
||||||
|
buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);
|
||||||
|
|
||||||
// current topics
|
// current topics
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
buf = taosDecodeFixedI32(buf, &sz);
|
||||||
pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
|
pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
|
||||||
|
@ -329,6 +343,7 @@ int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEnt
|
||||||
tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
|
tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
|
void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
|
||||||
buf = taosDecodeFixedI32(buf, &pEntry->epoch);
|
buf = taosDecodeFixedI32(buf, &pEntry->epoch);
|
||||||
buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
|
buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
|
||||||
|
|
|
@ -199,23 +199,6 @@ static const SInfosTableSchema vgroupsSchema[] = {
|
||||||
{.name = "file_size", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "file_size", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SInfosTableSchema consumerSchema[] = {
|
|
||||||
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
|
||||||
{.name = "status", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
|
||||||
// ep
|
|
||||||
// up time
|
|
||||||
// topics
|
|
||||||
};
|
|
||||||
|
|
||||||
static const SInfosTableSchema subscribeSchema[] = {
|
|
||||||
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
|
||||||
{.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
};
|
|
||||||
|
|
||||||
static const SInfosTableSchema smaSchema[] = {
|
static const SInfosTableSchema smaSchema[] = {
|
||||||
{.name = "sma_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "sma_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
|
@ -282,8 +265,6 @@ static const SInfosTableMeta infosMeta[] = {
|
||||||
{TSDB_INS_TABLE_USER_USERS, userUsersSchema, tListLen(userUsersSchema)},
|
{TSDB_INS_TABLE_USER_USERS, userUsersSchema, tListLen(userUsersSchema)},
|
||||||
{TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema)},
|
{TSDB_INS_TABLE_LICENCES, grantsSchema, tListLen(grantsSchema)},
|
||||||
{TSDB_INS_TABLE_VGROUPS, vgroupsSchema, tListLen(vgroupsSchema)},
|
{TSDB_INS_TABLE_VGROUPS, vgroupsSchema, tListLen(vgroupsSchema)},
|
||||||
{TSDB_INS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)},
|
|
||||||
{TSDB_INS_TABLE_SUBSCRIBES, subscribeSchema, tListLen(subscribeSchema)},
|
|
||||||
{TSDB_INS_TABLE_TRANS, transSchema, tListLen(transSchema)},
|
{TSDB_INS_TABLE_TRANS, transSchema, tListLen(transSchema)},
|
||||||
{TSDB_INS_TABLE_SMAS, smaSchema, tListLen(smaSchema)},
|
{TSDB_INS_TABLE_SMAS, smaSchema, tListLen(smaSchema)},
|
||||||
{TSDB_INS_TABLE_CONFIGS, configSchema, tListLen(configSchema)},
|
{TSDB_INS_TABLE_CONFIGS, configSchema, tListLen(configSchema)},
|
||||||
|
|
|
@ -49,13 +49,15 @@ static const SPerfsTableSchema topicSchema[] = {
|
||||||
|
|
||||||
static const SPerfsTableSchema consumerSchema[] = {
|
static const SPerfsTableSchema consumerSchema[] = {
|
||||||
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
|
||||||
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "status", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "app_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
|
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "topics", .bytes = TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "topics", .bytes = TSDB_SHOW_LIST_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
|
{.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
|
{.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SPerfsTableSchema subscriptionSchema[] = {
|
static const SPerfsTableSchema subscriptionSchema[] = {
|
||||||
|
@ -63,6 +65,12 @@ static const SPerfsTableSchema subscriptionSchema[] = {
|
||||||
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "consumer_id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
|
};
|
||||||
|
|
||||||
|
static const SPerfsTableSchema offsetSchema[] = {
|
||||||
|
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
|
{.name = "group_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
|
||||||
|
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "committed_offset", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "committed_offset", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
{.name = "current_offset", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "current_offset", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
{.name = "skip_log_cnt", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
{.name = "skip_log_cnt", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
|
||||||
|
@ -74,6 +82,7 @@ static const SPerfsTableMeta perfsMeta[] = {
|
||||||
{TSDB_PERFS_TABLE_TOPICS, topicSchema, tListLen(topicSchema)},
|
{TSDB_PERFS_TABLE_TOPICS, topicSchema, tListLen(topicSchema)},
|
||||||
{TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)},
|
{TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)},
|
||||||
{TSDB_PERFS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema)},
|
{TSDB_PERFS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema)},
|
||||||
|
{TSDB_PERFS_TABLE_OFFSETS, offsetSchema, tListLen(offsetSchema)},
|
||||||
};
|
};
|
||||||
|
|
||||||
// connection/application/
|
// connection/application/
|
||||||
|
|
|
@ -482,3 +482,22 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par
|
||||||
taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
|
taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
|
||||||
}
|
}
|
||||||
// TODO(yihaoDeng) add order array<type>
|
// TODO(yihaoDeng) add order array<type>
|
||||||
|
//
|
||||||
|
|
||||||
|
char* taosShowStrArray(const SArray* pArray) {
|
||||||
|
int32_t sz = pArray->size;
|
||||||
|
int32_t tlen = 0;
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
tlen += strlen(taosArrayGetP(pArray, i)) + 1;
|
||||||
|
}
|
||||||
|
char* res = taosMemoryCalloc(1, tlen);
|
||||||
|
char* buf = res;
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
char* str = taosArrayGetP(pArray, i);
|
||||||
|
int32_t len = strlen(str);
|
||||||
|
memcpy(buf, str, len);
|
||||||
|
buf += len;
|
||||||
|
if (i != sz - 1) *buf = ',';
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue