From 00c5307d513db41ffbe06437953fcf6c121095fc Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 10 Mar 2022 20:48:38 +0800 Subject: [PATCH 1/3] refactor --- include/common/taosdef.h | 9 +++++-- include/common/tcommon.h | 38 +++++++++++++---------------- source/common/src/tmsg.c | 1 - source/dnode/mnode/impl/src/mnode.c | 2 ++ 4 files changed, 26 insertions(+), 24 deletions(-) diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 69c2618ac8..1584fcb4bf 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -33,7 +33,7 @@ typedef enum { TSDB_SUPER_TABLE = 1, // super table TSDB_CHILD_TABLE = 2, // table created from super table TSDB_NORMAL_TABLE = 3, // ordinary table - TSDB_STREAM_TABLE = 4, // table created from stream computing + TSDB_STREAM_TABLE = 4, // table created by stream processing TSDB_TEMP_TABLE = 5, // temp table created by nest query TSDB_TABLE_MAX = 6 } ETableType; @@ -50,7 +50,12 @@ typedef enum { TSDB_CHECK_ITEM_MAX } ECheckItemType; -typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2 } TDUpdateConfig; +typedef enum { + TD_ROW_DISCARD_UPDATE = 0, + TD_ROW_OVERWRITE_UPDATE = 1, + TD_ROW_PARTIAL_UPDATE = 2, +} TDUpdateConfig; + typedef enum { TSDB_STATIS_OK = 0, // statis part exist and load successfully TSDB_STATIS_NONE = 1, // statis part not exist diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 1d3ab4f340..3eb9e88ff5 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -135,6 +135,23 @@ static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) return (void*)buf; } +static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { + if (pBlock == NULL) { + return; + } + + // int32_t numOfOutput = pBlock->info.numOfCols; + int32_t sz = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < sz; ++i) { + SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + tfree(pColInfoData->pData); + } + + taosArrayDestroy(pBlock->pDataBlock); + tfree(pBlock->pBlockAgg); + // tfree(pBlock); +} + static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) { int32_t tlen = 0; int32_t sz = 0; @@ -177,23 +194,6 @@ static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) { return buf; } -static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { - if (pBlock == NULL) { - return; - } - - // int32_t numOfOutput = pBlock->info.numOfCols; - int32_t sz = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < sz; ++i) { - SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); - tfree(pColInfoData->pData); - } - - taosArrayDestroy(pBlock->pDataBlock); - tfree(pBlock->pBlockAgg); - // tfree(pBlock); -} - static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) { if (pRsp->schemas) { if (pRsp->schemas->nCols) { @@ -203,10 +203,6 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) { } taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock); pRsp->pBlockData = NULL; - // for (int32_t i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) { - // SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i); - // tDeleteSSDataBlock(pDataBlock); - //} } //====================================================================================================================== diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 135ff34207..26861e1ff8 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2434,7 +2434,6 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1; if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1; - tEndEncode(&encoder); int32_t tlen = encoder.pos; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index d3642f4204..f2baea1cd9 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -30,6 +30,7 @@ #include "mndShow.h" #include "mndSnode.h" #include "mndStb.h" +#include "mndStream.h" #include "mndSubscribe.h" #include "mndSync.h" #include "mndTelem.h" @@ -220,6 +221,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1; if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1; if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1; if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1; if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1; if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1; From 36d4ef0af6cbe03e7a5418135fb4851e90382658 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 14 Mar 2022 20:32:19 +0800 Subject: [PATCH 2/3] refactor stream worker --- include/common/tmsg.h | 11 +++ include/dnode/snode/snode.h | 4 ++ source/dnode/mgmt/impl/inc/dndEnv.h | 6 +- source/dnode/mgmt/impl/inc/dndInt.h | 2 +- source/dnode/mgmt/impl/src/dndSnode.c | 92 +++++++++++++++++++++++--- source/dnode/mgmt/impl/src/dndWorker.c | 2 +- source/dnode/snode/src/snode.c | 12 ++++ source/dnode/vnode/src/tq/tqRead.c | 4 +- source/util/src/tworker.c | 2 +- 9 files changed, 118 insertions(+), 17 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a5c2c89b24..5a60761f11 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1118,6 +1118,17 @@ int32_t tSerializeSCMCreateStreamReq(void* buf, int32_t bufLen, const SCMCreateS int32_t tDeserializeSCMCreateStreamReq(void* buf, int32_t bufLen, SCMCreateStreamReq* pReq); void tFreeSCMCreateStreamReq(SCMCreateStreamReq* pReq); +typedef struct { + char name[TSDB_TOPIC_FNAME_LEN]; + int64_t streamId; + char* sql; + char* executorMsg; +} SMVCreateStreamReq, SMSCreateStreamReq; + +typedef struct { + int64_t streamId; +} SMVCreateStreamRsp, SMSCreateStreamRsp; + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; int8_t igExists; diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index c9fab140cc..21a93532e0 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -80,6 +80,10 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); */ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg); + +int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg); + /** * @brief Drop a snode. * diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index 13ef101908..aeea5386b4 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -90,9 +90,11 @@ typedef struct { int32_t refCount; int8_t deployed; int8_t dropped; + int8_t uniqueWorkerInUse; SSnode *pSnode; SRWLatch latch; - SDnodeWorker writeWorker; + SArray *uniqueWorkers; // SArray + SDnodeWorker sharedWorker; } SSnodeMgmt; typedef struct { @@ -153,4 +155,4 @@ int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo); } #endif -#endif /*_TD_DND_ENV_H_*/ \ No newline at end of file +#endif /*_TD_DND_ENV_H_*/ diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 4ca6b97ad4..a8530037da 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -70,4 +70,4 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); } #endif -#endif /*_TD_DND_INT_H_*/ \ No newline at end of file +#endif /*_TD_DND_INT_H_*/ diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index 4906aef246..d192d9df01 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -19,7 +19,20 @@ #include "dndTransport.h" #include "dndWorker.h" -static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg); +typedef struct { + int32_t vgId; + int32_t refCount; + int32_t snVersion; + int8_t dropped; + char *path; + SSnode *pImpl; + STaosQueue *pSharedQ; + STaosQueue *pUniqueQ; +} SSnodeObj; + +static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg); + +static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs); static SSnode *dndAcquireSnode(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; @@ -152,8 +165,18 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) { static int32_t dndStartSnodeWorker(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; - if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, dndProcessSnodeQueue) != 0) { - dError("failed to start snode write worker since %s", terrstr()); + pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); + for (int32_t i = 0; i < 2; i++) { + SDnodeWorker uniqueWorker; + if (dndInitWorker(pDnode, &uniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, dndProcessSnodeSharedQueue) != 0) { + dError("failed to start snode unique worker since %s", terrstr()); + return -1; + } + taosArrayPush(pMgmt->uniqueWorkers, &uniqueWorker); + } + if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", 4, 4, + dndProcessSnodeSharedQueue)) { + dError("failed to start snode shared worker since %s", terrstr()); return -1; } @@ -169,9 +192,13 @@ static void dndStopSnodeWorker(SDnode *pDnode) { while (pMgmt->refCount > 0) { taosMsleep(10); - } + } - dndCleanupWorker(&pMgmt->writeWorker); + for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { + SDnodeWorker *worker = taosArrayGetP(pMgmt->uniqueWorkers, i); + dndCleanupWorker(worker); + } + taosArrayDestroy(pMgmt->uniqueWorkers); } static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { @@ -292,17 +319,36 @@ int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) { } } -static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { +static void dndProcessSnodeUniqueQueue(SDnode *pDnode, STaosQall *qall, int32_t numOfMsgs) { SSnodeMgmt *pMgmt = &pDnode->smgmt; - SRpcMsg *pRsp = NULL; int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; SSnode *pSnode = dndAcquireSnode(pDnode); if (pSnode != NULL) { - code = sndProcessMsg(pSnode, pMsg, &pRsp); + for (int32_t i = 0; i < numOfMsgs; i++) { + SRpcMsg *pMsg = NULL; + taosGetQitem(qall, (void **)&pMsg); + + sndProcessUMsg(pSnode, pMsg); + + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } + } + dndReleaseSnode(pDnode, pSnode); +} + +static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + code = sndProcessSMsg(pSnode, pMsg); } dndReleaseSnode(pDnode, pSnode); +#if 0 if (pMsg->msgType & 1u) { if (pRsp != NULL) { pRsp->ahandle = pMsg->ahandle; @@ -314,11 +360,32 @@ static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { rpcSendResponse(&rpcRsp); } } +#endif rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } +static void dndWriteSnodeMsgToRandomWorker(SDnode *pDnode, SRpcMsg *pMsg) { + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + int32_t index = (pDnode->smgmt.uniqueWorkerInUse + 1) % taosArrayGetSize(pDnode->smgmt.uniqueWorkers); + SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, index); + code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); + } + dndReleaseSnode(pDnode, pSnode); + + if (code != 0) { + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rsp); + } + rpcFreeCont(pMsg->pCont); + } +} + static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; @@ -337,8 +404,13 @@ static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc } } -void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.writeWorker, pMsg); +void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + // judge from msg to write to unique queue + dndWriteSnodeMsgToRandomWorker(pDnode, pMsg); +} + +void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.sharedWorker, pMsg); } int32_t dndInitSnode(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c index 5ccf6640c0..38f8737b2b 100644 --- a/source/dnode/mgmt/impl/src/dndWorker.c +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -109,4 +109,4 @@ int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) } return 0; -} \ No newline at end of file +} diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 01500fbc54..91008dd03a 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -31,3 +31,15 @@ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } void sndDestroy(const char *path) {} + +int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { + // stream deployment + // stream stop/resume + // operator exec + return 0; +} + +int32_t sndProcessSMsg(SSnode *pSnode, SRpcMsg *pMsg) { + // operator exec + return 0; +} diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 92a111298f..a2342ec85a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -83,8 +83,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { } int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { - /*int32_t sversion = pHandle->pBlock->sversion;*/ - /*SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false);*/ + // currently only rows are used + pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList); pBlockInfo->rows = pHandle->pBlock->numOfRows; pBlockInfo->uid = pHandle->pBlock->uid; diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 1657a85ee8..1fa70da870 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -188,7 +188,7 @@ void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueu int32_t tWWorkerInit(SWWorkerPool *pool) { pool->nextId = 0; - pool->workers = calloc(sizeof(SWWorker), pool->max); + pool->workers = calloc(pool->max, sizeof(SWWorker)); if (pool->workers == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; From 6044c7511800f470ac27eb74d019cccec113570e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 14 Mar 2022 20:51:13 +0800 Subject: [PATCH 3/3] fix worker init --- source/dnode/mgmt/impl/src/dndSnode.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index d192d9df01..b27a25680a 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -167,12 +167,15 @@ static int32_t dndStartSnodeWorker(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); for (int32_t i = 0; i < 2; i++) { - SDnodeWorker uniqueWorker; - if (dndInitWorker(pDnode, &uniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, dndProcessSnodeSharedQueue) != 0) { + SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker)); + if (pUniqueWorker == NULL) { + return -1; + } + if (dndInitWorker(pDnode, pUniqueWorker, DND_WORKER_MULTI, "snode-unique", 1, 1, dndProcessSnodeSharedQueue) != 0) { dError("failed to start snode unique worker since %s", terrstr()); return -1; } - taosArrayPush(pMgmt->uniqueWorkers, &uniqueWorker); + taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker); } if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", 4, 4, dndProcessSnodeSharedQueue)) {