From 9a08ac3c5fcce27a1c004cf9b85a5d3323039126 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 28 Mar 2022 14:55:15 +0800 Subject: [PATCH] stream support sma --- source/dnode/vnode/src/inc/vnd.h | 7 +++---- source/dnode/vnode/src/tq/tq.c | 6 ++++++ source/dnode/vnode/src/vnd/vnodeWrite.c | 5 +++++ 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 7b0606512c..5ec5b1d58f 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -19,15 +19,14 @@ #include "tmallocator.h" // #include "sync.h" #include "tcoding.h" +#include "tdatablock.h" #include "tfs.h" #include "tlist.h" #include "tlockfree.h" #include "tmacro.h" -#include "wal.h" - #include "vnode.h" - #include "vnodeQuery.h" +#include "wal.h" #ifdef __cplusplus extern "C" { @@ -203,7 +202,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen); // sma -void smaHandleRes(SVnode* pVnode, int64_t smaId, const SArray* data); +void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 55202335e0..efc7ac80e9 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -473,10 +473,16 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { } tCoderClear(&decoder); + // exec if (tqExpandTask(pTq, pTask, 4) < 0) { ASSERT(0); } + + // sink pTask->ahandle = pTq->pVnode; + if (pTask->sinkType == TASK_SINK__SMA) { + pTask->smaSink.smaHandle = smaHandleRes; + } taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 98256e6b24..9b1bb61d07 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -15,6 +15,11 @@ #include "vnd.h" +void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { + // TODO + blockDebugShowData(data); +} + void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { SNodeMsg *pMsg; SRpcMsg *pRpc;