From fd34087b8cb42935f5cf1974a9f5f1c5acb84c18 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 Date: Mon, 27 May 2024 16:20:13 +0800 Subject: [PATCH 1/6] fix select from union all caused crash --- include/libs/nodes/plannodes.h | 1 + source/libs/planner/src/planLogicCreater.c | 1 + source/libs/planner/src/planOptimizer.c | 30 +++++++++++++++++----- tests/system-test/2-query/union1.py | 13 ++++++++++ 4 files changed, 38 insertions(+), 7 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 18bc24d612..a691433ee6 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -183,6 +183,7 @@ typedef struct SProjectLogicNode { char stmtName[TSDB_TABLE_NAME_LEN]; bool ignoreGroupId; bool inputIgnoreGroup; + bool isSetOpProj; } SProjectLogicNode; typedef struct SIndefRowsFuncLogicNode { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 23b8baf031..acc29997a2 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1587,6 +1587,7 @@ static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator TSWAP(pProject->node.pLimit, pSetOperator->pLimit); } pProject->ignoreGroupId = true; + pProject->isSetOpProj = true; int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index a2a0343316..7d10c02529 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -3284,18 +3284,34 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* SNodeList* pNewChildTargets = nodesMakeList(); if (NULL == pProjectNode->node.pParent) { - SNode* pProjection = NULL; - FOREACH(pProjection, pProjectNode->pProjections) { - SNode* pChildTarget = NULL; - FOREACH(pChildTarget, pChild->pTargets) { - if (0 == strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName)) { - nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget)); + SNode *pProjection = NULL, *pChildTarget = NULL; + bool needOrderMatch = QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pChild) && ((SProjectLogicNode*)pChild)->isSetOpProj; + bool orderMatch = true; + if (needOrderMatch) { + // For sql: select ... from (select ... union all select ...); + // When eliminating the outer proj (the outer select), we have to make sure that the outer proj projections and + // union all project targets have same columns in the same order. See detail in TD-30188 + FORBOTH(pProjection, pProjectNode->pProjections, pChildTarget, pChild->pTargets) { + if (!pProjection) break; + if (0 != strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName)) { + orderMatch = false; break; } + nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget)); + } + } else { + FOREACH(pProjection, pProjectNode->pProjections) { + FOREACH(pChildTarget, pChild->pTargets) { + if (0 == strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName)) { + nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget)); + break; + } + } } } - if (eliminateProjOptCanChildConditionUseChildTargets(pChild, pNewChildTargets)) { + if (eliminateProjOptCanChildConditionUseChildTargets(pChild, pNewChildTargets) && + (!needOrderMatch || (needOrderMatch && orderMatch))) { nodesDestroyList(pChild->pTargets); pChild->pTargets = pNewChildTargets; } else { diff --git a/tests/system-test/2-query/union1.py b/tests/system-test/2-query/union1.py index 8db5ce01f3..853dfe9582 100644 --- a/tests/system-test/2-query/union1.py +++ b/tests/system-test/2-query/union1.py @@ -240,9 +240,22 @@ class TDTestCase: tdSql.error( " '' union all select c1 from ct1 " ) # tdSql.error( "select c1 from ct1 union select c1 from ct2 union select c1 from ct4 ") + def test_select_from_union_all(self): + tdSql.query('select c8, ts from ((select ts, c8,c1 from stb1 order by c1) union all select ts, c8, c1 from stb1 limit 15)') + tdSql.checkRows(15) + tdSql.query('select c8, ts from ((select ts, c8,c1 from stb1 order by c1) union all (select ts, c8, c1 from stb1 order by c8 limit 10) limit 15)') + tdSql.checkRows(15) + tdSql.query('select ts, c1 from ((select ts, c8,c1 from stb1 order by c1) union all (select ts, c8, c1 from stb1 order by c8 limit 10) limit 15)') + tdSql.checkRows(15) + tdSql.query('select ts, c1, c8 from ((select ts, c8,c1 from stb1 order by c1) union all (select ts, c8, c1 from stb1 order by c8 limit 10) limit 15)') + tdSql.checkRows(15) + tdSql.query('select ts, c8, c1, 123 from ((select ts, c8,c1 from stb1 order by c1) union all (select ts, c8, c1 from stb1 order by c8 limit 10) limit 15)') + tdSql.checkRows(15) + def all_test(self): self.__test_error() self.union_check() + self.test_select_from_union_all() def __create_tb(self): From a8383369ba3959eb77b6069bfefdb2489fdf6874 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 28 May 2024 15:13:58 +0800 Subject: [PATCH 2/6] enh: arb check roletime before check sync --- include/dnode/mnode/mnode.h | 2 ++ source/dnode/mnode/impl/src/mndArbGroup.c | 9 ++++++- source/dnode/mnode/impl/src/mndMain.c | 31 ++++++++++++++--------- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 108e6f18a6..fe96fe1117 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -101,6 +101,8 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr */ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); +int64_t mndGetRoleTimeMs(SMnode *pMnode); + /** * @brief Process the rpc, sync request. * diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index 50338fe889..b00da9ba3f 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -540,6 +540,14 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { return -1; } + int64_t roleTimeMs = mndGetRoleTimeMs(pMnode); + int64_t nowMs = taosGetTimestampMs(); + if (nowMs - roleTimeMs < tsArbHeartBeatIntervalSec * 1000 * 2) { + mInfo("arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64, roleTimeMs, + nowMs); + return 0; + } + SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup)); while (1) { @@ -551,7 +559,6 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { taosThreadMutexUnlock(&pArbGroup->mutex); int32_t vgId = arbGroupDup.vgId; - int64_t nowMs = taosGetTimestampMs(); bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs); bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index a78edcb05e..850c527a14 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -334,6 +334,8 @@ static int32_t minCronTime() { min = TMIN(min, tsStreamCheckpointInterval); min = TMIN(min, 6); // checkpointRemain min = TMIN(min, tsStreamNodeCheckInterval); + min = TMIN(min, tsArbHeartBeatIntervalSec); + min = TMIN(min, tsArbCheckSyncIntervalSec); int64_t telemInt = TMIN(60, (tsTelemInterval - 1)); min = TMIN(min, telemInt); @@ -390,6 +392,18 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) { if (sec % tsUptimeInterval == 0) { mndIncreaseUpTime(pMnode); } + + if (sec % (tsArbHeartBeatIntervalSec) == 0) { + if (mndPullupArbHeartbeat(pMnode) != 0) { + mError("failed to pullup arb heartbeat, since:%s", terrstr()); + } + } + + if (sec % (tsArbCheckSyncIntervalSec) == 0) { + if (mndPullupArbCheckSync(pMnode) != 0) { + mError("failed to pullup arb check sync, since:%s", terrstr()); + } + } } void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) { if (sec % (tsStatusInterval * 5) == 0) { @@ -421,18 +435,6 @@ static void *mndThreadFp(void *param) { continue; } mndDoTimerPullupTask(pMnode, sec); - - if (sec % (tsArbHeartBeatIntervalSec) == 0) { - if (mndPullupArbHeartbeat(pMnode) != 0) { - mError("failed to pullup arb heartbeat, since:%s", terrstr()); - } - } - - if (sec % (tsArbCheckSyncIntervalSec) == 0) { - if (mndPullupArbCheckSync(pMnode) != 0) { - mError("failed to pullup arb check sync, since:%s", terrstr()); - } - } } return NULL; @@ -1076,6 +1078,11 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { return 0; } +int64_t mndGetRoleTimeMs(SMnode *pMnode) { + SSyncState state = syncGetState(pMnode->syncMgmt.sync); + return state.roleTimeMs; +} + void mndSetRestored(SMnode *pMnode, bool restored) { if (restored) { taosThreadRwlockWrlock(&pMnode->lock); From 4c47626baf315221d36e5c407ae149d79aad39dd Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 28 May 2024 16:29:18 +0800 Subject: [PATCH 3/6] fix: arb distinguish between isSync and acked --- include/common/tmsg.h | 19 +++++++++----- source/common/src/systable.c | 1 + source/common/src/tmsg.c | 15 +++++++++++ source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndArbGroup.c | 32 ++++++++++++++++------- 5 files changed, 53 insertions(+), 15 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index be2f443140..6aa2cd8c36 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2361,12 +2361,19 @@ typedef struct { } SMArbUpdateGroupMember; typedef struct { - int32_t vgId; - int64_t dbUid; - SMArbUpdateGroupMember members[2]; - int8_t isSync; - SMArbUpdateGroupMember assignedLeader; - int64_t version; + int32_t dnodeId; + char* token; + int8_t acked; +} SMArbUpdateGroupAssigned; + +typedef struct { + int32_t vgId; + int64_t dbUid; + SMArbUpdateGroupMember members[2]; + int8_t isSync; + int8_t assignedAcked; + SMArbUpdateGroupAssigned assignedLeader; + int64_t version; } SMArbUpdateGroup; typedef struct { diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 9de682dd3a..95b18705cd 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -76,6 +76,7 @@ static const SSysDbTableSchema arbGroupsSchema[] = { {.name = "is_sync", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "assigned_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "assigned_token", .bytes = TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, + {.name = "assigned_acked", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, }; static const SSysDbTableSchema clusterSchema[] = { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ef37a41fcf..ad1243f21b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6460,6 +6460,11 @@ int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdat if (tEncodeI64(&encoder, pGroup->version) < 0) return -1; } + for (int32_t i = 0; i < sz; i++) { + SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i); + if (tEncodeI8(&encoder, pGroup->assignedLeader.acked) < 0) return -1; + } + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -6492,8 +6497,18 @@ int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpd group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1; if (tDecodeI64(&decoder, &group.version) < 0) return -1; + group.assignedLeader.acked = false; + taosArrayPush(updateArray, &group); } + + if (!tDecodeIsEnd(&decoder)) { + for (int32_t i = 0; i < sz; i++) { + SMArbUpdateGroup *pGroup = taosArrayGet(updateArray, i); + if (tDecodeI8(&decoder, &pGroup->assignedLeader.acked) < 0) return -1; + } + } + pReq->updateArray = updateArray; tEndDecode(&decoder); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 0c40622d08..5c21e9b22b 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -255,6 +255,7 @@ typedef struct { typedef struct { int32_t dnodeId; char token[TSDB_ARB_TOKEN_SIZE]; + int8_t acked; } SArbAssignedLeader; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index b00da9ba3f..6a6b3d2daa 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -25,7 +25,7 @@ #include "mndVgroup.h" #define ARBGROUP_VER_NUMBER 1 -#define ARBGROUP_RESERVE_SIZE 64 +#define ARBGROUP_RESERVE_SIZE 63 static SHashObj *arbUpdateHash = NULL; @@ -129,6 +129,7 @@ SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) { SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER) SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER) SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER) + SDB_SET_INT8(pRaw, dataPos, pLeader->acked, _OVER) SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER) @@ -182,6 +183,7 @@ SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER) SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER) SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER) + SDB_GET_INT8(pRaw, dataPos, &pLeader->acked, _OVER) pGroup->mutexInited = false; @@ -235,6 +237,7 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p pOld->isSync = pNew->isSync; pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId; memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE); + pOld->assignedLeader.acked = pNew->assignedLeader.acked; pOld->version++; _OVER: @@ -565,8 +568,8 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader; int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId; - // 1. has assigned && is sync => send req - if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true) { + // 1. has assigned && is sync && no response => send req + if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true && pAssignedLeader->acked == false) { (void)mndSendArbSetAssignedLeaderReq(pMnode, currentAssignedDnodeId, vgId, arbToken, term, pAssignedLeader->token); mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, currentAssignedDnodeId); @@ -658,6 +661,7 @@ static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) outGroup->isSync = pGroup->isSync; outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId; outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer + outGroup->assignedLeader.acked = pGroup->assignedLeader.acked; outGroup->version = pGroup->version; } @@ -773,6 +777,7 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) { newGroup.isSync = pUpdateGroup->isSync; newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId; memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE); + newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked; newGroup.version = pUpdateGroup->version; SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId); @@ -790,10 +795,10 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) { goto _OVER; } - mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]", + mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]", pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token, newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync, - newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token); + newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked); sdbRelease(pMnode->pSdb, pOldGroup); } @@ -826,11 +831,13 @@ static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) { pGroup->assignedLeader.dnodeId = pMember->info.dnodeId; strncpy(pGroup->assignedLeader.token, pMember->state.token, TSDB_ARB_TOKEN_SIZE); + pGroup->assignedLeader.acked = false; } static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) { pGroup->assignedLeader.dnodeId = 0; memset(pGroup->assignedLeader.token, 0, TSDB_ARB_TOKEN_SIZE); + pGroup->assignedLeader.acked = false; } static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) { @@ -841,10 +848,10 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) { goto _OVER; } - mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]", + mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]", pTrans->id, pNew->vgId, pNew->members[0].info.dnodeId, pNew->members[0].state.token, pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId, - pNew->assignedLeader.token); + pNew->assignedLeader.token, pNew->assignedLeader.acked); mndTransAddArbGroupId(pTrans, pNew->vgId); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -1110,11 +1117,12 @@ bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char goto _OVER; } - if (pGroup->isSync) { + if (pGroup->assignedLeader.acked == false) { mndArbGroupDupObj(pGroup, pNewGroup); pNewGroup->isSync = false; + pNewGroup->assignedLeader.acked = true; - mInfo("vgId:%d, arb isSync is setting to false", vgId); + mInfo("vgId:%d, arb received assigned ack", vgId); updateAssigned = true; goto _OVER; } @@ -1224,12 +1232,18 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)token, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.acked, false); } else { pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetNULL(pColInfo, numOfRows); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetNULL(pColInfo, numOfRows); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetNULL(pColInfo, numOfRows); } taosThreadMutexUnlock(&pGroup->mutex); From 4bfddf5020e0532c12f90ab548f892a8dd83c6b7 Mon Sep 17 00:00:00 2001 From: Shungang Li Date: Tue, 28 May 2024 19:09:56 +0800 Subject: [PATCH 4/6] fix: ci test case --- tests/system-test/0-others/information_schema.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 944b2fbb1e..35f5e39214 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -222,7 +222,7 @@ class TDTestCase: tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdLog.info(len(tdSql.queryResult)) - tdSql.checkEqual(True, len(tdSql.queryResult) in range(254, 255)) + tdSql.checkEqual(True, len(tdSql.queryResult) in range(255, 256)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult)) @@ -336,7 +336,7 @@ class TDTestCase: tdSql.checkEqual(True, result[i][1] in key_status_list[1]) index += 1 tdSql.checkEqual(True, index > 0) - + tdSql.query(f'show encryptions') result = tdSql.queryResult index = 0 @@ -344,7 +344,7 @@ class TDTestCase: tdSql.checkEqual(True, result[i][1] in key_status_list[1]) index += 1 tdSql.checkEqual(True, index > 0) - + # loaded/sm4 tdSql.execute('drop database if exists db2') tdSql.execute('create encrypt_key \'12345678\'') @@ -357,7 +357,7 @@ class TDTestCase: tdSql.checkEqual(True, result[i][1] in key_status_list[3]) index += 1 tdSql.checkEqual(True, index > 0) - + tdSql.query(f'show encryptions') result = tdSql.queryResult index = 0 From 88a2c373b35aa849c8c552df4a6add0aa08fd5df Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 30 May 2024 10:22:26 +0800 Subject: [PATCH 5/6] more fix --- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 80 ++++++++--------------- 1 file changed, 29 insertions(+), 51 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index f6082e60e3..551d4c62b2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -18,7 +18,6 @@ // extern dependencies typedef struct { int32_t fid; - bool hasDataToCommit; STFileSet *fset; } SFileSetCommitInfo; @@ -512,36 +511,25 @@ _exit: return code; } -static int32_t tsdbCommitInfoAdd(STsdb *tsdb, const SFileSetCommitInfo *info) { +static int32_t tsdbCommitInfoAdd(STsdb *tsdb, int32_t fid) { int32_t code = 0; int32_t lino = 0; SFileSetCommitInfo *tinfo; - vHashGet(tsdb->commitInfo->ht, info, (void **)&tinfo); - if (tinfo) { - if (info->hasDataToCommit && !tinfo->hasDataToCommit) { - tinfo->hasDataToCommit = true; - } - } else { - if ((tinfo = taosMemoryCalloc(1, sizeof(*tinfo))) == NULL) { - TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); - } - tinfo->fid = info->fid; - tinfo->hasDataToCommit = info->hasDataToCommit; - if (info->fset) { - code = tsdbTFileSetInitCopy(tsdb, info->fset, &tinfo->fset); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = vHashPut(tsdb->commitInfo->ht, tinfo); - TSDB_CHECK_CODE(code, lino, _exit); - - if ((taosArrayPush(tsdb->commitInfo->arr, &tinfo)) == NULL) { - TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); - } - taosArraySort(tsdb->commitInfo->arr, tFileSetCommitInfoPCompare); + if ((tinfo = taosMemoryMalloc(sizeof(*tinfo))) == NULL) { + TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); } + tinfo->fid = fid; + tinfo->fset = NULL; + + code = vHashPut(tsdb->commitInfo->ht, tinfo); + TSDB_CHECK_CODE(code, lino, _exit); + + if ((taosArrayPush(tsdb->commitInfo->arr, &tinfo)) == NULL) { + TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit); + } + taosArraySort(tsdb->commitInfo->arr, tFileSetCommitInfoPCompare); _exit: if (code) { @@ -585,12 +573,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) { fid = tsdbKeyFid(TSDBROW_TS(row), tsdb->keepCfg.days, tsdb->keepCfg.precision); tsdbFidKeyRange(fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey); - SFileSetCommitInfo info = { - .fid = fid, - .hasDataToCommit = true, - .fset = NULL, - }; - code = tsdbCommitInfoAdd(tsdb, &info); + code = tsdbCommitInfoAdd(tsdb, fid); TSDB_CHECK_CODE(code, lino, _exit); from.key.ts = maxKey + 1; @@ -599,18 +582,13 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) { taosThreadMutexLock(&tsdb->mutex); - // copy existing file set - for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { - SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); - tsdbFSGetFSet(tsdb->pFS, info->fid, &fset); - if (fset) { - tsdbTFileSetInitCopy(tsdb, fset, &info->fset); - } - } - // scan tomb data if (tsdb->imem->nDel > 0) { TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { + if (tsdbTFileSetIsEmpty(fset)) { + continue; + } + SFileSetCommitInfo *info; SFileSetCommitInfo tinfo = { .fid = fset->fid, @@ -623,6 +601,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) { } int64_t minKey, maxKey; + bool hasDataToCommit = false; tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey); iter = tRBTreeIterCreate(tsdb->imem->tbDataTree, 1); for (SRBTreeNode *node = tRBTreeIterNext(&iter); node; node = tRBTreeIterNext(&iter)) { @@ -631,10 +610,8 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) { if (pDelData->sKey > maxKey || pDelData->eKey < minKey) { continue; } else { - tinfo.fid = fset->fid; - tinfo.hasDataToCommit = true; - tinfo.fset = fset; - if ((code = tsdbCommitInfoAdd(tsdb, &tinfo))) { + hasDataToCommit = true; + if ((code = tsdbCommitInfoAdd(tsdb, fset->fid))) { taosThreadMutexUnlock(&tsdb->mutex); TSDB_CHECK_CODE(code, lino, _exit); } @@ -642,7 +619,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) { } } - if (tinfo.hasDataToCommit) { + if (hasDataToCommit) { break; } } @@ -653,6 +630,9 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) { for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); tsdbBeginTaskOnFileSet(tsdb, info->fid, &fset); + if (fset) { + tsdbTFileSetInitCopy(tsdb, fset, &info->fset); + } } taosThreadMutexUnlock(&tsdb->mutex); @@ -756,10 +736,8 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) { for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { committer.ctx->info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); - if (committer.ctx->info->hasDataToCommit) { - code = tsdbCommitFileSet(&committer); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tsdbCommitFileSet(&committer); + TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbCloseCommitter(&committer, code); @@ -792,7 +770,7 @@ int32_t tsdbCommitCommit(STsdb *tsdb) { for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); - if (info->hasDataToCommit && info->fset) { + if (info->fset) { tsdbFinishTaskOnFileSet(tsdb, info->fid); } } @@ -824,7 +802,7 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) { taosThreadMutexLock(&pTsdb->mutex); for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) { SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i); - if (info->hasDataToCommit && info->fset) { + if (info->fset) { tsdbFinishTaskOnFileSet(pTsdb, info->fid); } } From 91394f2e6dabc2bf4e61b85ccc47c6ae805f9b93 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 30 May 2024 11:01:44 +0800 Subject: [PATCH 6/6] fix more --- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 551d4c62b2..ffe5c2c1e0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -573,8 +573,15 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) { fid = tsdbKeyFid(TSDBROW_TS(row), tsdb->keepCfg.days, tsdb->keepCfg.precision); tsdbFidKeyRange(fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey); - code = tsdbCommitInfoAdd(tsdb, fid); - TSDB_CHECK_CODE(code, lino, _exit); + SFileSetCommitInfo *info; + SFileSetCommitInfo tinfo = { + .fid = fid, + }; + vHashGet(tsdb->commitInfo->ht, &tinfo, (void **)&info); + if (info == NULL) { + code = tsdbCommitInfoAdd(tsdb, fid); + TSDB_CHECK_CODE(code, lino, _exit); + } from.key.ts = maxKey + 1; }