Merge branch '3.0' into fix/TD-30028

This commit is contained in:
Haojun Liao 2024-05-23 16:16:59 +08:00
commit cc7d5d7a91
19 changed files with 353 additions and 151 deletions

View File

@ -2358,20 +2358,24 @@ void tFreeSVArbSetAssignedLeaderRsp(SVArbSetAssignedLeaderRsp* pRsp);
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
char* token; char* token;
} SMArbUpdateGroupReqMember; } SMArbUpdateGroupMember;
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int64_t dbUid; int64_t dbUid;
SMArbUpdateGroupReqMember members[2]; SMArbUpdateGroupMember members[2];
int8_t isSync; int8_t isSync;
SMArbUpdateGroupReqMember assignedLeader; SMArbUpdateGroupMember assignedLeader;
int64_t version; int64_t version;
} SMArbUpdateGroupReq; } SMArbUpdateGroup;
int32_t tSerializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq); typedef struct {
int32_t tDeserializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq); SArray* updateArray; // SMArbUpdateGroup
void tFreeSMArbUpdateGroupReq(SMArbUpdateGroupReq* pReq); } SMArbUpdateGroupBatchReq;
int32_t tSerializeSMArbUpdateGroupBatchReq(void* buf, int32_t bufLen, SMArbUpdateGroupBatchReq* pReq);
int32_t tDeserializeSMArbUpdateGroupBatchReq(void* buf, int32_t bufLen, SMArbUpdateGroupBatchReq* pReq);
void tFreeSMArbUpdateGroupBatchReq(SMArbUpdateGroupBatchReq* pReq);
typedef struct { typedef struct {
char queryStrId[TSDB_QUERY_ID_LEN]; char queryStrId[TSDB_QUERY_ID_LEN];

View File

@ -388,7 +388,8 @@
TD_NEW_MSG_SEG(TDMT_MND_ARB_MSG) //9 << 8 TD_NEW_MSG_SEG(TDMT_MND_ARB_MSG) //9 << 8
TD_DEF_MSG_TYPE(TDMT_MND_ARB_HEARTBEAT_TIMER, "mnd-arb-hb-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ARB_HEARTBEAT_TIMER, "mnd-arb-hb-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ARB_CHECK_SYNC_TIMER, "mnd-arb-check-sync-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ARB_CHECK_SYNC_TIMER, "mnd-arb-check-sync-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) // no longer used
TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP_BATCH, "mnd-arb-update-group-batch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ARB_MAX_MSG, "mnd-arb-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ARB_MAX_MSG, "mnd-arb-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_ARB_MSG) TD_CLOSE_MSG_SEG(TDMT_END_ARB_MSG)

View File

@ -327,7 +327,6 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_DB_IN_CREATING TAOS_DEF_ERROR_CODE(0, 0x0396) // #define TSDB_CODE_MND_DB_IN_CREATING TAOS_DEF_ERROR_CODE(0, 0x0396) //
#define TSDB_CODE_MND_INVALID_SYS_TABLENAME TAOS_DEF_ERROR_CODE(0, 0x039A) #define TSDB_CODE_MND_INVALID_SYS_TABLENAME TAOS_DEF_ERROR_CODE(0, 0x039A)
#define TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE TAOS_DEF_ERROR_CODE(0, 0x039B) #define TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE TAOS_DEF_ERROR_CODE(0, 0x039B)
#define TSDB_CODE_MND_DB_ENCRYPT_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x039C)
// mnode-node // mnode-node
#define TSDB_CODE_MND_MNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0) #define TSDB_CODE_MND_MNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0)
@ -612,6 +611,16 @@ int32_t* taosGetErrno();
#define TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0821) #define TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0821)
#define TSDB_CODE_GRANT_DUPLICATED_ACTIVE TAOS_DEF_ERROR_CODE(0, 0x0822) #define TSDB_CODE_GRANT_DUPLICATED_ACTIVE TAOS_DEF_ERROR_CODE(0, 0x0822)
#define TSDB_CODE_GRANT_VIEW_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0823) #define TSDB_CODE_GRANT_VIEW_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0823)
#define TSDB_CODE_GRANT_BASIC_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0824)
#define TSDB_CODE_GRANT_STREAM_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0825)
#define TSDB_CODE_GRANT_SUBSCRIPTION_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0826)
#define TSDB_CODE_GRANT_VIEW_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0827)
#define TSDB_CODE_GRANT_AUDIT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0828)
#define TSDB_CODE_GRANT_CSV_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0829)
#define TSDB_CODE_GRANT_MULTI_STORAGE_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082A)
#define TSDB_CODE_GRANT_OBJECT_STROAGE_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082B)
#define TSDB_CODE_GRANT_DUAL_REPLICA_HA_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082C)
#define TSDB_CODE_GRANT_DB_ENCRYPTION_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082D)
// sync // sync
// #define TSDB_CODE_SYN_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0900) // 2.x // #define TSDB_CODE_SYN_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0900) // 2.x

View File

@ -92,6 +92,7 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat
} }
if (!pRsp) { if (!pRsp) {
releaseTscObj(pReq->connKey.tscRid); releaseTscObj(pReq->connKey.tscRid);
taosHashCancelIterate(hbMgr->activeInfo, pReq);
break; break;
} }
} }

