From 0d73ed62b2fd4dabc64a75a7a87fc0b391af8a9f Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 23 Aug 2022 17:05:19 +0800 Subject: [PATCH 01/10] fix(query): change mnd redo action sleep TD-18483 --- source/dnode/mnode/impl/src/mndTrans.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 17b4336465..06a95cfc93 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1312,7 +1312,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { if (pTrans->failedTimes < 6) { mError("trans:%d, stage keep on redoAction since action:%d code:0x%x not 0x%x, failedTimes:%d", pTrans->id, pTrans->lastAction, pTrans->code, pAction->retryCode, pTrans->failedTimes); - taosMsleep(1000); + taosMsleep(100); continueExec = true; return true; } From 2d628491b29ae88722b95a00af9e3cc2a9446318 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 23 Aug 2022 19:25:17 +0800 Subject: [PATCH 02/10] feat(stream): support tdb state backend --- docs/zh/12-taos-sql/14-stream.md | 4 +- examples/c/stream_demo.c | 8 +- include/libs/executor/executor.h | 32 +-- include/libs/stream/tstream.h | 44 ++++- source/dnode/vnode/src/tq/tq.c | 5 + source/libs/executor/CMakeLists.txt | 3 +- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executorimpl.c | 14 +- source/libs/executor/src/timewindowoperator.c | 4 +- source/libs/stream/src/streamMeta.c | 17 +- source/libs/stream/src/streamRecover.c | 5 +- source/libs/stream/src/streamState.c | 187 ++++++++++++++++++ source/libs/stream/src/streamTask.c | 3 + source/libs/wal/src/walMeta.c | 2 +- 14 files changed, 289 insertions(+), 40 deletions(-) create mode 100644 source/libs/stream/src/streamState.c diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index a967299e40..28f52be59a 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -18,7 +18,7 @@ stream_options: { 其中 subquery 是 select 普通查询语法的子集: ```sql -subquery: SELECT [DISTINCT] select_list +subquery: SELECT select_list from_clause [WHERE condition] [PARTITION BY tag_list] @@ -37,7 +37,7 @@ window_clause: { 其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。 -窗口的定义与时序数据特色查询中的定义完全相同。 +窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished) 例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。 diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 2fcf4dd62c..55556f21a1 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +// clang-format off #include #include #include @@ -94,13 +95,8 @@ int32_t create_stream() { } taos_free_result(pRes); - /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/ - /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ - /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ - /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ pRes = taos_query(pConn, - "create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, " - "count(k) from st1 partition by tbname interval(20s) "); + "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, k from st1 partition by tbname state_window(k)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index a64815f14f..fb572b5591 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -23,24 +23,26 @@ extern "C" { #include "query.h" #include "tcommon.h" #include "tmsgcb.h" +#include "tstream.h" typedef void* qTaskInfo_t; typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; -typedef struct SReadHandle { - void* tqReader; - void* meta; - void* config; - void* vnode; - void* mnd; - SMsgCb* pMsgCb; - int64_t version; - bool initMetaReader; - bool initTableReader; - bool initTqReader; - int32_t numOfVgroups; +typedef struct { + void* tqReader; + void* meta; + void* config; + void* vnode; + void* mnd; + SMsgCb* pMsgCb; + int64_t version; + bool initMetaReader; + bool initTableReader; + bool initTqReader; + int32_t numOfVgroups; + SStreamState* pState; } SReadHandle; // in queue mode, data streams are seperated by msg @@ -78,8 +80,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO /** * @brief Cleanup SSDataBlock for StreamScanInfo - * - * @param tinfo + * + * @param tinfo */ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo); @@ -163,7 +165,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); -int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList/*,int32_t* resNum, SExplainExecInfo** pRes*/); +int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/); int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 384c6a289f..16b259cf59 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -263,6 +263,14 @@ typedef struct { SArray* checkpointVer; } SStreamRecoveringState; +// incremental state storage +typedef struct { + SStreamTask* pOwner; + TDB* db; + TTB* pStateDb; + TXN txn; +} SStreamState; + typedef struct SStreamTask { int64_t streamId; int32_t taskId; @@ -312,6 +320,10 @@ typedef struct SStreamTask { // msg handle SMsgCb* pMsgCb; + + // state backend + SStreamState* pState; + } SStreamTask; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -507,7 +519,7 @@ typedef struct SStreamMeta { char* path; TDB* db; TTB* pTaskDb; - TTB* pStateDb; + TTB* pCheckpointDb; SHashObj* pTasks; SHashObj* pRecoverStatus; void* ahandle; @@ -528,6 +540,36 @@ int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaRollBack(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta); +SStreamState* streamStateOpen(char* path, SStreamTask* pTask); +void streamStateClose(SStreamState* pState); +int32_t streamStateBegin(SStreamState* pState); +int32_t streamStateCommit(SStreamState* pState); +int32_t streamStateAbort(SStreamState* pState); + +typedef struct { + TBC* pCur; +} SStreamStateCur; + +#if 1 +int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen); +int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen); +int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen); + +SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen); +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen); +SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen); +void streamStateFreeCur(SStreamStateCur* pCur); + +int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen); + +int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); + +int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); + +#endif + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c6bc8e6e59..17aa426fec 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -664,6 +664,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { ASSERT(pTask->exec.executor); } + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + if (pTask->pState == NULL) { + return -1; + } + // sink /*pTask->ahandle = pTq->pVnode;*/ if (pTask->outputType == TASK_OUTPUT__SMA) { diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index 89d08b3078..e0e2a7a267 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -8,7 +8,8 @@ add_library(executor STATIC ${EXECUTOR_SRC}) # ) target_link_libraries(executor - PRIVATE os util common function parser planner qcom vnode scalar nodes index stream + PUBLIC stream + PRIVATE os util common function parser planner qcom vnode scalar nodes index ) target_include_directories( diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fb4eac991f..03fc8d7bb3 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -150,6 +150,7 @@ typedef struct { SQueryTableDataCond tableCond; int64_t recoverStartVer; int64_t recoverEndVer; + SStreamState* pState; } SStreamTaskInfo; typedef struct { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2d72bc813f..8db4dec88f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -392,7 +392,7 @@ static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { pCtx->input.colDataAggIsSet = pStatus->hasAgg; - pCtx->input.numOfRows = pStatus->numOfRows; + pCtx->input.numOfRows = pStatus->numOfRows; pCtx->input.startRowIndex = pStatus->startOffset; } @@ -3715,7 +3715,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t const char* id, SInterval* pInterval, int32_t fillType, int32_t order) { SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode); - int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey; + int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey; STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey); w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order); @@ -3988,15 +3988,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, bool assignUid = groupbyTbname(group); - size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); + size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - if(assignUid){ + if (assignUid) { for (int32_t i = 0; i < numOfTables; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); info->groupId = info->uid; taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); } - }else{ + } else { int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; @@ -4611,6 +4611,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } + if (pHandle && pHandle->pState) { + (*pTaskInfo)->streamInfo.pState = pHandle->pState; + } + (*pTaskInfo)->sql = sql; sql = NULL; (*pTaskInfo)->pSubplan = pPlan; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0594a727fc..871a340685 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3129,8 +3129,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.watermark); - if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA || - pBlock->info.type == STREAM_INVALID) { + ASSERT(pBlock->info.type != STREAM_INVERT); + if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; } else if (pBlock->info.type == STREAM_CLEAR) { SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5ff700546c..2386dbb3c4 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -14,7 +14,7 @@ */ #include "executor.h" -#include "tstream.h" +#include "streamInc.h" #include "ttimer.h" SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { @@ -23,17 +23,22 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pMeta->path = strdup(path); + char streamPath[200]; + sprintf(streamPath, "%s/%s", path, "stream"); + pMeta->path = strdup(streamPath); if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { goto _err; } + char checkpointPath[200]; + sprintf(checkpointPath, "%s/%s", streamPath, "checkpoints"); + mkdir(checkpointPath, 0755); + if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { goto _err; } - // open state storage backend - if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pStateDb) < 0) { + if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) { goto _err; } @@ -57,8 +62,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF _err: if (pMeta->path) taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); - if (pMeta->pStateDb) tdbTbClose(pMeta->pStateDb); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); + if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); taosMemoryFree(pMeta); return NULL; @@ -67,7 +72,7 @@ _err: void streamMetaClose(SStreamMeta* pMeta) { tdbCommit(pMeta->db, &pMeta->txn); tdbTbClose(pMeta->pTaskDb); - tdbTbClose(pMeta->pStateDb); + tdbTbClose(pMeta->pCheckpointDb); tdbClose(pMeta->db); void* pIter = NULL; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 263053778b..0505c3edd6 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -176,6 +176,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea } int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { +#if 0 void* buf = NULL; ASSERT(pTask->taskLevel == TASK_LEVEL__SINK); @@ -224,10 +225,12 @@ int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { FAIL: if (buf) taosMemoryFree(buf); return -1; +#endif return 0; } int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { +#if 0 void* pVal = NULL; int32_t vLen = 0; if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) { @@ -241,7 +244,7 @@ int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { pTask->nextCheckId = aggCheckpoint.checkpointId + 1; pTask->checkpointInfo = aggCheckpoint.checkpointVer; - +#endif return 0; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c new file mode 100644 index 0000000000..6ccc90fa51 --- /dev/null +++ b/source/libs/stream/src/streamState.c @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executor.h" +#include "streamInc.h" +#include "ttimer.h" + +SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { + SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); + if (pState == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + char statePath[200]; + sprintf(statePath, "%s/%d", path, pTask->taskId); + if (tdbOpen(statePath, 16 * 1024, 1, &pState->db) < 0) { + goto _err; + } + + // open state storage backend + if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pState->db, &pState->pStateDb) < 0) { + goto _err; + } + + pState->pOwner = pTask; + + return pState; + +_err: + if (pState->pStateDb) tdbTbClose(pState->pStateDb); + if (pState->db) tdbClose(pState->db); + taosMemoryFree(pState); + return NULL; +} + +void streamStateClose(SStreamState* pState) { + tdbCommit(pState->db, &pState->txn); + tdbTbClose(pState->pStateDb); + tdbClose(pState->db); + + taosMemoryFree(pState); +} + +int32_t streamStateBegin(SStreamState* pState) { + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + + if (tdbBegin(pState->db, &pState->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamStateCommit(SStreamState* pState) { + if (tdbCommit(pState->db, &pState->txn) < 0) { + return -1; + } + memset(&pState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + if (tdbBegin(pState->db, &pState->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamStateAbort(SStreamState* pState) { + if (tdbAbort(pState->db, &pState->txn) < 0) { + return -1; + } + memset(&pState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + if (tdbBegin(pState->db, &pState->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen) { + return tdbTbUpsert(pState->pStateDb, key, kLen, value, vLen, &pState->txn); +} +int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen) { + return tdbTbGet(pState->pStateDb, key, kLen, pVal, pVLen); +} + +int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen) { + return tdbTbDelete(pState->pStateDb, key, kLen, &pState->txn); +} + +SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) return NULL; + tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL); + + int32_t c; + tdbTbcMoveTo(pCur->pCur, key, kLen, &c); + if (c != 0) { + taosMemoryFree(pCur); + return NULL; + } + return 0; +} + +int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen) { + return tdbTbcGet(pCur->pCur, (const void**)pKey, pKLen, (const void**)pVal, pVLen); +} + +int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToFirst(pCur->pCur); +} + +int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToLast(pCur->pCur); +} + +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + + int32_t c; + if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { + taosMemoryFree(pCur); + return NULL; + } + if (c > 0) return pCur; + + if (tdbTbcMoveToNext(pCur->pCur) < 0) { + taosMemoryFree(pCur); + return NULL; + } + + return pCur; +} + +SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + + int32_t c; + if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { + taosMemoryFree(pCur); + return NULL; + } + if (c < 0) return pCur; + + if (tdbTbcMoveToPrev(pCur->pCur) < 0) { + taosMemoryFree(pCur); + return NULL; + } + + return pCur; +} + +int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToNext(pCur->pCur); +} + +int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToPrev(pCur->pCur); +} diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4009a47c65..ce5917de29 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -165,5 +165,8 @@ void tFreeSStreamTask(SStreamTask* pTask) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); } + + if (pTask->pState) streamStateClose(pTask->pState); + taosMemoryFree(pTask); } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index a8da680910..7d9592e338 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -121,7 +121,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { if (found == NULL) { // file corrupted, no complete log // TODO delete and search in previous files - ASSERT(0); + /*ASSERT(0);*/ terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } From c296dd0a6394aa7e7801d073cdb58bc24ae039f4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 23 Aug 2022 20:33:26 +0800 Subject: [PATCH 03/10] fix compile --- include/libs/executor/executor.h | 25 ++++++++++++------------- source/libs/executor/CMakeLists.txt | 3 +-- source/libs/executor/src/executorimpl.c | 4 ++-- source/libs/stream/src/streamExec.c | 1 - source/libs/stream/src/streamMeta.c | 9 +++++---- 5 files changed, 20 insertions(+), 22 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index fb572b5591..1ce88905c2 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -23,7 +23,6 @@ extern "C" { #include "query.h" #include "tcommon.h" #include "tmsgcb.h" -#include "tstream.h" typedef void* qTaskInfo_t; typedef void* DataSinkHandle; @@ -31,18 +30,18 @@ struct SRpcMsg; struct SSubplan; typedef struct { - void* tqReader; - void* meta; - void* config; - void* vnode; - void* mnd; - SMsgCb* pMsgCb; - int64_t version; - bool initMetaReader; - bool initTableReader; - bool initTqReader; - int32_t numOfVgroups; - SStreamState* pState; + void* tqReader; + void* meta; + void* config; + void* vnode; + void* mnd; + SMsgCb* pMsgCb; + int64_t version; + bool initMetaReader; + bool initTableReader; + bool initTqReader; + int32_t numOfVgroups; + void* pStateBackend; } SReadHandle; // in queue mode, data streams are seperated by msg diff --git a/source/libs/executor/CMakeLists.txt b/source/libs/executor/CMakeLists.txt index e0e2a7a267..89d08b3078 100644 --- a/source/libs/executor/CMakeLists.txt +++ b/source/libs/executor/CMakeLists.txt @@ -8,8 +8,7 @@ add_library(executor STATIC ${EXECUTOR_SRC}) # ) target_link_libraries(executor - PUBLIC stream - PRIVATE os util common function parser planner qcom vnode scalar nodes index + PRIVATE os util common function parser planner qcom vnode scalar nodes index stream ) target_include_directories( diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f7577aee9c..dc2389e25f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4616,8 +4616,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } - if (pHandle && pHandle->pState) { - (*pTaskInfo)->streamInfo.pState = pHandle->pState; + if (pHandle && pHandle->pStateBackend) { + (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend; } (*pTaskInfo)->sql = sql; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 06ca26f029..102bad7426 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) return 0; } -// TODO: handle version int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchCnt = 1; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2386dbb3c4..0041d800a7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -23,16 +23,17 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - char streamPath[200]; + int32_t len = strlen(path) + 20; + char* streamPath = taosMemoryCalloc(1, len); sprintf(streamPath, "%s/%s", path, "stream"); pMeta->path = strdup(streamPath); if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { goto _err; } - char checkpointPath[200]; - sprintf(checkpointPath, "%s/%s", streamPath, "checkpoints"); - mkdir(checkpointPath, 0755); + sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); + mkdir(streamPath, 0755); + taosMemoryFree(streamPath); if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { goto _err; From d7b804e02e3765e275bf9ffb30d8dcfcefe786b9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 24 Aug 2022 10:42:34 +0800 Subject: [PATCH 04/10] fix stream load task --- source/dnode/vnode/src/tq/tq.c | 4 ++++ source/libs/stream/src/streamMeta.c | 3 --- source/libs/wal/src/walMeta.c | 1 - 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 17aa426fec..1456c6c067 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -79,6 +79,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ASSERT(0); } + if (streamLoadTasks(pTq->pStreamMeta) < 0) { + ASSERT(0); + } + return pTq; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0041d800a7..20a2f7d332 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -55,9 +55,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - if (streamLoadTasks(pMeta) < 0) { - goto _err; - } return pMeta; _err: diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 7d9592e338..93ced912f8 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -221,7 +221,6 @@ int walCheckAndRepairMeta(SWal* pWal) { int code = walSaveMeta(pWal); if (code < 0) { - taosArrayDestroy(actualLog); return -1; } } From c064bc38e4cb58952208b2cf86ef072b9bb38b16 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Wed, 24 Aug 2022 11:54:39 +0800 Subject: [PATCH 05/10] build: cancel smaTest stbTest --- source/dnode/mnode/impl/test/sma/CMakeLists.txt | 10 ++++++---- source/dnode/mnode/impl/test/stb/CMakeLists.txt | 10 ++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/test/sma/CMakeLists.txt b/source/dnode/mnode/impl/test/sma/CMakeLists.txt index 3f9ec123a8..a55b45ca11 100644 --- a/source/dnode/mnode/impl/test/sma/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/sma/CMakeLists.txt @@ -5,7 +5,9 @@ target_link_libraries( PUBLIC sut ) -add_test( - NAME smaTest - COMMAND smaTest -) +if(NOT ${TD_WINDOWS}) + add_test( + NAME smaTest + COMMAND smaTest + ) +endif(NOT ${TD_WINDOWS}) diff --git a/source/dnode/mnode/impl/test/stb/CMakeLists.txt b/source/dnode/mnode/impl/test/stb/CMakeLists.txt index dcfbe658fc..e3a3fc2e79 100644 --- a/source/dnode/mnode/impl/test/stb/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/stb/CMakeLists.txt @@ -5,7 +5,9 @@ target_link_libraries( PUBLIC sut ) -add_test( - NAME stbTest - COMMAND stbTest -) \ No newline at end of file +if(NOT ${TD_WINDOWS}) + add_test( + NAME stbTest + COMMAND stbTest + ) +endif(NOT ${TD_WINDOWS}) \ No newline at end of file From 8f6aaf0a60e257f06c0db7e1def952d264298d15 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 24 Aug 2022 13:47:27 +0800 Subject: [PATCH 06/10] fix: donot retry if error code not match retry code --- source/dnode/mnode/impl/src/mndTrans.c | 4 ++-- tests/script/tsim/db/basic2.sim | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 06a95cfc93..c77a80cc82 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1308,11 +1308,11 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { if (pTrans->policy == TRN_POLICY_ROLLBACK) { if (pTrans->lastAction != 0) { STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->lastAction); - if (pAction->retryCode != 0 && pAction->retryCode != pAction->errCode) { + if (pAction->retryCode != 0 && pAction->retryCode == pAction->errCode) { if (pTrans->failedTimes < 6) { mError("trans:%d, stage keep on redoAction since action:%d code:0x%x not 0x%x, failedTimes:%d", pTrans->id, pTrans->lastAction, pTrans->code, pAction->retryCode, pTrans->failedTimes); - taosMsleep(100); + taosMsleep(1000); continueExec = true; return true; } diff --git a/tests/script/tsim/db/basic2.sim b/tests/script/tsim/db/basic2.sim index b7ac0b5edd..4f0ba4a13c 100644 --- a/tests/script/tsim/db/basic2.sim +++ b/tests/script/tsim/db/basic2.sim @@ -4,7 +4,7 @@ system sh/exec.sh -n dnode1 -s start sql connect print =============== conflict stb -sql create database db vgroups 1; +sql create database db vgroups 4; sql use db; sql create table stb (ts timestamp, i int) tags (j int); sql_error create table stb using stb tags (1); @@ -16,6 +16,9 @@ sql_error create table ctb (ts timestamp, i int) tags (j int); sql create table ntb (ts timestamp, i int); sql_error create table ntb (ts timestamp, i int) tags (j int); +sql drop table ntb +sql create table ntb (ts timestamp, i int) tags (j int); + sql drop database db print =============== create database d1 From 1b964bfdc94e9f7e2df657b1e119fc79a5cec2e4 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 24 Aug 2022 13:47:56 +0800 Subject: [PATCH 07/10] fix: print src ip if found invalid packet --- source/libs/transport/src/transSvr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 447db76136..6dd9481b95 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -276,7 +276,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { while (transReadComplete(pBuf)) { tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); if (true == pBuf->invalid || false == uvHandleReq(conn)) { - tError("%s conn %p read invalid packet", transLabel(pTransInst), conn); + tError("%s conn %p read invalid packet, dst: %s, srv: %s", transLabel(pTransInst), conn, conn->dst, conn->src); destroyConn(conn, true); return; } From 4436af66f4f5114922d290f523cdd160a6256a55 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 24 Aug 2022 14:03:46 +0800 Subject: [PATCH 08/10] refactor: adjust telemetry interval --- source/common/src/tglobal.c | 2 +- source/dnode/mnode/impl/src/mndTelem.c | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index adc5af1a17..3b3a18a4d4 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -76,7 +76,7 @@ bool tsMonitorComp = false; // telem bool tsEnableTelem = true; -int32_t tsTelemInterval = 86400; +int32_t tsTelemInterval = 43200; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; uint16_t tsTelemPort = 80; diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c index 27814fe5be..93f7531a27 100644 --- a/source/dnode/mnode/impl/src/mndTelem.c +++ b/source/dnode/mnode/impl/src/mndTelem.c @@ -131,7 +131,9 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) { char* pCont = mndBuildTelemetryReport(pMnode); if (pCont != NULL) { if (taosSendHttpReport(tsTelemServer, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) { - mError("failed to send telemetry msg"); + mError("failed to send telemetry report"); + } else { + mTrace("succeed to send telemetry report"); } taosMemoryFree(pCont); } From c0b80a0da46acd19a366c7d1eb28f1c549928987 Mon Sep 17 00:00:00 2001 From: dingbo Date: Wed, 24 Aug 2022 15:11:38 +0800 Subject: [PATCH 09/10] docs: hide 3.0 download --- docs/zh/28-releases/01-tdengine.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index a64798caa0..1e97572ca4 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -9,7 +9,7 @@ import Release from "/components/ReleaseV3"; -## 3.0.0.0 + From 24261cc90b14ed6bf5fbb32682c29ccd19d3b3c6 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 24 Aug 2022 15:25:25 +0800 Subject: [PATCH 10/10] refactor(stream): refine stream backend interface --- examples/c/stream_demo.c | 2 +- include/common/tcommon.h | 24 +++ include/libs/stream/tstream.h | 15 +- source/dnode/vnode/src/tq/tq.c | 16 +- source/libs/executor/src/scanoperator.c | 36 ++++ source/libs/executor/src/timewindowoperator.c | 169 ++++++++++-------- source/libs/stream/src/streamDispatch.c | 2 +- source/libs/stream/src/streamState.c | 54 ++++-- 8 files changed, 215 insertions(+), 103 deletions(-) diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 55556f21a1..1c9d11b755 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -96,7 +96,7 @@ int32_t create_stream() { taos_free_result(pRes); pRes = taos_query(pConn, - "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, k from st1 partition by tbname state_window(k)"); + "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tcommon.h b/include/common/tcommon.h index dbe020f7ec..fb59d8b9a0 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -44,6 +44,30 @@ enum { ) // clang-format on +typedef struct { + TSKEY ts; + uint64_t groupId; +} SWinKey; + +static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { + SWinKey* pWin1 = (SWinKey*)pKey1; + SWinKey* pWin2 = (SWinKey*)pKey2; + + if (pWin1->groupId > pWin2->groupId) { + return 1; + } else if (pWin1->groupId < pWin2->groupId) { + return -1; + } + + if (pWin1->ts > pWin2->ts) { + return 1; + } else if (pWin1->ts < pWin2->ts) { + return -1; + } + + return 0; +} + enum { TMQ_MSG_TYPE__DUMMY = 0, TMQ_MSG_TYPE__POLL_RSP, diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 16b259cf59..2c27509008 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -551,16 +551,17 @@ typedef struct { } SStreamStateCur; #if 1 -int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen); -int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen); -int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen); +int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); +int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateDel(SStreamState* pState, const SWinKey* key); +void streamFreeVal(void* val); -SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen); -SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen); -SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen); +SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key); void streamStateFreeCur(SStreamStateCur* pCur); -int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen); +int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1456c6c067..3ff59ac2c0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -652,27 +652,33 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { // expand executor if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + if (pTask->pState == NULL) { + return -1; + } + SReadHandle handle = { .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, .initTqReader = 1, + .pStateBackend = pTask->pState, }; pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); ASSERT(pTask->exec.executor); } else if (pTask->taskLevel == TASK_LEVEL__AGG) { + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + if (pTask->pState == NULL) { + return -1; + } SReadHandle mgHandle = { .vnode = NULL, .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), + .pStateBackend = pTask->pState, }; pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); ASSERT(pTask->exec.executor); } - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); - if (pTask->pState == NULL) { - return -1; - } - // sink /*pTask->ahandle = pTq->pVnode;*/ if (pTask->outputType == TASK_OUTPUT__SMA) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 599f86f4fa..3eb382522a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1281,6 +1281,42 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamScanInfo* pInfo = pOperator->info; +#if 0 + SStreamState* pState = pTaskInfo->streamInfo.pState; + if (pState) { + printf(">>>>>>>> stream write backend\n"); + SWinKey key = { + .ts = 1, + .groupId = 2, + }; + char tmp[100] = "abcdefg1"; + if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) { + ASSERT(0); + } + + key.ts = 2; + char tmp2[100] = "abcdefg2"; + if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) { + ASSERT(0); + } + + key.groupId = 5; + key.ts = 1; + char tmp3[100] = "abcdefg3"; + if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) { + ASSERT(0); + } + + char* val2 = NULL; + int32_t sz; + if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) { + ASSERT(0); + } + printf("stream read %s %d\n", val2, sz); + streamFreeVal(val2); + } +#endif + qDebug("stream scan called"); if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { while (1) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3c4cadbc31..e89a43f154 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -15,6 +15,7 @@ #include "executorimpl.h" #include "function.h" #include "functionMgt.h" +#include "tcommon.h" #include "tcompare.h" #include "tdatablock.h" #include "tfill.h" @@ -27,11 +28,6 @@ typedef enum SResultTsInterpType { #define IS_FINAL_OP(op) ((op)->isFinal) -typedef struct SWinRes { - TSKEY ts; - uint64_t groupId; -} SWinRes; - typedef struct SPullWindowInfo { STimeWindow window; uint64_t groupId; @@ -641,7 +637,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, + numOfExprs); if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { closeResultRow(pr); @@ -812,7 +809,7 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { int32_t compareResKey(void* pKey, void* data, int32_t index) { SArray* res = (SArray*)data; SResKeyPos* pos = taosArrayGetP(res, index); - SWinRes* pData = (SWinRes*)pKey; + SWinKey* pData = (SWinKey*)pKey; if (pData->ts == *(int64_t*)pos->key) { if (pData->groupId > pos->groupId) { return 1; @@ -828,7 +825,7 @@ int32_t compareResKey(void* pKey, void* data, int32_t index) { static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SArray* pUpdated) { int32_t size = taosArrayGetSize(pUpdated); - SWinRes data = {.ts = ts, .groupId = groupId}; + SWinKey data = {.ts = ts, .groupId = groupId}; int32_t index = binarySearchCom(pUpdated, size, &data, TSDB_ORDER_DESC, compareResKey); if (index == -1) { index = 0; @@ -861,8 +858,8 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_ newPos->groupId = groupId; newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset}; *(int64_t*)newPos->key = ts; - SWinRes key = {.ts = ts, .groupId = groupId}; - if (taosHashPut(pUpdatedMap, &key, sizeof(SWinRes), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) { + SWinKey key = {.ts = ts, .groupId = groupId}; + if (taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) { taosMemoryFree(newPos); } return TSDB_CODE_SUCCESS; @@ -879,20 +876,20 @@ static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpda static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) { int32_t size = taosArrayGetSize(pWins); for (int32_t i = 0; i < size; i++) { - SWinRes* pW = taosArrayGet(pWins, i); - taosHashRemove(pUpdatedMap, pW, sizeof(SWinRes)); + SWinKey* pW = taosArrayGet(pWins, i); + taosHashRemove(pUpdatedMap, pW, sizeof(SWinKey)); } } int64_t getWinReskey(void* data, int32_t index) { SArray* res = (SArray*)data; - SWinRes* pos = taosArrayGet(res, index); + SWinKey* pos = taosArrayGet(res, index); return pos->ts; } int32_t compareWinRes(void* pKey, void* data, int32_t index) { SArray* res = (SArray*)data; - SWinRes* pos = taosArrayGetP(res, index); + SWinKey* pos = taosArrayGetP(res, index); SResKeyPos* pData = (SResKeyPos*)pKey; if (*(int64_t*)pData->key == pos->ts) { if (pData->groupId > pos->groupId) { @@ -985,8 +982,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, - pBlock->info.rows, numOfOutput); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, + numOfOutput); } doCloseWindow(pResultRowInfo, pInfo, pResult); @@ -1025,8 +1022,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, - pBlock->info.rows, numOfOutput); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, + numOfOutput); doCloseWindow(pResultRowInfo, pInfo, pResult); } @@ -1214,8 +1211,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, + pBlock->info.rows, numOfOutput); } static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { @@ -1418,7 +1415,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC); doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]); if (pUpWins) { - SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]}; + SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]}; taosArrayPush(pUpWins, &winRes); } } @@ -1445,7 +1442,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId; bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput); if (pUpWins && res) { - SWinRes winRes = {.ts = win.skey, .groupId = winGpId}; + SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; taosArrayPush(pUpWins, &winRes); } getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); @@ -1484,11 +1481,11 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, STimeWindow win; win.skey = ts; win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; - SWinRes winRe = { + SWinKey winRe = { .ts = win.skey, .groupId = groupId, }; - void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinRes)); + void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinKey)); if (isCloseWindow(&win, pSup)) { if (chIds && pPullDataMap) { SArray* chAy = *(SArray**)chIds; @@ -1555,7 +1552,7 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); for (int32_t i = *index; i < size; i++) { - SWinRes* pWin = taosArrayGet(pWins, i); + SWinKey* pWin = taosArrayGet(pWins, i); colDataAppend(pTsCol, pBlock->info.rows, (const char*)&pWin->ts, false); colDataAppend(pGroupCol, pBlock->info.rows, (const char*)&pWin->groupId, false); pBlock->info.rows++; @@ -1595,6 +1592,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + + SStreamState* pState = pTaskInfo->streamInfo.pState; + while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -1639,6 +1639,35 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); } +#if 0 + if (pState) { + printf(">>>>>>>> stream read backend\n"); + SWinKey key = { + .ts = 1, + .groupId = 2, + }; + char* val = NULL; + int32_t sz; + if (streamStateGet(pState, &key, (void**)&val, &sz) < 0) { + ASSERT(0); + } + printf("stream read %s %d\n", val, sz); + streamFreeVal(val); + + SStreamStateCur* pCur = streamStateGetCur(pState, &key); + ASSERT(pCur); + while (streamStateCurNext(pState, pCur) == 0) { + SWinKey key1; + const void* val1; + if (streamStateGetKVByCur(pCur, &key1, &val1, &sz) < 0) { + break; + } + printf("stream iter key groupId:%d ts:%d, value %s %d\n", key1.groupId, key1.ts, val1, sz); + } + streamStateFreeCur(pCur); + } +#endif + pOperator->status = OP_RES_TO_RETURN; closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); @@ -1857,7 +1886,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* } } pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); - pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); + pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->delIndex = 0; pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -1958,8 +1987,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, + pBlock->info.rows, numOfOutput); } static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { @@ -2811,7 +2840,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr return; } for (int32_t i = 0; i < size; i++) { - SWinRes* pWinRes = taosArrayGet(pWinArray, i); + SWinKey* pWinRes = taosArrayGet(pWinArray, i); SResultRow* pCurResult = NULL; STimeWindow ParentWin = {.skey = pWinRes->ts, .ekey = pWinRes->ts + 1}; setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &ParentWin, true, &pCurResult, pWinRes->groupId, pSup->pCtx, @@ -2854,12 +2883,12 @@ int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, prevEndPos, TSDB_ORDER_ASC); } -void addPullWindow(SHashObj* pMap, SWinRes* pWinRes, int32_t size) { +void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) { SArray* childIds = taosArrayInit(8, sizeof(int32_t)); for (int32_t i = 0; i < size; i++) { taosArrayPush(childIds, &i); } - taosHashPut(pMap, pWinRes, sizeof(SWinRes), &childIds, sizeof(void*)); + taosHashPut(pMap, pWinRes, sizeof(SWinKey), &childIds, sizeof(void*)); } static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } @@ -2906,11 +2935,11 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc } if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) { bool ignore = true; - SWinRes winRes = { + SWinKey winRes = { .ts = nextWin.skey, .groupId = tableGroupId, }; - void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinRes)); + void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) { SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId}; // add pull data request @@ -3039,8 +3068,8 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { uint64_t* groupIdData = (uint64_t*)pGroupCol->pData; int32_t chId = getChildIndex(pBlock); for (int32_t i = 0; i < pBlock->info.rows; i++) { - SWinRes winRes = {.ts = tsData[i], .groupId = groupIdData[i]}; - void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinRes)); + SWinKey winRes = {.ts = tsData[i], .groupId = groupIdData[i]}; + void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey)); if (chIds) { SArray* chArray = *(SArray**)chIds; int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); @@ -3049,7 +3078,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { taosArrayRemove(chArray, index); if (taosArrayGetSize(chArray) == 0) { // pull data is over - taosHashRemove(pMap, &winRes, sizeof(SWinRes)); + taosHashRemove(pMap, &winRes, sizeof(SWinKey)); } } } @@ -3133,7 +3162,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; } else if (pBlock->info.type == STREAM_CLEAR) { - SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); + SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey)); doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); if (IS_FINAL_OP(pInfo)) { int32_t childIndex = getChildIndex(pBlock); @@ -3171,7 +3200,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { - SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); + SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey)); doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); removeResults(pUpWins, pUpdatedMap); taosArrayDestroy(pUpWins); @@ -3386,7 +3415,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->delIndex = 0; - pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); + pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); pOperator->operatorType = pPhyNode->type; @@ -3721,8 +3750,8 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS } if (pWinInfo->win.skey > pStartTs[i]) { if (pStDeleted && pWinInfo->isOutput) { - SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId}; - taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); + SWinKey res = {.ts = pWinInfo->win.skey, .groupId = groupId}; + taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey)); pWinInfo->isOutput = false; } pWinInfo->win.skey = pStartTs[i]; @@ -3840,8 +3869,8 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, compactFunctions(pSup->pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo); taosHashRemove(pStUpdated, &pWinInfo->pos, sizeof(SResultRowPosition)); if (pWinInfo->isOutput) { - SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId}; - taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); + SWinKey res = {.ts = pWinInfo->win.skey, .groupId = groupId}; + taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey)); pWinInfo->isOutput = false; } taosArrayRemove(pInfo->streamAggSup.pCurWins, i); @@ -3903,8 +3932,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } pCurWin->isClosed = false; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) { - SWinRes value = {.ts = pCurWin->win.skey, .groupId = groupId}; - code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes)); + SWinKey value = {.ts = pCurWin->win.skey, .groupId = groupId}; + code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -3956,8 +3985,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, int32_t step = 0; for (int32_t i = 0; i < pBlock->info.rows; i += step) { int32_t winIndex = 0; - SResultWindowInfo* pCurWin = - getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, gpCols[i], gap, &winIndex); + SResultWindowInfo* pCurWin = getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, gpCols[i], gap, &winIndex); if (!pCurWin || pCurWin->pos.pageId == -1) { // window has been closed. step = 1; @@ -3982,9 +4010,9 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated) { if (pos == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - pos->groupId = ((SWinRes*)pData)->groupId; + pos->groupId = ((SWinKey*)pData)->groupId; pos->pos = *(SResultRowPosition*)key; - *(int64_t*)pos->key = ((SWinRes*)pData)->ts; + *(int64_t*)pos->key = ((SWinKey*)pData)->ts; taosArrayPush(pUpdated, &pos); } taosArraySort(pUpdated, resultrowComparAsc); @@ -4000,7 +4028,7 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It blockDataEnsureCapacity(pBlock, size); size_t keyLen = 0; while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) { - SWinRes* res = *Ite; + SWinKey* res = *Ite; SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); colDataAppend(pTsCol, pBlock->info.rows, (const char*)&res->ts, false); SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); @@ -4131,8 +4159,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) { int32_t size = taosArrayGetSize(pResWins); for (int32_t i = 0; i < size; i++) { SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i); - SWinRes res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId}; - taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); + SWinKey res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId}; + taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey)); } } @@ -4170,14 +4198,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); - doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX, pOperator->exprSupp.numOfExprs, 0, - pWins); + doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX, + pOperator->exprSupp.numOfExprs, 0, pWins); if (IS_FINAL_OP(pInfo)) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; - doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, pChildOp->exprSupp.numOfExprs, - 0, NULL); + doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, + pChildOp->exprSupp.numOfExprs, 0, NULL); rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator); } taosArrayDestroy(pWins); @@ -4581,7 +4609,8 @@ SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_ } int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, uint64_t groupId, - SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual, SHashObj* pSeDeleted) { + SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual, + SHashObj* pSeDeleted) { *allEqual = true; SStateWindowInfo* pWinInfo = taosArrayGet(pWinInfos, winIndex); for (int32_t i = start; i < rows; ++i) { @@ -4602,9 +4631,8 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u } if (pWinInfo->winInfo.win.skey > pTs[i]) { if (pSeDeleted && pWinInfo->winInfo.isOutput) { - SWinRes res = {.ts = pWinInfo->winInfo.win.skey, .groupId = groupId}; - taosHashPut(pSeDeleted, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &res, - sizeof(SWinRes)); + SWinKey res = {.ts = pWinInfo->winInfo.win.skey, .groupId = groupId}; + taosHashPut(pSeDeleted, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey)); pWinInfo->winInfo.isOutput = false; } pWinInfo->winInfo.win.skey = pTs[i]; @@ -4617,14 +4645,14 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u return rows - start; } -static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, - SHashObj* pSeUpdated, SHashObj* pSeDeleted) { +static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SHashObj* pSeUpdated, + SHashObj* pSeDeleted) { SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pGroupColInfo = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); TSKEY* tsCol = (TSKEY*)pTsColInfo->pData; bool allEqual = false; int32_t step = 1; - uint64_t* gpCol = (uint64_t*) pGroupColInfo->pData; + uint64_t* gpCol = (uint64_t*)pGroupColInfo->pData; for (int32_t i = 0; i < pBlock->info.rows; i += step) { int32_t winIndex = 0; SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], gpCol[i], &winIndex); @@ -4668,13 +4696,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl char* pKeyData = colDataGetData(pKeyColInfo, i); int32_t winIndex = 0; bool allEqual = true; - SStateWindowInfo* pCurWin = - getStateWindow(pAggSup, tsCols[i], groupId, pKeyData, &pInfo->stateCol, &winIndex); - winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, groupId, pKeyColInfo, - pSDataBlock->info.rows, i, &allEqual, pStDeleted); + SStateWindowInfo* pCurWin = getStateWindow(pAggSup, tsCols[i], groupId, pKeyData, &pInfo->stateCol, &winIndex); + winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, groupId, pKeyColInfo, pSDataBlock->info.rows, + i, &allEqual, pStDeleted); if (!allEqual) { - appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, - GROUPID_COLUMN_INDEX, &groupId); + appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, GROUPID_COLUMN_INDEX, + &groupId); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo); continue; @@ -4685,8 +4712,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } pCurWin->winInfo.isClosed = false; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - SWinRes value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId}; - code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes)); + SWinKey value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId}; + code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c78ff0756f..9d4010f60e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -358,7 +358,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat FAIL_SHUFFLE_DISPATCH: if (pReqs) { for (int32_t i = 0; i < vgSz; i++) { - taosArrayDestroy(pReqs[i].data); + taosArrayDestroyP(pReqs[i].data, taosMemoryFree); taosArrayDestroy(pReqs[i].dataLen); } taosMemoryFree(pReqs); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 6ccc90fa51..dfd6f012cc 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -15,6 +15,7 @@ #include "executor.h" #include "streamInc.h" +#include "tcommon.h" #include "ttimer.h" SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { @@ -23,14 +24,18 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - char statePath[200]; + char statePath[300]; sprintf(statePath, "%s/%d", path, pTask->taskId); - if (tdbOpen(statePath, 16 * 1024, 1, &pState->db) < 0) { + if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) { goto _err; } // open state storage backend - if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pState->db, &pState->pStateDb) < 0) { + if (tdbTbOpen("state.db", sizeof(SWinKey), -1, SWinKeyCmpr, pState->db, &pState->pStateDb) < 0) { + goto _err; + } + + if (streamStateBegin(pState) < 0) { goto _err; } @@ -60,6 +65,7 @@ int32_t streamStateBegin(SStreamState* pState) { } if (tdbBegin(pState->db, &pState->txn) < 0) { + tdbTxnClose(&pState->txn); return -1; } return 0; @@ -95,33 +101,39 @@ int32_t streamStateAbort(SStreamState* pState) { return 0; } -int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen) { - return tdbTbUpsert(pState->pStateDb, key, kLen, value, vLen, &pState->txn); +int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { + return tdbTbUpsert(pState->pStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn); } -int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen) { - return tdbTbGet(pState->pStateDb, key, kLen, pVal, pVLen); +int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + return tdbTbGet(pState->pStateDb, key, sizeof(SWinKey), pVal, pVLen); } -int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen) { - return tdbTbDelete(pState->pStateDb, key, kLen, &pState->txn); +int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { + return tdbTbDelete(pState->pStateDb, key, sizeof(SWinKey), &pState->txn); } -SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen) { +SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL); int32_t c; - tdbTbcMoveTo(pCur->pCur, key, kLen, &c); + tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c); if (c != 0) { taosMemoryFree(pCur); return NULL; } - return 0; + return pCur; } -int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen) { - return tdbTbcGet(pCur->pCur, (const void**)pKey, pKLen, (const void**)pVal, pVLen); +int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + const SWinKey* pKTmp = NULL; + int32_t kLen; + if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { + return -1; + } + *pKey = *pKTmp; + return 0; } int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { @@ -134,14 +146,14 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { return tdbTbcMoveToLast(pCur->pCur); } -SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen) { +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } int32_t c; - if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { + if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { taosMemoryFree(pCur); return NULL; } @@ -155,14 +167,14 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, i return pCur; } -SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen) { +SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } int32_t c; - if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { + if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { taosMemoryFree(pCur); return NULL; } @@ -185,3 +197,9 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { // return tdbTbcMoveToPrev(pCur->pCur); } +void streamStateFreeCur(SStreamStateCur* pCur) { + tdbTbcClose(pCur->pCur); + taosMemoryFree(pCur); +} + +void streamFreeVal(void* val) { tdbFree(val); }