diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 8dc634571f..39d08f2e86 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -33,7 +33,7 @@ typedef enum { TSDB_SUPER_TABLE = 1, // super table TSDB_CHILD_TABLE = 2, // table created from super table TSDB_NORMAL_TABLE = 3, // ordinary table - TSDB_STREAM_TABLE = 4, // table created from stream computing + TSDB_STREAM_TABLE = 4, // table created by stream processing TSDB_TEMP_TABLE = 5, // temp table created by nest query TSDB_TABLE_MAX = 6 } ETableType; @@ -50,7 +50,12 @@ typedef enum { TSDB_CHECK_ITEM_MAX } ECheckItemType; -typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2 } TDUpdateConfig; +typedef enum { + TD_ROW_DISCARD_UPDATE = 0, + TD_ROW_OVERWRITE_UPDATE = 1, + TD_ROW_PARTIAL_UPDATE = 2, +} TDUpdateConfig; + typedef enum { TSDB_STATIS_OK = 0, // statis part exist and load successfully TSDB_STATIS_NONE = 1, // statis part not exist diff --git a/include/common/tcommon.h b/include/common/tcommon.h index b5bd088006..23dd349861 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -136,6 +136,23 @@ static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) return (void*)buf; } +static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { + if (pBlock == NULL) { + return; + } + + // int32_t numOfOutput = pBlock->info.numOfCols; + int32_t sz = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < sz; ++i) { + SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + tfree(pColInfoData->pData); + } + + taosArrayDestroy(pBlock->pDataBlock); + tfree(pBlock->pBlockAgg); + // tfree(pBlock); +} + static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) { int32_t tlen = 0; int32_t sz = 0; @@ -178,23 +195,6 @@ static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) { return buf; } -static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { - if (pBlock == NULL) { - return; - } - - // int32_t numOfOutput = pBlock->info.numOfCols; - int32_t sz = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < sz; ++i) { - SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); - tfree(pColInfoData->pData); - } - - taosArrayDestroy(pBlock->pDataBlock); - tfree(pBlock->pBlockAgg); - // tfree(pBlock); -} - static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) { if (pRsp->schemas) { if (pRsp->schemas->nCols) { @@ -204,10 +204,6 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) { } taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock); pRsp->pBlockData = NULL; - // for (int32_t i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) { - // SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i); - // tDeleteSSDataBlock(pDataBlock); - //} } //====================================================================================================================== diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5d12fc4ffc..5975bf0a29 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1139,6 +1139,17 @@ int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateS int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq); void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq); +typedef struct { + char name[TSDB_TOPIC_FNAME_LEN]; + int64_t streamId; + char* sql; + char* executorMsg; +} SMVCreateStreamReq, SMSCreateStreamReq; + +typedef struct { + int64_t streamId; +} SMVCreateStreamRsp, SMSCreateStreamRsp; + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; int8_t igExists; diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index c9fab140cc..21a93532e0 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -80,6 +80,10 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); */ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg); + +int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg); + /** * @brief Drop a snode. * diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ff853145fa..f26f19f3b2 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2666,7 +2666,6 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1; if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1; - tEndEncode(&encoder); int32_t tlen = encoder.pos; diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index 13ef101908..aeea5386b4 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -90,9 +90,11 @@ typedef struct { int32_t refCount; int8_t deployed; int8_t dropped; + int8_t uniqueWorkerInUse; SSnode *pSnode; SRWLatch latch; - SDnodeWorker writeWorker; + SArray *uniqueWorkers; // SArray + SDnodeWorker sharedWorker; } SSnodeMgmt; typedef struct { @@ -153,4 +155,4 @@ int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo); } #endif -#endif /*_TD_DND_ENV_H_*/ \ No newline at end of file +#endif /*_TD_DND_ENV_H_*/ diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 4ca6b97ad4..a8530037da 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -70,4 +70,4 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); } #endif -#endif /*_TD_DND_INT_H_*/ \ No newline at end of file +#endif /*_TD_DND_INT_H_*/ diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index 4906aef246..b27a25680a 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -19,7 +19,20 @@ #include "dndTransport.h" #include "dndWorker.h" -static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg); +typedef struct { + int32_t vgId; + int32_t refCount; + int32_t snVersion; + int8_t dropped; + char *path; + SSnode *pImpl; + STaosQueue *pSharedQ; + STaosQueue *pUniqueQ; +} SSnodeObj; + +static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg); + +static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs); static SSnode *dndAcquireSnode(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; @@ -152,8 +165,21 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) { static int32_t dndStartSnodeWorker(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; - if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, dndProcessSnodeQueue) != 0) { - dError("failed to start snode write worker since %s", terrstr()); + pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); + for (int32_t i = 0; i < 2; i++) { + SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker)); + if (pUniqueWorker == NULL) { + return -1; + } + if (dndInitWorker(pDnode, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, dndProcessSnodeSharedQueue) != 0) { + dError("failed to start snode unique worker since %s", terrstr()); + return -1; + } + taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker); + } + if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", 4, 4, + dndProcessSnodeSharedQueue)) { + dError("failed to start snode shared worker since %s", terrstr()); return -1; } @@ -169,9 +195,13 @@ static void dndStopSnodeWorker(SDnode *pDnode) { while (pMgmt->refCount > 0) { taosMsleep(10); - } + } - dndCleanupWorker(&pMgmt->writeWorker); + for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { + SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i); + dndCleanupWorker(worker); + } + taosArrayDestroy(pMgmt->uniqueWorkers); } static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { @@ -292,17 +322,36 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) { } } -static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { +static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) { SSnodeMgmt *pMgmt = &pDnode->smgmt; - SRpcMsg *pRsp = NULL; int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; SSnode *pSnode = dndAcquireSnode(pDnode); if (pSnode != NULL) { - code = sndProcessMsg(pSnode, pMsg, &pRsp); + for (int32_t i = 0; i < numOfMsgs; i++) { + SRpcMsg *pMsg = NULL; + taosGetQitem(qall, (void **)&pMsg); + + sndProcessUMsg(pSnode, pMsg); + + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } + } + dndReleaseSnode(pDnode, pSnode); +} + +static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + code = sndProcessSMsg(pSnode, pMsg); } dndReleaseSnode(pDnode, pSnode); +#if 0 if (pMsg->msgType & 1u) { if (pRsp != NULL) { pRsp->ahandle = pMsg->ahandle; @@ -314,11 +363,32 @@ static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { rpcSendResponse(&rpcRsp); } } +#endif rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } +static void dndWriteSnodeMsgToRandomWorker(SDnode *pDnode, SRpcMsg *pMsg) { + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + int32_t index = (pDnode->smgmt.uniqueWorkerInUse + 1) % taosArrayGetSize(pDnode->smgmt.uniqueWorkers); + SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, index); + code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); + } + dndReleaseSnode(pDnode, pSnode); + + if (code != 0) { + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rsp); + } + rpcFreeCont(pMsg->pCont); + } +} + static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; @@ -337,8 +407,13 @@ static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc } } -void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.writeWorker, pMsg); +void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + // judge from msg to write to unique queue + dndWriteSnodeMsgToRandomWorker(pDnode, pMsg); +} + +void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg); } int32_t dndInitSnode(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c index 5ccf6640c0..38f8737b2b 100644 --- a/source/dnode/mgmt/impl/src/dndWorker.c +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -109,4 +109,4 @@ int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) } return 0; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index d3642f4204..f2baea1cd9 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -30,6 +30,7 @@ #include "mndShow.h" #include "mndSnode.h" #include "mndStb.h" +#include "mndStream.h" #include "mndSubscribe.h" #include "mndSync.h" #include "mndTelem.h" @@ -220,6 +221,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1; if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1; if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1; if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1; if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1; if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 01500fbc54..91008dd03a 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -31,3 +31,15 @@ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } void sndDestroy(const char *path) {} + +int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { + // stream deployment + // stream stop/resume + // operator exec + return 0; +} + +int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { + // operator exec + return 0; +} diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 92a111298f..a2342ec85a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -83,8 +83,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { } int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { - /*int32_t sversion = pHandle->pBlock->sversion;*/ - /*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/ + // currently only rows are used + pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList); pBlockInfo->rows = pHandle->pBlock->numOfRows; pBlockInfo->uid = pHandle->pBlock->uid; diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 1657a85ee8..1fa70da870 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -188,7 +188,7 @@ void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueu int32_t tWWorkerInit(SWWorkerPool *pool) { pool->nextId = 0; - pool->workers = calloc(sizeof(SWWorker), pool->max); + pool->workers = calloc(pool->max, sizeof(SWWorker)); if (pool->workers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1;