View File

@ -6437,21 +6437,28 @@ void tFreeSVArbSetAssignedLeaderRsp(SVArbSetAssignedLeaderRsp *pRsp) {
taosMemoryFreeClear(pRsp->memberToken); taosMemoryFreeClear(pRsp->memberToken);
} }
int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGroupReq *pReq) { int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdateGroupBatchReq *pReq) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1; int32_t sz = taosArrayGetSize(pReq->updateArray);
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { if (tEncodeI32(&encoder, sz) < 0) return -1;
if (tEncodeI32(&encoder, pReq->members[i].dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->members[i].token) < 0) return -1; for (int32_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i);
if (tEncodeI32(&encoder, pGroup->vgId) < 0) return -1;
if (tEncodeI64(&encoder, pGroup->dbUid) < 0) return -1;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
if (tEncodeI32(&encoder, pGroup->members[i].dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pGroup->members[i].token) < 0) return -1;
}
if (tEncodeI8(&encoder, pGroup->isSync) < 0) return -1;
if (tEncodeI32(&encoder, pGroup->assignedLeader.dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pGroup->assignedLeader.token) < 0) return -1;
if (tEncodeI64(&encoder, pGroup->version) < 0) return -1;
} }
if (tEncodeI8(&encoder, pReq->isSync) < 0) return -1;
if (tEncodeI32(&encoder, pReq->assignedLeader.dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->assignedLeader.token) < 0) return -1;
if (tEncodeI64(&encoder, pReq->version) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
@ -6460,23 +6467,34 @@ int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGrou
return tlen; return tlen;
} }
int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGroupReq *pReq) { int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdateGroupBatchReq *pReq) {
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; int32_t sz = 0;
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1; if (tDecodeI32(&decoder, &sz) < 0) return -1;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
if (tDecodeI32(&decoder, &pReq->members[i].dnodeId) < 0) return -1; SArray *updateArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
pReq->members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); if (!updateArray) return -1;
if (tDecodeCStrTo(&decoder, pReq->members[i].token) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SMArbUpdateGroup group = {0};
if (tDecodeI32(&decoder, &group.vgId) < 0) return -1;
if (tDecodeI64(&decoder, &group.dbUid) < 0) return -1;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
if (tDecodeI32(&decoder, &group.members[i].dnodeId) < 0) return -1;
group.members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
if (tDecodeCStrTo(&decoder, group.members[i].token) < 0) return -1;
}
if (tDecodeI8(&decoder, &group.isSync) < 0) return -1;
if (tDecodeI32(&decoder, &group.assignedLeader.dnodeId) < 0) return -1;
group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1;
if (tDecodeI64(&decoder, &group.version) < 0) return -1;
taosArrayPush(updateArray, &group);
} }
if (tDecodeI8(&decoder, &pReq->isSync) < 0) return -1; pReq->updateArray = updateArray;
if (tDecodeI32(&decoder, &pReq->assignedLeader.dnodeId) < 0) return -1;
pReq->assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE);
if (tDecodeCStrTo(&decoder, pReq->assignedLeader.token) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->version) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
@ -6484,14 +6502,20 @@ int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGr
return 0; return 0;
} }
void tFreeSMArbUpdateGroupReq(SMArbUpdateGroupReq *pReq) { void tFreeSMArbUpdateGroupBatchReq(SMArbUpdateGroupBatchReq *pReq) {
if (NULL == pReq) { if (NULL == pReq || NULL == pReq->updateArray) {
return; return;
} }
for (int i = 0; i < 2; i++) {
taosMemoryFreeClear(pReq->members[i].token); int32_t sz = taosArrayGetSize(pReq->updateArray);
for (int32_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i);
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
taosMemoryFreeClear(pGroup->members[i].token);
}
taosMemoryFreeClear(pGroup->assignedLeader.token);
} }
taosMemoryFreeClear(pReq->assignedLeader.token); taosArrayDestroy(pReq->updateArray);
} }
// int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) { // int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {

