fix query error

This commit is contained in:
Liu Jicong 2022-01-26 09:56:32 +08:00
parent 9d57522225
commit c62304531a
8 changed files with 39 additions and 18 deletions

View File

@ -1630,6 +1630,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
// one req for one vg+topic // one req for one vg+topic
typedef struct SMqConsumeReq { typedef struct SMqConsumeReq {
SMsgHead head;
//0: commit only, current offset //0: commit only, current offset
//1: consume only, poll next offset //1: consume only, poll next offset
//2: commit current and consume next offset //2: commit current and consume next offset
@ -1662,7 +1663,7 @@ typedef struct SMqCMGetSubEpRsp {
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI16(buf, pVgEp->vgId); tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen; return tlen;
} }

View File

@ -19,6 +19,7 @@
#include "tarray.h" #include "tarray.h"
#include "tdef.h" #include "tdef.h"
#include "tlog.h" #include "tlog.h"
#include "tmsg.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
@ -159,7 +160,7 @@ int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *); void walClose(SWal *);
// write // write
int64_t walWrite(SWal *, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen); int64_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen);
void walFsync(SWal *, bool force); void walFsync(SWal *, bool force);
// apis for lifecycle management // apis for lifecycle management

View File

@ -625,9 +625,13 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
} }
int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = (tmq_t*)param;
if (code != 0) {
tsem_post(&tmq->rspSem);
return 0;
}
tscDebug("tmq ask ep cb called"); tscDebug("tmq ask ep cb called");
bool set = false; bool set = false;
tmq_t* tmq = (tmq_t*)param;
SMqCMGetSubEpRsp rsp; SMqCMGetSubEpRsp rsp;
tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp);
int32_t sz = taosArrayGetSize(rsp.topics); int32_t sz = taosArrayGetSize(rsp.topics);
@ -642,6 +646,9 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
for (int32_t j = 0; j < vgSz; j++) { for (int32_t j = 0; j < vgSz; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
SMqClientVg clientVg = { SMqClientVg clientVg = {
.pollCnt = 0,
.committedOffset = -1,
.currentOffset = -1,
.vgId = pVgEp->vgId, .vgId = pVgEp->vgId,
.epSet = pVgEp->epSet .epSet = pVgEp->epSet
}; };
@ -657,13 +664,6 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
} }
tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
SRequestObj *pRequest = NULL;
SMqConsumeReq req = {0};
req.reqType = 1;
req.blockingTime = blocking_time;
req.consumerId = tmq->consumerId;
tmq_message_t* tmq_message = NULL;
strcpy(req.cgroup, tmq->groupId);
if (taosArrayGetSize(tmq->clientTopics) == 0 || tmq->status == 0) { if (taosArrayGetSize(tmq->clientTopics) == 0 || tmq->status == 0) {
int32_t tlen = sizeof(SMqCMGetSubEpReq); int32_t tlen = sizeof(SMqCMGetSubEpReq);
@ -674,7 +674,7 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
buf->consumerId = htobe64(tmq->consumerId); buf->consumerId = htobe64(tmq->consumerId);
strcpy(buf->cgroup, tmq->groupId); strcpy(buf->cgroup, tmq->groupId);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); SRequestObj *pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP);
if (pRequest == NULL) { if (pRequest == NULL) {
tscError("failed to malloc subscribe ep request"); tscError("failed to malloc subscribe ep request");
} }
@ -699,16 +699,26 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
return NULL; return NULL;
} }
SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq));
pReq->reqType = 1;
pReq->blockingTime = blocking_time;
pReq->consumerId = tmq->consumerId;
tmq_message_t* tmq_message = NULL;
strcpy(pReq->cgroup, tmq->groupId);
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
strcpy(req.topic, pTopic->topicName); strcpy(pReq->topic, pTopic->topicName);
int32_t nextVgIdx = pTopic->nextVgIdx; int32_t nextVgIdx = pTopic->nextVgIdx;
pTopic->nextVgIdx = (nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); pTopic->nextVgIdx = (nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx);
req.offset = pVg->currentOffset; pReq->offset = pVg->currentOffset;
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); pReq->head.vgId = htonl(pVg->vgId);
pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) }; pReq->head.contLen = htonl(sizeof(SMqConsumeReq));
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq) };
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
/*sendInfo->requestObjRefId = 0;*/ /*sendInfo->requestObjRefId = 0;*/

View File

@ -148,6 +148,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg;
} }
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {

View File

@ -424,7 +424,7 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
return NULL; return NULL;
} }
pSub->lostConsumer = taosArrayInit(0, sizeof(SMqConsumerEp)); pSub->lostConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->idleConsumer == NULL) { if (pSub->lostConsumer == NULL) {
taosArrayDestroy(pSub->availConsumer); taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned); taosArrayDestroy(pSub->assigned);
free(pSub); free(pSub);

View File

@ -679,6 +679,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
int rspLen = 0; int rspLen = 0;
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
ASSERT(pConsumer);
int sz = taosArrayGetSize(pConsumer->topics); int sz = taosArrayGetSize(pConsumer->topics);
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
@ -775,6 +776,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp) {
} }
strcpy(pConsumer->cgroup, req.cgroup); strcpy(pConsumer->cgroup, req.cgroup);
pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle)); pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle));
pConsumer->consumerId = req.newConsumerId;
pConsumer->epoch = 0;
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
if (pTopic == NULL) { if (pTopic == NULL) {
@ -785,6 +788,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp) {
pTopic->sql = strdup(req.sql); pTopic->sql = strdup(req.sql);
pTopic->logicalPlan = strdup(req.logicalPlan); pTopic->logicalPlan = strdup(req.logicalPlan);
pTopic->physicalPlan = strdup(req.physicalPlan); pTopic->physicalPlan = strdup(req.physicalPlan);
pTopic->committedOffset = -1;
pTopic->currentOffset = -1;
pTopic->buffer.firstOffset = -1; pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1; pTopic->buffer.lastOffset = -1;
@ -797,6 +802,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp) {
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle);
} }
taosArrayPush(pConsumer->topics, pTopic); taosArrayPush(pConsumer->topics, pTopic);
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.newConsumerId);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
return 0; return 0;
} }
@ -821,7 +828,7 @@ void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ve
} }
bool tqNextDataBlock(STqReadHandle* pHandle) { bool tqNextDataBlock(STqReadHandle* pHandle) {
while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) >= 0) { while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock)) {
if (pHandle->tbUid == pHandle->pBlock->uid) return true; if (pHandle->tbUid == pHandle->pBlock->uid) return true;
} }
return false; return false;

View File

@ -11,6 +11,7 @@ target_link_libraries(
PUBLIC cjson PUBLIC cjson
PUBLIC os PUBLIC os
PUBLIC util PUBLIC util
PUBLIC common
) )
if(${BUILD_TEST}) if(${BUILD_TEST})

View File

@ -257,7 +257,7 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
return 0; return 0;
} }
int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen) { int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) {
if (pWal == NULL) return -1; if (pWal == NULL) return -1;
int code = 0; int code = 0;