From 2e582155e6927b152eed3808cda5325dbdb613c5 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 12 Jun 2024 03:07:50 +0000 Subject: [PATCH 01/29] Update the libuv version to prevent compilation errors in higher versions of GCC --- cmake/libuv_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/libuv_CMakeLists.txt.in b/cmake/libuv_CMakeLists.txt.in index 9c48ddefef..dfca7ee018 100644 --- a/cmake/libuv_CMakeLists.txt.in +++ b/cmake/libuv_CMakeLists.txt.in @@ -2,7 +2,7 @@ # libuv ExternalProject_Add(libuv GIT_REPOSITORY https://github.com/libuv/libuv.git - GIT_TAG v1.44.2 + GIT_TAG v1.47.0 SOURCE_DIR "${TD_CONTRIB_DIR}/libuv" BINARY_DIR "${TD_CONTRIB_DIR}/libuv" CONFIGURE_COMMAND "" From 89b97eff60104caf3c8b4be30fcb2f5c4f70c767 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 12 Jun 2024 07:22:40 +0000 Subject: [PATCH 02/29] fix compile error --- cmake/libuv_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/libuv_CMakeLists.txt.in b/cmake/libuv_CMakeLists.txt.in index dfca7ee018..3e259024b2 100644 --- a/cmake/libuv_CMakeLists.txt.in +++ b/cmake/libuv_CMakeLists.txt.in @@ -2,7 +2,7 @@ # libuv ExternalProject_Add(libuv GIT_REPOSITORY https://github.com/libuv/libuv.git - GIT_TAG v1.47.0 + GIT_TAG v1.46.0 SOURCE_DIR "${TD_CONTRIB_DIR}/libuv" BINARY_DIR "${TD_CONTRIB_DIR}/libuv" CONFIGURE_COMMAND "" From 6ff106cd0af1c93613571e02461a4dfc36b826e3 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 12 Jun 2024 08:39:35 +0000 Subject: [PATCH 03/29] fix compile error --- cmake/libuv_CMakeLists.txt.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/libuv_CMakeLists.txt.in b/cmake/libuv_CMakeLists.txt.in index 3e259024b2..673c771fb0 100644 --- a/cmake/libuv_CMakeLists.txt.in +++ b/cmake/libuv_CMakeLists.txt.in @@ -2,7 +2,7 @@ # libuv ExternalProject_Add(libuv GIT_REPOSITORY https://github.com/libuv/libuv.git - GIT_TAG v1.46.0 + GIT_TAG v1.48.0 SOURCE_DIR "${TD_CONTRIB_DIR}/libuv" BINARY_DIR "${TD_CONTRIB_DIR}/libuv" CONFIGURE_COMMAND "" From 95a6cbf8e082db7cbd14dcc623fda221084eb7f9 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 12 Jun 2024 18:56:29 +0800 Subject: [PATCH 04/29] enh: support get origin string of taos errors --- include/util/taoserror.h | 7 +++++++ source/util/src/terror.c | 21 ++++++++++----------- source/util/test/CMakeLists.txt | 10 +++++++++- source/util/test/terrorTest.cpp | 29 +++++++++++++++++++++++++++++ 4 files changed, 55 insertions(+), 12 deletions(-) create mode 100644 source/util/test/terrorTest.cpp diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 8f8434dfc1..63b733a337 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -24,6 +24,12 @@ extern "C" { // clang-format off +typedef struct { + int32_t val; + const char* str; + const char* origin; +} STaosError; + #define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code)))) #define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code)) @@ -38,6 +44,7 @@ const char* terrstr(); char* taosGetErrMsgReturn(); char* taosGetErrMsg(); int32_t* taosGetErrno(); +int32_t taosGetErrSize(); #define terrno (*taosGetErrno()) #define terrMsg (taosGetErrMsg()) diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0f594af0e9..f2ae43bc94 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -21,10 +21,10 @@ #define TAOS_ERROR_C -typedef struct { - int32_t val; - const char* str; -} STaosError; +// typedef struct { +// int32_t val; +// const char* str; +// } STaosError; static threadlocal int32_t tsErrno; static threadlocal char tsErrMsgDetail[ERR_MSG_LEN] = {0}; @@ -35,7 +35,9 @@ char* taosGetErrMsg() { return tsErrMsgDetail; } char* taosGetErrMsgReturn() { return tsErrMsgReturn; } #ifdef TAOS_ERROR_C -#define TAOS_DEFINE_ERROR(name, msg) {.val = (name), .str = (msg)}, +#define TAOS_DEFINE_ERROR(name, msg) {.val = (name), .str = (msg), .origin = #name}, +STaosError errors[] = { + TAOS_DEFINE_ERROR(TSDB_CODE_SUCCESS, "success") #else #define TAOS_DEFINE_ERROR(name, mod, code, msg) static const int32_t name = TAOS_DEF_ERROR_CODE(mod, code); #endif @@ -44,11 +46,6 @@ char* taosGetErrMsgReturn() { return tsErrMsgReturn; } #define TAOS_SUCCEEDED(err) ((err) >= 0) #define TAOS_FAILED(err) ((err) < 0) -#ifdef TAOS_ERROR_C -STaosError errors[] = { - {.val = 0, .str = "success"}, -#endif - // rpc TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish connection") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") @@ -784,7 +781,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open TAOS_DEFINE_ERROR(TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY, "Queue out of memory") -#ifdef TAOS_ERROR_C +#if defined(TAOS_ERROR_INFO) || defined(TAOS_ERROR_C) }; #endif @@ -837,3 +834,5 @@ const char* tstrerror(int32_t err) { } const char* terrstr() { return tstrerror(terrno); } + +int32_t taosGetErrSize() { return sizeof(errors)/sizeof(errors[0]); } diff --git a/source/util/test/CMakeLists.txt b/source/util/test/CMakeLists.txt index e8e3348343..89978fd5aa 100644 --- a/source/util/test/CMakeLists.txt +++ b/source/util/test/CMakeLists.txt @@ -123,4 +123,12 @@ add_test( #add_test( # NAME decompressTest # COMMAND decompressTest -#) \ No newline at end of file +#) + +# terrorTest +add_executable(terrorTest "terrorTest.cpp") +target_link_libraries(terrorTest os util common gtest_main) +add_test( + NAME terrorTest + COMMAND terrorTest +) \ No newline at end of file diff --git a/source/util/test/terrorTest.cpp b/source/util/test/terrorTest.cpp new file mode 100644 index 0000000000..db2f9641ed --- /dev/null +++ b/source/util/test/terrorTest.cpp @@ -0,0 +1,29 @@ +#include +#include + +#define TAOS_ERROR_INFO + +#include +#include "os.h" +#include "osTime.h" +#include "taos.h" +#include "taoserror.h" +#include "tglobal.h" + +extern STaosError errors[]; + +using namespace std; + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +TEST(TAOS_ERROR_TEST, terror_test) { + int32_t errSize = taosGetErrSize(); + for (int32_t i = 0; i < errSize; ++i) { + STaosError *pInfo = &errors[i]; + std::cout << i + 1 << " " << pInfo->origin << " " << pInfo->val << std::endl; + } +} \ No newline at end of file From 247183b6fc061c135443e8e2b7e76d409dd0e554 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 13 Jun 2024 06:57:13 +0800 Subject: [PATCH 05/29] enh: support get macro string of taos errors --- include/util/taoserror.h | 2 +- source/util/src/terror.c | 14 ++++++-------- source/util/test/terrorTest.cpp | 12 +----------- 3 files changed, 8 insertions(+), 20 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 63b733a337..3207d498d3 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -27,7 +27,7 @@ extern "C" { typedef struct { int32_t val; const char* str; - const char* origin; + const char* macro; } STaosError; #define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code)))) diff --git a/source/util/src/terror.c b/source/util/src/terror.c index f2ae43bc94..56cca26f6e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -21,11 +21,6 @@ #define TAOS_ERROR_C -// typedef struct { -// int32_t val; -// const char* str; -// } STaosError; - static threadlocal int32_t tsErrno; static threadlocal char tsErrMsgDetail[ERR_MSG_LEN] = {0}; static threadlocal char tsErrMsgReturn[ERR_MSG_LEN] = {0}; @@ -35,9 +30,7 @@ char* taosGetErrMsg() { return tsErrMsgDetail; } char* taosGetErrMsgReturn() { return tsErrMsgReturn; } #ifdef TAOS_ERROR_C -#define TAOS_DEFINE_ERROR(name, msg) {.val = (name), .str = (msg), .origin = #name}, -STaosError errors[] = { - TAOS_DEFINE_ERROR(TSDB_CODE_SUCCESS, "success") +#define TAOS_DEFINE_ERROR(name, msg) {.val = (name), .str = (msg), .macro = #name}, #else #define TAOS_DEFINE_ERROR(name, mod, code, msg) static const int32_t name = TAOS_DEF_ERROR_CODE(mod, code); #endif @@ -46,6 +39,11 @@ STaosError errors[] = { #define TAOS_SUCCEEDED(err) ((err) >= 0) #define TAOS_FAILED(err) ((err) < 0) +#ifdef TAOS_ERROR_C +STaosError errors[] = { + TAOS_DEFINE_ERROR(TSDB_CODE_SUCCESS, "success") +#endif + // rpc TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_UNAVAIL, "Unable to establish connection") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_FQDN_ERROR, "Unable to resolve FQDN") diff --git a/source/util/test/terrorTest.cpp b/source/util/test/terrorTest.cpp index db2f9641ed..5ef5f43216 100644 --- a/source/util/test/terrorTest.cpp +++ b/source/util/test/terrorTest.cpp @@ -4,26 +4,16 @@ #define TAOS_ERROR_INFO #include -#include "os.h" -#include "osTime.h" -#include "taos.h" #include "taoserror.h" -#include "tglobal.h" extern STaosError errors[]; using namespace std; -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wwrite-strings" -#pragma GCC diagnostic ignored "-Wunused-function" -#pragma GCC diagnostic ignored "-Wunused-variable" -#pragma GCC diagnostic ignored "-Wsign-compare" - TEST(TAOS_ERROR_TEST, terror_test) { int32_t errSize = taosGetErrSize(); for (int32_t i = 0; i < errSize; ++i) { STaosError *pInfo = &errors[i]; - std::cout << i + 1 << " " << pInfo->origin << " " << pInfo->val << std::endl; + std::cout << i + 1 << " " << pInfo->macro << " " << pInfo->val << std::endl; } } \ No newline at end of file From 962a78d6a399ae8b937a5b36e05bead811d27790 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 13 Jun 2024 06:59:04 +0800 Subject: [PATCH 06/29] enh: support get macro string of taos errors --- source/util/src/terror.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 56cca26f6e..a4067b942b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -779,7 +779,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_DIR, "Invalid TDLite open TAOS_DEFINE_ERROR(TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY, "Queue out of memory") -#if defined(TAOS_ERROR_INFO) || defined(TAOS_ERROR_C) +#ifdef TAOS_ERROR_C }; #endif From b7ef054b38dee1152356fc7239366579981b109c Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 13 Jun 2024 02:11:41 +0000 Subject: [PATCH 07/29] add compress to child table --- source/common/src/tcol.c | 4 ++- source/dnode/vnode/src/vnd/vnodeQuery.c | 36 ++++++++++++++----------- 2 files changed, 23 insertions(+), 17 deletions(-) diff --git a/source/common/src/tcol.c b/source/common/src/tcol.c index ba36558587..a949d0793a 100644 --- a/source/common/src/tcol.c +++ b/source/common/src/tcol.c @@ -327,7 +327,9 @@ int32_t setColCompressByOption(uint8_t type, uint8_t encode, uint16_t compressTy return TSDB_CODE_SUCCESS; } -bool useCompress(uint8_t tableType) { return TSDB_SUPER_TABLE == tableType || TSDB_NORMAL_TABLE == tableType; } +bool useCompress(uint8_t tableType) { + return TSDB_SUPER_TABLE == tableType || TSDB_NORMAL_TABLE == tableType || TSDB_CHILD_TABLE == tableType; +} int8_t validColCompressLevel(uint8_t type, uint8_t level) { if (level == TSDB_COLVAL_LEVEL_DISABLED) return 1; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 19be7e7ebd..5b17e0f1da 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -120,7 +120,8 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { memcpy(metaRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols); } if (metaRsp.pSchemaExt) { - code = fillTableColCmpr(&mer1, metaRsp.pSchemaExt, metaRsp.numOfColumns); + SMetaReader *pReader = mer1.me.type == TSDB_CHILD_TABLE ? &mer2 : &mer1; + code = fillTableColCmpr(pReader, metaRsp.pSchemaExt, metaRsp.numOfColumns); if (code < 0) { code = TSDB_CODE_INVALID_MSG; goto _exit; @@ -254,15 +255,18 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { memcpy(cfgRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols); } - if (useCompress(cfgRsp.tableType)) { - SColCmprWrapper *pColCmpr = &mer1.me.colCmpr; - for (int32_t i = 0; i < cfgRsp.numOfColumns; i++) { - SColCmpr *pCmpr = &pColCmpr->pColCmpr[i]; - SSchemaExt *pSchExt = cfgRsp.pSchemaExt + i; - pSchExt->colId = pCmpr->id; - pSchExt->compress = pCmpr->alg; - } + // if (useCompress(cfgRsp.tableType)) { + + SMetaReader *pReader = mer1.me.type == TSDB_CHILD_TABLE ? &mer2 : &mer1; + SColCmprWrapper *pColCmpr = &pReader->me.colCmpr; + + for (int32_t i = 0; i < cfgRsp.numOfColumns; i++) { + SColCmpr *pCmpr = &pColCmpr->pColCmpr[i]; + SSchemaExt *pSchExt = cfgRsp.pSchemaExt + i; + pSchExt->colId = pCmpr->id; + pSchExt->compress = pCmpr->alg; } + //} // encode and send response rspLen = tSerializeSTableCfgRsp(NULL, 0, &cfgRsp); @@ -752,13 +756,13 @@ int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64 return tsdbGetTableSchema(((SVnode *)pVnode)->pMeta, uid, pSchema, suid); } -int32_t vnodeGetStreamProgress(SVnode* pVnode, SRpcMsg* pMsg, bool direct) { - int32_t code = 0; - SStreamProgressReq req; - SStreamProgressRsp rsp = {0}; - SRpcMsg rpcMsg = {.info = pMsg->info, .code = 0}; - char * buf = NULL; - int32_t rspLen = 0; +int32_t vnodeGetStreamProgress(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { + int32_t code = 0; + SStreamProgressReq req; + SStreamProgressRsp rsp = {0}; + SRpcMsg rpcMsg = {.info = pMsg->info, .code = 0}; + char *buf = NULL; + int32_t rspLen = 0; code = tDeserializeStreamProgressReq(pMsg->pCont, pMsg->contLen, &req); if (code == TSDB_CODE_SUCCESS) { From 6f667d93145a29a0ddeff971a27101ab26f5dd41 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 13 Jun 2024 10:13:24 +0800 Subject: [PATCH 08/29] enh: support get macro string of taos errors --- include/util/taoserror.h | 2 ++ source/util/test/terrorTest.cpp | 4 ---- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 3207d498d3..95d028a47e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -30,6 +30,8 @@ typedef struct { const char* macro; } STaosError; +extern STaosError errors[]; + #define TAOS_DEF_ERROR_CODE(mod, code) ((int32_t)((0x80000000 | ((mod)<<16) | (code)))) #define TAOS_SYSTEM_ERROR(code) (0x80ff0000 | (code)) diff --git a/source/util/test/terrorTest.cpp b/source/util/test/terrorTest.cpp index 5ef5f43216..fbb698f780 100644 --- a/source/util/test/terrorTest.cpp +++ b/source/util/test/terrorTest.cpp @@ -1,13 +1,9 @@ #include #include -#define TAOS_ERROR_INFO - #include #include "taoserror.h" -extern STaosError errors[]; - using namespace std; TEST(TAOS_ERROR_TEST, terror_test) { From f59b88ea936f0867be00e153aa9994df3400c45e Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Thu, 13 Jun 2024 07:03:15 +0000 Subject: [PATCH 09/29] double check enableWhiteList --- source/libs/transport/src/transSvr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 049554a7c9..c205c1412c 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -363,7 +363,7 @@ static bool uvHandleReq(SSvrConn* pConn) { memcpy(pConn->user, pHead->user, strlen(pHead->user)); int8_t forbiddenIp = 0; - if (pThrd->enableIpWhiteList) { + if (pThrd->enableIpWhiteList && tsEnableWhiteList) { forbiddenIp = !uvWhiteListCheckConn(pThrd->pWhiteList, pConn) ? 1 : 0; if (forbiddenIp == 0) { uvWhiteListSetConnVer(pThrd->pWhiteList, pConn); From 367b3b153fa717c0ef7b7c69ee30180e4cec5e62 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Thu, 13 Jun 2024 15:35:26 +0800 Subject: [PATCH 10/29] check cursor for count window --- source/libs/stream/src/streamSessionState.c | 12 +++++++----- source/libs/stream/src/streamState.c | 1 + 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 4c61e6da1d..61f44e9b79 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -516,24 +516,25 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS return pBuffCur; } winCount = *((COUNT_TYPE*) ((char*)pVal + (resSize - sizeof(COUNT_TYPE)))); + taosMemoryFreeClear(pVal); + streamStateFreeCur(pBuffCur); if (sessionRangeKeyCmpr(pWinKey, &key) != 0 && winCount == count) { - streamStateFreeCur(pCur); - return pBuffCur; + streamStateCurNext(pFileStore, pCur); + return pCur; } streamStateCurPrev(pFileStore, pCur); while (1) { code = streamStateSessionGetKVByCur_rocksdb(pCur, &key, &pVal, &len); if (code == TSDB_CODE_FAILED) { streamStateCurNext(pFileStore, pCur); - streamStateFreeCur(pBuffCur); return pCur; } winCount = *((COUNT_TYPE*) ((char*)pVal + (resSize - sizeof(COUNT_TYPE)))); + taosMemoryFreeClear(pVal); if (sessionRangeKeyCmpr(pWinKey, &key) == 0 || winCount < count) { streamStateCurPrev(pFileStore, pCur); } else { streamStateCurNext(pFileStore, pCur); - streamStateFreeCur(pBuffCur); return pCur; } } @@ -568,7 +569,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void void* pData = NULL; code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen); if (taosArrayGetSize(pWinStates) > 0 && - (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) { + (code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) { transformCursor(pCur->pStreamFileState, pCur); SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex); if (pVal) { @@ -590,6 +591,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void } int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) { + qDebug("move cursor to next"); if (pCur && pCur->buffIndex >= 0) { pCur->buffIndex++; } else { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 47324bd8c9..38d6a5c372 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -654,6 +654,7 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { #ifdef USE_ROCKSDB + qDebug("move cursor to next"); return streamStateCurPrev_rocksdb(pCur); #else if (!pCur) { From 1f646713fe0ffc3254bec29db350f726d6f26151 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 13 Jun 2024 16:36:18 +0800 Subject: [PATCH 11/29] fix: check range option of alter user --- source/libs/parser/src/parTranslater.c | 16 ++++++++-------- tests/script/tsim/user/basic.sim | 4 ++++ tests/system-test/0-others/user_control.py | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3dbba397ba..5f55841a55 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6605,8 +6605,8 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS } static int32_t checkRangeOption(STranslateContext* pCxt, int32_t code, const char* pName, int64_t val, int64_t minVal, - int64_t maxVal) { - if (val >= 0 && (val < minVal || val > maxVal)) { + int64_t maxVal, bool skipMinus) { + if (skipMinus ? ((val >= 0) && (val < minVal || val > maxVal)) : (val < minVal || val > maxVal)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, code, "Invalid option %s: %" PRId64 ", valid range: [%" PRId64 ", %" PRId64 "]", pName, val, minVal, maxVal); @@ -6616,12 +6616,12 @@ static int32_t checkRangeOption(STranslateContext* pCxt, int32_t code, const cha static int32_t checkDbRangeOption(STranslateContext* pCxt, const char* pName, int32_t val, int32_t minVal, int32_t maxVal) { - return checkRangeOption(pCxt, TSDB_CODE_PAR_INVALID_DB_OPTION, pName, val, minVal, maxVal); + return checkRangeOption(pCxt, TSDB_CODE_PAR_INVALID_DB_OPTION, pName, val, minVal, maxVal, true); } static int32_t checkTableRangeOption(STranslateContext* pCxt, const char* pName, int64_t val, int64_t minVal, int64_t maxVal) { - return checkRangeOption(pCxt, TSDB_CODE_PAR_INVALID_TABLE_OPTION, pName, val, minVal, maxVal); + return checkRangeOption(pCxt, TSDB_CODE_PAR_INVALID_TABLE_OPTION, pName, val, minVal, maxVal, true); } static int32_t checkDbS3KeepLocalOption(STranslateContext* pCxt, SDatabaseOptions* pOptions) { @@ -8485,7 +8485,7 @@ static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* p static int32_t translateCreateUser(STranslateContext* pCxt, SCreateUserStmt* pStmt) { int32_t code = 0; SCreateUserReq createReq = {0}; - if ((code = checkRangeOption(pCxt, TSDB_CODE_INVALID_OPTION, "sysinfo", pStmt->sysinfo, 0, 1))) { + if ((code = checkRangeOption(pCxt, TSDB_CODE_INVALID_OPTION, "sysinfo", pStmt->sysinfo, 0, 1, false))) { return code; } strcpy(createReq.user, pStmt->userName); @@ -8509,13 +8509,13 @@ static int32_t checkAlterUser(STranslateContext* pCxt, SAlterUserStmt* pStmt) { int32_t code = 0; switch (pStmt->alterType) { case TSDB_ALTER_USER_ENABLE: - code = checkRangeOption(pCxt, TSDB_CODE_INVALID_OPTION, "enable", pStmt->enable, 0, 1); + code = checkRangeOption(pCxt, TSDB_CODE_INVALID_OPTION, "enable", pStmt->enable, 0, 1, false); break; case TSDB_ALTER_USER_SYSINFO: - code = checkRangeOption(pCxt, TSDB_CODE_INVALID_OPTION, "sysinfo", pStmt->sysinfo, 0, 1); + code = checkRangeOption(pCxt, TSDB_CODE_INVALID_OPTION, "sysinfo", pStmt->sysinfo, 0, 1, false); break; case TSDB_ALTER_USER_CREATEDB: - code = checkRangeOption(pCxt, TSDB_CODE_INVALID_OPTION, "createdb", pStmt->createdb, 0, 1); + code = checkRangeOption(pCxt, TSDB_CODE_INVALID_OPTION, "createdb", pStmt->createdb, 0, 1, false); break; } return code; diff --git a/tests/script/tsim/user/basic.sim b/tests/script/tsim/user/basic.sim index 353e1d080a..0e1fbb5b40 100644 --- a/tests/script/tsim/user/basic.sim +++ b/tests/script/tsim/user/basic.sim @@ -217,14 +217,18 @@ endi sql_error CREATE USER u100 PASS 'taosdata' SYSINFO -1; sql_error CREATE USER u101 PASS 'taosdata' SYSINFO 2; sql_error CREATE USER u102 PASS 'taosdata' SYSINFO 20000; +sql_error CREATE USER u103 PASS 'taosdata' SYSINFO 1000; sql_error ALTER USER u1 enable -1 sql_error ALTER USER u1 enable 2 +sql_error ALTER USER u1 enable 1000 sql_error ALTER USER u1 enable 10000 sql_error ALTER USER u1 sysinfo -1 sql_error ALTER USER u1 sysinfo 2 +sql_error ALTER USER u1 sysinfo 1000 sql_error ALTER USER u1 sysinfo -20000 sql_error ALTER USER u1 createdb -1 sql_error ALTER USER u1 createdb 3 +sql_error ALTER USER u1 createdb 1000 sql_error ALTER USER u1 createdb 100000 system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file diff --git a/tests/system-test/0-others/user_control.py b/tests/system-test/0-others/user_control.py index c29170e112..c4d24582e4 100644 --- a/tests/system-test/0-others/user_control.py +++ b/tests/system-test/0-others/user_control.py @@ -514,7 +514,7 @@ class TDTestCase: def test_alter_user(self): options = ["enable", "sysinfo", "createdb"] - optionErrVals = [-10000, -128, -1, 2, 127, 10000] + optionErrVals = [-10000, -128, -1, 2, 127, 1000, 10000] for optionErrVal in optionErrVals: tdSql.error("create user user_alter pass 'taosdata' sysinfo %d" % optionErrVal) tdSql.execute("create user user_alter pass 'taosdata'") From 94d5acba33b93d73229e64aa647c94ee7eea7b06 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 13 Jun 2024 17:36:02 +0800 Subject: [PATCH 12/29] chore: order of error definition --- include/util/taoserror.h | 2 +- source/util/src/terror.c | 9 ++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 95d028a47e..c3afd2df5a 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -325,7 +325,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_MND_DB_OPTION_UNCHANGED TAOS_DEF_ERROR_CODE(0, 0x038A) // #define TSDB_CODE_MND_DB_INDEX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x038B) #define TSDB_CODE_MND_DB_RETENTION_PERIOD_ZERO TAOS_DEF_ERROR_CODE(0, 0x038C) -#define TSDB_CODE_MND_INCONSIST_ENCRYPT_KEY TAOS_DEF_ERROR_CODE(0, 0x038D) +// #define TSDB_CODE_MND_INCONSIST_ENCRYPT_KEY TAOS_DEF_ERROR_CODE(0, 0x038D) // used #define TSDB_CODE_MND_INVALID_ENCRYPT_KEY TAOS_DEF_ERROR_CODE(0, 0x038E) // #define TSDB_CODE_MND_INVALID_DB_OPTION_DAYS TAOS_DEF_ERROR_CODE(0, 0x0390) // 2.x // #define TSDB_CODE_MND_INVALID_DB_OPTION_KEEP TAOS_DEF_ERROR_CODE(0, 0x0391) // 2.x diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a4067b942b..239090b8f4 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -245,15 +245,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB, "Invalid database name TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_DATABASES, "Too many databases for account") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_DROPPING, "Database in dropping status") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_NOT_EXIST, "Database not exist") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_RETENTION_PERIOD_ZERO, "WAL retention period is zero") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DB_ACCT, "Invalid database account") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_OPTION_UNCHANGED, "Database options not changed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_INDEX_NOT_EXIST, "Index not exist") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SYS_TABLENAME, "Invalid system table name") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_CREATING, "Database in creating status") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE, "Encryption is not allowed to be changed after database is created") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_INCONSIST_ENCRYPT_KEY, "Inconsistent encryption key") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_RETENTION_PERIOD_ZERO, "WAL retention period is zero") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ENCRYPT_KEY, "The cluster has not been set properly for database encryption") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_CREATING, "Database in creating status") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SYS_TABLENAME, "Invalid system table name") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE, "Encryption is not allowed to be changed after database is created") // mnode-node TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists") From 7078b5017ca6ad89ec3418564941c7cd52a70eb3 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 13 Jun 2024 17:42:41 +0800 Subject: [PATCH 13/29] chore: order of error definition --- include/util/taoserror.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index c3afd2df5a..a06581bcad 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -325,7 +325,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_MND_DB_OPTION_UNCHANGED TAOS_DEF_ERROR_CODE(0, 0x038A) // #define TSDB_CODE_MND_DB_INDEX_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x038B) #define TSDB_CODE_MND_DB_RETENTION_PERIOD_ZERO TAOS_DEF_ERROR_CODE(0, 0x038C) -// #define TSDB_CODE_MND_INCONSIST_ENCRYPT_KEY TAOS_DEF_ERROR_CODE(0, 0x038D) // used +// #define TSDB_CODE_MND_INCONSIST_ENCRYPT_KEY TAOS_DEF_ERROR_CODE(0, 0x038D) // unused #define TSDB_CODE_MND_INVALID_ENCRYPT_KEY TAOS_DEF_ERROR_CODE(0, 0x038E) // #define TSDB_CODE_MND_INVALID_DB_OPTION_DAYS TAOS_DEF_ERROR_CODE(0, 0x0390) // 2.x // #define TSDB_CODE_MND_INVALID_DB_OPTION_KEEP TAOS_DEF_ERROR_CODE(0, 0x0391) // 2.x From eb72cec8d21176f7bdedae3d16bd09e9bcb8e32e Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 14 Jun 2024 08:10:07 +0800 Subject: [PATCH 14/29] chore: naming optimization --- source/libs/parser/src/parTranslater.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 5f55841a55..335324c86f 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6605,8 +6605,8 @@ static int32_t buildCreateDbReq(STranslateContext* pCxt, SCreateDatabaseStmt* pS } static int32_t checkRangeOption(STranslateContext* pCxt, int32_t code, const char* pName, int64_t val, int64_t minVal, - int64_t maxVal, bool skipMinus) { - if (skipMinus ? ((val >= 0) && (val < minVal || val > maxVal)) : (val < minVal || val > maxVal)) { + int64_t maxVal, bool skipUndef) { + if (skipUndef ? ((val >= 0) && (val < minVal || val > maxVal)) : (val < minVal || val > maxVal)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, code, "Invalid option %s: %" PRId64 ", valid range: [%" PRId64 ", %" PRId64 "]", pName, val, minVal, maxVal); From 0ef8d3510a4e79bda648c6e2b7aae8b30d1a7cb9 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 14 Jun 2024 15:46:11 +0800 Subject: [PATCH 15/29] fix: use unsigned type for union bit operation --- include/common/tmsg.h | 6 +++--- source/common/src/tmsg.c | 4 ++-- source/dnode/mnode/impl/inc/mndDef.h | 6 +++--- source/dnode/mnode/impl/src/mndUser.c | 4 ++-- source/dnode/mnode/sdb/inc/sdb.h | 4 ++++ source/dnode/mnode/sdb/src/sdbRaw.c | 30 +++++++++++++++++++++++++++ 6 files changed, 44 insertions(+), 10 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fa89ca917a..9301b31f95 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1054,10 +1054,10 @@ typedef struct { int8_t enable; int8_t isView; union { - int8_t flag; + uint8_t flag; struct { - int8_t createdb : 1; - int8_t reserve : 7; + uint8_t createdb : 1; + uint8_t reserve : 7; }; }; char user[TSDB_USER_LEN]; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d465e24d5b..1cbd646413 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1813,7 +1813,7 @@ int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq) } if (tEncodeI64(&encoder, pReq->privileges) < 0) return -1; ENCODESQL(); - if (tEncodeI8(&encoder, pReq->flag) < 0) return -1; + if (tEncodeU8(&encoder, pReq->flag) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1854,7 +1854,7 @@ int32_t tDeserializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq if (tDecodeI64(&decoder, &pReq->privileges) < 0) return -1; DECODESQL(); if (!tDecodeIsEnd(&decoder)) { - if (tDecodeI8(&decoder, &pReq->flag) < 0) return -1; + if (tDecodeU8(&decoder, &pReq->flag) < 0) return -1; } tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index b6fc24a910..46e606d3ea 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -332,10 +332,10 @@ typedef struct { int8_t sysInfo; int8_t enable; union { - int8_t flag; + uint8_t flag; struct { - int8_t createdb : 1; - int8_t reserve : 7; + uint8_t createdb : 1; + uint8_t reserve : 7; }; }; int32_t acctId; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 9a85d405ca..7cde8c5508 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -818,7 +818,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { SDB_SET_INT8(pRaw, dataPos, pUser->superUser, _OVER) SDB_SET_INT8(pRaw, dataPos, pUser->sysInfo, _OVER) SDB_SET_INT8(pRaw, dataPos, pUser->enable, _OVER) - SDB_SET_INT8(pRaw, dataPos, pUser->flag, _OVER) + SDB_SET_UINT8(pRaw, dataPos, pUser->flag, _OVER) SDB_SET_INT32(pRaw, dataPos, pUser->authVersion, _OVER) SDB_SET_INT32(pRaw, dataPos, pUser->passVersion, _OVER) SDB_SET_INT32(pRaw, dataPos, numOfReadDbs, _OVER) @@ -1002,7 +1002,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { SDB_GET_INT8(pRaw, dataPos, &pUser->superUser, _OVER) SDB_GET_INT8(pRaw, dataPos, &pUser->sysInfo, _OVER) SDB_GET_INT8(pRaw, dataPos, &pUser->enable, _OVER) - SDB_GET_INT8(pRaw, dataPos, &pUser->flag, _OVER) + SDB_GET_UINT8(pRaw, dataPos, &pUser->flag, _OVER) if (pUser->superUser) pUser->createdb = 1; SDB_GET_INT32(pRaw, dataPos, &pUser->authVersion, _OVER) if (sver >= 4) { diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 0b2de2b151..fc9b89a141 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -57,6 +57,7 @@ extern "C" { #define SDB_GET_INT32(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt32, int32_t) #define SDB_GET_INT16(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt16, int16_t) #define SDB_GET_INT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawInt8, int8_t) +#define SDB_GET_UINT8(pData, dataPos, val, pos) SDB_GET_VAL(pData, dataPos, val, pos, sdbGetRawUInt8, uint8_t) #define SDB_GET_RESERVE(pRaw, dataPos, valLen, pos) \ { \ @@ -76,6 +77,7 @@ extern "C" { #define SDB_SET_INT32(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt32, int32_t) #define SDB_SET_INT16(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt16, int16_t) #define SDB_SET_INT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawInt8, int8_t) +#define SDB_SET_UINT8(pRaw, dataPos, val, pos) SDB_SET_VAL(pRaw, dataPos, val, pos, sdbSetRawUInt8, uint8_t) #define SDB_SET_BINARY(pRaw, dataPos, val, valLen, pos) \ { \ @@ -388,6 +390,7 @@ void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen); void sdbFreeRaw(SSdbRaw *pRaw); int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val); +int32_t sdbSetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t val); int32_t sdbSetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t val); int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val); int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val); @@ -395,6 +398,7 @@ int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32 int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen); int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status); int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val); +int32_t sdbGetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t *val); int32_t sdbGetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t *val); int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val); int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val); diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index 244e50b52e..4f68139155 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -67,6 +67,21 @@ int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val) { return 0; } +int32_t sdbSetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(uint8_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *(uint8_t *)(pRaw->pData + dataPos) = val; + return 0; +} + int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val) { if (pRaw == NULL) { terrno = TSDB_CODE_INVALID_PTR; @@ -174,6 +189,21 @@ int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val) { return 0; } +int32_t sdbGetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t *val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(uint8_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *val = *(uint8_t *)(pRaw->pData + dataPos); + return 0; +} + int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val) { if (pRaw == NULL) { terrno = TSDB_CODE_INVALID_PTR; From 2e900f1787d8c94a55d2a12856440654b98aaf8c Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 17 Jun 2024 10:05:03 +0800 Subject: [PATCH 16/29] adj stream doc --- docs/en/12-taos-sql/14-stream.md | 2 +- docs/zh/12-taos-sql/14-stream.md | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md index e7cefc1d7a..bbcffff062 100644 --- a/docs/en/12-taos-sql/14-stream.md +++ b/docs/en/12-taos-sql/14-stream.md @@ -47,7 +47,7 @@ window_clause: { } ``` -`SESSION` indicates a session window, and `tol_val` indicates the maximum range of the time interval. If the time interval between two continuous rows are within the time interval specified by `tol_val` they belong to the same session window; otherwise a new session window is started automatically. +`SESSION` indicates a session window, and `tol_val` indicates the maximum range of the time interval. If the time interval between two continuous rows are within the time interval specified by `tol_val` they belong to the same session window; otherwise a new session window is started automatically.The _wend of this window is the time of the last data plus tol_val. `EVENT_WINDOW` is determined according to the window start condition and the window close condition. The window is started when `start_trigger_condition` is evaluated to true, the window is closed when `end_trigger_condition` is evaluated to true. `start_trigger_condition` and `end_trigger_condition` can be any conditional expressions supported by TDengine and can include multiple columns. diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index cc057c3b72..bf95a29baa 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -54,8 +54,10 @@ window_clause: { } ``` -其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。 +其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。该窗口的_wend等于最后一条数据的时间加上tol_val。 + EVENT_WINDOW 是事件窗口,根据开始条件和结束条件来划定窗口。当 start_trigger_condition 满足时则窗口开始,直到 end_trigger_condition 满足时窗口关闭。 start_trigger_condition 和 end_trigger_condition 可以是任意 TDengine 支持的条件表达式,且可以包含不同的列。 + COUNT_WINDOW 是计数窗口,按固定的数据行数来划分窗口。 count_val 是常量,是正整数,必须大于等于2,小于2147483648。 count_val 表示每个 COUNT_WINDOW 包含的最大数据行数,总数据行数不能整除 count_val 时,最后一个窗口的行数会小于 count_val 。 sliding_val 是常量,表示窗口滑动的数量,类似于 INTERVAL 的 SLIDING 。 窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished) From 6a10cea339417b842761dcf31ed5e5b1b4b911c0 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 17 Jun 2024 10:07:10 +0800 Subject: [PATCH 17/29] adj stream doc --- docs/en/12-taos-sql/14-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/12-taos-sql/14-stream.md b/docs/en/12-taos-sql/14-stream.md index bbcffff062..fcd782b429 100644 --- a/docs/en/12-taos-sql/14-stream.md +++ b/docs/en/12-taos-sql/14-stream.md @@ -47,7 +47,7 @@ window_clause: { } ``` -`SESSION` indicates a session window, and `tol_val` indicates the maximum range of the time interval. If the time interval between two continuous rows are within the time interval specified by `tol_val` they belong to the same session window; otherwise a new session window is started automatically.The _wend of this window is the time of the last data plus tol_val. +`SESSION` indicates a session window, and `tol_val` indicates the maximum range of the time interval. If the time interval between two continuous rows are within the time interval specified by `tol_val` they belong to the same session window; otherwise a new session window is started automatically.The `_wend` of this window is the time of the last data plus `tol_val`. `EVENT_WINDOW` is determined according to the window start condition and the window close condition. The window is started when `start_trigger_condition` is evaluated to true, the window is closed when `end_trigger_condition` is evaluated to true. `start_trigger_condition` and `end_trigger_condition` can be any conditional expressions supported by TDengine and can include multiple columns. From ffdef2d1c8e1aa9a34aac5e469d982e08345450f Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 17 Jun 2024 10:08:36 +0800 Subject: [PATCH 18/29] adj stream doc --- docs/zh/12-taos-sql/14-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index bf95a29baa..38d913dfaf 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -54,7 +54,7 @@ window_clause: { } ``` -其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。该窗口的_wend等于最后一条数据的时间加上tol_val。 +其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。该窗口的 _wend 等于最后一条数据的时间加上 tol_val。 EVENT_WINDOW 是事件窗口,根据开始条件和结束条件来划定窗口。当 start_trigger_condition 满足时则窗口开始,直到 end_trigger_condition 满足时窗口关闭。 start_trigger_condition 和 end_trigger_condition 可以是任意 TDengine 支持的条件表达式,且可以包含不同的列。 From 2298b4eae8745395b8b4ba41a8469d102a7da0b4 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 17 Jun 2024 11:19:10 +0800 Subject: [PATCH 19/29] adj log --- source/libs/stream/src/streamState.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 38d6a5c372..55e06cdcaf 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -654,7 +654,7 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { #ifdef USE_ROCKSDB - qDebug("move cursor to next"); + qTrace("move cursor to next"); return streamStateCurPrev_rocksdb(pCur); #else if (!pCur) { From 4474788e362713e0d27cd0f2c8d718704849e23e Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 17 Jun 2024 11:24:30 +0800 Subject: [PATCH 20/29] adj log --- source/libs/stream/src/streamSessionState.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 61f44e9b79..005fd1603c 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -591,7 +591,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void } int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) { - qDebug("move cursor to next"); + qTrace("move cursor to next"); if (pCur && pCur->buffIndex >= 0) { pCur->buffIndex++; } else { From 848b1e19b5c740069ceb057a637d08aa39b8eb79 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 14:39:46 +0800 Subject: [PATCH 21/29] fix(stream): fix some typo. --- source/dnode/vnode/src/tq/tqUtil.c | 5 +++-- source/libs/stream/src/streamMeta.c | 12 +++++++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 33e3414a7d..642de6c3f8 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -605,12 +605,13 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b numOfTasks = taosArrayGetSize(pMeta->pTaskList); for (int32_t i = 0; i < numOfTasks; ++i) { - STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); if (pId->streamId != streamId) { continue; } - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL) { tqError("vgId:%d failed to acquire task:0x%" PRIx64 " in retrieving progress", pMeta->vgId, pId->taskId); continue; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d0f9d40469..3c50156fad 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1007,9 +1007,10 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); for (int32_t i = 0; i < numOfTasks; ++i) { - STaskId* pId = taosArrayGet(pMeta->pTaskList, i); + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId)); + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (pTask == NULL) { continue; } @@ -1020,7 +1021,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { } STaskStatusEntry entry = { - .id = *pId, + .id = id, .status = streamTaskGetStatus(*pTask)->state, .nodeId = hbMsg.vgId, .stage = pMeta->stage, @@ -1508,8 +1509,9 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { int32_t num = taosArrayGetSize(pMeta->pTaskList); for (int32_t i = 0; i < num; ++i) { - STaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pTaskId, sizeof(*pTaskId)); + SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL) { continue; } From 3aa6e00bee01b22d3689b1e65db67f731645bd8d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 15:58:59 +0800 Subject: [PATCH 22/29] fix(stream): fix some typo. --- source/dnode/vnode/src/tq/tqUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 642de6c3f8..5290c39d42 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -613,7 +613,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL) { - tqError("vgId:%d failed to acquire task:0x%" PRIx64 " in retrieving progress", pMeta->vgId, pId->taskId); + tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pMeta->vgId, pId->taskId); continue; } From bdbcdf34655b9f56e1ce5db787e3f283cae5cd99 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 16:58:58 +0800 Subject: [PATCH 23/29] refactor: add two assert. --- source/libs/stream/src/streamMeta.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 3c50156fad..8433531f71 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -744,10 +744,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t pTask = *ppTask; // it is an fill-history task, remove the related stream task's id that points to it - atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); + if (pTask->info.fillHistory == 0) { + atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); + } + + ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); + + ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); streamMetaRemoveTask(pMeta, &id); streamMetaWUnLock(pMeta); From 0f4ffdf49a96ab594f7fce16189a89dbf2aae474 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 18:57:31 +0800 Subject: [PATCH 24/29] fix(stream): add some info. --- source/libs/stream/src/streamMeta.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8433531f71..bca0f06d5d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -673,13 +673,21 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) } static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* id) { + bool remove = false; for (int32_t i = 0; i < num; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) { taosArrayRemove(pMeta->pTaskList, i); + stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x", pMeta->vgId, id->streamId, id->taskId); + remove = true; break; + } else { + stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x, entry:0x%" PRIx64 "-0x%x", pMeta->vgId, id->streamId, + id->taskId, pTaskId->streamId, pTaskId->taskId); } } + + ASSERT(remove); } static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { From 60fac803459c81fdce2635c8bcd0eec08e14cdad Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 19:34:09 +0800 Subject: [PATCH 25/29] fix(stream): add some logs. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- source/libs/stream/src/streamMeta.c | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c27249cff6..d9bc8b74d2 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -635,8 +635,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen // drop the related fill-history task firstly if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { - streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId); + streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); } // drop the stream task now diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index bca0f06d5d..df22dff97f 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -677,8 +677,8 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* i for (int32_t i = 0; i < num; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) { + stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x succ", pMeta->vgId, id->streamId, id->taskId); taosArrayRemove(pMeta->pTaskList, i); - stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x", pMeta->vgId, id->streamId, id->taskId); remove = true; break; } else { @@ -723,7 +723,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } streamMetaWUnLock(pMeta); - stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId); + stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, pMeta->vgId); while (1) { streamMetaRLock(pMeta); @@ -750,6 +750,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; + ASSERT(pTask->id.taskId == id.taskId && pTask->id.streamId == id.streamId); // it is an fill-history task, remove the related stream task's id that points to it if (pTask->info.fillHistory == 0) { From f57ff9640884e53834e993d90229d00f70c40922 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 19:56:18 +0800 Subject: [PATCH 26/29] fix(stream): do some refactor. --- source/libs/stream/src/streamMeta.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index df22dff97f..05c414078d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -750,7 +750,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; - ASSERT(pTask->id.taskId == id.taskId && pTask->id.streamId == id.streamId); + SStreamTaskId pxId = pTask->id; + ASSERT((pxId.taskId == id.taskId) && (pxId.streamId == id.streamId)); // it is an fill-history task, remove the related stream task's id that points to it if (pTask->info.fillHistory == 0) { From c4a27569076a7d5ba418e3c4acec88a013105205 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 23:13:54 +0800 Subject: [PATCH 27/29] fix(stream): not revise the stream id for fill-history task. --- include/libs/stream/streamState.h | 3 ++- source/dnode/snode/src/snode.c | 6 ----- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 31 ++++++---------------- source/libs/stream/src/streamMeta.c | 24 +++++------------ source/libs/stream/src/streamState.c | 7 ++--- 6 files changed, 21 insertions(+), 52 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index ae5a733ae9..1333257dfb 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -29,7 +29,8 @@ extern "C" { #include "storageapi.h" -SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); +SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath, + int32_t szPage, int32_t pages); void streamStateClose(SStreamState* pState, bool remove); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index d322eb2977..481033508b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -25,12 +25,6 @@ #define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0) // clang-format on -static void restoreStreamTaskId(SStreamTask *pTask, STaskId *pId) { - ASSERT(pTask->info.fillHistory); - pTask->id.taskId = pId->taskId; - pTask->id.streamId = pId->streamId; -} - int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0); int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 3cc7c6ec66..ac5463e492 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -299,7 +299,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id); pStreamTask->status.pSM = streamCreateStateMachine(pStreamTask); pStreamTask->chkInfo.pActiveInfo = streamTaskCreateActiveChkptInfo(); - pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); + pStreamState = streamStateOpen(taskInfDir, pStreamTask, pStreamTask->id.streamId, pStreamTask->id.taskId, true, -1, -1); if (!pStreamState) { terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index d9bc8b74d2..963503c135 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -30,37 +30,26 @@ typedef struct SMStreamCheckpointReadyRspMsg { static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); -static STaskId replaceStreamTaskId(SStreamTask* pTask) { - ASSERT(pTask->info.fillHistory); - STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - - pTask->id.streamId = pTask->streamTaskId.streamId; - pTask->id.taskId = pTask->streamTaskId.taskId; - - return id; -} - -static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { - ASSERT(pTask->info.fillHistory); - pTask->id.taskId = pId->taskId; - pTask->id.streamId = pId->streamId; -} - int32_t tqExpandStreamTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; int32_t vgId = pMeta->vgId; - STaskId taskId = {0}; int64_t st = taosGetTimestampMs(); + int64_t streamId = 0; + int32_t taskId = 0; tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId); if (pTask->info.fillHistory) { - taskId = replaceStreamTaskId(pTask); + streamId = pTask->streamTaskId.streamId; + taskId = pTask->streamTaskId.taskId; + } else { + streamId = pTask->id.streamId; + taskId = pTask->id.taskId; } // sink task does not need the pState if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - pTask->pState = streamStateOpen(pMeta->path, pTask, false, -1, -1); + pTask->pState = streamStateOpen(pMeta->path, pTask, false, streamId, taskId, -1, -1); if (pTask->pState == NULL) { tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId); return -1; @@ -69,10 +58,6 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { } } - if (pTask->info.fillHistory) { - restoreStreamTaskId(pTask, &taskId); - } - SReadHandle handle = { .checkpointId = pTask->chkInfo.checkpointId, .pStateBackend = pTask->pState, diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 05c414078d..e8800c3370 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -608,6 +608,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } taosArrayPush(pMeta->pTaskList, &pTask->id); + taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); if (streamMetaSaveTask(pMeta, pTask) < 0) { return -1; @@ -617,7 +618,6 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa return -1; } - taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); if (pTask->info.fillHistory == 0) { atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); } @@ -672,21 +672,16 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) } } -static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* id) { +static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id) { bool remove = false; for (int32_t i = 0; i < num; ++i) { - SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) { - stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x succ", pMeta->vgId, id->streamId, id->taskId); - taosArrayRemove(pMeta->pTaskList, i); + taosArrayRemove(pTaskList, i); remove = true; break; - } else { - stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x, entry:0x%" PRIx64 "-0x%x", pMeta->vgId, id->streamId, - id->taskId, pTaskId->streamId, pTaskId->taskId); } } - ASSERT(remove); } @@ -750,26 +745,19 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; - SStreamTaskId pxId = pTask->id; - ASSERT((pxId.taskId == id.taskId) && (pxId.streamId == id.streamId)); - // it is an fill-history task, remove the related stream task's id that points to it if (pTask->info.fillHistory == 0) { atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } - ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); - taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); - doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); - - ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); + doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); streamMetaRemoveTask(pMeta, &id); + ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); streamMetaWUnLock(pMeta); ASSERT(pTask->status.timerActive == 0); - if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); taosTmrStop(pTask->schedInfo.pDelayTimer); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 47324bd8c9..8919bd48ac 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -98,7 +98,8 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { return winKeyCmprImpl(&pWin1->key, &pWin2->key); } -SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) { +SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath, + int32_t szPage, int32_t pages) { SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); stDebug("open stream state %p, %s", pState, path); if (pState == NULL) { @@ -114,8 +115,8 @@ SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int3 } SStreamTask* pStreamTask = pTask; - pState->taskId = pStreamTask->id.taskId; - pState->streamId = pStreamTask->id.streamId; + pState->streamId = streamId; + pState->taskId = taskId; sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-0x%x", pState->streamId, pState->taskId); streamTaskSetDb(pStreamTask->pMeta, pTask, pState->pTdbState->idstr); From 985d6195699c581f401f990ae980c088b8df6140 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 23:14:33 +0800 Subject: [PATCH 28/29] fix(stream): fix syntax error. --- include/libs/executor/storageapi.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 330ba31c65..8c67f77adb 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -410,7 +410,7 @@ typedef struct SStateStore { void (*streamFileStateClear)(struct SStreamFileState* pFileState); bool (*needClearDiskBuff)(struct SStreamFileState* pFileState); - SStreamState* (*streamStateOpen)(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages); + SStreamState* (*streamStateOpen)(const char* path, void* pTask, int64_t streamId, int32_t taskId, bool specPath, int32_t szPage, int32_t pages); void (*streamStateClose)(SStreamState* pState, bool remove); int32_t (*streamStateBegin)(SStreamState* pState); int32_t (*streamStateCommit)(SStreamState* pState); From bca3cf4e9ee6e698ba8bd99efd15a52daef8882c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 17 Jun 2024 23:51:25 +0800 Subject: [PATCH 29/29] fix(stream): fix error in unit test. --- source/libs/stream/test/backendTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index dc506cfbc9..e6a508f6f7 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -46,7 +46,7 @@ SStreamState *stateCreate(const char *path) { SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL); pTask->pMeta = pMeta; - SStreamState *p = streamStateOpen((char *)path, pTask, true, 32, 32 * 1024); + SStreamState *p = streamStateOpen((char *)path, pTask, 0, 0, true, 32, 32 * 1024); ASSERT(p != NULL); return p; }