View File

@ -180,7 +180,7 @@ typedef struct {
tmsg_t originRpcType; tmsg_t originRpcType;
char dbname[TSDB_TABLE_FNAME_LEN]; char dbname[TSDB_TABLE_FNAME_LEN];
char stbname[TSDB_TABLE_FNAME_LEN]; char stbname[TSDB_TABLE_FNAME_LEN];
int32_t arbGroupId; SHashObj* arbGroupIds;
int32_t startFunc; int32_t startFunc;
int32_t stopFunc; int32_t stopFunc;
int32_t paramLen; int32_t paramLen;

View File

@ -78,7 +78,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname); void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname);
void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId); void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId);
void mndTransSetSerial(STrans *pTrans); void mndTransSetSerial(STrans *pTrans);
void mndTransSetParallel(STrans *pTrans); void mndTransSetParallel(STrans *pTrans);
void mndTransSetChangeless(STrans *pTrans); void mndTransSetChangeless(STrans *pTrans);

View File

@ -39,10 +39,11 @@ static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup);
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew); static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew);
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup); static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup);
static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray);
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq); static int32_t mndProcessArbHbTimer(SRpcMsg *pReq);
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq); static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq);
static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq); static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq);
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp); static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp);
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp); static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp);
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp); static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp);
@ -68,7 +69,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer); mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer);
mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndProcessArbCheckSyncTimer); mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndProcessArbCheckSyncTimer);
mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP, mndProcessArbUpdateGroupReq); mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq);
mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp); mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp); mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp);
mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp); mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp);
@ -81,9 +82,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
void mndCleanupArbGroup(SMnode *pMnode) { void mndCleanupArbGroup(SMnode *pMnode) { taosHashCleanup(arbUpdateHash); }
taosHashCleanup(arbUpdateHash);
}
SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) { SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) {
SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId); SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
@ -541,6 +540,8 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
return -1; return -1;
} }
SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup));
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup); pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup);
if (pIter == NULL) break; if (pIter == NULL) break;
@ -612,40 +613,27 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
SArbGroup newGroup = {0}; SArbGroup newGroup = {0};
mndArbGroupDupObj(&arbGroupDup, &newGroup); mndArbGroupDupObj(&arbGroupDup, &newGroup);
mndArbGroupSetAssignedLeader(&newGroup, candidateIndex); mndArbGroupSetAssignedLeader(&newGroup, candidateIndex);
if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) { taosArrayPush(pUpdateArray, &newGroup);
mError("vgId:%d, arb failed to pullup set assigned leader to dnodeId:%d, since %s", vgId, pMember->info.dnodeId,
terrstr());
sdbRelease(pSdb, pArbGroup);
return -1;
}
mInfo("vgId:%d, arb pull up set assigned leader to dnodeId:%d", vgId, pMember->info.dnodeId);
sdbRelease(pSdb, pArbGroup); sdbRelease(pSdb, pArbGroup);
} }
(void)mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray);
taosArrayDestroy(pUpdateArray);
return 0; return 0;
} }
static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup) { static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) {
SMArbUpdateGroupReq req = {0}; SMArbUpdateGroupBatchReq req = {0};
req.vgId = pNewGroup->vgId; req.updateArray = updateArray;
req.dbUid = pNewGroup->dbUid;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
req.members[i].dnodeId = pNewGroup->members[i].info.dnodeId;
req.members[i].token = pNewGroup->members[i].state.token;
}
req.isSync = pNewGroup->isSync;
req.assignedLeader.dnodeId = pNewGroup->assignedLeader.dnodeId;
req.assignedLeader.token = pNewGroup->assignedLeader.token;
req.version = pNewGroup->version;
int32_t contLen = tSerializeSMArbUpdateGroupReq(NULL, 0, &req); int32_t contLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &req);
if (contLen <= 0) return NULL; if (contLen <= 0) return NULL;
SMsgHead *pHead = rpcMallocCont(contLen); SMsgHead *pHead = rpcMallocCont(contLen);
if (pHead == NULL) return NULL; if (pHead == NULL) return NULL;
if (tSerializeSMArbUpdateGroupReq(pHead, contLen, &req) <= 0) { if (tSerializeSMArbUpdateGroupBatchReq(pHead, contLen, &req) <= 0) {
rpcFreeCont(pHead); rpcFreeCont(pHead);
return NULL; return NULL;
} }
@ -653,60 +641,172 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup)
return pHead; return pHead;
} }
static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) {
outGroup->vgId = pGroup->vgId;
outGroup->dbUid = pGroup->dbUid;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
outGroup->members[i].dnodeId = pGroup->members[i].info.dnodeId;
outGroup->members[i].token = pGroup->members[i].state.token; // just copy the pointer
}
outGroup->isSync = pGroup->isSync;
outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId;
outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer
outGroup->version = pGroup->version;
}
static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) { static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) {
if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) { if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId); mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
return 0; return 0;
} }
int32_t contLen = 0; int32_t ret = -1;
void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup);
if (!pHead) {
mError("vgId:%d, failed to build arb-update-group request", pNewGroup->vgId);
return -1;
}
SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_UPDATE_GROUP, .pCont = pHead, .contLen = contLen, .info.noResp = true};
int32_t ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); SMArbUpdateGroup newGroup = {0};
if (ret == 0) { mndInitArbUpdateGroup(pNewGroup, &newGroup);
taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
SArray *pArray = taosArrayInit(1, sizeof(SMArbUpdateGroup));
taosArrayPush(pArray, &newGroup);
int32_t contLen = 0;
void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
if (!pHead) {
mError("failed to build arb-update-group request");
goto _OVER;
} }
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
if (ret != 0) goto _OVER;
taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
_OVER:
taosArrayDestroy(pArray);
return ret; return ret;
} }
static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) { static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray) {
int ret = 0; int32_t ret = -1;
SMArbUpdateGroupReq req = {0}; size_t sz = taosArrayGetSize(newGroupArray);
tDeserializeSMArbUpdateGroupReq(pReq->pCont, pReq->contLen, &req); SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup));
for (size_t i = 0; i < sz; i++) {
SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) {
mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId);
continue;
}
SArbGroup newGroup = {0}; SMArbUpdateGroup newGroup = {0};
newGroup.vgId = req.vgId; mndInitArbUpdateGroup(pNewGroup, &newGroup);
newGroup.dbUid = req.dbUid;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { taosArrayPush(pArray, &newGroup);
newGroup.members[i].info.dnodeId = req.members[i].dnodeId; taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0);
memcpy(newGroup.members[i].state.token, req.members[i].token, TSDB_ARB_TOKEN_SIZE);
} }
newGroup.isSync = req.isSync; if (taosArrayGetSize(pArray) == 0) {
newGroup.assignedLeader.dnodeId = req.assignedLeader.dnodeId; ret = 0;
memcpy(newGroup.assignedLeader.token, req.assignedLeader.token, TSDB_ARB_TOKEN_SIZE); goto _OVER;
newGroup.version = req.version;
SMnode *pMnode = pReq->info.node;
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
if (!pOldGroup) {
mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
return 0;
}
sdbRelease(pMnode->pSdb, pOldGroup);
if (mndArbGroupUpdateTrans(pMnode, &newGroup) != 0) {
mError("vgId:%d, arb failed to update arbgroup, since %s", newGroup.vgId, terrstr());
ret = -1;
} }
tFreeSMArbUpdateGroupReq(&req); int32_t contLen = 0;
void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray);
if (!pHead) {
mError("failed to build arb-update-group request");
goto _OVER;
}
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true};
ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
_OVER:
taosArrayDestroy(pArray);
if (ret != 0) {
for (size_t i = 0; i < sz; i++) {
SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i);
taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId));
}
}
return ret;
}
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
int ret = -1;
size_t sz = 0;
SMArbUpdateGroupBatchReq req = {0};
if (tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req) != 0) {
mError("arb failed to decode arb-update-group request");
return -1;
}
SMnode *pMnode = pReq->info.node;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup");
if (pTrans == NULL) {
mError("failed to update arbgroup in create trans, since %s", terrstr());
goto _OVER;
}
sz = taosArrayGetSize(req.updateArray);
for (size_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
SArbGroup newGroup = {0};
newGroup.vgId = pUpdateGroup->vgId;
newGroup.dbUid = pUpdateGroup->dbUid;
for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) {
newGroup.members[i].info.dnodeId = pUpdateGroup->members[i].dnodeId;
memcpy(newGroup.members[i].state.token, pUpdateGroup->members[i].token, TSDB_ARB_TOKEN_SIZE);
}
newGroup.isSync = pUpdateGroup->isSync;
newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId;
memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE);
newGroup.version = pUpdateGroup->version;
SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId);
if (!pOldGroup) {
mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId);
taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t));
continue;
}
mndTransAddArbGroupId(pTrans, newGroup.vgId);
if (mndSetCreateArbGroupCommitLogs(pTrans, &newGroup) != 0) {
mError("failed to update arbgroup in set commit log, vgId:%d, trans:%d, since %s", newGroup.vgId, pTrans->id,
terrstr());
goto _OVER;
}
mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]",
pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token,
newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync,
newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token);
sdbRelease(pMnode->pSdb, pOldGroup);
}
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
ret = 0;
_OVER:
if (ret != 0) {
// failed to update arbgroup
for (size_t i = 0; i < sz; i++) {
SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t));
}
}
mndTransDrop(pTrans);
tFreeSMArbUpdateGroupBatchReq(&req);
return ret; return ret;
} }
@ -739,7 +839,7 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId, pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId,
pNew->assignedLeader.token); pNew->assignedLeader.token);
mndTransSetArbGroupId(pTrans, pNew->vgId); mndTransAddArbGroupId(pTrans, pNew->vgId);
if (mndTransCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
ret = -1; ret = -1;
goto _OVER; goto _OVER;
@ -816,10 +916,10 @@ _OVER:
} }
static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) { static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) {
int ret = 0;
int64_t nowMs = taosGetTimestampMs(); int64_t nowMs = taosGetTimestampMs();
size_t size = taosArrayGetSize(memberArray);
SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup));
size_t size = taosArrayGetSize(memberArray);
for (size_t i = 0; i < size; i++) { for (size_t i = 0; i < size; i++) {
SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i); SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i);
@ -832,17 +932,16 @@ static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *me
bool updateToken = mndUpdateArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup); bool updateToken = mndUpdateArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup);
if (updateToken) { if (updateToken) {
ret = mndPullupArbUpdateGroup(pMnode, &newGroup); taosArrayPush(pUpdateArray, &newGroup);
if (ret != 0) {
mInfo("failed to pullup update arb token, vgId:%d, since %s", pRspMember->vgId, terrstr());
}
} }
sdbRelease(pMnode->pSdb, pGroup); sdbRelease(pMnode->pSdb, pGroup);
if (ret != 0) break;
} }
return ret; (void)mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray);
taosArrayDestroy(pUpdateArray);
return 0;
} }
bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token, bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token,
@ -900,6 +999,11 @@ static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token
} }
static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) { static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
if (pRsp->contLen == 0) {
mDebug("arb hb-rsp contLen is 0");
return 0;
}
int32_t ret = -1; int32_t ret = -1;
SMnode *pMnode = pRsp->info.node; SMnode *pMnode = pRsp->info.node;
@ -914,6 +1018,7 @@ static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
SVArbHeartBeatRsp arbHbRsp = {0}; SVArbHeartBeatRsp arbHbRsp = {0};
if (tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp) != 0) { if (tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp) != 0) {
mInfo("arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
@ -934,6 +1039,11 @@ _OVER:
} }
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) { static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
if (pRsp->contLen == 0) {
mDebug("arb check-sync-rsp contLen is 0");
return 0;
}
int32_t ret = -1; int32_t ret = -1;
SMnode *pMnode = pRsp->info.node; SMnode *pMnode = pRsp->info.node;
@ -948,7 +1058,7 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
SVArbCheckSyncRsp syncRsp = {0}; SVArbCheckSyncRsp syncRsp = {0};
if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) { if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) {
mInfo("arb sync check failed, since:%s", tstrerror(pRsp->code)); mInfo("arb check-sync-rsp des failed, since:%s", tstrerror(pRsp->code));
if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) { if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
return 0; return 0;
@ -1008,6 +1118,11 @@ _OVER:
} }
static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) { static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
if (pRsp->contLen == 0) {
mDebug("arb set-assigned-rsp contLen is 0");
return 0;
}
int32_t ret = -1; int32_t ret = -1;
SMnode *pMnode = pRsp->info.node; SMnode *pMnode = pRsp->info.node;
@ -1022,8 +1137,8 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
SVArbSetAssignedLeaderRsp setAssignedRsp = {0}; SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
if (tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp) != 0) { if (tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp) != 0) {
mInfo("arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
mInfo("arb set assigned failed, des failed since:%s", tstrerror(pRsp->code));
return -1; return -1;
} }

