From 59a4eec1863edefd4fed142fdf7ab4c42f5a65cc Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 25 Mar 2022 20:19:09 +0800 Subject: [PATCH] stream extract to module --- example/src/tstream.c | 2 +- include/common/tcommon.h | 19 +- include/common/tdatablock.h | 8 +- include/common/tmsg.h | 210 +++++++++++++-------- include/common/tmsgcb.h | 10 +- include/common/tmsgdef.h | 5 + include/libs/stream/tstream.h | 34 ++++ source/common/src/tdatablock.c | 101 +++++++++- source/common/src/tmsg.c | 86 +++++++-- source/dnode/mgmt/snode/src/smWorker.c | 11 +- source/dnode/mgmt/vnode/inc/vmInt.h | 5 +- source/dnode/mgmt/vnode/src/vmMsg.c | 2 + source/dnode/mgmt/vnode/src/vmWorker.c | 18 +- source/dnode/mnode/impl/inc/mndDef.h | 6 +- source/dnode/mnode/impl/src/mndScheduler.c | 38 ++-- source/dnode/snode/src/snode.c | 18 +- source/dnode/vnode/CMakeLists.txt | 1 + source/dnode/vnode/src/tq/tq.c | 173 ++--------------- source/libs/CMakeLists.txt | 1 + source/libs/stream/CMakeLists.txt | 16 ++ source/libs/stream/inc/tstreamInc.h | 27 +++ source/libs/stream/src/tstream.c | 145 ++++++++++++++ source/libs/stream/test/CMakeLists.txt | 0 23 files changed, 636 insertions(+), 300 deletions(-) create mode 100644 include/libs/stream/tstream.h create mode 100644 source/libs/stream/CMakeLists.txt create mode 100644 source/libs/stream/inc/tstreamInc.h create mode 100644 source/libs/stream/src/tstream.c create mode 100644 source/libs/stream/test/CMakeLists.txt diff --git a/example/src/tstream.c b/example/src/tstream.c index 51578bd27b..8ffa932bd2 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -25,7 +25,7 @@ int32_t init_env() { return -1; } - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tcommon.h b/include/common/tcommon.h 
index c91efc3ce2..b52397bb86 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -54,13 +54,16 @@ typedef struct SColumnDataAgg { } SColumnDataAgg; typedef struct SDataBlockInfo { - STimeWindow window; - int32_t rows; - int32_t rowSize; - int16_t numOfCols; - int16_t hasVarCol; - union {int64_t uid; int64_t blockId;}; - int64_t groupId; // no need to serialize + STimeWindow window; + int32_t rows; + int32_t rowSize; + int16_t numOfCols; + int16_t hasVarCol; + union { + int64_t uid; + int64_t blockId; + }; + int64_t groupId; // no need to serialize } SDataBlockInfo; typedef struct SSDataBlock { @@ -92,7 +95,7 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock); int32_t tEncodeDataBlocks(void** buf, const SArray* blocks); -void* tDecodeDataBlocks(const void* buf, SArray* blocks); +void* tDecodeDataBlocks(const void* buf, SArray** blocks); static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) { // WARNING: do not use info.numOfCols, diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index f181c26e92..e6ddc1e5b5 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -102,7 +102,8 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u : ((p1_)->pData + ((r_) * (p1_)->info.bytes))) int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); -int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2); +int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, + uint32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); @@ -113,7 +114,8 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock); size_t 
blockDataGetNumOfRows(const SSDataBlock* pBlock); int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc); -int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize); +int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, + int32_t pageSize); int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); @@ -136,6 +138,8 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); void* blockDataDestroy(SSDataBlock* pBlock); +void blockDebugShowData(SArray* dataBlocks); + #ifdef __cplusplus } #endif diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b43ea77271..79ed508615 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -201,17 +201,9 @@ typedef struct SEp { typedef struct { int32_t contLen; - union { - int32_t vgId; - int32_t streamTaskId; - }; + int32_t vgId; } SMsgHead; -typedef struct { - int32_t workerType; - int32_t streamTaskId; -} SStreamExecMsgHead; - // Submit message for one table typedef struct SSubmitBlk { int64_t uid; // table unique id @@ -477,7 +469,8 @@ typedef struct { int32_t tz; // query client timezone char intervalUnit; char slidingUnit; - char offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. + char + offsetUnit; // TODO Remove it, the offset is the number of precision ticks, and it must be an immutable duration. 
int8_t precision; int64_t interval; int64_t sliding; @@ -2366,30 +2359,21 @@ enum { STREAM_TASK_STATUS__STOP, }; +// pipe -> fetch/pipe queue +// merge -> merge queue +// write -> write queue enum { - STREAM_NEXT_OP_DST__VND = 1, - STREAM_NEXT_OP_DST__SND, + TASK_SINK_MSG__SND_PIPE = 1, + TASK_SINK_MSG__SND_MERGE, + TASK_SINK_MSG__VND_PIPE, + TASK_SINK_MSG__VND_MERGE, + TASK_SINK_MSG__VND_WRITE, }; -enum { - STREAM_SOURCE_TYPE__NONE = 1, - STREAM_SOURCE_TYPE__SUPER, - STREAM_SOURCE_TYPE__CHILD, - STREAM_SOURCE_TYPE__NORMAL, -}; - -enum { - STREAM_SINK_TYPE__NONE = 1, - STREAM_SINK_TYPE__INPLACE, - STREAM_SINK_TYPE__ASSIGNED, - STREAM_SINK_TYPE__MULTIPLE, - STREAM_SINK_TYPE__TEMPORARY, -}; - -enum { - STREAM_TYPE__NORMAL = 1, - STREAM_TYPE__SMA, -}; +typedef struct { + int32_t nodeId; // 0 for snode + SEpSet epSet; +} SStreamTaskEp; typedef struct { void* inputHandle; @@ -2397,48 +2381,114 @@ typedef struct { } SStreamRunner; typedef struct { - int64_t streamId; - int32_t taskId; - int32_t level; - int8_t status; - int8_t parallelizable; - - // vnode or snode - int8_t nextOpDst; - - int8_t sourceType; - int8_t sinkType; - - // for sink type assigned - int32_t sinkVgId; - SEpSet NextOpEp; - - // executor meta info - char* qmsg; - - // followings are not applied to encoder and decoder - int8_t numOfRunners; - SStreamRunner runner[8]; -} SStreamTask; - -static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId) { - SStreamTask* pTask = (SStreamTask*)calloc(1, sizeof(SStreamTask)); - if (pTask == NULL) { - return NULL; - } - pTask->taskId = tGenIdPI32(); - pTask->streamId = streamId; - pTask->status = STREAM_TASK_STATUS__RUNNING; - pTask->qmsg = NULL; - return pTask; -} - -int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask); -int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask); -void tFreeSStreamTask(SStreamTask* pTask); + int8_t parallelizable; + char* qmsg; + // followings are not applicable to encoder and decoder + int8_t 
numOfRunners; + SStreamRunner* runners; +} STaskExec; typedef struct { - SMsgHead head; + int8_t reserved; +} STaskDispatcherInplace; + +typedef struct { + int32_t nodeId; + SEpSet epSet; +} STaskDispatcherFixedEp; + +typedef struct { + int8_t hashMethod; + SArray* info; +} STaskDispatcherShuffle; + +typedef struct { + int8_t reserved; + // not applicable to encoder and decoder + SHashObj* pHash; // groupId to tbuid +} STaskSinkTb; + +typedef struct { + int8_t reserved; +} STaskSinkSma; + +typedef struct { + int8_t reserved; +} STaskSinkFetch; + +typedef struct { + int8_t reserved; +} STaskSinkShow; + +enum { + TASK_SOURCE__SCAN = 1, + TASK_SOURCE__SINGLE, + TASK_SOURCE__MULTI, +}; + +enum { + TASK_EXEC__NONE = 1, + TASK_EXEC__EXEC, +}; + +enum { + TASK_DISPATCH__NONE = 1, + TASK_DISPATCH__INPLACE, + TASK_DISPATCH__FIXED, + TASK_DISPATCH__SHUFFLE, +}; + +enum { + TASK_SINK__NONE = 1, + TASK_SINK__TABLE, + TASK_SINK__SMA, + TASK_SINK__FETCH, + TASK_SINK__SHOW, +}; + +typedef struct { + int64_t streamId; + int32_t taskId; + int8_t status; + + int8_t sourceType; + int8_t execType; + int8_t sinkType; + int8_t dispatchType; + int16_t dispatchMsgType; + int32_t downstreamTaskId; + + // source preprocess + + // exec + STaskExec exec; + + // local sink + union { + STaskSinkTb tbSink; + STaskSinkSma smaSink; + STaskSinkFetch fetchSink; + STaskSinkShow showSink; + }; + + // dispatch + union { + STaskDispatcherInplace inplaceDispatcher; + STaskDispatcherFixedEp fixedEpDispatcher; + STaskDispatcherShuffle shuffleDispatcher; + }; + + // state storage + +} SStreamTask; + +SStreamTask* tNewSStreamTask(int64_t streamId); +int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask); +int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask); +void tFreeSStreamTask(SStreamTask* pTask); + +typedef struct { + // SMsgHead head; SStreamTask* task; } SStreamTaskDeployReq; @@ -2447,19 +2497,25 @@ typedef struct { } SStreamTaskDeployRsp; typedef struct { - 
SStreamExecMsgHead head; - SArray* data; // SArray + // SMsgHead head; + int64_t streamId; + int32_t taskId; + SArray* data; // SArray } SStreamTaskExecReq; +int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq); +void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq); +void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq); + typedef struct { int32_t reserved; } SStreamTaskExecRsp; typedef struct { - SMsgHead head; - int64_t streamId; - int64_t version; - SArray* res; // SArray + // SMsgHead head; + int64_t streamId; + int64_t version; + SArray* res; // SArray } SStreamSinkReq; #pragma pack(pop) diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 54a145ff33..64e399770b 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -25,7 +25,15 @@ extern "C" { typedef struct SRpcMsg SRpcMsg; typedef struct SEpSet SEpSet; typedef struct SMgmtWrapper SMgmtWrapper; -typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EQueueType; +typedef enum { + QUERY_QUEUE, + FETCH_QUEUE, + WRITE_QUEUE, + APPLY_QUEUE, + SYNC_QUEUE, + MERGE_QUEUE, + QUEUE_MAX, +} EQueueType; typedef int32_t (*PutToQueueFp)(SMgmtWrapper* pWrapper, SRpcMsg* pReq); typedef int32_t (*GetQueueSizeFp)(SMgmtWrapper* pWrapper, int32_t vgId, EQueueType qtype); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index f9ce69925b..8d49479d31 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -193,6 +193,9 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", 
SStreamTaskExecReq, SStreamTaskExecRsp) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) @@ -206,6 +209,8 @@ enum { TD_NEW_MSG_SEG(TDMT_SND_MSG) TD_DEF_MSG_TYPE(TDMT_SND_TASK_DEPLOY, "snode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_SND_TASK_EXEC, "snode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp) + TD_DEF_MSG_TYPE(TDMT_SND_TASK_PIPE_EXEC, "snode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) + TD_DEF_MSG_TYPE(TDMT_SND_TASK_MERGE_EXEC, "snode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) #if defined(TD_MSG_NUMBER_) TDMT_MAX diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h new file mode 100644 index 0000000000..88b9e22d04 --- /dev/null +++ b/include/libs/stream/tstream.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ */ + +#include "tdatablock.h" +#include "tmsg.h" +#include "tmsgcb.h" +#include "trpc.h" + +#ifdef __cplusplus +extern "C" { +#endif + +#ifndef _TSTREAM_H_ +#define _TSTREAM_H_ + +int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId); + +#ifdef __cplusplus +} +#endif + +#endif /* ifndef _TSTREAM_H_ */ diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 67e7333597..afdd8f155f 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -273,7 +273,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p return TSDB_CODE_OUT_OF_MEMORY; } - pColumnInfoData->varmeta.offset = (int32_t*) p; + pColumnInfoData->varmeta.offset = (int32_t*)p; memcpy(pColumnInfoData->varmeta.offset, pSource->varmeta.offset, sizeof(int32_t) * numOfRows); if (pColumnInfoData->varmeta.allocLen < pSource->varmeta.length) { @@ -587,11 +587,11 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) { if (pBlock->info.rowSize == 0) { size_t rowSize = 0; - size_t numOfCols = pBlock->info.numOfCols; - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - rowSize += pColInfo->info.bytes; - } + size_t numOfCols = pBlock->info.numOfCols; + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + rowSize += pColInfo->info.bytes; + } pBlock->info.rowSize = rowSize; } @@ -637,7 +637,7 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { rowSize += sizeof(int32_t); } else { - rowSize += 1/8.0; // one bit for each record + rowSize += 1 / 8.0; // one bit for each record } } @@ -1318,12 +1318,97 @@ int32_t tEncodeDataBlocks(void** buf, const SArray* blocks) { return tlen; } -void* tDecodeDataBlocks(const void* buf, SArray* blocks) { +void* tDecodeDataBlocks(const void* buf, SArray** blocks) { int32_t sz; buf = 
taosDecodeFixedI32(buf, &sz); + + *blocks = taosArrayInit(sz, sizeof(SSDataBlock)); for (int32_t i = 0; i < sz; i++) { SSDataBlock pBlock = {0}; buf = tDecodeDataBlock(buf, &pBlock); + taosArrayPush(*blocks, &pBlock); } return (void*)buf; } + +static char* formatTimestamp(char* buf, int64_t val, int precision) { + time_t tt; + int32_t ms = 0; + if (precision == TSDB_TIME_PRECISION_NANO) { + tt = (time_t)(val / 1000000000); + ms = val % 1000000000; + } else if (precision == TSDB_TIME_PRECISION_MICRO) { + tt = (time_t)(val / 1000000); + ms = val % 1000000; + } else { + tt = (time_t)(val / 1000); + ms = val % 1000; + } + + /* comment out as it make testcases like select_with_tags.sim fail. + but in windows, this may cause the call to localtime crash if tt < 0, + need to find a better solution. + if (tt < 0) { + tt = 0; + } + */ + +#ifdef WINDOWS + if (tt < 0) tt = 0; +#endif + if (tt <= 0 && ms < 0) { + tt--; + if (precision == TSDB_TIME_PRECISION_NANO) { + ms += 1000000000; + } else if (precision == TSDB_TIME_PRECISION_MICRO) { + ms += 1000000; + } else { + ms += 1000; + } + } + + struct tm* ptm = localtime(&tt); + size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); + + if (precision == TSDB_TIME_PRECISION_NANO) { + sprintf(buf + pos, ".%09d", ms); + } else if (precision == TSDB_TIME_PRECISION_MICRO) { + sprintf(buf + pos, ".%06d", ms); + } else { + sprintf(buf + pos, ".%03d", ms); + } + + return buf; +} +void blockDebugShowData(SArray* dataBlocks) { + char pBuf[128]; + int32_t sz = taosArrayGetSize(dataBlocks); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i); + int32_t colNum = pDataBlock->info.numOfCols; + int32_t rows = pDataBlock->info.rows; + for (int32_t j = 0; j < rows; j++) { + printf("|"); + for (int32_t k = 0; k < colNum; k++) { + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); + void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); + switch 
(pColInfoData->info.type) { + case TSDB_DATA_TYPE_TIMESTAMP: + formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); + printf(" %25s |", pBuf); + break; + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_UINT: + printf(" %15d |", *(int32_t*)var); + break; + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_UBIGINT: + printf(" %15ld |", *(int64_t*)var); + break; + } + } + printf("\n"); + } + } +} + diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e860fc1831..943b13f9e7 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3095,21 +3095,49 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { tfree(pReq->ast); } +SStreamTask *tNewSStreamTask(int64_t streamId) { + SStreamTask *pTask = (SStreamTask *)calloc(1, sizeof(SStreamTask)); + if (pTask == NULL) { + return NULL; + } + pTask->taskId = tGenIdPI32(); + pTask->streamId = streamId; + pTask->status = STREAM_TASK_STATUS__RUNNING; + /*pTask->qmsg = NULL;*/ + return pTask; +} + int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) { /*if (tStartEncode(pEncoder) < 0) return -1;*/ if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->level) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->parallelizable) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->nextOpDst) < 0) return -1; if (tEncodeI8(pEncoder, pTask->sourceType) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; - if (pTask->sinkType == STREAM_SINK_TYPE__ASSIGNED) { - if (tEncodeI32(pEncoder, pTask->sinkVgId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->NextOpEp) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1; + if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; + if (tEncodeI32(pEncoder, 
pTask->downstreamTaskId) < 0) return -1; + + if (pTask->execType == TASK_EXEC__EXEC) { + if (tEncodeI8(pEncoder, pTask->exec.parallelizable) < 0) return -1; + if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; } - if (tEncodeCStr(pEncoder, pTask->qmsg) < 0) return -1; + + if (pTask->sinkType != TASK_SINK__NONE) { + // TODO: wrap + if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1; + } + + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { + if (tEncodeI8(pEncoder, pTask->inplaceDispatcher.reserved) < 0) return -1; + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + if (tEncodeI8(pEncoder, pTask->shuffleDispatcher.hashMethod) < 0) return -1; + } + /*tEndEncode(pEncoder);*/ return pEncoder->pos; } @@ -3118,17 +3146,32 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) { /*if (tStartDecode(pDecoder) < 0) return -1;*/ if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->level) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->parallelizable) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->nextOpDst) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->sourceType) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; - if (pTask->sinkType == STREAM_SINK_TYPE__ASSIGNED) { - if (tDecodeI32(pDecoder, &pTask->sinkVgId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->NextOpEp) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1; + if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->downstreamTaskId) < 0) 
return -1; + + if (pTask->execType == TASK_EXEC__EXEC) { + if (tDecodeI8(pDecoder, &pTask->exec.parallelizable) < 0) return -1; + if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; } - if (tDecodeCStrAlloc(pDecoder, &pTask->qmsg) < 0) return -1; + + if (pTask->sinkType != TASK_SINK__NONE) { + if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1; + } + + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { + if (tDecodeI8(pDecoder, &pTask->inplaceDispatcher.reserved) < 0) return -1; + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + if (tDecodeI8(pDecoder, &pTask->shuffleDispatcher.hashMethod) < 0) return -1; + } + /*tEndDecode(pDecoder);*/ return 0; } @@ -3139,3 +3182,18 @@ void tFreeSStreamTask(SStreamTask *pTask) { /*free(pTask->executor);*/ /*free(pTask);*/ } + +#if 0 +int32_t tEncodeSStreamTaskExecReq(SCoder* pEncoder, const SStreamTaskExecReq* pReq) { + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + /*if (tEncodeDataBlocks(buf, pReq->streamId) < 0) return -1;*/ + return pEncoder->size; +} +int32_t tDecodeSStreamTaskExecReq(SCoder* pDecoder, SStreamTaskExecReq* pReq) { + return pEncoder->size; +} +void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) { + taosArrayDestroyEx(pReq->data, tDeleteSSDataBlock); +} +#endif diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c index ceec6b82c3..3fc5c482d5 100644 --- a/source/dnode/mgmt/snode/src/smWorker.c +++ b/source/dnode/mgmt/snode/src/smWorker.c @@ -94,14 +94,15 @@ void smStopWorker(SSnodeMgmt *pMgmt) { static FORCE_INLINE int32_t smGetSWIdFromMsg(SRpcMsg *pMsg) { SMsgHead *pHead = pMsg->pCont; - pHead->streamTaskId = htonl(pHead->streamTaskId); - return 
pHead->streamTaskId % SND_UNIQUE_THREAD_NUM; + pHead->vgId = htonl(pHead->vgId); + return pHead->vgId % SND_UNIQUE_THREAD_NUM; } static FORCE_INLINE int32_t smGetSWTypeFromMsg(SRpcMsg *pMsg) { - SStreamExecMsgHead *pHead = pMsg->pCont; - pHead->workerType = htonl(pHead->workerType); - return pHead->workerType; + /*SMsgHead *pHead = pMsg->pCont;*/ + /*pHead->workerType = htonl(pHead->workerType);*/ + /*return pHead->workerType;*/ + return 0; } int32_t smProcessMgmtMsg(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) { diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index ccdb1ae257..197c606a0d 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -33,6 +33,7 @@ typedef struct SVnodesMgmt { SQWorkerPool fetchPool; SWWorkerPool syncPool; SWWorkerPool writePool; + SWWorkerPool mergePool; const char *path; SDnode *pDnode; SMgmtWrapper *pWrapper; @@ -63,6 +64,7 @@ typedef struct { STaosQueue *pApplyQ; STaosQueue *pQueryQ; STaosQueue *pFetchQ; + STaosQueue *pMergeQ; SMgmtWrapper *pWrapper; } SVnodeObj; @@ -110,10 +112,11 @@ int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); +int32_t vmProcessMergeMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } #endif -#endif /*_TD_DND_VNODES_INT_H_*/ \ No newline at end of file +#endif /*_TD_DND_VNODES_INT_H_*/ diff --git a/source/dnode/mgmt/vnode/src/vmMsg.c b/source/dnode/mgmt/vnode/src/vmMsg.c index a98dccbca3..ee78b59242 100644 --- a/source/dnode/mgmt/vnode/src/vmMsg.c +++ b/source/dnode/mgmt/vnode/src/vmMsg.c @@ -280,6 +280,8 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, 
TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, 0); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0); diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 6c7d513c58..3dcbc9fddd 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -208,6 +208,8 @@ int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNode int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, FETCH_QUEUE); } +int32_t vmProcessMergeMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return vmPutNodeMsgToQueue(pMgmt, pMsg, MERGE_QUEUE); } + int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SSingleWorker *pWorker = &pMgmt->mgmtWorker; dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); @@ -239,6 +241,10 @@ static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueT dTrace("msg:%p, will be put into vnode-apply queue", pMsg); code = taosWriteQitem(pVnode->pApplyQ, pMsg); break; + case MERGE_QUEUE: + dTrace("msg:%p, will be put into vnode-merge queue", pMsg); + code = taosWriteQitem(pVnode->pMergeQ, pMsg); + break; default: terrno = TSDB_CODE_INVALID_PARA; break; @@ -260,6 +266,10 @@ int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { return vmPutRpcMsgToQueue(pWrapper, pRpc, APPLY_QUEUE); } +int32_t vmPutMsgToMergeQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pWrapper, pRpc, MERGE_QUEUE); +} + int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType 
qtype) { int32_t size = -1; SVnodeObj *pVnode = vmAcquireVnode(pWrapper->pMgmt, vgId); @@ -280,6 +290,9 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { case APPLY_QUEUE: size = taosQueueSize(pVnode->pApplyQ); break; + case MERGE_QUEUE: + size = taosQueueSize(pVnode->pMergeQ); + break; default: break; } @@ -291,12 +304,13 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); + pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || - pVnode->pQueryQ == NULL) { + pVnode->pQueryQ == NULL || pVnode->pMergeQ) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -310,12 +324,14 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); + tWWorkerFreeQueue(&pMgmt->mergePool, pVnode->pMergeQ); tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); pVnode->pWriteQ = NULL; pVnode->pApplyQ = NULL; pVnode->pSyncQ = NULL; pVnode->pFetchQ = NULL; pVnode->pQueryQ = NULL; + pVnode->pMergeQ = NULL; dDebug("vgId:%d, vnode queue is freed", pVnode->vgId); } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 0d65dacb20..b22c41cfde 100644 --- 
a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -729,13 +729,15 @@ typedef struct {
   int32_t vgNum;
   SRWLatch lock;
   int8_t status;
+  int8_t sourceType;
+  int8_t sinkType;
   // int32_t sqlLen;
   int32_t sinkVgId; // 0 for automatic
   char* sql;
   char* logicalPlan;
   char* physicalPlan;
-  SArray* tasks; // SArray>
-  SArray* ColAlias;
+  SArray* tasks; // SArray>
+  SArray* ColAlias; // SArray
 } SStreamObj;
 
 int32_t tEncodeSStreamObj(SCoder* pEncoder, const SStreamObj* pObj);
diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c
index 870cbba979..4595eeadc9 100644
--- a/source/dnode/mnode/impl/src/mndScheduler.c
+++ b/source/dnode/mnode/impl/src/mndScheduler.c
@@ -46,7 +46,7 @@ int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet
     terrno = TSDB_CODE_OUT_OF_MEMORY;
     return -1;
   }
-  ((SMsgHead*)buf)->streamTaskId = htonl(nodeId);
+  ((SMsgHead*)buf)->vgId = htonl(nodeId);  // head carries the target node id in network byte order
   void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
   tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER);
   tEncodeSStreamTask(&encoder, pTask);
@@ -69,7 +69,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS
   plan->execNode.nodeId = pVgroup->vgId;
   plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup);
 
-  if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
+  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
     terrno = TSDB_CODE_QRY_INVALID_INPUT;
     return -1;
   }
@@ -89,7 +89,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
   plan->execNode.nodeId = pSnode->id;
   plan->execNode.epSet = mndAcquireEpFromSnode(pMnode, pSnode);
 
-  if (qSubPlanToString(plan, &pTask->qmsg, &msgLen) < 0) {
+  if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
     terrno = TSDB_CODE_QRY_INVALID_INPUT;
     return -1;
   }
@@ -111,9 +111,15 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
   pStream->tasks = taosArrayInit(totLevel, sizeof(SArray));
   int32_t lastUsedVgId = 0;
 
