diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9ee1aeca86..b3d595d73b 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1519,15 +1519,17 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { } typedef struct SMqSetCVgReq { - int32_t vgId; - int64_t oldConsumerId; - int64_t newConsumerId; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CONSUMER_GROUP_LEN]; - char* sql; - char* logicalPlan; - char* physicalPlan; - SSubQueryMsg msg; + int32_t vgId; + int64_t oldConsumerId; + int64_t newConsumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; + char* sql; + char* logicalPlan; + char* physicalPlan; + uint32_t qmsgLen; + void* qmsg; + //SSubQueryMsg msg; } SMqSetCVgReq; static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { @@ -1536,7 +1538,7 @@ static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* tlen += taosEncodeFixedU64(buf, pMsg->queryId); tlen += taosEncodeFixedU64(buf, pMsg->taskId); tlen += taosEncodeFixedU32(buf, pMsg->contentLen); - tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen); + //tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen); return tlen; } @@ -1545,7 +1547,7 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) { buf = taosDecodeFixedU64(buf, &pMsg->queryId); buf = taosDecodeFixedU64(buf, &pMsg->taskId); buf = taosDecodeFixedU32(buf, &pMsg->contentLen); - buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen); + //buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen); return buf; } @@ -1559,7 +1561,9 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan); - tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); + tlen += taosEncodeFixedU32(buf, pReq->qmsgLen); + tlen += taosEncodeBinary(buf, pReq->qmsg, pReq->qmsgLen); + //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); return tlen; } @@ -1572,15 +1576,18 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan); - buf = tDecodeSSubQueryMsg(buf, &pReq->msg); + buf = taosDecodeFixedU32(buf, &pReq->qmsgLen); + buf = taosDecodeBinary(buf, &pReq->qmsg, pReq->qmsgLen); + //buf = tDecodeSSubQueryMsg(buf, &pReq->msg); return buf; } typedef struct SMqSetCVgRsp { - int32_t vgId; - int64_t consumerId; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cGroup[TSDB_CONSUMER_GROUP_LEN]; + SMsgHead header; + int32_t vgId; + int64_t consumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cGroup[TSDB_CONSUMER_GROUP_LEN]; } SMqSetCVgRsp; typedef struct SMqColData { diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 457245e9a3..020f9f7ccd 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -32,7 +32,7 @@ struct SSubplan; * @param streamReadHandle * @return */ -qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* streamReadHandle); +qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle); int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0115d7f43d..179e46527f 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -314,10 +314,10 @@ tmq_conf_t* tmq_conf_new() { } int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) { - if (strcmp(key, "group.id")) { + if (strcmp(key, "group.id") == 0) { strcpy(conf->groupId, value); } - if (strcmp(key, "client.id")) { + if (strcmp(key, "client.id") == 0) { strcpy(conf->clientId, value); } return 0; @@ -365,7 +365,7 @@ tmq_list_t* tmq_list_new() { int32_t tmq_list_append(tmq_list_t* ptr, char* src) { if (ptr->cnt >= ptr->tot-1) return -1; - ptr->elems[ptr->cnt] = src; + ptr->elems[ptr->cnt] = strdup(src); ptr->cnt++; return 0; } @@ -377,8 +377,23 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { int32_t sz = topic_list->cnt; tmq->clientTopics = taosArrayInit(sz, sizeof(void*)); for (int i = 0; i < sz; i++) { - char* topicName = strdup(topic_list->elems[i]); - taosArrayPush(tmq->clientTopics, &topicName); + char* topicName = topic_list->elems[i]; + + SName name = {0}; + char* dbName = getDbOfConnection(tmq->pTscObj); + tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName)); + tNameFromString(&name, topicName, T_NAME_TABLE); + + char* topicFname = calloc(1, TSDB_TOPIC_FNAME_LEN); + if (topicFname == NULL) { + + } + tNameExtractFullName(&name, topicFname); + tscDebug("subscribe topic: %s", topicFname); + taosArrayPush(tmq->clientTopics, &topicFname); + /*SMqClientTopic topic = {*/ + /*.*/ + /*};*/ } SCMSubscribeReq req; req.topicNum = taosArrayGetSize(tmq->clientTopics); @@ -402,7 +417,7 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { } pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; - pRequest->type = TDMT_MND_CREATE_TOPIC; + pRequest->type = TDMT_MND_SUBSCRIBE; SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); @@ -535,7 +550,7 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, SCMCreateTopicReq req = { .name = (char*) topicFname, - .igExists = 0, + .igExists = 1, .physicalPlan = (char*) pStr, .sql = (char*) sql, .logicalPlan = "no logic plan", diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index c57aedee2e..d62d7cb826 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -458,10 +458,9 @@ TEST(testCase, show_table_Test) { assert(pConn != NULL); TAOS_RES* pRes = taos_query(pConn, "show tables"); - ASSERT_NE(taos_errno(pRes), 0); - if (taos_errno(pRes) != 0) { - printf("expected failed to show tables, reason:%s\n", taos_errstr(pRes)); + printf("failed to show tables, reason:%s\n", taos_errstr(pRes)); + taos_free_result(pRes); } taos_free_result(pRes); @@ -537,6 +536,7 @@ TEST(testCase, create_topic_Test) { if (taos_errno(pRes) != 0) { printf("error in use db, reason:%s\n", taos_errstr(pRes)); } + taos_free_result(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes); ASSERT_TRUE(pFields == nullptr); @@ -570,6 +570,51 @@ TEST(testCase, insert_test) { taos_close(pConn); } +#if 0 +TEST(testCase, tmq_subscribe_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg1"); + tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); + + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "test_topic_1"); + tmq_subscribe(tmq, topic_list); + + while (1) { + tmq_message_t* msg = tmq_consume_poll(tmq, 0); + printf("get msg\n"); + if (msg == NULL) break; + } +} + +TEST(testCase, tmq_consume_Test) { +} + +TEST(testCase, tmq_commit_TEST) { +} +#endif + +//TEST(testCase, insert_test) { +// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); +// ASSERT_NE(pConn, nullptr); +// +// TAOS_RES* pRes = taos_query(pConn, "use abc1"); +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "insert into t_2 values(now, 1)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); +// taos_free_result(pRes); +// ASSERT_TRUE(false); +// } +// +// taos_free_result(pRes); +// taos_close(pConn); +//} + TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 6052e055ad..079c50f403 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -112,6 +112,9 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg; + /*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/ + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg; // Requests handled by VNODE pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg; @@ -142,6 +145,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg; } static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index e1103e7bb4..02ce3a1591 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -363,8 +363,9 @@ typedef struct SMqConsumerEp { int64_t consumerId; // -1 for unassigned int64_t lastConsumerHbTs; int64_t lastVgHbTs; - int32_t execLen; - SSubQueryMsg qExec; + uint32_t qmsgLen; + char* qmsg; + //SSubQueryMsg qExec; } SMqConsumerEp; static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { @@ -373,7 +374,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon tlen += taosEncodeFixedI32(buf, pConsumerEp->status); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); - tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); + //tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); + tlen += taosEncodeFixedU32(buf, pConsumerEp->qmsgLen); + tlen += taosEncodeBinary(buf, pConsumerEp->qmsg, pConsumerEp->qmsgLen); return tlen; } @@ -382,8 +385,9 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu buf = taosDecodeFixedI32(buf, &pConsumerEp->status); buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); - buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); - pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen; + //buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); + buf = taosDecodeFixedU32(buf, &pConsumerEp->qmsgLen); + buf = taosDecodeBinary(buf, (void**)&pConsumerEp->qmsg, pConsumerEp->qmsgLen); return buf; } @@ -402,11 +406,12 @@ typedef struct SMqSubscribeObj { static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { SMqSubscribeObj* pSub = malloc(sizeof(SMqSubscribeObj)); - pSub->key[0] = 0; - pSub->epoch = 0; if (pSub == NULL) { return NULL; } + pSub->key[0] = 0; + pSub->epoch = 0; + pSub->availConsumer = taosArrayInit(0, sizeof(int64_t)); if (pSub->availConsumer == NULL) { free(pSub); @@ -433,7 +438,7 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { free(pSub); return NULL; } - return NULL; + return pSub; } static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 5cdd8e77bd..7591caebc5 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -56,7 +56,9 @@ void mndCleanupConsumer(SMnode *pMnode) {} SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer); - SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, tlen); + int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE; + + SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); if (pRaw == NULL) goto CM_ENCODE_OVER; void* buf = malloc(tlen); @@ -68,34 +70,6 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { int32_t dataPos = 0; SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER); - -#if 0 - int32_t topicNum = taosArrayGetSize(pConsumer->topics); - SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId, CM_ENCODE_OVER); - int32_t len = strlen(pConsumer->cgroup); - SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER); - SDB_SET_BINARY(pRaw, dataPos, pConsumer->cgroup, len, CM_ENCODE_OVER); - SDB_SET_INT32(pRaw, dataPos, topicNum, CM_ENCODE_OVER); - for (int i = 0; i < topicNum; i++) { - int32_t len; - SMqConsumerTopic *pConsumerTopic = taosArrayGet(pConsumer->topics, i); - len = strlen(pConsumerTopic->name); - SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER); - SDB_SET_BINARY(pRaw, dataPos, pConsumerTopic->name, len, CM_ENCODE_OVER); - int vgSize; - if (pConsumerTopic->vgroups == NULL) { - vgSize = 0; - } else { - vgSize = listNEles(pConsumerTopic->vgroups); - } - SDB_SET_INT32(pRaw, dataPos, vgSize, CM_ENCODE_OVER); - for (int j = 0; j < vgSize; j++) { - // SList* head; - /*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/ - } - } -#endif - SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER); @@ -116,53 +90,35 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CONSUME_DECODE_OVER; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER; if (sver != MND_CONSUMER_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; - goto CONSUME_DECODE_OVER; + goto CM_DECODE_OVER; } SSdbRow *pRow = sdbAllocRow(sizeof(SMqConsumerObj)); - if (pRow == NULL) goto CONSUME_DECODE_OVER; + if (pRow == NULL) goto CM_DECODE_OVER; SMqConsumerObj *pConsumer = sdbGetRowObj(pRow); - if (pConsumer == NULL) goto CONSUME_DECODE_OVER; + if (pConsumer == NULL) goto CM_DECODE_OVER; int32_t dataPos = 0; int32_t len; - SDB_GET_INT32(pRaw, dataPos, &len, CONSUME_DECODE_OVER); + SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER); void* buf = malloc(len); - if (buf == NULL) goto CONSUME_DECODE_OVER; - - SDB_GET_BINARY(pRaw, dataPos, buf, len, CONSUME_DECODE_OVER); + if (buf == NULL) goto CM_DECODE_OVER; + SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER); + SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER); - tDecodeSMqConsumerObj(buf, pConsumer); - - SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CONSUME_DECODE_OVER); + if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) { + goto CM_DECODE_OVER; + } terrno = TSDB_CODE_SUCCESS; -#if 0 - SDB_GET_INT32(pRaw, dataPos, &topicNum, CONSUME_DECODE_OVER); - for (int i = 0; i < topicNum; i++) { - int32_t topicLen; - SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic)); - if (pConsumerTopic == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - // TODO - return NULL; - } - /*pConsumerTopic->vgroups = taosArrayInit(topicNum, sizeof(SMqConsumerTopic));*/ - SDB_GET_INT32(pRaw, dataPos, &topicLen, CONSUME_DECODE_OVER); - SDB_GET_BINARY(pRaw, dataPos, pConsumerTopic->name, topicLen, CONSUME_DECODE_OVER); - int32_t vgSize; - SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER); - } -#endif - -CONSUME_DECODE_OVER: - if (terrno != 0) { +CM_DECODE_OVER: + if (terrno != TSDB_CODE_SUCCESS) { mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); tfree(pRow); return NULL; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 9a573cbe2c..78e9a7c17c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -55,7 +55,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndSubActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); - mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); + mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); return sdbSetTable(pMnode->pSdb, table); } @@ -96,25 +96,27 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer); // build msg - SMqSetCVgReq req = { - .vgId = pCEp->vgId, - .oldConsumerId = -1, - .newConsumerId = consumerId, - }; - strcpy(req.cgroup, cgroup); - strcpy(req.topicName, topic); - strcpy(req.sql, pTopic->sql); - strcpy(req.logicalPlan, pTopic->logicalPlan); - strcpy(req.physicalPlan, pTopic->physicalPlan); - memcpy(&req.msg, &pCEp->qExec, pCEp->execLen); - int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); + + SMqSetCVgReq* pReq = malloc(sizeof(SMqSetCVgReq) + pCEp->qmsgLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + strcpy(pReq->cgroup, cgroup); + strcpy(pReq->topicName, topic); + pReq->sql = strdup(pTopic->sql); + pReq->logicalPlan = strdup(pTopic->logicalPlan); + pReq->physicalPlan = strdup(pTopic->physicalPlan); + pReq->qmsgLen = pCEp->qmsgLen; + memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen); + int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq); void *reqStr = malloc(tlen); if (reqStr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } void *abuf = reqStr; - tEncodeSMqSetCVgReq(abuf, &req); + tEncodeSMqSetCVgReq(&abuf, pReq); // persist msg STransAction action = {0}; @@ -128,6 +130,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { SSdbRaw *pRaw = mndSubActionEncode(pSub); mndTransAppendRedolog(pTrans, pRaw); + free(pReq); tfree(topic); tfree(cgroup); } @@ -146,6 +149,14 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas //convert phyplan to dag SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); SArray *pArray; + SArray* inner = taosArrayGet(pDag->pSubplans, 0); + SSubplan *plan = taosArrayGetP(inner, 0); + plan->execNode.inUse = 0; + strcpy(plan->execNode.epAddr[0].fqdn, "localhost"); + plan->execNode.epAddr[0].port = 6030; + plan->execNode.nodeId = 2; + plan->execNode.numOfEps = 1; + if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { return -1; } @@ -157,11 +168,18 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; STaskInfo* pTaskInfo = taosArrayGet(pArray, i); tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr); + /*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/ CEp.vgId = pTaskInfo->addr.nodeId; + CEp.qmsgLen = pTaskInfo->msg->contentLen; + CEp.qmsg = malloc(CEp.qmsgLen); + if (CEp.qmsg == NULL) { + return -1; + } + memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen); taosArrayPush(unassignedVg, &CEp); } - qDestroyQueryDag(pDag); + /*qDestroyQueryDag(pDag);*/ return 0; } @@ -178,27 +196,33 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume }; strcpy(req.cgroup, pConsumer->cgroup); strcpy(req.topicName, pTopic->name); - strcpy(req.sql, pTopic->sql); - strcpy(req.logicalPlan, pTopic->logicalPlan); - strcpy(req.physicalPlan, pTopic->physicalPlan); + req.sql = pTopic->sql; + req.logicalPlan = pTopic->logicalPlan; + req.physicalPlan = pTopic->physicalPlan; int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); - void *reqStr = malloc(tlen); - if (reqStr == NULL) { + void *buf = malloc(sizeof(SMsgHead) + tlen); + if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - void *abuf = reqStr; + + SMsgHead* pMsgHead = (SMsgHead*)buf; + + pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen); + pMsgHead->vgId = htonl(vgId); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncodeSMqSetCVgReq(&abuf, &req); STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); - action.pCont = reqStr; + action.pCont = buf; action.contLen = tlen; action.msgType = TDMT_VND_MQ_SET_CONN; mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(reqStr); + free(buf); return -1; } } @@ -208,19 +232,18 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume void mndCleanupSubscribe(SMnode *pMnode) {} static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { + terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t tlen = tEncodeSubscribeObj(NULL, pSub); - int32_t size = tlen + MND_SUBSCRIBE_RESERVE_SIZE; + int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size); if (pRaw == NULL) goto SUB_ENCODE_OVER; void *buf = malloc(tlen); - if (buf == NULL) { - goto SUB_ENCODE_OVER; - } - void *abuf = buf; + if (buf == NULL) goto SUB_ENCODE_OVER; - tEncodeSubscribeObj(&buf, pSub); + void *abuf = buf; + tEncodeSubscribeObj(&abuf, pSub); int32_t dataPos = 0; SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER); @@ -228,6 +251,8 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER); + terrno = TSDB_CODE_SUCCESS; + SUB_ENCODE_OVER: if (terrno != 0) { mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr()); @@ -259,9 +284,9 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { int32_t dataPos = 0; int32_t tlen; + SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER); void *buf = malloc(tlen + 1); if (buf == NULL) goto SUB_DECODE_OVER; - SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); @@ -269,8 +294,10 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { goto SUB_DECODE_OVER; } + terrno = TSDB_CODE_SUCCESS; + SUB_DECODE_OVER: - if (terrno != 0) { + if (terrno != TSDB_CODE_SUCCESS) { mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr()); // TODO free subscribeobj tfree(pRow); @@ -379,10 +406,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; j++; } else if (j >= oldTopicNum) { - newTopicName = taosArrayGet(newSub, i); + newTopicName = taosArrayGetP(newSub, i); i++; } else { - newTopicName = taosArrayGet(newSub, i); + newTopicName = taosArrayGetP(newSub, i); oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; int comp = compareLenPrefixedStr(newTopicName, oldTopicName); @@ -466,6 +493,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + char* key = mndMakeSubscribeKey(consumerGroup, newTopicName); + strcpy(pSub->key, key); // set unassigned vg mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); //TODO: disable alter @@ -486,7 +515,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } SSdbRaw *pRaw = mndSubActionEncode(pSub); - /*sdbSetRawStatus(pRaw, SDB_STATUS_READY);*/ + sdbSetRawStatus(pRaw, SDB_STATUS_READY); mndTransAppendRedolog(pTrans, pRaw); #if 0 SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); @@ -519,8 +548,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { mndTransAppendRedolog(pTrans, pTopicRaw); #endif - mndReleaseTopic(pMnode, pTopic); - mndReleaseSubscribe(pMnode, pSub); + /*mndReleaseTopic(pMnode, pTopic);*/ + /*mndReleaseSubscribe(pMnode, pSub);*/ } } // part3. persist consumerObj diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index ac66e7d88b..fa043cf7a0 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -60,7 +60,9 @@ void mndCleanupTopic(SMnode *pMnode) {} SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { terrno = TSDB_CODE_OUT_OF_MEMORY; - int32_t size = sizeof(SMqTopicObj) + MND_TOPIC_RESERVE_SIZE; + int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1; + int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1; + int32_t size = sizeof(SMqTopicObj) + logicalPlanLen + physicalPlanLen + pTopic->sqlLen + MND_TOPIC_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); if (pRaw == NULL) goto TOPIC_ENCODE_OVER; @@ -74,12 +76,10 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); - int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1; - SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->logicalPlan)+1, TOPIC_ENCODE_OVER); + SDB_SET_INT32(pRaw, dataPos, logicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER); - int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1; - SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->physicalPlan)+1, TOPIC_ENCODE_OVER); + SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); @@ -135,7 +135,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto TOPIC_DECODE_OVER; } - SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len+1, TOPIC_DECODE_OVER); + SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); pTopic->physicalPlan = calloc(len + 1, sizeof(char)); @@ -144,7 +144,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto TOPIC_DECODE_OVER; } - SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len+1, TOPIC_DECODE_OVER); + SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); @@ -231,6 +231,7 @@ static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) { } static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) { + mDebug("topic:%s to create", pCreate->name); SMqTopicObj topicObj = {0}; tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); @@ -273,7 +274,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { return 0; } else { terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST; - mError("db:%s, failed to create since %s", createTopicReq.name, terrstr()); + mError("topic:%s, failed to create since already exists", createTopicReq.name); return -1; } } diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 0f318dea0b..9e33c04022 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -319,7 +319,7 @@ int tqSendLaunchQuery(STqMsgItem*, int64_t offset); #endif int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp); -int32_t tqProcessSetConnReq(STQ* pTq, char* msg); +int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f3ebd5b284..a3a3804562 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -783,7 +783,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { return 0; } -int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { +int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp) { SMqSetCVgReq req; tDecodeSMqSetCVgReq(msg, &req); STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1); @@ -799,9 +799,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { return -1; } strcpy(pTopic->topicName, req.topicName); - strcpy(pTopic->sql, req.sql); - strcpy(pTopic->logicalPlan, req.logicalPlan); - strcpy(pTopic->physicalPlan, req.physicalPlan); + pTopic->sql = strdup(req.sql); + pTopic->logicalPlan = strdup(req.logicalPlan); + pTopic->physicalPlan = strdup(req.physicalPlan); pTopic->buffer.firstOffset = -1; pTopic->buffer.lastOffset = -1; @@ -811,9 +811,10 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); - pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.msg, pReadHandle); + pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.qmsg, pReadHandle); } taosArrayPush(pConsumer->topics, pTopic); + terrno = TSDB_CODE_SUCCESS; return 0; } @@ -826,7 +827,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { pReadHandle->pMsg = NULL; pReadHandle->ver = -1; pReadHandle->pColIdList = NULL; - return NULL; + return pReadHandle; } void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) { diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 169dc32c1c..4993e8af23 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -115,7 +115,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } break; case TDMT_VND_MQ_SET_CONN: { - if (tqProcessSetConnReq(pVnode->pTq, ptr) < 0) { + if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead)), NULL) < 0) { + // TODO: handle error } } break; default: diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 2f1f40813c..b7be85dc34 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -60,8 +60,8 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) { return code; } -qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle) { - if (pMsg == NULL || streamReadHandle == NULL) { +qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { + if (msg == NULL || streamReadHandle == NULL) { return NULL; } @@ -74,7 +74,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle #endif struct SSubplan* plan = NULL; - int32_t code = qStringToSubplan(pMsg->msg, &plan); + int32_t code = qStringToSubplan(msg, &plan); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index 15af5745b8..7f4cec6bc1 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -87,6 +87,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f static const char* jkPnodeType = "Type"; static int32_t getPnodeTypeSize(cJSON* json) { switch (getNumber(json, jkPnodeType)) { + case OP_StreamScan: case OP_TableScan: case OP_DataBlocksOptScan: case OP_TableSeqScan: @@ -869,6 +870,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) { SPhyNode* phyNode = (SPhyNode*)obj; switch (phyNode->info.type) { case OP_TableScan: + case OP_StreamScan: case OP_DataBlocksOptScan: case OP_TableSeqScan: return tableScanNodeFromJson(json, obj); @@ -1189,14 +1191,14 @@ SQueryDag* qJsonToDag(const cJSON* pRoot) { if(pDag == NULL) { return NULL; } - pDag->numOfSubplans = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "numOfSubplans")); - pDag->queryId = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "queryId")); + pDag->numOfSubplans = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "Number")); + pDag->queryId = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "QueryId")); pDag->pSubplans = taosArrayInit(0, sizeof(SArray)); if (pDag->pSubplans == NULL) { free(pDag); return NULL; } - cJSON* pLevels = cJSON_GetObjectItem(pRoot, "pSubplans"); + cJSON* pLevels = cJSON_GetObjectItem(pRoot, "Subplans"); int level = cJSON_GetArraySize(pLevels); for(int i = 0; i < level; i++) { SArray* plansOneLevel = taosArrayInit(0, sizeof(void*)); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index d17bc20495..223fa300df 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1490,7 +1490,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { SSubQueryMsg *pMsg = (SSubQueryMsg*) msg; - pMsg->header.vgId = htonl(tInfo.addr.nodeId); + pMsg->header.vgId = tInfo.addr.nodeId; pMsg->sId = schMgmt.sId; pMsg->queryId = plan->id.queryId;