View File

@ -821,8 +821,7 @@ static int32_t mndCheckDbEncryptKey(SMnode *pMnode, SCreateDbReq *pReq) {
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
if (pReq->encryptAlgorithm == TSDB_ENCRYPT_ALGO_NONE) goto _exit; if (pReq->encryptAlgorithm == TSDB_ENCRYPT_ALGO_NONE) goto _exit;
if (grantCheck(TSDB_GRANT_DB_ENCRYPTION) != 0) { if ((code = grantCheck(TSDB_GRANT_DB_ENCRYPTION)) != 0) {
code = TSDB_CODE_MND_DB_ENCRYPT_GRANT_EXPIRED;
goto _exit; goto _exit;
} }
if (tsEncryptionKeyStat != ENCRYPT_KEY_STAT_LOADED) { if (tsEncryptionKeyStat != ENCRYPT_KEY_STAT_LOADED) {
@ -1226,7 +1225,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
if (terrno != 0) code = terrno; if (terrno != 0) code = terrno;
mError("db:%s, failed to alter since %s", alterReq.db, terrstr()); mError("db:%s, failed to alter since %s", alterReq.db, tstrerror(code));
} }
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);

View File

@ -1847,8 +1847,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) { if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
terrno = TSDB_CODE_GRANT_EXPIRED;
return -1; return -1;
} }

