diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 69c2618ac8..1584fcb4bf 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -33,7 +33,7 @@ typedef enum { TSDB_SUPER_TABLE = 1, // super table TSDB_CHILD_TABLE = 2, // table created from super table TSDB_NORMAL_TABLE = 3, // ordinary table - TSDB_STREAM_TABLE = 4, // table created from stream computing + TSDB_STREAM_TABLE = 4, // table created by stream processing TSDB_TEMP_TABLE = 5, // temp table created by nest query TSDB_TABLE_MAX = 6 } ETableType; @@ -50,7 +50,12 @@ typedef enum { TSDB_CHECK_ITEM_MAX } ECheckItemType; -typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2 } TDUpdateConfig; +typedef enum { + TD_ROW_DISCARD_UPDATE = 0, + TD_ROW_OVERWRITE_UPDATE = 1, + TD_ROW_PARTIAL_UPDATE = 2, +} TDUpdateConfig; + typedef enum { TSDB_STATIS_OK = 0, // statis part exist and load successfully TSDB_STATIS_NONE = 1, // statis part not exist diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 1d3ab4f340..3eb9e88ff5 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -135,6 +135,23 @@ static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) return (void*)buf; } +static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { + if (pBlock == NULL) { + return; + } + + // int32_t numOfOutput = pBlock->info.numOfCols; + int32_t sz = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < sz; ++i) { + SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + tfree(pColInfoData->pData); + } + + taosArrayDestroy(pBlock->pDataBlock); + tfree(pBlock->pBlockAgg); + // tfree(pBlock); +} + static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) { int32_t tlen = 0; int32_t sz = 0; @@ -177,23 +194,6 @@ static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) { return buf; } -static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { - if (pBlock == NULL) { - return; - } - - // int32_t numOfOutput = pBlock->info.numOfCols; - int32_t sz = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < sz; ++i) { - SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); - tfree(pColInfoData->pData); - } - - taosArrayDestroy(pBlock->pDataBlock); - tfree(pBlock->pBlockAgg); - // tfree(pBlock); -} - static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) { if (pRsp->schemas) { if (pRsp->schemas->nCols) { @@ -203,10 +203,6 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) { } taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock); pRsp->pBlockData = NULL; - // for (int32_t i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) { - // SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i); - // tDeleteSSDataBlock(pDataBlock); - //} } //====================================================================================================================== diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 135ff34207..26861e1ff8 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2434,7 +2434,6 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1; if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1; - tEndEncode(&encoder); int32_t tlen = encoder.pos; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index d3642f4204..f2baea1cd9 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -30,6 +30,7 @@ #include "mndShow.h" #include "mndSnode.h" #include "mndStb.h" +#include "mndStream.h" #include "mndSubscribe.h" #include "mndSync.h" #include "mndTelem.h" @@ -220,6 +221,7 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-user", mndInitUser, mndCleanupUser) != 0) return -1; if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1; if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-stream", mndInitStream, mndCleanupStream) != 0) return -1; if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1; if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1; if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;