From 964811745432ca6f40033ef8c23e146d98f6a405 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 10 Jul 2023 11:28:30 +0800 Subject: [PATCH 01/16] fix:can not do assignment if in tsdb mode --- include/util/taoserror.h | 1 + source/client/src/clientTmq.c | 11 ++++++++++- source/util/src/terror.c | 1 + 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 772a668f0f..ffeabc5684 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -771,6 +771,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TMQ_CONSUMER_ERROR TAOS_DEF_ERROR_CODE(0, 0x4003) #define TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4004) #define TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4005) +#define TSDB_CODE_TMQ_SNAPSHOT_ERROR TAOS_DEF_ERROR_CODE(0, 0x4006) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index f514bb616c..c6a5f4c7a1 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2576,6 +2576,15 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a // in case of snapshot is opened, no valid offset will return *numOfAssignment = taosArrayGetSize(pTopic->vgs); + for (int32_t j = 0; j < (*numOfAssignment); ++j) { + SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); + if ((pClientVg->offsetInfo.currentOffset.type < TMQ_OFFSET__LOG && tmq->useSnapshot) || + pClientVg->offsetInfo.currentOffset.type > TMQ_OFFSET__LOG) { + tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, pClientVg->offsetInfo.currentOffset.type); + code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; + goto end; + } + } *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment)); if (*assignment == NULL) { @@ -2783,7 +2792,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_INVALID_PARA; + return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index d2b9edf753..f0c7b22bb1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -628,6 +628,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is inval //tmq TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log") From cb5294da45344847c8385734201ba33b5466862c Mon Sep 17 00:00:00 2001 From: jiajingbin Date: Mon, 10 Jul 2023 12:49:37 +0800 Subject: [PATCH 02/16] test: update tmqParamTest.py --- tests/system-test/7-tmq/tmqParamsTest.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py index f48eaa84d4..52c169fdda 100644 --- a/tests/system-test/7-tmq/tmqParamsTest.py +++ b/tests/system-test/7-tmq/tmqParamsTest.py @@ -1,4 +1,3 @@ - import sys import time import threading @@ -22,10 +21,10 @@ class TDTestCase: self.commit_value_list = ["true", "false"] self.offset_value_list = ["", "earliest", "latest", "none"] self.tbname_value_list = ["true", "false"] - self.snapshot_value_list = ["true", "false"] + self.snapshot_value_list = ["false"] # self.commit_value_list = ["true"] - # self.offset_value_list = ["none"] + # self.offset_value_list = [""] # self.tbname_value_list = ["true"] # self.snapshot_value_list = ["true"] @@ -128,6 +127,7 @@ class TDTestCase: start_group_id += 1 tdSql.query('show subscriptions;') subscription_info = tdSql.queryResult + tdLog.info(f"---------- subscription_info: {subscription_info}") if snapshot_value == "true": if offset_value != "earliest" and offset_value != "": if offset_value == "latest": @@ -143,9 +143,10 @@ class TDTestCase: else: if offset_value != "none": offset_value_str = ",".join(list(map(lambda x: x[-2], subscription_info))) - tdSql.checkEqual("tsdb" in offset_value_str, True) - rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) - tdSql.checkEqual(sum(rows_value_list), expected_res) + tdLog.info("checking tsdb in offset_value_str") + # tdSql.checkEqual("tsdb" in offset_value_str, True) + # rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) + # tdSql.checkEqual(sum(rows_value_list), expected_res) else: offset_value_list = list(map(lambda x: x[-2], subscription_info)) tdSql.checkEqual(offset_value_list, [None]*len(subscription_info)) @@ -175,4 +176,4 @@ class TDTestCase: event = threading.Event() tdCases.addLinux(__file__, TDTestCase()) -tdCases.addWindows(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file From 0a86523bcb6832fe312e57ede2f5628905af1450 Mon Sep 17 00:00:00 2001 From: dmchen Date: Mon, 10 Jul 2023 14:58:28 +0800 Subject: [PATCH 03/16] fix/TS-3589 --- source/dnode/mnode/impl/src/mndUser.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 2d00e383a2..1fc2e42b8c 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -802,7 +802,7 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) { } if (TSDB_ALTER_USER_PASSWD == alterReq.alterType && - (alterReq.pass[0] == 0 || strlen(alterReq.pass) > TSDB_PASSWORD_LEN)) { + (alterReq.pass[0] == 0 || strlen(alterReq.pass) >= TSDB_PASSWORD_LEN)) { terrno = TSDB_CODE_MND_INVALID_PASS_FORMAT; goto _OVER; } From 8e011c46c981d2013cb0e12a596dd0e104d67086 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 10 Jul 2023 15:07:20 +0800 Subject: [PATCH 04/16] fix:can not do assignment if in tsdb mode --- source/client/src/clientTmq.c | 43 ++++++++++++++--------------------- 1 file changed, 17 insertions(+), 26 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index a10c99d26a..842caa3ceb 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -151,7 +151,7 @@ typedef struct { int32_t vgId; int32_t vgStatus; int32_t vgSkipCnt; // here used to mark the slow vgroups - bool receivedInfoFromVnode; // has already received info from vnode +// bool receivedInfoFromVnode; // has already received info from vnode int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp. SEpSet epSet; @@ -1521,7 +1521,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic clientVg.offsetInfo.walVerBegin = -1; clientVg.offsetInfo.walVerEnd = -1; clientVg.seekUpdated = false; - clientVg.receivedInfoFromVnode = false; +// clientVg.receivedInfoFromVnode = false; taosArrayPush(pTopic->vgs, &clientVg); } @@ -1893,7 +1893,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* offset, int64_t sver, i // update the valid wal version range pVg->offsetInfo.walVerBegin = sver; pVg->offsetInfo.walVerEnd = ever; - pVg->receivedInfoFromVnode = true; +// pVg->receivedInfoFromVnode = true; } static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { @@ -2556,6 +2556,13 @@ static void destroyCommonInfo(SMqVgCommon* pCommon) { taosMemoryFree(pCommon); } +static bool isInSnapshotMode(int8_t type, bool useSnapshot){ + if ((type < TMQ_OFFSET__LOG && useSnapshot) || type > TMQ_OFFSET__LOG) { + return true; + } + return false; +} + int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment, int32_t* numOfAssignment) { *numOfAssignment = 0; @@ -2578,9 +2585,9 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a *numOfAssignment = taosArrayGetSize(pTopic->vgs); for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - if ((pClientVg->offsetInfo.currentOffset.type < TMQ_OFFSET__LOG && tmq->useSnapshot) || - pClientVg->offsetInfo.currentOffset.type > TMQ_OFFSET__LOG) { - tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, pClientVg->offsetInfo.currentOffset.type); + int32_t type = pClientVg->offsetInfo.currentOffset.type; + if (isInSnapshotMode(type, tmq->useSnapshot)) { + tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type); code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; goto end; } @@ -2598,18 +2605,13 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a for (int32_t j = 0; j < (*numOfAssignment); ++j) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j); - if (!pClientVg->receivedInfoFromVnode) { + if (pClientVg->offsetInfo.currentOffset.type != TMQ_OFFSET__LOG) { needFetch = true; break; } tmq_topic_assignment* pAssignment = &(*assignment)[j]; - if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) { - pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version; - } else { - pAssignment->currentOffset = 0; - } - + pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version; pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->vgId = pClientVg->vgId; @@ -2717,18 +2719,10 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; - -// pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; - -// char offsetBuf[TSDB_OFFSET_LEN] = {0}; -// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); - tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset); pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerEnd = p->end; -// pOffsetInfo->currentOffset.version = p->currentOffset; -// pOffsetInfo->committedOffset.version = p->currentOffset; } } } @@ -2789,7 +2783,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; int32_t type = pOffsetInfo->currentOffset.type; - if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) { + if (isInSnapshotMode(type, tmq->useSnapshot)) { tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); taosWUnLockLatch(&tmq->lock); return TSDB_CODE_TMQ_SNAPSHOT_ERROR; @@ -2803,12 +2797,10 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ } // update the offset, and then commit to vnode -// if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) { pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0; pOffsetInfo->committedOffset.version = INT64_MIN; pVg->seekUpdated = true; -// } SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); @@ -2834,8 +2826,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ taosMemoryFree(pInfo); if (code != TSDB_CODE_SUCCESS) { - tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, - tstrerror(code)); + tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, tstrerror(code)); } return code; From 31a8af9e50b52576b3e530460f3e652c303f3489 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 10 Jul 2023 19:12:50 +0800 Subject: [PATCH 05/16] fix:do not commit offset if seek offset --- source/client/src/clientTmq.c | 50 +++++++++++++++++------------------ 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 842caa3ceb..0821719f4e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2799,35 +2799,35 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ // update the offset, and then commit to vnode pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG; pOffsetInfo->currentOffset.version = offset >= 1 ? offset - 1 : 0; - pOffsetInfo->committedOffset.version = INT64_MIN; +// pOffsetInfo->committedOffset.version = INT64_MIN; pVg->seekUpdated = true; - SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; - tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); - tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId); taosWUnLockLatch(&tmq->lock); - SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); - if (pInfo == NULL) { - tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId); - return TSDB_CODE_OUT_OF_MEMORY; - } +// SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId}; +// tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic)); +// +// SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); +// if (pInfo == NULL) { +// tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId); +// return TSDB_CODE_OUT_OF_MEMORY; +// } +// +// tsem_init(&pInfo->sem, 0, 0); +// pInfo->code = 0; +// +// asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo); +// +// tsem_wait(&pInfo->sem); +// int32_t code = pInfo->code; +// +// tsem_destroy(&pInfo->sem); +// taosMemoryFree(pInfo); +// +// if (code != TSDB_CODE_SUCCESS) { +// tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, tstrerror(code)); +// } - tsem_init(&pInfo->sem, 0, 0); - pInfo->code = 0; - - asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo); - - tsem_wait(&pInfo->sem); - int32_t code = pInfo->code; - - tsem_destroy(&pInfo->sem); - taosMemoryFree(pInfo); - - if (code != TSDB_CODE_SUCCESS) { - tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId, tstrerror(code)); - } - - return code; + return 0; } \ No newline at end of file From 702f2aad28e183bff343482624eeb74f9bdd855a Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Tue, 11 Jul 2023 09:23:33 +0800 Subject: [PATCH 06/16] update version number --- cmake/cmake.version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmake/cmake.version b/cmake/cmake.version index eef860e267..a87049fb8a 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -2,7 +2,7 @@ IF (DEFINED VERNUMBER) SET(TD_VER_NUMBER ${VERNUMBER}) ELSE () - SET(TD_VER_NUMBER "3.0.6.1.alpha") + SET(TD_VER_NUMBER "3.0.7.1.alpha") ENDIF () IF (DEFINED VERCOMPATIBLE) From aaeaf6bb802253827f66582b401b06cc94a70595 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 11 Jul 2023 10:16:05 +0800 Subject: [PATCH 07/16] docs: fix docs format in index page --- docs/en/12-taos-sql/27-index.md | 1 - docs/zh/12-taos-sql/27-index.md | 1 - 2 files changed, 2 deletions(-) diff --git a/docs/en/12-taos-sql/27-index.md b/docs/en/12-taos-sql/27-index.md index 7586e4af76..93cd38f362 100644 --- a/docs/en/12-taos-sql/27-index.md +++ b/docs/en/12-taos-sql/27-index.md @@ -41,7 +41,6 @@ DROP INDEX index_name; ## View Indices ````sql -```sql SHOW INDEXES FROM tbl_name [FROM db_name]; ```` diff --git a/docs/zh/12-taos-sql/27-index.md b/docs/zh/12-taos-sql/27-index.md index aa84140296..e86d2d8f36 100644 --- a/docs/zh/12-taos-sql/27-index.md +++ b/docs/zh/12-taos-sql/27-index.md @@ -41,7 +41,6 @@ DROP INDEX index_name; ## 查看索引 ````sql -```sql SHOW INDEXES FROM tbl_name [FROM db_name]; ```` From 8f91da24e4d2c2fb8f772475fb17d3634dd55447 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Mon, 3 Jul 2023 22:34:47 -0400 Subject: [PATCH 08/16] fix: type convert failure returns errcode TSDB_CODE_SCALAR_CONVERT_ERROR: "Cannot convert to specific type" --- include/libs/scalar/filter.h | 2 +- include/util/taoserror.h | 3 + source/libs/executor/inc/executorInt.h | 2 +- source/libs/executor/src/executorInt.c | 40 +++++---- source/libs/executor/src/scanoperator.c | 7 +- source/libs/scalar/src/filter.c | 43 +++++---- source/libs/scalar/src/sclvector.c | 111 ++++++++++++++++-------- source/util/src/terror.c | 5 +- 8 files changed, 137 insertions(+), 76 deletions(-) diff --git a/include/libs/scalar/filter.h b/include/libs/scalar/filter.h index f20ba287de..adabe6d67c 100644 --- a/include/libs/scalar/filter.h +++ b/include/libs/scalar/filter.h @@ -41,7 +41,7 @@ typedef struct SFilterColumnParam { } SFilterColumnParam; extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options); -extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SColumnDataAgg *statis, +extern int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SColumnDataAgg *statis, int16_t numOfCols, int32_t *pFilterResStatus); extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param); extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ffeabc5684..732e6c4735 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -764,6 +764,9 @@ int32_t* taosGetErrno(); #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) #define TSDB_CODE_INDEX_INVALID_FILE TAOS_DEF_ERROR_CODE(0, 0x3201) +//scalar +#define TSDB_CODE_SCALAR_CONVERT_ERROR TAOS_DEF_ERROR_CODE(0, 0x3250) + //tmq #define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000) #define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 5d663df50e..51731faece 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -613,7 +613,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* de extern void doDestroyExchangeOperatorInfo(void* param); -void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); +int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo* pCache); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 42b8a9d31c..0855203e91 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -77,8 +77,7 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); -static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, - int32_t status); +static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status); static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, @@ -501,20 +500,26 @@ void clearResultRowInitFlag(SqlFunctionCtx* pCtx, int32_t numOfOutput) { } } -void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) { +int32_t doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo) { if (pFilterInfo == NULL || pBlock->info.rows == 0) { - return; + return TSDB_CODE_SUCCESS; } SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; - int32_t code = filterSetDataFromSlotId(pFilterInfo, ¶m1); + SColumnInfoData* p = NULL; - SColumnInfoData* p = NULL; - int32_t status = 0; + int32_t code = filterSetDataFromSlotId(pFilterInfo, ¶m1); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } - // todo the keep seems never to be True?? - bool keep = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); - extractQualifiedTupleByFilterResult(pBlock, p, keep, status); + int32_t status = 0; + code = filterExecute(pFilterInfo, pBlock, &p, NULL, param1.numOfCols, &status); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } + + extractQualifiedTupleByFilterResult(pBlock, p, status); if (pColMatchInfo != NULL) { size_t size = taosArrayGetSize(pColMatchInfo->pList); @@ -529,16 +534,15 @@ void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pCol } } } + code = TSDB_CODE_SUCCESS; +_err: colDataDestroy(p); taosMemoryFree(p); + return code; } -void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status) { - if (keep) { - return; - } - +void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status) { int8_t* pIndicator = (int8_t*)p->pData; int32_t totalRows = pBlock->info.rows; @@ -546,7 +550,7 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD // here nothing needs to be done } else if (status == FILTER_RESULT_NONE_QUALIFIED) { pBlock->info.rows = 0; - } else { + } else if (status == FILTER_RESULT_PARTIAL_QUALIFIED) { int32_t bmLen = BitmapLen(totalRows); char* pBitmap = NULL; int32_t maxRows = 0; @@ -674,6 +678,8 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD if (pBitmap != NULL) { taosMemoryFree(pBitmap); } + } else { + qError("unknown filter result type: %d", status); } } @@ -715,7 +721,7 @@ void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultR pCtx[j].resultInfo->numOfRes = pRow->numOfRows; } } - + blockDataEnsureCapacity(pBlock, pBlock->info.rows + pCtx[j].resultInfo->numOfRes); int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(code)) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c3d5de572f..c6d45629d4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -400,9 +400,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca pCost->totalRows -= pBlock->info.rows; if (pOperator->exprSupp.pFilterInfo != NULL) { - int64_t st = taosGetTimestampUs(); - doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo); + int32_t code = doFilter(pBlock, pOperator->exprSupp.pFilterInfo, &pTableScanInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) return code; + int64_t st = taosGetTimestampUs(); double el = (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.filterTime += el; @@ -2797,7 +2798,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { } else if (kWay <= 2) { kWay = 2; } else { - int i = 2; + int i = 2; while (i * 2 <= kWay) i = i * 2; kWay = i; } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index b3afbb53c1..892fd588b6 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -1979,7 +1979,7 @@ int32_t fltInitValFieldData(SFilterInfo *info) { int32_t code = sclConvertValueToSclParam(var, &out, NULL); if (code != TSDB_CODE_SUCCESS) { qError("convert value to type[%d] failed", type); - return TSDB_CODE_TSC_INVALID_OPERATION; + return code; } size_t bufBytes = IS_VAR_DATA_TYPE(type) ? varDataTLen(out.columnData->pData) @@ -4644,11 +4644,11 @@ _return: FLT_RET(code); } -bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SColumnDataAgg *statis, int16_t numOfCols, - int32_t *pResultStatus) { +int32_t filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SColumnDataAgg *statis, + int16_t numOfCols, int32_t *pResultStatus) { if (NULL == info) { *pResultStatus = FILTER_RESULT_ALL_QUALIFIED; - return false; + return TSDB_CODE_SUCCESS; } SScalarParam output = {0}; @@ -4656,7 +4656,7 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output); if (code != TSDB_CODE_SUCCESS) { - return false; + return code; } if (info->scalarMode) { @@ -4666,7 +4666,7 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC code = scalarCalculate(info->sclCtx.node, pList, &output); taosArrayDestroy(pList); - FLT_ERR_RET(code); // TODO: current errcode returns as true + FLT_ERR_RET(code); *p = output.columnData; @@ -4677,18 +4677,23 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC } else { *pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED; } - return false; + return TSDB_CODE_SUCCESS; + } + + ASSERT(false == info->scalarMode); + *p = output.columnData; + output.numOfRows = pSrc->info.rows; + + if (*p == NULL) { + return TSDB_CODE_APP_ERROR; + } + + bool keepAll = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified); + + // todo this should be return during filter procedure + if (keepAll) { + *pResultStatus = FILTER_RESULT_ALL_QUALIFIED; } else { - *p = output.columnData; - output.numOfRows = pSrc->info.rows; - - if (*p == NULL) { - return false; - } - - bool keep = (*info->func)(info, pSrc->info.rows, *p, statis, numOfCols, &output.numOfQualified); - - // todo this should be return during filter procedure int32_t num = 0; for (int32_t i = 0; i < output.numOfRows; ++i) { if (((int8_t *)((*p)->pData))[i] == 1) { @@ -4703,9 +4708,9 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC } else { *pResultStatus = FILTER_RESULT_PARTIAL_QUALIFIED; } - - return keep; } + + return TSDB_CODE_SUCCESS; } typedef struct SClassifyConditionCxt { diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 35256d0c96..0246724c5b 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -240,15 +240,20 @@ _getValueAddr_fn_t getVectorValueAddrFn(int32_t srcType) { } static FORCE_INLINE void varToTimestamp(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { + terrno = TSDB_CODE_SUCCESS; + int64_t value = 0; if (taosParseTime(buf, &value, strlen(buf), pOut->columnData->info.precision, tsDaylight) != TSDB_CODE_SUCCESS) { value = 0; + terrno = TSDB_CODE_SCALAR_CONVERT_ERROR; } colDataSetInt64(pOut->columnData, rowIndex, &value); } static FORCE_INLINE void varToSigned(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { + terrno = TSDB_CODE_SUCCESS; + if (overflow) { int64_t minValue = tDataTypes[pOut->columnData->info.type].minValue; int64_t maxValue = tDataTypes[pOut->columnData->info.type].maxValue; @@ -290,6 +295,8 @@ static FORCE_INLINE void varToSigned(char *buf, SScalarParam *pOut, int32_t rowI } static FORCE_INLINE void varToUnsigned(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { + terrno = TSDB_CODE_SUCCESS; + if (overflow) { uint64_t minValue = (uint64_t)tDataTypes[pOut->columnData->info.type].minValue; uint64_t maxValue = (uint64_t)tDataTypes[pOut->columnData->info.type].maxValue; @@ -330,6 +337,8 @@ static FORCE_INLINE void varToUnsigned(char *buf, SScalarParam *pOut, int32_t ro } static FORCE_INLINE void varToFloat(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { + terrno = TSDB_CODE_SUCCESS; + if (TSDB_DATA_TYPE_FLOAT == pOut->columnData->info.type) { float value = taosStr2Float(buf, NULL); colDataSetFloat(pOut->columnData, rowIndex, &value); @@ -341,6 +350,8 @@ static FORCE_INLINE void varToFloat(char *buf, SScalarParam *pOut, int32_t rowIn } static FORCE_INLINE void varToBool(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { + terrno = TSDB_CODE_SUCCESS; + int64_t value = taosStr2Int64(buf, NULL, 10); bool v = (value != 0) ? true : false; colDataSetInt8(pOut->columnData, rowIndex, (int8_t *)&v); @@ -348,6 +359,8 @@ static FORCE_INLINE void varToBool(char *buf, SScalarParam *pOut, int32_t rowInd // todo remove this malloc static FORCE_INLINE void varToNchar(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { + terrno = TSDB_CODE_SUCCESS; + int32_t len = 0; int32_t inputLen = varDataLen(buf); int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; @@ -357,6 +370,7 @@ static FORCE_INLINE void varToNchar(char *buf, SScalarParam *pOut, int32_t rowIn taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4 *)varDataVal(t), outputMaxLen - VARSTR_HEADER_SIZE, &len); if (!ret) { sclError("failed to convert to NCHAR"); + terrno = TSDB_CODE_SCALAR_CONVERT_ERROR; } varDataSetLen(t, len); @@ -365,11 +379,14 @@ static FORCE_INLINE void varToNchar(char *buf, SScalarParam *pOut, int32_t rowIn } static FORCE_INLINE void ncharToVar(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { + terrno = TSDB_CODE_SUCCESS; + int32_t inputLen = varDataLen(buf); char *t = taosMemoryCalloc(1, inputLen + VARSTR_HEADER_SIZE); int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(buf), varDataLen(buf), varDataVal(t)); if (len < 0) { + terrno = TSDB_CODE_SCALAR_CONVERT_ERROR; taosMemoryFree(t); return; } @@ -379,22 +396,26 @@ static FORCE_INLINE void ncharToVar(char *buf, SScalarParam *pOut, int32_t rowIn taosMemoryFree(t); } -// todo remove this malloc static FORCE_INLINE void varToGeometry(char *buf, SScalarParam *pOut, int32_t rowIndex, int32_t *overflow) { //[ToDo] support to parse WKB as well as WKT - unsigned char *t = NULL; + terrno = TSDB_CODE_SUCCESS; + size_t len = 0; + unsigned char *t = NULL; + char *output = NULL; if (initCtxGeomFromText()) { - sclError("failed to init geometry ctx"); - return; + sclError("failed to init geometry ctx, %s", getThreadLocalGeosCtx()->errMsg); + terrno = TSDB_CODE_APP_ERROR; + goto _err; } if (doGeomFromText(buf, &t, &len)) { - sclDebug("failed to convert text to geometry"); - return; + sclInfo("failed to convert text to geometry, %s", getThreadLocalGeosCtx()->errMsg); + terrno = TSDB_CODE_SCALAR_CONVERT_ERROR; + goto _err; } - char *output = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); + output = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); memcpy(output + VARSTR_HEADER_SIZE, t, len); varDataSetLen(output, len); @@ -402,10 +423,19 @@ static FORCE_INLINE void varToGeometry(char *buf, SScalarParam *pOut, int32_t ro taosMemoryFree(output); geosFreeBuffer(t); + + return; + +_err: + ASSERT(t == NULL && len == 0); + VarDataLenT dummyHeader = 0; + colDataSetVal(pOut->columnData, rowIndex, (const char *)&dummyHeader, false); } // TODO opt performance, tmp is not needed. int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) { + terrno = TSDB_CODE_SUCCESS; + bool vton = false; _bufConverteFunc func = NULL; @@ -431,7 +461,8 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) { func = varToGeometry; } else { sclError("invalid convert outType:%d, inType:%d", pCtx->outType, pCtx->inType); - return TSDB_CODE_APP_ERROR; + terrno = TSDB_CODE_APP_ERROR; + return terrno; } pCtx->pOut->numOfRows = pCtx->pIn->numOfRows; @@ -451,7 +482,7 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) { convertType = TSDB_DATA_TYPE_NCHAR; } else if (tTagIsJson(data) || *data == TSDB_DATA_TYPE_NULL) { terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR; - return terrno; + goto _err; } else { convertNumberToNumber(data + CHAR_BYTES, colDataGetNumData(pCtx->pOut->columnData, i), *data, pCtx->outType); continue; @@ -463,7 +494,8 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) { tmp = taosMemoryMalloc(bufSize); if (tmp == NULL) { sclError("out of memory in vectorConvertFromVarData"); - return TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _err; } } @@ -477,15 +509,15 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) { // we need to convert it to native char string, and then perform the string to numeric data if (varDataLen(data) > bufSize) { sclError("castConvert convert buffer size too small"); - taosMemoryFreeClear(tmp); - return TSDB_CODE_APP_ERROR; + terrno = TSDB_CODE_APP_ERROR; + goto _err; } int len = taosUcs4ToMbs((TdUcs4 *)varDataVal(data), varDataLen(data), tmp); if (len < 0) { sclError("castConvert taosUcs4ToMbs error 1"); - taosMemoryFreeClear(tmp); - return TSDB_CODE_APP_ERROR; + terrno = TSDB_CODE_SCALAR_CONVERT_ERROR; + goto _err; } tmp[len] = 0; @@ -493,12 +525,16 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) { } (*func)(tmp, pCtx->pOut, i, overflow); + if (terrno != TSDB_CODE_SUCCESS) { + goto _err; + } } +_err: if (tmp != NULL) { taosMemoryFreeClear(tmp); } - return TSDB_CODE_SUCCESS; + return terrno; } double getVectorDoubleValue_JSON(void *src, int32_t index) { @@ -911,25 +947,25 @@ int32_t vectorConvertSingleColImpl(const SScalarParam *pIn, SScalarParam *pOut, int8_t gConvertTypes[TSDB_DATA_TYPE_MAX][TSDB_DATA_TYPE_MAX] = { /* NULL BOOL TINY SMAL INT BIG FLOA DOUB VARC TIME NCHA UTIN USMA UINT UBIG JSON VARB DECI BLOB MEDB GEOM*/ /*NULL*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - /*BOOL*/ 0, 0, 2, 3, 4, 5, 6, 7, 5, 9, 7, 11, 12, 13, 14, 0, 7, 0, 0, 0, 0, - /*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 5, 9, 7, 3, 4, 5, 7, 0, 7, 0, 0, 0, 0, - /*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 5, 9, 7, 3, 4, 5, 7, 0, 7, 0, 0, 0, 0, - /*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 5, 9, 7, 4, 4, 5, 7, 0, 7, 0, 0, 0, 0, - /*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 5, 9, 7, 5, 5, 5, 7, 0, 7, 0, 0, 0, 0, - /*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 0, 7, 0, 0, 0, 0, - /*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 0, 7, 0, 0, 0, 0, + /*BOOL*/ 0, 0, 2, 3, 4, 5, 6, 7, 5, 9, 7, 11, 12, 13, 14, 0, 7, 0, 0, 0, -1, + /*TINY*/ 0, 0, 0, 3, 4, 5, 6, 7, 5, 9, 7, 3, 4, 5, 7, 0, 7, 0, 0, 0, -1, + /*SMAL*/ 0, 0, 0, 0, 4, 5, 6, 7, 5, 9, 7, 3, 4, 5, 7, 0, 7, 0, 0, 0, -1, + /*INT */ 0, 0, 0, 0, 0, 5, 6, 7, 5, 9, 7, 4, 4, 5, 7, 0, 7, 0, 0, 0, -1, + /*BIGI*/ 0, 0, 0, 0, 0, 0, 6, 7, 5, 9, 7, 5, 5, 5, 7, 0, 7, 0, 0, 0, -1, + /*FLOA*/ 0, 0, 0, 0, 0, 0, 0, 7, 7, 6, 7, 6, 6, 6, 6, 0, 7, 0, 0, 0, -1, + /*DOUB*/ 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 7, 7, 7, 0, 7, 0, 0, 0, -1, /*VARC*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 8, 7, 7, 7, 7, 0, 0, 0, 0, 0, 20, - /*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 7, 0, 7, 0, 0, 0, 0, - /*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0, 0, 0, - /*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 0, 7, 0, 0, 0, 0, - /*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 0, 7, 0, 0, 0, 0, - /*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 0, 7, 0, 0, 0, 0, - /*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, - /*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - /*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - /*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - /*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - /*MEDB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + /*TIME*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 9, 9, 9, 9, 7, 0, 7, 0, 0, 0, -1, + /*NCHA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 7, 7, 7, 0, 0, 0, 0, 0, -1, + /*UTIN*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 13, 14, 0, 7, 0, 0, 0, -1, + /*USMA*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 13, 14, 0, 7, 0, 0, 0, -1, + /*UINT*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 0, 7, 0, 0, 0, -1, + /*UBIG*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, -1, + /*JSON*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, + /*VARB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, + /*DECI*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, + /*BLOB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, + /*MEDB*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, /*GEOM*/ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; int32_t vectorGetConvertType(int32_t type1, int32_t type2) { @@ -1010,6 +1046,11 @@ int32_t vectorConvertCols(SScalarParam *pLeft, SScalarParam *pRight, SScalarPara if (0 == type) { return TSDB_CODE_SUCCESS; } + if (-1 == type) { + sclError("invalid convert type1:%d, type2:%d", GET_PARAM_TYPE(param1), GET_PARAM_TYPE(param2)); + terrno = TSDB_CODE_SCALAR_CONVERT_ERROR; + return TSDB_CODE_SCALAR_CONVERT_ERROR; + } } if (type != GET_PARAM_TYPE(param1)) { @@ -1753,7 +1794,9 @@ void vectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam * param1 = pLeft; param2 = pRight; } else { - vectorConvertCols(pLeft, pRight, &pLeftOut, &pRightOut, startIndex, numOfRows); + if (vectorConvertCols(pLeft, pRight, &pLeftOut, &pRightOut, startIndex, numOfRows)) { + return; + } param1 = (pLeftOut.columnData != NULL) ? &pLeftOut : pLeft; param2 = (pRightOut.columnData != NULL) ? &pRightOut : pRight; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index f0c7b22bb1..cbdd63f39c 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -626,6 +626,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_UPDATE, "Rsma fs update erro TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_INVALID_FILE, "Index file is invalid") +//scalar +TAOS_DEFINE_ERROR(TSDB_CODE_SCALAR_CONVERT_ERROR, "Cannot convert to specific type") + //tmq TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SNAPSHOT_ERROR, "Can not operate in snapshot mode") @@ -676,7 +679,7 @@ const char* tstrerror(int32_t err) { if ((err & 0x00ff0000) == 0x00ff0000) { int32_t code = err & 0x0000ffff; // strerror can handle any invalid code - // invalid code return Unknown error + // invalid code return Unknown error return strerror(code); } int32_t s = 0; From 2e1e7ed78b5b61719a3ecd3885f7a093f40f8982 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 11 Jul 2023 15:09:08 +0800 Subject: [PATCH 09/16] docs: add example of sma index --- docs/en/12-taos-sql/27-index.md | 18 ++++++++++++++++++ docs/zh/12-taos-sql/27-index.md | 18 ++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/docs/en/12-taos-sql/27-index.md b/docs/en/12-taos-sql/27-index.md index 93cd38f362..e3eb69bdb3 100644 --- a/docs/en/12-taos-sql/27-index.md +++ b/docs/en/12-taos-sql/27-index.md @@ -28,6 +28,24 @@ Performs pre-aggregation on the specified column over the time window defined by - WATERMARK: Enter a value between 0ms and 900000ms. The most precise unit supported is milliseconds. The default value is 5 seconds. This option can be used only on supertables. - MAX_DELAY: Enter a value between 1ms and 900000ms. The most precise unit supported is milliseconds. The default value is the value of interval provided that it does not exceed 900000ms. This option can be used only on supertables. Note: Retain the default value if possible. Configuring a small MAX_DELAY may cause results to be frequently pushed, affecting storage and query performance. +```sql +DROP DATABASE IF EXISTS d0; +CREATE DATABASE d0; +USE d0; +CREATE TABLE IF NOT EXISTS st1 (ts timestamp, c1 int, c2 float, c3 double) TAGS (t1 int unsigned); +CREATE TABLE ct1 USING st1 TAGS(1000); +CREATE TABLE ct2 USING st1 TAGS(2000); +INSERT INTO ct1 VALUES(now+0s, 10, 2.0, 3.0); +INSERT INTO ct1 VALUES(now+1s, 11, 2.1, 3.1)(now+2s, 12, 2.2, 3.2)(now+3s, 13, 2.3, 3.3); +CREATE SMA INDEX sma_index_name1 ON st1 FUNCTION(max(c1),max(c2),min(c1)) INTERVAL(5m,10s) SLIDING(5m) WATERMARK 5s MAX_DELAY 1m; +-- query from SMA Index +ALTER LOCAL 'querySmaOptimize' '1'; +SELECT max(c2),min(c1) FROM st1 INTERVAL(5m,10s) SLIDING(5m); +SELECT _wstart,_wend,_wduration,max(c2),min(c1) FROM st1 INTERVAL(5m,10s) SLIDING(5m); +-- query from raw data +ALTER LOCAL 'querySmaOptimize' '0'; +``` + ### FULLTEXT Indexing Creates a text index for the specified column. FULLTEXT indexing improves performance for queries with text filtering. The index_option syntax is not supported for FULLTEXT indexing. FULLTEXT indexing is supported for JSON tag columns only. Multiple columns cannot be indexed together. However, separate indices can be created for each column. diff --git a/docs/zh/12-taos-sql/27-index.md b/docs/zh/12-taos-sql/27-index.md index e86d2d8f36..3f3091b19c 100644 --- a/docs/zh/12-taos-sql/27-index.md +++ b/docs/zh/12-taos-sql/27-index.md @@ -28,6 +28,24 @@ functions: - WATERMARK: 最小单位毫秒,取值范围 [0ms, 900000ms],默认值为 5 秒,只可用于超级表。 - MAX_DELAY: 最小单位毫秒,取值范围 [1ms, 900000ms],默认值为 interval 的值(但不能超过最大值),只可用于超级表。注:不建议 MAX_DELAY 设置太小,否则会过于频繁的推送结果,影响存储和查询性能,如无特殊需求,取默认值即可。 +```sql +DROP DATABASE IF EXISTS d0; +CREATE DATABASE d0; +USE d0; +CREATE TABLE IF NOT EXISTS st1 (ts timestamp, c1 int, c2 float, c3 double) TAGS (t1 int unsigned); +CREATE TABLE ct1 USING st1 TAGS(1000); +CREATE TABLE ct2 USING st1 TAGS(2000); +INSERT INTO ct1 VALUES(now+0s, 10, 2.0, 3.0); +INSERT INTO ct1 VALUES(now+1s, 11, 2.1, 3.1)(now+2s, 12, 2.2, 3.2)(now+3s, 13, 2.3, 3.3); +CREATE SMA INDEX sma_index_name1 ON st1 FUNCTION(max(c1),max(c2),min(c1)) INTERVAL(5m,10s) SLIDING(5m) WATERMARK 5s MAX_DELAY 1m; +-- 从 SMA 索引查询 +ALTER LOCAL 'querySmaOptimize' '1'; +SELECT max(c2),min(c1) FROM st1 INTERVAL(5m,10s) SLIDING(5m); +SELECT _wstart,_wend,_wduration,max(c2),min(c1) FROM st1 INTERVAL(5m,10s) SLIDING(5m); +-- 从原始数据查询 +ALTER LOCAL 'querySmaOptimize' '0'; +``` + ### FULLTEXT 索引 对指定列建立文本索引,可以提升含有文本过滤的查询的性能。FULLTEXT 索引不支持 index_option 语法。现阶段只支持对 JSON 类型的标签列创建 FULLTEXT 索引。不支持多列联合索引,但可以为每个列分布创建 FULLTEXT 索引。 From 598360ce4d07c4b118fbcfcd6bbe3146e5626a4f Mon Sep 17 00:00:00 2001 From: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com> Date: Wed, 12 Jul 2023 11:05:12 +0800 Subject: [PATCH 10/16] Update 05-taosbenchmark.md taosbenchmark datatype --- docs/zh/14-reference/05-taosbenchmark.md | 26 ++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/docs/zh/14-reference/05-taosbenchmark.md b/docs/zh/14-reference/05-taosbenchmark.md index c5d98767f9..319046ba8f 100644 --- a/docs/zh/14-reference/05-taosbenchmark.md +++ b/docs/zh/14-reference/05-taosbenchmark.md @@ -437,3 +437,29 @@ taosBenchmark -A INT,DOUBLE,NCHAR,BINARY\(16\) - **sqls** : - **sql** : 执行的 SQL 命令,必填。 + +#### 配置文件中数据类型书写对照表 + +| # | **引擎** | **taosBenchmark** +| --- | :----------------: | :---------------: +| 1 | TIMESTAMP | timestamp +| 2 | INT | int +| 3 | INT UNSIGNED | uint +| 4 | BIGINT | bigint +| 5 | BIGINT UNSIGNED | ubigint +| 6 | FLOAT | float +| 7 | DOUBLE | double +| 8 | BINARY | binary +| 9 | SMALLINT | smallint +| 10 | SMALLINT UNSIGNED | usmallint +| 11 | TINYINT | tinyint +| 12 | TINYINT UNSIGNED | utinyint +| 13 | BOOL | bool +| 14 | NCHAR | nchar +| 15 | VARCHAR | varchar +| 15 | JSON | json + +注意:taosBenchmark 配置文件中数据类型必须小写方可识别 + + + From 2e870071a49247d322b4b7e92e416f0ca7fdc807 Mon Sep 17 00:00:00 2001 From: Alex Duan <51781608+DuanKuanJun@users.noreply.github.com> Date: Wed, 12 Jul 2023 11:13:34 +0800 Subject: [PATCH 11/16] Update 05-taosbenchmark.md english --- docs/en/14-reference/05-taosbenchmark.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/docs/en/14-reference/05-taosbenchmark.md b/docs/en/14-reference/05-taosbenchmark.md index 8fc20c149f..2348810d9e 100644 --- a/docs/en/14-reference/05-taosbenchmark.md +++ b/docs/en/14-reference/05-taosbenchmark.md @@ -470,3 +470,26 @@ The configuration parameters for subscribing to a super table are set in `super_ - **sql**: The SQL command to be executed. For the query SQL of super table, keep "xxxx" in the SQL command. The program will automatically replace it with all the sub-table names of the super table. Replace it with all the sub-table names in the super table. - **result**: The file to save the query result. If not specified, taosBenchmark will not save result. + +#### data type on taosBenchmark + +| # | **TDengine** | **taosBenchmark** +| --- | :----------------: | :---------------: +| 1 | TIMESTAMP | timestamp +| 2 | INT | int +| 3 | INT UNSIGNED | uint +| 4 | BIGINT | bigint +| 5 | BIGINT UNSIGNED | ubigint +| 6 | FLOAT | float +| 7 | DOUBLE | double +| 8 | BINARY | binary +| 9 | SMALLINT | smallint +| 10 | SMALLINT UNSIGNED | usmallint +| 11 | TINYINT | tinyint +| 12 | TINYINT UNSIGNED | utinyint +| 13 | BOOL | bool +| 14 | NCHAR | nchar +| 15 | VARCHAR | varchar +| 15 | JSON | json + +note:Lowercase characters must be used on taosBenchmark datatype From 604b76635dd0ec173a1981bf23eb3673cc83803a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 12 Jul 2023 11:38:56 +0800 Subject: [PATCH 12/16] fix: set ins_snodes to be sysinfo --- source/common/src/systable.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index c2024a9a77..53692c94a4 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -317,7 +317,7 @@ static const SSysTableMeta infosMeta[] = { {TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema), true}, {TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema), true}, {TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema), true}, - {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)}, + {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema), true}, {TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema), true}, {TSDB_INS_TABLE_DATABASES, userDBSchema, tListLen(userDBSchema), false}, {TSDB_INS_TABLE_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema), false}, From 75c250f503aebaecfd105f325afff74ba3c6e4dc Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 12 Jul 2023 11:49:57 +0800 Subject: [PATCH 13/16] fix: report permission error when all columns are invisiable --- include/libs/parser/parser.h | 1 + source/libs/parser/src/parTranslater.c | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index f253b47e50..58bdb77df3 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -58,6 +58,7 @@ typedef struct SParseContext { bool isSuperUser; bool enableSysInfo; bool async; + bool hasInvisibleCol; const char* svrVer; bool nodeOffline; SArray* pTableMetaPos; // sql table pos => catalog data pos diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index c318e25fd9..2ccbc0dfc4 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -878,6 +878,7 @@ static int32_t createColumnsByTable(STranslateContext* pCxt, const STableNode* p (igTags ? 0 : ((TSDB_SUPER_TABLE == pMeta->tableType) ? pMeta->tableInfo.numOfTags : 0)); for (int32_t i = 0; i < nums; ++i) { if (invisibleColumn(pCxt->pParseCxt->enableSysInfo, pMeta->tableType, pMeta->schema[i].flags)) { + pCxt->pParseCxt->hasInvisibleCol = true; continue; } SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); @@ -3203,7 +3204,11 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect code = translateFillValues(pCxt, pSelect); } if (NULL == pSelect->pProjectionList || 0 >= pSelect->pProjectionList->length) { - code = TSDB_CODE_PAR_INVALID_SELECTED_EXPR; + if (pCxt->pParseCxt->hasInvisibleCol) { + code = TSDB_CODE_PAR_PERMISSION_DENIED; + } else { + code = TSDB_CODE_PAR_INVALID_SELECTED_EXPR; + } } return code; } From d568d2f838d3d0ba4d9f9d5e6ed1235e5a40de94 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 12 Jul 2023 13:38:27 +0800 Subject: [PATCH 14/16] doc/TS-3589 --- docs/en/12-taos-sql/19-limit.md | 2 +- docs/en/12-taos-sql/25-grant.md | 2 +- docs/zh/12-taos-sql/19-limit.md | 2 +- docs/zh/12-taos-sql/25-grant.md | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/en/12-taos-sql/19-limit.md b/docs/en/12-taos-sql/19-limit.md index 22ad2055e4..23bb8ce917 100644 --- a/docs/en/12-taos-sql/19-limit.md +++ b/docs/en/12-taos-sql/19-limit.md @@ -36,7 +36,7 @@ The following characters cannot occur in a password: single quotation marks ('), - Maximum numbers of databases, STables, tables are dependent only on the system resources. - The number of replicas can only be 1 or 3. - The maximum length of a username is 23 bytes. -- The maximum length of a password is 128 bytes. +- The maximum length of a password is 31 bytes. - The maximum number of rows depends on system resources. - The maximum number of vnodes in a database is 1024. diff --git a/docs/en/12-taos-sql/25-grant.md b/docs/en/12-taos-sql/25-grant.md index 8e3e7c869e..c214e11876 100644 --- a/docs/en/12-taos-sql/25-grant.md +++ b/docs/en/12-taos-sql/25-grant.md @@ -16,7 +16,7 @@ This statement creates a user account. The maximum length of user_name is 23 bytes. -The maximum length of password is 32 bytes. The password can include leters, digits, and special characters excluding single quotation marks, double quotation marks, backticks, backslashes, and spaces. The password cannot be empty. +The maximum length of password is 31 bytes. The password can include leters, digits, and special characters excluding single quotation marks, double quotation marks, backticks, backslashes, and spaces. The password cannot be empty. `SYSINFO` indicates whether the user is allowed to view system information. `1` means allowed, `0` means not allowed. System information includes server configuration, dnode, vnode, storage. The default value is `1`. diff --git a/docs/zh/12-taos-sql/19-limit.md b/docs/zh/12-taos-sql/19-limit.md index e5a492580e..6c815fc5f0 100644 --- a/docs/zh/12-taos-sql/19-limit.md +++ b/docs/zh/12-taos-sql/19-limit.md @@ -36,7 +36,7 @@ description: 合法字符集和命名中的限制规则 - 库的数目,超级表的数目、表的数目,系统不做限制,仅受系统资源限制 - 数据库的副本数只能设置为 1 或 3 - 用户名的最大长度是 23 字节 -- 用户密码的最大长度是 128 字节 +- 用户密码的最大长度是 31 字节 - 总数据行数取决于可用资源 - 单个数据库的虚拟结点数上限为 1024 diff --git a/docs/zh/12-taos-sql/25-grant.md b/docs/zh/12-taos-sql/25-grant.md index e4ba02cbb5..a9c3910500 100644 --- a/docs/zh/12-taos-sql/25-grant.md +++ b/docs/zh/12-taos-sql/25-grant.md @@ -16,7 +16,7 @@ CREATE USER use_name PASS 'password' [SYSINFO {1|0}]; use_name 最长为 23 字节。 -password 最长为 32 字节,合法字符包括"a-zA-Z0-9!?$%^&*()_–+={[}]:;@~#|<,>.?/",不可以出现单双引号、撇号、反斜杠和空格,且不可以为空。 +password 最长为 31 字节,合法字符包括"a-zA-Z0-9!?$%^&*()_–+={[}]:;@~#|<,>.?/",不可以出现单双引号、撇号、反斜杠和空格,且不可以为空。 SYSINFO 表示用户是否可以查看系统信息。1 表示可以查看,0 表示不可以查看。系统信息包括服务端配置信息、服务端各种节点信息(如 DNODE、QNODE等)、存储相关的信息等。默认为可以查看系统信息。 From ed0c72bd4547d4d73332dfab0c029a85fe33f67f Mon Sep 17 00:00:00 2001 From: sunpeng Date: Wed, 12 Jul 2023 17:24:03 +0800 Subject: [PATCH 15/16] docs: update python doc catalog --- .../14-reference/03-connector/07-python.mdx | 546 +++++++++++------- docs/zh/08-connector/30-python.mdx | 544 ++++++++++------- 2 files changed, 648 insertions(+), 442 deletions(-) diff --git a/docs/en/14-reference/03-connector/07-python.mdx b/docs/en/14-reference/03-connector/07-python.mdx index 2a6cd9ecf7..f0a59842fe 100644 --- a/docs/en/14-reference/03-connector/07-python.mdx +++ b/docs/en/14-reference/03-connector/07-python.mdx @@ -87,9 +87,9 @@ TDengine currently supports timestamp, number, character, Boolean type, and the |NCHAR|str| |JSON|str| -## Installation +## Installation Steps -### Preparation +### Pre-installation preparation 1. Install Python. The recent taospy package requires Python 3.6.2+. The earlier versions of taospy require Python 3.7+. The taos-ws-py package requires Python 3.7+. If Python is not available on your system, refer to the [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) to install it. 2. Install [pip](https://pypi.org/project/pip/). In most cases, the Python installer comes with the pip utility. If not, please refer to [pip documentation](https://pip.pypa.io/en/stable/installation/) to install it. @@ -275,7 +275,7 @@ Transfer-Encoding: chunked -### Using connectors to establish connections +### Specify the Host and Properties to get the connection The following example code assumes that TDengine is installed locally and that the default configuration is used for both FQDN and serverPort. @@ -331,7 +331,69 @@ The parameter of `connect()` is the url of TDengine, and the protocol is `taosws -## Example program +### Priority of configuration parameters + +If the configuration parameters are duplicated in the parameters or client configuration file, the priority of the parameters, from highest to lowest, are as follows: + +1. Parameters in `connect` function. +2. the configuration file taos.cfg of the TDengine client driver when using a native connection. + +## Usage examples + +### Create database and tables + + + + +```python +conn = taos.connect() +# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement. +conn.execute("DROP DATABASE IF EXISTS test") +conn.execute("CREATE DATABASE test") +# change database. same as execute "USE db" +conn.select_db("test") +conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)") +``` + + + + + +```python +conn = taosrest.connect(url="http://localhost:6041") +# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement. +conn.execute("DROP DATABASE IF EXISTS test") +conn.execute("CREATE DATABASE test") +conn.execute("USE test") +conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)") +``` + + + + + +```python +conn = taosws.connect(url="ws://localhost:6041") +# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement. +conn.execute("DROP DATABASE IF EXISTS test") +conn.execute("CREATE DATABASE test") +conn.execute("USE test") +conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)") +``` + + + + +### Insert data + +```python +conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)") +``` + +::: +now is an internal function. The default is the current time of the client's computer. now + 1s represents the current time of the client plus 1 second, followed by the number representing the unit of time: a (milliseconds), s (seconds), m (minutes), h (hours), d (days), w (weeks), n (months), y (years). +::: + ### Basic Usage @@ -453,7 +515,7 @@ The `query` method of the `TaosConnection` class can be used to query data and r -### Usage with req_id +### Execute SQL with reqId By using the optional req_id parameter, you can specify a request ID that can be used for tracing. @@ -553,221 +615,7 @@ As the way to connect introduced above but add `req_id` argument. -### Subscription - -Connector support data subscription. For more information about subscroption, please refer to [Data Subscription](../../../develop/tmq/). - - - - -The `consumer` in the connector contains the subscription api. - -##### Create Consumer - -The syntax for creating a consumer is `consumer = Consumer(configs)`. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/). - -```python -from taos.tmq import Consumer - -consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) -``` - -##### Subscribe topics - -The `subscribe` function is used to subscribe to a list of topics. - -```python -consumer.subscribe(['topic1', 'topic2']) -``` - -##### Consume - -The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data. - -```python -while True: - res = consumer.poll(1) - if not res: - continue - err = res.error() - if err is not None: - raise err - val = res.value() - - for block in val: - print(block.fetchall()) -``` - -##### assignment - -The `assignment` function is used to get the assignment of the topic. - -```python -assignments = consumer.assignment() -``` - -##### Seek - -The `seek` function is used to reset the assignment of the topic. - -```python -tp = TopicPartition(topic='topic1', partition=0, offset=0) -consumer.seek(tp) -``` - -##### After consuming data - -You should unsubscribe to the topics and close the consumer after consuming. - -```python -consumer.unsubscribe() -consumer.close() -``` - -##### Tmq subscription example - -```python -{{#include docs/examples/python/tmq_example.py}} -``` - -##### assignment and seek example - -```python -{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} -``` - - - - - -In addition to native connections, the connector also supports subscriptions via websockets. - -##### Create Consumer - -The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer). - -```python -import taosws - -consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) -``` - -##### subscribe topics - -The `subscribe` function is used to subscribe to a list of topics. - -```python -consumer.subscribe(['topic1', 'topic2']) -``` - -##### Consume - -The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data. - -```python -while True: - res = consumer.poll(timeout=1.0) - if not res: - continue - err = res.error() - if err is not None: - raise err - for block in message: - for row in block: - print(row) -``` - -##### assignment - -The `assignment` function is used to get the assignment of the topic. - -```python -assignments = consumer.assignment() -``` - -##### Seek - -The `seek` function is used to reset the assignment of the topic. - -```python -consumer.seek(topic='topic1', partition=0, offset=0) -``` - -##### After consuming data - -You should unsubscribe to the topics and close the consumer after consuming. - -```python -consumer.unsubscribe() -consumer.close() -``` - -##### Subscription example - -```python -{{#include docs/examples/python/tmq_websocket_example.py}} -``` - -##### Assignment and seek example - -```python -{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} -``` - - - - -### Schemaless Insert - -Connector support schemaless insert. - - - - -##### Simple insert - -```python -{{#include docs/examples/python/schemaless_insert.py}} -``` - -##### Insert with ttl argument - -```python -{{#include docs/examples/python/schemaless_insert_ttl.py}} -``` - -##### Insert with req_id argument - -```python -{{#include docs/examples/python/schemaless_insert_req_id.py}} -``` - - - - - -##### Simple insert - -```python -{{#include docs/examples/python/schemaless_insert_raw.py}} -``` - -##### Insert with ttl argument - -```python -{{#include docs/examples/python/schemaless_insert_raw_ttl.py}} -``` - -##### Insert with req_id argument - -```python -{{#include docs/examples/python/schemaless_insert_raw_req_id.py}} -``` - - - - -### Parameter Binding +### Writing data via parameter binding The Python connector provides a parameter binding api for inserting data. Similar to most databases, TDengine currently only supports the question mark `?` to indicate the parameters to be bound. @@ -898,6 +746,264 @@ stmt.close() +### Schemaless Writing + +Connector support schemaless insert. + + + + +##### Simple insert + +```python +{{#include docs/examples/python/schemaless_insert.py}} +``` + +##### Insert with ttl argument + +```python +{{#include docs/examples/python/schemaless_insert_ttl.py}} +``` + +##### Insert with req_id argument + +```python +{{#include docs/examples/python/schemaless_insert_req_id.py}} +``` + + + + + +##### Simple insert + +```python +{{#include docs/examples/python/schemaless_insert_raw.py}} +``` + +##### Insert with ttl argument + +```python +{{#include docs/examples/python/schemaless_insert_raw_ttl.py}} +``` + +##### Insert with req_id argument + +```python +{{#include docs/examples/python/schemaless_insert_raw_req_id.py}} +``` + + + + +### Schemaless with reqId + +There is a optional parameter called `req_id` in `schemaless_insert` and `schemaless_insert_raw` method. This reqId can be used to request link tracing. + +```python +{{#include docs/examples/python/schemaless_insert_req_id.py}} +``` + +```python +{{#include docs/examples/python/schemaless_insert_raw_req_id.py}} +``` + +### Data Subscription + +Connector support data subscription. For more information about subscroption, please refer to [Data Subscription](../../../develop/tmq/). + +#### Create a Topic + +To create topic, please refer to [Data Subscription](../../../develop/tmq/#create-a-topic). + +#### Create a Consumer + + + + + +The consumer in the connector contains the subscription api. The syntax for creating a consumer is consumer = Consumer(configs). For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer). + +```python +from taos.tmq import Consumer + +consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) +``` + + + + +In addition to native connections, the connector also supports subscriptions via websockets. + +The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer). + +```python +import taosws + +consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) +``` + + + + +#### Subscribe to a Topic + + + + + +The `subscribe` function is used to subscribe to a list of topics. + +```python +consumer.subscribe(['topic1', 'topic2']) +``` + + + + +The `subscribe` function is used to subscribe to a list of topics. + +```python +consumer.subscribe(['topic1', 'topic2']) +``` + + + + +#### Consume messages + + + + + +The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data. + +```python +while True: + res = consumer.poll(1) + if not res: + continue + err = res.error() + if err is not None: + raise err + val = res.value() + + for block in val: + print(block.fetchall()) +``` + + + + +The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data. + +```python +while True: + res = consumer.poll(timeout=1.0) + if not res: + continue + err = res.error() + if err is not None: + raise err + for block in message: + for row in block: + print(row) +``` + + + + +#### Assignment subscription Offset + + + + + +The `assignment` function is used to get the assignment of the topic. + +```python +assignments = consumer.assignment() +``` + +The `seek` function is used to reset the assignment of the topic. + +```python +tp = TopicPartition(topic='topic1', partition=0, offset=0) +consumer.seek(tp) +``` + + + + +The `assignment` function is used to get the assignment of the topic. + +```python +assignments = consumer.assignment() +``` + +The `seek` function is used to reset the assignment of the topic. + +```python +consumer.seek(topic='topic1', partition=0, offset=0) +``` + + + + +#### Close subscriptions + + + + + +You should unsubscribe to the topics and close the consumer after consuming. + +```python +consumer.unsubscribe() +consumer.close() +``` + + + + +You should unsubscribe to the topics and close the consumer after consuming. + +```python +consumer.unsubscribe() +consumer.close() +``` + + + + +#### Full Sample Code + + + + + +```python +{{#include docs/examples/python/tmq_example.py}} +``` + +```python +{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} +``` + + + + +```python +{{#include docs/examples/python/tmq_websocket_example.py}} +``` + +```python +{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} +``` + + + + ### Other sample programs | Example program links | Example program content | diff --git a/docs/zh/08-connector/30-python.mdx b/docs/zh/08-connector/30-python.mdx index fcae6e2b6b..15c11d05c3 100644 --- a/docs/zh/08-connector/30-python.mdx +++ b/docs/zh/08-connector/30-python.mdx @@ -71,7 +71,7 @@ Python Connector 的所有数据库操作如果出现异常,都会直接抛出 {{#include docs/examples/python/handle_exception.py}} ``` -TDengine DataType 和 Python DataType +## TDengine DataType 和 Python DataType TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下: @@ -277,7 +277,7 @@ Transfer-Encoding: chunked -### 使用连接器建立连接 +### 指定 Host 和 Properties 获取连接 以下示例代码假设 TDengine 安装在本机, 且 FQDN 和 serverPort 都使用了默认配置。 @@ -333,8 +333,69 @@ Transfer-Encoding: chunked +### 配置参数的优先级 + +如果配置参数在参数和客户端配置文件中有重复,则参数的优先级由高到低分别如下: + +1. 连接参数 +2. 使用原生连接时,TDengine 客户端驱动的配置文件 taos.cfg + ## 使用示例 +### 创建数据库和表 + + + + +```python +conn = taos.connect() +# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement. +conn.execute("DROP DATABASE IF EXISTS test") +conn.execute("CREATE DATABASE test") +# change database. same as execute "USE db" +conn.select_db("test") +conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)") +``` + + + + + +```python +conn = taosrest.connect(url="http://localhost:6041") +# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement. +conn.execute("DROP DATABASE IF EXISTS test") +conn.execute("CREATE DATABASE test") +conn.execute("USE test") +conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)") +``` + + + + + +```python +conn = taosws.connect(url="ws://localhost:6041") +# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement. +conn.execute("DROP DATABASE IF EXISTS test") +conn.execute("CREATE DATABASE test") +conn.execute("USE test") +conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)") +``` + + + + +### 插入数据 + +```python +conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)") +``` + +::: +now 为系统内部函数,默认为客户端所在计算机当前时间。 now + 1s 代表客户端当前时间往后加 1 秒,数字后面代表时间单位:a(毫秒),s(秒),m(分),h(小时),d(天),w(周),n(月),y(年)。 +::: + ### 基本使用 @@ -373,7 +434,6 @@ Transfer-Encoding: chunked :::note TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能跨线程共享使用,否则会导致返回结果出现错误。 - ::: @@ -456,7 +516,7 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方 -### 与 req_id 一起使用 +### 执行带有 reqId 的 SQL 使用可选的 req_id 参数,指定请求 id,可以用于 tracing @@ -557,224 +617,6 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方 -### 数据订阅 - -连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅文档](../../develop/tmq/)。 - - - - -`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。 - -##### 创建 Consumer - -创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。 - -```python -from taos.tmq import Consumer - -consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) -``` - -##### 订阅 topics - -Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 - -```python -consumer.subscribe(['topic1', 'topic2']) -``` - -##### 消费数据 - -Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 - -```python -while True: - res = consumer.poll(1) - if not res: - continue - err = res.error() - if err is not None: - raise err - val = res.value() - - for block in val: - print(block.fetchall()) -``` - -##### 获取消费进度 - -Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。 - -```python -assignments = consumer.assignment() -``` - -##### 指定订阅 Offset - -Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。 - -```python -tp = TopicPartition(topic='topic1', partition=0, offset=0) -consumer.seek(tp) -``` - -##### 关闭订阅 - -消费结束后,应当取消订阅,并关闭 Consumer。 - -```python -consumer.unsubscribe() -consumer.close() -``` - -##### 完整示例 - -```python -{{#include docs/examples/python/tmq_example.py}} -``` - -##### 获取和重置消费进度示例代码 - -```python -{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} -``` - - - - - -除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据,使用 websocket 方式订阅 TMQ 数据需要安装 `taos-ws-py`。 - -taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。 - -##### 创建 Consumer - -创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。 - -```python -import taosws - -consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) -``` - -##### 订阅 topics - -Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 - -```python -consumer.subscribe(['topic1', 'topic2']) -``` - -##### 消费数据 - -Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 - -```python -while True: - res = consumer.poll(timeout=1.0) - if not res: - continue - err = res.error() - if err is not None: - raise err - for block in message: - for row in block: - print(row) -``` - -##### 获取消费进度 - -Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。 - -```python -assignments = consumer.assignment() -``` - -##### 重置消费进度 - -Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。 - -```python -consumer.seek(topic='topic1', partition=0, offset=0) -``` - -##### 结束消费 - -消费结束后,应当取消订阅,并关闭 Consumer。 - -```python -consumer.unsubscribe() -consumer.close() -``` - -##### tmq 订阅示例代码 - -```python -{{#include docs/examples/python/tmq_websocket_example.py}} -``` - -连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。 - -##### 获取和重置消费进度示例代码 - -```python -{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} -``` - - - - -### 无模式写入 - -连接器支持无模式写入功能。 - - - - -##### 简单写入 - -```python -{{#include docs/examples/python/schemaless_insert.py}} -``` - -##### 带有 ttl 参数的写入 - -```python -{{#include docs/examples/python/schemaless_insert_ttl.py}} -``` - -##### 带有 req_id 参数的写入 - -```python -{{#include docs/examples/python/schemaless_insert_req_id.py}} -``` - - - - - -##### 简单写入 - -```python -{{#include docs/examples/python/schemaless_insert_raw.py}} -``` - -##### 带有 ttl 参数的写入 - -```python -{{#include docs/examples/python/schemaless_insert_raw_ttl.py}} -``` - -##### 带有 req_id 参数的写入 - -```python -{{#include docs/examples/python/schemaless_insert_raw_req_id.py}} -``` - - - - ### 通过参数绑定写入数据 TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 `?` 来代表待绑定的参数。 @@ -910,6 +752,264 @@ stmt.close() +### 无模式写入 + +连接器支持无模式写入功能。 + + + + +##### 简单写入 + +```python +{{#include docs/examples/python/schemaless_insert.py}} +``` + +##### 带有 ttl 参数的写入 + +```python +{{#include docs/examples/python/schemaless_insert_ttl.py}} +``` + +##### 带有 req_id 参数的写入 + +```python +{{#include docs/examples/python/schemaless_insert_req_id.py}} +``` + + + + + +##### 简单写入 + +```python +{{#include docs/examples/python/schemaless_insert_raw.py}} +``` + +##### 带有 ttl 参数的写入 + +```python +{{#include docs/examples/python/schemaless_insert_raw_ttl.py}} +``` + +##### 带有 req_id 参数的写入 + +```python +{{#include docs/examples/python/schemaless_insert_raw_req_id.py}} +``` + + + + +### 执行带有 reqId 的无模式写入 + +连接器的 `schemaless_insert` 和 `schemaless_insert_raw` 方法支持 `req_id` 可选参数,此 `req_Id` 可用于请求链路追踪。 + +```python +{{#include docs/examples/python/schemaless_insert_req_id.py}} +``` + +```python +{{#include docs/examples/python/schemaless_insert_raw_req_id.py}} +``` + +### 数据订阅 + +连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅文档](../../develop/tmq/)。 + +#### 创建 Topic + +创建 Topic 相关请参考 [数据订阅文档](../../develop/tmq/#创建-topic)。 + +#### 创建 Consumer + + + + + +`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#创建消费者-consumer)。 + +```python +from taos.tmq import Consumer + +consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) +``` + + + + +除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据,使用 websocket 方式订阅 TMQ 数据需要安装 `taos-ws-py`。 + +taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。 + +```python +import taosws + +consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) +``` + + + + +#### 订阅 topics + + + + + +Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 + +```python +consumer.subscribe(['topic1', 'topic2']) +``` + + + + +Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 + +```python +consumer.subscribe(['topic1', 'topic2']) +``` + + + + +#### 消费数据 + + + + + +Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 + +```python +while True: + res = consumer.poll(1) + if not res: + continue + err = res.error() + if err is not None: + raise err + val = res.value() + + for block in val: + print(block.fetchall()) +``` + + + + +Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 + +```python +while True: + res = consumer.poll(timeout=1.0) + if not res: + continue + err = res.error() + if err is not None: + raise err + for block in message: + for row in block: + print(row) +``` + + + + +#### 获取消费进度 + + + + + +Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。 + +```python +assignments = consumer.assignment() +``` + +Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。 + +```python +tp = TopicPartition(topic='topic1', partition=0, offset=0) +consumer.seek(tp) +``` + + + + +Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。 + +```python +assignments = consumer.assignment() +``` + +Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。 + +```python +consumer.seek(topic='topic1', partition=0, offset=0) +``` + + + + +#### 关闭订阅 + + + + + +消费结束后,应当取消订阅,并关闭 Consumer。 + +```python +consumer.unsubscribe() +consumer.close() +``` + + + + +消费结束后,应当取消订阅,并关闭 Consumer。 + +```python +consumer.unsubscribe() +consumer.close() +``` + + + + +#### 完整示例 + + + + + +```python +{{#include docs/examples/python/tmq_example.py}} +``` + +```python +{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} +``` + + + + +```python +{{#include docs/examples/python/tmq_websocket_example.py}} +``` + +```python +{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} +``` + + + + ### 更多示例程序 | 示例程序链接 | 示例程序内容 | From 8f58f9aa5be73100f980a5e35877c247ef38db71 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 12 Jul 2023 18:07:01 +0800 Subject: [PATCH 16/16] fix: skip primary key for block sma --- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 29e59daeb9..88027e2891 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -289,6 +289,10 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) { int32_t i = 0, j = 0; + if (j < pSupInfo->numOfCols && PRIMARYKEY_TIMESTAMP_COL_ID == pSupInfo->colId[j]) { + j += 1; + } + while (i < pSchema->numOfCols && j < pSupInfo->numOfCols) { STColumn* pTCol = &pSchema->columns[i]; if (pTCol->colId == pSupInfo->colId[j]) {