View File

@ -225,7 +225,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SArray *pFailedTasks = NULL; SArray *pFailedTasks = NULL;
SArray *pOrphanTasks = NULL; SArray *pOrphanTasks = NULL;
if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) { if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
if (suspendAllStreams(pMnode, &pReq->info) < 0) { if (suspendAllStreams(pMnode, &pReq->info) < 0) {
return -1; return -1;
} }

View File

@ -26,7 +26,7 @@
#define TRANS_VER1_NUMBER 1 #define TRANS_VER1_NUMBER 1
#define TRANS_VER2_NUMBER 2 #define TRANS_VER2_NUMBER 2
#define TRANS_ARRAY_SIZE 8 #define TRANS_ARRAY_SIZE 8
#define TRANS_RESERVE_SIZE 48 #define TRANS_RESERVE_SIZE 44
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans);
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld); static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld);
@ -196,10 +196,21 @@ SSdbRaw *mndTransEncode(STrans *pTrans) {
} }
SDB_SET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER)
int32_t arbGroupNum = taosHashGetSize(pTrans->arbGroupIds);
SDB_SET_INT32(pRaw, dataPos, arbGroupNum, _OVER)
void *pIter = NULL;
pIter = taosHashIterate(pTrans->arbGroupIds, NULL);
while (pIter) {
int32_t arbGroupId = *(int32_t *)pIter;
SDB_SET_INT32(pRaw, dataPos, arbGroupId, _OVER)
pIter = taosHashIterate(pTrans->arbGroupIds, pIter);
}
SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER) SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER)
terrno = 0; terrno = 0;
_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
@ -279,6 +290,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
int32_t undoActionNum = 0; int32_t undoActionNum = 0;
int32_t commitActionNum = 0; int32_t commitActionNum = 0;
int32_t dataPos = 0; int32_t dataPos = 0;
int32_t arbgroupIdNum = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
@ -350,6 +362,16 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
} }
SDB_GET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER); SDB_GET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER);
pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
SDB_GET_INT32(pRaw, dataPos, &arbgroupIdNum, _OVER)
for (int32_t i = 0; i < arbgroupIdNum; ++i) {
int32_t arbGroupId = 0;
SDB_GET_INT32(pRaw, dataPos, &arbGroupId, _OVER)
taosHashPut(pTrans->arbGroupIds, &arbGroupId, sizeof(int32_t), NULL, 0);
}
SDB_GET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER) SDB_GET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
terrno = 0; terrno = 0;
@ -462,6 +484,9 @@ void mndTransDropData(STrans *pTrans) {
mndTransDropActions(pTrans->commitActions); mndTransDropActions(pTrans->commitActions);
pTrans->commitActions = NULL; pTrans->commitActions = NULL;
} }
if (pTrans->arbGroupIds != NULL) {
taosHashCleanup(pTrans->arbGroupIds);
}
if (pTrans->pRpcArray != NULL) { if (pTrans->pRpcArray != NULL) {
taosArrayDestroy(pTrans->pRpcArray); taosArrayDestroy(pTrans->pRpcArray);
pTrans->pRpcArray = NULL; pTrans->pRpcArray = NULL;
@ -581,6 +606,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : tGenIdPI64(); pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : tGenIdPI64();
taosInitRWLatch(&pTrans->lockRpcArray); taosInitRWLatch(&pTrans->lockRpcArray);
@ -733,7 +759,9 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname)
} }
} }
void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId) { pTrans->arbGroupId = groupId; } void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId) {
taosHashPut(pTrans->arbGroupIds, &groupId, sizeof(int32_t), NULL, 0);
}
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; } void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
@ -821,7 +849,16 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
if (pNew->conflict == TRN_CONFLICT_ARBGROUP) { if (pNew->conflict == TRN_CONFLICT_ARBGROUP) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) { if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) {
if (pNew->arbGroupId == pTrans->arbGroupId) conflict = true; void *pIter = taosHashIterate(pNew->arbGroupIds, NULL);
while (pIter != NULL) {
int32_t groupId = *(int32_t *)pIter;
if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) {
taosHashCancelIterate(pNew->arbGroupIds, pIter);
conflict = true;
break;
}
pIter = taosHashIterate(pNew->arbGroupIds, pIter);
}
} }
} }

