From c62304531a2752948eeb9a711a8cc80dd49e8cb8 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 26 Jan 2022 09:56:32 +0800 Subject: [PATCH] fix query error --- include/common/tmsg.h | 3 +- include/libs/wal/wal.h | 3 +- source/client/src/clientImpl.c | 36 +++++++++++++++-------- source/dnode/mgmt/impl/src/dndTransport.c | 1 + source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/vnode/src/tq/tq.c | 9 +++++- source/libs/wal/CMakeLists.txt | 1 + source/libs/wal/src/walWrite.c | 2 +- 8 files changed, 39 insertions(+), 18 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6cc19fbdfb..ef5a854d20 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1630,6 +1630,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp // one req for one vg+topic typedef struct SMqConsumeReq { + SMsgHead head; //0: commit only, current offset //1: consume only, poll next offset //2: commit current and consume next offset @@ -1662,7 +1663,7 @@ typedef struct SMqCMGetSubEpRsp { static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { int32_t tlen = 0; - tlen += taosEncodeFixedI16(buf, pVgEp->vgId); + tlen += taosEncodeFixedI32(buf, pVgEp->vgId); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); return tlen; } diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 641b485f4c..45f1d88c30 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -19,6 +19,7 @@ #include "tarray.h" #include "tdef.h" #include "tlog.h" +#include "tmsg.h" #ifdef __cplusplus extern "C" { #endif @@ -159,7 +160,7 @@ int32_t walAlter(SWal *, SWalCfg *pCfg); void walClose(SWal *); // write -int64_t walWrite(SWal *, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen); +int64_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen); void walFsync(SWal *, bool force); // apis for lifecycle management diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a19b893e2f..df086463db 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -625,9 +625,13 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { } int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { + tmq_t* tmq = (tmq_t*)param; + if (code != 0) { + tsem_post(&tmq->rspSem); + return 0; + } tscDebug("tmq ask ep cb called"); bool set = false; - tmq_t* tmq = (tmq_t*)param; SMqCMGetSubEpRsp rsp; tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); int32_t sz = taosArrayGetSize(rsp.topics); @@ -642,6 +646,9 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { for (int32_t j = 0; j < vgSz; j++) { SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); SMqClientVg clientVg = { + .pollCnt = 0, + .committedOffset = -1, + .currentOffset = -1, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet }; @@ -657,13 +664,6 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { } tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { - SRequestObj *pRequest = NULL; - SMqConsumeReq req = {0}; - req.reqType = 1; - req.blockingTime = blocking_time; - req.consumerId = tmq->consumerId; - tmq_message_t* tmq_message = NULL; - strcpy(req.cgroup, tmq->groupId); if (taosArrayGetSize(tmq->clientTopics) == 0 || tmq->status == 0) { int32_t tlen = sizeof(SMqCMGetSubEpReq); @@ -674,7 +674,7 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { buf->consumerId = htobe64(tmq->consumerId); strcpy(buf->cgroup, tmq->groupId); - pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); + SRequestObj *pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); if (pRequest == NULL) { tscError("failed to malloc subscribe ep request"); } @@ -699,16 +699,26 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { return NULL; } + SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq)); + pReq->reqType = 1; + pReq->blockingTime = blocking_time; + pReq->consumerId = tmq->consumerId; + tmq_message_t* tmq_message = NULL; + strcpy(pReq->cgroup, tmq->groupId); + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); - strcpy(req.topic, pTopic->topicName); + strcpy(pReq->topic, pTopic->topicName); int32_t nextVgIdx = pTopic->nextVgIdx; pTopic->nextVgIdx = (nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx); - req.offset = pVg->currentOffset; + pReq->offset = pVg->currentOffset; - pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) }; + pReq->head.vgId = htonl(pVg->vgId); + pReq->head.contLen = htonl(sizeof(SMqConsumeReq)); + + SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); + pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq) }; SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); /*sendInfo->requestObjRefId = 0;*/ diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index f19d154616..8dfcc59d09 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -148,6 +148,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg; } static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 71cf841f29..2a23618798 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -424,7 +424,7 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { return NULL; } pSub->lostConsumer = taosArrayInit(0, sizeof(SMqConsumerEp)); - if (pSub->idleConsumer == NULL) { + if (pSub->lostConsumer == NULL) { taosArrayDestroy(pSub->availConsumer); taosArrayDestroy(pSub->assigned); free(pSub); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fb9d6a213c..667b5bc43a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -679,6 +679,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { int rspLen = 0; STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); + ASSERT(pConsumer); int sz = taosArrayGetSize(pConsumer->topics); for (int i = 0; i < sz; i++) { @@ -775,6 +776,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp) { } strcpy(pConsumer->cgroup, req.cgroup); pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle)); + pConsumer->consumerId = req.newConsumerId; + pConsumer->epoch = 0; STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); if (pTopic == NULL) { @@ -785,6 +788,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp) { pTopic->sql = strdup(req.sql); pTopic->logicalPlan = strdup(req.logicalPlan); pTopic->physicalPlan = strdup(req.physicalPlan); + pTopic->committedOffset = -1; + pTopic->currentOffset = -1; pTopic->buffer.firstOffset = -1; pTopic->buffer.lastOffset = -1; @@ -797,6 +802,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp) { pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle); } taosArrayPush(pConsumer->topics, pTopic); + tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); + tqHandleCommit(pTq->tqMeta, req.newConsumerId); terrno = TSDB_CODE_SUCCESS; return 0; } @@ -821,7 +828,7 @@ void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ve } bool tqNextDataBlock(STqReadHandle* pHandle) { - while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) >= 0) { + while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock)) { if (pHandle->tbUid == pHandle->pBlock->uid) return true; } return false; diff --git a/source/libs/wal/CMakeLists.txt b/source/libs/wal/CMakeLists.txt index 4d2dd97c87..bcf759e04f 100644 --- a/source/libs/wal/CMakeLists.txt +++ b/source/libs/wal/CMakeLists.txt @@ -11,6 +11,7 @@ target_link_libraries( PUBLIC cjson PUBLIC os PUBLIC util + PUBLIC common ) if(${BUILD_TEST}) diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 2bc328b4e2..a4b34dee37 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -257,7 +257,7 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { return 0; } -int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen) { +int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) { if (pWal == NULL) return -1; int code = 0;