-  for (int32_t level = 0; level < totLevel; level++) {
+  // gather vnodes
+  // gather snodes
+  // iterate plan, expand source to vnodes and assign ep to each task
+  // iterate tasks, assign sink type and sink ep to each task
+
+  for (int32_t revLevel = totLevel - 1; revLevel >= 0; revLevel--) {  // walk pSubplans from the last entry down to index 0
+    int32_t level = totLevel - 1 - revLevel;  // NOTE(review): every use of `level` in the lines shown here is commented out; confirm it is still read further down
     SArray* taskOneLevel = taosArrayInit(0, sizeof(SStreamTask));
-    SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, level);
+    SNodeListNode* inner = nodesListGetNode(pPlan->pSubplans, revLevel);
     int32_t opNum = LIST_LENGTH(inner->pNodeList);
     ASSERT(opNum == 1);
 
@@ -132,11 +138,12 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
         lastUsedVgId = pVgroup->vgId;
         pStream->vgNum++;
 
-        SStreamTask* pTask = streamTaskNew(pStream->uid);
-        pTask->level = level;
-        pTask->sourceType = 1;
-        pTask->sinkType = level == totLevel - 1 ? 1 : 0;
-        pTask->parallelizable = 1;
+        SStreamTask* pTask = tNewSStreamTask(pStream->uid);
+        /*pTask->level = level;*/
+        // TODO
+        /*pTask->sourceType = STREAM_SOURCE__SUPER;*/
+        /*pTask->sinkType = level == totLevel - 1 ? 1 : 0;*/
+        pTask->exec.parallelizable = 1;
         if (mndAssignTaskToVg(pMnode, pTrans, pTask, plan, pVgroup) < 0) {
           sdbRelease(pSdb, pVgroup);
           qDestroyQueryPlan(pPlan);
@@ -145,12 +152,11 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
         taosArrayPush(taskOneLevel, pTask);
       }
     } else {
-      SStreamTask* pTask = streamTaskNew(pStream->uid);
-      pTask->level = level;
-      pTask->sourceType = 0;
-      pTask->sinkType = level == totLevel - 1 ? 1 : 0;
-      pTask->parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN;
-      pTask->nextOpDst = STREAM_NEXT_OP_DST__VND;
+      SStreamTask* pTask = tNewSStreamTask(pStream->uid);
+      /*pTask->level = level;*/
+      /*pTask->sourceType = STREAM_SOURCE__NONE;*/
+      /*pTask->sinkType = level == totLevel - 1 ? 1 : 0;*/
+      pTask->exec.parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN;
 
       SSnodeObj* pSnode = mndSchedFetchSnode(pMnode);
       if (pSnode == NULL || tsStreamSchedV) {
diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c
index afa5930821..5a7d6c9acb 100644
--- a/source/dnode/snode/src/snode.c
+++ b/source/dnode/snode/src/snode.c
@@ -57,8 +57,8 @@ void sndMetaDelete(SStreamMeta *pMeta) {
 }
 
 int32_t sndMetaDeployTask(SStreamMeta *pMeta, SStreamTask *pTask) {
-  for (int i = 0; i < pTask->numOfRunners; i++) {
-    pTask->runner[i].executor = qCreateStreamExecTaskInfo(pTask->qmsg, NULL);
+  for (int i = 0; i < pTask->exec.numOfRunners; i++) {
+    pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);  // NOTE(review): runners is indexed here with no allocation visible on this path; confirm it is set up before deploy
   }
   return taosHashPut(pMeta->pHash, &pTask->taskId, sizeof(int32_t), pTask, sizeof(void *));
 }
@@ -72,19 +72,19 @@ int32_t sndMetaRemoveTask(SStreamMeta *pMeta, int32_t taskId) {
   if (pTask == NULL) {
     return -1;
   }
-  free(pTask->qmsg);
+  free(pTask->exec.qmsg);
   // TODO:free executor
   free(pTask);
   return taosHashRemove(pMeta->pHash, &taskId, sizeof(int32_t));
 }
 
 static int32_t sndProcessTaskExecReq(SSnode *pSnode, SRpcMsg *pMsg) {
-  SStreamExecMsgHead *pHead = pMsg->pCont;
-  int32_t taskId = pHead->streamTaskId;
-  SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);
-  if (pTask == NULL) {
-    return -1;
-  }
+  /*SStreamExecMsgHead *pHead = pMsg->pCont;*/
+  /*int32_t taskId = pHead->streamTaskId;*/
+  /*SStreamTask *pTask = sndMetaGetTask(pSnode->pMeta, taskId);*/
+  /*if (pTask == NULL) {*/
+  /*return -1;*/
+  /*}*/  // NOTE(review): the task lookup is disabled, so this handler currently does nothing but return 0
   return 0;
 }
 
diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt
index f026f8331b..bdc8a71b04 100644
--- a/source/dnode/vnode/CMakeLists.txt
+++ b/source/dnode/vnode/CMakeLists.txt
@@ -43,6 +43,7 @@ target_link_libraries(
     PUBLIC wal
     PUBLIC scheduler
     PUBLIC executor
+    PUBLIC stream
     PUBLIC qworker
     PUBLIC sync
 )
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 
43554f923e..18074fc5a8 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -16,8 +16,9 @@
 #include "tcompare.h"
 #include "tqInt.h"
 #include "tqMetaStore.h"
+#include "tstream.h"
 
-void tqDebugShowSSData(SArray* dataBlocks);
+void blockDebugShowData(SArray* dataBlocks);
 
 int32_t tqInit() { return tqPushMgrInit(); }
 
@@ -441,16 +442,21 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
 }
 
 int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