View File

@ -370,12 +370,12 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran
if ((TEST_VIEW_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_VIEW)) { if ((TEST_VIEW_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_VIEW)) {
QW_ELOG("query failed cause of view grant expired, msgMask:%d", msg.msgMask); QW_ELOG("query failed cause of view grant expired, msgMask:%d", msg.msgMask);
tFreeSSubQueryMsg(&msg); tFreeSSubQueryMsg(&msg);
QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED); QW_ERR_RET(TSDB_CODE_GRANT_VIEW_EXPIRED);
} }
if ((TEST_AUDIT_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_AUDIT)) { if ((TEST_AUDIT_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_AUDIT)) {
QW_ELOG("query failed cause of audit grant expired, msgMask:%d", msg.msgMask); QW_ELOG("query failed cause of audit grant expired, msgMask:%d", msg.msgMask);
tFreeSSubQueryMsg(&msg); tFreeSSubQueryMsg(&msg);
QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED); QW_ERR_RET(TSDB_CODE_GRANT_AUDIT_EXPIRED);
} }
} }
} }

View File

@ -759,7 +759,7 @@ void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStat
void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; } void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; }
bool isDeteled(SStreamFileState* pFileState, TSKEY ts) { bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark); return pFileState->deleteMark != INT64_MAX && pFileState->maxTs > 0 && ts < (pFileState->maxTs - pFileState->deleteMark);
} }
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); } bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }

