feat:add parameters for consumer & add offset rows for subscription
This commit is contained in:
parent
d7a4817124
commit
aa610b27a4
|
@ -809,6 +809,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
||||||
offRows->vgId = pVg->vgId;
|
offRows->vgId = pVg->vgId;
|
||||||
offRows->rows = pVg->numOfRows;
|
offRows->rows = pVg->numOfRows;
|
||||||
offRows->offset = pVg->offsetInfo.committedOffset;
|
offRows->offset = pVg->offsetInfo.committedOffset;
|
||||||
|
tscDebug("report row:%lldd, offset:%" PRId64, offRows->rows, offRows->offset.version);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1695,7 +1696,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg,
|
||||||
return pRspObj;
|
return pRspObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
|
||||||
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
|
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
|
||||||
pRspObj->resType = RES_TYPE__TMQ_METADATA;
|
pRspObj->resType = RES_TYPE__TMQ_METADATA;
|
||||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
@ -1710,6 +1711,13 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||||
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
|
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// extract the rows in this data packet
|
||||||
|
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
|
||||||
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
|
||||||
|
int64_t rows = htobe64(pRetrieve->numOfRows);
|
||||||
|
pVg->numOfRows += rows;
|
||||||
|
(*numOfRows) += rows;
|
||||||
|
}
|
||||||
return pRspObj;
|
return pRspObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2007,7 +2015,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
|
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
|
||||||
pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
||||||
} else {
|
} else {
|
||||||
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
|
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq->totalRows += numOfRows;
|
tmq->totalRows += numOfRows;
|
||||||
|
|
|
@ -595,6 +595,7 @@ typedef struct {
|
||||||
int64_t stbUid;
|
int64_t stbUid;
|
||||||
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
|
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
|
||||||
SArray* unassignedVgs; // SArray<SMqVgEp*>
|
SArray* unassignedVgs; // SArray<SMqVgEp*>
|
||||||
|
SArray* offsetRows;
|
||||||
char dbName[TSDB_DB_FNAME_LEN];
|
char dbName[TSDB_DB_FNAME_LEN];
|
||||||
} SMqSubscribeObj;
|
} SMqSubscribeObj;
|
||||||
|
|
||||||
|
|
|
@ -515,7 +515,6 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
||||||
SMqConsumerEp newEp = {
|
SMqConsumerEp newEp = {
|
||||||
.consumerId = pConsumerEp->consumerId,
|
.consumerId = pConsumerEp->consumerId,
|
||||||
.vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
|
.vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
|
||||||
.offsetRows = NULL,
|
|
||||||
};
|
};
|
||||||
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
|
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
|
||||||
}
|
}
|
||||||
|
@ -535,6 +534,7 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
|
||||||
}
|
}
|
||||||
taosHashCleanup(pSub->consumerHash);
|
taosHashCleanup(pSub->consumerHash);
|
||||||
taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
|
taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
|
||||||
|
taosArrayDestroy(pSub->offsetRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
||||||
|
@ -561,6 +561,23 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
||||||
if (cnt != sz) return -1;
|
if (cnt != sz) return -1;
|
||||||
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
|
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
|
||||||
tlen += taosEncodeString(buf, pSub->dbName);
|
tlen += taosEncodeString(buf, pSub->dbName);
|
||||||
|
|
||||||
|
int32_t szVgs = taosArrayGetSize(pSub->offsetRows);
|
||||||
|
tlen += taosEncodeFixedI32(buf, szVgs);
|
||||||
|
for (int32_t j= 0; j < szVgs; ++j) {
|
||||||
|
OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j);
|
||||||
|
tlen += taosEncodeFixedI32(buf, offRows->vgId);
|
||||||
|
tlen += taosEncodeFixedI64(buf, offRows->rows);
|
||||||
|
tlen += taosEncodeFixedI8(buf, offRows->offset.type);
|
||||||
|
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
|
tlen += taosEncodeFixedI64(buf, offRows->offset.uid);
|
||||||
|
tlen += taosEncodeFixedI64(buf, offRows->offset.ts);
|
||||||
|
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
|
||||||
|
tlen += taosEncodeFixedI64(buf, offRows->offset.version);
|
||||||
|
} else {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -585,6 +602,29 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
|
||||||
|
|
||||||
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
|
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
|
||||||
buf = taosDecodeStringTo(buf, pSub->dbName);
|
buf = taosDecodeStringTo(buf, pSub->dbName);
|
||||||
|
|
||||||
|
if (sver > 1){
|
||||||
|
int32_t szVgs = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &szVgs);
|
||||||
|
if(szVgs > 0){
|
||||||
|
pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
|
||||||
|
if (NULL == pSub->offsetRows) return NULL;
|
||||||
|
for (int32_t j= 0; j < szVgs; ++j) {
|
||||||
|
OffsetRows* offRows = taosArrayReserve(pSub->offsetRows, 1);
|
||||||
|
buf = taosDecodeFixedI32(buf, &offRows->vgId);
|
||||||
|
buf = taosDecodeFixedI64(buf, &offRows->rows);
|
||||||
|
buf = taosDecodeFixedI8(buf, &offRows->offset.type);
|
||||||
|
if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
|
buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
|
||||||
|
buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
|
||||||
|
} else if (offRows->offset.type == TMQ_OFFSET__LOG) {
|
||||||
|
buf = taosDecodeFixedI64(buf, &offRows->offset.version);
|
||||||
|
} else {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
return (void *)buf;
|
return (void *)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -449,8 +449,26 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
|
|
||||||
SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter;
|
SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter;
|
||||||
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
||||||
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed, put all vg into unassigned
|
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed
|
||||||
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
|
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); // put all vg into unassigned
|
||||||
|
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows
|
||||||
|
if(pSub){
|
||||||
|
taosRLockLatch(&pSub->lock);
|
||||||
|
if(pOutput->pSub->offsetRows == NULL){
|
||||||
|
pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows));
|
||||||
|
}else{
|
||||||
|
taosArrayClear(pOutput->pSub->offsetRows);
|
||||||
|
}
|
||||||
|
pIter = NULL;
|
||||||
|
while(1){
|
||||||
|
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
|
taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows);
|
||||||
|
}
|
||||||
|
taosRUnLockLatch(&pSub->lock);
|
||||||
|
mndReleaseSubscribe(pMnode, pSub);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -890,6 +908,10 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc
|
||||||
pOldSub->unassignedVgs = pNewSub->unassignedVgs;
|
pOldSub->unassignedVgs = pNewSub->unassignedVgs;
|
||||||
pNewSub->unassignedVgs = tmp1;
|
pNewSub->unassignedVgs = tmp1;
|
||||||
|
|
||||||
|
SArray *tmp2 = pOldSub->offsetRows;
|
||||||
|
pOldSub->offsetRows = pNewSub->offsetRows;
|
||||||
|
pNewSub->offsetRows = tmp2;
|
||||||
|
|
||||||
taosWUnLockLatch(&pOldSub->lock);
|
taosWUnLockLatch(&pOldSub->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1028,6 +1050,61 @@ END:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){
|
||||||
|
int32_t sz = taosArrayGetSize(vgs);
|
||||||
|
for (int32_t j = 0; j < sz; j++) {
|
||||||
|
SMqVgEp *pVgEp = taosArrayGetP(vgs, j);
|
||||||
|
|
||||||
|
SColumnInfoData *pColInfo;
|
||||||
|
int32_t cols = 0;
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, *numOfRows, (const char *)topic, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, *numOfRows, (const char *)cgroup, false);
|
||||||
|
|
||||||
|
// vg id
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false);
|
||||||
|
|
||||||
|
// consumer id
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, *numOfRows, (const char *)&consumerId, consumerId == -1);
|
||||||
|
|
||||||
|
mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
|
||||||
|
consumerId, varDataVal(cgroup), pVgEp->vgId);
|
||||||
|
|
||||||
|
// offset
|
||||||
|
OffsetRows *data = NULL;
|
||||||
|
for(int i = 0; i < taosArrayGetSize(offsetRows); i++){
|
||||||
|
OffsetRows *tmp = taosArrayGet(offsetRows, i);
|
||||||
|
if(tmp->vgId != pVgEp->vgId){
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
data = tmp;
|
||||||
|
}
|
||||||
|
if(data){
|
||||||
|
// vg id
|
||||||
|
char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
|
||||||
|
varDataSetLen(buf, strlen(varDataVal(buf)));
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false);
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, *numOfRows, (const char *)&data->rows, false);
|
||||||
|
}else{
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetNULL(pColInfo, *numOfRows);
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetNULL(pColInfo, *numOfRows);
|
||||||
|
mError("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
|
||||||
|
}
|
||||||
|
(*numOfRows)++;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
@ -1048,6 +1125,13 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
|
||||||
blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
|
blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// topic and cgroup
|
||||||
|
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
|
||||||
|
varDataSetLen(topic, strlen(varDataVal(topic)));
|
||||||
|
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
|
||||||
|
|
||||||
SMqConsumerEp *pConsumerEp = NULL;
|
SMqConsumerEp *pConsumerEp = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -1055,121 +1139,11 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
pConsumerEp = (SMqConsumerEp *)pIter;
|
pConsumerEp = (SMqConsumerEp *)pIter;
|
||||||
|
|
||||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs, pConsumerEp->offsetRows);
|
||||||
for (int32_t j = 0; j < sz; j++) {
|
|
||||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
|
||||||
|
|
||||||
SColumnInfoData *pColInfo;
|
|
||||||
int32_t cols = 0;
|
|
||||||
|
|
||||||
// topic and cgroup
|
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
||||||
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
||||||
mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
|
|
||||||
varDataSetLen(topic, strlen(varDataVal(topic)));
|
|
||||||
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
|
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
|
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
|
|
||||||
|
|
||||||
// vg id
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
|
|
||||||
|
|
||||||
// consumer id
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
|
|
||||||
|
|
||||||
mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
|
|
||||||
pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);
|
|
||||||
|
|
||||||
// offset
|
|
||||||
OffsetRows *data = NULL;
|
|
||||||
for(int i = 0; i < taosArrayGetSize(pConsumerEp->offsetRows); i++){
|
|
||||||
OffsetRows *tmp = taosArrayGet(pConsumerEp->offsetRows, i);
|
|
||||||
if(data->vgId != pVgEp->vgId){
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
data = tmp;
|
|
||||||
}
|
|
||||||
if(data){
|
|
||||||
// vg id
|
|
||||||
char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
||||||
tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset);
|
|
||||||
varDataSetLen(buf, strlen(varDataVal(buf)));
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)buf, false);
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&data->rows, false);
|
|
||||||
}else{
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetNULL(pColInfo, numOfRows);
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetNULL(pColInfo, numOfRows);
|
|
||||||
mError("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId);
|
|
||||||
}
|
|
||||||
//#if 0
|
|
||||||
// // subscribe time
|
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
// colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
|
|
||||||
//
|
|
||||||
// // rebalance time
|
|
||||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
// colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
|
|
||||||
//#endif
|
|
||||||
|
|
||||||
numOfRows++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// do not show for cleared subscription
|
// do not show for cleared subscription
|
||||||
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
|
||||||
|
|
||||||
SColumnInfoData *pColInfo;
|
|
||||||
int32_t cols = 0;
|
|
||||||
|
|
||||||
// topic and cgroup
|
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
||||||
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
|
||||||
mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false);
|
|
||||||
varDataSetLen(topic, strlen(varDataVal(topic)));
|
|
||||||
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
|
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)topic, false);
|
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
|
|
||||||
|
|
||||||
// vg id
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false);
|
|
||||||
|
|
||||||
// consumer id
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, NULL, true);
|
|
||||||
|
|
||||||
mDebug("mnd show subscriptions(unassigned): topic %s, cgroup %s vgid %d", varDataVal(topic), varDataVal(cgroup),
|
|
||||||
pVgEp->vgId);
|
|
||||||
|
|
||||||
// offset
|
|
||||||
#if 0
|
|
||||||
// subscribe time
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false);
|
|
||||||
|
|
||||||
// rebalance time
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
numOfRows++;
|
|
||||||
}
|
|
||||||
|
|
||||||
pBlock->info.rows = numOfRows;
|
pBlock->info.rows = numOfRows;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,313 @@
|
||||||
|
|
||||||
|
import taos
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
sys.path.append("./7-tmq")
|
||||||
|
from tmqCommon import *
|
||||||
|
|
||||||
|
class actionType(Enum):
|
||||||
|
CREATE_DATABASE = 0
|
||||||
|
CREATE_STABLE = 1
|
||||||
|
CREATE_CTABLE = 2
|
||||||
|
INSERT_DATA = 3
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
hostname = socket.gethostname()
|
||||||
|
#rpcDebugFlagVal = '143'
|
||||||
|
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||||
|
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||||
|
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||||
|
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||||
|
#print ("===================: ", updatecfgDict)
|
||||||
|
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||||
|
|
||||||
|
def getBuildPath(self):
|
||||||
|
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
if ("community" in selfPath):
|
||||||
|
projPath = selfPath[:selfPath.find("community")]
|
||||||
|
else:
|
||||||
|
projPath = selfPath[:selfPath.find("tests")]
|
||||||
|
|
||||||
|
for root, dirs, files in os.walk(projPath):
|
||||||
|
if ("taosd" in files or "taosd.exe" in files):
|
||||||
|
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||||
|
if ("packaging" not in rootRealPath):
|
||||||
|
buildPath = root[:len(root) - len("/build/bin")]
|
||||||
|
break
|
||||||
|
return buildPath
|
||||||
|
|
||||||
|
def newcur(self,cfg,host,port):
|
||||||
|
user = "root"
|
||||||
|
password = "taosdata"
|
||||||
|
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
|
||||||
|
cur=con.cursor()
|
||||||
|
print(cur)
|
||||||
|
return cur
|
||||||
|
|
||||||
|
def initConsumerTable(self,cdbName='cdb'):
|
||||||
|
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||||
|
tdSql.query("create database if not exists %s vgroups 1 wal_retention_period 3600"%(cdbName))
|
||||||
|
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||||
|
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
|
||||||
|
|
||||||
|
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||||
|
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||||
|
|
||||||
|
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||||
|
tdLog.info("drop consumeinfo table")
|
||||||
|
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||||
|
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||||
|
|
||||||
|
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||||
|
sql = "insert into %s.consumeinfo values "%cdbName
|
||||||
|
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||||
|
tdLog.info("consume info sql: %s"%sql)
|
||||||
|
tdSql.query(sql)
|
||||||
|
|
||||||
|
def selectConsumeResult(self,expectRows,cdbName='cdb'):
|
||||||
|
resultList=[]
|
||||||
|
while 1:
|
||||||
|
tdSql.query("select * from %s.consumeresult"%cdbName)
|
||||||
|
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
||||||
|
if tdSql.getRows() == expectRows:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
for i in range(expectRows):
|
||||||
|
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||||
|
resultList.append(tdSql.getData(i , 3))
|
||||||
|
|
||||||
|
return resultList
|
||||||
|
|
||||||
|
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||||
|
if valgrind == 1:
|
||||||
|
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||||
|
shellCmd = 'nohup valgrind --log-file=' + logFile
|
||||||
|
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
|
||||||
|
|
||||||
|
if (platform.system().lower() == 'windows'):
|
||||||
|
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||||
|
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||||
|
shellCmd += "> nul 2>&1 &"
|
||||||
|
else:
|
||||||
|
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||||
|
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||||
|
shellCmd += "> /dev/null 2>&1 &"
|
||||||
|
tdLog.info(shellCmd)
|
||||||
|
os.system(shellCmd)
|
||||||
|
|
||||||
|
def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
|
||||||
|
if dropFlag == 1:
|
||||||
|
tsql.execute("drop database if exists %s"%(dbName))
|
||||||
|
|
||||||
|
tsql.execute("create database if not exists %s vgroups %d replica %d wal_retention_period 3600"%(dbName, vgroups, replica))
|
||||||
|
tdLog.debug("complete to create database %s"%(dbName))
|
||||||
|
return
|
||||||
|
|
||||||
|
def create_stable(self,tsql, dbName,stbName):
|
||||||
|
tsql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(dbName, stbName))
|
||||||
|
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
|
||||||
|
return
|
||||||
|
|
||||||
|
def create_ctables(self,tsql, dbName,stbName,ctbNum):
|
||||||
|
tsql.execute("use %s" %dbName)
|
||||||
|
pre_create = "create table"
|
||||||
|
sql = pre_create
|
||||||
|
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
|
||||||
|
for i in range(ctbNum):
|
||||||
|
sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
|
||||||
|
if (i > 0) and (i%100 == 0):
|
||||||
|
tsql.execute(sql)
|
||||||
|
sql = pre_create
|
||||||
|
if sql != pre_create:
|
||||||
|
tsql.execute(sql)
|
||||||
|
|
||||||
|
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
|
||||||
|
return
|
||||||
|
|
||||||
|
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0):
|
||||||
|
tdLog.debug("start to insert data ............")
|
||||||
|
tsql.execute("use %s" %dbName)
|
||||||
|
pre_insert = "insert into "
|
||||||
|
sql = pre_insert
|
||||||
|
|
||||||
|
if startTs == 0:
|
||||||
|
t = time.time()
|
||||||
|
startTs = int(round(t * 1000))
|
||||||
|
|
||||||
|
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||||
|
rowsOfSql = 0
|
||||||
|
for i in range(ctbNum):
|
||||||
|
sql += " %s_%d values "%(stbName,i)
|
||||||
|
for j in range(rowsPerTbl):
|
||||||
|
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
|
||||||
|
rowsOfSql += 1
|
||||||
|
if (j > 0) and ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)):
|
||||||
|
tsql.execute(sql)
|
||||||
|
rowsOfSql = 0
|
||||||
|
if j < rowsPerTbl - 1:
|
||||||
|
sql = "insert into %s_%d values " %(stbName,i)
|
||||||
|
else:
|
||||||
|
sql = "insert into "
|
||||||
|
#end sql
|
||||||
|
if sql != pre_insert:
|
||||||
|
#print("insert sql:%s"%sql)
|
||||||
|
tsql.execute(sql)
|
||||||
|
tdLog.debug("insert data ............ [OK]")
|
||||||
|
return
|
||||||
|
|
||||||
|
def prepareEnv(self, **parameterDict):
|
||||||
|
# create new connector for my thread
|
||||||
|
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
|
||||||
|
|
||||||
|
if parameterDict["actionType"] == actionType.CREATE_DATABASE:
|
||||||
|
self.create_database(tsql, parameterDict["dbName"])
|
||||||
|
elif parameterDict["actionType"] == actionType.CREATE_STABLE:
|
||||||
|
self.create_stable(tsql, parameterDict["dbName"], parameterDict["stbName"])
|
||||||
|
elif parameterDict["actionType"] == actionType.CREATE_CTABLE:
|
||||||
|
self.create_ctables(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||||
|
elif parameterDict["actionType"] == actionType.INSERT_DATA:
|
||||||
|
self.insert_data(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"], \
|
||||||
|
parameterDict["rowsPerTbl"],parameterDict["batchNum"])
|
||||||
|
else:
|
||||||
|
tdLog.exit("not support's action: ", parameterDict["actionType"])
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
def tmqCase1(self, cfgPath, buildPath):
|
||||||
|
tdLog.printNoPrefix("======== test case 1: ")
|
||||||
|
|
||||||
|
self.initConsumerTable()
|
||||||
|
|
||||||
|
# create and start thread
|
||||||
|
parameterDict = {'cfg': '', \
|
||||||
|
'actionType': 0, \
|
||||||
|
'dbName': 'db1', \
|
||||||
|
'dropFlag': 1, \
|
||||||
|
'vgroups': 4, \
|
||||||
|
'replica': 1, \
|
||||||
|
'stbName': 'stb1', \
|
||||||
|
'ctbNum': 10, \
|
||||||
|
'rowsPerTbl': 10000, \
|
||||||
|
'batchNum': 100, \
|
||||||
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
|
|
||||||
|
self.create_database(tdSql, parameterDict["dbName"])
|
||||||
|
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||||
|
|
||||||
|
tdLog.info("create topics from stb1")
|
||||||
|
topicFromStb1 = 'topic_stb1'
|
||||||
|
|
||||||
|
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||||
|
topicList = topicFromStb1
|
||||||
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 0
|
||||||
|
keyList = 'group.id:cgrp1,\
|
||||||
|
enable.auto.commit:true,\
|
||||||
|
auto.commit.interval.ms:2000,\
|
||||||
|
auto.offset.reset:earliest'
|
||||||
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
pollDelay = 10
|
||||||
|
showMsg = 1
|
||||||
|
showRow = 1
|
||||||
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
|
time.sleep(2)
|
||||||
|
tdLog.info("start show subscriptions 1")
|
||||||
|
while(1):
|
||||||
|
tdSql.query("show subscriptions")
|
||||||
|
if (tdSql.getRows() == 0):
|
||||||
|
tdLog.info("sleep")
|
||||||
|
time.sleep(1)
|
||||||
|
elif (tdSql.queryResult[0][4] != None):
|
||||||
|
tdSql.checkData(0, 4, "offset(reset to earlieast)")
|
||||||
|
tdSql.checkData(0, 5, 0)
|
||||||
|
break
|
||||||
|
|
||||||
|
tdLog.info("start insert data")
|
||||||
|
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||||
|
self.insert_data(tdSql,\
|
||||||
|
parameterDict["dbName"],\
|
||||||
|
parameterDict["stbName"],\
|
||||||
|
parameterDict["ctbNum"],\
|
||||||
|
parameterDict["rowsPerTbl"],\
|
||||||
|
parameterDict["batchNum"])
|
||||||
|
|
||||||
|
time.sleep(2)
|
||||||
|
tdLog.info("start show subscriptions 2")
|
||||||
|
tdSql.query("show subscriptions")
|
||||||
|
tdSql.checkRows(4)
|
||||||
|
print(tdSql.queryResult)
|
||||||
|
# tdSql.checkData(0, 4, 'offset(log) ver:103')
|
||||||
|
tdSql.checkData(0, 5, 10000)
|
||||||
|
# tdSql.checkData(1, 4, 'offset(log) ver:103')
|
||||||
|
tdSql.checkData(1, 5, 10000)
|
||||||
|
# tdSql.checkData(2, 4, 'offset(log) ver:303')
|
||||||
|
tdSql.checkData(2, 5, 50000)
|
||||||
|
# tdSql.checkData(3, 4, 'offset(log) ver:239')
|
||||||
|
tdSql.checkData(3, 5, 30000)
|
||||||
|
|
||||||
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
|
expectRows = 1
|
||||||
|
resultList = self.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
|
time.sleep(2)
|
||||||
|
tdLog.info("start show subscriptions 3")
|
||||||
|
tdSql.query("show subscriptions")
|
||||||
|
tdSql.checkRows(4)
|
||||||
|
print(tdSql.queryResult)
|
||||||
|
tdSql.checkData(0, 3, None)
|
||||||
|
# tdSql.checkData(0, 4, 'offset(log) ver:103')
|
||||||
|
tdSql.checkData(0, 5, 10000)
|
||||||
|
# tdSql.checkData(1, 4, 'offset(log) ver:103')
|
||||||
|
tdSql.checkData(1, 5, 10000)
|
||||||
|
# tdSql.checkData(2, 4, 'offset(log) ver:303')
|
||||||
|
tdSql.checkData(2, 5, 50000)
|
||||||
|
# tdSql.checkData(3, 4, 'offset(log) ver:239')
|
||||||
|
tdSql.checkData(3, 5, 30000)
|
||||||
|
|
||||||
|
tdSql.query("drop topic %s"%topicFromStb1)
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
tdSql.prepare()
|
||||||
|
buildPath = self.getBuildPath()
|
||||||
|
if (buildPath == ""):
|
||||||
|
tdLog.exit("taosd not found!")
|
||||||
|
else:
|
||||||
|
tdLog.info("taosd found in %s" % buildPath)
|
||||||
|
cfgPath = buildPath + "/../sim/psim/cfg"
|
||||||
|
tdLog.info("cfgPath: %s" % cfgPath)
|
||||||
|
|
||||||
|
self.tmqCase1(cfgPath, buildPath)
|
||||||
|
# self.tmqCase2(cfgPath, buildPath)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
event = threading.Event()
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -171,7 +171,7 @@ class TMQCom:
|
||||||
if dropFlag == 1:
|
if dropFlag == 1:
|
||||||
tsql.execute("drop database if exists %s"%(dbName))
|
tsql.execute("drop database if exists %s"%(dbName))
|
||||||
|
|
||||||
tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica))
|
tsql.execute("create database if not exists %s vgroups %d replica %d wal_retention_period 3600"%(dbName, vgroups, replica))
|
||||||
tdLog.debug("complete to create database %s"%(dbName))
|
tdLog.debug("complete to create database %s"%(dbName))
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue