diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 398e46e6a4..6d14cd010c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1533,9 +1533,7 @@ typedef struct SMqSetCVgReq { char* sql; char* logicalPlan; char* physicalPlan; - uint32_t qmsgLen; - void* qmsg; - //SSubQueryMsg msg; + char* qmsg; } SMqSetCVgReq; static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { @@ -1567,7 +1565,6 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); - tlen += taosEncodeFixedU32(buf, pReq->qmsgLen); tlen += taosEncodeString(buf, (char*)pReq->qmsg); //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; @@ -1582,7 +1579,6 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); - buf = taosDecodeFixedU32(buf, &pReq->qmsgLen); buf = taosDecodeString(buf, (char**)&pReq->qmsg); //buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 28a4e0c87d..45d1c1a49e 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -374,9 +374,17 @@ int32_t tmq_list_append(tmq_list_t* ptr, char* src) { TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { SRequestObj *pRequest = NULL; - tmq->status = 1; int32_t sz = topic_list->cnt; - tmq->clientTopics = taosArrayInit(sz, sizeof(void*)); + //destroy ex + taosArrayDestroy(tmq->clientTopics); + tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); + + SCMSubscribeReq req; + req.topicNum = sz; + req.consumerId = tmq->consumerId; + req.consumerGroup = strdup(tmq->groupId); + req.topicNames = taosArrayInit(sz, sizeof(void*)); + for (int i = 0; i < sz; i++) { char* topicName = topic_list->elems[i]; @@ -391,16 +399,21 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { } tNameExtractFullName(&name, topicFname); tscDebug("subscribe topic: %s", topicFname); - taosArrayPush(tmq->clientTopics, &topicFname); + SMqClientTopic topic = { + .nextVgIdx = 0, + .sql = NULL, + .sqlLen = 0, + .topicId = 0, + .topicName = topicFname, + .vgs = NULL + }; + topic.vgs = taosArrayInit(0, sizeof(SMqClientVg)); + taosArrayPush(tmq->clientTopics, &topic); /*SMqClientTopic topic = {*/ /*.*/ /*};*/ + taosArrayPush(req.topicNames, &topicFname); } - SCMSubscribeReq req; - req.topicNum = taosArrayGetSize(tmq->clientTopics); - req.consumerId = tmq->consumerId; - req.consumerGroup = strdup(tmq->groupId); - req.topicNames = tmq->clientTopics; int tlen = tSerializeSCMSubscribeReq(NULL, &req); void* buf = malloc(tlen); @@ -419,17 +432,17 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; - SMsgSendInfo* body = buildMsgInfoImpl(pRequest); + SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); tsem_wait(&pRequest->body.rspSem); _return: - if (body != NULL) { - destroySendMsgInfo(body); + if (sendInfo != NULL) { + destroySendMsgInfo(sendInfo); } if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { @@ -601,6 +614,8 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { } int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { + tscDebug("tmq ask ep cb called"); + bool set = false; tmq_t* tmq = (tmq_t*)param; SMqCMGetSubEpRsp rsp; tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); @@ -620,17 +635,16 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { .epSet = pVgEp->epSet }; taosArrayPush(topic.vgs, &clientVg); + set = true; } taosArrayPush(tmq->clientTopics, &topic); } + if(set) tmq->status = 1; // unlock return 0; } tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { - if (tmq->clientTopics == NULL || taosArrayGetSize(tmq->clientTopics) == 0) { - return NULL; - } SRequestObj *pRequest = NULL; SMqConsumeReq req = {0}; req.reqType = 1; @@ -639,7 +653,7 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* tmq_message = NULL; strcpy(req.cgroup, tmq->groupId); - if (taosArrayGetSize(tmq->clientTopics) == 0) { + if (taosArrayGetSize(tmq->clientTopics) == 0 || tmq->status == 0) { int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* buf = malloc(tlen); if (buf == NULL) { @@ -667,7 +681,12 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { tsem_wait(&pRequest->body.rspSem); } - SMqClientTopic* pTopic = taosArrayGetP(tmq->clientTopics, tmq->nextTopicIdx); + if (taosArrayGetSize(tmq->clientTopics) == 0) { + tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); + return NULL; + } + + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); strcpy(req.topic, pTopic->topicName); int32_t nextVgIdx = pTopic->nextVgIdx; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index a1adf58f6a..e8cebfe7c9 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -570,30 +570,30 @@ TEST(testCase, create_topic_Test) { //taos_close(pConn); //} -//TEST(testCase, tmq_subscribe_Test) { - //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - //assert(pConn != NULL); +TEST(testCase, tmq_subscribe_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); - //TAOS_RES* pRes = taos_query(pConn, "use abc1"); - //if (taos_errno(pRes) != 0) { - //printf("error in use db, reason:%s\n", taos_errstr(pRes)); - //} - //taos_free_result(pRes); + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); - //tmq_conf_t* conf = tmq_conf_new(); - //tmq_conf_set(conf, "group.id", "tg1"); - //tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg1"); + tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); - //tmq_list_t* topic_list = tmq_list_new(); - //tmq_list_append(topic_list, "test_topic_1"); - //tmq_subscribe(tmq, topic_list); + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "test_topic_1"); + tmq_subscribe(tmq, topic_list); - //while (1) { - //tmq_message_t* msg = tmq_consume_poll(tmq, 0); - //printf("get msg\n"); - //if (msg == NULL) break; - //} -//} + while (1) { + tmq_message_t* msg = tmq_consume_poll(tmq, 0); + printf("get msg\n"); + if (msg == NULL) break; + } +} TEST(testCase, tmq_consume_Test) { } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 02ce3a1591..fc1a4e8508 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -363,9 +363,7 @@ typedef struct SMqConsumerEp { int64_t consumerId; // -1 for unassigned int64_t lastConsumerHbTs; int64_t lastVgHbTs; - uint32_t qmsgLen; char* qmsg; - //SSubQueryMsg qExec; } SMqConsumerEp; static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { @@ -375,8 +373,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); //tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); - tlen += taosEncodeFixedU32(buf, pConsumerEp->qmsgLen); - tlen += taosEncodeBinary(buf, pConsumerEp->qmsg, pConsumerEp->qmsgLen); + tlen += taosEncodeString(buf, pConsumerEp->qmsg); return tlen; } @@ -386,8 +383,7 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); //buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); - buf = taosDecodeFixedU32(buf, &pConsumerEp->qmsgLen); - buf = taosDecodeBinary(buf, (void**)&pConsumerEp->qmsg, pConsumerEp->qmsgLen); + buf = taosDecodeString(buf, &pConsumerEp->qmsg); return buf; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index df6a4a82f3..a65d08eeea 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -161,8 +161,6 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { pReq->sql = strdup(pTopic->sql); pReq->logicalPlan = strdup(pTopic->logicalPlan); pReq->physicalPlan = strdup(pTopic->physicalPlan); - pReq->qmsgLen = pCEp->qmsgLen; - /*memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen);*/ pReq->qmsg = strdup(pCEp->qmsg); int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq); void *reqStr = malloc(tlen); @@ -227,13 +225,6 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas * CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/ CEp.vgId = pTaskInfo->addr.nodeId; CEp.qmsg = strdup(pTaskInfo->msg->msg); - CEp.qmsgLen = strlen(CEp.qmsg) + 1; - printf("abc:\n%s\n", CEp.qmsg); - /*CEp.qmsg = malloc(CEp.qmsgLen);*/ - /*if (CEp.qmsg == NULL) {*/ - /*return -1;*/ - /*}*/ - /*memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen);*/ taosArrayPush(unassignedVg, &CEp); } @@ -257,8 +248,7 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume req.sql = pTopic->sql; req.logicalPlan = pTopic->logicalPlan; req.physicalPlan = pTopic->physicalPlan; - req.qmsg = strdup(pCEp->qmsg); - req.qmsgLen = strlen(req.qmsg); + req.qmsg = pCEp->qmsg; int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); void *buf = malloc(sizeof(SMsgHead) + tlen); if (buf == NULL) { diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 7f4cec6bc1..19b33538a5 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -1125,8 +1125,8 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) { *str = cJSON_Print(json); cJSON_Delete(json); - printf("====Physical plan:====\n"); - printf("%s\n", *str); + /*printf("====Physical plan:====\n");*/ + /*printf("%s\n", *str);*/ *len = strlen(*str) + 1; return TSDB_CODE_SUCCESS; }