-  ASSERT(parallel <= 8);
-  pTask->numOfRunners = parallel;
+  if (pTask->execType == TASK_EXEC__NONE) return 0;  // task has no exec stage, so no runners are built
+
+  pTask->exec.numOfRunners = parallel;
+  pTask->exec.runners = calloc(parallel, sizeof(SStreamRunner));  // allocate all runner slots once, before the loop fills them
+  if (pTask->exec.runners == NULL) {
+    return -1;
+  }
   for (int32_t i = 0; i < parallel; i++) {
     STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
     SReadHandle handle = {
         .reader = pReadHandle,
         .meta = pTq->pVnodeMeta,
     };
-    pTask->runner[i].inputHandle = pReadHandle;
-    pTask->runner[i].executor = qCreateStreamExecTaskInfo(pTask->qmsg, &handle);
+    pTask->exec.runners[i].inputHandle = pReadHandle;  // each runner keeps its own submit-message reader
+    pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
   }
   return 0;
 }
@@ -473,87 +479,6 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
   return 0;
 }
 
-static char* formatTimestamp(char* buf, int64_t val, int precision) {
-  time_t tt;
-  int32_t ms = 0;
-  if (precision == TSDB_TIME_PRECISION_NANO) {
-    tt = (time_t)(val / 1000000000);
-    ms = val % 1000000000;
-  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
-    tt = (time_t)(val / 1000000);
-    ms = val % 1000000;
-  } else {
-    tt = (time_t)(val / 1000);
-    ms = val % 1000;
-  }
-
-  /* comment out as it make testcases like select_with_tags.sim fail.
-    but in windows, this may cause the call to localtime crash if tt < 0,
-    need to find a better solution.
-  if (tt < 0) {
-    tt = 0;
-  }
-  */
-
-#ifdef WINDOWS
-  if (tt < 0) tt = 0;
-#endif
-  if (tt <= 0 && ms < 0) {
-    tt--;
-    if (precision == TSDB_TIME_PRECISION_NANO) {
-      ms += 1000000000;
-    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
-      ms += 1000000;
-    } else {
-      ms += 1000;
-    }
-  }
-
-  struct tm* ptm = localtime(&tt);
-  size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
-
-  if (precision == TSDB_TIME_PRECISION_NANO) {
-    sprintf(buf + pos, ".%09d", ms);
-  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
-    sprintf(buf + pos, ".%06d", ms);
-  } else {
-    sprintf(buf + pos, ".%03d", ms);
-  }
-
-  return buf;
-}
-void tqDebugShowSSData(SArray* dataBlocks) {
-  char pBuf[128];
-  int32_t sz = taosArrayGetSize(dataBlocks);
-  for (int32_t i = 0; i < sz; i++) {
-    SSDataBlock* pDataBlock = taosArrayGet(dataBlocks, i);
-    int32_t colNum = pDataBlock->info.numOfCols;
-    int32_t rows = pDataBlock->info.rows;
-    for (int32_t j = 0; j < rows; j++) {
-      printf("|");
-      for (int32_t k = 0; k < colNum; k++) {
-        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
-        void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
-        switch (pColInfoData->info.type) {
-          case TSDB_DATA_TYPE_TIMESTAMP:
-            formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
-            printf(" %25s |", pBuf);
-            break;
-          case TSDB_DATA_TYPE_INT:
-          case TSDB_DATA_TYPE_UINT:
-            printf(" %15d |", *(int32_t*)var);
-            break;
-          case TSDB_DATA_TYPE_BIGINT:
-          case TSDB_DATA_TYPE_UBIGINT:
-            printf(" %15ld |", *(int64_t*)var);
-            break;
-        }
-      }
-      printf("\n");
-    }
-  }
-}
-
 int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
   void* pIter = NULL;
 
