Merge branch '3.0' into feat/TD-30006

This commit is contained in:
Yihao Deng 2024-05-31 02:49:58 +00:00
commit 5293c4a1dd
13 changed files with 160 additions and 90 deletions

View File

@ -2362,12 +2362,19 @@ typedef struct {
} SMArbUpdateGroupMember; } SMArbUpdateGroupMember;
typedef struct { typedef struct {
int32_t vgId; int32_t dnodeId;
int64_t dbUid; char* token;
SMArbUpdateGroupMember members[2]; int8_t acked;
int8_t isSync; } SMArbUpdateGroupAssigned;
SMArbUpdateGroupMember assignedLeader;
int64_t version; typedef struct {
int32_t vgId;
int64_t dbUid;
SMArbUpdateGroupMember members[2];
int8_t isSync;
int8_t assignedAcked;
SMArbUpdateGroupAssigned assignedLeader;
int64_t version;
} SMArbUpdateGroup; } SMArbUpdateGroup;
typedef struct { typedef struct {

View File

@ -101,6 +101,8 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
*/ */
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
int64_t mndGetRoleTimeMs(SMnode *pMnode);
/** /**
* @brief Process the rpc, sync request. * @brief Process the rpc, sync request.
* *

View File

@ -183,6 +183,7 @@ typedef struct SProjectLogicNode {
char stmtName[TSDB_TABLE_NAME_LEN]; char stmtName[TSDB_TABLE_NAME_LEN];
bool ignoreGroupId; bool ignoreGroupId;
bool inputIgnoreGroup; bool inputIgnoreGroup;
bool isSetOpProj;
} SProjectLogicNode; } SProjectLogicNode;
typedef struct SIndefRowsFuncLogicNode { typedef struct SIndefRowsFuncLogicNode {

View File

@ -76,6 +76,7 @@ static const SSysDbTableSchema arbGroupsSchema[] = {
{.name = "is_sync", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "is_sync", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "assigned_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, {.name = "assigned_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
{.name = "assigned_token", .bytes = TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true}, {.name = "assigned_token", .bytes = TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
{.name = "assigned_acked", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
}; };
static const SSysDbTableSchema clusterSchema[] = { static const SSysDbTableSchema clusterSchema[] = {

View File

@ -6486,6 +6486,11 @@ int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdat
if (tEncodeI64(&encoder, pGroup->version) < 0) return -1; if (tEncodeI64(&encoder, pGroup->version) < 0) return -1;
} }
for (int32_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i);
if (tEncodeI8(&encoder, pGroup->assignedLeader.acked) < 0) return -1;
}
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -6518,8 +6523,18 @@ int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpd
group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1; if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1;
if (tDecodeI64(&decoder, &group.version) < 0) return -1; if (tDecodeI64(&decoder, &group.version) < 0) return -1;
group.assignedLeader.acked = false;
taosArrayPush(updateArray, &group); taosArrayPush(updateArray, &group);
} }
if (!tDecodeIsEnd(&decoder)) {
for (int32_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pGroup = taosArrayGet(updateArray, i);
if (tDecodeI8(&decoder, &pGroup->assignedLeader.acked) < 0) return -1;
}
}
pReq->updateArray = updateArray; pReq->updateArray = updateArray;
tEndDecode(&decoder); tEndDecode(&decoder);

View File

@ -255,6 +255,7 @@ typedef struct {
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
char token[TSDB_ARB_TOKEN_SIZE]; char token[TSDB_ARB_TOKEN_SIZE];
int8_t acked;
} SArbAssignedLeader; } SArbAssignedLeader;
typedef struct { typedef struct {

View File

@ -25,7 +25,7 @@
#include "mndVgroup.h" #include "mndVgroup.h"
#define ARBGROUP_VER_NUMBER 1 #define ARBGROUP_VER_NUMBER 1
#define ARBGROUP_RESERVE_SIZE 64 #define ARBGROUP_RESERVE_SIZE 63
static SHashObj *arbUpdateHash = NULL; static SHashObj *arbUpdateHash = NULL;
@ -129,6 +129,7 @@ SSdbRaw *mndArbGroupActionEncode(SArbGroup *pGroup) {
SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER) SDB_SET_INT32(pRaw, dataPos, pLeader->dnodeId, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER) SDB_SET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER) SDB_SET_INT64(pRaw, dataPos, pGroup->version, _OVER)
SDB_SET_INT8(pRaw, dataPos, pLeader->acked, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER) SDB_SET_RESERVE(pRaw, dataPos, ARBGROUP_RESERVE_SIZE, _OVER)
@ -182,6 +183,7 @@ SSdbRow *mndArbGroupActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER) SDB_GET_INT32(pRaw, dataPos, &pLeader->dnodeId, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER) SDB_GET_BINARY(pRaw, dataPos, pLeader->token, TSDB_ARB_TOKEN_SIZE, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER) SDB_GET_INT64(pRaw, dataPos, &pGroup->version, _OVER)
SDB_GET_INT8(pRaw, dataPos, &pLeader->acked, _OVER)
pGroup->mutexInited = false; pGroup->mutexInited = false;
@ -235,6 +237,7 @@ static int32_t mndArbGroupActionUpdate(SSdb *pSdb, SArbGroup *pOld, SArbGroup *p
pOld->isSync = pNew->isSync; pOld->isSync = pNew->isSync;
pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId; pOld->assignedLeader.dnodeId = pNew->assignedLeader.dnodeId;
memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE); memcpy(pOld->assignedLeader.token, pNew->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
pOld->assignedLeader.acked = pNew->assignedLeader.acked;
pOld->version++; pOld->version++;
_OVER: _OVER:
@ -540,6 +543,14 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
return -1; return -1;
} }
int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
int64_t nowMs = taosGetTimestampMs();
if (nowMs - roleTimeMs < tsArbHeartBeatIntervalSec * 1000 * 2) {
mInfo("arb skip to check sync since mnd had just switch over, roleTime:%" PRId64 " now:%" PRId64, roleTimeMs,
nowMs);
return 0;
}
SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup)); SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
while (1) { while (1) {
@ -551,15 +562,14 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
taosThreadMutexUnlock(&pArbGroup->mutex); taosThreadMutexUnlock(&pArbGroup->mutex);
int32_t vgId = arbGroupDup.vgId; int32_t vgId = arbGroupDup.vgId;
int64_t nowMs = taosGetTimestampMs();
bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs); bool member0IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 0, nowMs);
bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs); bool member1IsTimeout = mndCheckArbMemberHbTimeout(&arbGroupDup, 1, nowMs);
SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader; SArbAssignedLeader *pAssignedLeader = &arbGroupDup.assignedLeader;
int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId; int32_t currentAssignedDnodeId = pAssignedLeader->dnodeId;
// 1. has assigned && is sync => send req // 1. has assigned && is sync && no response => send req
if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true) { if (currentAssignedDnodeId != 0 && arbGroupDup.isSync == true && pAssignedLeader->acked == false) {
(void)mndSendArbSetAssignedLeaderReq(pMnode, currentAssignedDnodeId, vgId, arbToken, term, (void)mndSendArbSetAssignedLeaderReq(pMnode, currentAssignedDnodeId, vgId, arbToken, term,
pAssignedLeader->token); pAssignedLeader->token);
mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, currentAssignedDnodeId); mInfo("vgId:%d, arb send set assigned leader to dnodeId:%d", vgId, currentAssignedDnodeId);
@ -651,6 +661,7 @@ static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup)
outGroup->isSync = pGroup->isSync; outGroup->isSync = pGroup->isSync;
outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId; outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer
outGroup->assignedLeader.acked = pGroup->assignedLeader.acked;
outGroup->version = pGroup->version; outGroup->version = pGroup->version;
} }
@ -766,6 +777,7 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
newGroup.isSync = pUpdateGroup->isSync; newGroup.isSync = pUpdateGroup->isSync;
newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId; newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE); memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
newGroup.assignedLeader.acked = pUpdateGroup->assignedLeader.acked;
newGroup.version = pUpdateGroup->version; newGroup.version = pUpdateGroup->version;
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId); SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
@ -783,10 +795,10 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]", mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token, pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync, newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token); newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token, newGroup.assignedLeader.acked);
sdbRelease(pMnode->pSdb, pOldGroup); sdbRelease(pMnode->pSdb, pOldGroup);
} }
@ -819,11 +831,13 @@ static void mndArbGroupSetAssignedLeader(SArbGroup *pGroup, int32_t index) {
pGroup->assignedLeader.dnodeId = pMember->info.dnodeId; pGroup->assignedLeader.dnodeId = pMember->info.dnodeId;
strncpy(pGroup->assignedLeader.token, pMember->state.token, TSDB_ARB_TOKEN_SIZE); strncpy(pGroup->assignedLeader.token, pMember->state.token, TSDB_ARB_TOKEN_SIZE);
pGroup->assignedLeader.acked = false;
} }
static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) { static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
pGroup->assignedLeader.dnodeId = 0; pGroup->assignedLeader.dnodeId = 0;
memset(pGroup->assignedLeader.token, 0, TSDB_ARB_TOKEN_SIZE); memset(pGroup->assignedLeader.token, 0, TSDB_ARB_TOKEN_SIZE);
pGroup->assignedLeader.acked = false;
} }
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) { static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
@ -834,10 +848,10 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
goto _OVER; goto _OVER;
} }
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]", mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s][%d]",
pTrans->id, pNew->vgId, pNew->members[0].info.dnodeId, pNew->members[0].state.token, pTrans->id, pNew->vgId, pNew->members[0].info.dnodeId, pNew->members[0].state.token,
pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId, pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId,
pNew->assignedLeader.token); pNew->assignedLeader.token, pNew->assignedLeader.acked);
mndTransAddArbGroupId(pTrans, pNew->vgId); mndTransAddArbGroupId(pTrans, pNew->vgId);
if (mndTransCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -1103,11 +1117,12 @@ bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char
goto _OVER; goto _OVER;
} }
if (pGroup->isSync) { if (pGroup->assignedLeader.acked == false) {
mndArbGroupDupObj(pGroup, pNewGroup); mndArbGroupDupObj(pGroup, pNewGroup);
pNewGroup->isSync = false; pNewGroup->isSync = false;
pNewGroup->assignedLeader.acked = true;
mInfo("vgId:%d, arb isSync is setting to false", vgId); mInfo("vgId:%d, arb received assigned ack", vgId);
updateAssigned = true; updateAssigned = true;
goto _OVER; goto _OVER;
} }
@ -1217,12 +1232,18 @@ static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE); STR_WITH_MAXSIZE_TO_VARSTR(token, pGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE + VARSTR_HEADER_SIZE);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)token, false); colDataSetVal(pColInfo, numOfRows, (const char *)token, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pGroup->assignedLeader.acked, false);
} else { } else {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows); colDataSetNULL(pColInfo, numOfRows);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows); colDataSetNULL(pColInfo, numOfRows);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows);
} }
taosThreadMutexUnlock(&pGroup->mutex); taosThreadMutexUnlock(&pGroup->mutex);

View File

@ -334,6 +334,8 @@ static int32_t minCronTime() {
min = TMIN(min, tsStreamCheckpointInterval); min = TMIN(min, tsStreamCheckpointInterval);
min = TMIN(min, 6); // checkpointRemain min = TMIN(min, 6); // checkpointRemain
min = TMIN(min, tsStreamNodeCheckInterval); min = TMIN(min, tsStreamNodeCheckInterval);
min = TMIN(min, tsArbHeartBeatIntervalSec);
min = TMIN(min, tsArbCheckSyncIntervalSec);
int64_t telemInt = TMIN(60, (tsTelemInterval - 1)); int64_t telemInt = TMIN(60, (tsTelemInterval - 1));
min = TMIN(min, telemInt); min = TMIN(min, telemInt);
@ -390,6 +392,18 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
if (sec % tsUptimeInterval == 0) { if (sec % tsUptimeInterval == 0) {
mndIncreaseUpTime(pMnode); mndIncreaseUpTime(pMnode);
} }
if (sec % (tsArbHeartBeatIntervalSec) == 0) {
if (mndPullupArbHeartbeat(pMnode) != 0) {
mError("failed to pullup arb heartbeat, since:%s", terrstr());
}
}
if (sec % (tsArbCheckSyncIntervalSec) == 0) {
if (mndPullupArbCheckSync(pMnode) != 0) {
mError("failed to pullup arb check sync, since:%s", terrstr());
}
}
} }
void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) { void mndDoTimerCheckTask(SMnode *pMnode, int64_t sec) {
if (sec % (tsStatusInterval * 5) == 0) { if (sec % (tsStatusInterval * 5) == 0) {
@ -421,18 +435,6 @@ static void *mndThreadFp(void *param) {
continue; continue;
} }
mndDoTimerPullupTask(pMnode, sec); mndDoTimerPullupTask(pMnode, sec);
if (sec % (tsArbHeartBeatIntervalSec) == 0) {
if (mndPullupArbHeartbeat(pMnode) != 0) {
mError("failed to pullup arb heartbeat, since:%s", terrstr());
}
}
if (sec % (tsArbCheckSyncIntervalSec) == 0) {
if (mndPullupArbCheckSync(pMnode) != 0) {
mError("failed to pullup arb check sync, since:%s", terrstr());
}
}
} }
return NULL; return NULL;
@ -1076,6 +1078,11 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
return 0; return 0;
} }
int64_t mndGetRoleTimeMs(SMnode *pMnode) {
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
return state.roleTimeMs;
}
void mndSetRestored(SMnode *pMnode, bool restored) { void mndSetRestored(SMnode *pMnode, bool restored) {
if (restored) { if (restored) {
taosThreadRwlockWrlock(&pMnode->lock); taosThreadRwlockWrlock(&pMnode->lock);

View File

@ -18,7 +18,6 @@
// extern dependencies // extern dependencies
typedef struct { typedef struct {
int32_t fid; int32_t fid;
bool hasDataToCommit;
STFileSet *fset; STFileSet *fset;
} SFileSetCommitInfo; } SFileSetCommitInfo;
@ -512,36 +511,25 @@ _exit:
return code; return code;
} }
static int32_t tsdbCommitInfoAdd(STsdb *tsdb, const SFileSetCommitInfo *info) { static int32_t tsdbCommitInfoAdd(STsdb *tsdb, int32_t fid) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SFileSetCommitInfo *tinfo; SFileSetCommitInfo *tinfo;
vHashGet(tsdb->commitInfo->ht, info, (void **)&tinfo); if ((tinfo = taosMemoryMalloc(sizeof(*tinfo))) == NULL) {
if (tinfo) { TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
if (info->hasDataToCommit && !tinfo->hasDataToCommit) {
tinfo->hasDataToCommit = true;
}
} else {
if ((tinfo = taosMemoryCalloc(1, sizeof(*tinfo))) == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
tinfo->fid = info->fid;
tinfo->hasDataToCommit = info->hasDataToCommit;
if (info->fset) {
code = tsdbTFileSetInitCopy(tsdb, info->fset, &tinfo->fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = vHashPut(tsdb->commitInfo->ht, tinfo);
TSDB_CHECK_CODE(code, lino, _exit);
if ((taosArrayPush(tsdb->commitInfo->arr, &tinfo)) == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
taosArraySort(tsdb->commitInfo->arr, tFileSetCommitInfoPCompare);
} }
tinfo->fid = fid;
tinfo->fset = NULL;
code = vHashPut(tsdb->commitInfo->ht, tinfo);
TSDB_CHECK_CODE(code, lino, _exit);
if ((taosArrayPush(tsdb->commitInfo->arr, &tinfo)) == NULL) {
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
}
taosArraySort(tsdb->commitInfo->arr, tFileSetCommitInfoPCompare);
_exit: _exit:
if (code) { if (code) {
@ -585,13 +573,15 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
fid = tsdbKeyFid(TSDBROW_TS(row), tsdb->keepCfg.days, tsdb->keepCfg.precision); fid = tsdbKeyFid(TSDBROW_TS(row), tsdb->keepCfg.days, tsdb->keepCfg.precision);
tsdbFidKeyRange(fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey); tsdbFidKeyRange(fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
SFileSetCommitInfo info = { SFileSetCommitInfo *info;
.fid = fid, SFileSetCommitInfo tinfo = {
.hasDataToCommit = true, .fid = fid,
.fset = NULL,
}; };
code = tsdbCommitInfoAdd(tsdb, &info); vHashGet(tsdb->commitInfo->ht, &tinfo, (void **)&info);
TSDB_CHECK_CODE(code, lino, _exit); if (info == NULL) {
code = tsdbCommitInfoAdd(tsdb, fid);
TSDB_CHECK_CODE(code, lino, _exit);
}
from.key.ts = maxKey + 1; from.key.ts = maxKey + 1;
} }
@ -599,18 +589,13 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
taosThreadMutexLock(&tsdb->mutex); taosThreadMutexLock(&tsdb->mutex);
// copy existing file set
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
tsdbFSGetFSet(tsdb->pFS, info->fid, &fset);
if (fset) {
tsdbTFileSetInitCopy(tsdb, fset, &info->fset);
}
}
// scan tomb data // scan tomb data
if (tsdb->imem->nDel > 0) { if (tsdb->imem->nDel > 0) {
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) { TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
if (tsdbTFileSetIsEmpty(fset)) {
continue;
}
SFileSetCommitInfo *info; SFileSetCommitInfo *info;
SFileSetCommitInfo tinfo = { SFileSetCommitInfo tinfo = {
.fid = fset->fid, .fid = fset->fid,
@ -623,6 +608,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
} }
int64_t minKey, maxKey; int64_t minKey, maxKey;
bool hasDataToCommit = false;
tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey); tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
iter = tRBTreeIterCreate(tsdb->imem->tbDataTree, 1); iter = tRBTreeIterCreate(tsdb->imem->tbDataTree, 1);
for (SRBTreeNode *node = tRBTreeIterNext(&iter); node; node = tRBTreeIterNext(&iter)) { for (SRBTreeNode *node = tRBTreeIterNext(&iter); node; node = tRBTreeIterNext(&iter)) {
@ -631,10 +617,8 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
if (pDelData->sKey > maxKey || pDelData->eKey < minKey) { if (pDelData->sKey > maxKey || pDelData->eKey < minKey) {
continue; continue;
} else { } else {
tinfo.fid = fset->fid; hasDataToCommit = true;
tinfo.hasDataToCommit = true; if ((code = tsdbCommitInfoAdd(tsdb, fset->fid))) {
tinfo.fset = fset;
if ((code = tsdbCommitInfoAdd(tsdb, &tinfo))) {
taosThreadMutexUnlock(&tsdb->mutex); taosThreadMutexUnlock(&tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -642,7 +626,7 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
} }
} }
if (tinfo.hasDataToCommit) { if (hasDataToCommit) {
break; break;
} }
} }
@ -653,6 +637,9 @@ static int32_t tsdbCommitInfoBuild(STsdb *tsdb) {
for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { for (int i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
tsdbBeginTaskOnFileSet(tsdb, info->fid, &fset); tsdbBeginTaskOnFileSet(tsdb, info->fid, &fset);
if (fset) {
tsdbTFileSetInitCopy(tsdb, fset, &info->fset);
}
} }
taosThreadMutexUnlock(&tsdb->mutex); taosThreadMutexUnlock(&tsdb->mutex);
@ -756,10 +743,8 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
committer.ctx->info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); committer.ctx->info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
if (committer.ctx->info->hasDataToCommit) { code = tsdbCommitFileSet(&committer);
code = tsdbCommitFileSet(&committer); TSDB_CHECK_CODE(code, lino, _exit);
TSDB_CHECK_CODE(code, lino, _exit);
}
} }
code = tsdbCloseCommitter(&committer, code); code = tsdbCloseCommitter(&committer, code);
@ -792,7 +777,7 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) { for (int32_t i = 0; i < taosArrayGetSize(tsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i); SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(tsdb->commitInfo->arr, i);
if (info->hasDataToCommit && info->fset) { if (info->fset) {
tsdbFinishTaskOnFileSet(tsdb, info->fid); tsdbFinishTaskOnFileSet(tsdb, info->fid);
} }
} }
@ -824,7 +809,7 @@ int32_t tsdbCommitAbort(STsdb *pTsdb) {
taosThreadMutexLock(&pTsdb->mutex); taosThreadMutexLock(&pTsdb->mutex);
for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) { for (int32_t i = 0; i < taosArrayGetSize(pTsdb->commitInfo->arr); i++) {
SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i); SFileSetCommitInfo *info = *(SFileSetCommitInfo **)taosArrayGet(pTsdb->commitInfo->arr, i);
if (info->hasDataToCommit && info->fset) { if (info->fset) {
tsdbFinishTaskOnFileSet(pTsdb, info->fid); tsdbFinishTaskOnFileSet(pTsdb, info->fid);
} }
} }

View File

@ -1587,6 +1587,7 @@ static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator
TSWAP(pProject->node.pLimit, pSetOperator->pLimit); TSWAP(pProject->node.pLimit, pSetOperator->pLimit);
} }
pProject->ignoreGroupId = true; pProject->ignoreGroupId = true;
pProject->isSetOpProj = true;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;

View File

@ -3284,18 +3284,34 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
SNodeList* pNewChildTargets = nodesMakeList(); SNodeList* pNewChildTargets = nodesMakeList();
if (NULL == pProjectNode->node.pParent) { if (NULL == pProjectNode->node.pParent) {
SNode* pProjection = NULL; SNode *pProjection = NULL, *pChildTarget = NULL;
FOREACH(pProjection, pProjectNode->pProjections) { bool needOrderMatch = QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pChild) && ((SProjectLogicNode*)pChild)->isSetOpProj;
SNode* pChildTarget = NULL; bool orderMatch = true;
FOREACH(pChildTarget, pChild->pTargets) { if (needOrderMatch) {
if (0 == strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName)) { // For sql: select ... from (select ... union all select ...);
nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget)); // When eliminating the outer proj (the outer select), we have to make sure that the outer proj projections and
// union all project targets have same columns in the same order. See detail in TD-30188
FORBOTH(pProjection, pProjectNode->pProjections, pChildTarget, pChild->pTargets) {
if (!pProjection) break;
if (0 != strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName)) {
orderMatch = false;
break; break;
} }
nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget));
}
} else {
FOREACH(pProjection, pProjectNode->pProjections) {
FOREACH(pChildTarget, pChild->pTargets) {
if (0 == strcmp(((SColumnNode*)pProjection)->colName, ((SColumnNode*)pChildTarget)->colName)) {
nodesListAppend(pNewChildTargets, nodesCloneNode(pChildTarget));
break;
}
}
} }
} }
if (eliminateProjOptCanChildConditionUseChildTargets(pChild, pNewChildTargets)) { if (eliminateProjOptCanChildConditionUseChildTargets(pChild, pNewChildTargets) &&
(!needOrderMatch || (needOrderMatch && orderMatch))) {
nodesDestroyList(pChild->pTargets); nodesDestroyList(pChild->pTargets);
pChild->pTargets = pNewChildTargets; pChild->pTargets = pNewChildTargets;
} else { } else {

View File

@ -222,7 +222,7 @@ class TDTestCase:
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdLog.info(len(tdSql.queryResult)) tdLog.info(len(tdSql.queryResult))
tdSql.checkEqual(True, len(tdSql.queryResult) in range(254, 255)) tdSql.checkEqual(True, len(tdSql.queryResult) in range(255, 256))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult)) tdSql.checkEqual(54, len(tdSql.queryResult))
@ -336,7 +336,7 @@ class TDTestCase:
tdSql.checkEqual(True, result[i][1] in key_status_list[1]) tdSql.checkEqual(True, result[i][1] in key_status_list[1])
index += 1 index += 1
tdSql.checkEqual(True, index > 0) tdSql.checkEqual(True, index > 0)
tdSql.query(f'show encryptions') tdSql.query(f'show encryptions')
result = tdSql.queryResult result = tdSql.queryResult
index = 0 index = 0
@ -344,7 +344,7 @@ class TDTestCase:
tdSql.checkEqual(True, result[i][1] in key_status_list[1]) tdSql.checkEqual(True, result[i][1] in key_status_list[1])
index += 1 index += 1
tdSql.checkEqual(True, index > 0) tdSql.checkEqual(True, index > 0)
# loaded/sm4 # loaded/sm4
tdSql.execute('drop database if exists db2') tdSql.execute('drop database if exists db2')
tdSql.execute('create encrypt_key \'12345678\'') tdSql.execute('create encrypt_key \'12345678\'')
@ -357,7 +357,7 @@ class TDTestCase:
tdSql.checkEqual(True, result[i][1] in key_status_list[3]) tdSql.checkEqual(True, result[i][1] in key_status_list[3])
index += 1 index += 1
tdSql.checkEqual(True, index > 0) tdSql.checkEqual(True, index > 0)
tdSql.query(f'show encryptions') tdSql.query(f'show encryptions')
result = tdSql.queryResult result = tdSql.queryResult
index = 0 index = 0

View File

@ -240,9 +240,22 @@ class TDTestCase:
tdSql.error( " '' union all select c1 from ct1 " ) tdSql.error( " '' union all select c1 from ct1 " )
# tdSql.error( "select c1 from ct1 union select c1 from ct2 union select c1 from ct4 ") # tdSql.error( "select c1 from ct1 union select c1 from ct2 union select c1 from ct4 ")
def test_select_from_union_all(self):
tdSql.query('select c8, ts from ((select ts, c8,c1 from stb1 order by c1) union all select ts, c8, c1 from stb1 limit 15)')
tdSql.checkRows(15)
tdSql.query('select c8, ts from ((select ts, c8,c1 from stb1 order by c1) union all (select ts, c8, c1 from stb1 order by c8 limit 10) limit 15)')
tdSql.checkRows(15)
tdSql.query('select ts, c1 from ((select ts, c8,c1 from stb1 order by c1) union all (select ts, c8, c1 from stb1 order by c8 limit 10) limit 15)')
tdSql.checkRows(15)
tdSql.query('select ts, c1, c8 from ((select ts, c8,c1 from stb1 order by c1) union all (select ts, c8, c1 from stb1 order by c8 limit 10) limit 15)')
tdSql.checkRows(15)
tdSql.query('select ts, c8, c1, 123 from ((select ts, c8,c1 from stb1 order by c1) union all (select ts, c8, c1 from stb1 order by c8 limit 10) limit 15)')
tdSql.checkRows(15)
def all_test(self): def all_test(self):
self.__test_error() self.__test_error()
self.union_check() self.union_check()
self.test_select_from_union_all()
def __create_tb(self): def __create_tb(self):