From ca84e47a97388acf91f1a951ce832b4f0d0a22b8 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 25 Jan 2022 15:17:02 +0800 Subject: [PATCH] fix query crash --- include/common/tmsg.h | 96 +++++++++++++++++- include/common/tmsgdef.h | 1 + source/client/src/clientImpl.c | 77 +++++++++++++-- source/client/test/clientTests.cpp | 41 ++++---- source/dnode/mgmt/impl/src/dndTransport.c | 1 + source/dnode/mnode/impl/inc/mndConsumer.h | 2 +- source/dnode/mnode/impl/src/mndConsumer.c | 25 +++-- source/dnode/mnode/impl/src/mndSubscribe.c | 107 ++++++++++++++++----- source/libs/executor/src/executorMain.c | 7 +- source/libs/planner/src/physicalPlanJson.c | 1 - 10 files changed, 285 insertions(+), 73 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b3d595d73b..398e46e6a4 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1241,6 +1241,12 @@ typedef struct { char data[]; } SVShowTablesFetchRsp; +typedef struct SMqCMGetSubEpReq { + int64_t consumerId; + int32_t epoch; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; +} SMqCMGetSubEpReq; + #pragma pack(pop) static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { @@ -1562,7 +1568,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeFixedU32(buf, pReq->qmsgLen); - tlen += taosEncodeBinary(buf, pReq->qmsg, pReq->qmsgLen); + tlen += taosEncodeString(buf, (char*)pReq->qmsg); //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } @@ -1577,7 +1583,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); buf = taosDecodeFixedU32(buf, &pReq->qmsgLen); - buf = taosDecodeBinary(buf, &pReq->qmsg, pReq->qmsgLen); + buf = taosDecodeString(buf, (char**)&pReq->qmsg); //buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } @@ -1639,6 +1645,92 @@ typedef struct SMqConsumeReq { char topic[TSDB_TOPIC_FNAME_LEN]; } SMqConsumeReq; +typedef struct SMqSubVgEp { + int32_t vgId; + SEpSet epSet; +} SMqSubVgEp; + +typedef struct SMqSubTopicEp { + char topic[TSDB_TOPIC_FNAME_LEN]; + SArray* vgs; // SArray +} SMqSubTopicEp; + +typedef struct SMqCMGetSubEpRsp { + int64_t consumerId; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; + SArray* topics; // SArray +} SMqCMGetSubEpRsp; + +static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI16(buf, pVgEp->vgId); + tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { + buf = taosDecodeFixedI32(buf, &pVgEp->vgId); + buf = taosDecodeSEpSet(buf, &pVgEp->epSet); + return buf; +} + +static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { + int32_t tlen = 0; + tlen += taosEncodeString(buf, pTopicEp->topic); + int32_t sz = taosArrayGetSize(pTopicEp->vgs); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i); + tlen += tEncodeSMqSubVgEp(buf, pVgEp); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) { + buf = taosDecodeStringTo(buf, pTopicEp->topic); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp)); + if (pTopicEp->vgs == NULL) { + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + SMqSubVgEp vgEp; + buf = tDecodeSMqSubVgEp(buf, &vgEp); + taosArrayPush(pTopicEp->vgs, &vgEp); + } + return buf; +} + +static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pRsp->consumerId); + tlen += taosEncodeString(buf, pRsp->cgroup); + int32_t sz = taosArrayGetSize(pRsp->topics); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqSubTopicEp* pVgEp = (SMqSubTopicEp*)taosArrayGet(pRsp->topics, i); + tlen += tEncodeSMqSubTopicEp(buf, pVgEp); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) { + buf = taosDecodeFixedI64(buf, &pRsp->consumerId); + buf = taosDecodeStringTo(buf, pRsp->cgroup); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); + if (pRsp->topics == NULL) { + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + SMqSubTopicEp topicEp; + buf = tDecodeSMqSubTopicEp(buf, &topicEp); + taosArrayPush(pRsp->topics, &topicEp); + } + return buf; +} #ifdef __cplusplus } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 26d8bc81eb..e81dd798f6 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -140,6 +140,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) + TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-timer", SMqTmrMsg, SMqTmrMsg) // Requests handled by VNODE diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 179e46527f..28a4e0c87d 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -263,7 +263,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) typedef struct SMqClientVg { // statistics - int64_t consumeCnt; + int64_t pollCnt; // offset int64_t committedOffset; int64_t currentOffset; @@ -345,6 +345,7 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err strcpy(pTmq->groupId, conf->groupId); pTmq->commit_cb = conf->commit_cb; pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1); + pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); return pTmq; } @@ -411,13 +412,12 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tSerializeSCMSubscribeReq(&abuf, &req); /*printf("formatted: %s\n", dagStr);*/ - pRequest = createRequest(tmq->pTscObj, NULL, NULL, TSDB_SQL_SELECT); + pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE); if (pRequest == NULL) { tscError("failed to malloc sqlObj"); } pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; - pRequest->type = TDMT_MND_SUBSCRIBE; SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); @@ -596,6 +596,37 @@ struct tmq_message_t { SMqConsumeRsp rsp; }; +int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { + return 0; +} + +int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { + tmq_t* tmq = (tmq_t*)param; + SMqCMGetSubEpRsp rsp; + tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); + int32_t sz = taosArrayGetSize(rsp.topics); + // TODO: lock + tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); + for (int32_t i = 0; i < sz; i++) { + SMqClientTopic topic = {0}; + SMqSubTopicEp* pTopicEp = taosArrayGet(rsp.topics, i); + topic.topicName = strdup(pTopicEp->topic); + int32_t vgSz = taosArrayGetSize(pTopicEp->vgs); + topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg)); + for (int32_t j = 0; j < vgSz; j++) { + SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); + SMqClientVg clientVg = { + .vgId = pVgEp->vgId, + .epSet = pVgEp->epSet + }; + taosArrayPush(topic.vgs, &clientVg); + } + taosArrayPush(tmq->clientTopics, &topic); + } + // unlock + return 0; +} + tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { if (tmq->clientTopics == NULL || taosArrayGetSize(tmq->clientTopics) == 0) { return NULL; @@ -605,9 +636,38 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { req.reqType = 1; req.blockingTime = blocking_time; req.consumerId = tmq->consumerId; + tmq_message_t* tmq_message = NULL; strcpy(req.cgroup, tmq->groupId); - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); + if (taosArrayGetSize(tmq->clientTopics) == 0) { + int32_t tlen = sizeof(SMqCMGetSubEpReq); + SMqCMGetSubEpReq* buf = malloc(tlen); + if (buf == NULL) { + tscError("failed to malloc get subscribe ep buf"); + } + buf->consumerId = htobe64(buf->consumerId); + + pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); + if (pRequest == NULL) { + tscError("failed to malloc subscribe ep request"); + } + + pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; + + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestObjRefId = 0; + sendInfo->param = tmq; + sendInfo->fp = tmq_ask_ep_cb; + + SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + + tsem_wait(&pRequest->body.rspSem); + } + + SMqClientTopic* pTopic = taosArrayGetP(tmq->clientTopics, tmq->nextTopicIdx); tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); strcpy(req.topic, pTopic->topicName); int32_t nextVgIdx = pTopic->nextVgIdx; @@ -618,14 +678,17 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) }; pRequest->type = TDMT_VND_CONSUME; - SMsgSendInfo* body = buildMsgInfoImpl(pRequest); + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestObjRefId = 0; + sendInfo->param = &tmq_message; + sendInfo->fp = tmq_poll_cb_inner; int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, body); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); tsem_wait(&pRequest->body.rspSem); - return (tmq_message_t*)pRequest->body.resInfo.pData; + return tmq_message; /*tsem_wait(&pRequest->body.rspSem);*/ diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index e3e5002be6..a1adf58f6a 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -570,31 +570,30 @@ TEST(testCase, create_topic_Test) { //taos_close(pConn); //} -TEST(testCase, tmq_subscribe_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); +//TEST(testCase, tmq_subscribe_Test) { + //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + //assert(pConn != NULL); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); + //TAOS_RES* pRes = taos_query(pConn, "use abc1"); + //if (taos_errno(pRes) != 0) { + //printf("error in use db, reason:%s\n", taos_errstr(pRes)); + //} + //taos_free_result(pRes); - - tmq_conf_t* conf = tmq_conf_new(); - tmq_conf_set(conf, "group.id", "tg1"); - tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); + //tmq_conf_t* conf = tmq_conf_new(); + //tmq_conf_set(conf, "group.id", "tg1"); + //tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); - tmq_list_t* topic_list = tmq_list_new(); - tmq_list_append(topic_list, "test_topic_1"); - tmq_subscribe(tmq, topic_list); + //tmq_list_t* topic_list = tmq_list_new(); + //tmq_list_append(topic_list, "test_topic_1"); + //tmq_subscribe(tmq, topic_list); - while (1) { - tmq_message_t* msg = tmq_consume_poll(tmq, 0); - printf("get msg\n"); - if (msg == NULL) break; - } -} + //while (1) { + //tmq_message_t* msg = tmq_consume_poll(tmq, 0); + //printf("get msg\n"); + //if (msg == NULL) break; + //} +//} TEST(testCase, tmq_consume_Test) { } diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 079c50f403..f19d154616 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -115,6 +115,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/ pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg; // Requests handled by VNODE pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 9d1dd084ee..42b2de01cc 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -25,7 +25,7 @@ extern "C" { int32_t mndInitConsumer(SMnode *pMnode); void mndCleanupConsumer(SMnode *pMnode); -SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId); +SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7591caebc5..e642b578fa 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -30,24 +30,23 @@ #define MND_CONSUMER_VER_NUMBER 1 #define MND_CONSUMER_RESERVE_SIZE 64 -static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); -static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); -static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer); -static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg); -static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); -static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); -static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); +static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); +static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); +static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer); +static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg); +static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); +static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); int32_t mndInitConsumer(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_CONSUMER, - .keyType = SDB_KEY_BINARY, + .keyType = SDB_KEY_INT64, .encodeFp = (SdbEncodeFp)mndConsumerActionEncode, .decodeFp = (SdbDecodeFp)mndConsumerActionDecode, .insertFp = (SdbInsertFp)mndConsumerActionInsert, .updateFp = (SdbUpdateFp)mndConsumerActionUpdate, .deleteFp = (SdbDeleteFp)mndConsumerActionDelete}; - return sdbSetTable(pMnode->pSdb, table); } @@ -61,10 +60,10 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); if (pRaw == NULL) goto CM_ENCODE_OVER; - void* buf = malloc(tlen); + void *buf = malloc(tlen); if (buf == NULL) goto CM_ENCODE_OVER; - void* abuf = buf; + void *abuf = buf; tEncodeSMqConsumerObj(&abuf, pConsumer); int32_t dataPos = 0; @@ -106,7 +105,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { int32_t dataPos = 0; int32_t len; SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER); - void* buf = malloc(len); + void *buf = malloc(len); if (buf == NULL) goto CM_DECODE_OVER; SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER); @@ -147,7 +146,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, return 0; } -SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId) { +SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) { SSdb *pSdb = pMnode->pSdb; SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId); if (pConsumer == NULL) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ab351fede4..df6a4a82f3 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -30,6 +30,8 @@ #define MND_SUBSCRIBE_VER_NUMBER 1 #define MND_SUBSCRIBE_RESERVE_SIZE 64 +static char *mndMakeSubscribeKey(char *cgroup, char *topicName); + static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); @@ -41,6 +43,7 @@ static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg); static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg); static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); +static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg); static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub); @@ -57,9 +60,60 @@ int32_t mndInitSubscribe(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); + mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq); return sdbSetTable(pMnode->pSdb, table); } +static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->pCont; + SMqCMGetSubEpRsp rsp; + int64_t consumerId = be64toh(pReq->consumerId); + + SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId); + if (pConsumer == NULL) { + /*terrno = */ + return -1; + } + ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); + + strcpy(rsp.cgroup, pReq->cgroup); + rsp.consumerId = consumerId; + SArray *pTopics = pConsumer->topics; + int32_t sz = taosArrayGetSize(pTopics); + rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); + for (int32_t i = 0; i < sz; i++) { + SMqSubTopicEp topicEp; + SMqConsumerTopic *pConsumerTopic = taosArrayGet(pTopics, i); + strcpy(topicEp.topic, pConsumerTopic->name); + + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, pConsumerTopic->name); + int32_t assignedSz = taosArrayGetSize(pSub->assigned); + topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp)); + for (int32_t j = 0; j < assignedSz; j++) { + SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i); + if (pCEp->consumerId == consumerId) { + taosArrayPush(pSub->assigned, pCEp); + } + } + if (taosArrayGetSize(topicEp.vgs) != 0) { + taosArrayPush(rsp.topics, &topicEp); + } + } + int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp); + void *buf = malloc(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + void *abuf = buf; + tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); + //TODO: free rsp + pMsg->pCont = buf; + pMsg->contLen = tlen; + return 0; +} + static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { int i = 0; while (key[i] != ':') { @@ -97,7 +151,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { // build msg - SMqSetCVgReq* pReq = malloc(sizeof(SMqSetCVgReq) + pCEp->qmsgLen); + SMqSetCVgReq *pReq = malloc(sizeof(SMqSetCVgReq)); if (pReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -108,7 +162,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { pReq->logicalPlan = strdup(pTopic->logicalPlan); pReq->physicalPlan = strdup(pTopic->physicalPlan); pReq->qmsgLen = pCEp->qmsgLen; - memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen); + /*memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen);*/ + pReq->qmsg = strdup(pCEp->qmsg); int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq); void *reqStr = malloc(tlen); if (reqStr == NULL) { @@ -146,11 +201,11 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { } static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { - //convert phyplan to dag + // convert phyplan to dag SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); - SArray *pArray; - SArray* inner = taosArrayGet(pDag->pSubplans, 0); - SSubplan *plan = taosArrayGetP(inner, 0); + SArray *pArray; + SArray *inner = taosArrayGet(pDag->pSubplans, 0); + SSubplan *plan = taosArrayGetP(inner, 0); plan->execNode.inUse = 0; strcpy(plan->execNode.epAddr[0].fqdn, "localhost"); plan->execNode.epAddr[0].port = 6030; @@ -161,21 +216,24 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas return -1; } int32_t sz = taosArrayGetSize(pArray); - //convert dag to msg + // convert dag to msg for (int32_t i = 0; i < sz; i++) { SMqConsumerEp CEp; CEp.status = 0; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; - STaskInfo* pTaskInfo = taosArrayGet(pArray, i); + STaskInfo *pTaskInfo = taosArrayGet(pArray, i); tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr); - /*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/ + /*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], + * CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/ CEp.vgId = pTaskInfo->addr.nodeId; - CEp.qmsgLen = pTaskInfo->msg->contentLen; - CEp.qmsg = malloc(CEp.qmsgLen); - if (CEp.qmsg == NULL) { - return -1; - } - memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen); + CEp.qmsg = strdup(pTaskInfo->msg->msg); + CEp.qmsgLen = strlen(CEp.qmsg) + 1; + printf("abc:\n%s\n", CEp.qmsg); + /*CEp.qmsg = malloc(CEp.qmsgLen);*/ + /*if (CEp.qmsg == NULL) {*/ + /*return -1;*/ + /*}*/ + /*memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen);*/ taosArrayPush(unassignedVg, &CEp); } @@ -184,7 +242,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas } static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, - SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp* pCEp) { + SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pCEp) { int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); for (int32_t i = 0; i < sz; i++) { int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i); @@ -208,18 +266,18 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume return -1; } - SMsgHead* pMsgHead = (SMsgHead*)buf; + SMsgHead *pMsgHead = (SMsgHead *)buf; pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); pMsgHead->vgId = htonl(vgId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncodeSMqSetCVgReq(&abuf, &req); STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); action.pCont = buf; - action.contLen = tlen; + action.contLen = sizeof(SMsgHead) + tlen; action.msgType = TDMT_VND_MQ_SET_CONN; mndReleaseVgroup(pMnode, pVgObj); @@ -287,7 +345,7 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { int32_t dataPos = 0; int32_t tlen; SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER); - void *buf = malloc(tlen + 1); + void *buf = malloc(tlen + 1); if (buf == NULL) goto SUB_DECODE_OVER; SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); @@ -495,22 +553,21 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - char* key = mndMakeSubscribeKey(consumerGroup, newTopicName); + char *key = mndMakeSubscribeKey(consumerGroup, newTopicName); strcpy(pSub->key, key); // set unassigned vg mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); - //TODO: disable alter + // TODO: disable alter } taosArrayPush(pSub->availConsumer, &consumerId); - SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub); taosArrayPush(pConsumer->topics, pConsumerTopic); if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) { ASSERT(taosArrayGetSize(pConsumerTopic->pVgInfo) == 1); - int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo); - SMqConsumerEp* pCEp = taosArrayGetLast(pSub->assigned); + int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo); + SMqConsumerEp *pCEp = taosArrayGetLast(pSub->assigned); if (pCEp->vgId == vgId) { if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic, pCEp) < 0) { // TODO diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index e5d56aca15..6575707601 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -83,8 +83,9 @@ int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTas if (code != TSDB_CODE_SUCCESS) { goto _error; } - - code = dsCreateDataSinker(pSubplan->pDataSink, handle); + if (handle) { + code = dsCreateDataSinker(pSubplan->pDataSink, handle); + } _error: // if failed to add ref for all tables in this query, abort current query @@ -461,4 +462,4 @@ int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCo return error; } -#endif \ No newline at end of file +#endif diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 9d82bbb509..7f4cec6bc1 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -1132,7 +1132,6 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) { } int32_t stringToSubplan(const char* str, SSubplan** subplan) { - printf("aa: %s\n", str); cJSON* json = cJSON_Parse(str); if (NULL == json) { return TSDB_CODE_FAILED;