diff --git a/cmake/mxml_CMakeLists.txt.in b/cmake/mxml_CMakeLists.txt.in index 1ac90ebdd4..762df40e10 100644 --- a/cmake/mxml_CMakeLists.txt.in +++ b/cmake/mxml_CMakeLists.txt.in @@ -1,7 +1,7 @@ # cos ExternalProject_Add(mxml GIT_REPOSITORY https://github.com/michaelrsweet/mxml.git - GIT_TAG release-2.12 + GIT_TAG v2.12 SOURCE_DIR "${TD_CONTRIB_DIR}/mxml" #BINARY_DIR "" BUILD_IN_SOURCE TRUE diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index 9d2b54dab3..340a3e917b 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -292,11 +292,11 @@ CONCAT_WS(separator_expr, expr1, expr2 [, expr] ...) LENGTH(expr) ``` -**Description**: The length in bytes of a string +**Description**: The length in bytes **Return value type**: Bigint -**Applicable data types**: VARCHAR and NCHAR fields or columns +**Applicable data types**: VARCHAR and NCHAR and VARBINARY **Nested query**: It can be used in both the outer query and inner query in a nested query. diff --git a/docs/zh/02-intro.md b/docs/zh/02-intro.md index 68a2541717..bb989f27da 100644 --- a/docs/zh/02-intro.md +++ b/docs/zh/02-intro.md @@ -10,7 +10,7 @@ TDengine 是一款开源、高性能、云原生的[时序数据库](https://tde ## 主要产品 -TDengine 有三个主要产品:TDengine Pro (即 TDengine 企业版),TDengine Cloud,和 TDengine OSS,关于它们的具体定义请参考 +TDengine 有三个主要产品:TDengine Enterprise (即 TDengine 企业版),TDengine Cloud,和 TDengine OSS,关于它们的具体定义请参考 - [TDengine 企业版](https://www.taosdata.com/tdengine-pro) - [TDengine 云服务](https://cloud.taosdata.com/?utm_source=menu&utm_medium=webcn) - [TDengine 开源版](https://www.taosdata.com/tdengine-oss) diff --git a/docs/zh/08-connector/14-java.mdx b/docs/zh/08-connector/14-java.mdx index 0ff00d1710..64e9a3daed 100644 --- a/docs/zh/08-connector/14-java.mdx +++ b/docs/zh/08-connector/14-java.mdx @@ -1004,7 +1004,7 @@ TaosConsumer consumer = new TaosConsumer<>(config); - httpConnectTimeout: 创建连接超时参数,单位 ms,默认为 5000 ms。仅在 WebSocket 连接下有效。 - messageWaitTimeout: 数据传输超时参数,单位 ms,默认为 10000 ms。仅在 WebSocket 连接下有效。 - httpPoolSize: 同一个连接下最大并行请求数。仅在 WebSocket 连接下有效。 - 其他参数请参考:[Consumer 参数列表](../../../develop/tmq#创建-consumer-以及consumer-group) + 其他参数请参考:[Consumer 参数列表](../../develop/tmq#创建-consumer-以及consumer-group) #### 订阅消费数据 @@ -1082,7 +1082,7 @@ consumer.unsubscribe(); consumer.close() ``` -详情请参考:[数据订阅](../../../develop/tmq) +详情请参考:[数据订阅](../../develop/tmq) #### 完整示例 @@ -1373,7 +1373,7 @@ public static void main(String[] args) throws Exception { **解决方法**: 更换 taos-jdbcdriver 3.0.2+ 版本。 -其它问题请参考 [FAQ](../../../train-faq/faq) +其它问题请参考 [FAQ](../../train-faq/faq) ## API 参考 diff --git a/docs/zh/08-connector/26-rust.mdx b/docs/zh/08-connector/26-rust.mdx index 3e51aa72bb..018552117e 100644 --- a/docs/zh/08-connector/26-rust.mdx +++ b/docs/zh/08-connector/26-rust.mdx @@ -352,7 +352,7 @@ client.put(&sml_data)? ### 数据订阅 -TDengine 通过消息队列 [TMQ](../../../taos-sql/tmq/) 启动一个订阅。 +TDengine 通过消息队列 [TMQ](../../taos-sql/tmq/) 启动一个订阅。 #### 创建 Topic @@ -491,7 +491,7 @@ let taos = pool.get()?; ## 常见问题 -请参考 [FAQ](../../../train-faq/faq) +请参考 [FAQ](../../train-faq/faq) ## API 参考 diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index cfec71934c..8b87a18e54 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -292,11 +292,11 @@ CONCAT_WS(separator_expr, expr1, expr2 [, expr] ...) LENGTH(expr) ``` -**功能说明**:以字节计数的字符串长度。 +**功能说明**:以字节计数的长度。 **返回结果类型**:BIGINT。 -**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。 +**适用数据类型**:VARCHAR, NCHAR, VARBINARY。 **嵌套子查询支持**:适用于内层查询和外层查询。 diff --git a/docs/zh/14-reference/05-taosbenchmark.md b/docs/zh/14-reference/05-taosbenchmark.md index d5ae95b20b..e42d921dc7 100644 --- a/docs/zh/14-reference/05-taosbenchmark.md +++ b/docs/zh/14-reference/05-taosbenchmark.md @@ -13,7 +13,7 @@ taosBenchmark (曾用名 taosdemo ) 是一个用于测试 TDengine 产品性能 taosBenchmark 有两种安装方式: -- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark, 详情请参考[ TDengine 安装](../../operation/pkg-install)。 +- 安装 TDengine 官方安装包的同时会自动安装 taosBenchmark, 详情请参考 [TDengine 安装](../../get-started/)。 - 单独编译 taos-tools 并安装, 详情请参考 [taos-tools](https://github.com/taosdata/taos-tools) 仓库。 diff --git a/docs/zh/14-reference/14-taosKeeper.md b/docs/zh/14-reference/14-taosKeeper.md index 738d351d45..92c85e7a2c 100644 --- a/docs/zh/14-reference/14-taosKeeper.md +++ b/docs/zh/14-reference/14-taosKeeper.md @@ -16,7 +16,7 @@ taosKeeper 是 TDengine 3.0 版本监控指标的导出工具,通过简单的 taosKeeper 有两种安装方式: taosKeeper 安装方式: -- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper, 详情请参考[ TDengine 安装](../../operation/pkg-install)。 +- 安装 TDengine 官方安装包的同时会自动安装 taosKeeper, 详情请参考[ TDengine 安装](../../get-started/)。 - 单独编译 taosKeeper 并安装,详情请参考 [taosKeeper](https://github.com/taosdata/taoskeeper) 仓库。 diff --git a/docs/zh/20-third-party/11-kafka.md b/docs/zh/20-third-party/11-kafka.md index 2f09e50f0b..3d8822fdae 100644 --- a/docs/zh/20-third-party/11-kafka.md +++ b/docs/zh/20-third-party/11-kafka.md @@ -23,7 +23,7 @@ TDengine Source Connector 用于把数据实时地从 TDengine 读出来发送 1. Linux 操作系统 2. 已安装 Java 8 和 Maven 3. 已安装 Git、curl、vi -4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](../../operation/pkg-install) +4. 已安装并启动 TDengine。如果还没有可参考[安装和卸载](../../get-started/) ## 安装 Kafka diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 99160f1519..ea3524d12d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -41,16 +41,16 @@ enum { STREAM_STATUS__PAUSE, }; -enum { +typedef enum ETaskStatus { TASK_STATUS__NORMAL = 0, TASK_STATUS__DROPPING, - TASK_STATUS__FAIL, + TASK_STATUS__UNINIT, // not used, an placeholder TASK_STATUS__STOP, TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner TASK_STATUS__HALT, // pause, but not be manipulated by user command TASK_STATUS__PAUSE, // pause TASK_STATUS__CK, // stream task is in checkpoint status, no data are allowed to put into inputQ anymore -}; +} ETaskStatus; enum { TASK_SCHED_STATUS__INACTIVE = 1, diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index 655629b92c..9e70a6bbf1 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -338,7 +338,7 @@ if [ "$verMode" == "cluster" ]; then tmp_pwd=`pwd` cd ${install_dir}/connector if [ ! -d taos-connector-jdbc ];then - git clone -b 3.2.1 --depth=1 https://github.com/taosdata/taos-connector-jdbc.git ||: + git clone -b main --depth=1 https://github.com/taosdata/taos-connector-jdbc.git ||: fi cd taos-connector-jdbc mvn clean package -Dmaven.test.skip=true diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 431864ae97..14be05d973 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -2342,9 +2342,9 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) { alterReq.alterType, alterReq.numOfFields, alterReq.ttl); SName name = {0}; - tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB); + tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, alterReq.name, detail); + auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, name.tname, detail); _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5752dae87b..66acbcc05b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -868,6 +868,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); taosThreadMutexLock(&execNodeList.lock); + mDebug("register to stream task node list"); keepStreamTasksInBuf(&streamObj, &execNodeList); taosThreadMutexUnlock(&execNodeList.lock); @@ -876,8 +877,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { char detail[2000] = {0}; sprintf(detail, "checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64 - ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 - ", maxDelay:%" PRId64 ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64, + ", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", maxDelay:%" PRId64 + ", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64, createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark, createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate, createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB, @@ -1574,8 +1575,8 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock } else if (taskStatus == TASK_STATUS__DROPPING) { memcpy(varDataVal(status), "dropping", 8); varDataSetLen(status, 8); - } else if (taskStatus == TASK_STATUS__FAIL) { - memcpy(varDataVal(status), "fail", 4); + } else if (taskStatus == TASK_STATUS__UNINIT) { + memcpy(varDataVal(status), "uninit", 6); varDataSetLen(status, 4); } else if (taskStatus == TASK_STATUS__STOP) { memcpy(varDataVal(status), "stop", 4); @@ -2016,14 +2017,11 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) { const SEp *pEp = GET_ACTIVE_EP(pPrevEpset); + const SEp* p = GET_ACTIVE_EP(pCurrent); - for (int32_t i = 0; i < pCurrent->numOfEps; ++i) { - const SEp *p = &(pCurrent->eps[i]); - if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) { - return false; - } + if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) { + return false; } - return true; } @@ -2120,6 +2118,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid); int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo); if (code != TSDB_CODE_SUCCESS) { + sdbCancelFetch(pSdb, pIter); return code; } } @@ -2223,18 +2222,22 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { code = mndProcessVgroupChange(pMnode, &changeInfo); + + // keep the new vnode snapshot + if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { + mDebug("create trans successfully, update cached node list"); + taosArrayDestroy(execNodeList.pNodeEntryList); + execNodeList.pNodeEntryList = pNodeSnapshot; + execNodeList.ts = ts; + } + } else { + mDebug("no update found in nodeList"); + taosArrayDestroy(pNodeSnapshot); } taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); - // keep the new vnode snapshot - if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { - taosArrayDestroy(execNodeList.pNodeEntryList); - execNodeList.pNodeEntryList = pNodeSnapshot; - execNodeList.ts = ts; - } - mDebug("end to do stream task node change checking"); atomic_store_32(&mndNodeCheckSentinel, 0); return 0; @@ -2284,7 +2287,6 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p // todo: this process should be executed by the write queue worker of the mnode int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; - SStreamHbMsg req = {0}; int32_t code = TSDB_CODE_SUCCESS; @@ -2309,8 +2311,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t i = 0; i < req.numOfTasks; ++i) { STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i); - int64_t k[2] = {p->streamId, p->taskId}; - + int64_t k[2] = {p->streamId, p->taskId}; int32_t *index = taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); if (index == NULL) { continue; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 1146cfdc46..14ca7f3b02 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -165,6 +165,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); int32_t tqCheckAndRunStreamTask(STQ* pTq); +int32_t tqStartStreamTasks(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq); // tq util diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 447a7e2d90..4aee98148b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1416,7 +1416,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, } int8_t status = pTask->status.taskStatus; - if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY) { + if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) { // no lock needs to secure the access of the version if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) { // discard all the data when the stream task is suspended. @@ -1700,20 +1700,47 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr); streamTaskUpdateEpsetInfo(pTask, req.pNodeList); + streamSetStatusNormal(pTask); + + SStreamTask** ppHTask = NULL; + if (pTask->historyTaskId.taskId != 0) { + keys[0] = pTask->historyTaskId.streamId; + keys[1] = pTask->historyTaskId.taskId; + + ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + if (ppHTask == NULL || *ppHTask == NULL) { + tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", + pMeta->vgId, req.taskId); + } else { + tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); + streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); + } + } { - streamSetStatusNormal(pTask); streamMetaSaveTask(pMeta, pTask); + if (ppHTask != NULL) { + streamMetaSaveTask(pMeta, *ppHTask); + } + if (streamMetaCommit(pMeta) < 0) { // persist to disk } } streamTaskStop(pTask); + if (ppHTask != NULL) { + streamTaskStop(*ppHTask); + } + tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr); pMeta->closedTask += 1; + if (ppHTask != NULL) { + pMeta->closedTask += 1; + } + // possibly only handle the stream task. int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); bool allStopped = (pMeta->closedTask == numOfTasks); if (allStopped) { @@ -1752,6 +1779,7 @@ _end: taosWUnLockLatch(&pMeta->lock); if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { vInfo("vgId:%d, restart all stream tasks", vgId); + tqStartStreamTasks(pTq); tqCheckAndRunStreamTaskAsync(pTq); } } diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 61fc3c7ae9..4a1b3961cd 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -51,7 +51,7 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS SStreamSnapReader* pSnapReader = NULL; - if (streamSnapReaderOpen(pTq, sver, chkpId, pTq->path, &pSnapReader) == 0) { + if (streamSnapReaderOpen(meta, sver, chkpId, pTq->path, &pSnapReader) == 0) { pReader->complete = 1; } else { code = -1; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 58c7686aeb..eb587b8be2 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -224,6 +224,35 @@ int32_t tqStopStreamTasks(STQ* pTq) { return 0; } +int32_t tqStartStreamTasks(STQ* pTq) { + SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t vgId = TD_VID(pTq->pVnode); + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + + tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks); + + if (numOfTasks == 0) { + return TSDB_CODE_SUCCESS; + } + + taosWLockLatch(&pMeta->lock); + + for (int32_t i = 0; i < numOfTasks; ++i) { + SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); + + int64_t key[2] = {pTaskId->streamId, pTaskId->taskId}; + SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key)); + + int8_t status = (*pTask)->status.taskStatus; + if (status == TASK_STATUS__STOP) { + streamSetStatusNormal(*pTask); + } + } + + taosWUnLockLatch(&pMeta->lock); + return 0; +} + int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { // seek the stored version and extract data from WAL int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d580b41093..43850ebfee 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -560,6 +560,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId); } else { vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); + tqStartStreamTasks(pVnode->pTq); tqCheckAndRunStreamTaskAsync(pVnode->pTq); } } else { diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 9bd0991435..a59a7bb1ea 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -669,8 +669,13 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle* p->info.id.groupId = tupleGroupId; pInfo->groupId = tupleGroupId; } else { - pInfo->prefetchedTuple = pTupleHandle; - break; + if (p->info.rows == 0) { + appendOneRowToDataBlock(p, pTupleHandle); + p->info.id.groupId = pInfo->groupId = tupleGroupId; + } else { + pInfo->prefetchedTuple = pTupleHandle; + break; + } } } else { appendOneRowToDataBlock(p, pTupleHandle); @@ -715,14 +720,9 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData resetLimitInfoForNextGroup(&pInfo->limitInfo); } - bool limitReached = applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); - // if limit is reached within a group, do not clear limiInfo otherwise the next block - // will be processed. - if (newgroup && limitReached) { - resetLimitInfoForNextGroup(&pInfo->limitInfo); - } + applyLimitOffset(&pInfo->limitInfo, p, pTaskInfo); - if (p->info.rows > 0 || limitReached) { + if (p->info.rows > 0) { break; } } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 256123d62b..0057df7902 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1839,10 +1839,6 @@ static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - if (TSDB_DATA_TYPE_VARBINARY == ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 9d5a300c23..8008f4397e 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2902,7 +2902,7 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) { taosCreateMD5Hash(buf, len); strncpy(pFunc->node.aliasName, buf, TSDB_COL_NAME_LEN - 1); len = snprintf(buf, sizeof(buf) - 1, "%s(%s)", pSrcFunc->functionName, pCol->colName); - taosCreateMD5Hash(buf, len); + // note: userAlias could be truncated here strncpy(pFunc->node.userAlias, buf, TSDB_COL_NAME_LEN - 1); } } else { @@ -2910,7 +2910,7 @@ static SNode* createMultiResFunc(SFunctionNode* pSrcFunc, SExprNode* pExpr) { taosCreateMD5Hash(buf, len); strncpy(pFunc->node.aliasName, buf, TSDB_COL_NAME_LEN - 1); len = snprintf(buf, sizeof(buf) - 1, "%s(%s)", pSrcFunc->functionName, pExpr->userAlias); - taosCreateMD5Hash(buf, len); + // note: userAlias could be truncated here strncpy(pFunc->node.userAlias, buf, TSDB_COL_NAME_LEN - 1); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 39becca781..916cc6e9ee 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -391,8 +391,8 @@ static void doRetryDispatchData(void* param, void* tmrId) { SStreamTask* pTask = param; if (streamTaskShouldStop(&pTask->status)) { - atomic_sub_fetch_8(&pTask->status.timerActive, 1); - qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); + int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); return; } @@ -409,17 +409,22 @@ static void doRetryDispatchData(void* param, void* tmrId) { streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } } else { - atomic_sub_fetch_8(&pTask->status.timerActive, 1); - qDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); + int32_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); } } else { - atomic_sub_fetch_8(&pTask->status.timerActive, 1); + int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1); + qDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref); } } void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { - qError("s-task:%s dispatch data in %" PRId64 "ms", pTask->id.idStr, waitDuration); - taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer); + qWarn("s-task:%s dispatch data in %" PRId64 "ms, in timer", pTask->id.idStr, waitDuration); + if (pTask->launchTaskTimer != NULL) { + taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer); + } else { + pTask->launchTaskTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer); + } } int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, @@ -540,8 +545,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry - qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", - pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS); + int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + + qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", + pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); break; } @@ -995,8 +1002,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time - qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data", - id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS); + int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1); + qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data, ref:%d", + id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, ref); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // pipeline send data in output queue // this message has been sent successfully, let's try next one. diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 635915aeb6..5bc21286d7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -666,6 +666,8 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (p == NULL) { + // pTask->chkInfo.checkpointVer may be 0, when a follower is become a leader + // In this case, we try not to start fill-history task anymore. if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) { doClear(pKey, pVal, pCur, pRecycleList); tFreeStreamTask(pTask); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 4b86b9713c..743d87e938 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -235,7 +235,13 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { qDebug("s-task:%s enter into scan-history data stage, status:%s", id, str); streamTaskLaunchScanHistory(pTask); } else { - qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); + if (pTask->info.fillHistory == 1) { + qDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id); + pTask->status.taskStatus = TASK_STATUS__DROPPING; + ASSERT(pTask->historyTaskId.taskId == 0); + } else { + qDebug("s-task:%s downstream tasks are ready, now ready for data from wal, status:%s", id, str); + } } // when current stream task is ready, check the related fill history task. @@ -579,19 +585,17 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { // todo fix the bug: 2. race condition // an fill history task needs to be started. int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { - int32_t tId = pTask->historyTaskId.taskId; - if (tId == 0) { + SStreamMeta* pMeta = pTask->pMeta; + int32_t hTaskId = pTask->historyTaskId.taskId; + if (hTaskId == 0) { return TSDB_CODE_SUCCESS; } ASSERT(pTask->status.downstreamReady == 1); qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr, - pTask->historyTaskId.streamId, tId); + pTask->historyTaskId.streamId, hTaskId); - SStreamMeta* pMeta = pTask->pMeta; - int32_t hTaskId = pTask->historyTaskId.taskId; - - int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId}; + int64_t keys[2] = {pTask->historyTaskId.streamId, hTaskId}; // Set the execute conditions, including the query time window and the version range SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); @@ -610,11 +614,12 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { // todo failed to create timer taosMemoryFree(pInfo); } else { - atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active + int32_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);// timer is active + ASSERT(ref == 1); qDebug("s-task:%s set timer active flag", pTask->id.idStr); } } else { // timer exists - ASSERT(pTask->status.timerActive > 0); + ASSERT(pTask->status.timerActive == 1); qDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); } @@ -918,6 +923,13 @@ void streamTaskHalt(SStreamTask* pTask) { return; } + // wait for checkpoint completed + while(pTask->status.taskStatus == TASK_STATUS__CK) { + qDebug("s-task:%s status:%s during generating checkpoint, wait for 1sec and retry set status:halt", pTask->id.idStr, + streamGetTaskStatusStr(TASK_STATUS__CK)); + taosMsleep(1000); + } + // upgrade to halt status if (status == TASK_STATUS__PAUSE) { qDebug("s-task:%s upgrade status to %s from %s", pTask->id.idStr, streamGetTaskStatusStr(TASK_STATUS__HALT), diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index e8aabef8e4..cd37462cde 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -59,6 +59,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxTopic.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqMaxGroupIds.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumeDiscontinuousData.py diff --git a/tests/script/tsim/query/multires_func.sim b/tests/script/tsim/query/multires_func.sim new file mode 100644 index 0000000000..34aadffe2e --- /dev/null +++ b/tests/script/tsim/query/multires_func.sim @@ -0,0 +1,20 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +sql create database test +sql use test +sql CREATE TABLE `tb` (`ts` TIMESTAMP, `c0` INT, `c1` FLOAT, `c2` BINARY(10)) + + +sql insert into tb values("2022-05-15 00:01:08.000", 1, 1.0, "abc") +sql insert into tb values("2022-05-16 00:01:08.000", 2, 2.0, "bcd") +sql insert into tb values("2022-05-17 00:01:08.000", 3, 3.0, "cde") + + +#sleep 10000000 +system taos -P7100 -s 'source tsim/query/t/multires_func.sql' | grep -v 'Query OK' | grep -v 'Client Version' > /tmp/multires_func.result +system echo ----------------------diff start----------------------- +system git diff --exit-code --color tsim/query/r/multires_func.result /tmp/multires_func.result +system echo ----------------------diff succeed----------------------- diff --git a/tests/script/tsim/query/r/multires_func.result b/tests/script/tsim/query/r/multires_func.result new file mode 100644 index 0000000000..c221b00b97 --- /dev/null +++ b/tests/script/tsim/query/r/multires_func.result @@ -0,0 +1,31 @@ +Copyright (c) 2022 by TDengine, all rights reserved. + +taos> source tsim/query/t/multires_func.sql +taos> use test; +Database changed. + +taos> select count(*) from tb\G; +*************************** 1.row *************************** +count(*): 3 + +taos> select last(*) from tb\G; +*************************** 1.row *************************** +ts: 2022-05-17 00:01:08.000 +c0: 3 +c1: 3.0000000 +c2: cde + +taos> select last_row(*) from tb\G; +*************************** 1.row *************************** +ts: 2022-05-17 00:01:08.000 +c0: 3 +c1: 3.0000000 +c2: cde + +taos> select first(*) from tb\G; +*************************** 1.row *************************** +ts: 2022-05-15 00:01:08.000 +c0: 1 +c1: 1.0000000 +c2: abc + diff --git a/tests/script/tsim/query/t/multires_func.sql b/tests/script/tsim/query/t/multires_func.sql new file mode 100644 index 0000000000..6a191233b9 --- /dev/null +++ b/tests/script/tsim/query/t/multires_func.sql @@ -0,0 +1,5 @@ +use test; +select count(*) from tb\G; +select last(*) from tb\G; +select last_row(*) from tb\G; +select first(*) from tb\G; diff --git a/tests/system-test/0-others/splitVGroup.py b/tests/system-test/0-others/splitVGroup.py index 9fd00892e4..ed80505ce2 100644 --- a/tests/system-test/0-others/splitVGroup.py +++ b/tests/system-test/0-others/splitVGroup.py @@ -198,7 +198,7 @@ class TDTestCase: # init def init(self, conn, logSql, replicaVar=1): - seed = time.clock_gettime(time.CLOCK_REALTIME) + seed = time.time() % 10000 random.seed(seed) self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/0-others/timeRangeWise.py b/tests/system-test/0-others/timeRangeWise.py index a7dc18aa82..5ef5aa4a75 100644 --- a/tests/system-test/0-others/timeRangeWise.py +++ b/tests/system-test/0-others/timeRangeWise.py @@ -210,7 +210,7 @@ class TDTestCase: # init def init(self, conn, logSql, replicaVar=1): - seed = time.clock_gettime(time.CLOCK_REALTIME) + seed = time.time() % 10000 random.seed(seed) self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/1-insert/precisionUS.py b/tests/system-test/1-insert/precisionUS.py index 1b41d66010..d634149297 100644 --- a/tests/system-test/1-insert/precisionUS.py +++ b/tests/system-test/1-insert/precisionUS.py @@ -220,7 +220,7 @@ class TDTestCase: # init def init(self, conn, logSql, replicaVar=1): - seed = time.clock_gettime(time.CLOCK_REALTIME) + seed = time.time() % 10000 random.seed(seed) self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/2-query/interval_limit_opt.py b/tests/system-test/2-query/interval_limit_opt.py index fef6e9facd..851138fed3 100644 --- a/tests/system-test/2-query/interval_limit_opt.py +++ b/tests/system-test/2-query/interval_limit_opt.py @@ -251,10 +251,19 @@ class TDTestCase: tdSql.checkData(2, 4, 9) tdSql.checkData(3, 4, 9) + def test_partition_by_limit_no_agg(self): + sql_template = 'select t1 from meters partition by t1 limit %d' + + for i in range(1, 5000, 1000): + tdSql.query(sql_template % i) + tdSql.checkRows(5 * i) + + def run(self): self.prepareTestEnv() self.test_interval_limit_offset() self.test_interval_partition_by_slimit_limit() + self.test_partition_by_limit_no_agg() def stop(self): tdSql.close() diff --git a/tests/system-test/2-query/orderBy.py b/tests/system-test/2-query/orderBy.py index fed1651b3a..fa447cbca4 100644 --- a/tests/system-test/2-query/orderBy.py +++ b/tests/system-test/2-query/orderBy.py @@ -220,7 +220,7 @@ class TDTestCase: # init def init(self, conn, logSql, replicaVar=1): - seed = time.clock_gettime(time.CLOCK_REALTIME) + seed = time.time() % 10000 random.seed(seed) self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/2-query/smaBasic.py b/tests/system-test/2-query/smaBasic.py index c221a70605..a82d190e2b 100644 --- a/tests/system-test/2-query/smaBasic.py +++ b/tests/system-test/2-query/smaBasic.py @@ -269,7 +269,7 @@ class TDTestCase: # init def init(self, conn, logSql, replicaVar=1): - seed = time.clock_gettime(time.CLOCK_REALTIME) + seed = time.time() % 10000 random.seed(seed) self.replicaVar = int(replicaVar) tdLog.debug(f"start to excute {__file__}") diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 7f972d857e..087e5a7c62 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -233,7 +233,7 @@ class TMQCom: #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) for i in range(ctbNum): rowsBatched = 0 - sql += " %s%d values "%(stbName,i) + sql += " %s.%s%d values "%(dbName, stbName, i) for j in range(rowsPerTbl): sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) rowsBatched += 1 @@ -241,7 +241,7 @@ class TMQCom: tsql.execute(sql) rowsBatched = 0 if j < rowsPerTbl - 1: - sql = "insert into %s%d values " %(stbName,i) + sql = "insert into %s.%s%d values " %(dbName, stbName,i) else: sql = "insert into " #end sql @@ -263,7 +263,7 @@ class TMQCom: #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) for i in range(ctbNum): rowsBatched = 0 - sql += " %s%d values "%(ctbPrefix,i) + sql += " %s.%s%d values "%(dbName, ctbPrefix,i) for j in range(rowsPerTbl): if (j % 2 == 0): sql += "(%d, %d, %d, 'tmqrow_%d') "%(startTs + j, j, j, j) @@ -274,7 +274,7 @@ class TMQCom: tsql.execute(sql) rowsBatched = 0 if j < rowsPerTbl - 1: - sql = "insert into %s%d values " %(ctbPrefix,i) + sql = "insert into %s.%s%d values " %(dbName, ctbPrefix, i) else: sql = "insert into " #end sql @@ -296,7 +296,7 @@ class TMQCom: #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) for i in range(ctbNum): rowsBatched = 0 - sql += " %s%d values "%(ctbPrefix,i+ctbStartIdx) + sql += " %s.%s%d values "%(dbName, ctbPrefix, i+ctbStartIdx) for j in range(rowsPerTbl): if (j % 2 == 0): sql += "(%d, %d, %d, 'tmqrow_%d', now) "%(startTs + j, j, j, j) @@ -307,7 +307,7 @@ class TMQCom: tsql.execute(sql) rowsBatched = 0 if j < rowsPerTbl - 1: - sql = "insert into %s%d values " %(ctbPrefix,i+ctbStartIdx) + sql = "insert into %s.%s%d values " %(dbName, ctbPrefix, i+ctbStartIdx) else: sql = "insert into " #end sql diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 72cf3cd1cc..a7f79fc9db 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -28,12 +28,12 @@ static void shellRecordCommandToHistory(char *command); static int32_t shellRunCommand(char *command, bool recordHistory); static void shellRunSingleCommandImp(char *command); static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision); -static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres); +static int64_t shellDumpResultToFile(const char *fname, TAOS_RES *tres); static void shellPrintNChar(const char *str, int32_t length, int32_t width); static void shellPrintGeometry(const unsigned char *str, int32_t length, int32_t width); -static int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql); -static int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql); -static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql); +static int64_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql); +static int64_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql); +static int64_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql); static void shellReadHistory(); static void shellWriteHistory(); static void shellPrintError(TAOS_RES *tres, int64_t st); @@ -238,14 +238,14 @@ void shellRunSingleCommandImp(char *command) { if (pFields != NULL) { // select and show kinds of commands int32_t error_no = 0; - int32_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode, command); + int64_t numOfRows = shellDumpResult(pSql, fname, &error_no, printMode, command); if (numOfRows < 0) return; et = taosGetTimestampUs(); if (error_no == 0) { - printf("Query OK, %d row(s) in set (%.6fs)\r\n", numOfRows, (et - st) / 1E6); + printf("Query OK, %"PRId64 " row(s) in set (%.6fs)\r\n", numOfRows, (et - st) / 1E6); } else { - printf("Query interrupted (%s), %d row(s) in set (%.6fs)\r\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6); + printf("Query interrupted (%s), %"PRId64 " row(s) in set (%.6fs)\r\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6); } taos_free_result(pSql); } else { @@ -430,7 +430,7 @@ void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, i } } -int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) { +int64_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) { char fullname[PATH_MAX] = {0}; if (taosExpandDir(fname, fullname, PATH_MAX) != 0) { tstrncpy(fullname, fname, PATH_MAX); @@ -459,7 +459,7 @@ int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres) { } taosFprintfFile(pFile, "\r\n"); - int32_t numOfRows = 0; + int64_t numOfRows = 0; do { int32_t *length = taos_fetch_lengths(tres); for (int32_t i = 0; i < num_fields; i++) { @@ -702,7 +702,7 @@ bool shellIsShowQuery(const char *sql) { return false; } -int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) { +int64_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) { TAOS_ROW row = taos_fetch_row(tres); if (row == NULL) { return 0; @@ -726,11 +726,11 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) { resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM; } - int32_t numOfRows = 0; + int64_t numOfRows = 0; int32_t showMore = 1; do { if (numOfRows < resShowMaxNum) { - printf("*************************** %d.row ***************************\r\n", numOfRows + 1); + printf("*************************** %"PRId64".row ***************************\r\n", numOfRows + 1); int32_t *length = taos_fetch_lengths(tres); @@ -856,7 +856,7 @@ void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields) { putchar('\n'); } -int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) { +int64_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) { TAOS_ROW row = taos_fetch_row(tres); if (row == NULL) { return 0; @@ -879,7 +879,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) { resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM; } - int32_t numOfRows = 0; + int64_t numOfRows = 0; int32_t showMore = 1; do { @@ -915,8 +915,8 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) { return numOfRows; } -int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql) { - int32_t numOfRows = 0; +int64_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql) { + int64_t numOfRows = 0; if (fname != NULL) { numOfRows = shellDumpResultToFile(fname, tres); } else if (vertical) { diff --git a/utils/test/c/varbinary_test.c b/utils/test/c/varbinary_test.c index e29b94ad1f..1d432ffbba 100644 --- a/utils/test/c/varbinary_test.c +++ b/utils/test/c/varbinary_test.c @@ -157,10 +157,6 @@ void varbinary_sql_test() { taos_free_result(pRes); // string function test, not support - pRes = taos_query(taos, "select length(c2) from stb"); - ASSERT(taos_errno(pRes) != 0); - taos_free_result(pRes); - pRes = taos_query(taos, "select ltrim(c2) from stb"); ASSERT(taos_errno(pRes) != 0); taos_free_result(pRes); @@ -190,7 +186,7 @@ void varbinary_sql_test() { ASSERT(taos_errno(pRes) != 0); taos_free_result(pRes); - // support first/last/last_row/count/hyperloglog/sample/tail/mode + // support first/last/last_row/count/hyperloglog/sample/tail/mode/length pRes = taos_query(taos, "select first(c2) from stb"); ASSERT(taos_errno(pRes) == 0); taos_free_result(pRes); @@ -207,6 +203,10 @@ void varbinary_sql_test() { ASSERT(taos_errno(pRes) == 0); taos_free_result(pRes); + pRes = taos_query(taos, "select length(c2) from stb where c2 = '\\x7F8290'"); + ASSERT(taos_errno(pRes) == 0); + taos_free_result(pRes); + pRes = taos_query(taos, "select cast(t2 as varbinary(16)) from stb order by ts"); while ((row = taos_fetch_row(pRes)) != NULL) { int32_t* length = taos_fetch_lengths(pRes);