@@ -561,50 +486,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
     pIter = taosHashIterate(pTq->pStreamTasks, pIter);
     if (pIter == NULL) break;
     SStreamTask* pTask = (SStreamTask*)pIter;
-    if (!pTask->sourceType) continue;
 
-    int32_t workerId = 0;
-    void* exec = pTask->runner[workerId].executor;
-    qSetStreamInput(exec, data, STREAM_DATA_TYPE_SUBMIT_BLOCK);
-    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
-    while (1) {
-      SSDataBlock* output;
-      uint64_t ts;
-      if (qExecTask(exec, &output, &ts) < 0) {
-        ASSERT(false);
-      }
-      if (output == NULL) {
-        break;
-      }
-      taosArrayPush(pRes, output);
-    }
-    if (pTask->sinkType) {
-      // write back
-      /*printf("reach end\n");*/
-      tqDebugShowSSData(pRes);
-    } else {
-      int32_t tlen = sizeof(SStreamExecMsgHead) + tEncodeDataBlocks(NULL, pRes);
-      void* buf = rpcMallocCont(tlen);
-      if (buf == NULL) {
-        return -1;
-      }
-      void* abuf = POINTER_SHIFT(buf, sizeof(SStreamExecMsgHead));
-      tEncodeDataBlocks(abuf, pRes);
-      tmsg_t type;
-
-      if (pTask->nextOpDst == STREAM_NEXT_OP_DST__VND) {
-        type = TDMT_VND_TASK_EXEC;
-      } else {
-        type = TDMT_SND_TASK_EXEC;
-      }
-
-      SRpcMsg reqMsg = {
-          .pCont = buf,
-          .contLen = tlen,
-          .code = 0,
-          .msgType = type,
-      };
-      tmsgSendReq(&pTq->pVnode->msgCb, &pTask->NextOpEp, &reqMsg);
+    if (streamExecTask(pTask, &pTq->pVnode->msgCb, data, STREAM_DATA_TYPE_SUBMIT_BLOCK, 0) < 0) {
+      // TODO: the failure is ignored for now and the remaining tasks are still triggered
     }
   }
   return 0;
