diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3ed6b40d4d..be2f443140 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2358,20 +2358,24 @@ void tFreeSVArbSetAssignedLeaderRsp(SVArbSetAssignedLeaderRsp* pRsp); typedef struct { int32_t dnodeId; char* token; -} SMArbUpdateGroupReqMember; +} SMArbUpdateGroupMember; typedef struct { - int32_t vgId; - int64_t dbUid; - SMArbUpdateGroupReqMember members[2]; - int8_t isSync; - SMArbUpdateGroupReqMember assignedLeader; - int64_t version; -} SMArbUpdateGroupReq; + int32_t vgId; + int64_t dbUid; + SMArbUpdateGroupMember members[2]; + int8_t isSync; + SMArbUpdateGroupMember assignedLeader; + int64_t version; +} SMArbUpdateGroup; -int32_t tSerializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq); -int32_t tDeserializeSMArbUpdateGroupReq(void* buf, int32_t bufLen, SMArbUpdateGroupReq* pReq); -void tFreeSMArbUpdateGroupReq(SMArbUpdateGroupReq* pReq); +typedef struct { + SArray* updateArray; // SMArbUpdateGroup +} SMArbUpdateGroupBatchReq; + +int32_t tSerializeSMArbUpdateGroupBatchReq(void* buf, int32_t bufLen, SMArbUpdateGroupBatchReq* pReq); +int32_t tDeserializeSMArbUpdateGroupBatchReq(void* buf, int32_t bufLen, SMArbUpdateGroupBatchReq* pReq); +void tFreeSMArbUpdateGroupBatchReq(SMArbUpdateGroupBatchReq* pReq); typedef struct { char queryStrId[TSDB_QUERY_ID_LEN]; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a5a3bd5ee0..6f15d7df70 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -388,7 +388,8 @@ TD_NEW_MSG_SEG(TDMT_MND_ARB_MSG) //9 << 8 TD_DEF_MSG_TYPE(TDMT_MND_ARB_HEARTBEAT_TIMER, "mnd-arb-hb-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ARB_CHECK_SYNC_TIMER, "mnd-arb-check-sync-tmr", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP, "mnd-arb-update-group", NULL, NULL) // no longer used + TD_DEF_MSG_TYPE(TDMT_MND_ARB_UPDATE_GROUP_BATCH, "mnd-arb-update-group-batch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_ARB_MAX_MSG, "mnd-arb-max", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_ARB_MSG) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index dafdac9649..9ae75bade2 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -327,7 +327,6 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_DB_IN_CREATING TAOS_DEF_ERROR_CODE(0, 0x0396) // #define TSDB_CODE_MND_INVALID_SYS_TABLENAME TAOS_DEF_ERROR_CODE(0, 0x039A) #define TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE TAOS_DEF_ERROR_CODE(0, 0x039B) -#define TSDB_CODE_MND_DB_ENCRYPT_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x039C) // mnode-node #define TSDB_CODE_MND_MNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03A0) @@ -612,6 +611,16 @@ int32_t* taosGetErrno(); #define TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE TAOS_DEF_ERROR_CODE(0, 0x0821) #define TSDB_CODE_GRANT_DUPLICATED_ACTIVE TAOS_DEF_ERROR_CODE(0, 0x0822) #define TSDB_CODE_GRANT_VIEW_LIMITED TAOS_DEF_ERROR_CODE(0, 0x0823) +#define TSDB_CODE_GRANT_BASIC_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0824) +#define TSDB_CODE_GRANT_STREAM_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0825) +#define TSDB_CODE_GRANT_SUBSCRIPTION_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0826) +#define TSDB_CODE_GRANT_VIEW_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0827) +#define TSDB_CODE_GRANT_AUDIT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0828) +#define TSDB_CODE_GRANT_CSV_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0829) +#define TSDB_CODE_GRANT_MULTI_STORAGE_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082A) +#define TSDB_CODE_GRANT_OBJECT_STROAGE_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082B) +#define TSDB_CODE_GRANT_DUAL_REPLICA_HA_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082C) +#define TSDB_CODE_GRANT_DB_ENCRYPTION_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x082D) // sync // #define TSDB_CODE_SYN_INVALID_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0900) // 2.x diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 7d30a19140..1b6cb8fd22 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -92,6 +92,7 @@ static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *bat } if (!pRsp) { releaseTscObj(pReq->connKey.tscRid); + taosHashCancelIterate(hbMgr->activeInfo, pReq); break; } } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index b7d1417451..ef37a41fcf 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6437,21 +6437,28 @@ void tFreeSVArbSetAssignedLeaderRsp(SVArbSetAssignedLeaderRsp *pRsp) { taosMemoryFreeClear(pRsp->memberToken); } -int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGroupReq *pReq) { +int32_t tSerializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdateGroupBatchReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); if (tStartEncode(&encoder) < 0) return -1; - if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; - if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1; - for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { - if (tEncodeI32(&encoder, pReq->members[i].dnodeId) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->members[i].token) < 0) return -1; + + int32_t sz = taosArrayGetSize(pReq->updateArray); + if (tEncodeI32(&encoder, sz) < 0) return -1; + + for (int32_t i = 0; i < sz; i++) { + SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i); + if (tEncodeI32(&encoder, pGroup->vgId) < 0) return -1; + if (tEncodeI64(&encoder, pGroup->dbUid) < 0) return -1; + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { + if (tEncodeI32(&encoder, pGroup->members[i].dnodeId) < 0) return -1; + if (tEncodeCStr(&encoder, pGroup->members[i].token) < 0) return -1; + } + if (tEncodeI8(&encoder, pGroup->isSync) < 0) return -1; + if (tEncodeI32(&encoder, pGroup->assignedLeader.dnodeId) < 0) return -1; + if (tEncodeCStr(&encoder, pGroup->assignedLeader.token) < 0) return -1; + if (tEncodeI64(&encoder, pGroup->version) < 0) return -1; } - if (tEncodeI8(&encoder, pReq->isSync) < 0) return -1; - if (tEncodeI32(&encoder, pReq->assignedLeader.dnodeId) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->assignedLeader.token) < 0) return -1; - if (tEncodeI64(&encoder, pReq->version) < 0) return -1; tEndEncode(&encoder); @@ -6460,23 +6467,34 @@ int32_t tSerializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGrou return tlen; } -int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGroupReq *pReq) { +int32_t tDeserializeSMArbUpdateGroupBatchReq(void *buf, int32_t bufLen, SMArbUpdateGroupBatchReq *pReq) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); if (tStartDecode(&decoder) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; - if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1; - for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { - if (tDecodeI32(&decoder, &pReq->members[i].dnodeId) < 0) return -1; - pReq->members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); - if (tDecodeCStrTo(&decoder, pReq->members[i].token) < 0) return -1; + int32_t sz = 0; + if (tDecodeI32(&decoder, &sz) < 0) return -1; + + SArray *updateArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup)); + if (!updateArray) return -1; + + for (int32_t i = 0; i < sz; i++) { + SMArbUpdateGroup group = {0}; + if (tDecodeI32(&decoder, &group.vgId) < 0) return -1; + if (tDecodeI64(&decoder, &group.dbUid) < 0) return -1; + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { + if (tDecodeI32(&decoder, &group.members[i].dnodeId) < 0) return -1; + group.members[i].token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); + if (tDecodeCStrTo(&decoder, group.members[i].token) < 0) return -1; + } + if (tDecodeI8(&decoder, &group.isSync) < 0) return -1; + if (tDecodeI32(&decoder, &group.assignedLeader.dnodeId) < 0) return -1; + group.assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); + if (tDecodeCStrTo(&decoder, group.assignedLeader.token) < 0) return -1; + if (tDecodeI64(&decoder, &group.version) < 0) return -1; + taosArrayPush(updateArray, &group); } - if (tDecodeI8(&decoder, &pReq->isSync) < 0) return -1; - if (tDecodeI32(&decoder, &pReq->assignedLeader.dnodeId) < 0) return -1; - pReq->assignedLeader.token = taosMemoryMalloc(TSDB_ARB_TOKEN_SIZE); - if (tDecodeCStrTo(&decoder, pReq->assignedLeader.token) < 0) return -1; - if (tDecodeI64(&decoder, &pReq->version) < 0) return -1; + pReq->updateArray = updateArray; tEndDecode(&decoder); @@ -6484,14 +6502,20 @@ int32_t tDeserializeSMArbUpdateGroupReq(void *buf, int32_t bufLen, SMArbUpdateGr return 0; } -void tFreeSMArbUpdateGroupReq(SMArbUpdateGroupReq *pReq) { - if (NULL == pReq) { +void tFreeSMArbUpdateGroupBatchReq(SMArbUpdateGroupBatchReq *pReq) { + if (NULL == pReq || NULL == pReq->updateArray) { return; } - for (int i = 0; i < 2; i++) { - taosMemoryFreeClear(pReq->members[i].token); + + int32_t sz = taosArrayGetSize(pReq->updateArray); + for (int32_t i = 0; i < sz; i++) { + SMArbUpdateGroup *pGroup = taosArrayGet(pReq->updateArray, i); + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { + taosMemoryFreeClear(pGroup->members[i].token); + } + taosMemoryFreeClear(pGroup->assignedLeader.token); } - taosMemoryFreeClear(pReq->assignedLeader.token); + taosArrayDestroy(pReq->updateArray); } // int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 68c55e235f..0c40622d08 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -180,7 +180,7 @@ typedef struct { tmsg_t originRpcType; char dbname[TSDB_TABLE_FNAME_LEN]; char stbname[TSDB_TABLE_FNAME_LEN]; - int32_t arbGroupId; + SHashObj* arbGroupIds; int32_t startFunc; int32_t stopFunc; int32_t paramLen; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 8c9ca87fb1..8008eb76e7 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -78,7 +78,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname); -void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId); +void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId); void mndTransSetSerial(STrans *pTrans); void mndTransSetParallel(STrans *pTrans); void mndTransSetChangeless(STrans *pTrans); diff --git a/source/dnode/mnode/impl/src/mndArbGroup.c b/source/dnode/mnode/impl/src/mndArbGroup.c index d0a86bdde7..50338fe889 100644 --- a/source/dnode/mnode/impl/src/mndArbGroup.c +++ b/source/dnode/mnode/impl/src/mndArbGroup.c @@ -39,10 +39,11 @@ static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup); static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew); static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup); +static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray); static int32_t mndProcessArbHbTimer(SRpcMsg *pReq); static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq); -static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq); +static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq); static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp); static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp); static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp); @@ -68,7 +69,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_ARB_HEARTBEAT_TIMER, mndProcessArbHbTimer); mndSetMsgHandle(pMnode, TDMT_MND_ARB_CHECK_SYNC_TIMER, mndProcessArbCheckSyncTimer); - mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP, mndProcessArbUpdateGroupReq); + mndSetMsgHandle(pMnode, TDMT_MND_ARB_UPDATE_GROUP_BATCH, mndProcessArbUpdateGroupBatchReq); mndSetMsgHandle(pMnode, TDMT_VND_ARB_HEARTBEAT_RSP, mndProcessArbHbRsp); mndSetMsgHandle(pMnode, TDMT_VND_ARB_CHECK_SYNC_RSP, mndProcessArbCheckSyncRsp); mndSetMsgHandle(pMnode, TDMT_SYNC_SET_ASSIGNED_LEADER_RSP, mndProcessArbSetAssignedLeaderRsp); @@ -81,9 +82,7 @@ int32_t mndInitArbGroup(SMnode *pMnode) { return sdbSetTable(pMnode->pSdb, table); } -void mndCleanupArbGroup(SMnode *pMnode) { - taosHashCleanup(arbUpdateHash); -} +void mndCleanupArbGroup(SMnode *pMnode) { taosHashCleanup(arbUpdateHash); } SArbGroup *mndAcquireArbGroup(SMnode *pMnode, int32_t vgId) { SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId); @@ -541,6 +540,8 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { return -1; } + SArray *pUpdateArray = taosArrayInit(16, sizeof(SArbGroup)); + while (1) { pIter = sdbFetch(pSdb, SDB_ARBGROUP, pIter, (void **)&pArbGroup); if (pIter == NULL) break; @@ -612,40 +613,27 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) { SArbGroup newGroup = {0}; mndArbGroupDupObj(&arbGroupDup, &newGroup); mndArbGroupSetAssignedLeader(&newGroup, candidateIndex); - if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) { - mError("vgId:%d, arb failed to pullup set assigned leader to dnodeId:%d, since %s", vgId, pMember->info.dnodeId, - terrstr()); - sdbRelease(pSdb, pArbGroup); - return -1; - } - - mInfo("vgId:%d, arb pull up set assigned leader to dnodeId:%d", vgId, pMember->info.dnodeId); + taosArrayPush(pUpdateArray, &newGroup); sdbRelease(pSdb, pArbGroup); } + (void)mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray); + + taosArrayDestroy(pUpdateArray); return 0; } -static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup) { - SMArbUpdateGroupReq req = {0}; - req.vgId = pNewGroup->vgId; - req.dbUid = pNewGroup->dbUid; - for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { - req.members[i].dnodeId = pNewGroup->members[i].info.dnodeId; - req.members[i].token = pNewGroup->members[i].state.token; - } - req.isSync = pNewGroup->isSync; - req.assignedLeader.dnodeId = pNewGroup->assignedLeader.dnodeId; - req.assignedLeader.token = pNewGroup->assignedLeader.token; - req.version = pNewGroup->version; +static void *mndBuildArbUpdateGroupBatchReq(int32_t *pContLen, SArray *updateArray) { + SMArbUpdateGroupBatchReq req = {0}; + req.updateArray = updateArray; - int32_t contLen = tSerializeSMArbUpdateGroupReq(NULL, 0, &req); + int32_t contLen = tSerializeSMArbUpdateGroupBatchReq(NULL, 0, &req); if (contLen <= 0) return NULL; SMsgHead *pHead = rpcMallocCont(contLen); if (pHead == NULL) return NULL; - if (tSerializeSMArbUpdateGroupReq(pHead, contLen, &req) <= 0) { + if (tSerializeSMArbUpdateGroupBatchReq(pHead, contLen, &req) <= 0) { rpcFreeCont(pHead); return NULL; } @@ -653,60 +641,172 @@ static void *mndBuildArbUpdateGroupReq(int32_t *pContLen, SArbGroup *pNewGroup) return pHead; } +static void mndInitArbUpdateGroup(SArbGroup *pGroup, SMArbUpdateGroup *outGroup) { + outGroup->vgId = pGroup->vgId; + outGroup->dbUid = pGroup->dbUid; + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { + outGroup->members[i].dnodeId = pGroup->members[i].info.dnodeId; + outGroup->members[i].token = pGroup->members[i].state.token; // just copy the pointer + } + outGroup->isSync = pGroup->isSync; + outGroup->assignedLeader.dnodeId = pGroup->assignedLeader.dnodeId; + outGroup->assignedLeader.token = pGroup->assignedLeader.token; // just copy the pointer + outGroup->version = pGroup->version; +} + static int32_t mndPullupArbUpdateGroup(SMnode *pMnode, SArbGroup *pNewGroup) { if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) { mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId); return 0; } - int32_t contLen = 0; - void *pHead = mndBuildArbUpdateGroupReq(&contLen, pNewGroup); - if (!pHead) { - mError("vgId:%d, failed to build arb-update-group request", pNewGroup->vgId); - return -1; - } - SRpcMsg rpcMsg = {.msgType = TDMT_MND_ARB_UPDATE_GROUP, .pCont = pHead, .contLen = contLen, .info.noResp = true}; + int32_t ret = -1; - int32_t ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); - if (ret == 0) { - taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0); + SMArbUpdateGroup newGroup = {0}; + mndInitArbUpdateGroup(pNewGroup, &newGroup); + + SArray *pArray = taosArrayInit(1, sizeof(SMArbUpdateGroup)); + taosArrayPush(pArray, &newGroup); + + int32_t contLen = 0; + void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray); + if (!pHead) { + mError("failed to build arb-update-group request"); + goto _OVER; } + + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true}; + ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + if (ret != 0) goto _OVER; + + taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0); + +_OVER: + taosArrayDestroy(pArray); return ret; } -static int32_t mndProcessArbUpdateGroupReq(SRpcMsg *pReq) { - int ret = 0; +static int32_t mndPullupArbUpdateGroupBatch(SMnode *pMnode, SArray *newGroupArray) { + int32_t ret = -1; - SMArbUpdateGroupReq req = {0}; - tDeserializeSMArbUpdateGroupReq(pReq->pCont, pReq->contLen, &req); + size_t sz = taosArrayGetSize(newGroupArray); + SArray *pArray = taosArrayInit(sz, sizeof(SMArbUpdateGroup)); + for (size_t i = 0; i < sz; i++) { + SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i); + if (taosHashGet(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)) != NULL) { + mInfo("vgId:%d, arb skip to pullup arb-update-group request, since it is in process", pNewGroup->vgId); + continue; + } - SArbGroup newGroup = {0}; - newGroup.vgId = req.vgId; - newGroup.dbUid = req.dbUid; - for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { - newGroup.members[i].info.dnodeId = req.members[i].dnodeId; - memcpy(newGroup.members[i].state.token, req.members[i].token, TSDB_ARB_TOKEN_SIZE); + SMArbUpdateGroup newGroup = {0}; + mndInitArbUpdateGroup(pNewGroup, &newGroup); + + taosArrayPush(pArray, &newGroup); + taosHashPut(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId), NULL, 0); } - newGroup.isSync = req.isSync; - newGroup.assignedLeader.dnodeId = req.assignedLeader.dnodeId; - memcpy(newGroup.assignedLeader.token, req.assignedLeader.token, TSDB_ARB_TOKEN_SIZE); - newGroup.version = req.version; - - SMnode *pMnode = pReq->info.node; - SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId); - if (!pOldGroup) { - mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId); - return 0; - } - sdbRelease(pMnode->pSdb, pOldGroup); - - if (mndArbGroupUpdateTrans(pMnode, &newGroup) != 0) { - mError("vgId:%d, arb failed to update arbgroup, since %s", newGroup.vgId, terrstr()); - ret = -1; + if (taosArrayGetSize(pArray) == 0) { + ret = 0; + goto _OVER; } - tFreeSMArbUpdateGroupReq(&req); + int32_t contLen = 0; + void *pHead = mndBuildArbUpdateGroupBatchReq(&contLen, pArray); + if (!pHead) { + mError("failed to build arb-update-group request"); + goto _OVER; + } + + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_ARB_UPDATE_GROUP_BATCH, .pCont = pHead, .contLen = contLen, .info.noResp = true}; + ret = tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + +_OVER: + taosArrayDestroy(pArray); + + if (ret != 0) { + for (size_t i = 0; i < sz; i++) { + SArbGroup *pNewGroup = taosArrayGet(newGroupArray, i); + taosHashRemove(arbUpdateHash, &pNewGroup->vgId, sizeof(pNewGroup->vgId)); + } + } + + return ret; +} + +static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) { + int ret = -1; + size_t sz = 0; + + SMArbUpdateGroupBatchReq req = {0}; + if (tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req) != 0) { + mError("arb failed to decode arb-update-group request"); + return -1; + } + + SMnode *pMnode = pReq->info.node; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup"); + if (pTrans == NULL) { + mError("failed to update arbgroup in create trans, since %s", terrstr()); + goto _OVER; + } + + sz = taosArrayGetSize(req.updateArray); + for (size_t i = 0; i < sz; i++) { + SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i); + SArbGroup newGroup = {0}; + newGroup.vgId = pUpdateGroup->vgId; + newGroup.dbUid = pUpdateGroup->dbUid; + for (int i = 0; i < TSDB_ARB_GROUP_MEMBER_NUM; i++) { + newGroup.members[i].info.dnodeId = pUpdateGroup->members[i].dnodeId; + memcpy(newGroup.members[i].state.token, pUpdateGroup->members[i].token, TSDB_ARB_TOKEN_SIZE); + } + + newGroup.isSync = pUpdateGroup->isSync; + newGroup.assignedLeader.dnodeId = pUpdateGroup->assignedLeader.dnodeId; + memcpy(newGroup.assignedLeader.token, pUpdateGroup->assignedLeader.token, TSDB_ARB_TOKEN_SIZE); + newGroup.version = pUpdateGroup->version; + + SArbGroup *pOldGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &newGroup.vgId); + if (!pOldGroup) { + mInfo("vgId:%d, arb skip to update arbgroup, since no obj found", newGroup.vgId); + taosHashRemove(arbUpdateHash, &newGroup.vgId, sizeof(int32_t)); + continue; + } + + mndTransAddArbGroupId(pTrans, newGroup.vgId); + + if (mndSetCreateArbGroupCommitLogs(pTrans, &newGroup) != 0) { + mError("failed to update arbgroup in set commit log, vgId:%d, trans:%d, since %s", newGroup.vgId, pTrans->id, + terrstr()); + goto _OVER; + } + + mInfo("trans:%d, used to update arbgroup:%d, member0:[%d][%s] member1:[%d][%s] isSync:%d assigned:[%d][%s]", + pTrans->id, newGroup.vgId, newGroup.members[0].info.dnodeId, newGroup.members[0].state.token, + newGroup.members[1].info.dnodeId, newGroup.members[1].state.token, newGroup.isSync, + newGroup.assignedLeader.dnodeId, newGroup.assignedLeader.token); + + sdbRelease(pMnode->pSdb, pOldGroup); + } + + if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + + ret = 0; + +_OVER: + if (ret != 0) { + // failed to update arbgroup + for (size_t i = 0; i < sz; i++) { + SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i); + taosHashRemove(arbUpdateHash, &pUpdateGroup->vgId, sizeof(int32_t)); + } + } + + mndTransDrop(pTrans); + tFreeSMArbUpdateGroupBatchReq(&req); return ret; } @@ -739,7 +839,7 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) { pNew->members[1].info.dnodeId, pNew->members[1].state.token, pNew->isSync, pNew->assignedLeader.dnodeId, pNew->assignedLeader.token); - mndTransSetArbGroupId(pTrans, pNew->vgId); + mndTransAddArbGroupId(pTrans, pNew->vgId); if (mndTransCheckConflict(pMnode, pTrans) != 0) { ret = -1; goto _OVER; @@ -816,10 +916,10 @@ _OVER: } static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *memberArray) { - int ret = 0; int64_t nowMs = taosGetTimestampMs(); + size_t size = taosArrayGetSize(memberArray); + SArray *pUpdateArray = taosArrayInit(size, sizeof(SArbGroup)); - size_t size = taosArrayGetSize(memberArray); for (size_t i = 0; i < size; i++) { SVArbHbRspMember *pRspMember = taosArrayGet(memberArray, i); @@ -832,17 +932,16 @@ static int32_t mndUpdateArbHeartBeat(SMnode *pMnode, int32_t dnodeId, SArray *me bool updateToken = mndUpdateArbGroupByHeartBeat(pGroup, pRspMember, nowMs, dnodeId, &newGroup); if (updateToken) { - ret = mndPullupArbUpdateGroup(pMnode, &newGroup); - if (ret != 0) { - mInfo("failed to pullup update arb token, vgId:%d, since %s", pRspMember->vgId, terrstr()); - } + taosArrayPush(pUpdateArray, &newGroup); } sdbRelease(pMnode->pSdb, pGroup); - if (ret != 0) break; } - return ret; + (void)mndPullupArbUpdateGroupBatch(pMnode, pUpdateArray); + + taosArrayDestroy(pUpdateArray); + return 0; } bool mndUpdateArbGroupByCheckSync(SArbGroup *pGroup, int32_t vgId, char *member0Token, char *member1Token, @@ -900,6 +999,11 @@ static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token } static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) { + if (pRsp->contLen == 0) { + mDebug("arb hb-rsp contLen is 0"); + return 0; + } + int32_t ret = -1; SMnode *pMnode = pRsp->info.node; @@ -914,6 +1018,7 @@ static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) { SVArbHeartBeatRsp arbHbRsp = {0}; if (tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp) != 0) { + mInfo("arb hb-rsp des failed, since:%s", tstrerror(pRsp->code)); terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -934,6 +1039,11 @@ _OVER: } static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) { + if (pRsp->contLen == 0) { + mDebug("arb check-sync-rsp contLen is 0"); + return 0; + } + int32_t ret = -1; SMnode *pMnode = pRsp->info.node; @@ -948,7 +1058,7 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) { SVArbCheckSyncRsp syncRsp = {0}; if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) { - mInfo("arb sync check failed, since:%s", tstrerror(pRsp->code)); + mInfo("arb check-sync-rsp des failed, since:%s", tstrerror(pRsp->code)); if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) { terrno = TSDB_CODE_SUCCESS; return 0; @@ -1008,6 +1118,11 @@ _OVER: } static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) { + if (pRsp->contLen == 0) { + mDebug("arb set-assigned-rsp contLen is 0"); + return 0; + } + int32_t ret = -1; SMnode *pMnode = pRsp->info.node; @@ -1022,8 +1137,8 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) { SVArbSetAssignedLeaderRsp setAssignedRsp = {0}; if (tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp) != 0) { + mInfo("arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code)); terrno = TSDB_CODE_INVALID_MSG; - mInfo("arb set assigned failed, des failed since:%s", tstrerror(pRsp->code)); return -1; } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index f254fb16a5..ad596289de 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -821,8 +821,7 @@ static int32_t mndCheckDbEncryptKey(SMnode *pMnode, SCreateDbReq *pReq) { #ifdef TD_ENTERPRISE if (pReq->encryptAlgorithm == TSDB_ENCRYPT_ALGO_NONE) goto _exit; - if (grantCheck(TSDB_GRANT_DB_ENCRYPTION) != 0) { - code = TSDB_CODE_MND_DB_ENCRYPT_GRANT_EXPIRED; + if ((code = grantCheck(TSDB_GRANT_DB_ENCRYPTION)) != 0) { goto _exit; } if (tsEncryptionKeyStat != ENCRYPT_KEY_STAT_LOADED) { @@ -1226,7 +1225,7 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) { _OVER: if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (terrno != 0) code = terrno; - mError("db:%s, failed to alter since %s", alterReq.db, terrstr()); + mError("db:%s, failed to alter since %s", alterReq.db, tstrerror(code)); } mndReleaseDb(pMnode, pDb); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index fb15e4b857..44fe81ac09 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1847,8 +1847,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; - if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) { - terrno = TSDB_CODE_GRANT_EXPIRED; + if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { return -1; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index a81a391c3d..9bd7b3b18f 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -225,7 +225,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SArray *pFailedTasks = NULL; SArray *pOrphanTasks = NULL; - if (grantCheckExpire(TSDB_GRANT_STREAMS) < 0) { + if ((terrno = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { if (suspendAllStreams(pMnode, &pReq->info) < 0) { return -1; } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 84940e01d4..ecc163985c 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -26,7 +26,7 @@ #define TRANS_VER1_NUMBER 1 #define TRANS_VER2_NUMBER 2 #define TRANS_ARRAY_SIZE 8 -#define TRANS_RESERVE_SIZE 48 +#define TRANS_RESERVE_SIZE 44 static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOld); @@ -196,10 +196,21 @@ SSdbRaw *mndTransEncode(STrans *pTrans) { } SDB_SET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER) + + int32_t arbGroupNum = taosHashGetSize(pTrans->arbGroupIds); + SDB_SET_INT32(pRaw, dataPos, arbGroupNum, _OVER) + void *pIter = NULL; + pIter = taosHashIterate(pTrans->arbGroupIds, NULL); + while (pIter) { + int32_t arbGroupId = *(int32_t *)pIter; + SDB_SET_INT32(pRaw, dataPos, arbGroupId, _OVER) + pIter = taosHashIterate(pTrans->arbGroupIds, pIter); + } + SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER) - terrno = 0; + terrno = 0; _OVER: if (terrno != 0) { @@ -279,6 +290,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) { int32_t undoActionNum = 0; int32_t commitActionNum = 0; int32_t dataPos = 0; + int32_t arbgroupIdNum = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER; @@ -350,6 +362,16 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) { } SDB_GET_BINARY(pRaw, dataPos, pTrans->opername, TSDB_TRANS_OPER_LEN, _OVER); + + pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); + + SDB_GET_INT32(pRaw, dataPos, &arbgroupIdNum, _OVER) + for (int32_t i = 0; i < arbgroupIdNum; ++i) { + int32_t arbGroupId = 0; + SDB_GET_INT32(pRaw, dataPos, &arbGroupId, _OVER) + taosHashPut(pTrans->arbGroupIds, &arbGroupId, sizeof(int32_t), NULL, 0); + } + SDB_GET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER) terrno = 0; @@ -462,6 +484,9 @@ void mndTransDropData(STrans *pTrans) { mndTransDropActions(pTrans->commitActions); pTrans->commitActions = NULL; } + if (pTrans->arbGroupIds != NULL) { + taosHashCleanup(pTrans->arbGroupIds); + } if (pTrans->pRpcArray != NULL) { taosArrayDestroy(pTrans->pRpcArray); pTrans->pRpcArray = NULL; @@ -581,6 +606,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); + pTrans->arbGroupIds = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK); pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : tGenIdPI64(); taosInitRWLatch(&pTrans->lockRpcArray); @@ -733,7 +759,9 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname) } } -void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId) { pTrans->arbGroupId = groupId; } +void mndTransAddArbGroupId(STrans *pTrans, int32_t groupId) { + taosHashPut(pTrans->arbGroupIds, &groupId, sizeof(int32_t), NULL, 0); +} void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; } @@ -821,7 +849,16 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { if (pNew->conflict == TRN_CONFLICT_ARBGROUP) { if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) { - if (pNew->arbGroupId == pTrans->arbGroupId) conflict = true; + void *pIter = taosHashIterate(pNew->arbGroupIds, NULL); + while (pIter != NULL) { + int32_t groupId = *(int32_t *)pIter; + if (taosHashGet(pTrans->arbGroupIds, &groupId, sizeof(int32_t)) != NULL) { + taosHashCancelIterate(pNew->arbGroupIds, pIter); + conflict = true; + break; + } + pIter = taosHashIterate(pNew->arbGroupIds, pIter); + } } } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index faa90dcbf8..11e9350e14 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -370,12 +370,12 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGran if ((TEST_VIEW_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_VIEW)) { QW_ELOG("query failed cause of view grant expired, msgMask:%d", msg.msgMask); tFreeSSubQueryMsg(&msg); - QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED); + QW_ERR_RET(TSDB_CODE_GRANT_VIEW_EXPIRED); } if ((TEST_AUDIT_MASK(msg.msgMask)) && !taosGranted(TSDB_GRANT_AUDIT)) { QW_ELOG("query failed cause of audit grant expired, msgMask:%d", msg.msgMask); tFreeSSubQueryMsg(&msg); - QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED); + QW_ERR_RET(TSDB_CODE_GRANT_AUDIT_EXPIRED); } } } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 83de642e51..fb745f86cb 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -759,7 +759,7 @@ void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStat void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; } bool isDeteled(SStreamFileState* pFileState, TSKEY ts) { - return pFileState->deleteMark > 0 && ts < (pFileState->maxTs - pFileState->deleteMark); + return pFileState->deleteMark != INT64_MAX && pFileState->maxTs > 0 && ts < (pFileState->maxTs - pFileState->deleteMark); } bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); } diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index 5a3dc867e6..4635ec340d 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -2741,7 +2741,7 @@ int32_t tsDecompressTimestamp2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t tsCompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { DEFINE_VAR(cmprAlg) - if (lvl != 0 && lossyFloat) { + if (l2 == L2_TSZ && lvl != 0 && lossyFloat) { return tsCompressFloatLossyImp(pIn, nEle, pOut); } FUNC_COMPRESS_IMPL(pIn, nIn, nEle, pOut, nOut, cmprAlg, pBuf, nBuf, TSDB_DATA_TYPE_FLOAT, 1); @@ -2760,7 +2760,7 @@ int32_t tsDecompressFloat2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int int32_t tsCompressDouble2(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint32_t cmprAlg, void *pBuf, int32_t nBuf) { DEFINE_VAR(cmprAlg) - if (lvl != 0 && lossyDouble) { + if (l2 == L2_TSZ && lvl != 0 && lossyDouble) { // lossy mode return tsCompressDoubleLossyImp(pIn, nEle, pOut); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ad811cc891..0f594af0e9 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -259,7 +259,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_IN_CREATING, "Database in creating TAOS_DEFINE_ERROR(TSDB_CODE_MND_ENCRYPT_NOT_ALLOW_CHANGE, "Encryption is not allowed to be changed after database is created") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INCONSIST_ENCRYPT_KEY, "Inconsistent encryption key") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ENCRYPT_KEY, "The cluster has not been set properly for database encryption") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_DB_ENCRYPT_GRANT_EXPIRED, "The database encryption function grant expired") // mnode-node TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_ALREADY_EXIST, "Mnode already exists") @@ -499,9 +498,20 @@ TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_LACK_OF_BASIC, "Lack of basic functio TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OBJ_NOT_EXIST, "Grant object not exist") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_LAST_ACTIVE_NOT_FOUND, "The historial active code does not match") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_MACHINES_MISMATCH, "Cluster machines mismatch with active code") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE, "Expire time of optional grant item is too large") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OPT_EXPIRE_TOO_LARGE, "Expiration time of optional grant item is too large") TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUPLICATED_ACTIVE, "The active code can't be activated repeatedly") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_LIMITED, "Number of view has reached the licensed upper limit") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_LIMITED, "Number of views has reached the licensed upper limit") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_BASIC_EXPIRED, "License expired for basic functions") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STREAM_EXPIRED, "License expired for stream function") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_SUBSCRIPTION_EXPIRED, "License expired for subscription function") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_VIEW_EXPIRED, "License expired for view function") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_AUDIT_EXPIRED, "License expired for audit function") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CSV_EXPIRED, "License expired for CSV function") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_MULTI_STORAGE_EXPIRED, "License expired for multi-tier storage function") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_OBJECT_STROAGE_EXPIRED, "License expired for object storage function") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DUAL_REPLICA_HA_EXPIRED,"License expired for dual-replica HA function") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DB_ENCRYPTION_EXPIRED, "License expired for database encryption function") + // sync TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout") diff --git a/tests/parallel_test/run.sh b/tests/parallel_test/run.sh index ef9413a9d5..e58f890ccd 100755 --- a/tests/parallel_test/run.sh +++ b/tests/parallel_test/run.sh @@ -315,19 +315,22 @@ function run_thread() { fi if [ -n "$corefile" ]; then echo -e "\e[34m corefiles: $corefile \e[0m" - local build_dir=$log_dir/build_${hosts[index]} - local remote_build_dir="${workdirs[index]}/${DEBUGPATH}/build" - # if [ $ent -ne 0 ]; then - # remote_build_dir="${workdirs[index]}/{DEBUGPATH}/build" - # fi - mkdir "$build_dir" 2>/dev/null - if [ $? -eq 0 ]; then - # scp build binary - cmd="$scpcmd:${remote_build_dir}/* ${build_dir}/" - echo "$cmd" - $cmd >/dev/null - fi fi + # scp build binary and unit test log + local build_dir=$log_dir/build_${hosts[index]} + local remote_build_dir="${workdirs[index]}/${DEBUGPATH}/build" + local remote_unit_test_log_dir="${workdirs[index]}/${DEBUGPATH}/Testing/Temporary/" + + mkdir "$build_dir" 2>/dev/null + if [ $? -eq 0 ]; then + cmd="$scpcmd:${remote_build_dir}/* ${build_dir}/" + echo "$cmd" + $cmd >/dev/null + cmd="$scpcmd:${remote_unit_test_log_dir}/* ${build_dir}/" + echo "$cmd" + $cmd >/dev/null + fi + # get remote sim dir local remote_sim_dir="${workdirs[index]}/tmp/thread_volume/$thread_no" local tarcmd="sshpass -p ${passwords[index]} ssh -o StrictHostKeyChecking=no -r ${usernames[index]}@${hosts[index]}" diff --git a/tests/system-test/8-stream/stream_multi_agg.py b/tests/system-test/8-stream/stream_multi_agg.py index 3532825493..acb80f528b 100644 --- a/tests/system-test/8-stream/stream_multi_agg.py +++ b/tests/system-test/8-stream/stream_multi_agg.py @@ -39,7 +39,7 @@ class TDTestCase: os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 5 > /dev/null 2>&1 &") time.sleep(10) - tdSql.query("use test") + tdSql.execute("use test", queryTimes=100) tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)") tdLog.debug("========create stream and insert data ok========") time.sleep(15) @@ -66,7 +66,7 @@ class TDTestCase: os.system("taosBenchmark -d db -t 20 -v 6 -n 1000 -y > /dev/null 2>&1") # create stream - tdSql.execute("use db") + tdSql.execute("use db", queryTimes=100) tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True) time.sleep(5) diff --git a/tests/system-test/pytest.sh b/tests/system-test/pytest.sh index 2837c817be..060717c20e 100755 --- a/tests/system-test/pytest.sh +++ b/tests/system-test/pytest.sh @@ -89,8 +89,8 @@ else export LD_PRELOAD="$(realpath "$(gcc -print-file-name=libasan.so)") $(realpath "$(gcc -print-file-name=libstdc++.so)")" echo "Preload AsanSo:" $? - $* -a 2>$AsanFile - + $* -a 2> $AsanFile + cat $AsanFile unset LD_PRELOAD for ((i = 1; i <= 20; i++)); do AsanFileLen=$(cat $AsanFile | wc -l)