Merge branch '3.0' into fix/3_liaohj

This commit is contained in:
Haojun Liao 2024-05-31 14:27:46 +08:00 committed by GitHub
commit bef1953b24
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 123 additions and 38 deletions

View File

@ -2361,12 +2361,19 @@ typedef struct {
} SMArbUpdateGroupMember;
typedef struct {
int32_t vgId;
int64_t dbUid;
SMArbUpdateGroupMember members[2];
int8_t isSync;
SMArbUpdateGroupMember assignedLeader;
int64_t version;
int32_t dnodeId;
char* token;
int8_t acked;
} SMArbUpdateGroupAssigned;
typedef struct {
int32_t vgId;
int64_t dbUid;
SMArbUpdateGroupMember members[2];
int8_t isSync;
int8_t assignedAcked;
SMArbUpdateGroupAssigned assignedLeader;
int64_t version;
} SMArbUpdateGroup;
typedef struct {

View File

@ -101,6 +101,8 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
*/
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
int64_t mndGetRoleTimeMs(SMnode *pMnode);
/**
* @brief Process the rpc, sync request.
*

View File

@ -183,6 +183,7 @@ typedef struct SProjectLogicNode {
char stmtName[TSDB_TABLE_NAME_LEN];
bool ignoreGroupId;
bool inputIgnoreGroup;
bool isSetOpProj;
} SProjectLogicNode;
typedef struct SIndefRowsFuncLogicNode {

View File

@ -76,6 +76,7 @@ static const SSysDbTableSchema arbGroupsSchema[] = {
{.name = "is_sync", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "assigned_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "assigned_token", .bytes = TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "assigned_acked", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
};
static const SSysDbTableSchema clusterSchema[] = {

View File

@ -6460,6 +6460,11 @@ int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdat
if (tEncodeI64(&encoder, pGroup->version) < 0) return -1;
}
for (int32_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i);
if (tEncodeI8(&encoder, pGroup->assignedLeader.acked) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -6492,8 +6497,18 @@ int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpd
group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1;
if (tDecodeI64(&decoder, &group.version) < 0) return -1;
group.assignedLeader.acked = false;
taosArrayPush(updateArray, &group);
}
if (!tDecodeIsEnd(&decoder)) {
for (int32_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pGroup = taosArrayGet(updateArray, i);
if (tDecodeI8(&decoder, &pGroup->assignedLeader.acked) < 0) return -1;
}
}
pReq->updateArray = updateArray;
tEndDecode(&decoder);

View File

@ -255,6 +255,7 @@ typedef struct {
typedef struct {
int32_t dnodeId;
char token[TSDB_ARB_TOKEN_SIZE];
int8_t acked;
} SArbAssignedLeader;
typedef struct {

View File

@ -25,7 +25,7 @@
#include "mndVgroup.h"
#define ARBGROUP_VER_NUMBER 1
#define ARBGROUP_RESERVE_SIZE 64
#define ARBGROUP_RESERVE_SIZE 63
static SHashObj *arbUpdateHash = NULL;
@ -129,6 +129,7 @@ SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
SDB_SET_INT8(pRaw, dataPos, pLeader->acked, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)
@ -182,6 +183,7 @@ SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pLeader->acked, _OVER)
pGroup->mutexInited = false;
@ -235,6 +237,7 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
pOld->isSync = pNew->isSync;
pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
pOld->assignedLeader.acked = pNew->assignedLeader.acked;
pOld->version++;
_OVER:
@ -540,6 +543,14 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
return -1;
}
int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
int64_t nowMs = taosGetTimestampMs();
if (nowMs - roleTimeMs < tsArbHeartBeatIntervalSec * 1000 * 2) {
mInfo("arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64, roleTimeMs,
nowMs);
return 0;
}
SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
while (1) {
@ -551,15 +562,14 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
taosThreadMutexUnlock(&pArbGroup->mutex);
int32_t vgId = arbGroupDup.vgId;
int64_t nowMs = taosGetTimestampMs();
bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs);
bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs);
SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader;
int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId;
// 1. has assigned && is sync => send req
if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true) {
// 1. has assigned && is sync && no response => send req
if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true && pAssignedLeader->acked == false) {
(void)mndSendArbSetAssignedLeaderReq(pMnode, currentAssignedDnodeId, vgId, arbToken, term,
pAssignedLeader->token);
mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, currentAssignedDnodeId);
@ -651,6 +661,7 @@ static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup)
outGroup->isSync = pGroup->isSync;
outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer
outGroup->assignedLeader.acked = pGroup->assignedLeader.acked;
outGroup->version = pGroup->version;
}
@ -766,6 +777,7 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
newGroup.isSync = pUpdateGroup->isSync;
newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
newGroup.version = pUpdateGroup->version;
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
@ -783,10 +795,10 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
goto _OVER;
}
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]",
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token);
newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked);
sdbRelease(pMnode->pSdb, pOldGroup);
}
@ -819,11 +831,13 @@ static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
pGroup->assignedLeader.dnodeId = pMember->info.dnodeId;
strncpy(pGroup->assignedLeader.token, pMember->state.token, TSDB_ARB_TOKEN_SIZE);
pGroup->assignedLeader.acked = false;
}
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
pGroup->assignedLeader.dnodeId = 0;
memset(pGroup->assignedLeader.token, 0, TSDB_ARB_TOKEN_SIZE);
pGroup->assignedLeader.acked = false;
}
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
@ -834,10 +848,10 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
goto _OVER;
}
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]",
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
pTrans->id, pNew->vgId, pNew->members[0].info.dnodeId, pNew->members[0].state.token,
pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId,
pNew->assignedLeader.token);
pNew->assignedLeader.token, pNew->assignedLeader.acked);
mndTransAddArbGroupId(pTrans, pNew->vgId);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -1103,11 +1117,12 @@ bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char
goto _OVER;
}
if (pGroup->isSync) {
if (pGroup->assignedLeader.acked == false) {
mndArbGroupDupObj(pGroup, pNewGroup);
pNewGroup->isSync = false;
pNewGroup->assignedLeader.acked = true;
mInfo("vgId:%d, arb isSync is setting to false", vgId);
mInfo("vgId:%d, arb received assigned ack", vgId);
updateAssigned = true;
goto _OVER;
}
@ -1217,12 +1232,18 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)token, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.acked, false);
} else {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows);
}
taosThreadMutexUnlock(&pGroup->mutex);

View File

@ -310,6 +310,8 @@ static int32_t minCronTime() {
min = TMIN(min, tsStreamCheckpointInterval);
min = TMIN(min, 6); // checkpointRemain
min = TMIN(min, tsStreamNodeCheckInterval);
min = TMIN(min, tsArbHeartBeatIntervalSec);
min = TMIN(min, tsArbCheckSyncIntervalSec);
int64_t telemInt = TMIN(60, (tsTelemInterval - 1));
min = TMIN(min, telemInt);
@ -362,6 +364,18 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
if (sec % tsUptimeInterval == 0) {
mndIncreaseUpTime(pMnode);
}
if (sec % (tsArbHeartBeatIntervalSec) == 0) {
if (mndPullupArbHeartbeat(pMnode) != 0) {
mError("failed to pullup arb heartbeat, since:%s", terrstr());
}
}
if (sec % (tsArbCheckSyncIntervalSec) == 0) {
if (mndPullupArbCheckSync(pMnode) != 0) {
mError("failed to pullup arb check sync, since:%s", terrstr());
}
}
}
void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) {
if (sec % (tsStatusInterval * 5) == 0) {
@ -394,18 +408,6 @@ static void *mndThreadFp(void *param) {
continue;
}
mndDoTimerPullupTask(pMnode, sec);
if (sec % (tsArbHeartBeatIntervalSec) == 0) {
if (mndPullupArbHeartbeat(pMnode) != 0) {
mError("failed to pullup arb heartbeat, since:%s", terrstr());
}
}
if (sec % (tsArbCheckSyncIntervalSec) == 0) {
if (mndPullupArbCheckSync(pMnode) != 0) {
mError("failed to pullup arb check sync, since:%s", terrstr());
}
}
}
return NULL;
@ -1048,6 +1050,11 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
return 0;
}
int64_t mndGetRoleTimeMs(SMnode *pMnode) {
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
return state.roleTimeMs;
}
void mndSetRestored(SMnode *pMnode, bool restored) {
if (restored) {
taosThreadRwlockWrlock(&pMnode->lock);

View File

@ -1587,6 +1587,7 @@ static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator
TSWAP(pProject->node.pLimit, pSetOperator->pLimit);
}
pProject->ignoreGroupId = true;
pProject->isSetOpProj = true;
int32_t code = TSDB_CODE_SUCCESS;

View File

@ -3284,18 +3284,34 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
SNodeList* pNewChildTargets = nodesMakeList();
if (NULL == pProjectNode->node.pParent) {
SNode* pProjection = NULL;
FOREACH(pProjection, pProjectNode->pProjections) {
SNode* pChildTarget = NULL;
FOREACH(pChildTarget, pChild->pTargets) {
if (0 == strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName)) {
nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget));
SNode *pProjection = NULL, *pChildTarget = NULL;
bool needOrderMatch = QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pChild) && ((SProjectLogicNode*)pChild)->isSetOpProj;
bool orderMatch = true;
if (needOrderMatch) {
// For sql: select ... from (select ... union all select ...);
// When eliminating the outer proj (the outer select), we have to make sure that the outer proj projections and
// union all project targets have same columns in the same order. See detail in TD-30188
FORBOTH(pProjection, pProjectNode->pProjections, pChildTarget, pChild->pTargets) {
if (!pProjection) break;
if (0 != strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName)) {
orderMatch = false;
break;
}
nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget));
}
} else {
FOREACH(pProjection, pProjectNode->pProjections) {
FOREACH(pChildTarget, pChild->pTargets) {
if (0 == strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName)) {
nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget));
break;
}
}
}
}
if (eliminateProjOptCanChildConditionUseChildTargets(pChild, pNewChildTargets)) {
if (eliminateProjOptCanChildConditionUseChildTargets(pChild, pNewChildTargets) &&
(!needOrderMatch || (needOrderMatch && orderMatch))) {
nodesDestroyList(pChild->pTargets);
pChild->pTargets = pNewChildTargets;
} else {

View File

@ -336,7 +336,7 @@ class TDTestCase:
tdSql.checkEqual(True, result[i][1] in key_status_list[1])
index += 1
tdSql.checkEqual(True, index > 0)
tdSql.query(f'show encryptions')
result = tdSql.queryResult
index = 0
@ -344,7 +344,7 @@ class TDTestCase:
tdSql.checkEqual(True, result[i][1] in key_status_list[1])
index += 1
tdSql.checkEqual(True, index > 0)
# loaded/sm4
tdSql.execute('drop database if exists db2')
tdSql.execute('create encrypt_key \'12345678\'')
@ -357,7 +357,7 @@ class TDTestCase:
tdSql.checkEqual(True, result[i][1] in key_status_list[3])
index += 1
tdSql.checkEqual(True, index > 0)
tdSql.query(f'show encryptions')
result = tdSql.queryResult
index = 0

View File

@ -240,9 +240,22 @@ class TDTestCase:
tdSql.error( " '' union all select c1 from ct1 " )
# tdSql.error( "select c1 from ct1 union select c1 from ct2 union select c1 from ct4 ")
def test_select_from_union_all(self):
tdSql.query('select c8, ts from ((select ts, c8,c1 from stb1 order by c1) union all select ts, c8, c1 from stb1 limit 15)')
tdSql.checkRows(15)
tdSql.query('select c8, ts from ((select ts, c8,c1 from stb1 order by c1) union all (select ts, c8, c1 from stb1 order by c8 limit 10) limit 15)')
tdSql.checkRows(15)
tdSql.query('select ts, c1 from ((select ts, c8,c1 from stb1 order by c1) union all (select ts, c8, c1 from stb1 order by c8 limit 10) limit 15)')
tdSql.checkRows(15)
tdSql.query('select ts, c1, c8 from ((select ts, c8,c1 from stb1 order by c1) union all (select ts, c8, c1 from stb1 order by c8 limit 10) limit 15)')
tdSql.checkRows(15)
tdSql.query('select ts, c8, c1, 123 from ((select ts, c8,c1 from stb1 order by c1) union all (select ts, c8, c1 from stb1 order by c8 limit 10) limit 15)')
tdSql.checkRows(15)
def all_test(self):
self.__test_error()
self.union_check()
self.test_select_from_union_all()
def __create_tb(self):