From 2d628491b29ae88722b95a00af9e3cc2a9446318 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 23 Aug 2022 19:25:17 +0800 Subject: [PATCH 1/4] feat(stream): support tdb state backend --- docs/zh/12-taos-sql/14-stream.md | 4 +- examples/c/stream_demo.c | 8 +- include/libs/executor/executor.h | 32 +-- include/libs/stream/tstream.h | 44 ++++- source/dnode/vnode/src/tq/tq.c | 5 + source/libs/executor/CMakeLists.txt | 3 +- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executorimpl.c | 14 +- source/libs/executor/src/timewindowoperator.c | 4 +- source/libs/stream/src/streamMeta.c | 17 +- source/libs/stream/src/streamRecover.c | 5 +- source/libs/stream/src/streamState.c | 187 ++++++++++++++++++ source/libs/stream/src/streamTask.c | 3 + source/libs/wal/src/walMeta.c | 2 +- 14 files changed, 289 insertions(+), 40 deletions(-) create mode 100644 source/libs/stream/src/streamState.c diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index a967299e40..28f52be59a 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -18,7 +18,7 @@ stream_options: { 其中 subquery 是 select 普通查询语法的子集: ```sql -subquery: SELECT [DISTINCT] select_list +subquery: SELECT select_list from_clause [WHERE condition] [PARTITION BY tag_list] @@ -37,7 +37,7 @@ window_clause: { 其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。 -窗口的定义与时序数据特色查询中的定义完全相同。 +窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished) 例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。 diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 2fcf4dd62c..55556f21a1 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +// clang-format off #include #include #include @@ -94,13 +95,8 @@ int32_t create_stream() { } taos_free_result(pRes); - /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/ - /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ - /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ - /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ pRes = taos_query(pConn, - "create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, " - "count(k) from st1 partition by tbname interval(20s) "); + "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, k from st1 partition by tbname state_window(k)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index a64815f14f..fb572b5591 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -23,24 +23,26 @@ extern "C" { #include "query.h" #include "tcommon.h" #include "tmsgcb.h" +#include "tstream.h" typedef void* qTaskInfo_t; typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; -typedef struct SReadHandle { - void* tqReader; - void* meta; - void* config; - void* vnode; - void* mnd; - SMsgCb* pMsgCb; - int64_t version; - bool initMetaReader; - bool initTableReader; - bool initTqReader; - int32_t numOfVgroups; +typedef struct { + void* tqReader; + void* meta; + void* config; + void* vnode; + void* mnd; + SMsgCb* pMsgCb; + int64_t version; + bool initMetaReader; + bool initTableReader; + bool initTqReader; + int32_t numOfVgroups; + SStreamState* pState; } SReadHandle; // in queue mode, data streams are seperated by msg @@ -78,8 +80,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO /** * @brief Cleanup SSDataBlock for StreamScanInfo - * - * @param tinfo + * + * @param tinfo */ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo); @@ -163,7 +165,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); -int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList/*,int32_t* resNum, SExplainExecInfo** pRes*/); +int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/); int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 384c6a289f..16b259cf59 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -263,6 +263,14 @@ typedef struct { SArray* checkpointVer; } SStreamRecoveringState; +// incremental state storage +typedef struct { + SStreamTask* pOwner; + TDB* db; + TTB* pStateDb; + TXN txn; +} SStreamState; + typedef struct SStreamTask { int64_t streamId; int32_t taskId; @@ -312,6 +320,10 @@ typedef struct SStreamTask { // msg handle SMsgCb* pMsgCb; + + // state backend + SStreamState* pState; + } SStreamTask; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -507,7 +519,7 @@ typedef struct SStreamMeta { char* path; TDB* db; TTB* pTaskDb; - TTB* pStateDb; + TTB* pCheckpointDb; SHashObj* pTasks; SHashObj* pRecoverStatus; void* ahandle; @@ -528,6 +540,36 @@ int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaRollBack(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta); +SStreamState* streamStateOpen(char* path, SStreamTask* pTask); +void streamStateClose(SStreamState* pState); +int32_t streamStateBegin(SStreamState* pState); +int32_t streamStateCommit(SStreamState* pState); +int32_t streamStateAbort(SStreamState* pState); + +typedef struct { + TBC* pCur; +} SStreamStateCur; + +#if 1 +int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen); +int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen); +int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen); + +SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen); +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen); +SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen); +void streamStateFreeCur(SStreamStateCur* pCur); + +int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen); + +int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); + +int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); + +#endif + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c6bc8e6e59..17aa426fec 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -664,6 +664,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { ASSERT(pTask->exec.executor); } + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + if (pTask->pState == NULL) { + return -1; + } + // sink /*pTask->ahandle = pTq->pVnode;*/ if (pTask->outputType == TASK_OUTPUT__SMA) { diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index 89d08b3078..e0e2a7a267 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -8,7 +8,8 @@ add_library(executor STATIC ${EXECUTOR_SRC}) # ) target_link_libraries(executor - PRIVATE os util common function parser planner qcom vnode scalar nodes index stream + PUBLIC stream + PRIVATE os util common function parser planner qcom vnode scalar nodes index ) target_include_directories( diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fb4eac991f..03fc8d7bb3 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -150,6 +150,7 @@ typedef struct { SQueryTableDataCond tableCond; int64_t recoverStartVer; int64_t recoverEndVer; + SStreamState* pState; } SStreamTaskInfo; typedef struct { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2d72bc813f..8db4dec88f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -392,7 +392,7 @@ static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { pCtx->input.colDataAggIsSet = pStatus->hasAgg; - pCtx->input.numOfRows = pStatus->numOfRows; + pCtx->input.numOfRows = pStatus->numOfRows; pCtx->input.startRowIndex = pStatus->startOffset; } @@ -3715,7 +3715,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t const char* id, SInterval* pInterval, int32_t fillType, int32_t order) { SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode); - int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey; + int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey; STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey); w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order); @@ -3988,15 +3988,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, bool assignUid = groupbyTbname(group); - size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); + size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - if(assignUid){ + if (assignUid) { for (int32_t i = 0; i < numOfTables; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); info->groupId = info->uid; taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); } - }else{ + } else { int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; @@ -4611,6 +4611,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } + if (pHandle && pHandle->pState) { + (*pTaskInfo)->streamInfo.pState = pHandle->pState; + } + (*pTaskInfo)->sql = sql; sql = NULL; (*pTaskInfo)->pSubplan = pPlan; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0594a727fc..871a340685 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3129,8 +3129,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.watermark); - if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA || - pBlock->info.type == STREAM_INVALID) { + ASSERT(pBlock->info.type != STREAM_INVERT); + if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; } else if (pBlock->info.type == STREAM_CLEAR) { SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5ff700546c..2386dbb3c4 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -14,7 +14,7 @@ */ #include "executor.h" -#include "tstream.h" +#include "streamInc.h" #include "ttimer.h" SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { @@ -23,17 +23,22 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pMeta->path = strdup(path); + char streamPath[200]; + sprintf(streamPath, "%s/%s", path, "stream"); + pMeta->path = strdup(streamPath); if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { goto _err; } + char checkpointPath[200]; + sprintf(checkpointPath, "%s/%s", streamPath, "checkpoints"); + mkdir(checkpointPath, 0755); + if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { goto _err; } - // open state storage backend - if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pStateDb) < 0) { + if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) { goto _err; } @@ -57,8 +62,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF _err: if (pMeta->path) taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); - if (pMeta->pStateDb) tdbTbClose(pMeta->pStateDb); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); + if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); taosMemoryFree(pMeta); return NULL; @@ -67,7 +72,7 @@ _err: void streamMetaClose(SStreamMeta* pMeta) { tdbCommit(pMeta->db, &pMeta->txn); tdbTbClose(pMeta->pTaskDb); - tdbTbClose(pMeta->pStateDb); + tdbTbClose(pMeta->pCheckpointDb); tdbClose(pMeta->db); void* pIter = NULL; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 263053778b..0505c3edd6 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -176,6 +176,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea } int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { +#if 0 void* buf = NULL; ASSERT(pTask->taskLevel == TASK_LEVEL__SINK); @@ -224,10 +225,12 @@ int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { FAIL: if (buf) taosMemoryFree(buf); return -1; +#endif return 0; } int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { +#if 0 void* pVal = NULL; int32_t vLen = 0; if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) { @@ -241,7 +244,7 @@ int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { pTask->nextCheckId = aggCheckpoint.checkpointId + 1; pTask->checkpointInfo = aggCheckpoint.checkpointVer; - +#endif return 0; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c new file mode 100644 index 0000000000..6ccc90fa51 --- /dev/null +++ b/source/libs/stream/src/streamState.c @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executor.h" +#include "streamInc.h" +#include "ttimer.h" + +SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { + SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); + if (pState == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + char statePath[200]; + sprintf(statePath, "%s/%d", path, pTask->taskId); + if (tdbOpen(statePath, 16 * 1024, 1, &pState->db) < 0) { + goto _err; + } + + // open state storage backend + if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pState->db, &pState->pStateDb) < 0) { + goto _err; + } + + pState->pOwner = pTask; + + return pState; + +_err: + if (pState->pStateDb) tdbTbClose(pState->pStateDb); + if (pState->db) tdbClose(pState->db); + taosMemoryFree(pState); + return NULL; +} + +void streamStateClose(SStreamState* pState) { + tdbCommit(pState->db, &pState->txn); + tdbTbClose(pState->pStateDb); + tdbClose(pState->db); + + taosMemoryFree(pState); +} + +int32_t streamStateBegin(SStreamState* pState) { + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + + if (tdbBegin(pState->db, &pState->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamStateCommit(SStreamState* pState) { + if (tdbCommit(pState->db, &pState->txn) < 0) { + return -1; + } + memset(&pState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + if (tdbBegin(pState->db, &pState->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamStateAbort(SStreamState* pState) { + if (tdbAbort(pState->db, &pState->txn) < 0) { + return -1; + } + memset(&pState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + if (tdbBegin(pState->db, &pState->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen) { + return tdbTbUpsert(pState->pStateDb, key, kLen, value, vLen, &pState->txn); +} +int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen) { + return tdbTbGet(pState->pStateDb, key, kLen, pVal, pVLen); +} + +int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen) { + return tdbTbDelete(pState->pStateDb, key, kLen, &pState->txn); +} + +SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) return NULL; + tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL); + + int32_t c; + tdbTbcMoveTo(pCur->pCur, key, kLen, &c); + if (c != 0) { + taosMemoryFree(pCur); + return NULL; + } + return 0; +} + +int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen) { + return tdbTbcGet(pCur->pCur, (const void**)pKey, pKLen, (const void**)pVal, pVLen); +} + +int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToFirst(pCur->pCur); +} + +int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToLast(pCur->pCur); +} + +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + + int32_t c; + if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { + taosMemoryFree(pCur); + return NULL; + } + if (c > 0) return pCur; + + if (tdbTbcMoveToNext(pCur->pCur) < 0) { + taosMemoryFree(pCur); + return NULL; + } + + return pCur; +} + +SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + + int32_t c; + if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { + taosMemoryFree(pCur); + return NULL; + } + if (c < 0) return pCur; + + if (tdbTbcMoveToPrev(pCur->pCur) < 0) { + taosMemoryFree(pCur); + return NULL; + } + + return pCur; +} + +int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToNext(pCur->pCur); +} + +int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToPrev(pCur->pCur); +} diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4009a47c65..ce5917de29 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -165,5 +165,8 @@ void tFreeSStreamTask(SStreamTask* pTask) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); } + + if (pTask->pState) streamStateClose(pTask->pState); + taosMemoryFree(pTask); } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index a8da680910..7d9592e338 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -121,7 +121,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { if (found == NULL) { // file corrupted, no complete log // TODO delete and search in previous files - ASSERT(0); + /*ASSERT(0);*/ terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } From c296dd0a6394aa7e7801d073cdb58bc24ae039f4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 23 Aug 2022 20:33:26 +0800 Subject: [PATCH 2/4] fix compile --- include/libs/executor/executor.h | 25 ++++++++++++------------- source/libs/executor/CMakeLists.txt | 3 +-- source/libs/executor/src/executorimpl.c | 4 ++-- source/libs/stream/src/streamExec.c | 1 - source/libs/stream/src/streamMeta.c | 9 +++++---- 5 files changed, 20 insertions(+), 22 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index fb572b5591..1ce88905c2 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -23,7 +23,6 @@ extern "C" { #include "query.h" #include "tcommon.h" #include "tmsgcb.h" -#include "tstream.h" typedef void* qTaskInfo_t; typedef void* DataSinkHandle; @@ -31,18 +30,18 @@ struct SRpcMsg; struct SSubplan; typedef struct { - void* tqReader; - void* meta; - void* config; - void* vnode; - void* mnd; - SMsgCb* pMsgCb; - int64_t version; - bool initMetaReader; - bool initTableReader; - bool initTqReader; - int32_t numOfVgroups; - SStreamState* pState; + void* tqReader; + void* meta; + void* config; + void* vnode; + void* mnd; + SMsgCb* pMsgCb; + int64_t version; + bool initMetaReader; + bool initTableReader; + bool initTqReader; + int32_t numOfVgroups; + void* pStateBackend; } SReadHandle; // in queue mode, data streams are seperated by msg diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index e0e2a7a267..89d08b3078 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -8,8 +8,7 @@ add_library(executor STATIC ${EXECUTOR_SRC}) # ) target_link_libraries(executor - PUBLIC stream - PRIVATE os util common function parser planner qcom vnode scalar nodes index + PRIVATE os util common function parser planner qcom vnode scalar nodes index stream ) target_include_directories( diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f7577aee9c..dc2389e25f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4616,8 +4616,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } - if (pHandle && pHandle->pState) { - (*pTaskInfo)->streamInfo.pState = pHandle->pState; + if (pHandle && pHandle->pStateBackend) { + (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend; } (*pTaskInfo)->sql = sql; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 06ca26f029..102bad7426 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) return 0; } -// TODO: handle version int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchCnt = 1; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2386dbb3c4..0041d800a7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -23,16 +23,17 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - char streamPath[200]; + int32_t len = strlen(path) + 20; + char* streamPath = taosMemoryCalloc(1, len); sprintf(streamPath, "%s/%s", path, "stream"); pMeta->path = strdup(streamPath); if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { goto _err; } - char checkpointPath[200]; - sprintf(checkpointPath, "%s/%s", streamPath, "checkpoints"); - mkdir(checkpointPath, 0755); + sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); + mkdir(streamPath, 0755); + taosMemoryFree(streamPath); if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { goto _err; From d7b804e02e3765e275bf9ffb30d8dcfcefe786b9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 24 Aug 2022 10:42:34 +0800 Subject: [PATCH 3/4] fix stream load task --- source/dnode/vnode/src/tq/tq.c | 4 ++++ source/libs/stream/src/streamMeta.c | 3 --- source/libs/wal/src/walMeta.c | 1 - 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 17aa426fec..1456c6c067 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -79,6 +79,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ASSERT(0); } + if (streamLoadTasks(pTq->pStreamMeta) < 0) { + ASSERT(0); + } + return pTq; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0041d800a7..20a2f7d332 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -55,9 +55,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - if (streamLoadTasks(pMeta) < 0) { - goto _err; - } return pMeta; _err: diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 7d9592e338..93ced912f8 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -221,7 +221,6 @@ int walCheckAndRepairMeta(SWal* pWal) { int code = walSaveMeta(pWal); if (code < 0) { - taosArrayDestroy(actualLog); return -1; } } From c064bc38e4cb58952208b2cf86ef072b9bb38b16 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Wed, 24 Aug 2022 11:54:39 +0800 Subject: [PATCH 4/4] build: cancel smaTest stbTest --- source/dnode/mnode/impl/test/sma/CMakeLists.txt | 10 ++++++---- source/dnode/mnode/impl/test/stb/CMakeLists.txt | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/test/sma/CMakeLists.txt b/source/dnode/mnode/impl/test/sma/CMakeLists.txt index 3f9ec123a8..a55b45ca11 100644 --- a/source/dnode/mnode/impl/test/sma/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/sma/CMakeLists.txt @@ -5,7 +5,9 @@ target_link_libraries( PUBLIC sut ) -add_test( - NAME smaTest - COMMAND smaTest -) +if(NOT ${TD_WINDOWS}) + add_test( + NAME smaTest + COMMAND smaTest + ) +endif(NOT ${TD_WINDOWS}) diff --git a/source/dnode/mnode/impl/test/stb/CMakeLists.txt b/source/dnode/mnode/impl/test/stb/CMakeLists.txt index dcfbe658fc..e3a3fc2e79 100644 --- a/source/dnode/mnode/impl/test/stb/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/stb/CMakeLists.txt @@ -5,7 +5,9 @@ target_link_libraries( PUBLIC sut ) -add_test( - NAME stbTest - COMMAND stbTest -) \ No newline at end of file +if(NOT ${TD_WINDOWS}) + add_test( + NAME stbTest + COMMAND stbTest + ) +endif(NOT ${TD_WINDOWS}) \ No newline at end of file