diff --git a/include/client/consumer/consumer.h b/include/client/consumer/consumer.h deleted file mode 100644 index 8d1c9835e6..0000000000 --- a/include/client/consumer/consumer.h +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_CONSUMER_H_ -#define _TD_CONSUMER_H_ - -#include "tlist.h" -#include "tarray.h" -#include "hash.h" - -#ifdef __cplusplus -extern "C" { -#endif - - //consumer handle - struct tmq_consumer_t; - typedef struct tmq_consumer_t tmq_consumer_t; - - //consumer config - struct tmq_consumer_config_t; - typedef struct tmq_consumer_config_t tmq_consumer_config_t; - - //response err - struct tmq_resp_err_t; - typedef struct tmq_resp_err_t tmq_resp_err_t; - - struct tmq_message_t; - typedef struct tmq_message_t tmq_message_t; - - struct tmq_col_batch_t; - typedef struct tmq_col_batch_t tmq_col_batch_t; - - //get content of message - tmq_col_batch_t* tmq_get_msg_col_by_idx(tmq_message_t*, int32_t col_id); - tmq_col_batch_t* tmq_get_msg_col_by_name(tmq_message_t*, const char*); - - //consumer config - int32_t tmq_conf_set(tmq_consumer_config_t* , const char* config_key, const char* config_value, char* errstr, int32_t errstr_cap); - - //consumer initialization - //resouces are supposed to be free by users by calling tmq_consumer_destroy - tmq_consumer_t* tmq_consumer_new(tmq_consumer_config_t* , char* errstr, int32_t errstr_cap); - - //subscribe - tmq_resp_err_t tmq_subscribe(tmq_consumer_t*, const SList*); - tmq_resp_err_t tmq_unsubscribe(tmq_consumer_t*); - - //consume - //resouces are supposed to be free by users by calling tmq_message_destroy - tmq_message_t* tmq_consume_poll(tmq_consumer_t*, int64_t blocking_time); - - //destroy message and free memory - void tmq_message_destroy(tmq_message_t*); - - //close consumer - int32_t tmq_consumer_close(tmq_consumer_t*); - - //destroy consumer - void tmq_consumer_destroy(tmq_message_t*); - - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_CONSUMER_H_*/ diff --git a/include/client/stream/stream.h b/include/client/stream/stream.h deleted file mode 100644 index 79b247c61c..0000000000 --- a/include/client/stream/stream.h +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_STREAM_H_ -#define _TD_STREAM_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_STREAM_H_*/ \ No newline at end of file diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 02fe591a09..eadc901389 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -30,6 +30,11 @@ enum { STREAM_TASK_STATUS__STOP, }; +enum { + STREAM_CREATED_BY__USER = 1, + STREAM_CREATED_BY__SMA, +}; + #if 0 // pipe -> fetch/pipe queue // merge -> merge queue diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index caf5172596..6976d83abd 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -731,10 +731,10 @@ typedef struct { int32_t vgNum; SRWLatch lock; int8_t status; - int8_t sourceType; - int8_t sinkType; // int32_t sqlLen; - int32_t sinkVgId; // 0 for automatic + int8_t createdBy; // STREAM_CREATED_BY__USER or SMA + int32_t fixedSinkVgId; // 0 for shuffle + int64_t smaId; // 0 for unused char* sql; char* logicalPlan; char* physicalPlan; diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 416061bf34..42951beca2 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId); +int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index e7cdd34a7e..b5d22cb7a5 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId); +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 1b3564924a..6374b4cad2 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -26,8 +26,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI64(pEncoder, pObj->dbUid) < 0) return -1; if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1; + if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; - if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1; + /*if (tEncodeCStr(pEncoder, pObj->logicalPlan) < 0) return -1;*/ if (tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) return -1; // TODO encode tasks if (pObj->tasks) { @@ -69,8 +72,11 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1; + if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; - if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1; + /*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/ if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1; pObj->tasks = NULL; int32_t sz; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 566bd1d282..4562d9e5d3 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { return pVgroup; } -int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) { +int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; void* pIter = NULL; SArray* tasks = taosArrayGetP(pStream->tasks, 0); @@ -151,9 +151,9 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, pTask->execType = TASK_EXEC__NONE; // sink - if (smaId != -1) { + if (pStream->createdBy == STREAM_CREATED_BY__SMA) { pTask->sinkType = TASK_SINK__SMA; - pTask->smaSink.smaId = smaId; + pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; } @@ -166,7 +166,45 @@ int32_t mndAddSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, return 0; } -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) { +int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { + ASSERT(pStream->fixedSinkVgId != 0); + SArray* tasks = taosArrayGetP(pStream->tasks, 0); + SStreamTask* pTask = tNewSStreamTask(pStream->uid); + if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + taosArrayPush(tasks, &pTask); + + pTask->nodeId = pStream->fixedSinkVgId; + SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId); + if (pVgroup == NULL) { + return -1; + } + pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); + // source + pTask->sourceType = TASK_SOURCE__MERGE; + + // exec + pTask->execType = TASK_EXEC__NONE; + + // sink + if (pStream->createdBy == STREAM_CREATED_BY__SMA) { + pTask->sinkType = TASK_SINK__SMA; + pTask->smaSink.smaId = pStream->smaId; + } else { + pTask->sinkType = TASK_SINK__TABLE; + } + // + // dispatch + pTask->dispatchType = TASK_DISPATCH__NONE; + + mndPersistTaskDeployReq(pTrans, pTask, &pTask->epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId); + + return 0; +} + +int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSdb* pSdb = pMnode->pSdb; SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); if (pPlan == NULL) { @@ -185,7 +223,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i taosArrayPush(pStream->tasks, &taskOneLevel); // add extra sink hasExtraSink = true; - mndAddSinkToStream(pMnode, pTrans, pStream, smaId); + if (pStream->fixedSinkVgId == 0) { + mndAddShuffledSinkToStream(pMnode, pTrans, pStream); + } else { + mndAddFixedSinkToStream(pMnode, pTrans, pStream); + } } for (int32_t level = 0; level < totLevel; level++) { @@ -221,12 +263,14 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i pTask->sinkType = TASK_SINK__SHOW; pTask->showSink.reserved = 0; if (!hasExtraSink) { - if (smaId != -1) { +#if 1 + if (pStream->createdBy == STREAM_CREATED_BY__SMA) { pTask->sinkType = TASK_SINK__SMA; - pTask->smaSink.smaId = smaId; + pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; } +#endif } } else { pTask->sinkType = TASK_SINK__NONE; @@ -286,35 +330,47 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, i if (hasExtraSink) { // add dispatcher - pTask->dispatchType = TASK_DISPATCH__SHUFFLE; + if (pStream->fixedSinkVgId == 0) { + pTask->dispatchType = TASK_DISPATCH__SHUFFLE; - pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; - SDbObj* pDb = mndAcquireDb(pMnode, pStream->db); - ASSERT(pDb); - if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { + pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; + SDbObj* pDb = mndAcquireDb(pMnode, pStream->db); + ASSERT(pDb); + if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { + sdbRelease(pSdb, pDb); + qDestroyQueryPlan(pPlan); + return -1; + } sdbRelease(pSdb, pDb); - qDestroyQueryPlan(pPlan); - return -1; - } - sdbRelease(pSdb, pDb); - // put taskId to useDbRsp - // TODO: optimize - SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t sz = taosArrayGetSize(pVgs); - SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); - int32_t sinkLvSize = taosArrayGetSize(sinkLv); - for (int32_t i = 0; i < sz; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); - for (int32_t j = 0; j < sinkLvSize; j++) { - SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); - /*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/ - if (pLastLevelTask->nodeId == pVgInfo->vgId) { - pVgInfo->taskId = pLastLevelTask->taskId; - /*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/ - break; + // put taskId to useDbRsp + // TODO: optimize + SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t sz = taosArrayGetSize(pVgs); + SArray* sinkLv = taosArrayGetP(pStream->tasks, 0); + int32_t sinkLvSize = taosArrayGetSize(sinkLv); + for (int32_t i = 0; i < sz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i); + for (int32_t j = 0; j < sinkLvSize; j++) { + SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); + /*printf("vgid %d node id %d\n", pVgInfo->vgId, pTask->nodeId);*/ + if (pLastLevelTask->nodeId == pVgInfo->vgId) { + pVgInfo->taskId = pLastLevelTask->taskId; + /*printf("taskid %d set to %d\n", pVgInfo->taskId, pTask->taskId);*/ + break; + } } } + } else { + pTask->dispatchType = TASK_DISPATCH__FIXED; + pTask->dispatchMsgType = TDMT_VND_TASK_WRITE_EXEC; + SArray* pArray = taosArrayGetP(pStream->tasks, 0); + // one sink only + ASSERT(taosArrayGetSize(pArray) == 1); + SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); + pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; + pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; + pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } } #endif diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 5c62cfa0f2..94114a96bf 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -415,6 +415,10 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre streamObj.dbUid = pDb->uid; streamObj.version = 1; streamObj.sql = pCreate->sql; + streamObj.createdBy = STREAM_CREATED_BY__SMA; + // TODO + streamObj.fixedSinkVgId = 0; + streamObj.smaId = smaObj.uid; /*streamObj.physicalPlan = "";*/ streamObj.logicalPlan = "not implemented"; @@ -428,7 +432,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, smaObj.uid) != 0) goto _OVER; + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bbb2f64282..376a41b0cd 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { return code; } -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId) { +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { SNode *pAst = NULL; if (nodesStringToNode(ast, &pAst) < 0) { @@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast return -1; } - if (mndScheduleStream(pMnode, pTrans, pStream, smaId) < 0) { + if (mndScheduleStream(pMnode, pTrans, pStream) < 0) { mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); return -1; } @@ -300,6 +300,10 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe streamObj.dbUid = pDb->uid; streamObj.version = 1; streamObj.sql = pCreate->sql; + streamObj.createdBy = STREAM_CREATED_BY__USER; + // TODO + streamObj.fixedSinkVgId = 0; + streamObj.smaId = 0; /*streamObj.physicalPlan = "";*/ streamObj.logicalPlan = "not implemented"; @@ -310,7 +314,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, -1) != 0) { + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1;