diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6a7edb481f..8fc15da3cd 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2379,7 +2379,7 @@ typedef struct { int64_t streamId; int64_t version; SArray* res; // SArray -} SStreamSmaSinkReq; +} SStreamSinkReq; #pragma pack(pop) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index eed13e0317..5b88a9d6af 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -17,7 +17,9 @@ #define _TD_MND_H_ #include "monitor.h" +#include "tmsg.h" #include "tmsgcb.h" +#include "trpc.h" #ifdef __cplusplus extern "C" { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 30b4c923d9..67a21e755a 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -693,11 +693,13 @@ typedef struct { SRWLatch lock; int8_t status; // int32_t sqlLen; + int32_t sinkVgId; // 0 for automatic char* sql; char* logicalPlan; char* physicalPlan; SArray* tasks; // SArray> - SArray* outputName; + SArray* ColAlias; + char* outputSTbName; } SStreamObj; int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index f0905f88d2..1bac8d678a 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -45,12 +45,12 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { tEncodeI32(pEncoder, 0); } - if (pObj->outputName != NULL) { - outputNameSz = taosArrayGetSize(pObj->outputName); + if (pObj->ColAlias != NULL) { + outputNameSz = taosArrayGetSize(pObj->ColAlias); } if (tEncodeI32(pEncoder, outputNameSz) < 0) return -1; for (int32_t i = 0; i < outputNameSz; i++) { - char *name = taosArrayGetP(pObj->outputName, i); + char *name = taosArrayGetP(pObj->ColAlias, i); if (tEncodeCStr(pEncoder, name) < 0) return -1; } return pEncoder->pos; @@ -88,14 +88,14 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { } int32_t outputNameSz; if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1; - pObj->outputName = taosArrayInit(outputNameSz, sizeof(void *)); - if (pObj->outputName == NULL) { + pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *)); + if (pObj->ColAlias == NULL) { return -1; } for (int32_t i = 0; i < outputNameSz; i++) { char *name; if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1; - taosArrayPush(pObj->outputName, &name); + taosArrayPush(pObj->ColAlias, &name); } return 0; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d9b5341ba1..c47a290a05 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -292,7 +292,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe } printf("\n=======================================================\n"); - streamObj.outputName = names; + streamObj.ColAlias = names; if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) { mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr()); diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index deb3cae617..f0c3f6801a 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -161,15 +161,16 @@ typedef struct { struct STQ { // the collection of groups // the handle of meta kvstore + bool writeTrigger; char* path; STqCfg* tqConfig; STqMemRef tqMemRef; STqMetaStore* tqMeta; - STqPushMgr* tqPushMgr; - SHashObj* pStreamTasks; - SVnode* pVnode; - SWal* pWal; - SMeta* pVnodeMeta; + // STqPushMgr* tqPushMgr; + SHashObj* pStreamTasks; + SVnode* pVnode; + SWal* pWal; + SMeta* pVnodeMeta; }; typedef struct { diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 78ff9f1062..8d256995c6 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -55,6 +55,21 @@ typedef struct SVnodeMgr { TD_DLIST(SVnodeTask) queue; } SVnodeMgr; +typedef struct { + int8_t streamType; // sma or other + int8_t dstType; + int16_t padding; + int32_t smaId; + int64_t tbUid; + int64_t lastReceivedVer; + int64_t lastCommittedVer; +} SStreamSinkInfo; + +typedef struct { + SVnode* pVnode; + SHashObj* pHash; // streamId -> SStreamSinkInfo +} SSink; + extern SVnodeMgr vnodeMgr; // SVState @@ -72,8 +87,9 @@ struct SVnode { SVBufPool* pBufPool; SMeta* pMeta; STsdb* pTsdb; - STQ* pTq; SWal* pWal; + STQ* pTq; + SSink* pSink; tsem_t canCommit; SQHandle* pQuery; SMsgCb msgCb; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 25fe963799..21da0649e2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -52,12 +52,14 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STq return NULL; } +#if 0 pTq->tqPushMgr = tqPushMgrOpen(); if (pTq->tqPushMgr == NULL) { // free store free(pTq); return NULL; } +#endif pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);