diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md
index a967299e40..28f52be59a 100644
--- a/docs/zh/12-taos-sql/14-stream.md
+++ b/docs/zh/12-taos-sql/14-stream.md
@@ -18,7 +18,7 @@ stream_options: {
其中 subquery 是 select 普通查询语法的子集:
```sql
-subquery: SELECT [DISTINCT] select_list
+subquery: SELECT select_list
from_clause
[WHERE condition]
[PARTITION BY tag_list]
@@ -37,7 +37,7 @@ window_clause: {
其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。
-窗口的定义与时序数据特色查询中的定义完全相同。
+窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished)
例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。
diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md
index a64798caa0..1e97572ca4 100644
--- a/docs/zh/28-releases/01-tdengine.md
+++ b/docs/zh/28-releases/01-tdengine.md
@@ -9,7 +9,7 @@ import Release from "/components/ReleaseV3";
-## 3.0.0.0
+
diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c
index 2fcf4dd62c..1c9d11b755 100644
--- a/examples/c/stream_demo.c
+++ b/examples/c/stream_demo.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+// clang-format off
#include
#include
#include
@@ -94,13 +95,8 @@ int32_t create_stream() {
}
taos_free_result(pRes);
- /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/
- /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/
- /*const char* sql = "select sum(k) from tu1 interval(10m)";*/
- /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
pRes = taos_query(pConn,
- "create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, "
- "count(k) from st1 partition by tbname interval(20s) ");
+ "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)");
if (taos_errno(pRes) != 0) {
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
return -1;
diff --git a/include/common/tcommon.h b/include/common/tcommon.h
index dbe020f7ec..fb59d8b9a0 100644
--- a/include/common/tcommon.h
+++ b/include/common/tcommon.h
@@ -44,6 +44,30 @@ enum {
)
// clang-format on
+typedef struct {
+ TSKEY ts;
+ uint64_t groupId;
+} SWinKey;
+
+static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
+ SWinKey* pWin1 = (SWinKey*)pKey1;
+ SWinKey* pWin2 = (SWinKey*)pKey2;
+
+ if (pWin1->groupId > pWin2->groupId) {
+ return 1;
+ } else if (pWin1->groupId < pWin2->groupId) {
+ return -1;
+ }
+
+ if (pWin1->ts > pWin2->ts) {
+ return 1;
+ } else if (pWin1->ts < pWin2->ts) {
+ return -1;
+ }
+
+ return 0;
+}
+
enum {
TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP,
diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index a64815f14f..1ce88905c2 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -29,7 +29,7 @@ typedef void* DataSinkHandle;
struct SRpcMsg;
struct SSubplan;
-typedef struct SReadHandle {
+typedef struct {
void* tqReader;
void* meta;
void* config;
@@ -41,6 +41,7 @@ typedef struct SReadHandle {
bool initTableReader;
bool initTqReader;
int32_t numOfVgroups;
+ void* pStateBackend;
} SReadHandle;
// in queue mode, data streams are seperated by msg
@@ -78,8 +79,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
/**
* @brief Cleanup SSDataBlock for StreamScanInfo
- *
- * @param tinfo
+ *
+ * @param tinfo
*/
void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo);
@@ -163,7 +164,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t
void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
-int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList/*,int32_t* resNum, SExplainExecInfo** pRes*/);
+int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/);
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len);
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 384c6a289f..2c27509008 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -263,6 +263,14 @@ typedef struct {
SArray* checkpointVer;
} SStreamRecoveringState;
+// incremental state storage
+typedef struct {
+ SStreamTask* pOwner;
+ TDB* db;
+ TTB* pStateDb;
+ TXN txn;
+} SStreamState;
+
typedef struct SStreamTask {
int64_t streamId;
int32_t taskId;
@@ -312,6 +320,10 @@ typedef struct SStreamTask {
// msg handle
SMsgCb* pMsgCb;
+
+ // state backend
+ SStreamState* pState;
+
} SStreamTask;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@@ -507,7 +519,7 @@ typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
- TTB* pStateDb;
+ TTB* pCheckpointDb;
SHashObj* pTasks;
SHashObj* pRecoverStatus;
void* ahandle;
@@ -528,6 +540,37 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaRollBack(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta);
+SStreamState* streamStateOpen(char* path, SStreamTask* pTask);
+void streamStateClose(SStreamState* pState);
+int32_t streamStateBegin(SStreamState* pState);
+int32_t streamStateCommit(SStreamState* pState);
+int32_t streamStateAbort(SStreamState* pState);
+
+typedef struct {
+ TBC* pCur;
+} SStreamStateCur;
+
+#if 1
+int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
+int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
+int32_t streamStateDel(SStreamState* pState, const SWinKey* key);
+void streamFreeVal(void* val);
+
+SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key);
+SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key);
+SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key);
+void streamStateFreeCur(SStreamStateCur* pCur);
+
+int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
+
+int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur);
+int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
+
+int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
+int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
+
+#endif
+
#ifdef __cplusplus
}
#endif
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index a628f551f4..3bbbb8b463 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -76,7 +76,7 @@ bool tsMonitorComp = false;
// telem
bool tsEnableTelem = true;
-int32_t tsTelemInterval = 86400;
+int32_t tsTelemInterval = 43200;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com";
uint16_t tsTelemPort = 80;
diff --git a/source/dnode/mnode/impl/src/mndTelem.c b/source/dnode/mnode/impl/src/mndTelem.c
index 27814fe5be..93f7531a27 100644
--- a/source/dnode/mnode/impl/src/mndTelem.c
+++ b/source/dnode/mnode/impl/src/mndTelem.c
@@ -131,7 +131,9 @@ static int32_t mndProcessTelemTimer(SRpcMsg* pReq) {
char* pCont = mndBuildTelemetryReport(pMnode);
if (pCont != NULL) {
if (taosSendHttpReport(tsTelemServer, tsTelemPort, pCont, strlen(pCont), HTTP_FLAT) != 0) {
- mError("failed to send telemetry msg");
+ mError("failed to send telemetry report");
+ } else {
+ mTrace("succeed to send telemetry report");
}
taosMemoryFree(pCont);
}
diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c
index 17b4336465..c77a80cc82 100644
--- a/source/dnode/mnode/impl/src/mndTrans.c
+++ b/source/dnode/mnode/impl/src/mndTrans.c
@@ -1308,7 +1308,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (pTrans->lastAction != 0) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->lastAction);
- if (pAction->retryCode != 0 && pAction->retryCode != pAction->errCode) {
+ if (pAction->retryCode != 0 && pAction->retryCode == pAction->errCode) {
if (pTrans->failedTimes < 6) {
mError("trans:%d, stage keep on redoAction since action:%d code:0x%x not 0x%x, failedTimes:%d", pTrans->id,
pTrans->lastAction, pTrans->code, pAction->retryCode, pTrans->failedTimes);
diff --git a/source/dnode/mnode/impl/test/sma/CMakeLists.txt b/source/dnode/mnode/impl/test/sma/CMakeLists.txt
index 3f9ec123a8..a55b45ca11 100644
--- a/source/dnode/mnode/impl/test/sma/CMakeLists.txt
+++ b/source/dnode/mnode/impl/test/sma/CMakeLists.txt
@@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut
)
-add_test(
- NAME smaTest
- COMMAND smaTest
-)
+if(NOT ${TD_WINDOWS})
+ add_test(
+ NAME smaTest
+ COMMAND smaTest
+ )
+endif(NOT ${TD_WINDOWS})
diff --git a/source/dnode/mnode/impl/test/stb/CMakeLists.txt b/source/dnode/mnode/impl/test/stb/CMakeLists.txt
index dcfbe658fc..e3a3fc2e79 100644
--- a/source/dnode/mnode/impl/test/stb/CMakeLists.txt
+++ b/source/dnode/mnode/impl/test/stb/CMakeLists.txt
@@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut
)
-add_test(
- NAME stbTest
- COMMAND stbTest
-)
\ No newline at end of file
+if(NOT ${TD_WINDOWS})
+ add_test(
+ NAME stbTest
+ COMMAND stbTest
+ )
+endif(NOT ${TD_WINDOWS})
\ No newline at end of file
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index c6bc8e6e59..3ff59ac2c0 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -79,6 +79,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
ASSERT(0);
}
+ if (streamLoadTasks(pTq->pStreamMeta) < 0) {
+ ASSERT(0);
+ }
+
return pTq;
}
@@ -648,17 +652,28 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
// expand executor
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
+ pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
+ if (pTask->pState == NULL) {
+ return -1;
+ }
+
SReadHandle handle = {
.meta = pTq->pVnode->pMeta,
.vnode = pTq->pVnode,
.initTqReader = 1,
+ .pStateBackend = pTask->pState,
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.executor);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
+ pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask);
+ if (pTask->pState == NULL) {
+ return -1;
+ }
SReadHandle mgHandle = {
.vnode = NULL,
.numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo),
+ .pStateBackend = pTask->pState,
};
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
ASSERT(pTask->exec.executor);
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index 601c22a3ba..67f7cb2f6f 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -150,6 +150,7 @@ typedef struct {
SQueryTableDataCond tableCond;
int64_t recoverStartVer;
int64_t recoverEndVer;
+ SStreamState* pState;
} SStreamTaskInfo;
typedef struct {
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 1836ca6d9b..98c7c56d72 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -392,7 +392,7 @@ static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
pCtx->input.colDataAggIsSet = pStatus->hasAgg;
- pCtx->input.numOfRows = pStatus->numOfRows;
+ pCtx->input.numOfRows = pStatus->numOfRows;
pCtx->input.startRowIndex = pStatus->startOffset;
}
@@ -3715,7 +3715,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
- int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
+ int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);
@@ -3988,15 +3988,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle,
bool assignUid = groupbyTbname(group);
- size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
+ size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList);
- if(assignUid){
+ if (assignUid) {
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i);
info->groupId = info->uid;
taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t));
}
- }else{
+ } else {
int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
@@ -4615,6 +4615,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
goto _complete;
}
+ if (pHandle && pHandle->pStateBackend) {
+ (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend;
+ }
+
(*pTaskInfo)->sql = sql;
sql = NULL;
(*pTaskInfo)->pSubplan = pPlan;
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index 599f86f4fa..3eb382522a 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -1281,6 +1281,42 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info;
+#if 0
+ SStreamState* pState = pTaskInfo->streamInfo.pState;
+ if (pState) {
+ printf(">>>>>>>> stream write backend\n");
+ SWinKey key = {
+ .ts = 1,
+ .groupId = 2,
+ };
+ char tmp[100] = "abcdefg1";
+ if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) {
+ ASSERT(0);
+ }
+
+ key.ts = 2;
+ char tmp2[100] = "abcdefg2";
+ if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) {
+ ASSERT(0);
+ }
+
+ key.groupId = 5;
+ key.ts = 1;
+ char tmp3[100] = "abcdefg3";
+ if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) {
+ ASSERT(0);
+ }
+
+ char* val2 = NULL;
+ int32_t sz;
+ if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) {
+ ASSERT(0);
+ }
+ printf("stream read %s %d\n", val2, sz);
+ streamFreeVal(val2);
+ }
+#endif
+
qDebug("stream scan called");
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
while (1) {
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index 551180f639..ba0e534cc6 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -15,6 +15,7 @@
#include "executorimpl.h"
#include "function.h"
#include "functionMgt.h"
+#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
@@ -27,11 +28,6 @@ typedef enum SResultTsInterpType {
#define IS_FINAL_OP(op) ((op)->isFinal)
-typedef struct SWinRes {
- TSKEY ts;
- uint64_t groupId;
-} SWinRes;
-
typedef struct SPullWindowInfo {
STimeWindow window;
uint64_t groupId;
@@ -641,7 +637,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
- doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs);
+ doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows,
+ numOfExprs);
if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
closeResultRow(pr);
@@ -812,7 +809,7 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) {
int32_t compareResKey(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
SResKeyPos* pos = taosArrayGetP(res, index);
- SWinRes* pData = (SWinRes*)pKey;
+ SWinKey* pData = (SWinKey*)pKey;
if (pData->ts == *(int64_t*)pos->key) {
if (pData->groupId > pos->groupId) {
return 1;
@@ -828,7 +825,7 @@ int32_t compareResKey(void* pKey, void* data, int32_t index) {
static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SArray* pUpdated) {
int32_t size = taosArrayGetSize(pUpdated);
- SWinRes data = {.ts = ts, .groupId = groupId};
+ SWinKey data = {.ts = ts, .groupId = groupId};
int32_t index = binarySearchCom(pUpdated, size, &data, TSDB_ORDER_DESC, compareResKey);
if (index == -1) {
index = 0;
@@ -861,8 +858,8 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_
newPos->groupId = groupId;
newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
*(int64_t*)newPos->key = ts;
- SWinRes key = {.ts = ts, .groupId = groupId};
- if (taosHashPut(pUpdatedMap, &key, sizeof(SWinRes), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) {
+ SWinKey key = {.ts = ts, .groupId = groupId};
+ if (taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) {
taosMemoryFree(newPos);
}
return TSDB_CODE_SUCCESS;
@@ -879,20 +876,20 @@ static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpda
static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) {
int32_t size = taosArrayGetSize(pWins);
for (int32_t i = 0; i < size; i++) {
- SWinRes* pW = taosArrayGet(pWins, i);
- taosHashRemove(pUpdatedMap, pW, sizeof(SWinRes));
+ SWinKey* pW = taosArrayGet(pWins, i);
+ taosHashRemove(pUpdatedMap, pW, sizeof(SWinKey));
}
}
int64_t getWinReskey(void* data, int32_t index) {
SArray* res = (SArray*)data;
- SWinRes* pos = taosArrayGet(res, index);
+ SWinKey* pos = taosArrayGet(res, index);
return pos->ts;
}
int32_t compareWinRes(void* pKey, void* data, int32_t index) {
SArray* res = (SArray*)data;
- SWinRes* pos = taosArrayGetP(res, index);
+ SWinKey* pos = taosArrayGetP(res, index);
SResKeyPos* pData = (SResKeyPos*)pKey;
if (*(int64_t*)pData->key == pos->ts) {
if (pData->groupId > pos->groupId) {
@@ -985,8 +982,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
- doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
- pBlock->info.rows, numOfOutput);
+ doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
+ numOfOutput);
}
doCloseWindow(pResultRowInfo, pInfo, pResult);
@@ -1025,8 +1022,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
- doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
- pBlock->info.rows, numOfOutput);
+ doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows,
+ numOfOutput);
doCloseWindow(pResultRowInfo, pInfo, pResult);
}
@@ -1214,8 +1211,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
- doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
- pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
+ doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows,
+ pBlock->info.rows, numOfOutput);
}
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
@@ -1418,7 +1415,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock,
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC);
doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]);
if (pUpWins) {
- SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]};
+ SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]};
taosArrayPush(pUpWins, &winRes);
}
}
@@ -1445,7 +1442,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval*
uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId;
bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput);
if (pUpWins && res) {
- SWinRes winRes = {.ts = win.skey, .groupId = winGpId};
+ SWinKey winRes = {.ts = win.skey, .groupId = winGpId};
taosArrayPush(pUpWins, &winRes);
}
getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win);
@@ -1484,11 +1481,11 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
STimeWindow win;
win.skey = ts;
win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1;
- SWinRes winRe = {
+ SWinKey winRe = {
.ts = win.skey,
.groupId = groupId,
};
- void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinRes));
+ void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinKey));
if (isCloseWindow(&win, pSup)) {
if (chIds && pPullDataMap) {
SArray* chAy = *(SArray**)chIds;
@@ -1555,7 +1552,7 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t i = *index; i < size; i++) {
- SWinRes* pWin = taosArrayGet(pWins, i);
+ SWinKey* pWin = taosArrayGet(pWins, i);
colDataAppend(pTsCol, pBlock->info.rows, (const char*)&pWin->ts, false);
colDataAppend(pGroupCol, pBlock->info.rows, (const char*)&pWin->groupId, false);
pBlock->info.rows++;
@@ -1595,6 +1592,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
+
+ SStreamState* pState = pTaskInfo->streamInfo.pState;
+
while (1) {
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
@@ -1639,6 +1639,35 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap);
}
+#if 0
+ if (pState) {
+ printf(">>>>>>>> stream read backend\n");
+ SWinKey key = {
+ .ts = 1,
+ .groupId = 2,
+ };
+ char* val = NULL;
+ int32_t sz;
+ if (streamStateGet(pState, &key, (void**)&val, &sz) < 0) {
+ ASSERT(0);
+ }
+ printf("stream read %s %d\n", val, sz);
+ streamFreeVal(val);
+
+ SStreamStateCur* pCur = streamStateGetCur(pState, &key);
+ ASSERT(pCur);
+ while (streamStateCurNext(pState, pCur) == 0) {
+ SWinKey key1;
+ const void* val1;
+ if (streamStateGetKVByCur(pCur, &key1, &val1, &sz) < 0) {
+ break;
+ }
+ printf("stream iter key groupId:%d ts:%d, value %s %d\n", key1.groupId, key1.ts, val1, sz);
+ }
+ streamStateFreeCur(pCur);
+ }
+#endif
+
pOperator->status = OP_RES_TO_RETURN;
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
@@ -1857,7 +1886,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
}
pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t));
- pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes));
+ pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->delIndex = 0;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
initResultRowInfo(&pInfo->binfo.resultRowInfo);
@@ -1958,8 +1987,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
- doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
- pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
+ doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows,
+ pBlock->info.rows, numOfOutput);
}
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
@@ -2811,7 +2840,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
return;
}
for (int32_t i = 0; i < size; i++) {
- SWinRes* pWinRes = taosArrayGet(pWinArray, i);
+ SWinKey* pWinRes = taosArrayGet(pWinArray, i);
SResultRow* pCurResult = NULL;
STimeWindow ParentWin = {.skey = pWinRes->ts, .ekey = pWinRes->ts + 1};
setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &ParentWin, true, &pCurResult, pWinRes->groupId, pSup->pCtx,
@@ -2854,12 +2883,12 @@ int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY*
return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, prevEndPos, TSDB_ORDER_ASC);
}
-void addPullWindow(SHashObj* pMap, SWinRes* pWinRes, int32_t size) {
+void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) {
SArray* childIds = taosArrayInit(8, sizeof(int32_t));
for (int32_t i = 0; i < size; i++) {
taosArrayPush(childIds, &i);
}
- taosHashPut(pMap, pWinRes, sizeof(SWinRes), &childIds, sizeof(void*));
+ taosHashPut(pMap, pWinRes, sizeof(SWinKey), &childIds, sizeof(void*));
}
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
@@ -2906,11 +2935,11 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
}
if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) {
bool ignore = true;
- SWinRes winRes = {
+ SWinKey winRes = {
.ts = nextWin.skey,
.groupId = tableGroupId,
};
- void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinRes));
+ void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) {
SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId};
// add pull data request
@@ -3038,8 +3067,8 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
int32_t chId = getChildIndex(pBlock);
for (int32_t i = 0; i < pBlock->info.rows; i++) {
- SWinRes winRes = {.ts = tsData[i], .groupId = groupIdData[i]};
- void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinRes));
+ SWinKey winRes = {.ts = tsData[i], .groupId = groupIdData[i]};
+ void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey));
if (chIds) {
SArray* chArray = *(SArray**)chIds;
int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
@@ -3048,7 +3077,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) {
taosArrayRemove(chArray, index);
if (taosArrayGetSize(chArray) == 0) {
// pull data is over
- taosHashRemove(pMap, &winRes, sizeof(SWinRes));
+ taosHashRemove(pMap, &winRes, sizeof(SWinKey));
}
}
}
@@ -3128,11 +3157,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
maxTs = TMAX(maxTs, pBlock->info.watermark);
- if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA ||
- pBlock->info.type == STREAM_INVALID) {
+ ASSERT(pBlock->info.type != STREAM_INVERT);
+ if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
pInfo->binfo.pRes->info.type = pBlock->info.type;
} else if (pBlock->info.type == STREAM_CLEAR) {
- SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));
+ SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins);
if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock);
@@ -3170,7 +3199,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
- SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes));
+ SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey));
doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins);
removeResults(pUpWins, pUpdatedMap);
taosArrayDestroy(pUpWins);
@@ -3385,7 +3414,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->delIndex = 0;
- pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes));
+ pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t));
pOperator->operatorType = pPhyNode->type;
@@ -3720,8 +3749,8 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS
}
if (pWinInfo->win.skey > pStartTs[i]) {
if (pStDeleted && pWinInfo->isOutput) {
- SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId};
- taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes));
+ SWinKey res = {.ts = pWinInfo->win.skey, .groupId = groupId};
+ taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey));
pWinInfo->isOutput = false;
}
pWinInfo->win.skey = pStartTs[i];
@@ -3839,8 +3868,8 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
compactFunctions(pSup->pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo);
taosHashRemove(pStUpdated, &pWinInfo->pos, sizeof(SResultRowPosition));
if (pWinInfo->isOutput) {
- SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId};
- taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes));
+ SWinKey res = {.ts = pWinInfo->win.skey, .groupId = groupId};
+ taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey));
pWinInfo->isOutput = false;
}
taosArrayRemove(pInfo->streamAggSup.pCurWins, i);
@@ -3902,8 +3931,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
pCurWin->isClosed = false;
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
- SWinRes value = {.ts = pCurWin->win.skey, .groupId = groupId};
- code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes));
+ SWinKey value = {.ts = pCurWin->win.skey, .groupId = groupId};
+ code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
@@ -3955,8 +3984,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
int32_t step = 0;
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
int32_t winIndex = 0;
- SResultWindowInfo* pCurWin =
- getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, gpCols[i], gap, &winIndex);
+ SResultWindowInfo* pCurWin = getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, gpCols[i], gap, &winIndex);
if (!pCurWin || pCurWin->pos.pageId == -1) {
// window has been closed.
step = 1;
@@ -3981,9 +4009,9 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated) {
if (pos == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
- pos->groupId = ((SWinRes*)pData)->groupId;
+ pos->groupId = ((SWinKey*)pData)->groupId;
pos->pos = *(SResultRowPosition*)key;
- *(int64_t*)pos->key = ((SWinRes*)pData)->ts;
+ *(int64_t*)pos->key = ((SWinKey*)pData)->ts;
taosArrayPush(pUpdated, &pos);
}
taosArraySort(pUpdated, resultrowComparAsc);
@@ -3999,7 +4027,7 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It
blockDataEnsureCapacity(pBlock, size);
size_t keyLen = 0;
while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) {
- SWinRes* res = *Ite;
+ SWinKey* res = *Ite;
SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
colDataAppend(pTsCol, pBlock->info.rows, (const char*)&res->ts, false);
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
@@ -4130,8 +4158,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) {
int32_t size = taosArrayGetSize(pResWins);
for (int32_t i = 0; i < size; i++) {
SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i);
- SWinRes res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId};
- taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes));
+ SWinKey res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId};
+ taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey));
}
}
@@ -4169,14 +4197,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
- doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX, pOperator->exprSupp.numOfExprs, 0,
- pWins);
+ doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX,
+ pOperator->exprSupp.numOfExprs, 0, pWins);
if (IS_FINAL_OP(pInfo)) {
int32_t childIndex = getChildIndex(pBlock);
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex);
SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info;
- doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, pChildOp->exprSupp.numOfExprs,
- 0, NULL);
+ doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX,
+ pChildOp->exprSupp.numOfExprs, 0, NULL);
rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator);
}
taosArrayDestroy(pWins);
@@ -4578,7 +4606,8 @@ SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_
}
int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, uint64_t groupId,
- SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual, SHashObj* pSeDeleted) {
+ SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual,
+ SHashObj* pSeDeleted) {
*allEqual = true;
SStateWindowInfo* pWinInfo = taosArrayGet(pWinInfos, winIndex);
for (int32_t i = start; i < rows; ++i) {
@@ -4599,9 +4628,8 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u
}
if (pWinInfo->winInfo.win.skey > pTs[i]) {
if (pSeDeleted && pWinInfo->winInfo.isOutput) {
- SWinRes res = {.ts = pWinInfo->winInfo.win.skey, .groupId = groupId};
- taosHashPut(pSeDeleted, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &res,
- sizeof(SWinRes));
+ SWinKey res = {.ts = pWinInfo->winInfo.win.skey, .groupId = groupId};
+ taosHashPut(pSeDeleted, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey));
pWinInfo->winInfo.isOutput = false;
}
pWinInfo->winInfo.win.skey = pTs[i];
@@ -4614,14 +4642,14 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u
return rows - start;
}
-static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock,
- SHashObj* pSeUpdated, SHashObj* pSeDeleted) {
+static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SHashObj* pSeUpdated,
+ SHashObj* pSeDeleted) {
SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pGroupColInfo = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
TSKEY* tsCol = (TSKEY*)pTsColInfo->pData;
bool allEqual = false;
int32_t step = 1;
- uint64_t* gpCol = (uint64_t*) pGroupColInfo->pData;
+ uint64_t* gpCol = (uint64_t*)pGroupColInfo->pData;
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
int32_t winIndex = 0;
SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], gpCol[i], &winIndex);
@@ -4665,13 +4693,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
char* pKeyData = colDataGetData(pKeyColInfo, i);
int32_t winIndex = 0;
bool allEqual = true;
- SStateWindowInfo* pCurWin =
- getStateWindow(pAggSup, tsCols[i], groupId, pKeyData, &pInfo->stateCol, &winIndex);
- winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, groupId, pKeyColInfo,
- pSDataBlock->info.rows, i, &allEqual, pStDeleted);
+ SStateWindowInfo* pCurWin = getStateWindow(pAggSup, tsCols[i], groupId, pKeyData, &pInfo->stateCol, &winIndex);
+ winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, groupId, pKeyColInfo, pSDataBlock->info.rows,
+ i, &allEqual, pStDeleted);
if (!allEqual) {
- appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey,
- GROUPID_COLUMN_INDEX, &groupId);
+ appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, GROUPID_COLUMN_INDEX,
+ &groupId);
taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition));
deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo);
continue;
@@ -4682,8 +4709,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
}
pCurWin->winInfo.isClosed = false;
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
- SWinRes value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId};
- code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes));
+ SWinKey value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId};
+ code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index c78ff0756f..9d4010f60e 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -358,7 +358,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
FAIL_SHUFFLE_DISPATCH:
if (pReqs) {
for (int32_t i = 0; i < vgSz; i++) {
- taosArrayDestroy(pReqs[i].data);
+ taosArrayDestroyP(pReqs[i].data, taosMemoryFree);
taosArrayDestroy(pReqs[i].dataLen);
}
taosMemoryFree(pReqs);
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 06ca26f029..102bad7426 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch)
return 0;
}
-// TODO: handle version
int32_t streamExecForAll(SStreamTask* pTask) {
while (1) {
int32_t batchCnt = 1;
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index 5ff700546c..20a2f7d332 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -14,7 +14,7 @@
*/
#include "executor.h"
-#include "tstream.h"
+#include "streamInc.h"
#include "ttimer.h"
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
@@ -23,17 +23,23 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
- pMeta->path = strdup(path);
+ int32_t len = strlen(path) + 20;
+ char* streamPath = taosMemoryCalloc(1, len);
+ sprintf(streamPath, "%s/%s", path, "stream");
+ pMeta->path = strdup(streamPath);
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) {
goto _err;
}
+ sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints");
+ mkdir(streamPath, 0755);
+ taosMemoryFree(streamPath);
+
if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) {
goto _err;
}
- // open state storage backend
- if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pStateDb) < 0) {
+ if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) {
goto _err;
}
@@ -49,16 +55,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc;
- if (streamLoadTasks(pMeta) < 0) {
- goto _err;
- }
return pMeta;
_err:
if (pMeta->path) taosMemoryFree(pMeta->path);
if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks);
- if (pMeta->pStateDb) tdbTbClose(pMeta->pStateDb);
if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb);
+ if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db);
taosMemoryFree(pMeta);
return NULL;
@@ -67,7 +70,7 @@ _err:
void streamMetaClose(SStreamMeta* pMeta) {
tdbCommit(pMeta->db, &pMeta->txn);
tdbTbClose(pMeta->pTaskDb);
- tdbTbClose(pMeta->pStateDb);
+ tdbTbClose(pMeta->pCheckpointDb);
tdbClose(pMeta->db);
void* pIter = NULL;
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c
index 263053778b..0505c3edd6 100644
--- a/source/libs/stream/src/streamRecover.c
+++ b/source/libs/stream/src/streamRecover.c
@@ -176,6 +176,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea
}
int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
+#if 0
void* buf = NULL;
ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
@@ -224,10 +225,12 @@ int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
FAIL:
if (buf) taosMemoryFree(buf);
return -1;
+#endif
return 0;
}
int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
+#if 0
void* pVal = NULL;
int32_t vLen = 0;
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
@@ -241,7 +244,7 @@ int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) {
pTask->nextCheckId = aggCheckpoint.checkpointId + 1;
pTask->checkpointInfo = aggCheckpoint.checkpointVer;
-
+#endif
return 0;
}
diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c
new file mode 100644
index 0000000000..dfd6f012cc
--- /dev/null
+++ b/source/libs/stream/src/streamState.c
@@ -0,0 +1,205 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "executor.h"
+#include "streamInc.h"
+#include "tcommon.h"
+#include "ttimer.h"
+
+SStreamState* streamStateOpen(char* path, SStreamTask* pTask) {
+ SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
+ if (pState == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return NULL;
+ }
+ char statePath[300];
+ sprintf(statePath, "%s/%d", path, pTask->taskId);
+ if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) {
+ goto _err;
+ }
+
+ // open state storage backend
+ if (tdbTbOpen("state.db", sizeof(SWinKey), -1, SWinKeyCmpr, pState->db, &pState->pStateDb) < 0) {
+ goto _err;
+ }
+
+ if (streamStateBegin(pState) < 0) {
+ goto _err;
+ }
+
+ pState->pOwner = pTask;
+
+ return pState;
+
+_err:
+ if (pState->pStateDb) tdbTbClose(pState->pStateDb);
+ if (pState->db) tdbClose(pState->db);
+ taosMemoryFree(pState);
+ return NULL;
+}
+
+void streamStateClose(SStreamState* pState) {
+ tdbCommit(pState->db, &pState->txn);
+ tdbTbClose(pState->pStateDb);
+ tdbClose(pState->db);
+
+ taosMemoryFree(pState);
+}
+
+int32_t streamStateBegin(SStreamState* pState) {
+ if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
+ 0) {
+ return -1;
+ }
+
+ if (tdbBegin(pState->db, &pState->txn) < 0) {
+ tdbTxnClose(&pState->txn);
+ return -1;
+ }
+ return 0;
+}
+
+int32_t streamStateCommit(SStreamState* pState) {
+ if (tdbCommit(pState->db, &pState->txn) < 0) {
+ return -1;
+ }
+ memset(&pState->txn, 0, sizeof(TXN));
+ if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
+ 0) {
+ return -1;
+ }
+ if (tdbBegin(pState->db, &pState->txn) < 0) {
+ return -1;
+ }
+ return 0;
+}
+
+int32_t streamStateAbort(SStreamState* pState) {
+ if (tdbAbort(pState->db, &pState->txn) < 0) {
+ return -1;
+ }
+ memset(&pState->txn, 0, sizeof(TXN));
+ if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
+ 0) {
+ return -1;
+ }
+ if (tdbBegin(pState->db, &pState->txn) < 0) {
+ return -1;
+ }
+ return 0;
+}
+
+int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) {
+ return tdbTbUpsert(pState->pStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn);
+}
+int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
+ return tdbTbGet(pState->pStateDb, key, sizeof(SWinKey), pVal, pVLen);
+}
+
+int32_t streamStateDel(SStreamState* pState, const SWinKey* key) {
+ return tdbTbDelete(pState->pStateDb, key, sizeof(SWinKey), &pState->txn);
+}
+
+SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) {
+ SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
+ if (pCur == NULL) return NULL;
+ tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL);
+
+ int32_t c;
+ tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c);
+ if (c != 0) {
+ taosMemoryFree(pCur);
+ return NULL;
+ }
+ return pCur;
+}
+
+int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
+ const SWinKey* pKTmp = NULL;
+ int32_t kLen;
+ if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) {
+ return -1;
+ }
+ *pKey = *pKTmp;
+ return 0;
+}
+
+int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) {
+ //
+ return tdbTbcMoveToFirst(pCur->pCur);
+}
+
+int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) {
+ //
+ return tdbTbcMoveToLast(pCur->pCur);
+}
+
+SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
+ SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
+ if (pCur == NULL) {
+ return NULL;
+ }
+
+ int32_t c;
+ if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
+ taosMemoryFree(pCur);
+ return NULL;
+ }
+ if (c > 0) return pCur;
+
+ if (tdbTbcMoveToNext(pCur->pCur) < 0) {
+ taosMemoryFree(pCur);
+ return NULL;
+ }
+
+ return pCur;
+}
+
+SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key) {
+ SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
+ if (pCur == NULL) {
+ return NULL;
+ }
+
+ int32_t c;
+ if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) {
+ taosMemoryFree(pCur);
+ return NULL;
+ }
+ if (c < 0) return pCur;
+
+ if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
+ taosMemoryFree(pCur);
+ return NULL;
+ }
+
+ return pCur;
+}
+
+int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) {
+ //
+ return tdbTbcMoveToNext(pCur->pCur);
+}
+
+int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) {
+ //
+ return tdbTbcMoveToPrev(pCur->pCur);
+}
+void streamStateFreeCur(SStreamStateCur* pCur) {
+ tdbTbcClose(pCur->pCur);
+ taosMemoryFree(pCur);
+}
+
+void streamFreeVal(void* val) { tdbFree(val); }
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index 4009a47c65..ce5917de29 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -165,5 +165,8 @@ void tFreeSStreamTask(SStreamTask* pTask) {
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
}
+
+ if (pTask->pState) streamStateClose(pTask->pState);
+
taosMemoryFree(pTask);
}
diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c
index 447db76136..6dd9481b95 100644
--- a/source/libs/transport/src/transSvr.c
+++ b/source/libs/transport/src/transSvr.c
@@ -276,7 +276,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
while (transReadComplete(pBuf)) {
tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn);
if (true == pBuf->invalid || false == uvHandleReq(conn)) {
- tError("%s conn %p read invalid packet", transLabel(pTransInst), conn);
+ tError("%s conn %p read invalid packet, dst: %s, srv: %s", transLabel(pTransInst), conn, conn->dst, conn->src);
destroyConn(conn, true);
return;
}
diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c
index a8da680910..93ced912f8 100644
--- a/source/libs/wal/src/walMeta.c
+++ b/source/libs/wal/src/walMeta.c
@@ -121,7 +121,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
if (found == NULL) {
// file corrupted, no complete log
// TODO delete and search in previous files
- ASSERT(0);
+ /*ASSERT(0);*/
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1;
}
@@ -221,7 +221,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
int code = walSaveMeta(pWal);
if (code < 0) {
- taosArrayDestroy(actualLog);
return -1;
}
}
diff --git a/tests/script/tsim/db/basic2.sim b/tests/script/tsim/db/basic2.sim
index b7ac0b5edd..4f0ba4a13c 100644
--- a/tests/script/tsim/db/basic2.sim
+++ b/tests/script/tsim/db/basic2.sim
@@ -4,7 +4,7 @@ system sh/exec.sh -n dnode1 -s start
sql connect
print =============== conflict stb
-sql create database db vgroups 1;
+sql create database db vgroups 4;
sql use db;
sql create table stb (ts timestamp, i int) tags (j int);
sql_error create table stb using stb tags (1);
@@ -16,6 +16,9 @@ sql_error create table ctb (ts timestamp, i int) tags (j int);
sql create table ntb (ts timestamp, i int);
sql_error create table ntb (ts timestamp, i int) tags (j int);
+sql drop table ntb
+sql create table ntb (ts timestamp, i int) tags (j int);
+
sql drop database db
print =============== create database d1