View File

@ -2741,7 +2741,7 @@ int32_t tsDecompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut,
int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) { int32_t nBuf) {
DEFINE_VAR(cmprAlg) DEFINE_VAR(cmprAlg)
if (lvl != 0 && lossyFloat) { if (l2 == L2_TSZ && lvl != 0 && lossyFloat) {
return tsCompressFloatLossyImp(pIn, nEle, pOut); return tsCompressFloatLossyImp(pIn, nEle, pOut);
} }
FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1); FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1);
@ -2760,7 +2760,7 @@ int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf,
int32_t nBuf) { int32_t nBuf) {
DEFINE_VAR(cmprAlg) DEFINE_VAR(cmprAlg)
if (lvl != 0 && lossyDouble) { if (l2 == L2_TSZ && lvl != 0 && lossyDouble) {
// lossy mode // lossy mode
return tsCompressDoubleLossyImp(pIn, nEle, pOut); return tsCompressDoubleLossyImp(pIn, nEle, pOut);
} }

View File

@ -259,7 +259,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_CREATING, "Database in creating
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE, "Encryption is not allowed to be changed after database is created") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE, "Encryption is not allowed to be changed after database is created")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INCONSIST_ENCRYPT_KEY, "Inconsistent encryption key") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INCONSIST_ENCRYPT_KEY, "Inconsistent encryption key")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ENCRYPT_KEY, "The cluster has not been set properly for database encryption") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ENCRYPT_KEY, "The cluster has not been set properly for database encryption")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ENCRYPT_GRANT_EXPIRED, "The database encryption function grant expired")
// mnode-node // mnode-node
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists")
@ -499,9 +498,20 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_LACK_OF_BASIC, "Lack of basic functio
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OBJ_NOT_EXIST, "Grant object not exist") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OBJ_NOT_EXIST, "Grant object not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_LAST_ACTIVE_NOT_FOUND, "The historial active code does not match") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_LAST_ACTIVE_NOT_FOUND, "The historial active code does not match")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_MACHINES_MISMATCH, "Cluster machines mismatch with active code") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_MACHINES_MISMATCH, "Cluster machines mismatch with active code")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE, "Expire time of optional grant item is too large") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE, "Expiration time of optional grant item is too large")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUPLICATED_ACTIVE, "The active code can't be activated repeatedly") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUPLICATED_ACTIVE, "The active code can't be activated repeatedly")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_LIMITED, "Number of view has reached the licensed upper limit") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_LIMITED, "Number of views has reached the licensed upper limit")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_BASIC_EXPIRED, "License expired for basic functions")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STREAM_EXPIRED, "License expired for stream function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_SUBSCRIPTION_EXPIRED, "License expired for subscription function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_EXPIRED, "License expired for view function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_AUDIT_EXPIRED, "License expired for audit function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CSV_EXPIRED, "License expired for CSV function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_MULTI_STORAGE_EXPIRED, "License expired for multi-tier storage function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OBJECT_STROAGE_EXPIRED, "License expired for object storage function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUAL_REPLICA_HA_EXPIRED,"License expired for dual-replica HA function")
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DB_ENCRYPTION_EXPIRED, "License expired for database encryption function")
// sync // sync
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout")

