From 173446e84e38550c57b79e4695391538262b751e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 25 Jun 2024 13:59:56 +0800 Subject: [PATCH 01/16] fix:[TD-30704] null pointer error if exception occurs --- source/client/src/clientTmq.c | 84 +++++++++++++++++++++++------------ 1 file changed, 55 insertions(+), 29 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index f5f083e5d8..c4fc9db44f 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -769,34 +769,35 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { } int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { - if (pMsg) { - SMqHbRsp rsp = {0}; - tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp); + if (pMsg == NULL) { + return code; + } + SMqHbRsp rsp = {0}; + tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp); - int64_t refId = *(int64_t*)param; - tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); - if (tmq != NULL) { - taosWLockLatch(&tmq->lock); - for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) { - STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i); - if (privilege->noPrivilege == 1) { - int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); - for (int32_t j = 0; j < topicNumCur; j++) { - SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j); - if (strcmp(pTopicCur->topicName, privilege->topic) == 0) { - tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic); - pTopicCur->noPrivilege = 1; - } + int64_t refId = *(int64_t*)param; + tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); + if (tmq != NULL) { + taosWLockLatch(&tmq->lock); + for (int32_t i = 0; i < taosArrayGetSize(rsp.topicPrivileges); i++) { + STopicPrivilege* privilege = taosArrayGet(rsp.topicPrivileges, i); + if (privilege->noPrivilege == 1) { + int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); + for (int32_t j = 0; j < topicNumCur; j++) { + SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j); + if (strcmp(pTopicCur->topicName, privilege->topic) == 0) { + tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic); + pTopicCur->noPrivilege = 1; } } } - taosWUnLockLatch(&tmq->lock); - taosReleaseRef(tmqMgmt.rsetId, refId); } - tDestroySMqHbRsp(&rsp); - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); + taosWUnLockLatch(&tmq->lock); + taosReleaseRef(tmqMgmt.rsetId, refId); } + tDestroySMqHbRsp(&rsp); + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); return 0; } @@ -984,10 +985,16 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { } int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { + if(param == NULL) { + return code; + } + SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; - taosMemoryFree(pMsg->pEpSet); + if(pMsg){ + taosMemoryFree(pMsg->pEpSet); + } tsem_post(&pParam->rspSem); return 0; } @@ -2563,6 +2570,7 @@ end: } int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { + if(param == NULL) return code; SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); if (tmq == NULL) { @@ -2575,6 +2583,9 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { goto END; } + if (pMsg == NULL) { + goto END; + } SMqRspHead* head = pMsg->pData; int32_t epoch = atomic_load_32(&tmq->epoch); tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); @@ -2604,18 +2615,24 @@ END: FAIL: if (pParam->sync) { SAskEpInfo* pInfo = pParam->pParam; - pInfo->code = code; - tsem_post(&pInfo->sem); + if(pInfo) { + pInfo->code = code; + tsem_post(&pInfo->sem); + } + } + + if(pMsg){ + taosMemoryFree(pMsg->pEpSet); + taosMemoryFree(pMsg->pData); } - taosMemoryFree(pMsg->pEpSet); - taosMemoryFree(pMsg->pData); taosMemoryFree(pParam); return code; } int32_t syncAskEp(tmq_t* pTmq) { SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo)); + if(pInfo == NULL) return TSDB_CODE_OUT_OF_MEMORY; tsem_init(&pInfo->sem, 0, 0); askEp(pTmq, pInfo, true, false); @@ -2767,6 +2784,9 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { } static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { + if(param == NULL) { + return code; + } SMqVgWalInfoParam* pParam = param; SMqVgCommon* pCommon = pParam->pCommon; @@ -2798,8 +2818,11 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { tsem_post(&pCommon->rsp); } - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); + if(pMsg){ + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + } + return 0; } @@ -3231,6 +3254,9 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); } + if(param == NULL) { + return code; + } SMqSeekParam* pParam = param; pParam->code = code; tsem_post(&pParam->sem); From 512671df856d685b057ac87186f3abe450280f47 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 26 Jun 2024 02:56:49 +0000 Subject: [PATCH 02/16] Add index filtering based on column value type --- source/libs/index/src/indexFilter.c | 47 +++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index cb42e60c01..d5d4592b47 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -624,6 +624,31 @@ static int32_t sifSetFltParam(SIFParam *left, SIFParam *right, SDataTypeBuf *typ } return 0; } + +static int8_t sifCheckNumericTypeSame(uint8_t left, uint8_t right) { + if (left != right) { + return 0; + } + return 1; +} +static int8_t sifShouldUseIndexBasedOnType(SIFParam *left, SIFParam *right) { + if (left->colValType == TSDB_DATA_TYPE_GEOMETRY || right->colValType == TSDB_DATA_TYPE_GEOMETRY) { + return 0; + } + if (IS_VAR_DATA_TYPE(left->colValType) && !IS_VAR_DATA_TYPE(right->colValType)) { + return 0; + } + if (IS_NUMERIC_TYPE(left->colValType) && !IS_NUMERIC_TYPE(right->colValType)) { + return 0; + } + if (IS_NUMERIC_TYPE(left->colValType) && IS_NUMERIC_TYPE(right->colValType)) { + if (!sifCheckNumericTypeSame(left->colValType, right->colValType)) { + return 0; + } + } + + return 1; +} static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) { int ret = 0; SIndexMetaArg *arg = &output->arg; @@ -641,8 +666,13 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP ret = indexJsonSearch(arg->ivtIdx, mtm, output->result); indexMultiTermQueryDestroy(mtm); } else { - if (left->colValType == TSDB_DATA_TYPE_GEOMETRY || right->colValType == TSDB_DATA_TYPE_GEOMETRY) { - return TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR; + // if (left->colValType == TSDB_DATA_TYPE_GEOMETRY || right->colValType == TSDB_DATA_TYPE_GEOMETRY) { + // return TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR; + // } + int8_t useIndex = sifShouldUseIndexBasedOnType(left, right); + if (!useIndex) { + output->status = SFLT_NOT_INDEX; + return -1; } bool reverse = false, equal = false; @@ -660,15 +690,12 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP SDataTypeBuf typedata; memset(&typedata, 0, sizeof(typedata)); - if (IS_VAR_DATA_TYPE(left->colValType)) { - if (!IS_VAR_DATA_TYPE(right->colValType)) { - NUM_TO_STRING(right->colValType, right->condValue, sizeof(buf) - 2, buf + VARSTR_HEADER_SIZE); - varDataSetLen(buf, strlen(buf + VARSTR_HEADER_SIZE)); - param.val = buf; - } - } else { - if (sifSetFltParam(left, right, &typedata, ¶m) != 0) return -1; + + if (sifSetFltParam(left, right, &typedata, ¶m) != 0) { + output->status = SFLT_NOT_INDEX; + return -1; } + ret = left->api.metaFilterTableIds(arg->metaEx, ¶m, output->result); if (ret == 0) { taosArraySort(output->result, uidCompare); From 6e6a9acc430b21782a68211669ccd5e926737c52 Mon Sep 17 00:00:00 2001 From: bitcapybara Date: Wed, 26 Jun 2024 03:03:59 +0000 Subject: [PATCH 03/16] test: [TS-5067] add test cases for the drop consumer group statement --- tests/system-test/7-tmq/tmq_taosx.py | 67 +++++++++++++++++++++++++--- 1 file changed, 60 insertions(+), 7 deletions(-) diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 49b62d8abb..d15c9d61b8 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -516,17 +516,17 @@ class TDTestCase: "td.connect.pass": "taosdata", "auto.offset.reset": "earliest", } - consumer = Consumer(consumer_dict) + consumer1 = Consumer(consumer_dict) try: - consumer.subscribe(["t1"]) + consumer1.subscribe(["t1"]) except TmqError: tdLog.exit(f"subscribe error") index = 0 try: while True: - res = consumer.poll(1) + res = consumer1.poll(1) if not res: if index != 1: tdLog.exit("consume error") @@ -543,18 +543,71 @@ class TDTestCase: index += 1 finally: - consumer.close() + consumer1.close() - consumer1 = Consumer(consumer_dict) + consumer2 = Consumer(consumer_dict) try: - consumer1.subscribe(["t2"]) + consumer2.subscribe(["t2"]) except TmqError: tdLog.exit(f"subscribe error") + tdSql.query(f'show subscriptions') + tdSql.checkRows(2) + + tdSql.query(f'show consumers') + tdSql.checkRows(1) tdSql.execute(f'drop consumer group g1 on t1') tdSql.query(f'show consumers') tdSql.checkRows(1) - consumer1.close() + + tdSql.query(f'show subscriptions') + tdSql.checkRows(1) + + index = 0 + try: + while True: + res = consumer2.poll(1) + if not res: + if index != 1: + tdLog.exit("consume error") + break + val = res.value() + if val is None: + continue + cnt = 0; + for block in val: + cnt += len(block.fetchall()) + + if cnt != 8: + tdLog.exit("consume error") + + index += 1 + finally: + consumer2.close() + + consumer3 = Consumer(consumer_dict) + try: + consumer3.subscribe(["t2"]) + except TmqError: + tdLog.exit(f"subscribe error") + + tdSql.query(f'show consumers') + tdSql.checkRows(1) + + tdSql.execute(f'insert into t4 using st tags(3) values(now, 1)') + try: + res = consumer3.poll(1) + if not res: + tdLog.exit("consume1 error") + finally: + consumer3.close() + + tdSql.query(f'show consumers') + tdSql.checkRows(0) + + tdSql.query(f'show subscriptions') + tdSql.checkRows(1) + tdSql.execute(f'drop topic t1') tdSql.execute(f'drop topic t2') tdSql.execute(f'drop database d1') From 3014c66965c91c50dee40b28b6af0c42a06a6028 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 26 Jun 2024 11:21:31 +0800 Subject: [PATCH 04/16] fix(streamMeta): commit tdb after load tasks --- source/libs/stream/src/streamMeta.c | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 03c7b93f91..08e7c97150 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -151,7 +151,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { int8_t ret = STREAM_STATA_COMPATIBLE; TBC* pCur = NULL; - if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { // no task info, no stream + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { // no task info, no stream return ret; } @@ -557,7 +557,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { return -1; } - if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER) { pTask->ver = SSTREAM_TASK_VER; } @@ -907,7 +907,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (p == NULL) { code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1); if (code < 0) { - stError("failed to expand s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno)); + stError("failed to expand s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno)); tFreeStreamTask(pTask); continue; } @@ -958,6 +958,8 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); taosArrayDestroy(pRecycleList); + + (void)streamMetaCommit(pMeta); } static bool waitForEnoughDuration(SMetaHbInfo* pInfo) { @@ -1012,7 +1014,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (pTask == NULL) { continue; @@ -1052,12 +1054,14 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { } if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) { - entry.checkpointInfo.failed = ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; + entry.checkpointInfo.failed = + ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId; entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId; if (entry.checkpointInfo.failed) { - stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.pActiveInfo->transId); + stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, + (*pTask)->chkInfo.pActiveInfo->transId); } } @@ -1384,7 +1388,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa int64_t now = taosGetTimestampMs(); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - stInfo("vgId:%d start to check all %d stream task(s) downstream status, start ts:%"PRId64, vgId, numOfTasks, now); + stInfo("vgId:%d start to check all %d stream task(s) downstream status, start ts:%" PRId64, vgId, numOfTasks, now); if (numOfTasks == 0) { stInfo("vgId:%d no tasks to be started", pMeta->vgId); @@ -1513,8 +1517,8 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { int32_t num = taosArrayGetSize(pMeta->pTaskList); for (int32_t i = 0; i < num; ++i) { SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i); - STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; - SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; + SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL) { continue; } @@ -1598,7 +1602,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 if (pStartInfo->startAllTasks != 1) { int64_t el = endTs - startTs; stDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms", - pMeta->vgId, taskId, ready, el); + pMeta->vgId, taskId, ready, el); streamMetaWUnLock(pMeta); return 0; } @@ -1725,7 +1729,8 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt taosHashPut(pMeta->updateInfo.pTasks, &hEntry, sizeof(hEntry), NULL, 0); stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask/hTask closed, elapsed:%" PRId64 - " ms", id, vgId, transId, el); + " ms", + id, vgId, transId, el); } else { stDebug("s-task:%s vgId:%d transId:%d task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", id, vgId, transId, el); @@ -1740,4 +1745,4 @@ void streamMetaClearUpdateTaskList(SStreamMeta* pMeta) { void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) { taosHashClear(pMeta->updateInfo.pTasks); pMeta->updateInfo.transId = transId; -} \ No newline at end of file +} From 6517f59a851209c7fc2f3e33ab21dc23177f96d9 Mon Sep 17 00:00:00 2001 From: bitcapybara Date: Wed, 26 Jun 2024 07:12:17 +0000 Subject: [PATCH 05/16] test: Update TDTestCase in tmq_taosx.py to include additional data checks --- tests/system-test/7-tmq/tmq_taosx.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index d15c9d61b8..d30d88bb1c 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -553,15 +553,25 @@ class TDTestCase: tdSql.query(f'show subscriptions') tdSql.checkRows(2) + tdSql.checkData(0, 0, "t2") + tdSql.checkData(0, 1, 'g1') + tdSql.checkData(1, 0, 't1') + tdSql.checkData(1, 1, 'g1') tdSql.query(f'show consumers') tdSql.checkRows(1) + tdSql.checkData(0, 1, 'g1') + tdSql.checkData(0, 4, 't2') tdSql.execute(f'drop consumer group g1 on t1') tdSql.query(f'show consumers') tdSql.checkRows(1) + tdSql.checkData(0, 1, 'g1') + tdSql.checkData(0, 4, 't2') tdSql.query(f'show subscriptions') tdSql.checkRows(1) + tdSql.checkData(0, 0, "t2") + tdSql.checkData(0, 1, 'g1') index = 0 try: @@ -593,6 +603,8 @@ class TDTestCase: tdSql.query(f'show consumers') tdSql.checkRows(1) + tdSql.checkData(0, 1, 'g1') + tdSql.checkData(0, 4, 't2') tdSql.execute(f'insert into t4 using st tags(3) values(now, 1)') try: @@ -607,6 +619,8 @@ class TDTestCase: tdSql.query(f'show subscriptions') tdSql.checkRows(1) + tdSql.checkData(0, 0, "t2") + tdSql.checkData(0, 1, 'g1') tdSql.execute(f'drop topic t1') tdSql.execute(f'drop topic t2') From 58b3dd7b99c649e6608772bf6c25da4077eb86bf Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 26 Jun 2024 08:45:14 +0000 Subject: [PATCH 06/16] Add index filtering based on column value type --- source/libs/index/src/indexFilter.c | 32 +++----- tests/script/tsim/tagindex/indexOverflow.sim | 82 ++++++++++++++++++++ 2 files changed, 93 insertions(+), 21 deletions(-) create mode 100644 tests/script/tsim/tagindex/indexOverflow.sim diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index d5d4592b47..80994775d5 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -625,28 +625,21 @@ static int32_t sifSetFltParam(SIFParam *left, SIFParam *right, SDataTypeBuf *typ return 0; } -static int8_t sifCheckNumericTypeSame(uint8_t left, uint8_t right) { - if (left != right) { - return 0; - } - return 1; -} static int8_t sifShouldUseIndexBasedOnType(SIFParam *left, SIFParam *right) { - if (left->colValType == TSDB_DATA_TYPE_GEOMETRY || right->colValType == TSDB_DATA_TYPE_GEOMETRY) { + // not compress + if (left->colValType == TSDB_DATA_TYPE_FLOAT) return 0; + + if (left->colValType == TSDB_DATA_TYPE_GEOMETRY || right->colValType == TSDB_DATA_TYPE_GEOMETRY || + left->colValType == TSDB_DATA_TYPE_JSON || right->colValType == TSDB_DATA_TYPE_JSON) { return 0; } - if (IS_VAR_DATA_TYPE(left->colValType) && !IS_VAR_DATA_TYPE(right->colValType)) { - return 0; - } - if (IS_NUMERIC_TYPE(left->colValType) && !IS_NUMERIC_TYPE(right->colValType)) { - return 0; - } - if (IS_NUMERIC_TYPE(left->colValType) && IS_NUMERIC_TYPE(right->colValType)) { - if (!sifCheckNumericTypeSame(left->colValType, right->colValType)) { - return 0; - } - } + if (IS_VAR_DATA_TYPE(left->colValType)) { + if (!IS_VAR_DATA_TYPE(right->colValType)) return 0; + } else if (IS_NUMERIC_TYPE(left->colValType)) { + if (!IS_NUMERIC_TYPE(right->colValType)) return 0; + if (left->colValType != right->colValType) return 0; + } return 1; } static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) { @@ -666,9 +659,6 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP ret = indexJsonSearch(arg->ivtIdx, mtm, output->result); indexMultiTermQueryDestroy(mtm); } else { - // if (left->colValType == TSDB_DATA_TYPE_GEOMETRY || right->colValType == TSDB_DATA_TYPE_GEOMETRY) { - // return TSDB_CODE_QRY_GEO_NOT_SUPPORT_ERROR; - // } int8_t useIndex = sifShouldUseIndexBasedOnType(left, right); if (!useIndex) { output->status = SFLT_NOT_INDEX; diff --git a/tests/script/tsim/tagindex/indexOverflow.sim b/tests/script/tsim/tagindex/indexOverflow.sim new file mode 100644 index 0000000000..9e297099d1 --- /dev/null +++ b/tests/script/tsim/tagindex/indexOverflow.sim @@ -0,0 +1,82 @@ + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print ======== step0 +$dbPrefix = ta_3_db +$tbPrefix = ta_3_tb +$mtPrefix = ta_3_mt +$lastRowNum = 0 +$tbNum = 100000 +$rowNum = 20 +$totalNum = 200 + +print =============== create database +sql create database $dbPrefix +sql use $dbPrefix + + + +sql create table if not exists $mtPrefix (ts timestamp, c1 int) tags (t1 tinyint, t1c tinyint) +$i = 0 +$tinyLimit = 127 +$tinyTable = tinyTable +while $i < $tinyLimit + $tb = $tinyTable . $i + sql insert into $tb using $mtPrefix tags( $i , $i ) values( now , $i ) + $i = $i + 1 +endw + +$i = 0 +$maxTinyLimit = 200 + +# 1. compress index and no-index to verify resultset +# 2. compress resultset of index filter and scalar filter +while $i < $maxTinyLimit + sql select * from $mtPrefix where t1 <= $i + $lastRowNum = $rows + + sql select * from $mtPrefix where t1c <= $i + if $lastRowNum != $rows then + return -1 + endi + + $i = $i + 1 +endw + + +$tbPrefix = ta_3_tb_c +$mtPrefix = ta_3_mt_c +$colPrefix = 'col' +sql create table if not exists $mtPrefix (ts timestamp, c1 int) tags (t1 nchar(18), t1c nchar(18)) +$i = 0 +$tinyLimit = 127 +while $i < $tinyLimit + $tb = $tbPrefix . $i + sql insert into $tb using $mtPrefix tags( $colPrefix , $colPrefix ) values( now , $i ) + $i = $i + 1 +endw + +$i = 0 +$maxTinyLimit = 200 + +# 1. compress index and no-index to verify resultset +# 2. compress resultset of index filter and scalar filter +while $i < $maxTinyLimit + sql select * from $mtPrefix where t1 <= $i + $lastRowNum = $rows + + sql select * from $mtPrefix where t1c <= $i + if $lastRowNum != $rows then + return -1 + endi + + $i = $i + 1 +endw + + + + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file From a5441cdcdeb3145dfe17eb728d9f0c5ed4dec0f9 Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 26 Jun 2024 08:49:38 +0000 Subject: [PATCH 07/16] Add index filtering based on column value type --- source/libs/index/src/indexFilter.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 80994775d5..323f855601 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -637,7 +637,6 @@ static int8_t sifShouldUseIndexBasedOnType(SIFParam *left, SIFParam *right) { if (IS_VAR_DATA_TYPE(left->colValType)) { if (!IS_VAR_DATA_TYPE(right->colValType)) return 0; } else if (IS_NUMERIC_TYPE(left->colValType)) { - if (!IS_NUMERIC_TYPE(right->colValType)) return 0; if (left->colValType != right->colValType) return 0; } return 1; From a1ee30e5fcbf79beedd61d73d95b992ab437197f Mon Sep 17 00:00:00 2001 From: Yihao Deng Date: Wed, 26 Jun 2024 09:12:26 +0000 Subject: [PATCH 08/16] Add index filtering based on column value type --- tests/parallel_test/cases.task | 2 ++ tests/script/win-test-file | 1 + 2 files changed, 3 insertions(+) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index b96c8eb030..533923ec73 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1488,12 +1488,14 @@ ,,y,script,./test.sh -f tmp/monitor.sim ,,y,script,./test.sh -f tsim/tagindex/add_index.sim ,,n,script,./test.sh -f tsim/tagindex/sma_and_tag_index.sim +,,y,script,./test.sh -f tsim/tagindex/indexOverflow.sim ,,y,script,./test.sh -f tsim/view/view.sim ,,y,script,./test.sh -f tsim/query/cache_last.sim ,,y,script,./test.sh -f tsim/query/const.sim ,,y,script,./test.sh -f tsim/query/nestedJoinView.sim + #develop test ,,n,develop-test,python3 ./test.py -f 2-query/table_count_scan.py ,,n,develop-test,python3 ./test.py -f 2-query/pseudo_column.py diff --git a/tests/script/win-test-file b/tests/script/win-test-file index d51de0a61b..acc4c74d21 100644 --- a/tests/script/win-test-file +++ b/tests/script/win-test-file @@ -401,6 +401,7 @@ ./test.sh -f tsim/tag/tbNameIn.sim ./test.sh -f tmp/monitor.sim ./test.sh -f tsim/tagindex/add_index.sim +./test.sh -f tsim/tagindex/indexOverflow.sim ./test.sh -f tsim/tagindex/sma_and_tag_index.sim ./test.sh -f tsim/view/view.sim ./test.sh -f tsim/query/cache_last.sim From d4b31cfd8ec892af08bb66e8f601acbe9a88eec2 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 26 Jun 2024 18:32:09 +0800 Subject: [PATCH 09/16] fix:[TS-4921] errors in test --- include/common/tglobal.h | 1 + include/libs/monitor/clientMonitor.h | 10 +- source/client/src/clientEnv.c | 29 ++-- source/client/src/clientMonitor.c | 177 ++++++++++++---------- source/client/src/clientMsgHandler.c | 2 +- source/client/test/clientMonitorTests.cpp | 6 +- source/common/src/tglobal.c | 6 +- source/common/test/commonTests.cpp | 2 +- source/dnode/mnode/impl/inc/mndDef.h | 5 + source/dnode/mnode/impl/src/mndDnode.c | 27 ++-- source/os/src/osFile.c | 24 ++- source/util/src/tconfig.c | 13 ++ 12 files changed, 170 insertions(+), 132 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index dd9589ccd4..96b9617fc4 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -277,6 +277,7 @@ void taosSetGlobalDebugFlag(int32_t flag); void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal); void taosLocalCfgForbiddenToChange(char *name, bool *forbidden); int8_t taosGranted(int8_t type); +int32_t taosSetSlowLogScope(char *pScope); #ifdef __cplusplus } diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index bdb77bab28..b3596362fd 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -31,9 +31,7 @@ typedef enum SQL_RESULT_CODE { SQL_RESULT_CANCEL = 2, } SQL_RESULT_CODE; -#define SLOW_LOG_SEND_SIZE 1024*1024 -extern tsem2_t monitorSem; -extern STaosQueue* monitorQueue; +#define SLOW_LOG_SEND_SIZE 8*1024 typedef struct { int64_t clusterId; @@ -55,18 +53,14 @@ typedef struct { void monitorClose(); void monitorInit(); -void monitorSendAllSlowLogFromTempDir(void* pInst); void monitorClientSQLReqInit(int64_t clusterKey); void monitorClientSlowQueryInit(int64_t clusterId); void monitorCreateClient(int64_t clusterId); void monitorCreateClientCounter(int64_t clusterId, const char* name, const char* help, size_t label_key_count, const char** label_keys); void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values); -void* monitorThreadFunc(void *param); -void monitorFreeSlowLogData(MonitorSlowLogData* pData); const char* monitorResultStr(SQL_RESULT_CODE code); -void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet); - +int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value); #ifdef __cplusplus } #endif diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 15568669f1..fbeea9ddc1 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -107,17 +107,17 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_ } char clusterId[32] = {0}; if (snprintf(clusterId, sizeof(clusterId), "%" PRId64, pTscObj->pAppInfo->clusterId) < 0){ - uError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId); + tscError("failed to generate clusterId:%" PRId64, pTscObj->pAppInfo->clusterId); } char startTs[32] = {0}; if (snprintf(startTs, sizeof(startTs), "%" PRId64, pRequest->metric.start/1000) < 0){ - uError("failed to generate startTs:%" PRId64, pRequest->metric.start/1000); + tscError("failed to generate startTs:%" PRId64, pRequest->metric.start/1000); } char requestId[32] = {0}; if (snprintf(requestId, sizeof(requestId), "%" PRIu64, pRequest->requestId) < 0){ - uError("failed to generate requestId:%" PRIu64, pRequest->requestId); + tscError("failed to generate requestId:%" PRIu64, pRequest->requestId); } cJSON_AddItemToObject(json, "cluster_id", cJSON_CreateString(clusterId)); cJSON_AddItemToObject(json, "start_ts", cJSON_CreateString(startTs)); @@ -142,7 +142,7 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_ char pid[32] = {0}; if (snprintf(pid, sizeof(pid), "%d", appInfo.pid) < 0){ - uError("failed to generate pid:%d", appInfo.pid); + tscError("failed to generate pid:%d", appInfo.pid); } cJSON_AddItemToObject(json, "process_id", cJSON_CreateString(pid)); @@ -153,25 +153,14 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_ }else if(pRequest->pDb != NULL){ cJSON_AddItemToObject(json, "db", cJSON_CreateString(pRequest->pDb)); }else{ - cJSON_AddItemToObject(json, "db", cJSON_CreateString("unknown")); + cJSON_AddItemToObject(json, "db", cJSON_CreateString("")); } + char* value = cJSON_PrintUnformatted(json); + if(monitorPutData2MonitorQueue(pTscObj->pAppInfo->clusterId, value) < 0){ + taosMemoryFree(value); + } - MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0); - if (slowLogData == NULL) { - cJSON_Delete(json); - tscError("[monitor] failed to allocate slow log data"); - return; - } - slowLogData->clusterId = pTscObj->pAppInfo->clusterId; - slowLogData->value = cJSON_PrintUnformatted(json); - tscDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value); - if (taosWriteQitem(monitorQueue, slowLogData) == 0){ - tsem2_post(&monitorSem); - }else{ - monitorFreeSlowLogData(slowLogData); - taosFreeQitem(slowLogData); - } cJSON_Delete(json); } diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index e66884e74e..c7e65eaf76 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -85,31 +85,6 @@ static SAppInstInfo* getAppInstByClusterId(int64_t clusterId) { return *(SAppInstInfo**)p; } -static int32_t tscMonitortInit() { - TdThreadAttr thAttr; - taosThreadAttrInit(&thAttr); - taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); - TdThread monitorThread; - if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) { - uError("failed to create monitor thread since %s", strerror(errno)); - return -1; - } - - taosThreadAttrDestroy(&thAttr); - return 0; -} - -static void tscMonitorStop() { - if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) { - uDebug("monitor thread already stopped"); - return; - } - - while (atomic_load_32(&slowLogFlag) > 0) { - taosMsleep(100); - } -} - static int32_t monitorReportAsyncCB(void* param, SDataBuf* pMsg, int32_t code) { if (TSDB_CODE_SUCCESS != code) { uError("found error in monitorReport send callback, code:%d, please check the network.", code); @@ -161,7 +136,7 @@ static int32_t sendReport(void* pTransporter, SEpSet *epSet, char* pCont, MONITO return code; } -void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet){ +static void monitorReadSendSlowLog(TdFilePtr pFile, void* pTransporter, SEpSet *epSet){ char buf[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log char pCont[SLOW_LOG_SEND_SIZE + 1] = {0}; // +1 for \0, for print log int32_t offset = 0; @@ -268,8 +243,9 @@ static void sendAllSlowLog(){ uDebug("[monitor] sendAllSlowLog when client close"); } -void monitorSendAllSlowLogFromTempDir(void* inst){ - SAppInstInfo* pInst = (SAppInstInfo*)inst; +static void monitorSendAllSlowLogFromTempDir(int64_t clusterId){ + SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId); + if(pInst == NULL || !pInst->monitorParas.tsEnableMonitor){ uInfo("[monitor] monitor is disabled, skip send slow log"); return; @@ -350,45 +326,6 @@ static void sendAllCounter(){ } } -void monitorInit() { - uInfo("[monitor] tscMonitor init"); - monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if (monitorCounterHash == NULL) { - uError("failed to create monitorCounterHash"); - } - taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient); - - monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if (monitorSlowLogHash == NULL) { - uError("failed to create monitorSlowLogHash"); - } - taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient); - - monitorTimer = taosTmrInit(0, 0, 0, "MONITOR"); - if (monitorTimer == NULL) { - uError("failed to create monitor timer"); - } - - taosInitRWLatch(&monitorLock); - tscMonitortInit(); -} - -void monitorClose() { - uInfo("[monitor] tscMonitor close"); - taosRLockLatch(&monitorLock); - - if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) { - uDebug("[monitor] monitorFlag is not 0"); - } - tscMonitorStop(); - sendAllSlowLog(); - sendAllCounter(); - taosHashCleanup(monitorCounterHash); - taosHashCleanup(monitorSlowLogHash); - taosTmrCleanUp(monitorTimer); - taosRUnLockLatch(&monitorLock); -} - void monitorCreateClient(int64_t clusterId) { MonitorClient* pMonitor = NULL; taosWLockLatch(&monitorLock); @@ -477,7 +414,7 @@ end: } void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values) { - taosRLockLatch(&monitorLock); + taosWLockLatch(&monitorLock); MonitorClient** ppMonitor = (MonitorClient**)taosHashGet(monitorCounterHash, &clusterId, LONG_BYTES); if (ppMonitor == NULL || *ppMonitor == NULL) { uError("monitorCounterInc not found pMonitor %"PRId64, clusterId); @@ -494,7 +431,7 @@ void monitorCounterInc(int64_t clusterId, const char* counterName, const char** uInfo("[monitor] monitorCounterInc %"PRIx64"(%p):%s", pMonitor->clusterId, pMonitor, counterName); end: - taosRUnLockLatch(&monitorLock); + taosWUnLockLatch(&monitorLock); } const char* monitorResultStr(SQL_RESULT_CODE code) { @@ -502,16 +439,16 @@ const char* monitorResultStr(SQL_RESULT_CODE code) { return result_state[code]; } -void monitorFreeSlowLogData(MonitorSlowLogData* pData) { +static void monitorFreeSlowLogData(MonitorSlowLogData* pData) { if (pData == NULL) { return; } taosMemoryFree(pData->value); } -void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); } +static void monitorThreadFuncUnexpectedStopped(void) { atomic_store_32(&slowLogFlag, -1); } -void reportSlowLog(void* param, void* tmrId) { +static void reportSlowLog(void* param, void* tmrId) { taosRLockLatch(&monitorLock); if (atomic_load_32(&monitorFlag) == 1) { taosRUnLockLatch(&monitorLock); @@ -538,8 +475,8 @@ void reportSlowLog(void* param, void* tmrId) { taosTmrReset(reportSlowLog, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); } -void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){ - taosRLockLatch(&monitorLock); +static void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){ + taosWLockLatch(&monitorLock); TdFilePtr pFile = NULL; void* tmp = taosHashGet(monitorSlowLogHash, &slowLogData->clusterId, LONG_BYTES); if (tmp == NULL){ @@ -594,10 +531,10 @@ void monitorWriteSlowLog2File(MonitorSlowLogData* slowLogData, char *tmpPath){ uDebug("[monitor] write slow log to file:%p, clusterId:%"PRIx64, pFile, slowLogData->clusterId); FAILED: - taosRUnLockLatch(&monitorLock); + taosWUnLockLatch(&monitorLock); } -void* monitorThreadFunc(void *param){ +static void* monitorThreadFunc(void *param){ setThreadName("client-monitor-slowlog"); #ifdef WINDOWS @@ -638,7 +575,11 @@ void* monitorThreadFunc(void *param){ taosReadQitem(monitorQueue, (void**)&slowLogData); if (slowLogData != NULL) { uDebug("[monitor] read slow log data from queue, clusterId:%" PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value); - monitorWriteSlowLog2File(slowLogData, tmpPath); + if (slowLogData->value == NULL){ + monitorSendAllSlowLogFromTempDir(slowLogData->clusterId); + }else{ + monitorWriteSlowLog2File(slowLogData, tmpPath); + } } monitorFreeSlowLogData(slowLogData); taosFreeQitem(slowLogData); @@ -649,4 +590,86 @@ void* monitorThreadFunc(void *param){ tsem2_destroy(&monitorSem); slowLogFlag = -2; return NULL; +} + +static int32_t tscMonitortInit() { + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + TdThread monitorThread; + if (taosThreadCreate(&monitorThread, &thAttr, monitorThreadFunc, NULL) != 0) { + uError("failed to create monitor thread since %s", strerror(errno)); + return -1; + } + + taosThreadAttrDestroy(&thAttr); + return 0; +} + +static void tscMonitorStop() { + if (atomic_val_compare_exchange_32(&slowLogFlag, 0, 1)) { + uDebug("monitor thread already stopped"); + return; + } + + while (atomic_load_32(&slowLogFlag) > 0) { + taosMsleep(100); + } +} + +void monitorInit() { + uInfo("[monitor] tscMonitor init"); + monitorCounterHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (monitorCounterHash == NULL) { + uError("failed to create monitorCounterHash"); + } + taosHashSetFreeFp(monitorCounterHash, destroyMonitorClient); + + monitorSlowLogHash = (SHashObj*)taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (monitorSlowLogHash == NULL) { + uError("failed to create monitorSlowLogHash"); + } + taosHashSetFreeFp(monitorSlowLogHash, destroySlowLogClient); + + monitorTimer = taosTmrInit(0, 0, 0, "MONITOR"); + if (monitorTimer == NULL) { + uError("failed to create monitor timer"); + } + + taosInitRWLatch(&monitorLock); + tscMonitortInit(); +} + +void monitorClose() { + uInfo("[monitor] tscMonitor close"); + taosWLockLatch(&monitorLock); + + if (atomic_val_compare_exchange_32(&monitorFlag, 0, 1)) { + uDebug("[monitor] monitorFlag is not 0"); + } + tscMonitorStop(); + sendAllSlowLog(); + sendAllCounter(); + taosHashCleanup(monitorCounterHash); + taosHashCleanup(monitorSlowLogHash); + taosTmrCleanUp(monitorTimer); + taosWUnLockLatch(&monitorLock); +} + +int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value){ + MonitorSlowLogData* slowLogData = taosAllocateQitem(sizeof(MonitorSlowLogData), DEF_QITEM, 0); + if (slowLogData == NULL) { + uError("[monitor] failed to allocate slow log data"); + return -1; + } + slowLogData->clusterId = clusterId; + slowLogData->value = value; + uDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value); + if (taosWriteQitem(monitorQueue, slowLogData) == 0){ + tsem2_post(&monitorSem); + }else{ + monitorFreeSlowLogData(slowLogData); + taosFreeQitem(slowLogData); + } + return 0; } \ No newline at end of file diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 8c917a7534..417cb8b562 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -155,7 +155,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){ tscError("failed to put appInfo into appInfo.pInstMapByClusterId"); } - monitorSendAllSlowLogFromTempDir(pTscObj->pAppInfo); + monitorPutData2MonitorQueue(pTscObj->pAppInfo->clusterId, NULL); monitorClientSlowQueryInit(connectRsp.clusterId); monitorClientSQLReqInit(connectRsp.clusterId); } diff --git a/source/client/test/clientMonitorTests.cpp b/source/client/test/clientMonitorTests.cpp index c74f4f7290..2d3ce87f38 100644 --- a/source/client/test/clientMonitorTests.cpp +++ b/source/client/test/clientMonitorTests.cpp @@ -107,7 +107,7 @@ TEST(clientMonitorTest, ReadOneFile) { SEpSet* epSet = NULL; // Call the function to be tested - monitorReadSendSlowLog(pFile, pTransporter, epSet); +// monitorReadSendSlowLog(pFile, (int64_t)pTransporter, epSet); char value[size] = {0}; memset(value, '0', size - 1); @@ -115,7 +115,7 @@ TEST(clientMonitorTest, ReadOneFile) { uError("failed to write len to file:%p since %s", pFile, terrstr()); } - monitorReadSendSlowLog(pFile, pTransporter, epSet); +// monitorReadSendSlowLog(pFile, (int64_t)pTransporter, epSet); // Clean up any resources created for testing taosCloseFile(&pFile); @@ -164,6 +164,6 @@ TEST(clientMonitorTest, ReadTwoFile) { pAppInfo.clusterId = 2; pAppInfo.monitorParas.tsEnableMonitor = 1; strcpy(tsTempDir,"/tmp"); - monitorSendAllSlowLogFromTempDir(&pAppInfo); +// monitorSendAllSlowLogFromTempDir(&pAppInfo); } \ No newline at end of file diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index cba852b48f..784b83075c 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -180,7 +180,7 @@ int32_t tsMaxRetryWaitTime = 10000; bool tsUseAdapter = false; int32_t tsMetaCacheMaxSize = -1; // MB int32_t tsSlowLogThreshold = 10; // seconds -int32_t tsSlowLogThresholdTest = 10; // seconds +int32_t tsSlowLogThresholdTest = INT32_MAX; // seconds char tsSlowLogExceptDb[TSDB_DB_NAME_LEN] = ""; // seconds int32_t tsSlowLogScope = SLOW_LOG_TYPE_QUERY; char* tsSlowLogScopeString = "query"; @@ -973,7 +973,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) { sndDebugFlag = cfgGetItem(pCfg, "sndDebugFlag")->i32; } -static int32_t taosSetSlowLogScope(char *pScope) { +int32_t taosSetSlowLogScope(char *pScope) { if (NULL == pScope || 0 == strlen(pScope)) { return SLOW_LOG_TYPE_QUERY; } @@ -984,7 +984,7 @@ static int32_t taosSetSlowLogScope(char *pScope) { char *tmp = NULL; while((scope = strsep(&pScope, "|")) != NULL){ taosMemoryFreeClear(tmp); - tmp = strdup(scope); + tmp = taosStrdup(scope); strtrim(tmp); if (0 == strcasecmp(tmp, "all")) { slowScope |= SLOW_LOG_TYPE_ALL; diff --git a/source/common/test/commonTests.cpp b/source/common/test/commonTests.cpp index 197ccdb6ca..360d1ed31a 100644 --- a/source/common/test/commonTests.cpp +++ b/source/common/test/commonTests.cpp @@ -743,7 +743,7 @@ static int32_t taosSetSlowLogScope(char *pScope) { char *tmp = NULL; while((scope = strsep(&pScope, "|")) != NULL){ taosMemoryFreeClear(tmp); - tmp = strdup(scope); + tmp = taosStrdup(scope); strtrim(tmp); if (0 == strcasecmp(tmp, "all")) { slowScope |= SLOW_LOG_TYPE_ALL; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index dd577f8908..089c4a10b3 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -144,6 +144,11 @@ typedef enum { DND_REASON_ENABLE_WHITELIST_NOT_MATCH, DND_REASON_ENCRYPTION_KEY_NOT_MATCH, DND_REASON_STATUS_MONITOR_NOT_MATCH, + DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH, + DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH, + DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH, + DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH, + DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH, DND_REASON_OTHERS } EDndReason; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index f5ab56c1f2..9f0fc2decf 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -49,6 +49,12 @@ static const char *offlineReason[] = { "ttlChangeOnWrite not match", "enableWhiteList not match", "encryptionKey not match", + "monitor not match", + "monitor switch not match", + "monitor interval not match", + "monitor slow log threshold not match", + "monitor slow log sql max len not match", + "monitor slow log scopenot match", "unknown", }; @@ -438,20 +444,20 @@ void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeInfo) { } } -#define CHECK_MONITOR_PARA(para) \ +#define CHECK_MONITOR_PARA(para,err) \ if (pCfg->monitorParas.para != para) { \ mError("dnode:%d, para:%d inconsistent with cluster:%d", pDnode->id, pCfg->monitorParas.para, para); \ - terrno = TSDB_CODE_DNODE_INVALID_MONITOR_PARAS; \ - return DND_REASON_STATUS_MONITOR_NOT_MATCH;\ + terrno = err; \ + return err;\ } static int32_t mndCheckClusterCfgPara(SMnode *pMnode, SDnodeObj *pDnode, const SClusterCfg *pCfg) { - CHECK_MONITOR_PARA(tsEnableMonitor); - CHECK_MONITOR_PARA(tsMonitorInterval); - CHECK_MONITOR_PARA(tsSlowLogThreshold); - CHECK_MONITOR_PARA(tsSlowLogThresholdTest); - CHECK_MONITOR_PARA(tsSlowLogMaxLen); - CHECK_MONITOR_PARA(tsSlowLogScope); + CHECK_MONITOR_PARA(tsEnableMonitor, DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH); + CHECK_MONITOR_PARA(tsMonitorInterval, DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH); + CHECK_MONITOR_PARA(tsSlowLogThreshold, DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH); + CHECK_MONITOR_PARA(tsSlowLogThresholdTest, DND_REASON_STATUS_MONITOR_NOT_MATCH); + CHECK_MONITOR_PARA(tsSlowLogMaxLen, DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH); + CHECK_MONITOR_PARA(tsSlowLogScope, DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH); if (0 != strcasecmp(pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb)) { mError("dnode:%d, tsSlowLogExceptDb:%s inconsistent with cluster:%s", pDnode->id, pCfg->monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb); @@ -557,9 +563,6 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { SStatisReq statisReq = {0}; int32_t code = -1; - char strClusterId[TSDB_CLUSTER_ID_LEN] = {0}; - sprintf(strClusterId, "%" PRId64, pMnode->clusterId); - if (tDeserializeSStatisReq(pReq->pCont, pReq->contLen, &statisReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return code; diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index ac6cf7bad2..f63339bcfc 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -1222,20 +1222,24 @@ int32_t taosUmaskFile(int32_t maskVal) { int32_t taosGetErrorFile(TdFilePtr pFile) { return errno; } int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) { + int64_t ret = -1; +#if FILE_WITH_LOCK + taosThreadRwlockRdlock(&(pFile->rwlock)); +#endif if (pFile == NULL || ptrBuf == NULL) { - return -1; + goto END; } if (*ptrBuf != NULL) { taosMemoryFreeClear(*ptrBuf); } ASSERT(pFile->fp != NULL); if (pFile->fp == NULL) { - return -1; + goto END; } #ifdef WINDOWS size_t bufferSize = 512; *ptrBuf = taosMemoryMalloc(bufferSize); - if (*ptrBuf == NULL) return -1; + if (*ptrBuf == NULL) goto END; size_t bytesRead = 0; size_t totalBytesRead = 0; @@ -1244,7 +1248,7 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) { char *result = fgets(*ptrBuf + totalBytesRead, bufferSize - totalBytesRead, pFile->fp); if (result == NULL) { taosMemoryFreeClear(*ptrBuf); - return -1; + goto END; } bytesRead = strlen(*ptrBuf + totalBytesRead); totalBytesRead += bytesRead; @@ -1257,18 +1261,24 @@ int64_t taosGetLineFile(TdFilePtr pFile, char **__restrict ptrBuf) { void *newBuf = taosMemoryRealloc(*ptrBuf, bufferSize); if (newBuf == NULL) { taosMemoryFreeClear(*ptrBuf); - return -1; + goto END; } *ptrBuf = newBuf; } (*ptrBuf)[totalBytesRead] = '\0'; - return totalBytesRead; + ret = totalBytesRead; #else size_t len = 0; - return getline(ptrBuf, &len, pFile->fp); + ret = getline(ptrBuf, &len, pFile->fp); #endif + + END: +#if FILE_WITH_LOCK + taosThreadRwlockUnlock(&(pFile->rwlock)); +#endif + return ret; } int64_t taosGetsFile(TdFilePtr pFile, int32_t maxSize, char *__restrict buf) { diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index 8be38a811f..adde3a3331 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -23,6 +23,7 @@ #include "tlog.h" #include "tunit.h" #include "tutil.h" +#include "tglobal.h" #define CFG_NAME_PRINT_LEN 24 #define CFG_SRC_PRINT_LEN 12 @@ -432,6 +433,18 @@ int32_t cfgCheckRangeForDynUpdate(SConfig *pCfg, const char *name, const char *p } switch (pItem->dtype) { + case CFG_DTYPE_STRING:{ + if(strcasecmp(name, "slowLogScope") == 0){ + char* tmp = taosStrdup(pVal); + if(taosSetSlowLogScope(tmp) < 0){ + terrno = TSDB_CODE_INVALID_CFG; + cfgUnLock(pCfg); + taosMemoryFree(tmp); + return -1; + } + taosMemoryFree(tmp); + } + } break; case CFG_DTYPE_BOOL: { int32_t ival = (int32_t)atoi(pVal); if (ival != 0 && ival != 1) { From 32ff8a7d99bbb9e6aa63f7d6453b7d381e7795a1 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 26 Jun 2024 20:03:27 +0800 Subject: [PATCH 10/16] fix:[TS-4921] errors in test --- source/client/src/clientEnv.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index fbeea9ddc1..af396876bb 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -126,7 +126,7 @@ static void generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int32_ cJSON_AddItemToObject(json, "code", cJSON_CreateNumber(pRequest->code)); cJSON_AddItemToObject(json, "error_info", cJSON_CreateString(tstrerror(pRequest->code))); cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)); - cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.totalRows)); + cJSON_AddItemToObject(json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)); if(strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen){ char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen]; pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0'; From a58ba79046dc37967cd2b7850526ecd00e139e22 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 27 Jun 2024 00:01:07 +0800 Subject: [PATCH 11/16] fix:[TS-4921] errors in test --- source/client/src/clientMonitor.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index c7e65eaf76..9e990dd545 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -568,6 +568,7 @@ static void* monitorThreadFunc(void *param){ uError("open queue error since %s", terrstr()); return NULL; } + uDebug("monitorThreadFunc start"); while (1) { if (slowLogFlag > 0) break; @@ -665,6 +666,9 @@ int32_t monitorPutData2MonitorQueue(int64_t clusterId, char* value){ slowLogData->clusterId = clusterId; slowLogData->value = value; uDebug("[monitor] write slow log to queue, clusterId:%"PRIx64 " value:%s", slowLogData->clusterId, slowLogData->value); + while (monitorQueue == NULL) { + taosMsleep(100); + } if (taosWriteQitem(monitorQueue, slowLogData) == 0){ tsem2_post(&monitorSem); }else{ From 5685346d7884dc980ebf0c74b20d06932022ac24 Mon Sep 17 00:00:00 2001 From: sima Date: Mon, 24 Jun 2024 20:28:16 +0800 Subject: [PATCH 12/16] fix:[TD-30730] Modify precision rules for input parameters of function to_iso8601 and add test. --- source/libs/function/src/builtins.c | 9 --- source/libs/scalar/src/sclfunc.c | 78 ++++++++++++------------- tests/system-test/2-query/To_iso8601.py | 60 ++++++++++++++++++- 3 files changed, 95 insertions(+), 52 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 498b46dcfe..e3e84ac20b 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2166,15 +2166,6 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); } - if (QUERY_NODE_VALUE == nodeType(nodesListGetNode(pFunc->pParameterList, 0))) { - SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 0); - - if (!validateTimestampDigits(pValue)) { - pFunc->node.resType = (SDataType){.bytes = 0, .type = TSDB_DATA_TYPE_BINARY}; - return TSDB_CODE_SUCCESS; - } - } - // param1 if (numOfParams == 2) { SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 48bedde91a..6d87ae9baa 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1095,24 +1095,45 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam * char fraction[20] = {0}; bool hasFraction = false; NUM_TO_STRING(type, input, sizeof(fraction), fraction); - int32_t tsDigits = (int32_t)strlen(fraction); + int32_t fractionLen; char buf[64] = {0}; int64_t timeVal; + char* format = NULL; + int64_t quot = 0; + long mod = 0; + GET_TYPED_DATA(timeVal, int64_t, type, input); - if (tsDigits > TSDB_TIME_PRECISION_SEC_DIGITS) { - if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) { - timeVal = timeVal / 1000; - } else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { - timeVal = timeVal / ((int64_t)(1000 * 1000)); - } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { - timeVal = timeVal / ((int64_t)(1000 * 1000 * 1000)); - } else { + + switch (pInput->columnData[0].info.precision) { + case TSDB_TIME_PRECISION_MILLI: { + quot = timeVal / 1000; + fractionLen = 5; + format = ".%03" PRId64; + mod = timeVal % 1000; + break; + } + + case TSDB_TIME_PRECISION_MICRO: { + quot = timeVal / 1000000; + fractionLen = 8; + format = ".%06" PRId64; + mod = timeVal % 1000000; + break; + } + + case TSDB_TIME_PRECISION_NANO: { + quot = timeVal / 1000000000; + fractionLen = 11; + format = ".%09" PRId64; + mod = timeVal % 1000000000; + break; + } + + default: { colDataSetNULL(pOutput->columnData, i); continue; } - hasFraction = true; - memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS); } // trans current timezone's unix ts to dest timezone @@ -1122,18 +1143,19 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam * if (0 != offsetOfTimezone(tz, &offset)) { goto _end; } - timeVal -= offset + 3600 * ((int64_t)tsTimezone); + quot -= offset + 3600 * ((int64_t)tsTimezone); struct tm tmInfo; int32_t len = 0; - if (taosLocalTime((const time_t *)&timeVal, &tmInfo, buf) == NULL) { + if (taosLocalTime((const time_t *)", &tmInfo, buf) == NULL) { len = (int32_t)strlen(buf); goto _end; } - strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", &tmInfo); - len = (int32_t)strlen(buf); + len = (int32_t)strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", &tmInfo); + + len += snprintf(buf + len, fractionLen, format, mod); // add timezone string if (tzLen > 0) { @@ -1141,32 +1163,6 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam * len += tzLen; } - if (hasFraction) { - int32_t fracLen = (int32_t)strlen(fraction) + 1; - - char *tzInfo; - if (buf[len - 1] == 'z' || buf[len - 1] == 'Z') { - tzInfo = &buf[len - 1]; - memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo)); - } else { - tzInfo = strchr(buf, '+'); - if (tzInfo) { - memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo)); - } else { - // search '-' backwards - tzInfo = strrchr(buf, '-'); - if (tzInfo) { - memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo)); - } - } - } - - char tmp[32] = {0}; - sprintf(tmp, ".%s", fraction); - memcpy(tzInfo, tmp, fracLen); - len += fracLen; - } - _end: memmove(buf + VARSTR_HEADER_SIZE, buf, len); varDataSetLen(buf, len); diff --git a/tests/system-test/2-query/To_iso8601.py b/tests/system-test/2-query/To_iso8601.py index 160473ffce..24865ada88 100644 --- a/tests/system-test/2-query/To_iso8601.py +++ b/tests/system-test/2-query/To_iso8601.py @@ -19,6 +19,61 @@ class TDTestCase: self.dbname = 'db' self.stbname = f'{self.dbname}.stb' self.ntbname = f'{self.dbname}.ntb' + def check_timestamp_precision(self): + time_zone = time.strftime('%z') + tdSql.execute(f'drop database if exists {self.dbname}') + tdSql.execute(f'create database {self.dbname} precision "us"') + tdSql.execute(f'use {self.dbname}') + tdSql.execute(f'create table if not exists {self.ntbname}(ts timestamp, c1 int, c2 timestamp)') + tdSql.execute(f'insert into {self.ntbname} values(now,1,today())') + ts_list = ['1', '11', '111', '1111', '11111', '111111', '1111111', '11111111', '111111111', '1111111111', + '11111111111','111111111111','1111111111111','11111111111111','111111111111111','1111111111111111', + '11111111111111111','111111111111111111','1111111111111111111'] + res_list_ms = ['1970-01-01T08:00:00.001+0800', '1970-01-01T08:00:00.011+0800', '1970-01-01T08:00:00.111+0800', + '1970-01-01T08:00:01.111+0800', '1970-01-01T08:00:11.111+0800', '1970-01-01T08:01:51.111+0800', + '1970-01-01T08:18:31.111+0800', '1970-01-01T11:05:11.111+0800', '1970-01-02T14:51:51.111+0800', + '1970-01-14T04:38:31.111+0800', '1970-05-09T22:25:11.111+0800', '1973-07-10T08:11:51.111+0800', + '2005-03-18T09:58:31.111+0800', '2322-02-06T03:45:11.111+0800', '5490-12-21T13:31:51.111+0800', + '37179-09-17T15:18:31.111+0800', '354067-02-04T09:05:11.111+0800', + '3522940-12-11T18:51:51.111+0800', '35211679-06-14T20:38:31.111+0800'] + res_list_us = ['1970-01-01T08:00:00.000001+0800', '1970-01-01T08:00:00.000011+0800', + '1970-01-01T08:00:00.000111+0800', '1970-01-01T08:00:00.001111+0800', + '1970-01-01T08:00:00.011111+0800', '1970-01-01T08:00:00.111111+0800', + '1970-01-01T08:00:01.111111+0800', '1970-01-01T08:00:11.111111+0800', + '1970-01-01T08:01:51.111111+0800', '1970-01-01T08:18:31.111111+0800', + '1970-01-01T11:05:11.111111+0800', '1970-01-02T14:51:51.111111+0800', + '1970-01-14T04:38:31.111111+0800', '1970-05-09T22:25:11.111111+0800', + '1973-07-10T08:11:51.111111+0800', '2005-03-18T09:58:31.111111+0800', + '2322-02-06T03:45:11.111111+0800', '5490-12-21T13:31:51.111111+0800', + '37179-09-17T15:18:31.111111+0800'] + res_list_ns = ['1970-01-01T08:00:00.000000001+0800', '1970-01-01T08:00:00.000000011+0800', + '1970-01-01T08:00:00.000000111+0800', '1970-01-01T08:00:00.000001111+0800', + '1970-01-01T08:00:00.000011111+0800', '1970-01-01T08:00:00.000111111+0800', + '1970-01-01T08:00:00.001111111+0800', '1970-01-01T08:00:00.011111111+0800', + '1970-01-01T08:00:00.111111111+0800', '1970-01-01T08:00:01.111111111+0800', + '1970-01-01T08:00:11.111111111+0800', '1970-01-01T08:01:51.111111111+0800', + '1970-01-01T08:18:31.111111111+0800', '1970-01-01T11:05:11.111111111+0800', + '1970-01-02T14:51:51.111111111+0800', '1970-01-14T04:38:31.111111111+0800', + '1970-05-09T22:25:11.111111111+0800', '1973-07-10T08:11:51.111111111+0800', + '2005-03-18T09:58:31.111111111+0800'] + # test to_iso8601's precision with default precision 'ms' + for i in range(len(ts_list)): + tdSql.query(f'select to_iso8601({ts_list[i]})') + tdSql.checkEqual(tdSql.queryResult[0][0],res_list_ms[i]) + # test to_iso8601's precision with table's precision 'us' + for i in range(len(ts_list)): + tdSql.query(f'select to_iso8601({ts_list[i]}) from {self.ntbname}') + tdSql.checkEqual(tdSql.queryResult[0][0],res_list_us[i]) + + tdSql.execute(f'drop database if exists {self.dbname}') + tdSql.execute(f'create database {self.dbname} precision "ns"') + tdSql.execute(f'use {self.dbname}') + tdSql.execute(f'create table if not exists {self.ntbname}(ts timestamp, c1 int, c2 timestamp)') + tdSql.execute(f'insert into {self.ntbname} values(now,1,today())') + # test to_iso8601's precision with table's precision 'ns' + for i in range(len(ts_list)): + tdSql.query(f'select to_iso8601({ts_list[i]}) from {self.ntbname}') + tdSql.checkEqual(tdSql.queryResult[0][0],res_list_ns[i]) def check_customize_param_ms(self): time_zone = time.strftime('%z') tdSql.execute(f'drop database if exists {self.dbname}') @@ -65,7 +120,7 @@ class TDTestCase: tdSql.checkRows(1) for i in range(0,3): tdSql.query("select to_iso8601(1) from db.ntb") - tdSql.checkData(i,0,"1970-01-01T08:00:01+0800") + tdSql.checkData(i,0,"1970-01-01T08:00:00.001+0800") tdSql.checkRows(3) tdSql.query("select to_iso8601(ts) from db.ntb") tdSql.checkRows(3) @@ -97,7 +152,7 @@ class TDTestCase: tdSql.checkRows(3) tdSql.query("select to_iso8601(1) from db.stb") for i in range(0,3): - tdSql.checkData(i,0,"1970-01-01T08:00:01+0800") + tdSql.checkData(i,0,"1970-01-01T08:00:00.001+0800") tdSql.checkRows(3) tdSql.query("select to_iso8601(ts) from db.stb") tdSql.checkRows(3) @@ -113,6 +168,7 @@ class TDTestCase: def run(self): # sourcery skip: extract-duplicate-method self.check_base_function() self.check_customize_param_ms() + self.check_timestamp_precision() def stop(self): tdSql.close() From 4601583361d0b8c5c155299451f317fcc46d45d7 Mon Sep 17 00:00:00 2001 From: sima Date: Tue, 25 Jun 2024 16:50:09 +0800 Subject: [PATCH 13/16] fix:[TD-30730] Modify precision rules for input parameters of function timetruncate and add test. --- source/libs/scalar/src/sclfunc.c | 191 +--------------------- tests/system-test/2-query/timetruncate.py | 36 +++- 2 files changed, 42 insertions(+), 185 deletions(-) diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 6d87ae9baa..1a9d7f4900 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1343,9 +1343,6 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[timePrecIdx]), pInput[timePrecIdx].columnData->pData); memcpy(timezone, varDataVal(pInput[timeZoneIdx].columnData->pData), varDataLen(pInput[timeZoneIdx].columnData->pData)); - int64_t factor = TSDB_TICK_PER_SECOND(timePrec); - int64_t unit = timeUnit * 1000 / factor; - for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { if (colDataIsNull_s(pInput[0].columnData, i)) { colDataSetNULL(pOutput->columnData, i); @@ -1355,201 +1352,27 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara char *input = colDataGetData(pInput[0].columnData, i); if (IS_VAR_DATA_TYPE(type)) { /* datetime format strings */ - int32_t ret = convertStringToTimestamp(type, input, TSDB_TIME_PRECISION_NANO, &timeVal); + int32_t ret = convertStringToTimestamp(type, input, timePrec, &timeVal); if (ret != TSDB_CODE_SUCCESS) { colDataSetNULL(pOutput->columnData, i); continue; } - // If converted value is less than 10digits in second, use value in second instead - int64_t timeValSec = timeVal / 1000000000; - if (timeValSec < 1000000000) { - timeVal = timeValSec; - } } else if (type == TSDB_DATA_TYPE_BIGINT) { /* unix timestamp */ GET_TYPED_DATA(timeVal, int64_t, type, input); } else if (type == TSDB_DATA_TYPE_TIMESTAMP) { /* timestamp column*/ GET_TYPED_DATA(timeVal, int64_t, type, input); - int64_t timeValSec = timeVal / factor; - if (timeValSec < 1000000000) { - timeVal = timeValSec; - } } char buf[20] = {0}; NUM_TO_STRING(TSDB_DATA_TYPE_BIGINT, &timeVal, sizeof(buf), buf); - int32_t tsDigits = (int32_t)strlen(buf); - switch (unit) { - case 0: { /* 1u or 1b */ - if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { - if (timePrec == TSDB_TIME_PRECISION_NANO && timeUnit == 1) { - timeVal = timeVal * 1; - } else { - timeVal = timeVal / 1000 * 1000; - } - } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { - timeVal = timeVal * factor; - } else { - timeVal = timeVal * 1; - } - break; - } - case 1: { /* 1a */ - if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) { - timeVal = timeVal * 1; - } else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { - timeVal = timeVal / 1000 * 1000; - } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { - timeVal = timeVal / 1000000 * 1000000; - } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { - timeVal = timeVal * factor; - } else { - colDataSetNULL(pOutput->columnData, i); - continue; - } - break; - } - case 1000: { /* 1s */ - if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) { - timeVal = timeVal / 1000 * 1000; - } else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { - timeVal = timeVal / 1000000 * 1000000; - } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { - timeVal = timeVal / 1000000000 * 1000000000; - } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { - timeVal = timeVal * factor; - } else { - colDataSetNULL(pOutput->columnData, i); - continue; - } - break; - } - case 60000: { /* 1m */ - if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) { - timeVal = timeVal / 1000 / 60 * 60 * 1000; - } else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { - timeVal = timeVal / 1000000 / 60 * 60 * 1000000; - } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { - timeVal = timeVal / 1000000000 / 60 * 60 * 1000000000; - } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { - timeVal = timeVal * factor / factor / 60 * 60 * factor; - } else { - colDataSetNULL(pOutput->columnData, i); - continue; - } - break; - } - case 3600000: { /* 1h */ - if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) { - timeVal = timeVal / 1000 / 3600 * 3600 * 1000; - } else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { - timeVal = timeVal / 1000000 / 3600 * 3600 * 1000000; - } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { - timeVal = timeVal / 1000000000 / 3600 * 3600 * 1000000000; - } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { - timeVal = timeVal * factor / factor / 3600 * 3600 * factor; - } else { - colDataSetNULL(pOutput->columnData, i); - continue; - } - break; - } - case 86400000: { /* 1d */ - if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) { - if (ignoreTz) { - timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000)) % (((int64_t)86400) * 1000); - } else { - timeVal = timeVal / 1000 / 86400 * 86400 * 1000; - } - } else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { - if (ignoreTz) { - timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000)) % (((int64_t)86400) * 1000000); - } else { - timeVal = timeVal / 1000000 / 86400 * 86400 * 1000000; - } - } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { - if (ignoreTz) { - timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000000)) % (((int64_t)86400) * 1000000000); - } else { - timeVal = timeVal / 1000000000 / 86400 * 86400 * 1000000000; - } - } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { - if (ignoreTz) { - timeVal = (timeVal - (timeVal + offsetFromTz(timezone, 1)) % (86400L)) * factor; - } else { - timeVal = timeVal * factor / factor / 86400 * 86400 * factor; - } - } else { - colDataSetNULL(pOutput->columnData, i); - continue; - } - break; - } - case 604800000: { /* 1w */ - if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) { - if (ignoreTz) { - timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000)) % (((int64_t)604800) * 1000); - } else { - timeVal = timeVal / 1000 / 604800 * 604800 * 1000; - } - } else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { - if (ignoreTz) { - timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000)) % (((int64_t)604800) * 1000000); - } else { - timeVal = timeVal / 1000000 / 604800 * 604800 * 1000000; - } - } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { - if (ignoreTz) { - timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1000000000)) % (((int64_t)604800) * 1000000000); - } else { - timeVal = timeVal / 1000000000 / 604800 * 604800 * 1000000000; - } - } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { - if (ignoreTz) { - timeVal = timeVal - (timeVal + offsetFromTz(timezone, 1)) % (((int64_t)604800L) * factor); - } else { - timeVal = timeVal * factor / factor / 604800 * 604800 * factor; - } - } else { - colDataSetNULL(pOutput->columnData, i); - continue; - } - break; - } - default: { - timeVal = timeVal * 1; - break; - } + // truncate the timestamp to time_unit precision + int64_t seconds = timeUnit / TSDB_TICK_PER_SECOND(timePrec); + if (ignoreTz && (seconds == 604800 || seconds == 86400)) { + timeVal = timeVal - (timeVal + offsetFromTz(timezone, TSDB_TICK_PER_SECOND(timePrec))) % (((int64_t)seconds) * TSDB_TICK_PER_SECOND(timePrec));; + } else { + timeVal = timeVal / timeUnit * timeUnit; } - - // truncate the timestamp to db precision - switch (timePrec) { - case TSDB_TIME_PRECISION_MILLI: { - if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { - timeVal = timeVal / 1000; - } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { - timeVal = timeVal / 1000000; - } - break; - } - case TSDB_TIME_PRECISION_MICRO: { - if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { - timeVal = timeVal / 1000; - } else if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) { - timeVal = timeVal * 1000; - } - break; - } - case TSDB_TIME_PRECISION_NANO: { - if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { - timeVal = timeVal * 1000; - } else if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) { - timeVal = timeVal * 1000000; - } - break; - } - } - colDataSetVal(pOutput->columnData, i, (char *)&timeVal, false); } diff --git a/tests/system-test/2-query/timetruncate.py b/tests/system-test/2-query/timetruncate.py index 09bdfcef63..67a71cde17 100644 --- a/tests/system-test/2-query/timetruncate.py +++ b/tests/system-test/2-query/timetruncate.py @@ -22,6 +22,7 @@ class TDTestCase: '2020-4-1 00:00:00.001002', '2020-5-1 00:00:00.001002001' ] + self.unix_ts = ['1','1111','1111111','1111111111','1111111111111'] self.db_param_precision = ['ms','us','ns'] self.time_unit = ['1w','1d','1h','1m','1s','1a','1u','1b'] self.error_unit = ['2w','2d','2h','2m','2s','2a','2u','1c','#1'] @@ -134,7 +135,7 @@ class TDTestCase: tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/1000/60/60/24)*24*60*60*1000*1000*1000 ) else: # assuming the client timezone is UTC+0800 - tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i] - (date_time[i] + 8 * 3600 * 1000000) % (86400 * 1000000))) + tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i] - (date_time[i] + 8 * 3600 * 1000000000) % (86400 * 1000000000))) elif unit.lower() == '1w': for i in range(len(self.ts_str)): if self.rest_tag != 'rest': @@ -167,16 +168,49 @@ class TDTestCase: self.check_tb_type(unit,tb_type,ignore_tz) tdSql.checkRows(len(self.ts_str)) self.check_ms_timestamp(unit,date_time,ignore_tz) + for uts in self.unix_ts: + ans_time = [] + if tb_type.lower() == 'ntb': + tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.ntbname}') + elif tb_type.lower() == 'ctb': + tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.ctbname}') + elif tb_type.lower() == 'stb': + tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.stbname}') + for i in range(len(self.ts_str)): + ans_time.append(int(uts)) + self.check_ms_timestamp(unit, ans_time, ignore_tz) elif precision.lower() == 'us': for ignore_tz in tz_options: self.check_tb_type(unit,tb_type,ignore_tz) tdSql.checkRows(len(self.ts_str)) self.check_us_timestamp(unit,date_time,ignore_tz) + for uts in self.unix_ts: + ans_time = [] + if tb_type.lower() == 'ntb': + tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.ntbname}') + elif tb_type.lower() == 'ctb': + tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.ctbname}') + elif tb_type.lower() == 'stb': + tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.stbname}') + for i in range(len(self.ts_str)): + ans_time.append(int(uts)) + self.check_us_timestamp(unit, ans_time, ignore_tz) elif precision.lower() == 'ns': for ignore_tz in tz_options: self.check_tb_type(unit,tb_type, ignore_tz) tdSql.checkRows(len(self.ts_str)) self.check_ns_timestamp(unit,date_time,ignore_tz) + for uts in self.unix_ts: + ans_time = [] + if tb_type.lower() == 'ntb': + tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.ntbname}') + elif tb_type.lower() == 'ctb': + tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.ctbname}') + elif tb_type.lower() == 'stb': + tdSql.query(f'select timetruncate({uts},{unit},{ignore_tz}) from {self.stbname}') + for i in range(len(self.ts_str)): + ans_time.append(int(uts)) + self.check_ns_timestamp(unit, ans_time, ignore_tz) for unit in self.error_unit: if tb_type.lower() == 'ntb': tdSql.error(f'select timetruncate(ts,{unit}) from {self.ntbname}') From 5dda4dba6f7a6a0f4708d82731c2abb2f6311ee4 Mon Sep 17 00:00:00 2001 From: sima Date: Wed, 26 Jun 2024 09:43:49 +0800 Subject: [PATCH 14/16] fix:[TD-30730] Change doc's description about functions. --- docs/en/12-taos-sql/10-function.md | 6 +++--- docs/zh/12-taos-sql/10-function.md | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/docs/en/12-taos-sql/10-function.md b/docs/en/12-taos-sql/10-function.md index 27dbbfcc08..4bd701f713 100644 --- a/docs/en/12-taos-sql/10-function.md +++ b/docs/en/12-taos-sql/10-function.md @@ -398,7 +398,7 @@ Conversion functions change the data type of a value. CAST(expr AS type_name) ``` -**Description**: Convert the input data `expr` into the type specified by `type_name`. This function can be used only in SELECT statements. +**Description**: Convert the input data `expr` into the type specified by `type_name`. **Return value type**: The type specified by parameter `type_name` @@ -435,8 +435,7 @@ TO_ISO8601(expr [, timezone]) **More explanations**: - You can specify a time zone in the following format: [z/Z, +/-hhmm, +/-hh, +/-hh:mm]. For example, TO_ISO8601(1, "+00:00"). -- If the input is a UNIX timestamp, the precision of the returned value is determined by the digits of the input timestamp -- If the input is a column of TIMESTAMP type, the precision of the returned value is same as the precision set for the current data base in use +- The precision of the input timestamp will be recognized automatically according to the precision of the table used, milliseconds will be used if no table is specified. #### TO_JSON @@ -650,6 +649,7 @@ use_current_timezone: { - Time unit specified by `time_unit` can be: 1b (nanoseconds), 1u (microseconds), 1a (milliseconds), 1s (seconds), 1m (minutes), 1h (hours), 1d (days), or 1w (weeks) - The precision of the returned timestamp is same as the precision set for the current data base in use +- The precision of the input timestamp will be recognized automatically according to the precision of the table used, milliseconds will be used if no table is specified. - If the input data is not formatted as a timestamp, the returned value is null. - When using 1d/1w as the time unit to truncate timestamp, you can specify whether to truncate based on the current time zone by setting the use_current_timezone parameter. Value 0 indicates truncation using the UTC time zone, value 1 indicates truncation using the current time zone. diff --git a/docs/zh/12-taos-sql/10-function.md b/docs/zh/12-taos-sql/10-function.md index 6f4f9b3d84..26996a39fd 100644 --- a/docs/zh/12-taos-sql/10-function.md +++ b/docs/zh/12-taos-sql/10-function.md @@ -398,7 +398,7 @@ UPPER(expr) CAST(expr AS type_name) ``` -**功能说明**:数据类型转换函数,返回 expr 转换为 type_name 指定的类型后的结果。只适用于 select 子句中。 +**功能说明**:数据类型转换函数,返回 expr 转换为 type_name 指定的类型后的结果。 **返回结果类型**:CAST 中指定的类型(type_name)。 @@ -435,8 +435,7 @@ TO_ISO8601(expr [, timezone]) **使用说明**: - timezone 参数允许输入的时区格式为: [z/Z, +/-hhmm, +/-hh, +/-hh:mm]。例如,TO_ISO8601(1, "+00:00")。 -- 如果输入是表示 UNIX 时间戳的整形,返回格式精度由时间戳的位数决定; -- 如果输入是 TIMESTAMP 类型的列,返回格式的时间戳精度与当前 DATABASE 设置的时间精度一致。 +- 输入时间戳的精度由所查询表的精度确定, 若未指定表, 则精度为毫秒. #### TO_JSON @@ -650,6 +649,7 @@ use_current_timezone: { - 支持的时间单位 time_unit 如下: 1b(纳秒), 1u(微秒),1a(毫秒),1s(秒),1m(分),1h(小时),1d(天), 1w(周)。 - 返回的时间戳精度与当前 DATABASE 设置的时间精度一致。 +- 输入时间戳的精度由所查询表的精度确定, 若未指定表, 则精度为毫秒. - 输入包含不符合时间日期格式的字符串则返回 NULL。 - 当使用 1d/1w 作为时间单位对时间戳进行截断时, 可通过设置 use_current_timezone 参数指定是否根据当前时区进行截断处理。 值 0 表示使用 UTC 时区进行截断,值 1 表示使用当前时区进行截断。 From 96e32227263f8f401f64972b2b2c12f4176fd412 Mon Sep 17 00:00:00 2001 From: sima Date: Wed, 26 Jun 2024 15:34:50 +0800 Subject: [PATCH 15/16] fix:[TD-30730] fix mergejoin operator's timetruncate caculation on timezone. --- source/libs/executor/src/hashjoinoperator.c | 2 +- source/libs/executor/src/mergejoinoperator.c | 2 +- source/libs/scalar/src/sclfunc.c | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 31d3385676..2fe2ccc56f 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -116,7 +116,7 @@ int32_t hJoinLaunchPrimExpr(SSDataBlock* pBlock, SHJoinTableCtx* pTable, int32_t SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId); if (0 != pCtx->timezoneUnit) { for (int32_t i = startIdx; i <= endIdx; ++i) { - ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit - pCtx->timezoneUnit; + ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] - (((int64_t*)pPrimIn->pData)[i] - pCtx->timezoneUnit) % pCtx->truncateUnit; } } else { for (int32_t i = startIdx; i <= endIdx; ++i) { diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index faaebc1cd8..2e2101231b 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -948,7 +948,7 @@ int32_t mJoinLaunchPrimExpr(SSDataBlock* pBlock, SMJoinTableCtx* pTable) { SColumnInfoData* pPrimOut = taosArrayGet(pBlock->pDataBlock, pTable->primCtx.targetSlotId); if (0 != pCtx->timezoneUnit) { for (int32_t i = 0; i < pBlock->info.rows; ++i) { - ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] / pCtx->truncateUnit * pCtx->truncateUnit - pCtx->timezoneUnit; + ((int64_t*)pPrimOut->pData)[i] = ((int64_t*)pPrimIn->pData)[i] - (((int64_t*)pPrimIn->pData)[i] + pCtx->timezoneUnit) % pCtx->truncateUnit; } } else { for (int32_t i = 0; i < pBlock->info.rows; ++i) { diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 1a9d7f4900..282e935dd8 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1369,7 +1369,7 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara // truncate the timestamp to time_unit precision int64_t seconds = timeUnit / TSDB_TICK_PER_SECOND(timePrec); if (ignoreTz && (seconds == 604800 || seconds == 86400)) { - timeVal = timeVal - (timeVal + offsetFromTz(timezone, TSDB_TICK_PER_SECOND(timePrec))) % (((int64_t)seconds) * TSDB_TICK_PER_SECOND(timePrec));; + timeVal = timeVal - (timeVal + offsetFromTz(timezone, TSDB_TICK_PER_SECOND(timePrec))) % timeUnit; } else { timeVal = timeVal / timeUnit * timeUnit; } From 1ec867449e752a4f694e5f42a0b28cbbd8e30c13 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 27 Jun 2024 10:11:38 +0800 Subject: [PATCH 16/16] fix:[TS-4921] errors in test --- include/libs/monitor/clientMonitor.h | 2 +- source/dnode/mnode/impl/src/mndDnode.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index b3596362fd..68b55e71a9 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -31,7 +31,7 @@ typedef enum SQL_RESULT_CODE { SQL_RESULT_CANCEL = 2, } SQL_RESULT_CODE; -#define SLOW_LOG_SEND_SIZE 8*1024 +#define SLOW_LOG_SEND_SIZE 32*1024 typedef struct { int64_t clusterId; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 9f0fc2decf..73addea6fe 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -54,7 +54,7 @@ static const char *offlineReason[] = { "monitor interval not match", "monitor slow log threshold not match", "monitor slow log sql max len not match", - "monitor slow log scopenot match", + "monitor slow log scope not match", "unknown", };