@@ -612,33 +496,12 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
 
 int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) {
   SStreamTaskExecReq* pReq = msg->pCont;
+  int32_t taskId = pReq->taskId;  // NOTE(review): read straight from the raw buffer, while streamExecTask serializes the request behind an SMsgHead; confirm a decode happens before this point
+  SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
+  if (pTask == NULL) return -1;  // the id comes from a message, so an unknown task is an error, not an invariant
 
-  int32_t taskId = pReq->head.streamTaskId;
-  int32_t workerType = pReq->head.workerType;
-
-  SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
-  // assume worker id is 1
-  int32_t workerId = 1;
-  void* exec = pTask->runner[workerId].executor;
-  int32_t sz = taosArrayGetSize(pReq->data);
-  printf("input data:\n");
-  tqDebugShowSSData(pReq->data);
-  SArray* pRes = taosArrayInit(0, sizeof(void*));
-  for (int32_t i = 0; i < sz; i++) {
-    SSDataBlock* input = taosArrayGet(pReq->data, i);
-    SSDataBlock* output;
-    uint64_t ts;
-    qSetStreamInput(exec, input, STREAM_DATA_TYPE_SSDATA_BLOCK);
-    if (qExecTask(exec, &output, &ts) < 0) {
-      ASSERT(0);
-    }
-    if (output == NULL) {
-      break;
-    }
-    taosArrayPush(pRes, &output);
+  if (streamExecTask(pTask, &pTq->pVnode->msgCb, pReq->data, STREAM_DATA_TYPE_SSDATA_BLOCK, 0) < 0) {
+    return -1;  // report the failure to the caller instead of answering success
   }
-  printf("output data:\n");
-  tqDebugShowSSData(pRes);
-
   return 0;
 }
diff --git a/source/libs/CMakeLists.txt b/source/libs/CMakeLists.txt
index ea8195bfd1..b448a43dcb 100644
--- a/source/libs/CMakeLists.txt
+++ b/source/libs/CMakeLists.txt
@@ -8,6 +8,7 @@ add_subdirectory(scheduler)
 add_subdirectory(cache)
 add_subdirectory(catalog)
 add_subdirectory(executor)
+add_subdirectory(stream)
 add_subdirectory(planner)
 add_subdirectory(function)
 add_subdirectory(qcom)
diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt
new file mode 100644
index 0000000000..572c70d31b
--- /dev/null
+++ b/source/libs/stream/CMakeLists.txt
@@ -0,0 +1,16 @@
+aux_source_directory(src STREAM_SRC)
+add_library(stream STATIC ${STREAM_SRC})
+target_include_directories(
+  stream
+  PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/stream"
+  PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
+)
+
+target_link_libraries(
+  stream
+  PRIVATE os util transport qcom executor
+)
+
+if(${BUILD_TEST})
+  ADD_SUBDIRECTORY(test)
+endif(${BUILD_TEST})
diff --git a/source/libs/stream/inc/tstreamInc.h b/source/libs/stream/inc/tstreamInc.h
new file mode 100644
index 0000000000..c96901e567
--- /dev/null
+++ b/source/libs/stream/inc/tstreamInc.h
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef _TSTREAM_INC_H_
+#define _TSTREAM_INC_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ifndef _TSTREAM_INC_H_ */
diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c
new file mode 100644
index 0000000000..55bbb1920d
--- /dev/null
+++ b/source/libs/stream/src/tstream.c
@@ -0,0 +1,145 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "tstream.h"
+#include "executor.h"
+
+// Runs one stream task on `input`: source filter, exec, sink, dispatch. Returns 0, or -1 if the dispatch buffer allocation fails.
+int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, int32_t inputType, int32_t workId) {
+  SArray* pRes = NULL;  // blocks handed to sink and dispatch: executor output, or `input` itself when nothing is executed
+  // source: raw submit blocks are only consumed by tasks whose source is a scan
+  if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK && pTask->sourceType != TASK_SOURCE__SCAN) return 0;
+
+  // exec: feed the input to this worker's executor and collect every output block into pRes
+  if (pTask->execType == TASK_EXEC__EXEC) {
+    ASSERT(workId < pTask->exec.numOfRunners);
+    void* exec = pTask->exec.runners[workId].executor;
+    pRes = taosArrayInit(0, sizeof(SSDataBlock));
+    if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
+      qSetStreamInput(exec, input, inputType);
+      while (1) {
+        SSDataBlock* output;
+        uint64_t ts;
+        if (qExecTask(exec, &output, &ts) < 0) {
+          ASSERT(false);
+        }
+        if (output == NULL) {
+          break;
+        }
+        taosArrayPush(pRes, output);
+      }
+    } else if (inputType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
+      const SArray* blocks = (const SArray*)input;
+      int32_t sz = taosArrayGetSize(blocks);
+      for (int32_t i = 0; i < sz; i++) {
+        SSDataBlock* pBlock = taosArrayGet(blocks, i);
+        qSetStreamInput(exec, pBlock, inputType);
+        while (1) {
+          SSDataBlock* output;
+          uint64_t ts;
+          if (qExecTask(exec, &output, &ts) < 0) {
+            ASSERT(false);
+          }
+          if (output == NULL) {
+            break;
+          }
+          taosArrayPush(pRes, output);
+        }
+      }
+    } else {
+      ASSERT(0);
+    }
+  } else {
+    ASSERT(inputType == STREAM_DATA_TYPE_SSDATA_BLOCK);
+    pRes = (SArray*)input;  // pass-through: pRes aliases the caller's array and must not be destroyed here
+  }
+
+  // sink: only the debug "show" sink acts so far; the other sink types are empty placeholders
+  if (pTask->sinkType == TASK_SINK__TABLE) {
+    //
+  } else if (pTask->sinkType == TASK_SINK__SMA) {
+    //
+  } else if (pTask->sinkType == TASK_SINK__FETCH) {
+    //
+  } else if (pTask->sinkType == TASK_SINK__SHOW) {
+    blockDebugShowData(pRes);
+  } else {
+    ASSERT(pTask->sinkType == TASK_SINK__NONE);
+  }
+
+  // dispatch: serialize pRes behind an SMsgHead and hand the message to the next stage
+  if (pTask->dispatchType != TASK_DISPATCH__NONE) {
+    SStreamTaskExecReq req = {
+        .streamId = pTask->streamId,
+        .taskId = pTask->taskId,
+        .data = pRes,
+    };
+
+    int32_t tlen = sizeof(SMsgHead) + tEncodeSStreamTaskExecReq(NULL, &req);
+    void* buf = rpcMallocCont(tlen);
+    if (buf == NULL) {
+      if (pTask->execType == TASK_EXEC__EXEC) taosArrayDestroy(pRes);  // release the array allocated on the exec path
+      return -1;
+    }
+    void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
+    tEncodeSStreamTaskExecReq(&abuf, &req);
+    SRpcMsg dispatchMsg = {
+        .pCont = buf,
+        .contLen = tlen,
+        .code = 0,
+        .msgType = pTask->dispatchMsgType,
+    };
+    if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
+      int32_t qType;
+      if (pTask->dispatchMsgType == TDMT_VND_TASK_PIPE_EXEC || pTask->dispatchMsgType == TDMT_SND_TASK_PIPE_EXEC) {
+        qType = FETCH_QUEUE;
+      } else if (pTask->dispatchMsgType == TDMT_VND_TASK_MERGE_EXEC ||
+                 pTask->dispatchMsgType == TDMT_SND_TASK_MERGE_EXEC) {
+        qType = MERGE_QUEUE;
+      } else if (pTask->dispatchMsgType == TDMT_VND_TASK_WRITE_EXEC) {
+        qType = WRITE_QUEUE;
+      } else {
+        ASSERT(0);  // NOTE(review): if ASSERT can be compiled out, qType is used uninitialised below
+      }
+      tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
+    } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
+      ((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId);  // network byte order, as mndPersistTaskDeployReq does
+      tmsgSendReq(pMsgCb, &pTask->fixedEpDispatcher.epSet, &dispatchMsg);
+    } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
+      // TODO
+    } else {
+      ASSERT(0);
+    }
+  }
+  if (pTask->execType == TASK_EXEC__EXEC) taosArrayDestroy(pRes);  // the array was allocated above only on the exec path
+  return 0;
+}
+
+int32_t tEncodeSStreamTaskExecReq(void** buf, const SStreamTaskExecReq* pReq) {
+  int32_t tlen = 0;  // running total of the three encoders' return values
+  tlen += taosEncodeFixedI64(buf, pReq->streamId);
+  tlen += taosEncodeFixedI32(buf, pReq->taskId);
+  tlen += tEncodeDataBlocks(buf, pReq->data);
+  return tlen;
+}
+
+void* tDecodeSStreamTaskExecReq(const void* buf, SStreamTaskExecReq* pReq) {
+  buf = taosDecodeFixedI64(buf, &pReq->streamId);  // fields are read back in the order the encoder wrote them
+  buf = taosDecodeFixedI32(buf, &pReq->taskId);
+  buf = tDecodeDataBlocks(buf, &pReq->data);
+  return (void*)buf;
+}
+
+void tFreeSStreamTaskExecReq(SStreamTaskExecReq* pReq) { taosArrayDestroy(pReq->data); }
diff --git a/source/libs/stream/test/CMakeLists.txt b/source/libs/stream/test/CMakeLists.txt
new file mode 100644
index 0000000000..e69de29bb2