diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md
index a967299e40..28f52be59a 100644
--- a/docs/zh/12-taos-sql/14-stream.md
+++ b/docs/zh/12-taos-sql/14-stream.md
@@ -18,7 +18,7 @@ stream_options: {
其中 subquery 是 select 普通查询语法的子集:
```sql
-subquery: SELECT [DISTINCT] select_list
+subquery: SELECT select_list
from_clause
[WHERE condition]
[PARTITION BY tag_list]
@@ -37,7 +37,7 @@ window_clause: {
其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。
-窗口的定义与时序数据特色查询中的定义完全相同。
+窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished)
例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。
diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c
index 2fcf4dd62c..55556f21a1 100644
--- a/examples/c/stream_demo.c
+++ b/examples/c/stream_demo.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+// clang-format off
#include
#include
#include
@@ -94,13 +95,8 @@ int32_t create_stream() {
}
taos_free_result(pRes);
- /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
- /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
- /*const char* sql = "select sum(k) from tu1 interval(10m)";*/
- /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(pConn,
- "create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, "
- "count(k) from st1 partition by tbname interval(20s) ");
+ "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, k from st1 partition by tbname state_window(k)");
if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1;
diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index a64815f14f..1ce88905c2 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -29,7 +29,7 @@ typedef void* DataSinkHandle;
struct SRpcMsg;
struct SSubplan;
-typedef struct SReadHandle {
+typedef struct {
void* tqReader;
void* meta;
void* config;
@@ -41,6 +41,7 @@ typedef struct SReadHandle {
bool initTableReader;
bool initTqReader;
int32_t numOfVgroups;
+ void* pStateBackend;
} SReadHandle;
// in queue mode, data streams are seperated by msg
@@ -78,8 +79,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
/**
* @brief Cleanup SSDataBlock for StreamScanInfo
- *
- * @param tinfo
+ *
+ * @param tinfo
*/
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo);
@@ -163,7 +164,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
-int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList/*,int32_t* resNum, SExplainExecInfo** pRes*/);
+int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/);
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 384c6a289f..16b259cf59 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -263,6 +263,14 @@ typedef struct {
SArray* checkpointVer;
} SStreamRecoveringState;
+// incremental state storage
+typedef struct {
+ SStreamTask* pOwner;
+ TDB* db;
+ TTB* pStateDb;
+ TXN txn;
+} SStreamState;
+
typedef struct SStreamTask {
int64_t streamId;
int32_t taskId;
@@ -312,6 +320,10 @@ typedef struct SStreamTask {
// msg handle
SMsgCb* pMsgCb;
+
+ // state backend
+ SStreamState* pState;
+
} SStreamTask;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@@ -507,7 +519,7 @@ typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
- TTB* pStateDb;
+ TTB* pCheckpointDb;
SHashObj* pTasks;
SHashObj* pRecoverStatus;
void* ahandle;
@@ -528,6 +540,36 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta);
+SStreamState* streamStateOpen(char* path, SStreamTask* pTask);
+void streamStateClose(SStreamState* pState);
+int32_t streamStateBegin(SStreamState* pState);
+int32_t streamStateCommit(SStreamState* pState);
+int32_t streamStateAbort(SStreamState* pState);
+
+typedef struct {
+ TBC* pCur;
+} SStreamStateCur;
+
+#if 1
+int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen);
+int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen);
+int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen);
+
+SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen);
+SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen);
+SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen);
+void streamStateFreeCur(SStreamStateCur* pCur);
+
+int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen);
+
+int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur);
+int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
+
+int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
+int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
+
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/source/dnode/mnode/impl/test/sma/CMakeLists.txt b/source/dnode/mnode/impl/test/sma/CMakeLists.txt
index 3f9ec123a8..a55b45ca11 100644
--- a/source/dnode/mnode/impl/test/sma/CMakeLists.txt
+++ b/source/dnode/mnode/impl/test/sma/CMakeLists.txt
@@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut
)
-add_test(
- NAME smaTest
- COMMAND smaTest
-)
+if(NOT ${TD_WINDOWS})
+ add_test(
+ NAME smaTest
+ COMMAND smaTest
+ )
+endif(NOT ${TD_WINDOWS})
diff --git a/source/dnode/mnode/impl/test/stb/CMakeLists.txt b/source/dnode/mnode/impl/test/stb/CMakeLists.txt
index dcfbe658fc..e3a3fc2e79 100644
--- a/source/dnode/mnode/impl/test/stb/CMakeLists.txt
+++ b/source/dnode/mnode/impl/test/stb/CMakeLists.txt
@@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut
)
-add_test(
- NAME stbTest
- COMMAND stbTest
-)
\ No newline at end of file
+if(NOT ${TD_WINDOWS})
+ add_test(
+ NAME stbTest
+ COMMAND stbTest
+ )
+endif(NOT ${TD_WINDOWS})
\ No newline at end of file
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index c6bc8e6e59..1456c6c067 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -79,6 +79,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
ASSERT(0);
}
+ if (streamLoadTasks(pTq->pStreamMeta) < 0) {
+ ASSERT(0);
+ }
+
return pTq;
}
@@ -664,6 +668,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
ASSERT(pTask->exec.executor);
}
+ pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
+ if (pTask->pState == NULL) {
+ return -1;
+ }
+
// sink
/*pTask->ahandle = pTq->pVnode;*/
if (pTask->outputType == TASK_OUTPUT__SMA) {
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index 601c22a3ba..67f7cb2f6f 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -150,6 +150,7 @@ typedef struct {
SQueryTableDataCond tableCond;
int64_t recoverStartVer;
int64_t recoverEndVer;
+ SStreamState* pState;
} SStreamTaskInfo;
typedef struct {
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 1836ca6d9b..98c7c56d72 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -392,7 +392,7 @@ static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
pCtx->input.colDataAggIsSet = pStatus->hasAgg;
- pCtx->input.numOfRows = pStatus->numOfRows;
+ pCtx->input.numOfRows = pStatus->numOfRows;
pCtx->input.startRowIndex = pStatus->startOffset;
}
@@ -3715,7 +3715,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
- int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
+ int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);
@@ -3988,15 +3988,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
bool assignUid = groupbyTbname(group);
- size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
+ size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
- if(assignUid){
+ if (assignUid) {
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = info->uid;
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
}
- }else{
+ } else {
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
@@ -4615,6 +4615,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete;
}
+ if (pHandle && pHandle->pStateBackend) {
+ (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
+ }
+
(*pTaskInfo)->sql = sql;
sql = NULL;
(*pTaskInfo)->pSubplan = pPlan;
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index 551180f639..3d54b791a8 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -3128,8 +3128,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
maxTs = TMAX(maxTs, pBlock->info.watermark);
- if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA ||
- pBlock->info.type == STREAM_INVALID) {
+ ASSERT(pBlock->info.type != STREAM_INVERT);
+ if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_CLEAR) {
SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 06ca26f029..102bad7426 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
return 0;
}
-// TODO: handle version
int32_t streamExecForAll(SStreamTask* pTask) {
while (1) {
int32_t batchCnt = 1;
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index 5ff700546c..20a2f7d332 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -14,7 +14,7 @@
*/
#include "executor.h"
-#include "tstream.h"
+#include "streamInc.h"
#include "ttimer.h"
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
@@ -23,17 +23,23 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
- pMeta->path = strdup(path);
+ int32_t len = strlen(path) + 20;
+ char* streamPath = taosMemoryCalloc(1, len);
+ sprintf(streamPath, "%s/%s", path, "stream");
+ pMeta->path = strdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
goto _err;
}
+ sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
+ mkdir(streamPath, 0755);
+ taosMemoryFree(streamPath);
+
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) {
goto _err;
}
- // open state storage backend
- if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pStateDb) < 0) {
+ if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) {
goto _err;
}
@@ -49,16 +55,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
- if (streamLoadTasks(pMeta) < 0) {
- goto _err;
- }
return pMeta;
_err:
if (pMeta->path) taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
- if (pMeta->pStateDb) tdbTbClose(pMeta->pStateDb);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
+ if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db);
taosMemoryFree(pMeta);
return NULL;
@@ -67,7 +70,7 @@ _err:
void streamMetaClose(SStreamMeta* pMeta) {
tdbCommit(pMeta->db, &pMeta->txn);
tdbTbClose(pMeta->pTaskDb);
- tdbTbClose(pMeta->pStateDb);
+ tdbTbClose(pMeta->pCheckpointDb);
tdbClose(pMeta->db);
void* pIter = NULL;
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c
index 263053778b..0505c3edd6 100644
--- a/source/libs/stream/src/streamRecover.c
+++ b/source/libs/stream/src/streamRecover.c
@@ -176,6 +176,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea
}
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
+#if 0
void* buf = NULL;
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
@@ -224,10 +225,12 @@ int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
FAIL:
if (buf) taosMemoryFree(buf);
return -1;
+#endif
return 0;
}
int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
+#if 0
void* pVal = NULL;
int32_t vLen = 0;
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
@@ -241,7 +244,7 @@ int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
pTask->nextCheckId = aggCheckpoint.checkpointId + 1;
pTask->checkpointInfo = aggCheckpoint.checkpointVer;
-
+#endif
return 0;
}
diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c
new file mode 100644
index 0000000000..6ccc90fa51
--- /dev/null
+++ b/source/libs/stream/src/streamState.c
@@ -0,0 +1,187 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "executor.h"
+#include "streamInc.h"
+#include "ttimer.h"
+
+SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
+ SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
+ if (pState == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return NULL;
+ }
+ char statePath[200];
+ sprintf(statePath, "%s/%d", path, pTask->taskId);
+ if (tdbOpen(statePath, 16 * 1024, 1, &pState->db) < 0) {
+ goto _err;
+ }
+
+ // open state storage backend
+ if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pState->db, &pState->pStateDb) < 0) {
+ goto _err;
+ }
+
+ pState->pOwner = pTask;
+
+ return pState;
+
+_err:
+ if (pState->pStateDb) tdbTbClose(pState->pStateDb);
+ if (pState->db) tdbClose(pState->db);
+ taosMemoryFree(pState);
+ return NULL;
+}
+
+void streamStateClose(SStreamState* pState) {
+ tdbCommit(pState->db, &pState->txn);
+ tdbTbClose(pState->pStateDb);
+ tdbClose(pState->db);
+
+ taosMemoryFree(pState);
+}
+
+int32_t streamStateBegin(SStreamState* pState) {
+ if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
+ 0) {
+ return -1;
+ }
+
+ if (tdbBegin(pState->db, &pState->txn) < 0) {
+ return -1;
+ }
+ return 0;
+}
+
+int32_t streamStateCommit(SStreamState* pState) {
+ if (tdbCommit(pState->db, &pState->txn) < 0) {
+ return -1;
+ }
+ memset(&pState->txn, 0, sizeof(TXN));
+ if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
+ 0) {
+ return -1;
+ }
+ if (tdbBegin(pState->db, &pState->txn) < 0) {
+ return -1;
+ }
+ return 0;
+}
+
+int32_t streamStateAbort(SStreamState* pState) {
+ if (tdbAbort(pState->db, &pState->txn) < 0) {
+ return -1;
+ }
+ memset(&pState->txn, 0, sizeof(TXN));
+ if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
+ 0) {
+ return -1;
+ }
+ if (tdbBegin(pState->db, &pState->txn) < 0) {
+ return -1;
+ }
+ return 0;
+}
+
+int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen) {
+ return tdbTbUpsert(pState->pStateDb, key, kLen, value, vLen, &pState->txn);
+}
+int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen) {
+ return tdbTbGet(pState->pStateDb, key, kLen, pVal, pVLen);
+}
+
+int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen) {
+ return tdbTbDelete(pState->pStateDb, key, kLen, &pState->txn);
+}
+
+SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen) {
+ SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
+ if (pCur == NULL) return NULL;
+ tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL);
+
+ int32_t c;
+ tdbTbcMoveTo(pCur->pCur, key, kLen, &c);
+ if (c != 0) {
+ taosMemoryFree(pCur);
+ return NULL;
+ }
+ return 0;
+}
+
+int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen) {
+ return tdbTbcGet(pCur->pCur, (const void**)pKey, pKLen, (const void**)pVal, pVLen);
+}
+
+int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
+ //
+ return tdbTbcMoveToFirst(pCur->pCur);
+}
+
+int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
+ //
+ return tdbTbcMoveToLast(pCur->pCur);
+}
+
+SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen) {
+ SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
+ if (pCur == NULL) {
+ return NULL;
+ }
+
+ int32_t c;
+ if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) {
+ taosMemoryFree(pCur);
+ return NULL;
+ }
+ if (c > 0) return pCur;
+
+ if (tdbTbcMoveToNext(pCur->pCur) < 0) {
+ taosMemoryFree(pCur);
+ return NULL;
+ }
+
+ return pCur;
+}
+
+SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen) {
+ SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
+ if (pCur == NULL) {
+ return NULL;
+ }
+
+ int32_t c;
+ if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) {
+ taosMemoryFree(pCur);
+ return NULL;
+ }
+ if (c < 0) return pCur;
+
+ if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
+ taosMemoryFree(pCur);
+ return NULL;
+ }
+
+ return pCur;
+}
+
+int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
+ //
+ return tdbTbcMoveToNext(pCur->pCur);
+}
+
+int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
+ //
+ return tdbTbcMoveToPrev(pCur->pCur);
+}
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index 4009a47c65..ce5917de29 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -165,5 +165,8 @@ void tFreeSStreamTask(SStreamTask* pTask) {
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
}
+
+ if (pTask->pState) streamStateClose(pTask->pState);
+
taosMemoryFree(pTask);
}
diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c
index a8da680910..93ced912f8 100644
--- a/source/libs/wal/src/walMeta.c
+++ b/source/libs/wal/src/walMeta.c
@@ -121,7 +121,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
if (found == NULL) {
// file corrupted, no complete log
// TODO delete and search in previous files
- ASSERT(0);
+ /*ASSERT(0);*/
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
@@ -221,7 +221,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
int code = walSaveMeta(pWal);
if (code < 0) {
- taosArrayDestroy(actualLog);
return -1;
}
}