diff --git a/include/client/taos.h b/include/client/taos.h index 626264d94e..5f7cd12963 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -204,12 +204,19 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); +/* --------------------------TMQ INTERFACE------------------------------- */ +typedef struct tmq_resp_err_t tmq_resp_err_t; +typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t; +typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; + +typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param)); DLL_EXPORT tmq_list_t* tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t*, char*); DLL_EXPORT tmq_conf_t* tmq_conf_new(); DLL_EXPORT int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value); +DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb); DLL_EXPORT TAOS_RES *taos_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); @@ -217,13 +224,7 @@ DLL_EXPORT tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, DLL_EXPORT TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list); -DLL_EXPORT tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time); - -DLL_EXPORT int32_t tmq_topic_num(tmq_message_t* msg); -DLL_EXPORT char* tmq_get_topic(tmq_message_topic_t* msg); -DLL_EXPORT int32_t tmq_get_vgId(tmq_message_topic_t* msg); -DLL_EXPORT tmq_message_tb_t* tmq_get_next_tb(tmq_message_topic_t* msg, tmq_tb_iter_t* iter); -DLL_EXPORT tmq_message_col_t* tmq_get_next_col(tmq_message_tb_t* msg, tmq_col_iter_t* iter); +DLL_EXPORT tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 8f49fce558..50af44a94c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -254,6 +254,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5) #define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6) #define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7) +#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E7) // dnode #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a8e30bfd52..3b71805e14 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -280,23 +280,21 @@ typedef struct SMqClientTopic { SArray* vgs; //SArray } SMqClientTopic; -typedef struct tmq_resp_err_t { +struct tmq_resp_err_t { int32_t code; -} tmq_resp_err_t; +}; -typedef struct tmq_topic_vgroup_t { +struct tmq_topic_vgroup_t { char* topic; int32_t vgId; int64_t commitOffset; -} tmq_topic_vgroup_t; +}; -typedef struct tmq_topic_vgroup_list_t { +struct tmq_topic_vgroup_list_t { int32_t cnt; int32_t size; tmq_topic_vgroup_t* elems; -} tmq_topic_vgroup_list_t; - -typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param)); +}; struct tmq_conf_t { char clientId[256]; @@ -677,8 +675,7 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = pParam->tmq; if (code != 0) { - - printf("exit wait %d\n", pParam->wait); + printf("get topic endpoint error, not ready, wait:%d\n", pParam->wait); if (pParam->wait) { tsem_post(&tmq->rspSem); } @@ -769,7 +766,7 @@ END: return 0; } -tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { +tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { int64_t status = atomic_load_64(&tmq->status); tmqAsyncAskEp(tmq, status == 0 || taosArrayGetSize(tmq->clientTopics)); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index eec6a00405..8cca1e6152 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -562,9 +562,9 @@ TEST(testCase, insert_test) { taos_free_result(pRes); taos_close(pConn); } - #endif -TEST(testCase, create_topic_Test) { + +TEST(testCase, create_topic_ctb_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -583,13 +583,37 @@ TEST(testCase, create_topic_Test) { taos_free_result(pRes); char* sql = "select * from tu"; - pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); + pRes = taos_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql)); taos_free_result(pRes); taos_close(pConn); } +TEST(testCase, create_topic_stb_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); -TEST(testCase, tmq_subscribe_Test) { + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + //taos_free_result(pRes); + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == nullptr); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + char* sql = "select * from st1"; + pRes = taos_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); + taos_free_result(pRes); + taos_close(pConn); +} + +#if 0 +TEST(testCase, tmq_subscribe_ctb_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -604,7 +628,32 @@ TEST(testCase, tmq_subscribe_Test) { tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); tmq_list_t* topic_list = tmq_list_new(); - tmq_list_append(topic_list, "test_topic_1"); + tmq_list_append(topic_list, "test_ctb_topic_1"); + tmq_subscribe(tmq, topic_list); + + while (1) { + tmq_message_t* msg = tmq_consumer_poll(tmq, 1000); + //printf("get msg\n"); + //if (msg == NULL) break; + } +} + +TEST(testCase, tmq_subscribe_stb_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg2"); + tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); + + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "test_stb_topic_1"); tmq_subscribe(tmq, topic_list); while (1) { @@ -613,6 +662,7 @@ TEST(testCase, tmq_subscribe_Test) { //if (msg == NULL) break; } } +#endif TEST(testCase, tmq_consume_Test) { } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 0c0d8bd349..fa8c78f4b1 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -96,7 +96,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { bool found = 0; bool changed = 0; for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { - if (*(int64_t*)taosArrayGet(pSub->availConsumer, j) == consumerId) { + if (*(int64_t *)taosArrayGet(pSub->availConsumer, j) == consumerId) { found = 1; break; } @@ -105,16 +105,13 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { taosArrayPush(pSub->availConsumer, &consumerId); } - int32_t assignedSz = taosArrayGetSize(pSub->assigned); + int32_t assignedSz = taosArrayGetSize(pSub->assigned); topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp)); for (int32_t j = 0; j < assignedSz; j++) { SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, j); if (pCEp->consumerId == consumerId) { pCEp->lastConsumerHbTs = currentTs; - SMqSubVgEp vgEp = { - .epSet = pCEp->epSet, - .vgId = pCEp->vgId - }; + SMqSubVgEp vgEp = {.epSet = pCEp->epSet, .vgId = pCEp->vgId}; taosArrayPush(topicEp.vgs, &vgEp); changed = 1; } @@ -123,7 +120,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { taosArrayPush(rsp.topics, &topicEp); } if (changed || found) { - SSdbRaw* pRaw = mndSubActionEncode(pSub); + SSdbRaw *pRaw = mndSubActionEncode(pSub); sdbSetRawStatus(pRaw, SDB_STATUS_READY); sdbWriteNotFree(pMnode->pSdb, pRaw); } @@ -137,7 +134,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { } void *abuf = buf; tEncodeSMqCMGetSubEpRsp(&abuf, &rsp); - //TODO: free rsp + // TODO: free rsp pMsg->pCont = buf; pMsg->contLen = tlen; return 0; @@ -164,9 +161,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { int32_t sz; while (pIter != NULL) { for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) { - SMqConsumerEp* pCEp = taosArrayGet(pSub->assigned, i); - int64_t consumerId = pCEp->consumerId; - if(pCEp->lastConsumerHbTs != -1 && currentTs - pCEp->lastConsumerHbTs > MND_SUBSCRIBE_REBALANCE_MS) { + SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i); + int64_t consumerId = pCEp->consumerId; + if (pCEp->lastConsumerHbTs != -1 && currentTs - pCEp->lastConsumerHbTs > MND_SUBSCRIBE_REBALANCE_MS) { // put consumer into lostConsumer taosArrayPush(pSub->lostConsumer, pCEp); // put vg into unassgined @@ -176,7 +173,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { taosArrayRemove(pSub->assigned, i); // remove from available consumer for (int j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { - if (*(int64_t*)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) { + if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) { taosArrayRemove(pSub->availConsumer, j); break; } @@ -209,7 +206,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { taosArrayPush(pSub->assigned, pCEp); pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer); - SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId); + SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); pConsumer->epoch++; /*SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);*/ /*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/ @@ -269,33 +266,50 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { // convert phyplan to dag SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); - SArray *pArray; + SArray *pArray = NULL; SArray *inner = taosArrayGet(pDag->pSubplans, 0); SSubplan *plan = taosArrayGetP(inner, 0); + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; - plan->execNode.nodeId = 2; - SEpSet* pEpSet = &plan->execNode.epset; + void *pIter = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pTopic->dbUid) continue; + + plan->execNode.nodeId = pVgroup->vgId; + plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup); + + if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { + terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC; + mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql); + return -1; + } + if (pArray && taosArrayGetSize(pArray) != 1) { + terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC; + mError("unsupport topic: %s, sql: %s, plan level: %ld", pTopic->name, pTopic->sql, taosArrayGetSize(pArray)); + return -1; + } - pEpSet->inUse = 0; - addEpIntoEpSet(pEpSet, "localhost", 6030); - if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { - return -1; - } - int32_t sz = taosArrayGetSize(pArray); - // convert dag to msg - for (int32_t i = 0; i < sz; i++) { SMqConsumerEp CEp; CEp.status = 0; CEp.consumerId = -1; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; - STaskInfo *pTaskInfo = taosArrayGet(pArray, i); + STaskInfo *pTaskInfo = taosArrayGet(pArray, 0); CEp.epSet = pTaskInfo->addr.epset; - - /*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], - * CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/ CEp.vgId = pTaskInfo->addr.nodeId; + + ASSERT(CEp.vgId == pVgroup->vgId); CEp.qmsg = strdup(pTaskInfo->msg->msg); taosArrayPush(unassignedVg, &CEp); + //TODO: free taskInfo + taosArrayDestroy(pArray); + + /*SEpSet *pEpSet = &plan->execNode.epset;*/ + /*pEpSet->inUse = 0;*/ + /*addEpIntoEpSet(pEpSet, "localhost", 6030);*/ + } /*qDestroyQueryDag(pDag);*/ @@ -608,7 +622,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName); - bool create = false; + bool create = false; if (pSub == NULL) { mDebug("create new subscription, group: %s, topic %s", consumerGroup, newTopicName); pSub = tNewSubscribeObj(); @@ -624,7 +638,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { strcpy(pSub->key, key); free(key); // set unassigned vg - mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); + if (mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg) < 0) { + //TODO: free memory + return -1; + } // TODO: disable alter create = true; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d7fd02422b..4e5246b0a8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -866,18 +866,16 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { if (pHandle->pBlock == NULL) return false; pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid); - if (pHandle->tbUid == pHandle->pBlock->uid) { + /*if (pHandle->tbUid == pHandle->pBlock->uid) {*/ + ASSERT(pHandle->tbIdHash); + void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)); + if (ret != NULL) { pHandle->pBlock->tid = htonl(pHandle->pBlock->tid); pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion); pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen); pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen); pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows); return true; - } else if (pHandle->tbIdHash != NULL) { - void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->pBlock->uid, sizeof(int64_t)); - if (ret != NULL) { - return true; - } } } return false; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7b1ef01fb2..e2e2e7c8d5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5477,7 +5477,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp // set the extract column id to streamHandle tqReadHandleSetColIdList((STqReadHandle* )streamReadHandle, pColList); - tqReadHandleSetTbUid(streamReadHandle, pTableIdList); + tqReadHandleSetTbUidList(streamReadHandle, pTableIdList); pInfo->readerHandle = streamReadHandle; @@ -7776,7 +7776,7 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode; // simple child table. STableGroupInfo groupInfo = {0}; - int32_t code = doCreateTableGroup(((STqReadHandle*)readerHandle)->pMeta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); + int32_t code = doCreateTableGroup(((STqReadHandle*)readerHandle)->pVnodeMeta, pScanPhyNode->tableType, pScanPhyNode->uid, &groupInfo, queryId, taskId); SArray* pa = taosArrayGetP(groupInfo.pGroupList, 0); ASSERT(taosArrayGetSize(groupInfo.pGroupList) == 1); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 489fff5d64..ee5bea0ab7 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -254,6 +254,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retriev TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists") +// mnode-topic +TAOS_DEFINE_ERROR(TSDB_CODE_MND_UNSUPPORTED_TOPIC, "Topic with STable not supported yet") + // dnode TAOS_DEFINE_ERROR(TSDB_CODE_DND_ACTION_IN_PROGRESS, "Action in progress") TAOS_DEFINE_ERROR(TSDB_CODE_DND_OFFLINE, "Dnode is offline")