diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 9dcd58a05f..21a93532e0 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -23,9 +23,6 @@ extern "C" { #endif -#define SND_UNIQUE_THREAD_NUM 2 -#define SND_SHARED_THREAD_NUM 2 - /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SDnode SDnode; typedef struct SSnode SSnode; diff --git a/include/util/tdef.h b/include/util/tdef.h index a53b81894a..41a61ceb55 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -345,19 +345,19 @@ typedef enum ELogicConditionType { #define TSDB_MAX_DB_QUORUM_OPTION 2 #define TSDB_DEFAULT_DB_QUORUM_OPTION 1 -#define TSDB_MIN_DB_TTL_OPTION 1 -#define TSDB_DEFAULT_DB_TTL_OPTION 0 +#define TSDB_MIN_DB_TTL_OPTION 1 +#define TSDB_DEFAULT_DB_TTL_OPTION 0 -#define TSDB_MIN_DB_SINGLE_STABLE_OPTION 0 -#define TSDB_MAX_DB_SINGLE_STABLE_OPTION 1 -#define TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION 0 +#define TSDB_MIN_DB_SINGLE_STABLE_OPTION 0 +#define TSDB_MAX_DB_SINGLE_STABLE_OPTION 1 +#define TSDB_DEFAULT_DB_SINGLE_STABLE_OPTION 0 -#define TSDB_MIN_DB_STREAM_MODE_OPTION 0 -#define TSDB_MAX_DB_STREAM_MODE_OPTION 1 -#define TSDB_DEFAULT_DB_STREAM_MODE_OPTION 0 +#define TSDB_MIN_DB_STREAM_MODE_OPTION 0 +#define TSDB_MAX_DB_STREAM_MODE_OPTION 1 +#define TSDB_DEFAULT_DB_STREAM_MODE_OPTION 0 -#define TSDB_MAX_JOIN_TABLE_NUM 10 -#define TSDB_MAX_UNION_CLAUSE 5 +#define TSDB_MAX_JOIN_TABLE_NUM 10 +#define TSDB_MAX_UNION_CLAUSE 5 #define TSDB_MIN_DB_UPDATE 0 #define TSDB_MAX_DB_UPDATE 2 @@ -445,6 +445,9 @@ typedef struct { #define TMQ_SEPARATOR ':' +#define SND_UNIQUE_THREAD_NUM 2 +#define SND_SHARED_THREAD_NUM 2 + #ifdef __cplusplus } #endif diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 846329f0f4..2e2c44951e 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -459,7 +459,7 @@ void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf-> TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) { STscObj* pTscObj = (STscObj*)taos; SRequestObj* pRequest = NULL; - SQuery* pQueryNode = NULL; + SQuery* pQueryNode = NULL; char* pStr = NULL; terrno = TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9ed6104140..a7116b72ff 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -679,6 +679,12 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons return buf; } +typedef struct { + int32_t taskId; + int32_t level; + SSubplan* plan; +} SStreamTaskMeta; + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN]; @@ -687,12 +693,14 @@ typedef struct { int64_t uid; int64_t dbUid; int32_t version; + int32_t vgNum; SRWLatch lock; int8_t status; // int32_t sqlLen; - char* sql; - char* logicalPlan; - char* physicalPlan; + char* sql; + char* logicalPlan; + char* physicalPlan; + SArray* tasks; // SArray> } SStreamObj; int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index e827810cc9..855e244daa 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -21,6 +21,7 @@ #include "mndOffset.h" #include "mndShow.h" #include "mndStb.h" +#include "mndStream.h" #include "mndSubscribe.h" #include "mndTopic.h" #include "mndTrans.h" @@ -28,10 +29,84 @@ #include "mndVgroup.h" #include "tcompare.h" #include "tname.h" +#include "tuuid.h" + +int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { + SSdb* pSdb = pMnode->pSdb; + SVgObj* pVgroup = NULL; + SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); + if (pPlan == NULL) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + ASSERT(pStream->vgNum == 0); + + int32_t levelNum = LIST_LENGTH(pPlan->pSubplans); + pStream->tasks = taosArrayInit(levelNum, sizeof(SArray)); + + for (int32_t i = 0; i < levelNum; i++) { + SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTaskMeta)); + SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, i); + int32_t opNum = LIST_LENGTH(inner->pNodeList); + ASSERT(opNum == 1); + + SSubplan* plan = nodesListGetNode(inner->pNodeList, 0); + if (i == 0) { + ASSERT(plan->type == SUBPLAN_TYPE_SCAN); + void* pIter = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pStream->dbUid) { + sdbRelease(pSdb, pVgroup); + continue; + } + + pStream->vgNum++; + plan->execNode.nodeId = pVgroup->vgId; + plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); + SStreamTaskMeta task = { + .taskId = tGenIdPI32(), + .level = i, + .plan = plan, + }; + // send to vnode + taosArrayPush(taskOneLevel, &task); + } + } else if (plan->subplanType == SUBPLAN_TYPE_SCAN) { + // duplicatable + + int32_t parallel = 0; + // if no snode, parallel set to fetch thread num in vnode + + // if has snode, set to shared thread num in snode + parallel = SND_SHARED_THREAD_NUM; + + for (int32_t j = 0; j < parallel; j++) { + SStreamTaskMeta task = { + .taskId = tGenIdPI32(), + .level = i, + .plan = plan, + }; + taosArrayPush(taskOneLevel, &task); + } + } else { + // not duplicatable + SStreamTaskMeta task = { + .taskId = tGenIdPI32(), + .level = i, + .plan = plan, + }; + taosArrayPush(taskOneLevel, &task); + } + taosArrayPush(pStream->tasks, taskOneLevel); + } + return 0; +} int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) { - SSdb* pSdb = pMnode->pSdb; - SVgObj* pVgroup = NULL; + SSdb* pSdb = pMnode->pSdb; + SVgObj* pVgroup = NULL; SQueryPlan* pPlan = qStringToQueryPlan(pTopic->physicalPlan); if (pPlan == NULL) { terrno = TSDB_CODE_QRY_INVALID_INPUT; diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index ffe691aeb4..3fe816845d 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -67,11 +67,11 @@ typedef struct { // storage handle } SStreamTask; -int32_t sndCreateStream(); -int32_t sndDropStream(); +int32_t sndCreateTask(); +int32_t sndDropTaskOfStream(int64_t streamId); -int32_t sndStopStream(); -int32_t sndResumeStream(); +int32_t sndStopTaskOfStream(int64_t streamId); +int32_t sndResumeTaskOfStream(int64_t streamId); #ifdef __cplusplus } diff --git a/source/util/src/tuuid.c b/source/util/src/tuuid.c index 0405403220..ceca33436a 100644 --- a/source/util/src/tuuid.c +++ b/source/util/src/tuuid.c @@ -24,7 +24,6 @@ int32_t tGenIdPI32(void) { int32_t code = taosGetSystemUUID(uid, tListLen(uid)); if (code != TSDB_CODE_SUCCESS) { terrno = TAOS_SYSTEM_ERROR(errno); - return -1; } else { hashId = MurmurHash3_32(uid, strlen(uid)); } @@ -44,7 +43,6 @@ int64_t tGenIdPI64(void) { int32_t code = taosGetSystemUUID(uid, tListLen(uid)); if (code != TSDB_CODE_SUCCESS) { terrno = TAOS_SYSTEM_ERROR(errno); - return -1; } else { hashId = MurmurHash3_32(uid, strlen(uid)); }