diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in index 79d54f522e..cc46ef9938 100644 --- a/cmake/taosadapter_CMakeLists.txt.in +++ b/cmake/taosadapter_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosadapter ExternalProject_Add(taosadapter GIT_REPOSITORY https://github.com/taosdata/taosadapter.git - GIT_TAG 0d5663d + GIT_TAG ff7de07 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/docs/en/05-get-started/discord.svg b/docs/en/05-get-started/discord.svg new file mode 100644 index 0000000000..8218e3c3ca --- /dev/null +++ b/docs/en/05-get-started/discord.svg @@ -0,0 +1,7 @@ + + + + + diff --git a/docs/en/05-get-started/github.svg b/docs/en/05-get-started/github.svg new file mode 100644 index 0000000000..493832ceb7 --- /dev/null +++ b/docs/en/05-get-started/github.svg @@ -0,0 +1,6 @@ + + + diff --git a/docs/en/05-get-started/index.md b/docs/en/05-get-started/index.md index a6b6721383..fec734b64d 100644 --- a/docs/en/05-get-started/index.md +++ b/docs/en/05-get-started/index.md @@ -3,6 +3,12 @@ title: Get Started description: This article describes how to install TDengine and test its performance. --- +import github from './github.svg' +import discord from './discord.svg' +import twitter from './twitter.svg' +import youtube from './youtube.svg' +import linkedin from './linkedin.svg' + You can install and run TDengine on Linux/Windows/macOS machines as well as Docker containers. You can also deploy TDengine as a managed service with TDengine Cloud. The full package of TDengine includes the TDengine Server (`taosd`), TDengine Client (`taosc`), taosAdapter for connecting with third-party systems and providing a RESTful interface, a command-line interface, and some tools. In addition to connectors for multiple languages, TDengine also provides a [RESTful interface](/reference/rest-api) through [taosAdapter](/reference/taosadapter). @@ -12,4 +18,16 @@ import DocCardList from '@theme/DocCardList'; import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; -``` \ No newline at end of file +``` + +### Join TDengine Community + + + + + + + + + +
Star GitHub

Star GitHub

Join Discord

Join Discord

Follow Twitter

Follow Twitter

Subscribe YouTube

Subscribe YouTube

Follow LinkedIn

Follow LinkedIn

diff --git a/docs/en/05-get-started/linkedin.svg b/docs/en/05-get-started/linkedin.svg new file mode 100644 index 0000000000..969c6f03af --- /dev/null +++ b/docs/en/05-get-started/linkedin.svg @@ -0,0 +1,6 @@ + + + diff --git a/docs/en/05-get-started/twitter.svg b/docs/en/05-get-started/twitter.svg new file mode 100644 index 0000000000..4825aa4ed0 --- /dev/null +++ b/docs/en/05-get-started/twitter.svg @@ -0,0 +1,7 @@ + + + + + diff --git a/docs/en/05-get-started/youtube.svg b/docs/en/05-get-started/youtube.svg new file mode 100644 index 0000000000..20747b8b6a --- /dev/null +++ b/docs/en/05-get-started/youtube.svg @@ -0,0 +1,11 @@ + + + + + + + + diff --git a/docs/en/28-releases/01-tdengine.md b/docs/en/28-releases/01-tdengine.md index 74eeeb5efb..eee2f94ee1 100644 --- a/docs/en/28-releases/01-tdengine.md +++ b/docs/en/28-releases/01-tdengine.md @@ -1,9 +1,13 @@ --- sidebar_label: TDengine -title: TDengine +title: TDengine Release History and Download Links description: TDengine release history, Release Notes and download links. --- +TDengine 3.x installation packages can be downloaded at the following links: + +For TDengine 2.x installation packages by version, please visit [here](https://www.taosdata.com/all-downloads). + import Release from "/components/ReleaseV3"; ## 3.0.1.6 @@ -33,4 +37,3 @@ import Release from "/components/ReleaseV3"; ## 3.0.1.0 - diff --git a/docs/en/28-releases/02-tools.md b/docs/en/28-releases/02-tools.md index 0a96c776e0..6e8a040f8b 100644 --- a/docs/en/28-releases/02-tools.md +++ b/docs/en/28-releases/02-tools.md @@ -1,9 +1,13 @@ --- -sidebar_label: taosTools -title: taosTools +sidebar_label: taosTools +title: taosTools Release History and Download Links description: taosTools release history, Release Notes, download links. --- +taosTools installation packages can be downloaded at the following links: + +For other historical version installers, please visit [here](https://www.taosdata.com/all-downloads). + import Release from "/components/ReleaseV3"; ## 2.2.7 diff --git a/docs/zh/05-get-started/channel.webp b/docs/zh/05-get-started/channel.webp new file mode 100644 index 0000000000..8dba93d411 Binary files /dev/null and b/docs/zh/05-get-started/channel.webp differ diff --git a/docs/zh/05-get-started/index.md b/docs/zh/05-get-started/index.md index dec4d800bc..092523a556 100644 --- a/docs/zh/05-get-started/index.md +++ b/docs/zh/05-get-started/index.md @@ -3,7 +3,9 @@ title: 立即开始 description: '快速设置 TDengine 环境并体验其高效写入和查询' --- -import xiaot from './tdengine.webp' +import xiaot from './xiaot.webp' +import channel from './channel.webp' +import official_account from './official-account.webp' TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/taosadapter) 提供 [RESTful 接口](../connector/rest-api)。 @@ -16,8 +18,19 @@ import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; ``` -### 开发者技术交流群 +### 加入 TDengine 官方社区 -微信扫描下面二维码,加“小 T”为好友,即可加入“物联网大数据技术前沿群”,与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。 +微信扫描以下二维码,学习了解 TDengine 的最新技术,与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。 -小 T 的二维码 + + + + + + + + + + + +
小 T 的二维码TDengine 微信视频号TDengine 微信公众号
加入“物联网大数据技术前沿群”
与大家进行技术交流
关注 TDengine 微信视频号
收看技术直播与教学视频
关注 TDengine 微信公众号
阅读核心技术与行业案例文章
diff --git a/docs/zh/05-get-started/official-account.webp b/docs/zh/05-get-started/official-account.webp new file mode 100644 index 0000000000..fcbc3107fc Binary files /dev/null and b/docs/zh/05-get-started/official-account.webp differ diff --git a/docs/zh/05-get-started/tdengine.webp b/docs/zh/05-get-started/tdengine.webp deleted file mode 100644 index e1bc0a75ac..0000000000 Binary files a/docs/zh/05-get-started/tdengine.webp and /dev/null differ diff --git a/docs/zh/05-get-started/xiaot.webp b/docs/zh/05-get-started/xiaot.webp new file mode 100644 index 0000000000..91b3b3ef68 Binary files /dev/null and b/docs/zh/05-get-started/xiaot.webp differ diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index f72735d903..4108212c55 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -4,7 +4,9 @@ title: TDengine 发布历史及下载链接 description: TDengine 发布历史、Release Notes 及下载链接 --- -各版本 TDengine 安装包下载链接如下: +TDengine 3.x 各版本安装包下载链接如下: + +TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-downloads) import Release from "/components/ReleaseV3"; diff --git a/docs/zh/28-releases/02-tools.md b/docs/zh/28-releases/02-tools.md index ac4a884f8b..28e0d4bca9 100644 --- a/docs/zh/28-releases/02-tools.md +++ b/docs/zh/28-releases/02-tools.md @@ -4,7 +4,9 @@ title: taosTools 发布历史及下载链接 description: taosTools 的发布历史、Release Notes 和下载链接 --- -各版本 taosTools 安装包下载链接如下: +taosTools 各版本安装包下载链接如下: + +其他历史版本安装包请访问[这里](https://www.taosdata.com/all-downloads) import Release from "/components/ReleaseV3"; diff --git a/include/common/systable.h b/include/common/systable.h index 8b29525db3..57f85f16bc 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -46,6 +46,7 @@ extern "C" { #define TSDB_INS_TABLE_SUBSCRIPTIONS "ins_subscriptions" #define TSDB_INS_TABLE_TOPICS "ins_topics" #define TSDB_INS_TABLE_STREAMS "ins_streams" +#define TSDB_INS_TABLE_STREAM_TASKS "ins_stream_tasks" #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_PERFS_TABLE_SMAS "perf_smas" diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 99c5c72e2f..3281bca96a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -119,6 +119,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_QUERIES, TSDB_MGMT_TABLE_VNODES, TSDB_MGMT_TABLE_APPS, + TSDB_MGMT_TABLE_STREAM_TASKS, TSDB_MGMT_TABLE_MAX, } EShowType; diff --git a/include/common/ttypes.h b/include/common/ttypes.h index bfd6a75c3a..761ffd0f1c 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -346,8 +346,8 @@ bool isValidDataType(int32_t type); void assignVal(char *val, const char *src, int32_t len, int32_t type); void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type); -void *getDataMin(int32_t type); -void *getDataMax(int32_t type); +void *getDataMin(int32_t type, void* value); +void *getDataMax(int32_t type, void* value); #ifdef __cplusplus } diff --git a/include/libs/function/taosudf.h b/include/libs/function/taosudf.h index 3fe3bb7d3b..5f57e203b9 100644 --- a/include/libs/function/taosudf.h +++ b/include/libs/function/taosudf.h @@ -104,7 +104,7 @@ typedef int32_t (*TUdfDestroyFunc)(); } while (0) #define udfColDataSetNull_var(pColumn, row) ((pColumn->colData.varLenCol.varOffsets)[row] = -1) -typedef uint16_t VarDataLenT; // maxVarDataLen: 32767 +typedef uint16_t VarDataLenT; // maxVarDataLen: 65535 #define VARSTR_HEADER_SIZE sizeof(VarDataLenT) #define varDataLen(v) ((VarDataLenT *)(v))[0] #define varDataVal(v) ((char *)(v) + VARSTR_HEADER_SIZE) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index a1dad1806d..d0971b013f 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -74,9 +74,8 @@ typedef struct SColumnNode { char tableName[TSDB_TABLE_NAME_LEN]; char tableAlias[TSDB_TABLE_NAME_LEN]; char colName[TSDB_COL_NAME_LEN]; - // SNode* pProjectRef; - int16_t dataBlockId; - int16_t slotId; + int16_t dataBlockId; + int16_t slotId; } SColumnNode; typedef struct SColumnRefNode { diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 6ddd906700..2be0561ce7 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -94,6 +94,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes); +void qWorkerStopAllTasks(void *qWorkerMgmt); + void qWorkerDestroy(void **qWorkerMgmt); int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a03dc7d9f9..636decc60b 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -385,6 +385,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x072D) #define TSDB_CODE_QRY_JSON_IN_GROUP_ERROR TAOS_DEF_ERROR_CODE(0, 0x072E) #define TSDB_CODE_QRY_JOB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x072F) +#define TSDB_CODE_QRY_QWORKER_QUIT TAOS_DEF_ERROR_CODE(0, 0x0730) // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) diff --git a/include/util/types.h b/include/util/types.h index 8dd0947e9c..b49670220b 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -78,7 +78,7 @@ static FORCE_INLINE double taos_align_get_double(const char *pBuf) { { (*(double *)(x)) = (*(double *)(y)); } // #endif -typedef uint16_t VarDataLenT; // maxVarDataLen: 32767 +typedef uint16_t VarDataLenT; // maxVarDataLen: 65535 #define VARSTR_HEADER_SIZE sizeof(VarDataLenT) #define varDataLen(v) ((VarDataLenT *)(v))[0] diff --git a/packaging/release.bat b/packaging/release.bat index 4ab7297f03..4c82c5ead5 100644 --- a/packaging/release.bat +++ b/packaging/release.bat @@ -39,7 +39,7 @@ if not exist %work_dir%\debug\ver-%2-x86 ( md %work_dir%\debug\ver-%2-x86 ) cd %work_dir%\debug\ver-%2-x64 -rem #call vcvarsall.bat x64 +call vcvarsall.bat x64 cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=true -DWEBSOCKET=true -DBUILD_HTTP=false -DBUILD_TEST=false -DVERNUMBER=%2 -DCPUTYPE=x64 cmake --build . rd /s /Q C:\TDengine diff --git a/packaging/tools/tdengine.iss b/packaging/tools/tdengine.iss index 981bee91b8..1c0c105179 100644 --- a/packaging/tools/tdengine.iss +++ b/packaging/tools/tdengine.iss @@ -61,6 +61,16 @@ Source: {#MyAppSourceDir}{#MyAppExeName}; DestDir: "{app}"; Excludes: {#MyAppExc Source: {#MyAppSourceDir}{#MyAppTaosdemoExeName}; DestDir: "{app}"; Flags: igNoreversion recursesubdirs createallsubdirs +[run] +Filename: {sys}\sc.exe; Parameters: "create taosd start= DEMAND binPath= ""C:\\TDengine\\taosd.exe --win_service""" ; Flags: runhidden +Filename: {sys}\sc.exe; Parameters: "create taosadapter start= DEMAND binPath= ""C:\\TDengine\\taosadapter.exe --win_service""" ; Flags: runhidden + +[UninstallRun] +RunOnceId: "stoptaosd"; Filename: {sys}\sc.exe; Parameters: "stop taosd" ; Flags: runhidden +RunOnceId: "stoptaosadapter"; Filename: {sys}\sc.exe; Parameters: "stop taosadapter" ; Flags: runhidden +RunOnceId: "deltaosd"; Filename: {sys}\sc.exe; Parameters: "delete taosd" ; Flags: runhidden +RunOnceId: "deltaosadapter"; Filename: {sys}\sc.exe; Parameters: "delete taosadapter" ; Flags: runhidden + [Registry] Root: HKLM; Subkey: "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"; \ ValueType: expandsz; ValueName: "Path"; ValueData: "{olddata};C:\TDengine"; \ diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index efa7d095c5..0aa88382fe 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -677,6 +677,7 @@ static void destoryCatalogReq(SCatalogReq *pCatalogReq) { taosArrayDestroy(pCatalogReq->pIndex); taosArrayDestroy(pCatalogReq->pUser); taosArrayDestroy(pCatalogReq->pTableIndex); + taosArrayDestroy(pCatalogReq->pTableCfg); taosMemoryFree(pCatalogReq); } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index d3d006ab35..c3a1f9f67e 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -134,7 +134,7 @@ static const SSysDbTableSchema userStbsSchema[] = { }; static const SSysDbTableSchema streamSchema[] = { - {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, @@ -145,6 +145,15 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; +static const SSysDbTableSchema streamTaskSchema[] = { + {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "task_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "node_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, +}; + static const SSysDbTableSchema userTblsSchema[] = { {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, @@ -287,6 +296,7 @@ static const SSysTableMeta infosMeta[] = { {TSDB_INS_TABLE_TOPICS, topicSchema, tListLen(topicSchema), false}, {TSDB_INS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema), false}, {TSDB_INS_TABLE_STREAMS, streamSchema, tListLen(streamSchema), false}, + {TSDB_INS_TABLE_STREAM_TASKS, streamTaskSchema, tListLen(streamTaskSchema), false}, {TSDB_INS_TABLE_VNODES, vnodesSchema, tListLen(vnodesSchema), true}, }; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index e8d5989e4d..f7b1196248 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -277,7 +277,9 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int pColumnInfoData->varmeta.allocLen = len + oldLen; } - memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len); + if (pColumnInfoData->pData && pSource->pData) { // TD-20382 + memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len); + } pColumnInfoData->varmeta.length = len + oldLen; } else { if (finalNumOfRows > (*capacity)) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2eb94773e9..bd8e34a395 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2591,7 +2591,10 @@ int32_t tDeserializeSUseDbBatchRsp(void *buf, int32_t bufLen, SUseDbBatchRsp *pR for (int32_t i = 0; i < numOfBatch; ++i) { SUseDbRsp usedbRsp = {0}; - if (tDeserializeSUseDbRspImp(&decoder, &usedbRsp) < 0) return -1; + if (tDeserializeSUseDbRspImp(&decoder, &usedbRsp) < 0) { + tDecoderClear(&decoder); + return -1; + } taosArrayPush(pRsp->pArray, &usedbRsp); } tEndDecode(&decoder); diff --git a/source/common/src/ttypes.c b/source/common/src/ttypes.c index a4e7a12ce4..8c5d44b8d5 100644 --- a/source/common/src/ttypes.c +++ b/source/common/src/ttypes.c @@ -61,26 +61,36 @@ tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX] = { static float floatMin = -FLT_MAX, floatMax = FLT_MAX; static double doubleMin = -DBL_MAX, doubleMax = DBL_MAX; -FORCE_INLINE void *getDataMin(int32_t type) { +FORCE_INLINE void *getDataMin(int32_t type, void* value) { switch (type) { case TSDB_DATA_TYPE_FLOAT: - return &floatMin; + *(float *)value = floatMin; + break; case TSDB_DATA_TYPE_DOUBLE: - return &doubleMin; + *(double *)value = doubleMin; + break; default: - return &tDataTypes[type].minValue; + *(int64_t *)value = tDataTypes[type].minValue; + break; } + + return value; } -FORCE_INLINE void *getDataMax(int32_t type) { +FORCE_INLINE void *getDataMax(int32_t type, void* value) { switch (type) { case TSDB_DATA_TYPE_FLOAT: - return &floatMax; + *(float *)value = floatMax; + break; case TSDB_DATA_TYPE_DOUBLE: - return &doubleMax; + *(double *)value = doubleMax; + break; default: - return &tDataTypes[type].maxValue; + *(int64_t *)value = tDataTypes[type].maxValue; + break; } + + return value; } bool isValidDataType(int32_t type) { return type >= TSDB_DATA_TYPE_NULL && type < TSDB_DATA_TYPE_MAX; } diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 2b0edfebc2..3a7c25f7f9 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -178,8 +178,10 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) { offset += sizeof(p->msgLen); *(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode); offset += sizeof(p->rspCode); - memcpy((char *)pRsp + offset, p->msg, p->msgLen); - offset += p->msgLen; + if (p->msg != NULL) { + memcpy((char *)pRsp + offset, p->msg, p->msgLen); + offset += p->msgLen; + } rpcFreeCont(p->msg); } diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index b0af98b933..20c2ebb0a4 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -106,6 +106,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { type = TSDB_MGMT_TABLE_STREAMS; } else if (strncasecmp(name, TSDB_PERFS_TABLE_APPS, len) == 0) { type = TSDB_MGMT_TABLE_APPS; + } else if (strncasecmp(name, TSDB_INS_TABLE_STREAM_TASKS, len) == 0) { + type = TSDB_MGMT_TABLE_STREAM_TASKS; } else { // ASSERT(0); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 594c13f957..c649180285 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -41,6 +41,8 @@ static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); +static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); +static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter); int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { @@ -62,6 +64,8 @@ int32_t mndInitStream(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask); return sdbSetTable(pMnode->pSdb, table); } @@ -891,7 +895,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB SName n; int32_t cols = 0; - char streamName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)streamName, false); @@ -953,3 +957,105 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SStreamObj *pStream = NULL; + + while (numOfRows < rowsCapacity) { + pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); + if (pShow->pIter == NULL) break; + + // lock + taosRLockLatch(&pStream->lock); + // count task num + int32_t sz = taosArrayGetSize(pStream->tasks); + int32_t count = 0; + for (int32_t i = 0; i < sz; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + count += taosArrayGetSize(pLevel); + } + + if (numOfRows + count > rowsCapacity) { + blockDataEnsureCapacity(pBlock, numOfRows + count); + } + // add row for each task + for (int32_t i = 0; i < sz; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + int32_t levelCnt = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < levelCnt; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // stream name + char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)streamName, false); + + // task id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pTask->taskId, false); + + // node type + char nodeType[20 + VARSTR_HEADER_SIZE] = {0}; + varDataSetLen(nodeType, 5); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pTask->nodeId > 0) { + memcpy(varDataVal(nodeType), "vnode", 5); + } else { + memcpy(varDataVal(nodeType), "snode", 5); + } + colDataAppend(pColInfo, numOfRows, nodeType, false); + + // node id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + int32_t nodeId = TMAX(pTask->nodeId, 0); + colDataAppend(pColInfo, numOfRows, (const char *)&nodeId, false); + + // level + char level[20 + VARSTR_HEADER_SIZE] = {0}; + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + memcpy(varDataVal(level), "source", 6); + varDataSetLen(level, 6); + } else if (pTask->taskLevel == TASK_LEVEL__AGG) { + memcpy(varDataVal(level), "agg", 3); + varDataSetLen(level, 3); + } else if (pTask->taskLevel == TASK_LEVEL__SINK) { + memcpy(varDataVal(level), "sink", 4); + varDataSetLen(level, 4); + } else if (pTask->taskLevel == TASK_LEVEL__SINK) { + } + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&level, false); + + // status + char status[20 + VARSTR_HEADER_SIZE] = {0}; + char status2[20] = {0}; + strcpy(status, "normal"); + STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&status, false); + + numOfRows++; + } + } + + // unlock + taosRUnLockLatch(&pStream->lock); + + sdbRelease(pSdb, pStream); + } + + pShow->numOfRows += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index a62f15f978..71e0b09e02 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -944,6 +944,8 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock int32_t numOfRows = 0; SMqSubscribeObj *pSub = NULL; + mDebug("mnd show subscriptions begin"); + while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pShow->pIter, (void **)&pSub); if (pShow->pIter == NULL) break; @@ -989,6 +991,9 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false); + mDebug("mnd show subscrptions: topic %s, consumer %" PRId64 "cgroup %s vgid %d", varDataVal(topic), + pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId); + // offset #if 0 // subscribe time @@ -1034,6 +1039,9 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, NULL, true); + mDebug("mnd show subscrptions(unassigned): topic %s, cgroup %s vgid %d", varDataVal(topic), varDataVal(cgroup), + pVgEp->vgId); + // offset #if 0 // subscribe time @@ -1053,6 +1061,8 @@ static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock sdbRelease(pSdb, pSub); } + mDebug("mnd end show subscriptions"); + pShow->numOfRows += numOfRows; return numOfRows; } diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ba9d68ee7f..c6e2842c32 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -108,8 +108,8 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) { #define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)}) #define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)}) #define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)}) -void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); -int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); +void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); +// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); int32_t tsdbRowCmprFn(const void *p1, const void *p2); // SRowIter void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); @@ -333,6 +333,8 @@ struct SVersionRange { typedef struct SMemSkipListNode SMemSkipListNode; struct SMemSkipListNode { int8_t level; + int64_t version; + STSRow *pTSRow; SMemSkipListNode *forwards[0]; }; typedef struct SMemSkipList { @@ -772,14 +774,6 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { #define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) -#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) - -static FORCE_INLINE int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) { - int32_t n = tGetI64(p, &pRow->version); - pRow->pTSRow = (STSRow *)(p + n); - n += pRow->pTSRow->len; - return n; -} static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { if (pIter == NULL) return NULL; @@ -798,8 +792,9 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { } } - tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row); pIter->pRow = &pIter->row; + pIter->pRow->version = pIter->pNode->version; + pIter->pRow->pTSRow = pIter->pNode->pTSRow; return pIter->pRow; } diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 29af2bda67..83f375c986 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -77,6 +77,7 @@ void vnodeBufPoolReset(SVBufPool* pPool); // vnodeQuery.c int32_t vnodeQueryOpen(SVnode* pVnode); +void vnodeQueryPreClose(SVnode *pVnode); void vnodeQueryClose(SVnode* pVnode); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 654afe1b6a..ddf2949607 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -18,10 +18,10 @@ #define MEM_MIN_HASH 1024 #define SL_MAX_LEVEL 5 -#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) +// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2 +#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) << 4)) #define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) -#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) #define SL_MOVE_BACKWARD 0x1 #define SL_MOVE_FROM_POS 0x2 @@ -263,30 +263,27 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa } bool tsdbTbDataIterNext(STbDataIter *pIter) { - SMemSkipListNode *pHead = pIter->pTbData->sl.pHead; - SMemSkipListNode *pTail = pIter->pTbData->sl.pTail; - pIter->pRow = NULL; if (pIter->backward) { - ASSERT(pIter->pNode != pTail); + ASSERT(pIter->pNode != pIter->pTbData->sl.pTail); - if (pIter->pNode == pHead) { + if (pIter->pNode == pIter->pTbData->sl.pHead) { return false; } pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0); - if (pIter->pNode == pHead) { + if (pIter->pNode == pIter->pTbData->sl.pHead) { return false; } } else { - ASSERT(pIter->pNode != pHead); + ASSERT(pIter->pNode != pIter->pTbData->sl.pHead); - if (pIter->pNode == pTail) { + if (pIter->pNode == pIter->pTbData->sl.pTail) { return false; } pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0); - if (pIter->pNode == pTail) { + if (pIter->pNode == pIter->pTbData->sl.pTail) { return false; } } @@ -394,7 +391,7 @@ _err: static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) { SMemSkipListNode *px; SMemSkipListNode *pn; - TSDBKEY *pTKey; + TSDBKEY tKey = {0}; int32_t backward = flags & SL_MOVE_BACKWARD; int32_t fromPos = flags & SL_MOVE_FROM_POS; @@ -413,9 +410,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) { pn = SL_NODE_BACKWARD(px, iLevel); while (pn != pTbData->sl.pHead) { - pTKey = (TSDBKEY *)SL_NODE_DATA(pn); + tKey.version = pn->version; + tKey.ts = pn->pTSRow->ts; - int32_t c = tsdbKeyCmprFn(pTKey, pKey); + int32_t c = tsdbKeyCmprFn(&tKey, pKey); if (c <= 0) { break; } else { @@ -442,7 +440,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) { pn = SL_NODE_FORWARD(px, iLevel); while (pn != pTbData->sl.pTail) { - int32_t c = tsdbKeyCmprFn(SL_NODE_DATA(pn), pKey); + tKey.version = pn->version; + tKey.ts = pn->pTSRow->ts; + + int32_t c = tsdbKeyCmprFn(&tKey, pKey); if (c >= 0) { break; } else { @@ -467,8 +468,8 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { return level; } -static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow, - int8_t forward) { +static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, int64_t version, + STSRow *pRow, int8_t forward) { int32_t code = 0; int8_t level; SMemSkipListNode *pNode; @@ -477,13 +478,19 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN // node level = tsdbMemSkipListRandLevel(&pTbData->sl); ASSERT(pPool != NULL); - pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow)); + pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level)); if (pNode == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } pNode->level = level; - tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), pRow); + pNode->version = version; + pNode->pTSRow = vnodeBufPoolMalloc(pPool, pRow->len); + if (NULL == pNode->pTSRow) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + memcpy(pNode->pTSRow, pRow, pRow->len); for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) { SMemSkipListNode *pn = pos[iLevel]; @@ -549,7 +556,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i key.ts = row.pTSRow->ts; nRow++; tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); - code = tbDataDoPut(pMemTable, pTbData, pos, &row, 0); + code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 0); if (code) { goto _err; } @@ -570,7 +577,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) { tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS); } - code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1); + code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 1); if (code) { goto _err; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c157faecb1..af368d33e0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3219,7 +3219,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger, SVersionRange* pVerRange, int32_t step) { - while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) { + while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) { if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) { rowIndex += step; continue; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 755a551e20..0aa2c6ab83 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -565,15 +565,15 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal * } } -int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) { - int32_t n = 0; +// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) { +// int32_t n = 0; - n += tPutI64(p, pRow->version); - if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len); - n += pRow->pTSRow->len; +// n += tPutI64(p, pRow->version); +// if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len); +// n += pRow->pTSRow->len; - return n; -} +// return n; +// } int32_t tsdbRowCmprFn(const void *p1, const void *p2) { return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2)); @@ -1084,7 +1084,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch cv.flag = CV_FLAG_VALUE; if (IS_VAR_DATA_TYPE(pTColumn->type)) { - void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); + void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); cv.value.nData = varDataLen(pData); cv.value.pData = varDataVal(pData); } else { @@ -1106,7 +1106,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch cv.flag = CV_FLAG_VALUE; if (IS_VAR_DATA_TYPE(pTColumn->type)) { - void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); + void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); cv.value.nData = varDataLen(pData); cv.value.pData = varDataVal(pData); } else { @@ -1151,7 +1151,7 @@ static int32_t tBlockDataAppendKVRow(SBlockData *pBlockData, STSRow *pRow, STSch ASSERT(pTColumn->type == pColData->type); SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type}; - TDRowValT vt = TD_VTYPE_NONE; // default is NONE + TDRowValT vt = TD_VTYPE_NONE; // default is NONE SKvRowIdx *pKvIdx = NULL; while (kvIter < nKvCols) { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 8c2036b97b..77d375bc45 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -242,7 +242,10 @@ _err: return NULL; } -void vnodePreClose(SVnode *pVnode) { vnodeSyncPreClose(pVnode); } +void vnodePreClose(SVnode *pVnode) { + vnodeQueryPreClose(pVnode); + vnodeSyncPreClose(pVnode); +} void vnodeClose(SVnode *pVnode) { if (pVnode) { diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index efedc12d80..15769ef4c9 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -28,6 +28,8 @@ int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb); } +void vnodeQueryPreClose(SVnode *pVnode) { qWorkerStopAllTasks((void *)pVnode->pQuery); } + void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) { @@ -386,8 +388,10 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { offset += sizeof(p->msgLen); *(int32_t *)((char *)pRsp + offset) = htonl(p->rspCode); offset += sizeof(p->rspCode); - memcpy((char *)pRsp + offset, p->msg, p->msgLen); - offset += p->msgLen; + if (p->msg) { + memcpy((char *)pRsp + offset, p->msg, p->msgLen); + offset += p->msgLen; + } taosMemoryFreeClear(p->msg); } diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 1c2d7e1f66..64fec145ef 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -528,7 +528,7 @@ static int32_t setCreateTBResultIntoDataBlock(SSDataBlock* pBlock, SDbCfgInfo* p appendTableOptions(buf2, &len, pDbCfg, pCfg); } - varDataLen(buf2) = len; + varDataLen(buf2) = (len > 65535) ? 65535 : len; colDataAppend(pCol2, 0, buf2, false); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e1db1f4729..b0da277cfb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -153,6 +153,16 @@ typedef struct { SSchemaWrapper* qsw; } SSchemaInfo; +typedef struct { + int32_t operatorType; + int64_t refId; +} SExchangeOpStopInfo; + +typedef struct { + SRWLatch lock; + SArray* pStopInfo; +} STaskStopInfo; + typedef struct SExecTaskInfo { STaskIdInfo id; uint32_t status; @@ -171,6 +181,7 @@ typedef struct SExecTaskInfo { SSubplan* pSubplan; struct SOperatorInfo* pRoot; SLocalFetch localFetch; + STaskStopInfo stopInfo; } SExecTaskInfo; enum { @@ -1050,6 +1061,7 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult); int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize); void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order); +int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 049de727df..c57a1b38eb 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -65,6 +65,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn while (1) { tsem_wait(&pExchangeInfo->ready); + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); @@ -286,6 +289,9 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc); pInfo->pResultBlockList = taosArrayInit(1, POINTER_BYTES); + SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self}; + qAppendTaskStopInfo(pTaskInfo, &stopInfo); + pInfo->seqLoadData = false; pInfo->pTransporter = pTransporter; @@ -543,6 +549,10 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { pOperator->cost.openCost = taosGetTimestampUs() - startTs; tsem_wait(&pExchangeInfo->ready); + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + tsem_post(&pExchangeInfo->ready); return TSDB_CODE_SUCCESS; } @@ -562,6 +572,9 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); tsem_wait(&pExchangeInfo->ready); + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current); SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1aa9a3c613..428af19a6c 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -659,6 +659,54 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { return pTaskInfo->code; } +int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo) { + taosWLockLatch(&pTaskInfo->stopInfo.lock); + taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo); + taosWUnLockLatch(&pTaskInfo->stopInfo.lock); + + return TSDB_CODE_SUCCESS; +} + +int32_t stopInfoComp(void const* lp, void const* rp) { + SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp; + SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp; + + if (key->refId < pInfo->refId) { + return -1; + } else if (key->refId > pInfo->refId) { + return 1; + } + + return 0; +} + +void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo) { + taosWLockLatch(&pTaskInfo->stopInfo.lock); + int32_t idx = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ); + if (idx >= 0) { + taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, idx); + } + taosWUnLockLatch(&pTaskInfo->stopInfo.lock); + + return; +} + +void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { + taosWLockLatch(&pTaskInfo->stopInfo.lock); + + int32_t num = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo); + for (int32_t i = 0; i < num; ++i) { + SExchangeOpStopInfo *pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, i); + SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId); + if (pExchangeInfo) { + tsem_post(&pExchangeInfo->ready); + taosReleaseRef(exchangeObjRefPool, pStop->refId); + } + } + + taosWUnLockLatch(&pTaskInfo->stopInfo.lock); +} + int32_t qAsyncKillTask(qTaskInfo_t qinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; @@ -667,7 +715,11 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) { } qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); + setTaskKilled(pTaskInfo); + + qStopTaskOperators(pTaskInfo); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7fd288cd57..baa5cb6479 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2597,6 +2597,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT pTaskInfo->id.queryId = queryId; pTaskInfo->execModel = model; pTaskInfo->pTableInfoList = tableListCreate(); + pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); char* p = taosMemoryCalloc(1, 128); snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId); @@ -3210,6 +3211,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { nodesDestroyNode((SNode*)pTaskInfo->pSubplan); } + taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo); taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo); diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index ddd948a6dd..c41376b2dc 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -513,6 +513,22 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { // taosMemoryFreeClear(pFillInfo->pTags[i].tagVal); // } + // free pFillCol + if (pFillInfo->pFillCol) { + for (int32_t i = 0; i < pFillInfo->numOfCols; i++) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + if (!pCol->notFillCol) { + if (pCol->fillVal.nType == TSDB_DATA_TYPE_VARBINARY || pCol->fillVal.nType == TSDB_DATA_TYPE_VARCHAR || + pCol->fillVal.nType == TSDB_DATA_TYPE_NCHAR || pCol->fillVal.nType == TSDB_DATA_TYPE_JSON) { + if (pCol->fillVal.pz) { + taosMemoryFree(pCol->fillVal.pz); + pCol->fillVal.pz = NULL; + } + } + } + } + } + taosMemoryFreeClear(pFillInfo->pTags); taosMemoryFreeClear(pFillInfo->pFillCol); taosMemoryFreeClear(pFillInfo); diff --git a/source/libs/executor/src/tsimplehash.c b/source/libs/executor/src/tsimplehash.c index a5168d24ba..9fbe94200c 100644 --- a/source/libs/executor/src/tsimplehash.c +++ b/source/libs/executor/src/tsimplehash.c @@ -189,7 +189,7 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons } while (pNode) { - if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) { + if ((keyLen == pNode->keyLen) && (*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) { break; } pNode = pNode->next; @@ -213,10 +213,12 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, size_t keyLen, int32_t index) { SHNode *pNode = pHashObj->hashList[index]; while (pNode) { - if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) { + const char* p = GET_SHASH_NODE_KEY(pNode, pNode->dataLen); + ASSERT(keyLen > 0); + + if (pNode->keyLen == keyLen && ((*(pHashObj->equalFp))(p, key, keyLen) == 0)) { break; } - pNode = pNode->next; } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index d3f03e8e9c..8249a8f6b1 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -475,7 +475,7 @@ static int32_t translateNowToday(SFunctionNode* pFunc, char* pErrBuf, int32_t le // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1506,7 +1506,7 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len) // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1519,7 +1519,7 @@ static int32_t translateInterp(SFunctionNode* pFunc, char* pErrBuf, int32_t len) int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); uint8_t dbPrec = pFunc->node.resType.precision; - //if (1 != numOfParams && 3 != numOfParams && 4 != numOfParams) { + // if (1 != numOfParams && 3 != numOfParams && 4 != numOfParams) { if (1 != numOfParams) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } @@ -1835,7 +1835,7 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1894,7 +1894,7 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2474,7 +2474,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "first", .type = FUNCTION_TYPE_FIRST, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, .translateFunc = translateFirstLast, .dynDataRequiredFunc = firstDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2512,7 +2512,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "last", .type = FUNCTION_TYPE_LAST, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, .translateFunc = translateFirstLast, .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 079e553b07..d02d30dc60 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -269,7 +269,7 @@ typedef struct SModeInfo { STuplePos nullTuplePos; bool nullTupleSaved; - char pItems[]; + char pItems[]; } SModeInfo; typedef struct SDerivInfo { @@ -911,7 +911,7 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { case TSDB_DATA_TYPE_FLOAT: { float* plist = (float*)pCol->pData; -// float val = 0; + // float val = 0; for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { continue; @@ -919,9 +919,9 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) { numOfElem += 1; pAvgRes->count += 1; - pAvgRes->sum.dsum += plist[i]; + pAvgRes->sum.dsum += plist[i]; } -// pAvgRes->sum.dsum = val; + // pAvgRes->sum.dsum = val; break; } @@ -1187,7 +1187,11 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { } if (!pBuf->assign) { - pBuf->v = *(int64_t*)tval; + if (type == TSDB_DATA_TYPE_FLOAT) { + *(float*)&pBuf->v = GET_DOUBLE_VAL(tval); + } else { + pBuf->v = *(int64_t*)tval; + } if (pCtx->subsidiaries.num > 0) { index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); if (index >= 0) { @@ -3007,7 +3011,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) { } } #else - int64_t* pts = (int64_t*) pInput->pPTS->pData; + int64_t* pts = (int64_t*)pInput->pPTS->pData; for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { continue; @@ -3111,61 +3115,61 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { } } #else - if (!pInputCol->hasNull) { - numOfElems = 1; + if (!pInputCol->hasNull) { + numOfElems = 1; - int32_t round = pInput->numOfRows >> 2; - int32_t reminder = pInput->numOfRows & 0x03; + int32_t round = pInput->numOfRows >> 2; + int32_t reminder = pInput->numOfRows & 0x03; - int32_t tick = 0; - for (int32_t i = pInput->startRowIndex; tick < round; i += 4, tick += 1) { - int64_t cts = pts[i]; - int32_t chosen = i; + int32_t tick = 0; + for (int32_t i = pInput->startRowIndex; tick < round; i += 4, tick += 1) { + int64_t cts = pts[i]; + int32_t chosen = i; - if (cts < pts[i + 1]) { - cts = pts[i + 1]; - chosen = i + 1; - } - - if (cts < pts[i + 2]) { - cts = pts[i + 2]; - chosen = i + 2; - } - - if (cts < pts[i + 3]) { - cts = pts[i + 3]; - chosen = i + 3; - } - - if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { - char* data = colDataGetData(pInputCol, chosen); - doSaveCurrentVal(pCtx, i, cts, type, data); - pResInfo->numOfRes = 1; - } + if (cts < pts[i + 1]) { + cts = pts[i + 1]; + chosen = i + 1; } - for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) { - if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { - char* data = colDataGetData(pInputCol, i); - doSaveCurrentVal(pCtx, i, pts[i], type, data); - pResInfo->numOfRes = 1; - } + if (cts < pts[i + 2]) { + cts = pts[i + 2]; + chosen = i + 2; } - } else { - for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { - if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { - continue; - } - numOfElems++; + if (cts < pts[i + 3]) { + cts = pts[i + 3]; + chosen = i + 3; + } - if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { - char* data = colDataGetData(pInputCol, i); - doSaveCurrentVal(pCtx, i, pts[i], type, data); - pResInfo->numOfRes = 1; - } + if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { + char* data = colDataGetData(pInputCol, chosen); + doSaveCurrentVal(pCtx, i, cts, type, data); + pResInfo->numOfRes = 1; } } + + for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) { + if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { + char* data = colDataGetData(pInputCol, i); + doSaveCurrentVal(pCtx, i, pts[i], type, data); + pResInfo->numOfRes = 1; + } + } + } else { + for (int32_t i = pInput->startRowIndex; i < pInput->startRowIndex + pInput->numOfRows; ++i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) { + continue; + } + + numOfElems++; + + if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) { + char* data = colDataGetData(pInputCol, i); + doSaveCurrentVal(pCtx, i, pts[i], type, data); + pResInfo->numOfRes = 1; + } + } + } #endif #endif @@ -3175,7 +3179,7 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo); } -// SET_VAL(pResInfo, numOfElems, 1); + // SET_VAL(pResInfo, numOfElems, 1); return TSDB_CODE_SUCCESS; } @@ -5276,7 +5280,6 @@ int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pInfo->numSampled; } - bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { #if 0 SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); @@ -6073,11 +6076,19 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t row = 0; char st[256] = {0}; - double totalRawSize = pData->totalRows * pData->rowSize; + double averageSize = 0; + if (pData->numOfBlocks != 0) { + averageSize = ((double)pData->totalSize) / pData->numOfBlocks; + } + uint64_t totalRawSize = pData->totalRows * pData->rowSize; + double compRatio = 0; + if (totalRawSize != 0) { + compRatio = pData->totalSize * 100 / (double)totalRawSize; + } + int32_t len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]", - pData->numOfBlocks, pData->totalSize / 1024.0, ((double)pData->totalSize) / pData->numOfBlocks, - pData->totalSize * 100 / totalRawSize, '%'); + pData->numOfBlocks, pData->totalSize / 1024.0, averageSize, compRatio, '%'); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 39d17153d0..cc1bae6a3c 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -622,7 +622,7 @@ void nodesDestroyNode(SNode* pNode) { } switch (nodeType(pNode)) { - case QUERY_NODE_COLUMN: // pProjectRef is weak reference, no need to release + case QUERY_NODE_COLUMN: destroyExprNode((SExprNode*)pNode); break; case QUERY_NODE_VALUE: { diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 7a38f48cb2..411adc680c 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -1364,6 +1364,7 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, break; } } + taosMemoryFree(pLine); if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) && (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) && !pStmt->fileProcessing) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 0e5cb14208..8de130bbb5 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -744,7 +744,8 @@ static bool isPrimaryKeyImpl(SNode* pExpr) { return (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId); } else if (QUERY_NODE_FUNCTION == nodeType(pExpr)) { SFunctionNode* pFunc = (SFunctionNode*)pExpr; - if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType) { + if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_FIRST == pFunc->funcType || + FUNCTION_TYPE_LAST == pFunc->funcType) { return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0)); } else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType) { return true; @@ -787,7 +788,6 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p static void setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColumnNode** pColRef) { SColumnNode* pCol = *pColRef; - // pCol->pProjectRef = (SNode*)pExpr; if (NULL == pExpr->pAssociation) { pExpr->pAssociation = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); } @@ -2932,8 +2932,8 @@ static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* return TSDB_CODE_SUCCESS; } - if (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) || - TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER)) { + if (!pCxt->createStream && (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) || + TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER))) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); } @@ -5268,9 +5268,7 @@ static int32_t checkTopicQuery(STranslateContext* pCxt, SSelectStmt* pSelect) { } static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) { - SName name; - tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName)); - tNameGetFullDbName(&name, pReq->name); + snprintf(pReq->name, sizeof(pReq->name), "%d.%s", pCxt->pParseCxt->acctId, pStmt->topicName); pReq->igExists = pStmt->ignoreExists; pReq->withMeta = pStmt->withMeta; @@ -5280,7 +5278,7 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS } int32_t code = TSDB_CODE_SUCCESS; - + SName name; if ('\0' != pStmt->subSTbName[0]) { pReq->subType = TOPIC_SUB_TYPE__TABLE; toName(pCxt->pParseCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name); @@ -5548,6 +5546,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) { crossTableWithUdaf(pSelect)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } + if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "SUBTABLE expression must be of VARCHAR type"); + } return TSDB_CODE_SUCCESS; } @@ -6060,11 +6062,11 @@ static int32_t extractShowCreateTableResultSchema(int32_t* numOfCols, SSchema** } (*pSchema)[0].type = TSDB_DATA_TYPE_BINARY; - (*pSchema)[0].bytes = TSDB_TABLE_NAME_LEN; + (*pSchema)[0].bytes = SHOW_CREATE_TB_RESULT_FIELD1_LEN; strcpy((*pSchema)[0].name, "Table"); (*pSchema)[1].type = TSDB_DATA_TYPE_BINARY; - (*pSchema)[1].bytes = TSDB_MAX_BINARY_LEN; + (*pSchema)[1].bytes = SHOW_CREATE_TB_RESULT_FIELD2_LEN; strcpy((*pSchema)[1].name, "Create Table"); return TSDB_CODE_SUCCESS; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 36c6817595..a9eca64675 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -246,7 +246,7 @@ typedef struct SQWorkerMgmt { #define QW_ERR_RET(c) \ do { \ - int32_t _code = c; \ + int32_t _code = (c); \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ return _code; \ @@ -254,7 +254,7 @@ typedef struct SQWorkerMgmt { } while (0) #define QW_RET(c) \ do { \ - int32_t _code = c; \ + int32_t _code = (c); \ if (_code != TSDB_CODE_SUCCESS) { \ terrno = _code; \ } \ @@ -262,7 +262,7 @@ typedef struct SQWorkerMgmt { } while (0) #define QW_ERR_JRET(c) \ do { \ - code = c; \ + code = (c); \ if (code != TSDB_CODE_SUCCESS) { \ terrno = code; \ goto _return; \ diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 1871316260..4c4a41df82 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -91,11 +91,53 @@ _return: void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) { QW_LOCK(QW_READ, &sch->tasksLock); - QW_DLOG("the %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, - taosHashGetSize(sch->tasksHash)); + int32_t taskNum = taosHashGetSize(sch->tasksHash); + QW_DLOG("***The %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, taskNum); + + uint64_t qId, tId; + int32_t eId; + SQWTaskStatus *pTask = NULL; + void *pIter = taosHashIterate(sch->tasksHash, NULL); + while (pIter) { + pTask = (SQWTaskStatus *)pIter; + void *key = taosHashGetKey(pIter, NULL); + QW_GET_QTID(key, qId, tId, eId); + + QW_TASK_DLOG("job refId:%" PRIx64 ", code:%x, task status:%d", pTask->refId, pTask->code, pTask->status); + + pIter = taosHashIterate(sch->tasksHash, pIter); + } + QW_UNLOCK(QW_READ, &sch->tasksLock); } +void qwDbgDumpTasksInfo(SQWorker *mgmt) { + QW_DUMP("***Total remain ctx num %d", taosHashGetSize(mgmt->ctxHash)); + + int32_t i = 0; + SQWTaskCtx *ctx = NULL; + uint64_t qId, tId; + int32_t eId; + void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + while (pIter) { + ctx = (SQWTaskCtx *)pIter; + void *key = taosHashGetKey(pIter, NULL); + QW_GET_QTID(key, qId, tId, eId); + + QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, fetchType:%d, " + "execId:%x, level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, " + "rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d", + ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType, + ctx->fetchType, ctx->execId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue, + ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName, + ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY], + ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]); + + pIter = taosHashIterate(mgmt->ctxHash, pIter); + } + +} + void qwDbgDumpMgmtInfo(SQWorker *mgmt) { if (!gQWDebug.dumpEnable) { return; @@ -120,7 +162,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) { QW_UNLOCK(QW_READ, &mgmt->schLock); - QW_DUMP("total remain ctx num %d", taosHashGetSize(mgmt->ctxHash)); + qwDbgDumpTasksInfo(mgmt); } int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) { diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index e9ded9b269..e13791ae89 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -281,9 +281,11 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { int32_t qwKillTaskHandle(SQWTaskCtx *ctx) { int32_t code = 0; + // Note: free/kill may in RC qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { + qDebug("start to kill task"); code = qAsyncKillTask(taskHandle); atomic_store_ptr(&ctx->taskHandle, taskHandle); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index a7cd3db824..e45beb7e13 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -683,6 +683,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { bool queryStop = false; do { + ctx = NULL; + QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL)); QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); @@ -1162,6 +1164,41 @@ _return: QW_RET(code); } +void qWorkerStopAllTasks(void *qWorkerMgmt) { + SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + + QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash)); + + uint64_t qId, tId; + int32_t eId; + void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + while (pIter) { + SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; + void *key = taosHashGetKey(pIter, NULL); + QW_GET_QTID(key, qId, tId, eId); + + QW_LOCK(QW_WRITE, &ctx->lock); + + QW_TASK_DLOG_E("start to force stop task"); + + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + QW_TASK_WLOG_E("task already dropping"); + QW_UNLOCK(QW_WRITE, &ctx->lock); + + pIter = taosHashIterate(mgmt->ctxHash, pIter); + continue; + } + + if (QW_QUERY_RUNNING(ctx)) { + qwKillTaskHandle(ctx); + } + + QW_UNLOCK(QW_WRITE, &ctx->lock); + + pIter = taosHashIterate(mgmt->ctxHash, pIter); + } +} + void qWorkerDestroy(void **qWorkerMgmt) { if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) { return; diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index df9a818fee..be085e6cbd 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -512,15 +512,17 @@ int32_t filterReuseRangeCtx(SFilterRangeCtx *ctx, int32_t type, int32_t options) } int32_t filterConvertRange(SFilterRangeCtx *cur, SFilterRange *ra, bool *notNull) { + int64_t tmp = 0; + if (!FILTER_GET_FLAG(ra->sflag, RANGE_FLG_NULL)) { - int32_t sr = cur->pCompareFunc(&ra->s, getDataMin(cur->type)); + int32_t sr = cur->pCompareFunc(&ra->s, getDataMin(cur->type, &tmp)); if (sr == 0) { FILTER_SET_FLAG(ra->sflag, RANGE_FLG_NULL); } } if (!FILTER_GET_FLAG(ra->eflag, RANGE_FLG_NULL)) { - int32_t er = cur->pCompareFunc(&ra->e, getDataMax(cur->type)); + int32_t er = cur->pCompareFunc(&ra->e, getDataMax(cur->type, &tmp)); if (er == 0) { FILTER_SET_FLAG(ra->eflag, RANGE_FLG_NULL); } @@ -696,14 +698,15 @@ int32_t filterAddRangeImpl(void *h, SFilterRange *ra, int32_t optr) { int32_t filterAddRange(void *h, SFilterRange *ra, int32_t optr) { SFilterRangeCtx *ctx = (SFilterRangeCtx *)h; - + int64_t tmp = 0; + if (FILTER_GET_FLAG(ra->sflag, RANGE_FLG_NULL)) { - SIMPLE_COPY_VALUES(&ra->s, getDataMin(ctx->type)); + SIMPLE_COPY_VALUES(&ra->s, getDataMin(ctx->type, &tmp)); // FILTER_CLR_FLAG(ra->sflag, RA_NULL); } if (FILTER_GET_FLAG(ra->eflag, RANGE_FLG_NULL)) { - SIMPLE_COPY_VALUES(&ra->e, getDataMax(ctx->type)); + SIMPLE_COPY_VALUES(&ra->e, getDataMax(ctx->type, &tmp)); // FILTER_CLR_FLAG(ra->eflag, RA_NULL); } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 22fb66d92f..a6a2a6c301 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -286,9 +286,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa if (pJob->execRes.res) { SSubmitRsp *sum = pJob->execRes.res; sum->affectedRows += rsp->affectedRows; - sum->nBlocks += rsp->nBlocks; - sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks)); - memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks)); + sum->nBlocks += rsp->nBlocks; + if (rsp->nBlocks > 0 && rsp->pBlocks) { + sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks)); + memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks)); + } taosMemoryFree(rsp->pBlocks); taosMemoryFree(rsp); } else { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 88c39c1157..aefe30116b 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -114,12 +114,12 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int return NULL; } - char statePath[300]; + char statePath[1024]; if (!specPath) { sprintf(statePath, "%s/%d", path, pTask->taskId); } else { - memset(statePath, 0, 300); - tstrncpy(statePath, path, 300); + memset(statePath, 0, 1024); + tstrncpy(statePath, path, 1024); } if (tdbOpen(statePath, szPage, pages, &pState->db, 0) < 0) { goto _err; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 94c4de3c5e..875204c0a9 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -37,43 +37,6 @@ // /\ UNCHANGED <> // -// only start once -static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm, - SyncAppendEntriesReply* pMsg) { - if (beginIndex > endIndex) { - sNError(ths, "snapshot param error, start:%" PRId64 ", end:%" PRId64, beginIndex, endIndex); - return; - } - - // get sender - SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); - ASSERT(pSender != NULL); - - if (snapshotSenderIsStart(pSender)) { - sSError(pSender, "snapshot sender already start"); - return; - } - - SSnapshot snapshot = { - .data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID}; - void* pReader = NULL; - SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex}; - int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader); - ASSERT(code == 0); - -#if 0 - if (pMsg->privateTerm < pSender->privateTerm) { - ASSERT(pReader != NULL); - snapshotSenderStart(pSender, readerParam, snapshot, pReader); - - } else { - if (pReader != NULL) { - ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader); - } - } -#endif -} - int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncAppendEntriesReply* pMsg = pRpcMsg->pCont; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7ed90fb140..e802f60f30 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -385,7 +385,7 @@ bool syncIsReadyForRead(int64_t rid) { if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) { SSyncRaftEntry* pEntry = NULL; int32_t code = pSyncNode->pLogStore->syncLogGetEntry( - pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); + pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); if (code == 0 && pEntry != NULL) { if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { ready = true; @@ -1806,7 +1806,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock), pNode->pingTimerMS, pNode); if (code != 0) { - sNError(pNode, "failed to build ping msg"); + sError("failed to build ping msg"); rpcFreeCont(rpcMsg.pCont); return; } @@ -1814,7 +1814,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { sNTrace(pNode, "enqueue ping msg"); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { - sNError(pNode, "failed to sync enqueue ping msg since %s", terrstr()); + sError("failed to sync enqueue ping msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); return; } @@ -1832,11 +1832,14 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { SElectTimer* pElectTimer = param; SSyncNode* pNode = pElectTimer->pSyncNode; + if (pNode == NULL) return; + if (pNode->syncEqMsg == NULL) return; + SRpcMsg rpcMsg = {0}; int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode); if (code != 0) { - sNError(pNode, "failed to build elect msg"); + sError("failed to build elect msg"); taosMemoryFree(pElectTimer); return; } @@ -1846,7 +1849,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { - sNError(pNode, "failed to sync enqueue elect msg since %s", terrstr()); + sError("failed to sync enqueue elect msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); taosMemoryFree(pElectTimer); return; @@ -1876,14 +1879,14 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { pNode->heartbeatTimerMS, pNode); if (code != 0) { - sNError(pNode, "failed to build heartbeat msg"); + sError("failed to build heartbeat msg"); return; } sNTrace(pNode, "enqueue heartbeat timer"); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { - sNError(pNode, "failed to enqueue heartbeat msg since %s", terrstr()); + sError("failed to enqueue heartbeat msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); return; } @@ -1968,7 +1971,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) { sNTrace(pNode, "propose msg, type:noop"); code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg); if (code != 0) { - sNError(pNode, "failed to propose noop msg while enqueue since %s", terrstr()); + sError("failed to propose noop msg while enqueue since %s", terrstr()); } return code; @@ -2002,7 +2005,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { if (ths->state == TAOS_SYNC_STATE_LEADER) { int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); if (code != 0) { - sNError(ths, "append noop error"); + sError("append noop error"); return -1; } } @@ -2106,7 +2109,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncNodeFollowerCommit(ths, pMsg->fcIndex); } else { - sNError(ths, "error local cmd"); + sError("error local cmd"); } return 0; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 7e4b18ab88..b00ba3918c 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -197,7 +197,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr syncMeta.isWeek = pEntry->isWeak; syncMeta.seqNum = pEntry->seqNum; syncMeta.term = pEntry->term; + + int64_t tsWriteBegin = taosGetTimestampNs(); index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); + int64_t tsWriteEnd = taosGetTimestampNs(); + int64_t tsElapsed = tsWriteEnd - tsWriteBegin; + if (index < 0) { int32_t err = terrno; const char* errStr = tstrerror(err); @@ -210,8 +215,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr } pEntry->index = index; - sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s", pEntry->index, - TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); + sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, + TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); return 0; } @@ -234,9 +239,13 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR return -1; } + int64_t ts1 = taosGetTimestampNs(); taosThreadMutexLock(&(pData->mutex)); + int64_t ts2 = taosGetTimestampNs(); code = walReadVer(pWalHandle, index); + int64_t ts3 = taosGetTimestampNs(); + // code = walReadVerCached(pWalHandle, index); if (code != 0) { int32_t err = terrno; @@ -280,6 +289,18 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR */ taosThreadMutexUnlock(&(pData->mutex)); + int64_t ts4 = taosGetTimestampNs(); + + int64_t tsElapsed = ts4 - ts1; + int64_t tsElapsedLock = ts2 - ts1; + int64_t tsElapsedRead = ts3 - ts2; + int64_t tsElapsedBuild = ts4 - ts3; + + sNTrace(pData->pSyncNode, + "read index:%" PRId64 ", elapsed:%" PRId64 ", elapsed-lock:%" PRId64 ", elapsed-read:%" PRId64 + ", elapsed-build:%" PRId64, + index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild); + return code; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index f09bac2139..6a7a2c18c1 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -72,7 +72,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh SRpcMsg rpcMsg = {0}; SyncAppendEntries* pMsg = NULL; - SSyncRaftEntry* pEntry; + SSyncRaftEntry* pEntry = NULL; int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); if (code == 0) { @@ -99,6 +99,8 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh } } + syncEntryDestory(pEntry); + // prepare msg ASSERT(pMsg != NULL); pMsg->srcId = pSyncNode->myRaftId; @@ -140,9 +142,10 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncAppendEntries* pMsg = pRpcMsg->pCont; - - syncLogSendAppendEntries(pSyncNode, pMsg, ""); - syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); + if (pMsg == NULL) { + sError("vgId:%d, sync-append-entries msg is NULL", pSyncNode->vgId); + return 0; + } SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId); if (pState == NULL) { @@ -150,8 +153,19 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI return 0; } + // save index, otherwise pMsg will be free by rpc + SyncIndex saveLastSendIndex = pState->lastSendIndex; + bool update = false; if (pMsg->dataLen > 0) { - pState->lastSendIndex = pMsg->prevLogIndex + 1; + saveLastSendIndex = pMsg->prevLogIndex + 1; + update = true; + } + + syncLogSendAppendEntries(pSyncNode, pMsg, ""); + syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); + + if (update) { + pState->lastSendIndex = saveLastSendIndex; pState->lastSendTime = taosGetTimestampMs(); } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index a575de7e56..b9a271ab9d 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -194,6 +194,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; + int64_t currentTerm = pNode->pRaftStore->currentTerm; // save error code, otherwise it will be overwritten int32_t errCode = terrno; @@ -235,8 +236,8 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", - pNode->vgId, syncStr(pNode->state), eventLog, pNode->pRaftStore->currentTerm, pNode->commitIndex, - logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, + pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, + logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); @@ -374,9 +375,9 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); + "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 + ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); } void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { @@ -511,8 +512,8 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 - ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, + ", lsend-index:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, (pMsg->prevLogIndex + 1), pMsg->commitIndex, pMsg->dataLen, s); } diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index bdbd6c2f3d..e7254c8bc6 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -169,7 +169,7 @@ int tdbPCacheAlter(SPCache *pCache, int32_t nPage) { SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) { SPage *pPage; - i32 nRef; + i32 nRef = 0; tdbPCacheLock(pCache); @@ -178,14 +178,17 @@ SPage *tdbPCacheFetch(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) { nRef = tdbRefPage(pPage); } - ASSERT(pPage); - tdbPCacheUnlock(pCache); // printf("thread %" PRId64 " fetch page %d pgno %d pPage %p nRef %d\n", taosGetSelfPthreadId(), pPage->id, // TDB_PAGE_PGNO(pPage), pPage, nRef); - tdbDebug("pcache/fetch page %p/%d/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id, nRef); + if (pPage) { + tdbDebug("pcache/fetch page %p/%d/%d/%d", pPage, TDB_PAGE_PGNO(pPage), pPage->id, nRef); + } else { + tdbDebug("pcache/fetch page %p", pPage); + } + return pPage; } @@ -266,7 +269,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn) } // 4. Try a create new page - if (!pPage) { + if (!pPage && pTxn->xMalloc != NULL) { ret = tdbPageCreate(pCache->szPage, &pPage, pTxn->xMalloc, pTxn->xArg); if (ret < 0 || pPage == NULL) { // TODO diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index c3ae1dc739..abbad06515 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -27,6 +27,116 @@ typedef struct { TDB_STATIC_ASSERT(sizeof(SFileHdr) == 128, "Size of file header is not correct"); +struct hashset_st { + size_t nbits; + size_t mask; + size_t capacity; + size_t *items; + size_t nitems; + double load_factor; +}; + +static const unsigned int prime = 39; +static const unsigned int prime2 = 5009; + +hashset_t hashset_create(void) { + hashset_t set = tdbOsCalloc(1, sizeof(struct hashset_st)); + if (!set) { + return NULL; + } + + set->nbits = 4; + set->capacity = (size_t)(1 << set->nbits); + set->items = tdbOsCalloc(set->capacity, sizeof(size_t)); + if (!set->items) { + tdbOsFree(set); + return NULL; + } + set->mask = set->capacity - 1; + set->nitems = 0; + + set->load_factor = 0.75; + + return set; +} + +void hashset_destroy(hashset_t set) { + if (set) { + tdbOsFree(set->items); + tdbOsFree(set); + } +} + +int hashset_add_member(hashset_t set, void *item) { + size_t value = (size_t) item; + size_t h; + + if (value == 0) { + return -1; + } + + for (h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) { + if (set->items[h] == value) { + return 0; + } + } + + set->items[h] = value; + ++set->nitems; + return 1; +} + +int hashset_add(hashset_t set, void *item) { + int ret = hashset_add_member(set, item); + + size_t old_capacity = set->capacity; + if (set->nitems >= (double)old_capacity * set->load_factor) { + size_t *old_items = set->items; + ++set->nbits; + set->capacity = (size_t)(1 << set->nbits); + set->mask = set->capacity - 1; + + set->items = tdbOsCalloc(set->capacity, sizeof(size_t)); + if (!set->items) { + return -1; + } + + set->nitems = 0; + for (size_t i = 0; i < old_capacity; ++i) { + hashset_add_member(set, (void*)old_items[i]); + } + tdbOsFree(old_items); + } + + return ret; +} + +int hashset_remove(hashset_t set, void *item) { + size_t value = (size_t) item; + + for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) { + if (set->items[h] == value) { + set->items[h] = 0; + --set->nitems; + return 1; + } + } + + return 0; +} + +int hashset_contains(hashset_t set, void *item) { + size_t value = (size_t) item; + + for (size_t h = set->mask & (prime * value); set->items[h] != 0; h = set->mask & (h + prime2)) { + if (set->items[h] == value) { + return 1; + } + } + + return 0; +} + #define TDB_PAGE_INITIALIZED(pPage) ((pPage)->pPager != NULL) static int tdbPagerInitPage(SPager *pPager, SPage *pPage, int (*initPage)(SPage *, void *, int), void *arg, @@ -209,12 +319,16 @@ int tdbPagerWrite(SPager *pPager, SPage *pPage) { tRBTreePut(&pPager->rbt, (SRBTreeNode *)pPage); // Write page to journal if neccessary - if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize) { + if (TDB_PAGE_PGNO(pPage) <= pPager->dbOrigSize && (pPager->jPageSet == NULL || !hashset_contains(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))))) { ret = tdbPagerWritePageToJournal(pPager, pPage); if (ret < 0) { tdbError("failed to write page to journal since %s", tstrerror(terrno)); return -1; } + + if (pPager->jPageSet) { + hashset_add(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))); + } } return 0; @@ -233,6 +347,7 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) { return -1; } + pPager->jPageSet = hashset_create(); // TODO: write the size of the file pPager->inTran = 1; @@ -275,6 +390,9 @@ int tdbPagerCommit(SPager *pPager, TXN *pTxn) { pPage->isDirty = 0; tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); + if (pPager->jPageSet) { + hashset_remove(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))); + } tdbPCacheRelease(pPager->pCache, pPage, pTxn); } @@ -304,6 +422,9 @@ int tdbPagerPostCommit(SPager *pPager, TXN *pTxn) { return -1; } + if (pPager->jPageSet) { + hashset_destroy(pPager->jPageSet); + } pPager->inTran = 0; return 0; @@ -375,36 +496,61 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { return -1; } - tdb_fd_t jfd = tdbOsOpen(pPager->jFileName, TDB_O_RDWR, 0755); - if (jfd == NULL) { - return -1; - } + tdb_fd_t jfd = pPager->jfd; ret = tdbGetFileSize(jfd, pPager->pageSize, &journalSize); if (ret < 0) { return -1; } - // 1, read pages from jounal file - // 2, write original pages to buffered ones + u8 *pageBuf = tdbOsCalloc(1, pPager->pageSize); + if (pageBuf == NULL) { + return -1; + } - /* TODO: reset the buffered pages instead of releasing them - // loop to reset the dirty pages from file - for (pgIdx = 0, pPage = pPager->pDirty; pPage != NULL && pgIndex < journalSize; pPage = pPage->pDirtyNext, ++pgIdx) { + for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) { // read pgno & the page from journal SPgno pgno; int ret = tdbOsRead(jfd, &pgno, sizeof(pgno)); if (ret < 0) { + tdbOsFree(pageBuf); return -1; } ret = tdbOsRead(jfd, pageBuf, pPager->pageSize); if (ret < 0) { + tdbOsFree(pageBuf); + return -1; + } + + i64 offset = pPager->pageSize * (pgno - 1); + if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) { + tdbError("failed to lseek fd due to %s. file:%s, offset:%" PRId64, strerror(errno), pPager->dbFileName, offset); + terrno = TAOS_SYSTEM_ERROR(errno); + tdbOsFree(pageBuf); + return -1; + } + + ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize); + if (ret < 0) { + tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName, + pPager->pageSize); + terrno = TAOS_SYSTEM_ERROR(errno); + tdbOsFree(pageBuf); return -1; } } - */ + + if (tdbOsFSync(pPager->fd) < 0) { + tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName); + terrno = TAOS_SYSTEM_ERROR(errno); + tdbOsFree(pageBuf); + return -1; + } + + tdbOsFree(pageBuf); + // 3, release the dirty pages SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1); SRBTreeNode *pNode = NULL; @@ -413,17 +559,55 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) { pPage->isDirty = 0; + tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); + hashset_remove(pPager->jPageSet, (void*)((long)TDB_PAGE_PGNO(pPage))); + tdbPCacheRelease(pPager->pCache, pPage, pTxn); + } + + tRBTreeCreate(&pPager->rbt, pageCmpFn); + + // 4, remove the journal file + tdbOsClose(pPager->jfd); + (void)tdbOsRemove(pPager->jFileName); + hashset_destroy(pPager->jPageSet); + + pPager->inTran = 0; + + return 0; +} + +int tdbPagerFlushPage(SPager *pPager, TXN *pTxn) { + SPage *pPage; + int ret; + + // loop to write the dirty pages to file + SRBTreeIter iter = tRBTreeIterCreate(&pPager->rbt, 1); + SRBTreeNode *pNode = NULL; + while ((pNode = tRBTreeIterNext(&iter)) != NULL) { + pPage = (SPage *)pNode; + ret = tdbPagerWritePageToDB(pPager, pPage); + if (ret < 0) { + tdbError("failed to write page to db since %s", tstrerror(terrno)); + return -1; + } + } + + tdbTrace("tdbttl commit:%p, %d/%d", pPager, pPager->dbOrigSize, pPager->dbFileSize); + pPager->dbOrigSize = pPager->dbFileSize; + + // release the page + iter = tRBTreeIterCreate(&pPager->rbt, 1); + while ((pNode = tRBTreeIterNext(&iter)) != NULL) { + pPage = (SPage *)pNode; + + pPage->isDirty = 0; + tRBTreeDrop(&pPager->rbt, (SRBTreeNode *)pPage); tdbPCacheRelease(pPager->pCache, pPage, pTxn); } tRBTreeCreate(&pPager->rbt, pageCmpFn); - // 4, remove the journal file - tdbOsClose(pPager->jfd); - (void)tdbOsRemove(pPager->jFileName); - pPager->inTran = 0; - return 0; } @@ -453,10 +637,8 @@ int tdbPagerFetchPage(SPager *pPager, SPgno *ppgno, SPage **ppPage, int (*initPa // fetch a page container memcpy(&pgid, pPager->fid, TDB_FILE_ID_LEN); pgid.pgno = pgno; - pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn); - if (pPage == NULL) { - ASSERT(0); - return -1; + while ((pPage = tdbPCacheFetch(pPager->pCache, &pgid, pTxn)) == NULL) { + tdbPagerFlushPage(pPager, pTxn); } tdbTrace("tdbttl fetch pager:%p", pPage->pPager); diff --git a/source/libs/tdb/src/inc/tdbInt.h b/source/libs/tdb/src/inc/tdbInt.h index e5ece98b28..731b1927e7 100644 --- a/source/libs/tdb/src/inc/tdbInt.h +++ b/source/libs/tdb/src/inc/tdbInt.h @@ -384,6 +384,8 @@ struct STDB { #endif }; +typedef struct hashset_st *hashset_t; + struct SPager { char *dbFileName; char *jFileName; @@ -394,7 +396,8 @@ struct SPager { SPCache *pCache; SPgno dbFileSize; SPgno dbOrigSize; - SPage *pDirty; + //SPage *pDirty; + hashset_t jPageSet; SRBTree rbt; u8 inTran; SPager *pNext; // used by TDB diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 1c6e1a2e17..6cac4b6093 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -124,8 +124,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { goto _err; } - char* candidate = NULL; - char* haystack = buf; + char* candidate = NULL; + char* haystack = buf; int64_t pos = 0; SWalCkHead* logContent = NULL; @@ -414,8 +414,10 @@ int walCheckAndRepairMeta(SWal* pWal) { } ASSERT(pFileInfo->fileSize == 0); // remove the empty wal log, and its idx + wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); taosRemoveFile(fnameStr); walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); + wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); taosRemoveFile(fnameStr); // remove its meta entry taosArrayRemove(pWal->fileInfoSet, fileIdx); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index b683ba1926..216dd5fcb1 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -319,7 +319,7 @@ int32_t walEndSnapshot(SWal *pWal) { SWalFileInfo *pInfo = taosArraySearch(pWal->fileInfoSet, &tmp, compareWalFileInfo, TD_LE); if (pInfo) { if (ver >= pInfo->lastVer) { - pInfo--; + pInfo++; } if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) { wDebug("vgId:%d, begin remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer); @@ -407,6 +407,7 @@ int32_t walRollImpl(SWal *pWal) { } walBuildLogName(pWal, newFileFirstVer, fnameStr); pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + wDebug("vgId:%d, wal create new file for write:%s", pWal->cfg.vgId, fnameStr); if (pLogFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); code = -1; diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index c1fee37610..76a312cd91 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -271,8 +271,11 @@ static int32_t cfgSetTimezone(SConfigItem *pItem, const char *value, ECfgSrcType cfgStypeStr(stype), value, terrstr()); return -1; } - pItem->stype = stype; + + // apply new timezone + osSetTimezone(value); + return 0; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index a1162d2e94..b406432616 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -377,6 +377,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_ERROR, "Json not support in i TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR, "Json not support in this place") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_GROUP_ERROR, "Json not support in group/partition by") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_NOT_EXIST, "Job not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QWORKER_QUIT, "Vnode/Qnode is quitting") // grant TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired") diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 59c7eeede6..fac4210d88 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -9,7 +9,7 @@ ,,y,script,./test.sh -f tsim/user/basic.sim ,,y,script,./test.sh -f tsim/user/password.sim ,,y,script,./test.sh -f tsim/user/privilege_db.sim -,,,script,./test.sh -f tsim/user/privilege_sysinfo.sim +,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim ,,,script,./test.sh -f tsim/db/alter_option.sim ,,,script,./test.sh -f tsim/db/alter_replica_13.sim ,,,script,./test.sh -f tsim/db/alter_replica_31.sim @@ -23,16 +23,16 @@ ,,,script,./test.sh -f tsim/db/create_all_options.sim ,,y,script,./test.sh -f tsim/db/delete_reuse1.sim ,,y,script,./test.sh -f tsim/db/delete_reuse2.sim -,,,script,./test.sh -f tsim/db/delete_reusevnode.sim +,,y,script,./test.sh -f tsim/db/delete_reusevnode.sim ,,y,script,./test.sh -f tsim/db/delete_reusevnode2.sim -,,,script,./test.sh -f tsim/db/delete_writing1.sim -,,,script,./test.sh -f tsim/db/delete_writing2.sim +,,y,script,./test.sh -f tsim/db/delete_writing1.sim +,,y,script,./test.sh -f tsim/db/delete_writing2.sim ,,y,script,./test.sh -f tsim/db/error1.sim ,,y,script,./test.sh -f tsim/db/keep.sim ,,y,script,./test.sh -f tsim/db/len.sim ,,y,script,./test.sh -f tsim/db/repeat.sim ,,y,script,./test.sh -f tsim/db/show_create_db.sim -,,,script,./test.sh -f tsim/db/show_create_table.sim +,,y,script,./test.sh -f tsim/db/show_create_table.sim ,,y,script,./test.sh -f tsim/db/tables.sim ,,y,script,./test.sh -f tsim/db/taosdlog.sim ,,,script,./test.sh -f tsim/dnode/balance_replica1.sim @@ -68,7 +68,7 @@ ,,y,script,./test.sh -f tsim/insert/basic0.sim ,,y,script,./test.sh -f tsim/insert/basic1.sim ,,y,script,./test.sh -f tsim/insert/basic2.sim -,,,script,./test.sh -f tsim/insert/commit-merge0.sim +,,y,script,./test.sh -f tsim/insert/commit-merge0.sim ,,y,script,./test.sh -f tsim/insert/insert_drop.sim ,,y,script,./test.sh -f tsim/insert/insert_select.sim ,,y,script,./test.sh -f tsim/insert/null.sim @@ -80,8 +80,8 @@ ,,y,script,./test.sh -f tsim/insert/query_multi_file.sim ,,y,script,./test.sh -f tsim/insert/tcp.sim ,,y,script,./test.sh -f tsim/insert/update0.sim -,,,script,./test.sh -f tsim/insert/update1_sort_merge.sim -,,,script,./test.sh -f tsim/parser/alter__for_community_version.sim +,,y,script,./test.sh -f tsim/insert/update1_sort_merge.sim +,,y,script,./test.sh -f tsim/parser/alter__for_community_version.sim ,,y,script,./test.sh -f tsim/parser/alter_column.sim ,,y,script,./test.sh -f tsim/parser/alter_stable.sim ,,y,script,./test.sh -f tsim/parser/alter.sim @@ -92,24 +92,24 @@ ,,y,script,./test.sh -f tsim/parser/binary_escapeCharacter.sim ,,,script,./test.sh -f tsim/parser/col_arithmetic_operation.sim ,,y,script,./test.sh -f tsim/parser/columnValue_bigint.sim -,,,script,./test.sh -f tsim/parser/columnValue_bool.sim +,,y,script,./test.sh -f tsim/parser/columnValue_bool.sim ,,y,script,./test.sh -f tsim/parser/columnValue_double.sim ,,y,script,./test.sh -f tsim/parser/columnValue_float.sim ,,y,script,./test.sh -f tsim/parser/columnValue_int.sim ,,y,script,./test.sh -f tsim/parser/columnValue_smallint.sim ,,y,script,./test.sh -f tsim/parser/columnValue_tinyint.sim ,,,script,./test.sh -f tsim/parser/columnValue_unsign.sim -,,,script,./test.sh -f tsim/parser/commit.sim -,,,script,./test.sh -f tsim/parser/condition.sim +,,y,script,./test.sh -f tsim/parser/commit.sim +,,y,script,./test.sh -f tsim/parser/condition.sim ,,y,script,./test.sh -f tsim/parser/constCol.sim -,,,script,./test.sh -f tsim/parser/create_db.sim -,,,script,./test.sh -f tsim/parser/create_mt.sim +,,y,script,./test.sh -f tsim/parser/create_db.sim +,,y,script,./test.sh -f tsim/parser/create_mt.sim ,,y,script,./test.sh -f tsim/parser/create_tb_with_tag_name.sim ,,y,script,./test.sh -f tsim/parser/create_tb.sim -,,,script,./test.sh -f tsim/parser/dbtbnameValidate.sim +,,y,script,./test.sh -f tsim/parser/dbtbnameValidate.sim ,,y,script,./test.sh -f tsim/parser/distinct.sim ,,y,script,./test.sh -f tsim/parser/fill_us.sim -,,,script,./test.sh -f tsim/parser/fill.sim +,,y,script,./test.sh -f tsim/parser/fill.sim ,,y,script,./test.sh -f tsim/parser/first_last.sim ,,y,script,./test.sh -f tsim/parser/fourArithmetic-basic.sim ,,,script,./test.sh -f tsim/parser/function.sim @@ -120,9 +120,9 @@ ,,y,script,./test.sh -f tsim/parser/import_commit1.sim ,,y,script,./test.sh -f tsim/parser/import_commit2.sim ,,y,script,./test.sh -f tsim/parser/import_commit3.sim -,,,script,./test.sh -f tsim/parser/import_file.sim +,,y,script,./test.sh -f tsim/parser/import_file.sim ,,y,script,./test.sh -f tsim/parser/import.sim -,,,script,./test.sh -f tsim/parser/insert_multiTbl.sim +,,y,script,./test.sh -f tsim/parser/insert_multiTbl.sim ,,y,script,./test.sh -f tsim/parser/insert_tb.sim ,,,script,./test.sh -f tsim/parser/join_manyblocks.sim ,,y,script,./test.sh -f tsim/parser/join_multitables.sim @@ -131,14 +131,14 @@ ,,y,script,./test.sh -f tsim/parser/last_cache.sim ,,y,script,./test.sh -f tsim/parser/last_groupby.sim ,,y,script,./test.sh -f tsim/parser/lastrow.sim -,,,script,./test.sh -f tsim/parser/lastrow2.sim +,,y,script,./test.sh -f tsim/parser/lastrow2.sim ,,y,script,./test.sh -f tsim/parser/like.sim ,,,script,./test.sh -f tsim/parser/limit.sim ,,,script,./test.sh -f tsim/parser/limit1.sim ,,y,script,./test.sh -f tsim/parser/mixed_blocks.sim -,,,script,./test.sh -f tsim/parser/nchar.sim +,,y,script,./test.sh -f tsim/parser/nchar.sim ,,,script,./test.sh -f tsim/parser/nestquery.sim -,,,script,./test.sh -f tsim/parser/null_char.sim +,,y,script,./test.sh -f tsim/parser/null_char.sim ,,y,script,./test.sh -f tsim/parser/precision_ns.sim ,,,script,./test.sh -f tsim/parser/projection_limit_offset.sim ,,y,script,./test.sh -f tsim/parser/regex.sim @@ -146,8 +146,8 @@ ,,y,script,./test.sh -f tsim/parser/select_distinct_tag.sim ,,y,script,./test.sh -f tsim/parser/select_from_cache_disk.sim ,,y,script,./test.sh -f tsim/parser/select_with_tags.sim -,,,script,./test.sh -f tsim/parser/selectResNum.sim -,,,script,./test.sh -f tsim/parser/set_tag_vals.sim +,,y,script,./test.sh -f tsim/parser/selectResNum.sim +,,y,script,./test.sh -f tsim/parser/set_tag_vals.sim ,,y,script,./test.sh -f tsim/parser/single_row_in_tb.sim ,,,script,./test.sh -f tsim/parser/sliding.sim ,,y,script,./test.sh -f tsim/parser/slimit_alter_tags.sim @@ -178,24 +178,24 @@ ,,,script,./test.sh -f tsim/mnode/basic3.sim ,,,script,./test.sh -f tsim/mnode/basic4.sim ,,,script,./test.sh -f tsim/mnode/basic5.sim -,,,script,./test.sh -f tsim/show/basic.sim +,,y,script,./test.sh -f tsim/show/basic.sim ,,y,script,./test.sh -f tsim/table/autocreate.sim ,,y,script,./test.sh -f tsim/table/basic1.sim -,,,script,./test.sh -f tsim/table/basic2.sim +,,y,script,./test.sh -f tsim/table/basic2.sim ,,y,script,./test.sh -f tsim/table/basic3.sim ,,y,script,./test.sh -f tsim/table/bigint.sim ,,y,script,./test.sh -f tsim/table/binary.sim ,,y,script,./test.sh -f tsim/table/bool.sim -,,,script,./test.sh -f tsim/table/column_name.sim +,,y,script,./test.sh -f tsim/table/column_name.sim ,,y,script,./test.sh -f tsim/table/column_num.sim ,,y,script,./test.sh -f tsim/table/column_value.sim ,,y,script,./test.sh -f tsim/table/column2.sim ,,y,script,./test.sh -f tsim/table/createmulti.sim ,,y,script,./test.sh -f tsim/table/date.sim ,,y,script,./test.sh -f tsim/table/db.table.sim -,,,script,./test.sh -f tsim/table/delete_reuse1.sim -,,,script,./test.sh -f tsim/table/delete_reuse2.sim -,,,script,./test.sh -f tsim/table/delete_writing.sim +,,y,script,./test.sh -f tsim/table/delete_reuse1.sim +,,y,script,./test.sh -f tsim/table/delete_reuse2.sim +,,y,script,./test.sh -f tsim/table/delete_writing.sim ,,y,script,./test.sh -f tsim/table/describe.sim ,,y,script,./test.sh -f tsim/table/double.sim ,,y,script,./test.sh -f tsim/table/float.sim @@ -203,11 +203,12 @@ ,,y,script,./test.sh -f tsim/table/int.sim ,,y,script,./test.sh -f tsim/table/limit.sim ,,y,script,./test.sh -f tsim/table/smallint.sim -,,,script,./test.sh -f tsim/table/table_len.sim +,,y,script,./test.sh -f tsim/table/table_len.sim ,,y,script,./test.sh -f tsim/table/table.sim ,,y,script,./test.sh -f tsim/table/tinyint.sim ,,y,script,./test.sh -f tsim/table/vgroup.sim ,,,script,./test.sh -f tsim/stream/basic0.sim -g +,,,script,./test.sh -f tsim/stream/basic1.sim ,,y,script,./test.sh -f tsim/stream/basic2.sim ,,,script,./test.sh -f tsim/stream/drop_stream.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim @@ -271,7 +272,7 @@ ,,y,script,./test.sh -f tsim/stable/tag_filter.sim ,,y,script,./test.sh -f tsim/stable/tag_modify.sim ,,y,script,./test.sh -f tsim/stable/tag_rename.sim -,,,script,./test.sh -f tsim/stable/values.sim +,,y,script,./test.sh -f tsim/stable/values.sim ,,y,script,./test.sh -f tsim/stable/vnode3.sim ,,y,script,./test.sh -f tsim/stable/metrics_idx.sim ,,,script,./test.sh -f tsim/sma/drop_sma.sim @@ -321,11 +322,11 @@ ,,y,script,./test.sh -f tsim/compress/compress.sim ,,y,script,./test.sh -f tsim/compress/uncompress.sim ,,y,script,./test.sh -f tsim/compute/avg.sim -,,,script,./test.sh -f tsim/compute/block_dist.sim +,,y,script,./test.sh -f tsim/compute/block_dist.sim ,,y,script,./test.sh -f tsim/compute/bottom.sim ,,y,script,./test.sh -f tsim/compute/count.sim ,,y,script,./test.sh -f tsim/compute/diff.sim -,,,script,./test.sh -f tsim/compute/diff2.sim +,,y,script,./test.sh -f tsim/compute/diff2.sim ,,y,script,./test.sh -f tsim/compute/first.sim ,,y,script,./test.sh -f tsim/compute/interval.sim ,,y,script,./test.sh -f tsim/compute/last_row.sim @@ -358,7 +359,7 @@ ,,y,script,./test.sh -f tsim/vector/metrics_query.sim ,,y,script,./test.sh -f tsim/vector/metrics_tag.sim ,,y,script,./test.sh -f tsim/vector/metrics_time.sim -,,,script,./test.sh -f tsim/vector/multi.sim +,,y,script,./test.sh -f tsim/vector/multi.sim ,,y,script,./test.sh -f tsim/vector/single.sim ,,y,script,./test.sh -f tsim/vector/table_field.sim ,,y,script,./test.sh -f tsim/vector/table_mix.sim @@ -380,7 +381,7 @@ ,,y,script,./test.sh -f tsim/tag/column.sim ,,y,script,./test.sh -f tsim/tag/commit.sim ,,y,script,./test.sh -f tsim/tag/create.sim -,,,script,./test.sh -f tsim/tag/delete.sim +,,y,script,./test.sh -f tsim/tag/delete.sim ,,y,script,./test.sh -f tsim/tag/double.sim ,,y,script,./test.sh -f tsim/tag/filter.sim ,,y,script,./test.sh -f tsim/tag/float.sim @@ -392,7 +393,7 @@ ,,y,script,./test.sh -f tsim/tag/tinyint.sim ,,y,script,./test.sh -f tsim/tag/drop_tag.sim ,,y,script,./test.sh -f tsim/tag/tbNameIn.sim -,,,script,./test.sh -f tmp/monitor.sim +,,y,script,./test.sh -f tmp/monitor.sim #system test diff --git a/tests/script/sh/checkAsan.sh b/tests/script/sh/checkAsan.sh index 4d1b0a3d6b..184dc9a88f 100755 --- a/tests/script/sh/checkAsan.sh +++ b/tests/script/sh/checkAsan.sh @@ -20,7 +20,7 @@ LOG_DIR=$TAOS_DIR/sim/tsim/asan error_num=`cat ${LOG_DIR}/*.asan | grep "ERROR" | wc -l` memory_leak=`cat ${LOG_DIR}/*.asan | grep "Direct leak" | wc -l` indirect_leak=`cat ${LOG_DIR}/*.asan | grep "Indirect leak" | wc -l` -runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | wc -l` +runtime_error=`cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | wc -l` echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m" echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m" diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 79be05e9fc..0f3c383b9e 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -87,6 +87,7 @@ ELSE () MESSAGE("CURRENT SOURCE DIR ${CMAKE_CURRENT_SOURCE_DIR}") IF (TD_WINDOWS) + MESSAGE("Building taosAdapter on Windows") INCLUDE(ExternalProject) ExternalProject_Add(taosadapter PREFIX "taosadapter" @@ -104,14 +105,18 @@ ELSE () COMMAND go build -a -o taosadapter-debug.exe -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}" INSTALL_COMMAND - COMMAND cmake -E time upx taosadapter ||: - COMMAND cmake -E copy taosadapter.exe ${CMAKE_BINARY_DIR}/build/bin + COMMAND cmake -E echo "Comparessing taosadapter.exe" + COMMAND cmake -E time upx taosadapter.exe + COMMAND cmake -E echo "Copy taosadapter.exe" + COMMAND cmake -E copy taosadapter.exe ${CMAKE_BINARY_DIR}/build/bin/taosadapter.exe COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/ + COMMAND cmake -E echo "Copy taosadapter.toml" COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/ - COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/ + COMMAND cmake -E echo "Copy taosadapter-debug.exe" COMMAND cmake -E copy taosadapter-debug.exe ${CMAKE_BINARY_DIR}/build/bin ) ELSE (TD_WINDOWS) + MESSAGE("Building taosAdapter on non-Windows") INCLUDE(ExternalProject) ExternalProject_Add(taosadapter PREFIX "taosadapter" @@ -126,11 +131,15 @@ ELSE () COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}" COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}" INSTALL_COMMAND + COMMAND cmake -E echo "Comparessing taosadapter.exe" COMMAND upx taosadapter || : + COMMAND cmake -E echo "Copy taosadapter" COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/ + COMMAND cmake -E echo "Copy taosadapter.toml" COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/ COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/ + COMMAND cmake -E echo "Copy taosadapter-debug" COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin ) ENDIF (TD_WINDOWS)