From 1751c0285a8f20caea417e87185790d750d9df40 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 21 Mar 2022 16:47:59 +0800 Subject: [PATCH] add parallel --- include/common/tmsg.h | 4 ++-- source/common/src/tmsg.c | 4 ++++ source/dnode/snode/src/snode.c | 5 ++++- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 852f77777a..3e5a53c288 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2274,8 +2274,8 @@ enum { }; typedef struct { - void* inputHandle; - void** executor; + void* inputHandle; + void* executor[4]; } SStreamTaskParRunner; typedef struct { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1d2c9397c0..3517f7176c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2718,6 +2718,8 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->pipeEnd) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->parallel) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; tEndEncode(pEncoder); @@ -2730,6 +2732,8 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->pipeEnd) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->parallel) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; tEndDecode(pDecoder); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index f4129e37ce..cbee8d2f90 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -57,7 +57,9 @@ void sndMetaDelete(SStreamMeta *pMeta) { } int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) { - pTask->runner.executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL); + for (int i = 0; i < pTask->parallel; i++) { + pTask->runner.executor[i] = qCreateStreamExecTaskInfo(pTask->qmsg, NULL); + } return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *)); } @@ -95,6 +97,7 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { SStreamTask *pTask = malloc(sizeof(SStreamTask)); if (pTask == NULL) { ASSERT(0); + return; } SCoder decoder; tCoderInit(&decoder, TD_LITTLE_ENDIAN, msg, pMsg->contLen - sizeof(SMsgHead), TD_DECODER);