View File

@ -315,19 +315,22 @@ function run_thread() {
fi fi
if [ -n "$corefile" ]; then if [ -n "$corefile" ]; then
echo -e "\e[34m corefiles: $corefile \e[0m" echo -e "\e[34m corefiles: $corefile \e[0m"
local build_dir=$log_dir/build_${hosts[index]}
local remote_build_dir="${workdirs[index]}/${DEBUGPATH}/build"
# if [ $ent -ne 0 ]; then
# remote_build_dir="${workdirs[index]}/{DEBUGPATH}/build"
# fi
mkdir "$build_dir" 2>/dev/null
if [ $? -eq 0 ]; then
# scp build binary
cmd="$scpcmd:${remote_build_dir}/* ${build_dir}/"
echo "$cmd"
$cmd >/dev/null
fi
fi fi
# scp build binary and unit test log
local build_dir=$log_dir/build_${hosts[index]}
local remote_build_dir="${workdirs[index]}/${DEBUGPATH}/build"
local remote_unit_test_log_dir="${workdirs[index]}/${DEBUGPATH}/Testing/Temporary/"
mkdir "$build_dir" 2>/dev/null
if [ $? -eq 0 ]; then
cmd="$scpcmd:${remote_build_dir}/* ${build_dir}/"
echo "$cmd"
$cmd >/dev/null
cmd="$scpcmd:${remote_unit_test_log_dir}/* ${build_dir}/"
echo "$cmd"
$cmd >/dev/null
fi
# get remote sim dir # get remote sim dir
local remote_sim_dir="${workdirs[index]}/tmp/thread_volume/$thread_no" local remote_sim_dir="${workdirs[index]}/tmp/thread_volume/$thread_no"
local tarcmd="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" local tarcmd="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}"

View File

@ -39,7 +39,7 @@ class TDTestCase:
os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 5 > /dev/null 2>&1 &") os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 5 > /dev/null 2>&1 &")
time.sleep(10) time.sleep(10)
tdSql.query("use test") tdSql.execute("use test", queryTimes=100)
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
tdLog.debug("========create stream and insert data ok========") tdLog.debug("========create stream and insert data ok========")
time.sleep(15) time.sleep(15)
@ -66,7 +66,7 @@ class TDTestCase:
os.system("taosBenchmark -d db -t 20 -v 6 -n 1000 -y > /dev/null 2>&1") os.system("taosBenchmark -d db -t 20 -v 6 -n 1000 -y > /dev/null 2>&1")
# create stream # create stream
tdSql.execute("use db") tdSql.execute("use db", queryTimes=100)
tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True) tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
time.sleep(5) time.sleep(5)

View File

@ -89,8 +89,8 @@ else
export LD_PRELOAD="$(realpath "$(gcc -print-file-name=libasan.so)") $(realpath "$(gcc -print-file-name=libstdc++.so)")" export LD_PRELOAD="$(realpath "$(gcc -print-file-name=libasan.so)") $(realpath "$(gcc -print-file-name=libstdc++.so)")"
echo "Preload AsanSo:" $? echo "Preload AsanSo:" $?
$* -a 2>$AsanFile $* -a 2> $AsanFile
cat $AsanFile
unset LD_PRELOAD unset LD_PRELOAD
for ((i = 1; i <= 20; i++)); do for ((i = 1; i <= 20; i++)); do
AsanFileLen=$(cat $AsanFile | wc -l) AsanFileLen=$(cat $AsanFile | wc -l)