diff --git a/cmake/cmake.platform b/cmake/cmake.platform index 3e239d2e0c..eb8b63b4c2 100644 --- a/cmake/cmake.platform +++ b/cmake/cmake.platform @@ -102,6 +102,12 @@ IF ("${CPUTYPE}" STREQUAL "") SET(TD_ARM_64 TRUE) ADD_DEFINITIONS("-D_TD_ARM_") ADD_DEFINITIONS("-D_TD_ARM_64") + ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "loongarch64") + MESSAGE(STATUS "The current platform is loongarch64") + SET(PLATFORM_ARCH_STR "loongarch64") + SET(TD_LOONGARCH_64 TRUE) + ADD_DEFINITIONS("-D_TD_LOONGARCH_") + ADD_DEFINITIONS("-D_TD_LOONGARCH_64") ENDIF () ELSE () # if generate ARM version: @@ -118,6 +124,12 @@ ELSE () ADD_DEFINITIONS("-D_TD_ARM_") ADD_DEFINITIONS("-D_TD_ARM_64") SET(TD_ARM_64 TRUE) + ELSEIF (${CPUTYPE} MATCHES "loongarch64") + SET(PLATFORM_ARCH_STR "loongarch64") + MESSAGE(STATUS "input cpuType: loongarch64") + SET(TD_LOONGARCH_64 TRUE) + ADD_DEFINITIONS("-D_TD_LOONGARCH_") + ADD_DEFINITIONS("-D_TD_LOONGARCH_64") ELSEIF (${CPUTYPE} MATCHES "mips64") SET(PLATFORM_ARCH_STR "mips") MESSAGE(STATUS "input cpuType: mips64") diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index d18d85171d..82a7052125 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG a921bd4 + GIT_TAG 23e2b73 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/docs/en/14-reference/09-support-platform/index.md b/docs/en/14-reference/09-support-platform/index.md index fe26860765..061294f016 100644 --- a/docs/en/14-reference/09-support-platform/index.md +++ b/docs/en/14-reference/09-support-platform/index.md @@ -14,7 +14,7 @@ Note: ● means officially tested and verified, ○ means unofficially tested an ## List of supported platforms for TDengine clients and connectors -TDengine's connector can support a wide range of platforms, including X64/X86/ARM64/ARM32/MIPS/Alpha hardware platforms and Linux/Win64/Win32/macOS development environments. +TDengine's connector can support a wide range of platforms, including X64/X86/ARM64/ARM32/MIPS/Alpha/LoongArch64 hardware platforms and Linux/Win64/Win32/macOS development environments. The comparison matrix is as follows. diff --git a/docs/zh/05-get-started/index.md b/docs/zh/05-get-started/index.md index 20f8235d87..dec4d800bc 100644 --- a/docs/zh/05-get-started/index.md +++ b/docs/zh/05-get-started/index.md @@ -3,6 +3,8 @@ title: 立即开始 description: '快速设置 TDengine 环境并体验其高效写入和查询' --- +import xiaot from './tdengine.webp' + TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/taosadapter) 提供 [RESTful 接口](../connector/rest-api)。 本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。 @@ -12,4 +14,10 @@ import DocCardList from '@theme/DocCardList'; import {useCurrentSidebarCategory} from '@docusaurus/theme-common'; -``` \ No newline at end of file +``` + +### 开发者技术交流群 + +微信扫描下面二维码,加“小 T”为好友,即可加入“物联网大数据技术前沿群”,与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。 + +小 T 的二维码 diff --git a/docs/zh/05-get-started/tdengine.webp b/docs/zh/05-get-started/tdengine.webp new file mode 100644 index 0000000000..e1bc0a75ac Binary files /dev/null and b/docs/zh/05-get-started/tdengine.webp differ diff --git a/docs/zh/08-connector/10-cpp.mdx b/docs/zh/08-connector/10-cpp.mdx index cc7991da74..8a4f4946a7 100644 --- a/docs/zh/08-connector/10-cpp.mdx +++ b/docs/zh/08-connector/10-cpp.mdx @@ -115,6 +115,7 @@ TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤) 订阅和消费 ```c + {{#include examples/c/tmq.c}} ``` diff --git a/docs/zh/14-reference/09-support-platform/index.md b/docs/zh/14-reference/09-support-platform/index.md index 7292ca4814..500eeeb14c 100644 --- a/docs/zh/14-reference/09-support-platform/index.md +++ b/docs/zh/14-reference/09-support-platform/index.md @@ -16,7 +16,7 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表" ## TDengine 客户端和连接器支持的平台列表 -目前 TDengine 的连接器可支持的平台广泛,目前包括:X64/X86/ARM64/ARM32/MIPS/Alpha 等硬件平台,以及 Linux/Win64/Win32/macOS 等开发环境。 +目前 TDengine 的连接器可支持的平台广泛,目前包括:X64/X86/ARM64/ARM32/MIPS/LoongArch64 等硬件平台,以及 Linux/Win64/Win32/macOS 等开发环境。 对照矩阵如下: diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md index 31093ce557..f72735d903 100644 --- a/docs/zh/28-releases/01-tdengine.md +++ b/docs/zh/28-releases/01-tdengine.md @@ -1,9 +1,11 @@ --- sidebar_label: TDengine 发布历史 -title: TDengine 发布历史 +title: TDengine 发布历史及下载链接 description: TDengine 发布历史、Release Notes 及下载链接 --- +各版本 TDengine 安装包下载链接如下: + import Release from "/components/ReleaseV3"; ## 3.0.1.6 @@ -33,4 +35,3 @@ import Release from "/components/ReleaseV3"; ## 3.0.1.0 - diff --git a/docs/zh/28-releases/02-tools.md b/docs/zh/28-releases/02-tools.md index 2623391fb9..ac4a884f8b 100644 --- a/docs/zh/28-releases/02-tools.md +++ b/docs/zh/28-releases/02-tools.md @@ -1,9 +1,11 @@ --- sidebar_label: taosTools 发布历史 -title: taosTools 发布历史 +title: taosTools 发布历史及下载链接 description: taosTools 的发布历史、Release Notes 和下载链接 --- +各版本 taosTools 安装包下载链接如下: + import Release from "/components/ReleaseV3"; ## 2.2.7 diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b16b5a2d4b..99c5c72e2f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -418,13 +418,17 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { buf = taosDecodeVariantI32(buf, &pSW->nCols); buf = taosDecodeVariantI32(buf, &pSW->version); - pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); - if (pSW->pSchema == NULL) { - return NULL; - } + if (pSW->nCols > 0) { + pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); + if (pSW->pSchema == NULL) { + return NULL; + } - for (int32_t i = 0; i < pSW->nCols; i++) { - buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); + for (int32_t i = 0; i < pSW->nCols; i++) { + buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); + } + } else { + pSW->pSchema = NULL; } return (void*)buf; } @@ -839,7 +843,7 @@ typedef struct { int64_t dbId; int32_t vgVersion; int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT - int64_t stateTs; // ms + int64_t stateTs; // ms } SUseDbReq; int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); @@ -2990,7 +2994,8 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE } static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { - if (pSubTopicEp->schema.nCols) taosMemoryFreeClear(pSubTopicEp->schema.pSchema); + taosMemoryFreeClear(pSubTopicEp->schema.pSchema); + pSubTopicEp->schema.nCols = 0; taosArrayDestroy(pSubTopicEp->vgs); } diff --git a/packaging/docker/dockerbuild.sh b/packaging/docker/dockerbuild.sh index 4c8f1413aa..b02387a3d1 100755 --- a/packaging/docker/dockerbuild.sh +++ b/packaging/docker/dockerbuild.sh @@ -5,7 +5,7 @@ set -e #set -x # dockerbuild.sh -# -c [aarch32 | aarch64 | amd64 | x86 | mips64 ...] +# -c [aarch32 | aarch64 | amd64 | x86 | mips64 | loongarch64...] # -n [version number] # -p [password for docker hub] # -V [stable | beta] @@ -57,7 +57,7 @@ do dockerLatest=$(echo $OPTARG) ;; h) - echo "Usage: `basename $0` -c [aarch32 | aarch64 | amd64 | x86 | mips64 ...] " + echo "Usage: `basename $0` -c [aarch32 | aarch64 | amd64 | x86 | mips64 | loongarch64...] " echo " -n [version number] " echo " -p [password for docker hub] " echo " -V [stable | beta] " @@ -136,4 +136,4 @@ if [ "$cloudBuild" != "y" ] && [ ${dockerLatest} == 'y' ] ;then docker push tdengine/tdengine-${dockername}:latest fi -rm -f ${pkgFile} \ No newline at end of file +rm -f ${pkgFile} diff --git a/packaging/docker/dockerbuildi.sh b/packaging/docker/dockerbuildi.sh index a0a954e30f..9b7497dc4a 100755 --- a/packaging/docker/dockerbuildi.sh +++ b/packaging/docker/dockerbuildi.sh @@ -5,7 +5,7 @@ set -e #set -x # dockerbuild.sh -# -c [aarch32 | aarch64 | amd64 | x86 | mips64 ...] +# -c [aarch32 | aarch64 | amd64 | x86 | mips64 | loongarch64...] # -n [version number] # -p [password for docker hub] @@ -30,7 +30,7 @@ do passWord=$(echo $OPTARG) ;; h) - echo "Usage: `basename $0` -c [aarch32 | aarch64 | amd64 | x86 | mips64 ...] " + echo "Usage: `basename $0` -c [aarch32 | aarch64 | amd64 | x86 | mips64 | loongarch64...] " echo " -n [version number] " echo " -p [password for docker hub] " exit 0 diff --git a/packaging/release.sh b/packaging/release.sh index f250e667fa..c07331a0df 100755 --- a/packaging/release.sh +++ b/packaging/release.sh @@ -6,7 +6,7 @@ set -e #set -x # release.sh -v [cluster | edge] -# -c [aarch32 | aarch64 | x64 | x86 | mips64 ...] +# -c [aarch32 | aarch64 | x64 | x86 | mips64 | loongarch64...] # -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] # -V [stable | beta] # -l [full | lite] @@ -19,7 +19,7 @@ set -e # set parameters by default value verMode=edge # [cluster, edge, cloud] verType=stable # [stable, beta] -cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 ...] +cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 loongarch64...] osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] pagMode=full # [full | lite] soMode=dynamic # [static | dynamic] @@ -77,7 +77,7 @@ while getopts "hv:V:c:o:l:s:d:a:n:m:H:" arg; do ;; h) echo "Usage: $(basename $0) -v [cluster | edge] " - echo " -c [aarch32 | aarch64 | x64 | x86 | mips64 ...] " + echo " -c [aarch32 | aarch64 | x64 | x86 | mips64 | loongarch64 ...] " echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] " echo " -V [stable | beta] " echo " -l [full | lite] " @@ -216,7 +216,7 @@ else fi # check support cpu type -if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "arm64" ]] || [[ "$cpuType" == "arm32" ]] || [[ "$cpuType" == "mips64" ]]; then +if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "arm64" ]] || [[ "$cpuType" == "arm32" ]] || [[ "$cpuType" == "mips64" ]] || [[ "$cpuType" == "loongarch64" ]] ; then if [ "$verMode" == "edge" ]; then # community-version compile cmake ../ -DCPUTYPE=${cpuType} -DWEBSOCKET=true -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro} diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index bf34d3e2df..0f881beb66 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -61,7 +61,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray); for (int32_t i = 0; i < numOfBatchs; ++i) { SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i); - tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs, rsp->uid); + tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs, + rsp->uid); if (rsp->vgVersion < 0) { code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); @@ -293,6 +294,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { taosThreadMutexUnlock(&appInfo.mutex); tscError("cluster not exist, key:%s", key); taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); tFreeClientHbBatchRsp(&pRsp); return -1; } @@ -322,6 +324,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { tFreeClientHbBatchRsp(&pRsp); taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); return code; } diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index d1aeaac587..4cd1b5416c 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -2082,7 +2082,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) { SSmlLineInfo elements = {0}; - uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql); + uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql", info->id); int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 56e725fac7..698c07d9bc 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -532,6 +532,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.sql = strdup(pCreate->sql); streamObj.smaId = smaObj.uid; streamObj.watermark = pCreate->watermark; + streamObj.fillHistory = STREAM_FILL_HISTORY_ON; streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE; streamObj.triggerParam = pCreate->maxDelay; streamObj.ast = strdup(smaObj.ast); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index e1ca1d2708..522036afa2 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -637,6 +637,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { if (pIter == NULL) break; if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue; + int32_t sz = taosArrayGetSize(pConsumer->assignedTopics); for (int32_t i = 0; i < sz; i++) { char *name = taosArrayGetP(pConsumer->assignedTopics, i); @@ -649,6 +650,33 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { return -1; } } + + sz = taosArrayGetSize(pConsumer->rebNewTopics); + for (int32_t i = 0; i < sz; i++) { + char *name = taosArrayGetP(pConsumer->rebNewTopics, i); + if (strcmp(name, pTopic->name) == 0) { + mndReleaseConsumer(pMnode, pConsumer); + mndReleaseTopic(pMnode, pTopic); + terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb new)", + dropReq.name, pConsumer->consumerId, pConsumer->cgroup); + return -1; + } + } + + sz = taosArrayGetSize(pConsumer->rebRemovedTopics); + for (int32_t i = 0; i < sz; i++) { + char *name = taosArrayGetP(pConsumer->rebRemovedTopics, i); + if (strcmp(name, pTopic->name) == 0) { + mndReleaseConsumer(pMnode, pConsumer); + mndReleaseTopic(pMnode, pTopic); + terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; + mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s (reb remove)", + dropReq.name, pConsumer->consumerId, pConsumer->cgroup); + return -1; + } + } + sdbRelease(pSdb, pConsumer); } @@ -675,15 +703,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { mInfo("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); -#if 0 - if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) { - ASSERT(0); - mndTransDrop(pTrans); - mndReleaseTopic(pMnode, pTopic); - return -1; - } -#endif - // TODO check if rebalancing if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) { /*ASSERT(0);*/ diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 9e2fe4aaf0..2ceae91f7c 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -70,6 +70,7 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid); int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo); int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid); int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo); +void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t delta); struct SMeta { TdThreadRwlock lock; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 8f8691cfc2..29af2bda67 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -61,14 +61,14 @@ struct SVBufPoolNode { }; struct SVBufPool { - SVBufPool* next; - SVnode* pVnode; - volatile int32_t nRef; - TdThreadSpinlock lock; - int64_t size; - uint8_t* ptr; - SVBufPoolNode* pTail; - SVBufPoolNode node; + SVBufPool* next; + SVnode* pVnode; + TdThreadSpinlock* lock; + volatile int32_t nRef; + int64_t size; + uint8_t* ptr; + SVBufPoolNode* pTail; + SVBufPoolNode node; }; int32_t vnodeOpenBufPool(SVnode* pVnode); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 4aabd39800..620022c06d 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -1445,3 +1445,13 @@ int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo) { _exit: return code; } + +void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t delta) { + SMetaStbStats stats = {0}; + + if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) { + stats.ctbNum += delta; + + metaStatsCacheUpsert(pMeta, &stats); + } +} diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 6dadce80ca..5921adfbfa 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -371,7 +371,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { // update uid index metaUpdateUidIdx(pMeta, &nStbEntry); - metaStatsCacheDrop(pMeta, nStbEntry.uid); + // metaStatsCacheDrop(pMeta, nStbEntry.uid); metaULock(pMeta); @@ -450,6 +450,10 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMe #endif ++pMeta->pVnode->config.vndStats.numOfCTables; + + metaWLock(pMeta); + metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1); + metaULock(pMeta); } else { me.ntbEntry.ctime = pReq->ctime; me.ntbEntry.ttlDays = pReq->ttl; @@ -670,6 +674,8 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), &pMeta->txn); --pMeta->pVnode->config.vndStats.numOfCTables; + + metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1); } else if (e.type == TSDB_NORMAL_TABLE) { // drop schema.db (todo) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 9c377fe7f5..f75dce8231 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -582,10 +582,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = -1; } - tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid:%" PRId64 - ", version:%" PRId64 "", + tqDebug("tmq poll: consumer %" PRId64 + ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, - dataRsp.rspOffset.uid, dataRsp.rspOffset.version); + dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); tDeleteSMqDataRsp(&dataRsp); return code; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 913fa67bd6..f782411084 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -244,7 +244,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem int32_t rows = pDataBlock->info.rows; - tqDebug("tq sink, convert block %d, rows: %d", i, rows); + tqDebug("tq sink, convert block1 %d, rows: %d", i, rows); int32_t dataLen = 0; int32_t schemaLen = 0; @@ -486,7 +486,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d blkHead->uid = 0; blkHead->schemaLen = 0; - tqDebug("tq sink, convert block %d, rows: %d", i, rows); + tqDebug("tq sink, convert block2 %d, rows: %d", i, rows); int32_t dataLen = 0; void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); @@ -514,6 +514,9 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k); } else { void* colData = colDataGetData(pColData, j); + if (k == 0) { + tqDebug("tq sink, row %d ts %" PRId64, j, *(int64_t*)colData); + } tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 52c7a07c49..654afe1b6a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -458,11 +458,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p } static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { - int8_t level = 1; - int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1); - const uint32_t factor = 4; + int8_t level = 1; + int8_t tlevel = TMIN(pSl->maxLevel, pSl->level + 1); - while ((taosRandR(&pSl->seed) % factor) == 0 && level < tlevel) { + while ((taosRandR(&pSl->seed) & 0x3) == 0 && level < tlevel) { level++; } @@ -568,7 +567,9 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i do { key.ts = row.pTSRow->ts; nRow++; - tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS); + if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) { + tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS); + } code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1); if (code) { goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index a40b1667c5..755a551e20 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1015,6 +1015,191 @@ _err: return code; } +static int32_t tBlockDataAppendBlockRow(SBlockData *pBlockData, SBlockData *pBlockDataFrom, int32_t iRow) { + int32_t code = 0; + + SColVal cv = {0}; + int32_t iColDataFrom = 0; + SColData *pColDataFrom = + (iColDataFrom < pBlockDataFrom->nColData) ? &((SColData *)pBlockDataFrom->aColData->pData)[iColDataFrom] : NULL; + + for (int32_t iColDataTo = 0; iColDataTo < pBlockData->nColData; iColDataTo++) { + SColData *pColDataTo = &((SColData *)pBlockData->aColData->pData)[iColDataTo]; + + while (pColDataFrom && pColDataFrom->cid < pColDataTo->cid) { + iColDataFrom++; + pColDataFrom = (iColDataFrom < pBlockDataFrom->nColData) + ? &((SColData *)pBlockDataFrom->aColData->pData)[iColDataFrom] + : NULL; + } + + if (pColDataFrom == NULL || pColDataFrom->cid > pColDataTo->cid) { + code = tColDataAppendValue(pColDataTo, &COL_VAL_NONE(pColDataTo->cid, pColDataTo->type)); + if (code) goto _exit; + } else { + tColDataGetValue(pColDataFrom, iRow, &cv); + + code = tColDataAppendValue(pColDataTo, &cv); + if (code) goto _exit; + + iColDataFrom++; + pColDataFrom = (iColDataFrom < pBlockDataFrom->nColData) + ? &((SColData *)pBlockDataFrom->aColData->pData)[iColDataFrom] + : NULL; + } + } + +_exit: + return code; +} + +static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSchema *pTSchema) { + int32_t code = 0; + + int32_t iTColumn = 1; + STColumn *pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; + void *pBitmap = pRow->statis ? tdGetBitmapAddrTp(pRow, pTSchema->flen) : NULL; + + for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { + SColData *pColData = &((SColData *)pBlockData->aColData->pData)[iColData]; + + while (pTColumn && pTColumn->colId < pColData->cid) { + iTColumn++; + pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; + } + + if (pTColumn == NULL || pTColumn->colId > pColData->cid) { + code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); + if (code) goto _exit; + } else { + ASSERT(pTColumn->type == pColData->type); + + SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type}; + + if (pRow->statis) { + TDRowValT vt = TD_VTYPE_MAX; + tdGetBitmapValTypeII(pBitmap, iTColumn - 1, &vt); + + if (vt == TD_VTYPE_NORM) { + cv.flag = CV_FLAG_VALUE; + + if (IS_VAR_DATA_TYPE(pTColumn->type)) { + void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); + cv.value.nData = varDataLen(pData); + cv.value.pData = varDataVal(pData); + } else { + memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes); + } + + code = tColDataAppendValue(pColData, &cv); + if (code) goto _exit; + } else if (vt == TD_VTYPE_NONE) { + code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); + if (code) goto _exit; + } else if (vt == TD_VTYPE_NULL) { + code = tColDataAppendValue(pColData, &COL_VAL_NULL(pColData->cid, pColData->type)); + if (code) goto _exit; + } else { + ASSERT(0); + } + } else { + cv.flag = CV_FLAG_VALUE; + + if (IS_VAR_DATA_TYPE(pTColumn->type)) { + void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); + cv.value.nData = varDataLen(pData); + cv.value.pData = varDataVal(pData); + } else { + memcpy(&cv.value.val, pRow->data + pTColumn->offset - sizeof(TSKEY), pTColumn->bytes); + } + + code = tColDataAppendValue(pColData, &cv); + if (code) goto _exit; + } + + iTColumn++; + pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; + } + } + +_exit: + return code; +} + +static int32_t tBlockDataAppendKVRow(SBlockData *pBlockData, STSRow *pRow, STSchema *pTSchema) { + int32_t code = 0; + + col_id_t kvIter = 0; + col_id_t nKvCols = tdRowGetNCols(pRow) - 1; + void *pColIdx = TD_ROW_COL_IDX(pRow); + void *pBitmap = tdGetBitmapAddrKv(pRow, tdRowGetNCols(pRow)); + int32_t iTColumn = 1; + STColumn *pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; + + for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { + SColData *pColData = &((SColData *)pBlockData->aColData->pData)[iColData]; + + while (pTColumn && pTColumn->colId < pColData->cid) { + iTColumn++; + pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; + } + + if (pTColumn == NULL || pTColumn->colId > pColData->cid) { + code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); + if (code) goto _exit; + } else { + ASSERT(pTColumn->type == pColData->type); + + SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type}; + TDRowValT vt = TD_VTYPE_NONE; // default is NONE + SKvRowIdx *pKvIdx = NULL; + + while (kvIter < nKvCols) { + pKvIdx = (SKvRowIdx *)POINTER_SHIFT(pColIdx, kvIter * sizeof(SKvRowIdx)); + if (pKvIdx->colId == pTColumn->colId) { + tdGetBitmapValTypeII(pBitmap, kvIter, &vt); + ++kvIter; + break; + } else if (pKvIdx->colId > pTColumn->colId) { + vt = TD_VTYPE_NONE; + break; + } else { + ++kvIter; + } + } + + if (vt == TD_VTYPE_NORM) { + cv.flag = CV_FLAG_VALUE; + + void *pData = POINTER_SHIFT(pRow, pKvIdx->offset); + if (IS_VAR_DATA_TYPE(pTColumn->type)) { + cv.value.nData = varDataLen(pData); + cv.value.pData = varDataVal(pData); + } else { + memcpy(&cv.value.val, pData, pTColumn->bytes); + } + + code = tColDataAppendValue(pColData, &cv); + if (code) goto _exit; + } else if (vt == TD_VTYPE_NONE) { + code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); + if (code) goto _exit; + } else if (vt == TD_VTYPE_NULL) { + code = tColDataAppendValue(pColData, &COL_VAL_NULL(pColData->cid, pColData->type)); + if (code) goto _exit; + } else { + ASSERT(0); + } + + iTColumn++; + pTColumn = (iTColumn < pTSchema->numOfCols) ? &pTSchema->columns[iTColumn] : NULL; + } + } + +_exit: + return code; +} + int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { int32_t code = 0; @@ -1036,27 +1221,20 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS if (code) goto _err; pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow); - // OTHER - SRowIter rIter = {0}; - SColVal *pColVal; - - tRowIterInit(&rIter, pRow, pTSchema); - pColVal = tRowIterNext(&rIter); - for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { - SColData *pColData = &((SColData *)pBlockData->aColData->pData)[iColData]; - - while (pColVal && pColVal->cid < pColData->cid) { - pColVal = tRowIterNext(&rIter); - } - - if (pColVal == NULL || pColVal->cid > pColData->cid) { - code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); + SColVal cv = {0}; + if (pRow->type == 0) { + if (TD_IS_TP_ROW(pRow->pTSRow)) { + code = tBlockDataAppendTPRow(pBlockData, pRow->pTSRow, pTSchema); + if (code) goto _err; + } else if (TD_IS_KV_ROW(pRow->pTSRow)) { + code = tBlockDataAppendKVRow(pBlockData, pRow->pTSRow, pTSchema); if (code) goto _err; } else { - code = tColDataAppendValue(pColData, pColVal); - if (code) goto _err; - pColVal = tRowIterNext(&rIter); + ASSERT(0); } + } else { + code = tBlockDataAppendBlockRow(pBlockData, pRow->pBlockData, pRow->iRow); + if (code) goto _err; } pBlockData->nRow++; diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 6ac2ce1c16..71e926bd35 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -27,10 +27,21 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) return -1; } - if (taosThreadSpinInit(&pPool->lock, 0) != 0) { - taosMemoryFree(pPool); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + if (VND_IS_RSMA(pVnode)) { + pPool->lock = taosMemoryMalloc(sizeof(TdThreadSpinlock)); + if (!pPool->lock) { + taosMemoryFree(pPool); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (taosThreadSpinInit(pPool->lock, 0) != 0) { + taosMemoryFree((void*)pPool->lock); + taosMemoryFree(pPool); + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + } else { + pPool->lock = NULL; } pPool->next = NULL; @@ -49,7 +60,10 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) static int vnodeBufPoolDestroy(SVBufPool *pPool) { vnodeBufPoolReset(pPool); - taosThreadSpinDestroy(&pPool->lock); + if (pPool->lock) { + taosThreadSpinDestroy(pPool->lock); + taosMemoryFree((void*)pPool->lock); + } taosMemoryFree(pPool); return 0; } @@ -114,7 +128,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { void *p = NULL; ASSERT(pPool != NULL); - taosThreadSpinLock(&pPool->lock); + if (pPool->lock) taosThreadSpinLock(pPool->lock); if (pPool->node.size >= pPool->ptr - pPool->node.data + size) { // allocate from the anchor node p = pPool->ptr; @@ -125,7 +139,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { pNode = taosMemoryMalloc(sizeof(*pNode) + size); if (pNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - taosThreadSpinUnlock(&pPool->lock); + if (pPool->lock) taosThreadSpinUnlock(pPool->lock); return NULL; } @@ -138,7 +152,7 @@ void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) { pPool->size = pPool->size + sizeof(*pNode) + size; } - taosThreadSpinUnlock(&pPool->lock); + if (pPool->lock) taosThreadSpinUnlock(pPool->lock); return p; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 62146b6048..e1db1f4729 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -981,6 +981,7 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo); void queryCostStatis(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo); +void destroyOperatorInfo(SOperatorInfo* pOperator); int32_t getMaximumIdleDurationSec(); /* diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 709e981a1f..7fd288cd57 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -91,9 +91,6 @@ static void destroyAggOperatorInfo(void* param); static void destroyIntervalOperatorInfo(void* param); - -static void destroyOperatorInfo(SOperatorInfo* pOperator); - void setOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; ASSERT(pOperator->pTaskInfo != NULL); @@ -2172,7 +2169,7 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { } } -static void destroyOperatorInfo(SOperatorInfo* pOperator) { +void destroyOperatorInfo(SOperatorInfo* pOperator) { if (pOperator == NULL) { return; } @@ -3426,6 +3423,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta ASSERT(code == 0); if (code == -1) { // coverity scan + pGroupResInfo->index += 1; continue; } SResultRow* pRow = (SResultRow*)pVal; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 47ee663abe..26a5f6838d 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -816,12 +816,12 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition goto _error; } - setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, + pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -900,7 +900,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) { // TODO check tbname validity if (pData != (void*)-1) { memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN); - int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN); + int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); memcpy(pDest->info.parTbName, varDataVal(pData), len); /*pDest->info.parTbName[len + 1] = 0;*/ } else { @@ -1099,11 +1099,12 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); - setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED, + pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, - destroyStreamPartitionOperatorInfo, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, NULL); initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b83fac9fa6..d2e59ea8e1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -946,7 +946,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->currentGroupId = -1; pInfo->assignBlockUid = pTableScanNode->assignBlockUid; - setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, + pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); @@ -980,7 +981,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pInfo->dataReader = pReadHandle; // pInfo->prevGroupId = -1; - setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, + pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL); return pOperator; } @@ -1136,8 +1138,10 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi goto _error; } - setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL); + setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, + OP_NOT_OPENED, pInfo, pTaskInfo); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL); return pOperator; _error: @@ -1581,7 +1585,7 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) { // TODO check tbname validation if (pData != (void*)-1 && pData != NULL) { memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); - int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN); + int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); memcpy(pBlock->info.parTbName, varDataVal(pData), len); /*pBlock->info.parTbName[len + 1] = 0;*/ } else { @@ -1769,6 +1773,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1); if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) { + tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer); return NULL; } ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1); @@ -2351,7 +2356,8 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT pInfo->vnode = pHandle->vnode; pInfo->sContext = pHandle->sContext; - setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, + pTaskInfo); pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL); return pOperator; @@ -2366,9 +2372,7 @@ _end: static void destroyStreamScanOperatorInfo(void* param) { SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { - STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info; - destroyTableScanOperatorInfo(pTableScanInfo); - taosMemoryFreeClear(pStreamScan->pTableScanOp); + destroyOperatorInfo(pStreamScan->pTableScanOp); } if (pStreamScan->tqReader) { tqCloseReader(pStreamScan->tqReader); @@ -2537,7 +2541,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->partitionSup.needCalc = false; - setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, + pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; @@ -4175,7 +4180,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pInfo->readHandle = *(SReadHandle*)readHandle; } - setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, + pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL); return pOperator; @@ -4305,7 +4311,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; - setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo, + pTaskInfo); initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); @@ -4815,11 +4822,12 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN int32_t rowSize = pInfo->pResBlock->info.rowSize; pInfo->bufPageSize = getProperSortPageSize(rowSize); - setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, + pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, - destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo, + getTableMergeScanExplainExecInfo); pOperator->cost.openCost = 0; return pOperator; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 7c9d73ad13..ddd948a6dd 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -680,9 +680,9 @@ SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index) { void* destroyFillColumnInfo(SFillColInfo* pFillCol, int32_t start, int32_t end) { for (int32_t i = start; i < end; i++) { destroyExprInfo(pFillCol[i].pExpr, 1); - taosMemoryFreeClear(pFillCol[i].pExpr); taosVariantDestroy(&pFillCol[i].fillVal); } + taosMemoryFreeClear(pFillCol[start].pExpr); taosMemoryFree(pFillCol); return NULL; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 4c369e8802..5d318c8ed6 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3580,6 +3580,11 @@ static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessio tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey)); } +static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) { + *pHashKey = *pKey; + pHashKey->win.ekey = pKey->win.skey; +} + static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) { if (tSimpleHashGetSize(pHashMap) == 0) { return; @@ -3588,8 +3593,8 @@ static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) { for (int32_t i = 0; i < size; i++) { SSessionKey* pWin = taosArrayGet(pWins, i); if (!pWin) continue; - SSessionKey key = *pWin; - key.win.ekey = key.win.skey; + SSessionKey key = {0}; + getSessionHashKey(pWin, &key); tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); } } @@ -3642,7 +3647,9 @@ static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindo static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) { streamStateSessionDel(pAggSup->pState, pKey); - tSimpleHashRemove(pAggSup->pResultRows, pKey, sizeof(SSessionKey)); + SSessionKey hashKey = {0}; + getSessionHashKey(pKey, &hashKey); + tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey)); return true; } @@ -3753,8 +3760,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - SSessionKey key = winInfo.sessionWin; - key.win.ekey = key.win.skey; + SSessionKey key = {0}; + getSessionHashKey(&winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); } @@ -3853,23 +3860,28 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo colDataAppendNULL(pCalEdCol, pBlock->info.rows); SHashObj* pGroupIdTbNameMap = NULL; - if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || + if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || + pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { SStreamSessionAggOperatorInfo* pInfo = pOp->info; pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap; } else if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { SStreamStateAggOperatorInfo* pInfo = pOp->info; pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap; + } else { + ASSERT(0); } char* tbname = taosHashGet(pGroupIdTbNameMap, &res->groupId, sizeof(int64_t)); SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); if (tbname == NULL) { + /*printf("\n\n no tbname for group id %" PRId64 "%p %p\n\n", res->groupId, pOp->info, pGroupIdTbNameMap);*/ colDataAppendNULL(pTableCol, pBlock->info.rows); } else { char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false); + /*printf("\n\n get tbname %s group id %" PRId64 "\n\n", tbname, res->groupId);*/ } pBlock->info.rows += 1; } @@ -3896,8 +3908,8 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); SStreamSessionAggOperatorInfo* pChInfo = pChild->info; SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup; - SSessionKey chWinKey = *pWinKey; - chWinKey.win.ekey = chWinKey.win.skey; + SSessionKey chWinKey = {0}; + getSessionHashKey(pWinKey, &chWinKey); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey); SResultRow* pResult = NULL; SResultRow* pChResult = NULL; @@ -3978,8 +3990,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) { for (int32_t i = 0; i < size; i++) { SSessionKey* pWinKey = taosArrayGet(pResWins, i); if (!pWinKey) continue; - SSessionKey winInfo = *pWinKey; - winInfo.win.ekey = winInfo.win.skey; + SSessionKey winInfo = {0}; + getSessionHashKey(pWinKey, &winInfo); tSimpleHashPut(pStDeleted, &winInfo, sizeof(SSessionKey), NULL, 0); } } @@ -4046,7 +4058,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pBlock->info.parTbName[0]) { taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); - /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ + /*printf("\n\n put tbname %s group id %" PRId64 "\n\n into %p %p", pBlock->info.parTbName, pBlock->info.groupId,*/ + /*pInfo, pInfo->pGroupIdTbNameMap);*/ } if (pBlock->info.parTbName[0]) { @@ -4561,8 +4574,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { - SSessionKey key = curWin.winInfo.sessionWin; - key.win.ekey = key.win.skey; + SSessionKey key = {0}; + getSessionHashKey(&curWin.winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); } } @@ -4645,6 +4658,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); +#if 0 + char* pBuf = streamStateSessionDump(pInfo->streamAggSup.pState); + qDebug("===stream===final session%s", pBuf); + taosMemoryFree(pBuf); +#endif + doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { printDataBlock(pInfo->pDelRes, "single state delete"); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 7897623ab1..0e5cb14208 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1878,6 +1878,7 @@ static int32_t rewriteIsTrue(SNode* pSrc, SNode** pIsTrue) { static EDealRes translateCaseWhen(STranslateContext* pCxt, SCaseWhenNode* pCaseWhen) { bool first = true; + bool allNullThen = true; SNode* pNode = NULL; FOREACH(pNode, pCaseWhen->pWhenThenList) { SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode; @@ -1889,12 +1890,28 @@ static EDealRes translateCaseWhen(STranslateContext* pCxt, SCaseWhenNode* pCaseW } pWhenThen->pWhen = pIsTrue; } - if (first || dataTypeComp(&pCaseWhen->node.resType, &((SExprNode*)pNode)->resType) < 0) { - pCaseWhen->node.resType = ((SExprNode*)pNode)->resType; + + SExprNode* pThenExpr = (SExprNode*)pNode; + if (TSDB_DATA_TYPE_NULL == pThenExpr->resType.type) { + continue; + } + allNullThen = false; + if (first || dataTypeComp(&pCaseWhen->node.resType, &pThenExpr->resType) < 0) { + pCaseWhen->node.resType = pThenExpr->resType; } first = false; } + if (allNullThen) { + if (NULL != pCaseWhen->pElse) { + pCaseWhen->node.resType = ((SExprNode*)pCaseWhen->pElse)->resType; + } else { + pCaseWhen->node.resType.type = TSDB_DATA_TYPE_NULL; + pCaseWhen->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_NULL].bytes; + return DEAL_RES_CONTINUE; + } + } + FOREACH(pNode, pCaseWhen->pWhenThenList) { SWhenThenNode* pWhenThen = (SWhenThenNode*)pNode; if (!dataTypeEqual(&pCaseWhen->node.resType, &((SExprNode*)pNode)->resType)) { diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 00a72a1946..e8c3f2fa8d 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -138,7 +138,7 @@ static char* getSyntaxErrFormat(int32_t errCode) { case TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY: return "Primary timestamp column cannot be dropped"; case TSDB_CODE_PAR_INVALID_MODIFY_COL: - return "Only binary/nchar column length could be modified"; + return "Only binary/nchar column length could be modified, and the length can only be increased, not decreased"; case TSDB_CODE_PAR_INVALID_TBNAME: return "Invalid tbname pseudo column"; case TSDB_CODE_PAR_INVALID_FUNCTION_NAME: diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index ccb0dd4a92..88c39c1157 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -521,9 +521,13 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa void* tmp = NULL; int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen); if (code == 0) { - *key = resKey; - *pVal = tdbRealloc(NULL, *pVLen); - memcpy(*pVal, tmp, *pVLen); + if (key->win.skey != resKey.win.skey) { + code = -1; + } else { + *key = resKey; + *pVal = tdbRealloc(NULL, *pVLen); + memcpy(*pVal, tmp, *pVLen); + } } streamStateFreeCur(pCur); return code; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 5e76e6bd83..ac54749ae1 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -96,7 +96,7 @@ typedef void* queue[2]; //#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit //#define TRANS_RETRY_INTERVAL 15 // retry interval (ms) -#define TRANS_CONN_TIMEOUT 3 // connect timeout (s) +#define TRANS_CONN_TIMEOUT 3000 // connect timeout (ms) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_PACKET_LIMIT 1024 * 1024 * 512 diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index fd42c14101..4fb00b1a6d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -25,7 +25,8 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; queue wreqQueue; - uv_timer_t* timer; + + uv_timer_t* timer; // read timer, forbidden void* hostThrd; @@ -79,6 +80,7 @@ typedef struct SCliThrd { uint64_t nextTimeout; // next timeout void* pTransInst; // + void (*destroyAhandleFp)(void* ahandle); SHashObj* fqdn2ipCache; SCvtAddr cvtAddr; @@ -102,6 +104,8 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); +// register conn timer +static void cliConnTimeout(uv_timer_t* handle); // register timer for read static void cliReadTimeoutCb(uv_timer_t* handle); // register timer in each thread to clear expire conn @@ -155,6 +159,7 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq, static FORCE_INLINE void destroyUserdata(STransMsg* userdata); static FORCE_INLINE void destroyCmsg(void* cmsg); +static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg); static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); @@ -258,16 +263,15 @@ static void* cliWorkThread(void* arg); static void cliReleaseUnfinishedMsg(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; for (int i = 0; i < transQueueSize(&conn->cliMsgs); i++) { SCliMsg* msg = transQueueGet(&conn->cliMsgs, i); if (msg != NULL && msg->ctx != NULL && msg->ctx->ahandle != (void*)0x9527) { if (conn->ctx.freeFunc != NULL && msg->ctx->ahandle != NULL) { conn->ctx.freeFunc(msg->ctx->ahandle); - } else if (msg->ctx->ahandle != NULL && pTransInst->destroyFp != NULL) { + } else if (msg->ctx->ahandle != NULL && pThrd->destroyAhandleFp != NULL) { tDebug("%s conn %p destroy unfinished ahandle %p", CONN_GET_INST_LABEL(conn), conn, msg->ctx->ahandle); - pTransInst->destroyFp(msg->ctx->ahandle); + pThrd->destroyAhandleFp(msg->ctx->ahandle); } } destroyCmsg(msg); @@ -407,9 +411,16 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { bool once = false; do { SCliMsg* pMsg = transQueuePop(&pConn->cliMsgs); + if (pMsg == NULL && once) { break; } + + if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg)) { + destroyCmsg(pMsg); + break; + } + STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransMsg transMsg = {0}; @@ -439,6 +450,7 @@ void cliHandleExceptImpl(SCliConn* pConn, int32_t code) { continue; } } + if (pMsg == NULL || (pMsg && pMsg->type != Release)) { if (cliAppCb(pConn, &transMsg, pMsg) != 0) { return; @@ -454,6 +466,19 @@ void cliHandleExcept(SCliConn* conn) { cliHandleExceptImpl(conn, -1); } +void cliConnTimeout(uv_timer_t* handle) { + SCliConn* conn = handle->data; + SCliThrd* pThrd = conn->hostThrd; + + tTrace("%s conn %p conn timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); + + uv_timer_stop(handle); + handle->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); + + conn->timer = NULL; + cliHandleExceptImpl(conn, -1); +} void cliReadTimeoutCb(uv_timer_t* handle) { // set up timeout cb SCliConn* conn = handle->data; @@ -545,7 +570,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->list->size >= 50) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); arg->param1 = conn; - arg->param2 = NULL; + arg->param2 = thrd; STrans* pTransInst = thrd->pTransInst; conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); @@ -630,8 +655,16 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; - conn->connReq.data = conn; + uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; + if (timer == NULL) { + timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); + tDebug("no available timer, create a timer %p", timer); + uv_timer_init(pThrd->loop, timer); + } + timer->data = conn; + conn->timer = timer; + conn->connReq.data = conn; transReqQueueInit(&conn->wreqQueue); transQueueInit(&conn->cliMsgs, NULL); @@ -661,8 +694,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { } if (conn->timer != NULL) { uv_timer_stop(conn->timer); - taosArrayPush(pThrd->timerList, &conn->timer); conn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); conn->timer = NULL; } @@ -811,6 +844,15 @@ _RETURN: void cliConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; + SCliThrd* pThrd = pConn->hostThrd; + + if (pConn->timer != NULL) { + uv_timer_stop(pConn->timer); + pConn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &pConn->timer); + pConn->timer = NULL; + } + if (status != 0) { tError("%s conn %p failed to connect server:%s", CONN_GET_INST_LABEL(pConn), pConn, uv_strerror(status)); cliHandleExcept(pConn); @@ -989,31 +1031,26 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { conn->ip = strdup(EPSET_GET_INUSE_IP(&pCtx->epSet)); conn->port = EPSET_GET_INUSE_PORT(&pCtx->epSet); - int ret = transSetConnOption((uv_tcp_t*)conn->stream); - if (ret) { - tError("%s conn %p failed to set conn option, errmsg %s", transLabel(pTransInst), conn, uv_err_name(ret)); - } - int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT); - if (fd == -1) { - tTrace("%s conn %p failed to create socket", transLabel(pTransInst), conn); - cliHandleExcept(conn); - return; - } - uv_tcp_open((uv_tcp_t*)conn->stream, fd); - struct sockaddr_in addr; addr.sin_family = AF_INET; - addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip); addr.sin_port = (uint16_t)htons((uint16_t)conn->port); tTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port); - ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); + + int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); if (ret != 0) { tTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, uv_err_name(ret)); + + uv_timer_stop(conn->timer); + conn->timer->data = NULL; + taosArrayPush(pThrd->timerList, &conn->timer); + conn->timer = NULL; + cliHandleExcept(conn); return; } + uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); } STraceId* trace = &pMsg->msg.info.traceId; tGTrace("%s conn %p ready", pTransInst->label, conn); @@ -1136,6 +1173,8 @@ static void* cliWorkThread(void* arg) { pThrd->pid = taosGetSelfPthreadId(); setThreadName("trans-cli-work"); uv_run(pThrd->loop, UV_RUN_DEFAULT); + + tDebug("thread quit-thread:%08" PRId64, pThrd->pid); return NULL; } @@ -1177,6 +1216,25 @@ static FORCE_INLINE void destroyCmsg(void* arg) { taosMemoryFree(pMsg); } +static FORCE_INLINE void destroyCmsgAndAhandle(void* param) { + if (param == NULL) return; + + STaskArg* arg = param; + SCliMsg* pMsg = arg->param1; + SCliThrd* pThrd = arg->param2; + + tDebug("destroy Ahandle A"); + if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) { + tDebug("destroy Ahandle B"); + pThrd->destroyAhandleFp(pMsg->ctx->ahandle); + } + tDebug("destroy Ahandle C"); + + transDestroyConnCtx(pMsg->ctx); + destroyUserdata(&pMsg->msg); + taosMemoryFree(pMsg); +} + static SCliThrd* createThrdObj(void* trans) { STrans* pTransInst = trans; @@ -1195,7 +1253,7 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->prepare->data = pThrd; // uv_prepare_start(pThrd->prepare, cliPrepareCb); - int32_t timerSize = 512; + int32_t timerSize = 64; pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); for (int i = 0; i < timerSize; i++) { uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t)); @@ -1211,6 +1269,7 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); pThrd->pTransInst = trans; + pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->quit = false; return pThrd; @@ -1226,9 +1285,10 @@ static void destroyThrdObj(SCliThrd* pThrd) { TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); transAsyncPoolDestroy(pThrd->asyncPool); - transDQDestroy(pThrd->delayQueue, destroyCmsg); + transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle); transDQDestroy(pThrd->timeoutQueue, NULL); + tDebug("thread destroy %" PRId64, pThrd->pid); for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i); taosMemoryFree(timer); @@ -1254,7 +1314,18 @@ void cliSendQuit(SCliThrd* thrd) { } void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { - uv_read_stop((uv_stream_t*)handle); + if (uv_handle_get_type(handle) == UV_TIMER) { + // SCliConn* pConn = handle->data; + // if (pConn != NULL && pConn->timer != NULL) { + // SCliThrd* pThrd = pConn->hostThrd; + // uv_timer_stop((uv_timer_t*)handle); + // handle->data = NULL; + // taosArrayPush(pThrd->timerList, &pConn->timer); + // pConn->timer = NULL; + // } + } else { + uv_read_stop((uv_stream_t*)handle); + } uv_close(handle, cliDestroy); } } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 18b812f314..7710abcaa1 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -497,7 +497,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { SDelayTask* task = container_of(minNode, SDelayTask, node); STaskArg* arg = task->arg; - if (freeFunc) freeFunc(arg->param1); + if (freeFunc) freeFunc(arg); taosMemoryFree(arg); taosMemoryFree(task); diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index e5ca9faacb..5e5d6656b4 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -902,9 +902,11 @@ void taosSetCoreDump(bool enable) { old_len = sizeof(old_usespid); +#ifndef __loongarch64 if (syscall(SYS__sysctl, &args) == -1) { // printf("_sysctl(kern_core_uses_pid) set fail: %s", strerror(errno)); } +#endif // printf("The old core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid); @@ -918,9 +920,11 @@ void taosSetCoreDump(bool enable) { old_len = sizeof(old_usespid); +#ifndef __loongarch64 if (syscall(SYS__sysctl, &args) == -1) { // printf("_sysctl(kern_core_uses_pid) get fail: %s", strerror(errno)); } +#endif // printf("The new core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid); #endif @@ -989,4 +993,4 @@ bool taosCheckCurrentInDll() { #else return false; #endif -} \ No newline at end of file +} diff --git a/source/util/src/tcrc32c.c b/source/util/src/tcrc32c.c index bd662fa02c..fc9db5a3ff 100644 --- a/source/util/src/tcrc32c.c +++ b/source/util/src/tcrc32c.c @@ -19,7 +19,7 @@ */ #define _DEFAULT_SOURCE -#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) +#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) && !defined(_TD_LOONGARCH_) #include #endif @@ -512,7 +512,7 @@ static uint32_t table[16][256] = { 0x9c221d09, 0x6e2e10f7, 0x7dd67004, 0x8fda7dfa} }; -#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) +#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) && !defined(_TD_LOONGARCH_) static uint32_t long_shifts[4][256] = { {0x00000000, 0xe040e0ac, 0xc56db7a9, 0x252d5705, 0x8f3719a3, 0x6f77f90f, 0x4a5aae0a, 0xaa1a4ea6, 0x1b8245b7, 0xfbc2a51b, 0xdeeff21e, 0x3eaf12b2, 0x94b55c14, 0x74f5bcb8, 0x51d8ebbd, 0xb1980b11, 0x37048b6e, 0xd7446bc2, @@ -846,7 +846,7 @@ uint32_t crc32c_sf(uint32_t crci, crc_stream input, size_t length) { } return (uint32_t)crc ^ 0xffffffff; } -#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) +#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) && !defined(_TD_LOONGARCH_) /* Apply the zeros operator table to crc. */ static uint32_t shift_crc(uint32_t shift_table[][256], uint32_t crc) { return shift_table[0][crc & 0xff] ^ shift_table[1][(crc >> 8) & 0xff] ^ shift_table[2][(crc >> 16) & 0xff] ^ @@ -857,7 +857,7 @@ static uint32_t shift_crc(uint32_t shift_table[][256], uint32_t crc) { version. Otherwise, use the software version. */ uint32_t (*crc32c)(uint32_t crci, crc_stream bytes, size_t len) = crc32c_sf; -#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) +#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) && !defined(_TD_LOONGARCH_) /* Compute CRC-32C using the Intel hardware instruction. */ uint32_t crc32c_hw(uint32_t crc, crc_stream buf, size_t len) { crc_stream next = buf; @@ -1012,7 +1012,7 @@ uint32_t crc32c_hw(uint32_t crc, crc_stream buf, size_t len) { #endif // #ifndef _TD_ARM_ void taosResolveCRC() { -#if defined _TD_ARM_ || defined _TD_MIPS_ || defined WINDOWS +#if defined _TD_ARM_ || defined _TD_MIPS_ || defined WINDOWS || defined _TD_LOONGARCH_ crc32c = crc32c_sf; #else int32_t sse42; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0e6568d692..a1162d2e94 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -521,7 +521,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TIMELINE_FUNC, "Invalid timeline fu TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PASSWD, "Invalid password") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Invalid alter table statement") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY, "Primary timestamp column cannot be dropped") -TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_MODIFY_COL, "Only binary/nchar column length could be modified") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_MODIFY_COL, "Only binary/nchar column length could be modified, and the length can only be increased, not decreased") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TBNAME, "Invalid tbname pseudo column") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FUNCTION_NAME, "Invalid function name") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COMMENT_TOO_LONG, "Comment too long") diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index f7949048ca..a2ce5ac08c 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -435,7 +435,7 @@ static inline int32_t taosBuildLogHead(char *buffer, const char *flags) { taosGetTimeOfDay(&timeSecs); time_t curTime = timeSecs.tv_sec; - ptm = taosLocalTimeNolock(&Tm, &curTime, taosGetDaylight()); + ptm = taosLocalTime(&curTime, &Tm); return sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %08" PRId64 " %s", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, ptm->tm_min, ptm->tm_sec, (int32_t)timeSecs.tv_usec, taosGetSelfPthreadId(), flags); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 6775760ee4..8955a65dfa 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -714,6 +714,7 @@ ,,,system-test,python3 ./test.py -f 7-tmq/tmq_taosx.py ,,,system-test,python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py ,,,system-test,python3 ./test.py -f 99-TDcase/TD-19201.py +,,,system-test,python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5 ,,,system-test,python3 ./test.py -f 2-query/between.py -Q 2 ,,,system-test,python3 ./test.py -f 2-query/distinct.py -Q 2 ,,,system-test,python3 ./test.py -f 2-query/varchar.py -Q 2 diff --git a/tests/script/tsim/stream/drop_stream.sim b/tests/script/tsim/stream/drop_stream.sim index b25e002140..817780ca59 100644 --- a/tests/script/tsim/stream/drop_stream.sim +++ b/tests/script/tsim/stream/drop_stream.sim @@ -216,6 +216,60 @@ sql insert into scalar_tb values (1656668180503+1s, -50, 50.1, "beiJing", "TDeng print ========== step6 repeat sql drop database test; + +print ========== interval\session\state window + +sql CREATE DATABASE test1 BUFFER 96 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 STRICT 'off' WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0; +sql use test1; +sql CREATE STABLE st (time TIMESTAMP, ca DOUBLE, cb DOUBLE, cc int) TAGS (ta VARCHAR(10) ); + +print ========== create table before stream + +sql CREATE TABLE t1 using st TAGS ('aaa'); +sql CREATE TABLE t2 using st TAGS ('bbb'); +sql CREATE TABLE t3 using st TAGS ('ccc'); +sql CREATE TABLE t4 using st TAGS ('ddd'); + +print ========== stable + +sql create stream streamd1 into streamt1 as select ca, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(null); +sql create stream streamd2 into streamt2 as select ca, _wstart,_wend, count(*), max(ca), max(cb) from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(linear); +sql create stream streamd3 into streamt3 as select ca, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca session(time, 60m); +sql create stream streamd4 into streamt4 as select ta, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ta session(time, 60m); +sql_error create stream streamd5 into streamt5 as select ca, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca state_window(cc); +sql_error create stream streamd6 into streamt6 as select ta, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ta state_window(cc); + +print ========== table + +sql create stream streamd7 into streamt7 as select ca, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(null); +sql create stream streamd8 into streamt8 as select ca, _wstart,_wend, count(*), max(ca), max(cb) from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(linear); +sql create stream streamd9 into streamt9 as select ca, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca session(time, 60m); +sql create stream streamd10 into streamt10 as select ta, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ta session(time, 60m); +sql create stream streamd11 into streamt11 as select ca, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca state_window(cc); +sql create stream streamd12 into streamt12 as select ta, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ta state_window(cc); + +print ========== create table after stream +sql CREATE TABLE t5 using st TAGS ('eee'); +sql CREATE TABLE t6 using st TAGS ('fff'); +sql CREATE TABLE t7 using st TAGS ('ggg'); +sql CREATE TABLE t8 using st TAGS ('fff'); + +sleep 1000 +print ========== drop stream +sql drop stream if exists streamd1; +sql drop stream if exists streamd2; +sql drop stream if exists streamd3; +sql drop stream if exists streamd4; +#sql drop stream if exists streamd5; +#sql drop stream if exists streamd6; +sql drop stream if exists streamd7; +sql drop stream if exists streamd8; +sql drop stream if exists streamd9; +sql drop stream if exists streamd10; +sql drop stream if exists streamd11; +sql drop stream if exists streamd12; +print ========== step7 + system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode3 -s stop -x SIGINT diff --git a/tests/script/tsim/stream/state0.sim b/tests/script/tsim/stream/state0.sim index dc7d9bc407..87d7d4b7fc 100644 --- a/tests/script/tsim/stream/state0.sim +++ b/tests/script/tsim/stream/state0.sim @@ -544,4 +544,192 @@ if $rows != 10 then endi +sql drop stream if exists streams4; +sql drop database if exists test4; +sql drop stable if exists streamt4; +sql create database if not exists test4 vgroups 10 precision "ms" ; +sql use test4; +sql create table st (ts timestamp, c1 tinyint, c2 smallint) tags (t1 tinyint) ; +sql create table t1 using st tags (-81) ; +sql create table t2 using st tags (-81) ; +sql create stream if not exists streams4 trigger window_close into streamt4 as select _wstart AS start, min(c1),count(c1) from t1 state_window(c1); + +sql insert into t1 (ts, c1) values (1668073288209, 11); +sql insert into t1 (ts, c1) values (1668073288210, 11); +sql insert into t1 (ts, c1) values (1668073288211, 11); +sql insert into t1 (ts, c1) values (1668073288212, 11); +sql insert into t1 (ts, c1) values (1668073288213, 11); +sql insert into t1 (ts, c1) values (1668073288214, 11); +sql insert into t1 (ts, c1) values (1668073288215, 29); + +$loop_count = 0 +loop7: + +sleep 200 + +sql select * from streamt4 order by start; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop7 +endi + +if $data01 != 11 then + print =====data01=$data01 + goto loop7 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop7 +endi + +sql delete from t1 where ts = cast(1668073288214 as timestamp); +sql insert into t1 (ts, c1) values (1668073288216, 29); +sql delete from t1 where ts = cast(1668073288215 as timestamp); +sql insert into t1 (ts, c1) values (1668073288217, 29); +sql delete from t1 where ts = cast(1668073288216 as timestamp); +sql insert into t1 (ts, c1) values (1668073288218, 29); +sql delete from t1 where ts = cast(1668073288217 as timestamp); +sql insert into t1 (ts, c1) values (1668073288219, 29); +sql delete from t1 where ts = cast(1668073288218 as timestamp); +sql insert into t1 (ts, c1) values (1668073288220, 29); +sql delete from t1 where ts = cast(1668073288219 as timestamp); + +$loop_count = 0 +loop8: + +sleep 200 + +sql select * from streamt4 order by start; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows + goto loop8 +endi + +if $data01 != 11 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != 5 then + print =====data02=$data02 + goto loop8 +endi + +sql insert into t1 (ts, c1) values (1668073288221, 65); +sql insert into t1 (ts, c1) values (1668073288222, 65); +sql insert into t1 (ts, c1) values (1668073288223, 65); +sql insert into t1 (ts, c1) values (1668073288224, 65); +sql insert into t1 (ts, c1) values (1668073288225, 65); +sql insert into t1 (ts, c1) values (1668073288226, 65); + +$loop_count = 0 +loop8: + +sleep 200 + +sql select * from streamt4 order by start; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows + goto loop8 +endi + +if $data01 != 11 then + print =====data01=$data01 + goto loop8 +endi + +if $data02 != 5 then + print =====data02=$data02 + goto loop8 +endi + +if $data11 != 29 then + print =====data11=$data11 + goto loop8 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop8 +endi + +sql insert into t1 (ts, c1) values (1668073288224, 64); + +$loop_count = 0 +loop9: + +sleep 200 + +sql select * from streamt4 order by start; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 4 then + print =====rows=$rows + goto loop9 +endi + +if $data01 != 11 then + print =====data01=$data01 + goto loop9 +endi + +if $data02 != 5 then + print =====data02=$data02 + goto loop9 +endi + +if $data11 != 29 then + print =====data11=$data11 + goto loop9 +endi + +if $data12 != 1 then + print =====data12=$data12 + goto loop9 +endi + +if $data21 != 65 then + print =====data21=$data21 + goto loop9 +endi + +if $data22 != 3 then + print =====data22=$data22 + goto loop9 +endi + +if $data31 != 64 then + print =====data31=$data31 + goto loop9 +endi + +if $data32 != 1 then + print =====data32=$data32 + goto loop9 +endi + + system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/system-test/7-tmq/tmqShow.py b/tests/system-test/7-tmq/tmqShow.py index 406fd9f7f3..5e79f17459 100644 --- a/tests/system-test/7-tmq/tmqShow.py +++ b/tests/system-test/7-tmq/tmqShow.py @@ -137,7 +137,7 @@ class TDTestCase: tdLog.info("check show subscriptions") tdSql.query("show subscriptions") - # tdLog.debug(tdSql.queryResult) + tdLog.debug(tdSql.queryResult) rows = tdSql.getRows() expectSubscriptions = paraDict['vgroups'] * len(topicNameList) tdLog.info("show subscriptions rows: %d, expect Subscriptions: %d"%(rows,expectSubscriptions)) diff --git a/tests/system-test/7-tmq/tmqSubscribeStb-r3.py b/tests/system-test/7-tmq/tmqSubscribeStb-r3.py index 01f1ca5b15..85222a941b 100644 --- a/tests/system-test/7-tmq/tmqSubscribeStb-r3.py +++ b/tests/system-test/7-tmq/tmqSubscribeStb-r3.py @@ -1,26 +1,27 @@ -from distutils.log import error -import taos -import sys -import time -import socket import os -import threading -import subprocess import platform +import socket +import subprocess +import sys +import threading +import time +from distutils.log import error -from util.log import * -from util.sql import * +import taos from util.cases import * -from util.dnodes import * -from util.dnodes import TDDnodes -from util.dnodes import TDDnode from util.cluster import * from util.common import * +from util.dnodes import * +from util.dnodes import TDDnode, TDDnodes +from util.log import * +from util.sql import * + sys.path.append("./6-cluster") sys.path.append("./7-tmq") -from tmqCommon import * -from clusterCommonCreate import * from clusterCommonCheck import clusterComCheck +from clusterCommonCreate import * +from tmqCommon import * + class TDTestCase: def __init__(self): @@ -265,6 +266,7 @@ class TDTestCase: tdLog.info("start consume processor") tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tmqCom.getStartConsumeNotifyFromTmqsim() tdLog.info("================= restart dnode 2===========================") cluster.dnodes[1].stoptaosd() cluster.dnodes[1].starttaosd() diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 9bb02159f0..8402a5a589 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -540,11 +540,20 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t } } -bool shellIsLimitQuery(const char *sql) { - // todo refactor +// show whole result for this query return true, like limit or describe +bool shellIsShowWhole(const char *sql) { + // limit if (taosStrCaseStr(sql, " limit ") != NULL) { return true; } + // describe + if (taosStrCaseStr(sql, "describe ") != NULL) { + return true; + } + // show + if (taosStrCaseStr(sql, "show ") != NULL) { + return true; + } return false; } @@ -578,7 +587,7 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) { uint64_t resShowMaxNum = UINT64_MAX; - if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) { + if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsShowWhole(sql)) { resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM; } @@ -723,7 +732,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) { uint64_t resShowMaxNum = UINT64_MAX; - if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) { + if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsShowWhole(sql)) { resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM; } diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index 83dfa1fc57..40d5bb12d2 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1097,18 +1097,12 @@ int sml_time_Test() { pRes = taos_query(taos, "use sml_db"); taos_free_result(pRes); - char* tmp = (char*)taosMemoryCalloc(1024, 1); - memcpy(tmp, sql[0], strlen(sql[0])); - *(char*)(tmp+44) = 0; - int32_t totalRows = 0; - pRes = taos_schemaless_insert_raw(taos, tmp, strlen(sql[0]), &totalRows, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); + pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS); - ASSERT(totalRows == 3); printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); int code = taos_errno(pRes); taos_free_result(pRes); taos_close(taos); - taosMemoryFree(tmp); return code; }