diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index e593e6d62b..2d9b00eee7 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 2af2222 + GIT_TAG 833b721 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/docs/en/14-reference/02-rest-api/02-rest-api.mdx b/docs/en/14-reference/02-rest-api/02-rest-api.mdx index 8d4186a36b..74ba78b7fc 100644 --- a/docs/en/14-reference/02-rest-api/02-rest-api.mdx +++ b/docs/en/14-reference/02-rest-api/02-rest-api.mdx @@ -10,7 +10,7 @@ One difference from the native connector is that the REST interface is stateless ## Installation -The REST interface does not rely on any TDengine native library, so the client application does not need to install any TDengine libraries. The client application's development language only needs to support the HTTP protocol. +The REST interface does not rely on any TDengine native library, so the client application does not need to install any TDengine libraries. The client application's development language only needs to support the HTTP protocol. The REST interface is provided by [taosAdapter](../taosadapter), to use REST interface you need to make sure `taosAdapter` is running properly. ## Verification diff --git a/docs/en/14-reference/03-connector/cpp.mdx b/docs/en/14-reference/03-connector/03-cpp.mdx similarity index 99% rename from docs/en/14-reference/03-connector/cpp.mdx rename to docs/en/14-reference/03-connector/03-cpp.mdx index 5839ed4af8..02d7df48db 100644 --- a/docs/en/14-reference/03-connector/cpp.mdx +++ b/docs/en/14-reference/03-connector/03-cpp.mdx @@ -1,5 +1,4 @@ --- -sidebar_position: 1 sidebar_label: C/C++ title: C/C++ Connector --- diff --git a/docs/en/14-reference/03-connector/java.mdx b/docs/en/14-reference/03-connector/04-java.mdx similarity index 99% rename from docs/en/14-reference/03-connector/java.mdx rename to docs/en/14-reference/03-connector/04-java.mdx index 39514c37eb..0f977393f1 100644 --- a/docs/en/14-reference/03-connector/java.mdx +++ b/docs/en/14-reference/03-connector/04-java.mdx @@ -1,6 +1,5 @@ --- toc_max_heading_level: 4 -sidebar_position: 2 sidebar_label: Java title: TDengine Java Connector description: The TDengine Java Connector is implemented on the standard JDBC API and provides native and REST connectors. diff --git a/docs/en/14-reference/03-connector/go.mdx b/docs/en/14-reference/03-connector/05-go.mdx similarity index 99% rename from docs/en/14-reference/03-connector/go.mdx rename to docs/en/14-reference/03-connector/05-go.mdx index 2926355040..00e3bc1bc3 100644 --- a/docs/en/14-reference/03-connector/go.mdx +++ b/docs/en/14-reference/03-connector/05-go.mdx @@ -1,6 +1,5 @@ --- toc_max_heading_level: 4 -sidebar_position: 4 sidebar_label: Go title: TDengine Go Connector --- diff --git a/docs/en/14-reference/03-connector/rust.mdx b/docs/en/14-reference/03-connector/06-rust.mdx similarity index 99% rename from docs/en/14-reference/03-connector/rust.mdx rename to docs/en/14-reference/03-connector/06-rust.mdx index e9b16ba94d..1184c98a28 100644 --- a/docs/en/14-reference/03-connector/rust.mdx +++ b/docs/en/14-reference/03-connector/06-rust.mdx @@ -1,6 +1,5 @@ --- toc_max_heading_level: 4 -sidebar_position: 5 sidebar_label: Rust title: TDengine Rust Connector --- diff --git a/docs/en/14-reference/03-connector/python.mdx b/docs/en/14-reference/03-connector/07-python.mdx similarity index 99% rename from docs/en/14-reference/03-connector/python.mdx rename to docs/en/14-reference/03-connector/07-python.mdx index e183bbee22..fc95033baa 100644 --- a/docs/en/14-reference/03-connector/python.mdx +++ b/docs/en/14-reference/03-connector/07-python.mdx @@ -1,5 +1,4 @@ --- -sidebar_position: 3 sidebar_label: Python title: TDengine Python Connector description: "taospy is the official Python connector for TDengine. taospy provides a rich API that makes it easy for Python applications to use TDengine. tasopy wraps both the native and REST interfaces of TDengine, corresponding to the two submodules of tasopy: taos and taosrest. In addition to wrapping the native and REST interfaces, taospy also provides a programming interface that conforms to the Python Data Access Specification (PEP 249), making it easy to integrate taospy with many third-party tools, such as SQLAlchemy and pandas." diff --git a/docs/en/14-reference/03-connector/node.mdx b/docs/en/14-reference/03-connector/08-node.mdx similarity index 99% rename from docs/en/14-reference/03-connector/node.mdx rename to docs/en/14-reference/03-connector/08-node.mdx index d170044435..f93632b417 100644 --- a/docs/en/14-reference/03-connector/node.mdx +++ b/docs/en/14-reference/03-connector/08-node.mdx @@ -1,6 +1,5 @@ --- toc_max_heading_level: 4 -sidebar_position: 6 sidebar_label: Node.js title: TDengine Node.js Connector --- diff --git a/docs/en/14-reference/03-connector/csharp.mdx b/docs/en/14-reference/03-connector/09-csharp.mdx similarity index 99% rename from docs/en/14-reference/03-connector/csharp.mdx rename to docs/en/14-reference/03-connector/09-csharp.mdx index 388ae49d09..c745b8dd1a 100644 --- a/docs/en/14-reference/03-connector/csharp.mdx +++ b/docs/en/14-reference/03-connector/09-csharp.mdx @@ -1,6 +1,5 @@ --- toc_max_heading_level: 4 -sidebar_position: 7 sidebar_label: C# title: C# Connector --- diff --git a/docs/en/14-reference/03-connector/php.mdx b/docs/en/14-reference/03-connector/10-php.mdx similarity index 98% rename from docs/en/14-reference/03-connector/php.mdx rename to docs/en/14-reference/03-connector/10-php.mdx index 08cf34495f..820f703759 100644 --- a/docs/en/14-reference/03-connector/php.mdx +++ b/docs/en/14-reference/03-connector/10-php.mdx @@ -1,6 +1,5 @@ --- -sidebar_position: 1 -sidebar_label: PHP (community contribution) +sidebar_label: PHP title: PHP Connector --- diff --git a/docs/en/14-reference/03-connector/03-connector.mdx b/docs/en/14-reference/03-connector/index.mdx similarity index 100% rename from docs/en/14-reference/03-connector/03-connector.mdx rename to docs/en/14-reference/03-connector/index.mdx diff --git a/docs/zh/08-connector/02-rest-api.mdx b/docs/zh/08-connector/02-rest-api.mdx index 4b9171c07d..e254244657 100644 --- a/docs/zh/08-connector/02-rest-api.mdx +++ b/docs/zh/08-connector/02-rest-api.mdx @@ -1,5 +1,7 @@ --- title: REST API +sidebar_label: REST API +description: 详细介绍 TDengine 提供的 RESTful API. --- 为支持各种不同类型平台的开发,TDengine 提供符合 REST 设计标准的 API,即 REST API。为最大程度降低学习成本,不同于其他数据库 REST API 的设计方法,TDengine 直接通过 HTTP POST 请求 BODY 中包含的 SQL 语句来操作数据库,仅需要一个 URL。REST 连接器的使用参见 [视频教程](https://www.taosdata.com/blog/2020/11/11/1965.html)。 @@ -10,7 +12,7 @@ title: REST API ## 安装 -RESTful 接口不依赖于任何 TDengine 的库,因此客户端不需要安装任何 TDengine 的库,只要客户端的开发语言支持 HTTP 协议即可。 +RESTful 接口不依赖于任何 TDengine 的库,因此客户端不需要安装任何 TDengine 的库,只要客户端的开发语言支持 HTTP 协议即可。TDengine 的 RESTful API 由 [taosAdapter](../../reference/taosadapter) 提供,在使用 RESTful API 之前需要确保 `taosAdapter` 正常运行。 ## 验证 diff --git a/docs/zh/08-connector/03-cpp.mdx b/docs/zh/08-connector/03-cpp.mdx index d27eeb7dfb..c0bd33f129 100644 --- a/docs/zh/08-connector/03-cpp.mdx +++ b/docs/zh/08-connector/03-cpp.mdx @@ -1,5 +1,4 @@ --- -sidebar_position: 1 sidebar_label: C/C++ title: C/C++ Connector --- diff --git a/docs/zh/08-connector/04-java.mdx b/docs/zh/08-connector/04-java.mdx index 20d2e4fabd..6b1715f8c6 100644 --- a/docs/zh/08-connector/04-java.mdx +++ b/docs/zh/08-connector/04-java.mdx @@ -1,6 +1,5 @@ --- toc_max_heading_level: 4 -sidebar_position: 2 sidebar_label: Java title: TDengine Java Connector description: TDengine Java 连接器基于标准 JDBC API 实现, 并提供原生连接与 REST连接两种连接器。 diff --git a/docs/zh/08-connector/06-rust.mdx b/docs/zh/08-connector/06-rust.mdx index 187e2f0b33..26f53c82d6 100644 --- a/docs/zh/08-connector/06-rust.mdx +++ b/docs/zh/08-connector/06-rust.mdx @@ -1,6 +1,5 @@ --- toc_max_heading_level: 4 -sidebar_position: 5 sidebar_label: Rust title: TDengine Rust Connector --- diff --git a/docs/zh/08-connector/07-python.mdx b/docs/zh/08-connector/07-python.mdx index 88a5d4f84d..0242486d3b 100644 --- a/docs/zh/08-connector/07-python.mdx +++ b/docs/zh/08-connector/07-python.mdx @@ -1,5 +1,4 @@ --- -sidebar_position: 3 sidebar_label: Python title: TDengine Python Connector description: "taospy 是 TDengine 的官方 Python 连接器。taospy 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。tasopy 对 TDengine 的原生接口和 REST 接口都进行了封装, 分别对应 tasopy 的两个子模块:tasos 和 taosrest。除了对原生接口和 REST 接口的封装,taospy 还提供了符合 Python 数据访问规范(PEP 249)的编程接口。这使得 taospy 和很多第三方工具集成变得简单,比如 SQLAlchemy 和 pandas" diff --git a/docs/zh/08-connector/08-node.mdx b/docs/zh/08-connector/08-node.mdx index 63d690e554..167ae069d6 100644 --- a/docs/zh/08-connector/08-node.mdx +++ b/docs/zh/08-connector/08-node.mdx @@ -1,6 +1,5 @@ --- toc_max_heading_level: 4 -sidebar_position: 6 sidebar_label: Node.js title: TDengine Node.js Connector --- diff --git a/docs/zh/08-connector/09-csharp.mdx b/docs/zh/08-connector/09-csharp.mdx index 8214717583..be27bfb685 100644 --- a/docs/zh/08-connector/09-csharp.mdx +++ b/docs/zh/08-connector/09-csharp.mdx @@ -1,6 +1,5 @@ --- toc_max_heading_level: 4 -sidebar_position: 7 sidebar_label: C# title: C# Connector --- diff --git a/docs/zh/08-connector/10-php.mdx b/docs/zh/08-connector/10-php.mdx index 53611c0274..5e32c709de 100644 --- a/docs/zh/08-connector/10-php.mdx +++ b/docs/zh/08-connector/10-php.mdx @@ -1,6 +1,5 @@ --- -sidebar_position: 1 -sidebar_label: PHP(社区贡献) +sidebar_label: PHP title: PHP Connector --- diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 55556f21a1..1c9d11b755 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -96,7 +96,7 @@ int32_t create_stream() { taos_free_result(pRes); pRes = taos_query(pConn, - "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, k from st1 partition by tbname state_window(k)"); + "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, avg(k) from st1 partition by tbname interval(10s)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tcommon.h b/include/common/tcommon.h index dbe020f7ec..fb59d8b9a0 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -44,6 +44,30 @@ enum { ) // clang-format on +typedef struct { + TSKEY ts; + uint64_t groupId; +} SWinKey; + +static inline int SWinKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { + SWinKey* pWin1 = (SWinKey*)pKey1; + SWinKey* pWin2 = (SWinKey*)pKey2; + + if (pWin1->groupId > pWin2->groupId) { + return 1; + } else if (pWin1->groupId < pWin2->groupId) { + return -1; + } + + if (pWin1->ts > pWin2->ts) { + return 1; + } else if (pWin1->ts < pWin2->ts) { + return -1; + } + + return 0; +} + enum { TMQ_MSG_TYPE__DUMMY = 0, TMQ_MSG_TYPE__POLL_RSP, diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 16b259cf59..2c27509008 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -551,16 +551,17 @@ typedef struct { } SStreamStateCur; #if 1 -int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen); -int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen); -int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen); +int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); +int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); +int32_t streamStateDel(SStreamState* pState, const SWinKey* key); +void streamFreeVal(void* val); -SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen); -SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen); -SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen); +SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key); +SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key); void streamStateFreeCur(SStreamStateCur* pCur); -int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen); +int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1456c6c067..3ff59ac2c0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -652,27 +652,33 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { // expand executor if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + if (pTask->pState == NULL) { + return -1; + } + SReadHandle handle = { .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, .initTqReader = 1, + .pStateBackend = pTask->pState, }; pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); ASSERT(pTask->exec.executor); } else if (pTask->taskLevel == TASK_LEVEL__AGG) { + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + if (pTask->pState == NULL) { + return -1; + } SReadHandle mgHandle = { .vnode = NULL, .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), + .pStateBackend = pTask->pState, }; pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); ASSERT(pTask->exec.executor); } - pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); - if (pTask->pState == NULL) { - return -1; - } - // sink /*pTask->ahandle = pTq->pVnode;*/ if (pTask->outputType == TASK_OUTPUT__SMA) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 599f86f4fa..3eb382522a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1281,6 +1281,42 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamScanInfo* pInfo = pOperator->info; +#if 0 + SStreamState* pState = pTaskInfo->streamInfo.pState; + if (pState) { + printf(">>>>>>>> stream write backend\n"); + SWinKey key = { + .ts = 1, + .groupId = 2, + }; + char tmp[100] = "abcdefg1"; + if (streamStatePut(pState, &key, &tmp, strlen(tmp) + 1) < 0) { + ASSERT(0); + } + + key.ts = 2; + char tmp2[100] = "abcdefg2"; + if (streamStatePut(pState, &key, &tmp2, strlen(tmp2) + 1) < 0) { + ASSERT(0); + } + + key.groupId = 5; + key.ts = 1; + char tmp3[100] = "abcdefg3"; + if (streamStatePut(pState, &key, &tmp3, strlen(tmp3) + 1) < 0) { + ASSERT(0); + } + + char* val2 = NULL; + int32_t sz; + if (streamStateGet(pState, &key, (void**)&val2, &sz) < 0) { + ASSERT(0); + } + printf("stream read %s %d\n", val2, sz); + streamFreeVal(val2); + } +#endif + qDebug("stream scan called"); if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { while (1) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3d54b791a8..ba0e534cc6 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -15,6 +15,7 @@ #include "executorimpl.h" #include "function.h" #include "functionMgt.h" +#include "tcommon.h" #include "tcompare.h" #include "tdatablock.h" #include "tfill.h" @@ -27,11 +28,6 @@ typedef enum SResultTsInterpType { #define IS_FINAL_OP(op) ((op)->isFinal) -typedef struct SWinRes { - TSKEY ts; - uint64_t groupId; -} SWinRes; - typedef struct SPullWindowInfo { STimeWindow window; uint64_t groupId; @@ -641,7 +637,8 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, + numOfExprs); if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { closeResultRow(pr); @@ -812,7 +809,7 @@ static int32_t savePullWindow(SPullWindowInfo* pPullInfo, SArray* pPullWins) { int32_t compareResKey(void* pKey, void* data, int32_t index) { SArray* res = (SArray*)data; SResKeyPos* pos = taosArrayGetP(res, index); - SWinRes* pData = (SWinRes*)pKey; + SWinKey* pData = (SWinKey*)pKey; if (pData->ts == *(int64_t*)pos->key) { if (pData->groupId > pos->groupId) { return 1; @@ -828,7 +825,7 @@ int32_t compareResKey(void* pKey, void* data, int32_t index) { static int32_t saveResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SArray* pUpdated) { int32_t size = taosArrayGetSize(pUpdated); - SWinRes data = {.ts = ts, .groupId = groupId}; + SWinKey data = {.ts = ts, .groupId = groupId}; int32_t index = binarySearchCom(pUpdated, size, &data, TSDB_ORDER_DESC, compareResKey); if (index == -1) { index = 0; @@ -861,8 +858,8 @@ static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_ newPos->groupId = groupId; newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset}; *(int64_t*)newPos->key = ts; - SWinRes key = {.ts = ts, .groupId = groupId}; - if (taosHashPut(pUpdatedMap, &key, sizeof(SWinRes), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) { + SWinKey key = {.ts = ts, .groupId = groupId}; + if (taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) { taosMemoryFree(newPos); } return TSDB_CODE_SUCCESS; @@ -879,20 +876,20 @@ static int32_t saveResultRow(SResultRow* result, uint64_t groupId, SArray* pUpda static void removeResults(SArray* pWins, SHashObj* pUpdatedMap) { int32_t size = taosArrayGetSize(pWins); for (int32_t i = 0; i < size; i++) { - SWinRes* pW = taosArrayGet(pWins, i); - taosHashRemove(pUpdatedMap, pW, sizeof(SWinRes)); + SWinKey* pW = taosArrayGet(pWins, i); + taosHashRemove(pUpdatedMap, pW, sizeof(SWinKey)); } } int64_t getWinReskey(void* data, int32_t index) { SArray* res = (SArray*)data; - SWinRes* pos = taosArrayGet(res, index); + SWinKey* pos = taosArrayGet(res, index); return pos->ts; } int32_t compareWinRes(void* pKey, void* data, int32_t index) { SArray* res = (SArray*)data; - SWinRes* pos = taosArrayGetP(res, index); + SWinKey* pos = taosArrayGetP(res, index); SResKeyPos* pData = (SResKeyPos*)pKey; if (*(int64_t*)pData->key == pos->ts) { if (pData->groupId > pos->groupId) { @@ -985,8 +982,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, - pBlock->info.rows, numOfOutput); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, + numOfOutput); } doCloseWindow(pResultRowInfo, pInfo, pResult); @@ -1025,8 +1022,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, - pBlock->info.rows, numOfOutput); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, + numOfOutput); doCloseWindow(pResultRowInfo, pInfo, pResult); } @@ -1214,8 +1211,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, + pBlock->info.rows, numOfOutput); } static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { @@ -1418,7 +1415,7 @@ void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, TSDB_ORDER_ASC); doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]); if (pUpWins) { - SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]}; + SWinKey winRes = {.ts = win.skey, .groupId = groupIds[i]}; taosArrayPush(pUpWins, &winRes); } } @@ -1445,7 +1442,7 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* uint64_t winGpId = pGpDatas ? pGpDatas[startPos] : pBlock->info.groupId; bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TSKEY), winGpId, numOfOutput); if (pUpWins && res) { - SWinRes winRes = {.ts = win.skey, .groupId = winGpId}; + SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; taosArrayPush(pUpWins, &winRes); } getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); @@ -1484,11 +1481,11 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, STimeWindow win; win.skey = ts; win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; - SWinRes winRe = { + SWinKey winRe = { .ts = win.skey, .groupId = groupId, }; - void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinRes)); + void* chIds = taosHashGet(pPullDataMap, &winRe, sizeof(SWinKey)); if (isCloseWindow(&win, pSup)) { if (chIds && pPullDataMap) { SArray* chAy = *(SArray**)chIds; @@ -1555,7 +1552,7 @@ static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlo SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); for (int32_t i = *index; i < size; i++) { - SWinRes* pWin = taosArrayGet(pWins, i); + SWinKey* pWin = taosArrayGet(pWins, i); colDataAppend(pTsCol, pBlock->info.rows, (const char*)&pWin->ts, false); colDataAppend(pGroupCol, pBlock->info.rows, (const char*)&pWin->groupId, false); pBlock->info.rows++; @@ -1595,6 +1592,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + + SStreamState* pState = pTaskInfo->streamInfo.pState; + while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { @@ -1639,6 +1639,35 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdatedMap); } +#if 0 + if (pState) { + printf(">>>>>>>> stream read backend\n"); + SWinKey key = { + .ts = 1, + .groupId = 2, + }; + char* val = NULL; + int32_t sz; + if (streamStateGet(pState, &key, (void**)&val, &sz) < 0) { + ASSERT(0); + } + printf("stream read %s %d\n", val, sz); + streamFreeVal(val); + + SStreamStateCur* pCur = streamStateGetCur(pState, &key); + ASSERT(pCur); + while (streamStateCurNext(pState, pCur) == 0) { + SWinKey key1; + const void* val1; + if (streamStateGetKVByCur(pCur, &key1, &val1, &sz) < 0) { + break; + } + printf("stream iter key groupId:%d ts:%d, value %s %d\n", key1.groupId, key1.ts, val1, sz); + } + streamStateFreeCur(pCur); + } +#endif + pOperator->status = OP_RES_TO_RETURN; closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); @@ -1857,7 +1886,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* } } pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); - pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); + pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->delIndex = 0; pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -1958,8 +1987,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, - pRowSup->numOfRows, pBlock->info.rows, numOfOutput); + doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, + pBlock->info.rows, numOfOutput); } static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { @@ -2811,7 +2840,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr return; } for (int32_t i = 0; i < size; i++) { - SWinRes* pWinRes = taosArrayGet(pWinArray, i); + SWinKey* pWinRes = taosArrayGet(pWinArray, i); SResultRow* pCurResult = NULL; STimeWindow ParentWin = {.skey = pWinRes->ts, .ekey = pWinRes->ts + 1}; setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &ParentWin, true, &pCurResult, pWinRes->groupId, pSup->pCtx, @@ -2854,12 +2883,12 @@ int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* return getNextQualifiedWindow(pInterval, pNextWin, pBlockInfo, tsCols, prevEndPos, TSDB_ORDER_ASC); } -void addPullWindow(SHashObj* pMap, SWinRes* pWinRes, int32_t size) { +void addPullWindow(SHashObj* pMap, SWinKey* pWinRes, int32_t size) { SArray* childIds = taosArrayInit(8, sizeof(int32_t)); for (int32_t i = 0; i < size; i++) { taosArrayPush(childIds, &i); } - taosHashPut(pMap, pWinRes, sizeof(SWinRes), &childIds, sizeof(void*)); + taosHashPut(pMap, pWinRes, sizeof(SWinKey), &childIds, sizeof(void*)); } static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } @@ -2906,11 +2935,11 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc } if (IS_FINAL_OP(pInfo) && isClosed && pInfo->pChildren) { bool ignore = true; - SWinRes winRes = { + SWinKey winRes = { .ts = nextWin.skey, .groupId = tableGroupId, }; - void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinRes)); + void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); if (isDeletedWindow(&nextWin, tableGroupId, &pInfo->aggSup) && !chIds) { SPullWindowInfo pull = {.window = nextWin, .groupId = tableGroupId}; // add pull data request @@ -3038,8 +3067,8 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { uint64_t* groupIdData = (uint64_t*)pGroupCol->pData; int32_t chId = getChildIndex(pBlock); for (int32_t i = 0; i < pBlock->info.rows; i++) { - SWinRes winRes = {.ts = tsData[i], .groupId = groupIdData[i]}; - void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinRes)); + SWinKey winRes = {.ts = tsData[i], .groupId = groupIdData[i]}; + void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey)); if (chIds) { SArray* chArray = *(SArray**)chIds; int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); @@ -3048,7 +3077,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap) { taosArrayRemove(chArray, index); if (taosArrayGetSize(chArray) == 0) { // pull data is over - taosHashRemove(pMap, &winRes, sizeof(SWinRes)); + taosHashRemove(pMap, &winRes, sizeof(SWinKey)); } } } @@ -3132,7 +3161,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; } else if (pBlock->info.type == STREAM_CLEAR) { - SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); + SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey)); doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); if (IS_FINAL_OP(pInfo)) { int32_t childIndex = getChildIndex(pBlock); @@ -3170,7 +3199,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { - SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); + SArray* pUpWins = taosArrayInit(8, sizeof(SWinKey)); doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); removeResults(pUpWins, pUpdatedMap); taosArrayDestroy(pUpWins); @@ -3385,7 +3414,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->delIndex = 0; - pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); + pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); pOperator->operatorType = pPhyNode->type; @@ -3720,8 +3749,8 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TS } if (pWinInfo->win.skey > pStartTs[i]) { if (pStDeleted && pWinInfo->isOutput) { - SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId}; - taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); + SWinKey res = {.ts = pWinInfo->win.skey, .groupId = groupId}; + taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey)); pWinInfo->isOutput = false; } pWinInfo->win.skey = pStartTs[i]; @@ -3839,8 +3868,8 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex, compactFunctions(pSup->pCtx, pInfo->pDummyCtx, numOfOutput, pTaskInfo); taosHashRemove(pStUpdated, &pWinInfo->pos, sizeof(SResultRowPosition)); if (pWinInfo->isOutput) { - SWinRes res = {.ts = pWinInfo->win.skey, .groupId = groupId}; - taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); + SWinKey res = {.ts = pWinInfo->win.skey, .groupId = groupId}; + taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey)); pWinInfo->isOutput = false; } taosArrayRemove(pInfo->streamAggSup.pCurWins, i); @@ -3902,8 +3931,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } pCurWin->isClosed = false; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) { - SWinRes value = {.ts = pCurWin->win.skey, .groupId = groupId}; - code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes)); + SWinKey value = {.ts = pCurWin->win.skey, .groupId = groupId}; + code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -3955,8 +3984,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, int32_t step = 0; for (int32_t i = 0; i < pBlock->info.rows; i += step) { int32_t winIndex = 0; - SResultWindowInfo* pCurWin = - getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, gpCols[i], gap, &winIndex); + SResultWindowInfo* pCurWin = getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, gpCols[i], gap, &winIndex); if (!pCurWin || pCurWin->pos.pageId == -1) { // window has been closed. step = 1; @@ -3981,9 +4009,9 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated) { if (pos == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - pos->groupId = ((SWinRes*)pData)->groupId; + pos->groupId = ((SWinKey*)pData)->groupId; pos->pos = *(SResultRowPosition*)key; - *(int64_t*)pos->key = ((SWinRes*)pData)->ts; + *(int64_t*)pos->key = ((SWinKey*)pData)->ts; taosArrayPush(pUpdated, &pos); } taosArraySort(pUpdated, resultrowComparAsc); @@ -3999,7 +4027,7 @@ void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** It blockDataEnsureCapacity(pBlock, size); size_t keyLen = 0; while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) { - SWinRes* res = *Ite; + SWinKey* res = *Ite; SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); colDataAppend(pTsCol, pBlock->info.rows, (const char*)&res->ts, false); SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); @@ -4130,8 +4158,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SHashObj* pStDeleted) { int32_t size = taosArrayGetSize(pResWins); for (int32_t i = 0; i < size; i++) { SResultWindowInfo* pWinInfo = taosArrayGet(pResWins, i); - SWinRes res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId}; - taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinRes)); + SWinKey res = {.ts = pWinInfo->win.skey, .groupId = pWinInfo->groupId}; + taosHashPut(pStDeleted, &pWinInfo->pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey)); } } @@ -4169,14 +4197,14 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); - doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX, pOperator->exprSupp.numOfExprs, 0, - pWins); + doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX, + pOperator->exprSupp.numOfExprs, 0, pWins); if (IS_FINAL_OP(pInfo)) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; - doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, pChildOp->exprSupp.numOfExprs, - 0, NULL); + doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, + pChildOp->exprSupp.numOfExprs, 0, NULL); rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator); } taosArrayDestroy(pWins); @@ -4578,7 +4606,8 @@ SStateWindowInfo* getStateWindow(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_ } int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, uint64_t groupId, - SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual, SHashObj* pSeDeleted) { + SColumnInfoData* pKeyCol, int32_t rows, int32_t start, bool* allEqual, + SHashObj* pSeDeleted) { *allEqual = true; SStateWindowInfo* pWinInfo = taosArrayGet(pWinInfos, winIndex); for (int32_t i = start; i < rows; ++i) { @@ -4599,9 +4628,8 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u } if (pWinInfo->winInfo.win.skey > pTs[i]) { if (pSeDeleted && pWinInfo->winInfo.isOutput) { - SWinRes res = {.ts = pWinInfo->winInfo.win.skey, .groupId = groupId}; - taosHashPut(pSeDeleted, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &res, - sizeof(SWinRes)); + SWinKey res = {.ts = pWinInfo->winInfo.win.skey, .groupId = groupId}; + taosHashPut(pSeDeleted, &pWinInfo->winInfo.pos, sizeof(SResultRowPosition), &res, sizeof(SWinKey)); pWinInfo->winInfo.isOutput = false; } pWinInfo->winInfo.win.skey = pTs[i]; @@ -4614,14 +4642,14 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u return rows - start; } -static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, - SHashObj* pSeUpdated, SHashObj* pSeDeleted) { +static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SHashObj* pSeUpdated, + SHashObj* pSeDeleted) { SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pGroupColInfo = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); TSKEY* tsCol = (TSKEY*)pTsColInfo->pData; bool allEqual = false; int32_t step = 1; - uint64_t* gpCol = (uint64_t*) pGroupColInfo->pData; + uint64_t* gpCol = (uint64_t*)pGroupColInfo->pData; for (int32_t i = 0; i < pBlock->info.rows; i += step) { int32_t winIndex = 0; SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], gpCol[i], &winIndex); @@ -4665,13 +4693,12 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl char* pKeyData = colDataGetData(pKeyColInfo, i); int32_t winIndex = 0; bool allEqual = true; - SStateWindowInfo* pCurWin = - getStateWindow(pAggSup, tsCols[i], groupId, pKeyData, &pInfo->stateCol, &winIndex); - winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, groupId, pKeyColInfo, - pSDataBlock->info.rows, i, &allEqual, pStDeleted); + SStateWindowInfo* pCurWin = getStateWindow(pAggSup, tsCols[i], groupId, pKeyData, &pInfo->stateCol, &winIndex); + winRows = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCols, groupId, pKeyColInfo, pSDataBlock->info.rows, + i, &allEqual, pStDeleted); if (!allEqual) { - appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, - GROUPID_COLUMN_INDEX, &groupId); + appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, GROUPID_COLUMN_INDEX, + &groupId); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo); continue; @@ -4682,8 +4709,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } pCurWin->winInfo.isClosed = false; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { - SWinRes value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId}; - code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes)); + SWinKey value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId}; + code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey)); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c78ff0756f..9d4010f60e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -358,7 +358,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat FAIL_SHUFFLE_DISPATCH: if (pReqs) { for (int32_t i = 0; i < vgSz; i++) { - taosArrayDestroy(pReqs[i].data); + taosArrayDestroyP(pReqs[i].data, taosMemoryFree); taosArrayDestroy(pReqs[i].dataLen); } taosMemoryFree(pReqs); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 6ccc90fa51..dfd6f012cc 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -15,6 +15,7 @@ #include "executor.h" #include "streamInc.h" +#include "tcommon.h" #include "ttimer.h" SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { @@ -23,14 +24,18 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - char statePath[200]; + char statePath[300]; sprintf(statePath, "%s/%d", path, pTask->taskId); - if (tdbOpen(statePath, 16 * 1024, 1, &pState->db) < 0) { + if (tdbOpen(statePath, 4096, 256, &pState->db) < 0) { goto _err; } // open state storage backend - if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pState->db, &pState->pStateDb) < 0) { + if (tdbTbOpen("state.db", sizeof(SWinKey), -1, SWinKeyCmpr, pState->db, &pState->pStateDb) < 0) { + goto _err; + } + + if (streamStateBegin(pState) < 0) { goto _err; } @@ -60,6 +65,7 @@ int32_t streamStateBegin(SStreamState* pState) { } if (tdbBegin(pState->db, &pState->txn) < 0) { + tdbTxnClose(&pState->txn); return -1; } return 0; @@ -95,33 +101,39 @@ int32_t streamStateAbort(SStreamState* pState) { return 0; } -int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen) { - return tdbTbUpsert(pState->pStateDb, key, kLen, value, vLen, &pState->txn); +int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { + return tdbTbUpsert(pState->pStateDb, key, sizeof(SWinKey), value, vLen, &pState->txn); } -int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen) { - return tdbTbGet(pState->pStateDb, key, kLen, pVal, pVLen); +int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { + return tdbTbGet(pState->pStateDb, key, sizeof(SWinKey), pVal, pVLen); } -int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen) { - return tdbTbDelete(pState->pStateDb, key, kLen, &pState->txn); +int32_t streamStateDel(SStreamState* pState, const SWinKey* key) { + return tdbTbDelete(pState->pStateDb, key, sizeof(SWinKey), &pState->txn); } -SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen) { +SStreamStateCur* streamStateGetCur(SStreamState* pState, const SWinKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL); int32_t c; - tdbTbcMoveTo(pCur->pCur, key, kLen, &c); + tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c); if (c != 0) { taosMemoryFree(pCur); return NULL; } - return 0; + return pCur; } -int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen) { - return tdbTbcGet(pCur->pCur, (const void**)pKey, pKLen, (const void**)pVal, pVLen); +int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { + const SWinKey* pKTmp = NULL; + int32_t kLen; + if (tdbTbcGet(pCur->pCur, (const void**)&pKTmp, &kLen, pVal, pVLen) < 0) { + return -1; + } + *pKey = *pKTmp; + return 0; } int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { @@ -134,14 +146,14 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { return tdbTbcMoveToLast(pCur->pCur); } -SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen) { +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } int32_t c; - if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { + if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { taosMemoryFree(pCur); return NULL; } @@ -155,14 +167,14 @@ SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, i return pCur; } -SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen) { +SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const SWinKey* key) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) { return NULL; } int32_t c; - if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { + if (tdbTbcMoveTo(pCur->pCur, key, sizeof(SWinKey), &c) < 0) { taosMemoryFree(pCur); return NULL; } @@ -185,3 +197,9 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { // return tdbTbcMoveToPrev(pCur->pCur); } +void streamStateFreeCur(SStreamStateCur* pCur) { + tdbTbcClose(pCur->pCur); + taosMemoryFree(pCur); +} + +void streamFreeVal(void* val) { tdbFree(val); }