From 0fe0d6fc77b2bd2ef0b9012c11e3a197820bb644 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 13 Apr 2022 20:37:51 +0800 Subject: [PATCH] fix: fix tmq result parse --- example/src/tmq.c | 23 ++++++++--- include/common/tmsg.h | 15 +++----- source/client/src/clientMain.c | 24 +++++++----- source/client/src/tmq.c | 24 +++++++----- source/dnode/mnode/impl/src/mndSubscribe.c | 44 +++++++++++++++------- source/dnode/vnode/src/tq/tq.c | 32 ++++++++++------ 6 files changed, 102 insertions(+), 60 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 85dea5b382..832e389a13 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -19,8 +19,20 @@ #include #include "taos.h" -static int running = 1; -/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/ +static int running = 1; +static void msg_process(TAOS_RES* msg) { + char buf[1024]; + printf("topic: %s\n", tmq_get_topic_name(msg)); + printf("vg:%d\n", tmq_get_vgroup_id(msg)); + while (1) { + TAOS_ROW row = taos_fetch_row(msg); + if (row == NULL) break; + TAOS_FIELD* fields = taos_fetch_fields(msg); + int32_t numOfFields = taos_field_count(msg); + taos_print_row(buf, row, fields, numOfFields); + printf("%s\n", buf); + } +} int32_t init_env() { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -42,8 +54,7 @@ int32_t init_env() { } taos_free_result(pRes); - pRes = - taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"); + pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c4 int) tags(t1 int)"); if (taos_errno(pRes) != 0) { printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); return -1; @@ -90,7 +101,7 @@ int32_t create_topic() { /*const char* sql = "select * from tu1";*/ /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1 from ct1"); + pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c4 from ct1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; @@ -200,7 +211,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { while (running) { TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); if (tmqmessage) { - /*msg_process(tmqmessage);*/ + msg_process(tmqmessage); tmq_message_destroy(tmqmessage); if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5d5889e308..1ded61eb3f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2365,11 +2365,10 @@ typedef struct { } SMqSubVgEp; typedef struct { - char topic[TSDB_TOPIC_FNAME_LEN]; - int8_t isSchemaAdaptive; - SArray* vgs; // SArray - int32_t numOfFields; - TAOS_FIELD* fields; + char topic[TSDB_TOPIC_FNAME_LEN]; + int8_t isSchemaAdaptive; + SArray* vgs; // SArray + SSchemaWrapper schema; } SMqSubTopicEp; typedef struct { @@ -2468,8 +2467,7 @@ static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i); tlen += tEncodeSMqSubVgEp(buf, pVgEp); } - tlen += taosEncodeFixedI32(buf, pTopicEp->numOfFields); - // tlen += taosEncodeBinary(buf, pTopicEp->fields, pTopicEp->numOfFields * sizeof(TAOS_FIELD)); + tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema); return tlen; } @@ -2487,8 +2485,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE buf = tDecodeSMqSubVgEp(buf, &vgEp); taosArrayPush(pTopicEp->vgs, &vgEp); } - buf = taosDecodeFixedI32(buf, &pTopicEp->numOfFields); - // buf = taosDecodeBinary(buf, (void**)&pTopicEp->fields, pTopicEp->numOfFields * sizeof(TAOS_FIELD)); + buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema); return buf; } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index c93260be37..fa55e9c295 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -171,21 +171,25 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return doFetchRow(pRequest, true, true); } else if (TD_RES_TMQ(res)) { - SMqRspObj *msg = ((SMqRspObj *)res); + SMqRspObj *msg = ((SMqRspObj *)res); + if (msg->resIter == -1) msg->resIter++; SReqResultInfo *pResultInfo = taosArrayGet(msg->res, msg->resIter); - - doSetOneRowPtr(pResultInfo); - pResultInfo->current += 1; - - if (pResultInfo->row == NULL) { - msg->resIter++; - pResultInfo = taosArrayGet(msg->res, msg->resIter); + if (pResultInfo->current < pResultInfo->numOfRows) { doSetOneRowPtr(pResultInfo); pResultInfo->current += 1; + return pResultInfo->row; + } else { + msg->resIter++; + if (msg->resIter < taosArrayGetSize(msg->res)) { + pResultInfo = taosArrayGet(msg->res, msg->resIter); + doSetOneRowPtr(pResultInfo); + pResultInfo->current += 1; + return pResultInfo->row; + } else { + return NULL; + } } - return pResultInfo->row; - } else { // assert to avoid uninitialization error ASSERT(0); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index ea31170b1f..2b69b47865 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -119,14 +119,14 @@ typedef struct { typedef struct { // subscribe info - int32_t sqlLen; - char* sql; - char* topicName; - int64_t topicId; - SArray* vgs; // SArray - int8_t isSchemaAdaptive; - int32_t numOfFields; - TAOS_FIELD* fields; + int32_t sqlLen; + char* sql; + char* topicName; + int64_t topicId; + SArray* vgs; // SArray + int8_t isSchemaAdaptive; + int32_t numOfFields; + SSchemaWrapper schema; } SMqClientTopic; typedef struct { @@ -956,6 +956,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { for (int32_t i = 0; i < topicNumGet; i++) { SMqClientTopic topic = {0}; SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); + topic.schema = pTopicEp->schema; taosHashClear(pHash); topic.topicName = strdup(pTopicEp->topic); @@ -1191,7 +1192,10 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { for (int32_t i = 0; i < blockNum; i++) { int32_t pos = *(int32_t*)taosArrayGet(pRsp->blockPos, i); SRetrieveTableRsp* pRetrieve = POINTER_SHIFT(pRsp->blockData, pos); - SReqResultInfo resInfo; + SReqResultInfo resInfo = {0}; + resInfo.totalRows = 0; + resInfo.precision = TSDB_TIME_PRECISION_MILLI; + setResSchemaInfo(&resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); setQueryResultFromRsp(&resInfo, pRetrieve, true); taosArrayPush(pRspObj->res, &resInfo); } @@ -1386,7 +1390,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { rspWrapper = NULL; continue; } - // build msg + // build rsp SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); return pRsp; } else { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index dbd5e43b6d..e1461c0eba 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -60,8 +60,10 @@ static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg); static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, const SMqConsumerEp *pConsumerEp); -static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* topicName); -static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* oldTopicName); +static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, + const char *topicName); +static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, + const char *oldTopicName); int32_t mndInitSubscribe(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_SUBSCRIBE, @@ -102,7 +104,8 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj return pSub; } -static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, const char* topicName) { +static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, + const char *topicName) { SMqMVRebReq req = { .vgId = pConsumerEp->vgId, .oldConsumerId = pConsumerEp->oldConsumerId, @@ -131,7 +134,8 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume return 0; } -static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* topicName) { +static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, + const char *topicName) { ASSERT(pConsumerEp->oldConsumerId != -1); void *buf; @@ -158,7 +162,8 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC return 0; } -static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, const char* oldTopicName) { +static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp, + const char *oldTopicName) { SMqCancelConnReq req = {0}; req.consumerId = pConsumerEp->consumerId; req.vgId = pConsumerEp->vgId; @@ -182,7 +187,8 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum return 0; } -static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, const char* oldTopicName) { +static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp, + const char *oldTopicName) { void *buf; int32_t tlen; if (mndBuildCancelConnReq(&buf, &tlen, pConsumerEp, oldTopicName) < 0) { @@ -219,13 +225,14 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; return -1; } - //TODO add lock + // TODO add lock ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); - int32_t serverEpoch = pConsumer->epoch; + int32_t serverEpoch = pConsumer->epoch; // TODO int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus); - mDebug("consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch, serverEpoch, hbStatus); + mDebug("consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch, serverEpoch, + hbStatus); atomic_store_32(&pConsumer->hbStatus, 0); /*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/ /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/ @@ -233,7 +240,8 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { strcpy(rsp.cgroup, pReq->cgroup); if (epoch != serverEpoch) { - mInfo("send new assignment to consumer %ld, consumer epoch %d, server epoch %d", pConsumer->consumerId, epoch, serverEpoch); + mInfo("send new assignment to consumer %ld, consumer epoch %d, server epoch %d", pConsumer->consumerId, epoch, + serverEpoch); mDebug("consumer %ld try r lock", consumerId); taosRLockLatch(&pConsumer->lock); mDebug("consumer %ld r locked", consumerId); @@ -251,8 +259,15 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) { if (consumerId == pSubConsumer->consumerId) { int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo); mInfo("topic %s has %d vg", topicName, serverEpoch); + SMqSubTopicEp topicEp; strcpy(topicEp.topic, topicName); + + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topicName); + ASSERT(pTopic != NULL); + topicEp.schema = pTopic->schema; + mndReleaseTopic(pMnode, pTopic); + topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); for (int32_t k = 0; k < vgsz; k++) { char offsetKey[TSDB_PARTITION_KEY_LEN]; @@ -409,7 +424,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key); taosMemoryFreeClear(pRebSub->key); - mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum, (int32_t)taosArrayGetSize(pSub->unassignedVg)); + mInfo("mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d", pSub->key, pSub->vgNum, + (int32_t)taosArrayGetSize(pSub->unassignedVg)); // remove lost consumer for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) { @@ -459,12 +475,12 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { mDebug("consumer %ld try w lock", pRebConsumer->consumerId); taosWLockLatch(&pRebConsumer->lock); mDebug("consumer %ld w locked", pRebConsumer->consumerId); - int32_t status = atomic_load_32(&pRebConsumer->status); + int32_t status = atomic_load_32(&pRebConsumer->status); if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb || (vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) || (vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)) { /*if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {*/ - /*pRebConsumer->epoch++;*/ + /*pRebConsumer->epoch++;*/ /*}*/ if (vgThisConsumerAfterRb != 0) { atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE); @@ -500,7 +516,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { pConsumerEp->oldConsumerId = pConsumerEp->consumerId; pConsumerEp->consumerId = pSubConsumer->consumerId; - //TODO + // TODO pConsumerEp->epoch = 0; taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 654d7c3a65..f08b39b907 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -551,30 +551,40 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { rspV2.rspOffset = fetchOffset; int32_t blockSz = taosArrayGetSize(pRes); - int32_t tlen = 0; + int32_t dataBlockStrLen = 0; for (int32_t i = 0; i < blockSz; i++) { SSDataBlock* pBlock = taosArrayGet(pRes, i); - tlen += sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + dataBlockStrLen += sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); } - void* data = taosMemoryMalloc(tlen); - if (data == NULL) { + void* dataBlockBuf = taosMemoryMalloc(dataBlockStrLen); + if (dataBlockBuf == NULL) { pMsg->code = -1; taosMemoryFree(pHead); } - rspV2.blockData = data; + rspV2.blockData = dataBlockBuf; - void* dataBlockBuf = data; int32_t pos; + rspV2.blockPos = taosArrayInit(blockSz, sizeof(int32_t)); for (int32_t i = 0; i < blockSz; i++) { pos = 0; - SSDataBlock* pBlock = taosArrayGet(pRes, i); - blockCompressEncode(pBlock, dataBlockBuf, &pos, pBlock->info.numOfCols, false); + SSDataBlock* pBlock = taosArrayGet(pRes, i); + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)dataBlockBuf; + pRetrieve->useconds = 0; + pRetrieve->precision = 0; + pRetrieve->compressed = 0; + pRetrieve->completed = 1; + pRetrieve->numOfRows = htonl(pBlock->info.rows); + blockCompressEncode(pBlock, pRetrieve->data, &pos, pBlock->info.numOfCols, false); taosArrayPush(rspV2.blockPos, &rspV2.dataLen); - rspV2.dataLen += pos; - dataBlockBuf = POINTER_SHIFT(dataBlockBuf, pos); + + int32_t totLen = sizeof(SRetrieveTableRsp) + pos; + pRetrieve->compLen = htonl(totLen); + rspV2.dataLen += totLen; + dataBlockBuf = POINTER_SHIFT(dataBlockBuf, totLen); } + ASSERT(POINTER_DISTANCE(dataBlockBuf, rspV2.blockData) <= dataBlockStrLen); int32_t msgLen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2); void* buf = rpcMallocCont(msgLen); @@ -590,7 +600,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { /*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/ pMsg->pCont = buf; - pMsg->contLen = tlen; + pMsg->contLen = msgLen; pMsg->code = 0; vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, pHead->msgType, consumerId, pReq->epoch);