Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/TD-30987-5
This commit is contained in:
commit
cbb1234c7b
|
@ -401,7 +401,7 @@ pipeline {
|
|||
}
|
||||
}
|
||||
stage('linux test') {
|
||||
agent{label " slave1_47 || slave1_48 || slave1_49 || slave1_50 || slave1_52 || slave1_59 || slave1_63 || worker03 || slave215 || slave217 || slave219 "}
|
||||
agent{label "slave1_47 || slave1_48 || slave1_49 || slave1_50 || slave1_52 || worker03 || slave215 || slave217 || slave219 "}
|
||||
options { skipDefaultCheckout() }
|
||||
when {
|
||||
changeRequest()
|
||||
|
|
|
@ -54,6 +54,7 @@ int32_t mndInitAcct(SMnode *pMnode) {
|
|||
void mndCleanupAcct(SMnode *pMnode) {}
|
||||
|
||||
static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
|
||||
int32_t code = 0;
|
||||
SAcctObj acctObj = {0};
|
||||
tstrncpy(acctObj.acct, TSDB_DEFAULT_USER, TSDB_USER_LEN);
|
||||
acctObj.createdTime = taosGetTimestampMs();
|
||||
|
@ -76,7 +77,10 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
|
|||
};
|
||||
|
||||
SSdbRaw *pRaw = mndAcctActionEncode(&acctObj);
|
||||
if (pRaw == NULL) return -1;
|
||||
if (pRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||
|
||||
mInfo("acct:%s, will be created when deploying, raw:%p", acctObj.acct, pRaw);
|
||||
|
@ -84,21 +88,24 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
|
|||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, "create-acct");
|
||||
if (pTrans == NULL) {
|
||||
sdbFreeRaw(pRaw);
|
||||
mError("acct:%s, failed to create since %s", acctObj.acct, terrstr());
|
||||
return -1;
|
||||
code = terrno;
|
||||
mError("acct:%s, failed to create since %s", acctObj.acct, tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
mInfo("trans:%d, used to create acct:%s", pTrans->id, acctObj.acct);
|
||||
|
||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
|
||||
mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr());
|
||||
code = mndTransAppendCommitlog(pTrans, pRaw);
|
||||
if (code != 0) {
|
||||
mError("trans:%d, failed to commit redo log since %s", pTrans->id, tstrerror(code));
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
if (code != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -216,31 +223,36 @@ static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew) {
|
|||
}
|
||||
|
||||
static int32_t mndProcessCreateAcctReq(SRpcMsg *pReq) {
|
||||
if (mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_CREATE_ACCT) != 0) {
|
||||
return -1;
|
||||
int32_t code = 0;
|
||||
code = mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_CREATE_ACCT);
|
||||
if (code != 0) {
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
mError("failed to process create acct request since %s", terrstr());
|
||||
return -1;
|
||||
code = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
mError("failed to process create acct request since %s", tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndProcessAlterAcctReq(SRpcMsg *pReq) {
|
||||
if (mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_ALTER_ACCT) != 0) {
|
||||
return -1;
|
||||
int32_t code = 0;
|
||||
code = mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_ALTER_ACCT);
|
||||
if (code != 0) {
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
mError("failed to process create acct request since %s", terrstr());
|
||||
return -1;
|
||||
code = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
mError("failed to process create acct request since %s", tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndProcessDropAcctReq(SRpcMsg *pReq) {
|
||||
if (mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_DROP_ACCT) != 0) {
|
||||
return -1;
|
||||
int32_t code = 0;
|
||||
if ((code = mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_DROP_ACCT)) != 0) {
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
mError("failed to process create acct request since %s", terrstr());
|
||||
return -1;
|
||||
code = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
mError("failed to process create acct request since %s", tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
|
@ -57,6 +57,7 @@ static int32_t mndArbCheckToken(const char *token1, const char *token2) {
|
|||
}
|
||||
|
||||
int32_t mndInitArbGroup(SMnode *pMnode) {
|
||||
int32_t code = 0;
|
||||
SSdbTable table = {
|
||||
.sdbType = SDB_ARBGROUP,
|
||||
.keyType = SDB_KEY_INT32,
|
||||
|
@ -78,6 +79,10 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
|
|||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);
|
||||
|
||||
arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
|
||||
if (arbUpdateHash == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
@ -248,50 +253,74 @@ _OVER:
|
|||
}
|
||||
|
||||
int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||
if (pRedoRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if ((code = mndTransAppendRedolog(pTrans, pRedoRaw)) != 0) TAOS_RETURN(code);
|
||||
if ((code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING)) != 0) TAOS_RETURN(code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pUndoRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pUndoRaw == NULL) return -1;
|
||||
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
if (pUndoRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if ((code = mndTransAppendUndolog(pTrans, pUndoRaw)) != 0) TAOS_RETURN(code);
|
||||
if ((code = sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED)) != 0) TAOS_RETURN(code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
if (pCommitRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw) != 0)) TAOS_RETURN(code);
|
||||
if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)) != 0) TAOS_RETURN(code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
||||
if (pRedoRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if ((code = mndTransAppendPrepareLog(pTrans, pRedoRaw)) != 0) TAOS_RETURN(code);
|
||||
if ((code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING)) != 0) TAOS_RETURN(code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
||||
if (pRedoRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if ((code = mndTransAppendRedolog(pTrans, pRedoRaw)) != 0) TAOS_RETURN(code);
|
||||
if ((code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING)) != 0) TAOS_RETURN(code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
if (pCommitRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) TAOS_RETURN(code);
|
||||
if ((code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED)) != 0) TAOS_RETURN(code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -343,6 +372,7 @@ static int32_t mndSendArbHeartBeatReq(SDnodeObj *pDnode, char *arbToken, int64_t
|
|||
}
|
||||
|
||||
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
|
||||
int32_t code = 0;
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SArbGroup *pArbGroup = NULL;
|
||||
|
@ -377,7 +407,7 @@ static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
char arbToken[TSDB_ARB_TOKEN_SIZE];
|
||||
if (mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
|
||||
mError("failed to get arb token for arb-hb timer");
|
||||
pIter = taosHashIterate(pDnodeHash, NULL);
|
||||
while (pIter) {
|
||||
|
@ -386,7 +416,7 @@ static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
|
|||
pIter = taosHashIterate(pDnodeHash, pIter);
|
||||
}
|
||||
taosHashCleanup(pDnodeHash);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int64_t nowMs = taosGetTimestampMs();
|
||||
|
@ -447,6 +477,7 @@ static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbT
|
|||
|
||||
static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbToken, int64_t term, char *member0Token,
|
||||
char *member1Token) {
|
||||
int32_t code = 0;
|
||||
int32_t contLen = 0;
|
||||
void *pHead = mndBuildArbCheckSyncReq(&contLen, vgId, arbToken, term, member0Token, member1Token);
|
||||
if (!pHead) {
|
||||
|
@ -459,10 +490,12 @@ static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbTok
|
|||
if (epSet.numOfEps == 0) {
|
||||
mError("vgId:%d, failed to send check-sync request since no epSet found", vgId);
|
||||
rpcFreeCont(pHead);
|
||||
return -1;
|
||||
code = -1;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
||||
code = tmsgSendReq(&epSet, &rpcMsg);
|
||||
if (code != 0) {
|
||||
mError("vgId:%d, failed to send check-sync request since 0x%x", vgId, code);
|
||||
} else {
|
||||
|
@ -502,11 +535,14 @@ static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, ch
|
|||
|
||||
static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken,
|
||||
int64_t term, char *memberToken) {
|
||||
int32_t code = 0;
|
||||
int32_t contLen = 0;
|
||||
void *pHead = mndBuildArbSetAssignedLeaderReq(&contLen, vgId, arbToken, term, memberToken);
|
||||
if (!pHead) {
|
||||
mError("vgId:%d, failed to build set-assigned request", vgId);
|
||||
return -1;
|
||||
code = -1;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_SYNC_SET_ASSIGNED_LEADER, .pCont = pHead, .contLen = contLen};
|
||||
|
||||
|
@ -514,9 +550,11 @@ static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, i
|
|||
if (epSet.numOfEps == 0) {
|
||||
mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since no epSet found", dnodeId, vgId);
|
||||
rpcFreeCont(pHead);
|
||||
return -1;
|
||||
code = -1;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
||||
code = tmsgSendReq(&epSet, &rpcMsg);
|
||||
if (code != 0) {
|
||||
mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since 0x%x", dnodeId, vgId, code);
|
||||
} else {
|
||||
|
@ -526,6 +564,7 @@ static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, i
|
|||
}
|
||||
|
||||
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
||||
int32_t code = 0;
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SArbGroup *pArbGroup = NULL;
|
||||
|
@ -533,14 +572,16 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
|||
void *pIter = NULL;
|
||||
|
||||
char arbToken[TSDB_ARB_TOKEN_SIZE];
|
||||
if (mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
|
||||
mError("failed to get arb token for arb-check-sync timer");
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
int64_t term = mndGetTerm(pMnode);
|
||||
if (term < 0) {
|
||||
mError("arb failed to get term since %s", terrstr());
|
||||
return -1;
|
||||
code = -1;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
|
||||
|
@ -747,13 +788,13 @@ _OVER:
|
|||
}
|
||||
|
||||
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
||||
int ret = -1;
|
||||
int code = -1;
|
||||
size_t sz = 0;
|
||||
|
||||
SMArbUpdateGroupBatchReq req = {0};
|
||||
if (tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req) != 0) {
|
||||
if ((code = tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req)) != 0) {
|
||||
mError("arb failed to decode arb-update-group request");
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
|
@ -789,7 +830,7 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
|||
|
||||
mndTransAddArbGroupId(pTrans, newGroup.vgId);
|
||||
|
||||
if (mndSetCreateArbGroupCommitLogs(pTrans, &newGroup) != 0) {
|
||||
if ((code = mndSetCreateArbGroupCommitLogs(pTrans, &newGroup)) != 0) {
|
||||
mError("failed to update arbgroup in set commit log, vgId:%d, trans:%d, since %s", newGroup.vgId, pTrans->id,
|
||||
terrstr());
|
||||
goto _OVER;
|
||||
|
@ -803,13 +844,13 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
|||
sdbRelease(pMnode->pSdb, pOldGroup);
|
||||
}
|
||||
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER;
|
||||
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
|
||||
|
||||
ret = 0;
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
if (ret != 0) {
|
||||
if (code != 0) {
|
||||
// failed to update arbgroup
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
|
||||
|
@ -819,7 +860,7 @@ _OVER:
|
|||
|
||||
mndTransDrop(pTrans);
|
||||
tFreeSMArbUpdateGroupBatchReq(&req);
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew) {
|
||||
|
@ -841,10 +882,11 @@ static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
|
|||
}
|
||||
|
||||
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
|
||||
int32_t ret = -1;
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup");
|
||||
if (pTrans == NULL) {
|
||||
mError("failed to update arbgroup in create trans, vgId:%d, since %s", pNew->vgId, terrstr());
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -854,23 +896,22 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
|
|||
pNew->assignedLeader.token, pNew->assignedLeader.acked);
|
||||
|
||||
mndTransAddArbGroupId(pTrans, pNew->vgId);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
ret = -1;
|
||||
if ((code = mndTransCheckConflict(pMnode, pTrans)) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateArbGroupCommitLogs(pTrans, pNew) != 0) {
|
||||
mError("failed to update arbgroup in set commit log, vgId:%d, since %s", pNew->vgId, terrstr());
|
||||
if ((code = mndSetCreateArbGroupCommitLogs(pTrans, pNew)) != 0) {
|
||||
mError("failed to update arbgroup in set commit log, vgId:%d, since %s", pNew->vgId, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
|
||||
|
||||
ret = 0;
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
|
||||
|
@ -993,11 +1034,13 @@ _OVER:
|
|||
}
|
||||
|
||||
static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token, bool newIsSync) {
|
||||
int32_t code = 0;
|
||||
SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
|
||||
if (pGroup == NULL) {
|
||||
terrno = TSDB_CODE_NOT_FOUND;
|
||||
mInfo("failed to update arb sync, vgId:%d not found", vgId);
|
||||
return -1;
|
||||
code = -1;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
SArbGroup newGroup = {0};
|
||||
|
@ -1018,38 +1061,36 @@ static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t ret = -1;
|
||||
int32_t code = -1;
|
||||
|
||||
SMnode *pMnode = pRsp->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
char arbToken[TSDB_ARB_TOKEN_SIZE];
|
||||
if (mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
|
||||
mError("failed to get arb token for arb-hb response");
|
||||
terrno = TSDB_CODE_NOT_FOUND;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
SVArbHeartBeatRsp arbHbRsp = {0};
|
||||
if (tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp) != 0) {
|
||||
if ((code = tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp)) != 0) {
|
||||
mInfo("arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (mndArbCheckToken(arbToken, arbHbRsp.arbToken) != 0) {
|
||||
mInfo("arb hearbeat skip update for dnodeId:%d, arb token mismatch, local:[%s] msg:[%s]", arbHbRsp.dnodeId,
|
||||
arbToken, arbHbRsp.arbToken);
|
||||
terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
|
||||
code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
(void)mndUpdateArbHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers);
|
||||
ret = 0;
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
tFreeSVArbHeartBeatRsp(&arbHbRsp);
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
||||
|
@ -1058,27 +1099,25 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t ret = -1;
|
||||
int32_t code = -1;
|
||||
|
||||
SMnode *pMnode = pRsp->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
char arbToken[TSDB_ARB_TOKEN_SIZE];
|
||||
if (mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
|
||||
mError("failed to get arb token for arb-check-sync response");
|
||||
terrno = TSDB_CODE_NOT_FOUND;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
SVArbCheckSyncRsp syncRsp = {0};
|
||||
if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) {
|
||||
if ((code = tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp)) != 0) {
|
||||
mInfo("arb check-sync-rsp des failed, since:%s", tstrerror(pRsp->code));
|
||||
if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
return 0;
|
||||
}
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (mndArbCheckToken(arbToken, syncRsp.arbToken) != 0) {
|
||||
|
@ -1089,16 +1128,16 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
|||
}
|
||||
|
||||
bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
|
||||
if (mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync) != 0) {
|
||||
if ((code = mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync)) != 0) {
|
||||
mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
ret = 0;
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
tFreeSVArbCheckSyncRsp(&syncRsp);
|
||||
return ret;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
|
||||
|
@ -1138,35 +1177,35 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t ret = -1;
|
||||
int32_t code = -1;
|
||||
|
||||
SMnode *pMnode = pRsp->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
char arbToken[TSDB_ARB_TOKEN_SIZE];
|
||||
if (mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
if ((code = mndGetArbToken(pMnode, arbToken)) != 0) {
|
||||
mError("failed to get arb token for arb-set-assigned response");
|
||||
terrno = TSDB_CODE_NOT_FOUND;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
|
||||
if (tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp) != 0) {
|
||||
if ((code = tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp)) != 0) {
|
||||
mInfo("arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (mndArbCheckToken(arbToken, setAssignedRsp.arbToken) != 0) {
|
||||
mInfo("skip update arb assigned for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", setAssignedRsp.vgId,
|
||||
arbToken, setAssignedRsp.arbToken);
|
||||
terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
|
||||
code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SArbGroup *pGroup = mndAcquireArbGroup(pMnode, setAssignedRsp.vgId);
|
||||
if (!pGroup) {
|
||||
mError("failed to set arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, terrstr());
|
||||
code = -1;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1174,17 +1213,17 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
|
|||
bool updateAssigned = mndUpdateArbGroupBySetAssignedLeader(pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken,
|
||||
pRsp->code, &newGroup);
|
||||
if (updateAssigned) {
|
||||
if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
|
||||
mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, terrstr());
|
||||
if ((code = mndPullupArbUpdateGroup(pMnode, &newGroup)) != 0) {
|
||||
mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
||||
ret = 0;
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp);
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
|
|
|
@ -14,8 +14,8 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "audit.h"
|
||||
#include "mndCluster.h"
|
||||
#include "audit.h"
|
||||
#include "mndGrant.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndShow.h"
|
||||
|
@ -61,11 +61,13 @@ int32_t mndInitCluster(SMnode *pMnode) {
|
|||
void mndCleanupCluster(SMnode *pMnode) {}
|
||||
|
||||
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
SClusterObj *pCluster = sdbAcquire(pSdb, SDB_CLUSTER, &pMnode->clusterId);
|
||||
if (pCluster == NULL) {
|
||||
return -1;
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
tstrncpy(clusterName, pCluster->name, len);
|
||||
|
@ -247,7 +249,10 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
|
|||
mInfo("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name);
|
||||
|
||||
SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj);
|
||||
if (pRaw == NULL) return -1;
|
||||
if (pRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||
|
||||
mInfo("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw);
|
||||
|
@ -256,21 +261,22 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
|
|||
if (pTrans == NULL) {
|
||||
sdbFreeRaw(pRaw);
|
||||
mError("cluster:%" PRId64 ", failed to create since %s", clusterObj.id, terrstr());
|
||||
return -1;
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
mInfo("trans:%d, used to create cluster:%" PRId64, pTrans->id, clusterObj.id);
|
||||
|
||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
|
||||
if ((code = mndTransAppendCommitlog(pTrans, pRaw)) != 0) {
|
||||
mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -347,22 +353,33 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
mInfo("update cluster uptime to %d", clusterObj.upTime);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-uptime");
|
||||
if (pTrans == NULL) return -1;
|
||||
if (pTrans == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
if (pCommitRaw == NULL) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -374,13 +391,12 @@ int32_t mndProcessConfigClusterReq(SRpcMsg *pReq) {
|
|||
SMnode *pMnode = pReq->info.node;
|
||||
SMCfgClusterReq cfgReq = {0};
|
||||
if (tDeserializeSMCfgClusterReq(pReq->pCont, pReq->contLen, &cfgReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
mInfo("cluster: start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
|
||||
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_CLUSTER) != 0) {
|
||||
code = terrno != 0 ? terrno : TSDB_CODE_MND_NO_RIGHTS;
|
||||
if ((code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_CLUSTER)) != 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
|
@ -410,17 +426,17 @@ int32_t mndProcessConfigClusterReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
{ // audit
|
||||
auditRecord(pReq, pMnode->clusterId, "alterCluster", "", "", cfgReq.sql, TMIN(cfgReq.sqlLen, GRANT_ACTIVE_HEAD_LEN << 1));
|
||||
auditRecord(pReq, pMnode->clusterId, "alterCluster", "", "", cfgReq.sql,
|
||||
TMIN(cfgReq.sqlLen, GRANT_ACTIVE_HEAD_LEN << 1));
|
||||
}
|
||||
_exit:
|
||||
tFreeSMCfgClusterReq(&cfgReq);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
mError("cluster: failed to config:%s %s since %s", cfgReq.config, cfgReq.value, terrstr());
|
||||
} else {
|
||||
mInfo("cluster: success to config:%s %s", cfgReq.config, cfgReq.value);
|
||||
}
|
||||
return code;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t mndProcessConfigClusterRsp(SRpcMsg *pRsp) {
|
||||
|
|
|
@ -387,11 +387,11 @@ TEST_F(MndTestSdb, 00_API) {
|
|||
void *pRow2 = sdbGetRowObj(NULL);
|
||||
ASSERT_EQ(pRow2 == NULL, 1);
|
||||
|
||||
//sdbRaw.c
|
||||
// sdbRaw.c
|
||||
SStrObj strObj;
|
||||
SSdbRaw *pRaw1 = NULL;
|
||||
strSetDefault(&strObj, 1);
|
||||
|
||||
|
||||
pRaw1 = strEncode(&strObj);
|
||||
int32_t id = sdbGetIdFromRaw(pSdb, pRaw1);
|
||||
ASSERT_EQ(id, -2);
|
||||
|
@ -399,32 +399,32 @@ TEST_F(MndTestSdb, 00_API) {
|
|||
SSdbRaw *pRaw2 = sdbAllocRaw(SDB_USER, 1, -128);
|
||||
ASSERT_EQ(pRaw2 == NULL, 1);
|
||||
|
||||
ASSERT_EQ(sdbSetRawInt8(NULL, 0, 0), -1);
|
||||
ASSERT_EQ(sdbSetRawInt8(pRaw1, -128, 0), -1);
|
||||
ASSERT_EQ(sdbSetRawInt32(NULL, 0, 0), -1);
|
||||
ASSERT_EQ(sdbSetRawInt32(pRaw1, -128, 0), -1);
|
||||
ASSERT_EQ(sdbSetRawInt16(NULL, 0, 0), -1);
|
||||
ASSERT_EQ(sdbSetRawInt16(pRaw1, -128, 0), -1);
|
||||
ASSERT_EQ(sdbSetRawInt64(NULL, 0, 0), -1);
|
||||
ASSERT_EQ(sdbSetRawInt64(pRaw1, -128, 0), -1);
|
||||
ASSERT_EQ(sdbSetRawBinary(NULL, 0, "12", 3), -1);
|
||||
ASSERT_EQ(sdbSetRawBinary(pRaw1, 9028, "12", 3), -1);
|
||||
ASSERT_EQ(sdbSetRawDataLen(NULL, 0), -1);
|
||||
ASSERT_EQ(sdbSetRawDataLen(pRaw1, 9000), -1);
|
||||
ASSERT_EQ(sdbSetRawStatus(NULL, SDB_STATUS_READY), -1);
|
||||
ASSERT_EQ(sdbSetRawStatus(pRaw1, SDB_STATUS_INIT), -1);
|
||||
ASSERT_EQ(sdbSetRawInt8(NULL, 0, 0), TSDB_CODE_INVALID_PTR);
|
||||
ASSERT_EQ(sdbSetRawInt8(pRaw1, -128, 0), TSDB_CODE_SDB_INVALID_DATA_LEN);
|
||||
ASSERT_EQ(sdbSetRawInt32(NULL, 0, 0), TSDB_CODE_INVALID_PTR);
|
||||
ASSERT_EQ(sdbSetRawInt32(pRaw1, -128, 0), TSDB_CODE_SDB_INVALID_DATA_LEN);
|
||||
ASSERT_EQ(sdbSetRawInt16(NULL, 0, 0), TSDB_CODE_INVALID_PTR);
|
||||
ASSERT_EQ(sdbSetRawInt16(pRaw1, -128, 0), TSDB_CODE_SDB_INVALID_DATA_LEN);
|
||||
ASSERT_EQ(sdbSetRawInt64(NULL, 0, 0), TSDB_CODE_INVALID_PTR);
|
||||
ASSERT_EQ(sdbSetRawInt64(pRaw1, -128, 0), TSDB_CODE_SDB_INVALID_DATA_LEN);
|
||||
ASSERT_EQ(sdbSetRawBinary(NULL, 0, "12", 3), TSDB_CODE_INVALID_PTR);
|
||||
ASSERT_EQ(sdbSetRawBinary(pRaw1, 9028, "12", 3), TSDB_CODE_SDB_INVALID_DATA_LEN);
|
||||
ASSERT_EQ(sdbSetRawDataLen(NULL, 0), TSDB_CODE_INVALID_PTR);
|
||||
ASSERT_EQ(sdbSetRawDataLen(pRaw1, 9000), TSDB_CODE_SDB_INVALID_DATA_LEN);
|
||||
ASSERT_EQ(sdbSetRawStatus(NULL, SDB_STATUS_READY), TSDB_CODE_INVALID_PTR);
|
||||
ASSERT_EQ(sdbSetRawStatus(pRaw1, SDB_STATUS_INIT), TSDB_CODE_INVALID_PARA);
|
||||
|
||||
ASSERT_EQ(sdbGetRawInt8(NULL, 0, 0), -1);
|
||||
ASSERT_EQ(sdbGetRawInt8(pRaw1, 9000, 0), -1);
|
||||
ASSERT_EQ(sdbGetRawInt32(NULL, 0, 0), -1);
|
||||
ASSERT_EQ(sdbGetRawInt32(pRaw1, 9000, 0), -1);
|
||||
ASSERT_EQ(sdbGetRawInt16(NULL, 0, 0), -1);
|
||||
ASSERT_EQ(sdbGetRawInt16(pRaw1, 9000, 0), -1);
|
||||
ASSERT_EQ(sdbGetRawInt64(NULL, 0, 0), -1);
|
||||
ASSERT_EQ(sdbGetRawInt64(pRaw1, 9000, 0), -1);
|
||||
ASSERT_EQ(sdbGetRawBinary(NULL, 0, 0, 4096), -1);
|
||||
ASSERT_EQ(sdbGetRawBinary(pRaw1, 9000, 0, 112), -1);
|
||||
ASSERT_EQ(sdbGetRawSoftVer(NULL, 0), -1);
|
||||
ASSERT_EQ(sdbGetRawInt8(NULL, 0, 0), TSDB_CODE_INVALID_PTR);
|
||||
ASSERT_EQ(sdbGetRawInt8(pRaw1, 9000, 0), TSDB_CODE_SDB_INVALID_DATA_LEN);
|
||||
ASSERT_EQ(sdbGetRawInt32(NULL, 0, 0), TSDB_CODE_INVALID_PTR);
|
||||
ASSERT_EQ(sdbGetRawInt32(pRaw1, 9000, 0), TSDB_CODE_SDB_INVALID_DATA_LEN);
|
||||
ASSERT_EQ(sdbGetRawInt16(NULL, 0, 0), TSDB_CODE_INVALID_PTR);
|
||||
ASSERT_EQ(sdbGetRawInt16(pRaw1, 9000, 0), TSDB_CODE_SDB_INVALID_DATA_LEN);
|
||||
ASSERT_EQ(sdbGetRawInt64(NULL, 0, 0), TSDB_CODE_INVALID_PTR);
|
||||
ASSERT_EQ(sdbGetRawInt64(pRaw1, 9000, 0), TSDB_CODE_SDB_INVALID_DATA_LEN);
|
||||
ASSERT_EQ(sdbGetRawBinary(NULL, 0, 0, 4096), TSDB_CODE_INVALID_PTR);
|
||||
ASSERT_EQ(sdbGetRawBinary(pRaw1, 9000, 0, 112), TSDB_CODE_SDB_INVALID_DATA_LEN);
|
||||
ASSERT_EQ(sdbGetRawSoftVer(NULL, 0), TSDB_CODE_INVALID_PTR);
|
||||
ASSERT_EQ(sdbGetRawTotalSize(NULL), -1);
|
||||
|
||||
// sdbHash.c
|
||||
|
|
|
@ -112,6 +112,8 @@ void sdbCleanup(SSdb *pSdb) {
|
|||
}
|
||||
|
||||
int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
|
||||
int32_t code = 0;
|
||||
|
||||
ESdbType sdbType = table.sdbType;
|
||||
EKeyType keyType = table.keyType;
|
||||
pSdb->keyTypes[sdbType] = table.keyType;
|
||||
|
@ -134,8 +136,8 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
|
|||
|
||||
SHashObj *hash = taosHashInit(64, taosGetDefaultHashFunction(hashType), true, HASH_ENTRY_LOCK);
|
||||
if (hash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
pSdb->maxId[sdbType] = 0;
|
||||
|
@ -146,16 +148,17 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
|
|||
}
|
||||
|
||||
static int32_t sdbCreateDir(SSdb *pSdb) {
|
||||
int32_t code = 0;
|
||||
if (taosMulMkDir(pSdb->currDir) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to create dir:%s since %s", pSdb->currDir, terrstr());
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to create dir:%s since %s", pSdb->currDir, tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (taosMkDir(pSdb->tmpDir) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr());
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to create dir:%s since %s", pSdb->tmpDir, tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -14,18 +14,19 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "crypt.h"
|
||||
#include "sdb.h"
|
||||
#include "sync.h"
|
||||
#include "tchecksum.h"
|
||||
#include "wal.h"
|
||||
#include "tglobal.h"
|
||||
#include "crypt.h"
|
||||
#include "wal.h"
|
||||
|
||||
#define SDB_TABLE_SIZE 24
|
||||
#define SDB_RESERVE_SIZE 512
|
||||
#define SDB_FILE_VER 1
|
||||
|
||||
static int32_t sdbDeployData(SSdb *pSdb) {
|
||||
int32_t code = 0;
|
||||
mInfo("start to deploy sdb");
|
||||
|
||||
for (int32_t i = SDB_MAX - 1; i >= 0; --i) {
|
||||
|
@ -33,8 +34,9 @@ static int32_t sdbDeployData(SSdb *pSdb) {
|
|||
if (fp == NULL) continue;
|
||||
|
||||
mInfo("start to deploy sdb:%s", sdbTableName(i));
|
||||
if ((*fp)(pSdb->pMnode) != 0) {
|
||||
mError("failed to deploy sdb:%s since %s", sdbTableName(i), terrstr());
|
||||
code = (*fp)(pSdb->pMnode);
|
||||
if (code != 0) {
|
||||
mError("failed to deploy sdb:%s since %s", sdbTableName(i), tstrerror(code));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -80,61 +82,62 @@ static void sdbResetData(SSdb *pSdb) {
|
|||
}
|
||||
|
||||
static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
||||
int32_t code = 0;
|
||||
int64_t sver = 0;
|
||||
int32_t ret = taosReadFile(pFile, &sver, sizeof(int64_t));
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (ret != sizeof(int64_t)) {
|
||||
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||
return -1;
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (sver != SDB_FILE_VER) {
|
||||
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||
return -1;
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
ret = taosReadFile(pFile, &pSdb->applyIndex, sizeof(int64_t));
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (ret != sizeof(int64_t)) {
|
||||
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||
return -1;
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
ret = taosReadFile(pFile, &pSdb->applyTerm, sizeof(int64_t));
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (ret != sizeof(int64_t)) {
|
||||
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||
return -1;
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
ret = taosReadFile(pFile, &pSdb->applyConfig, sizeof(int64_t));
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (ret != sizeof(int64_t)) {
|
||||
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||
return -1;
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
|
||||
int64_t maxId = 0;
|
||||
ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (ret != sizeof(int64_t)) {
|
||||
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||
return -1;
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (i < SDB_MAX) {
|
||||
pSdb->maxId[i] = maxId;
|
||||
|
@ -145,12 +148,12 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
|||
int64_t ver = 0;
|
||||
ret = taosReadFile(pFile, &ver, sizeof(int64_t));
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (ret != sizeof(int64_t)) {
|
||||
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||
return -1;
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (i < SDB_MAX) {
|
||||
pSdb->tableVer[i] = ver;
|
||||
|
@ -160,37 +163,38 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
|||
char reserve[SDB_RESERVE_SIZE] = {0};
|
||||
ret = taosReadFile(pFile, reserve, sizeof(reserve));
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (ret != sizeof(reserve)) {
|
||||
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||
return -1;
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
||||
int32_t code = 0;
|
||||
int64_t sver = SDB_FILE_VER;
|
||||
if (taosWriteFile(pFile, &sver, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (taosWriteFile(pFile, &pSdb->applyIndex, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (taosWriteFile(pFile, &pSdb->applyTerm, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (taosWriteFile(pFile, &pSdb->applyConfig, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
|
||||
|
@ -199,8 +203,8 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
|||
maxId = pSdb->maxId[i];
|
||||
}
|
||||
if (taosWriteFile(pFile, &maxId, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -210,15 +214,15 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
|||
ver = pSdb->tableVer[i];
|
||||
}
|
||||
if (taosWriteFile(pFile, &ver, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
}
|
||||
|
||||
char reserve[SDB_RESERVE_SIZE] = {0};
|
||||
if (taosWriteFile(pFile, reserve, sizeof(reserve)) != sizeof(reserve)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -237,21 +241,22 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
|
|||
|
||||
SSdbRaw *pRaw = taosMemoryMalloc(bufLen + 100);
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("failed read sdb file since %s", terrstr());
|
||||
return -1;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("failed read sdb file since %s", tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
|
||||
if (pFile == NULL) {
|
||||
taosMemoryFree(pRaw);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mInfo("read sdb file:%s finished since %s", file, terrstr());
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
mInfo("read sdb file:%s finished since %s", file, tstrerror(code));
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (sdbReadFileHead(pSdb, pFile) != 0) {
|
||||
mError("failed to read sdb file:%s head since %s", file, terrstr());
|
||||
code = sdbReadFileHead(pSdb, pFile);
|
||||
if (code != 0) {
|
||||
mError("failed to read sdb file:%s head since %s", file, tstrerror(code));
|
||||
taosMemoryFree(pRaw);
|
||||
taosCloseFile(&pFile);
|
||||
return -1;
|
||||
|
@ -278,14 +283,14 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
|
|||
}
|
||||
|
||||
readLen = pRaw->dataLen + sizeof(int32_t);
|
||||
if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB ){
|
||||
if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
|
||||
readLen = ENCRYPTED_LEN(pRaw->dataLen) + sizeof(int32_t);
|
||||
}
|
||||
if (readLen >= bufLen) {
|
||||
bufLen = pRaw->dataLen * 2;
|
||||
SSdbRaw *pNewRaw = taosMemoryMalloc(bufLen + 100);
|
||||
if (pNewRaw == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
mError("failed read sdb file since malloc new sdbRaw size:%d failed", bufLen);
|
||||
goto _OVER;
|
||||
}
|
||||
|
@ -308,10 +313,14 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB ){
|
||||
if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
|
||||
int32_t count = 0;
|
||||
|
||||
char *plantContent = taosMemoryMalloc(ENCRYPTED_LEN(pRaw->dataLen));
|
||||
if (plantContent == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SCryptOpts opts;
|
||||
opts.len = ENCRYPTED_LEN(pRaw->dataLen);
|
||||
|
@ -321,8 +330,8 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
|
|||
strncpy(opts.key, tsEncryptKey, ENCRYPT_KEY_LEN);
|
||||
|
||||
count = CBC_Decrypt(&opts);
|
||||
|
||||
//mDebug("read sdb, CBC_Decrypt dataLen:%d, descrypted len:%d, %s", pRaw->dataLen, count, __FUNCTION__);
|
||||
|
||||
// mDebug("read sdb, CBC_Decrypt dataLen:%d, descrypted len:%d, %s", pRaw->dataLen, count, __FUNCTION__);
|
||||
|
||||
memcpy(pRaw->pData, plantContent, pRaw->dataLen);
|
||||
taosMemoryFree(plantContent);
|
||||
|
@ -355,8 +364,7 @@ _OVER:
|
|||
taosCloseFile(&pFile);
|
||||
sdbFreeRaw(pRaw);
|
||||
|
||||
terrno = code;
|
||||
return code;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t sdbReadFile(SSdb *pSdb) {
|
||||
|
@ -365,7 +373,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
|
|||
sdbResetData(pSdb);
|
||||
int32_t code = sdbReadFileImp(pSdb);
|
||||
if (code != 0) {
|
||||
mError("failed to read sdb file since %s", terrstr());
|
||||
mError("failed to read sdb file since %s", tstrerror(code));
|
||||
sdbResetData(pSdb);
|
||||
}
|
||||
|
||||
|
@ -388,13 +396,14 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
|||
|
||||
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to open sdb file:%s for write since %s", tmpfile, terrstr());
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to open sdb file:%s for write since %s", tmpfile, tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (sdbWriteFileHead(pSdb, pFile) != 0) {
|
||||
mError("failed to write sdb file:%s head since %s", tmpfile, terrstr());
|
||||
code = sdbWriteFileHead(pSdb, pFile);
|
||||
if (code != 0) {
|
||||
mError("failed to write sdb file:%s head since %s", tmpfile, tstrerror(code));
|
||||
taosCloseFile(&pFile);
|
||||
return -1;
|
||||
}
|
||||
|
@ -436,8 +445,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
|||
}
|
||||
|
||||
int32_t newDataLen = pRaw->dataLen;
|
||||
char* newData = pRaw->pData;
|
||||
if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB ){
|
||||
char *newData = pRaw->pData;
|
||||
if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
|
||||
newDataLen = ENCRYPTED_LEN(pRaw->dataLen);
|
||||
newData = taosMemoryMalloc(newDataLen);
|
||||
if (newData == NULL) {
|
||||
|
@ -456,8 +465,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
|||
|
||||
int32_t count = CBC_Encrypt(&opts);
|
||||
|
||||
//mDebug("write sdb, CBC_Encrypt encryptedDataLen:%d, dataLen:%d, %s",
|
||||
// newDataLen, pRaw->dataLen, __FUNCTION__);
|
||||
// mDebug("write sdb, CBC_Encrypt encryptedDataLen:%d, dataLen:%d, %s",
|
||||
// newDataLen, pRaw->dataLen, __FUNCTION__);
|
||||
}
|
||||
|
||||
if (taosWriteFile(pFile, newData, newDataLen) != newDataLen) {
|
||||
|
@ -467,7 +476,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
|||
break;
|
||||
}
|
||||
|
||||
if(tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB ){
|
||||
if (tsiEncryptAlgorithm == DND_CA_SM4 && (tsiEncryptScope & DND_CS_SDB) == DND_CS_SDB) {
|
||||
taosMemoryFree(newData);
|
||||
}
|
||||
|
||||
|
@ -549,19 +558,22 @@ int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) {
|
|||
}
|
||||
}
|
||||
if (code != 0) {
|
||||
mError("failed to write sdb file since %s", terrstr());
|
||||
mError("failed to write sdb file since %s", tstrerror(code));
|
||||
}
|
||||
taosThreadMutexUnlock(&pSdb->filelock);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t sdbDeploy(SSdb *pSdb) {
|
||||
if (sdbDeployData(pSdb) != 0) {
|
||||
return -1;
|
||||
int32_t code = 0;
|
||||
code = sdbDeployData(pSdb);
|
||||
if (code != 0) {
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (sdbWriteFile(pSdb, 0) != 0) {
|
||||
return -1;
|
||||
code = sdbWriteFile(pSdb, 0);
|
||||
if (code != 0) {
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -605,6 +617,7 @@ static void sdbCloseIter(SSdbIter *pIter) {
|
|||
}
|
||||
|
||||
int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *term, int64_t *config) {
|
||||
int32_t code = 0;
|
||||
SSdbIter *pIter = sdbCreateIter(pSdb);
|
||||
if (pIter == NULL) return -1;
|
||||
|
||||
|
@ -617,19 +630,19 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *ter
|
|||
int64_t commitConfig = pSdb->commitConfig;
|
||||
if (taosCopyFile(datafile, pIter->name) < 0) {
|
||||
taosThreadMutexUnlock(&pSdb->filelock);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, terrstr());
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, tstrerror(code));
|
||||
sdbCloseIter(pIter);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
taosThreadMutexUnlock(&pSdb->filelock);
|
||||
|
||||
pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
|
||||
if (pIter->file == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to open sdb file:%s since %s", pIter->name, terrstr());
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to open sdb file:%s since %s", pIter->name, tstrerror(code));
|
||||
sdbCloseIter(pIter);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*ppIter = pIter;
|
||||
|
@ -645,21 +658,22 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter, int64_t *index, int64_t *ter
|
|||
void sdbStopRead(SSdb *pSdb, SSdbIter *pIter) { sdbCloseIter(pIter); }
|
||||
|
||||
int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) {
|
||||
int32_t code = 0;
|
||||
int32_t maxlen = 4096;
|
||||
void *pBuf = taosMemoryCalloc(1, maxlen);
|
||||
if (pBuf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t readlen = taosReadFile(pIter->file, pBuf, maxlen);
|
||||
if (readlen < 0 || readlen > maxlen) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, terrstr(), pIter->total);
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("sdbiter:%p, failed to read snapshot since %s, total:%" PRId64, pIter, tstrerror(code), pIter->total);
|
||||
*ppBuf = NULL;
|
||||
*len = 0;
|
||||
taosMemoryFree(pBuf);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
} else if (readlen == 0) {
|
||||
mInfo("sdbiter:%p, read snapshot to the end, total:%" PRId64, pIter, pIter->total);
|
||||
*ppBuf = NULL;
|
||||
|
@ -676,15 +690,19 @@ int32_t sdbDoRead(SSdb *pSdb, SSdbIter *pIter, void **ppBuf, int32_t *len) {
|
|||
}
|
||||
|
||||
int32_t sdbStartWrite(SSdb *pSdb, SSdbIter **ppIter) {
|
||||
int32_t code = 0;
|
||||
SSdbIter *pIter = sdbCreateIter(pSdb);
|
||||
if (pIter == NULL) return -1;
|
||||
if (pIter == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
pIter->file = taosOpenFile(pIter->name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pIter->file == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to open %s since %s", pIter->name, terrstr());
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to open %s since %s", pIter->name, tstrerror(code));
|
||||
sdbCloseIter(pIter);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*ppIter = pIter;
|
||||
|
@ -702,8 +720,8 @@ int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, i
|
|||
}
|
||||
|
||||
if (taosFsyncFile(pIter->file) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("sdbiter:%p, failed to fasync file %s since %s", pIter, pIter->name, terrstr());
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("sdbiter:%p, failed to fasync file %s since %s", pIter, pIter->name, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -713,13 +731,14 @@ int32_t sdbStopWrite(SSdb *pSdb, SSdbIter *pIter, bool isApply, int64_t index, i
|
|||
char datafile[PATH_MAX] = {0};
|
||||
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||
if (taosRenameFile(pIter->name, datafile) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, terrstr());
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("sdbiter:%p, failed to rename file %s to %s since %s", pIter, pIter->name, datafile, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (sdbReadFile(pSdb) != 0) {
|
||||
mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, terrstr());
|
||||
code = sdbReadFile(pSdb);
|
||||
if (code != 0) {
|
||||
mError("sdbiter:%p, failed to read from %s since %s", pIter, datafile, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -742,11 +761,12 @@ _OVER:
|
|||
}
|
||||
|
||||
int32_t sdbDoWrite(SSdb *pSdb, SSdbIter *pIter, void *pBuf, int32_t len) {
|
||||
int32_t code = 0;
|
||||
int32_t writelen = taosWriteFile(pIter->file, pBuf, len);
|
||||
if (writelen != len) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to write len:%d since %s, total:%" PRId64, len, terrstr(), pIter->total);
|
||||
return -1;
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to write len:%d since %s, total:%" PRId64, len, tstrerror(code), pIter->total);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
pIter->total += writelen;
|
||||
|
|
|
@ -53,14 +53,15 @@ void sdbFreeRaw(SSdbRaw *pRaw) {
|
|||
}
|
||||
|
||||
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (dataPos + sizeof(int8_t) > pRaw->dataLen) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
return -1;
|
||||
code = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*(int8_t *)(pRaw->pData + dataPos) = val;
|
||||
|
@ -68,14 +69,15 @@ int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val) {
|
|||
}
|
||||
|
||||
int32_t sdbSetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t val) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (dataPos + sizeof(uint8_t) > pRaw->dataLen) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
return -1;
|
||||
code = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*(uint8_t *)(pRaw->pData + dataPos) = val;
|
||||
|
@ -83,14 +85,15 @@ int32_t sdbSetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t val) {
|
|||
}
|
||||
|
||||
int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (dataPos + sizeof(int32_t) > pRaw->dataLen) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
return -1;
|
||||
code = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*(int32_t *)(pRaw->pData + dataPos) = val;
|
||||
|
@ -98,14 +101,15 @@ int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val) {
|
|||
}
|
||||
|
||||
int32_t sdbSetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t val) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (dataPos + sizeof(int16_t) > pRaw->dataLen) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
return -1;
|
||||
code = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*(int16_t *)(pRaw->pData + dataPos) = val;
|
||||
|
@ -113,14 +117,15 @@ int32_t sdbSetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t val) {
|
|||
}
|
||||
|
||||
int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (dataPos + sizeof(int64_t) > pRaw->dataLen) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
return -1;
|
||||
code = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*(int64_t *)(pRaw->pData + dataPos) = val;
|
||||
|
@ -128,14 +133,15 @@ int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val) {
|
|||
}
|
||||
|
||||
int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_t valLen) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (dataPos + valLen > pRaw->dataLen) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
return -1;
|
||||
code = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (pVal != NULL) {
|
||||
|
@ -145,14 +151,15 @@ int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_
|
|||
}
|
||||
|
||||
int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (dataLen > pRaw->dataLen) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
return -1;
|
||||
code = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
pRaw->dataLen = dataLen;
|
||||
|
@ -160,14 +167,15 @@ int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen) {
|
|||
}
|
||||
|
||||
int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (status == SDB_STATUS_INIT) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
pRaw->status = status;
|
||||
|
@ -175,14 +183,15 @@ int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status) {
|
|||
}
|
||||
|
||||
int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (dataPos + sizeof(int8_t) > pRaw->dataLen) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
return -1;
|
||||
code = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*val = *(int8_t *)(pRaw->pData + dataPos);
|
||||
|
@ -190,14 +199,15 @@ int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val) {
|
|||
}
|
||||
|
||||
int32_t sdbGetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t *val) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (dataPos + sizeof(uint8_t) > pRaw->dataLen) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
return -1;
|
||||
code = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*val = *(uint8_t *)(pRaw->pData + dataPos);
|
||||
|
@ -205,14 +215,15 @@ int32_t sdbGetRawUInt8(SSdbRaw *pRaw, int32_t dataPos, uint8_t *val) {
|
|||
}
|
||||
|
||||
int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (dataPos + sizeof(int32_t) > pRaw->dataLen) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
return -1;
|
||||
code = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*val = *(int32_t *)(pRaw->pData + dataPos);
|
||||
|
@ -220,14 +231,15 @@ int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val) {
|
|||
}
|
||||
|
||||
int32_t sdbGetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t *val) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (dataPos + sizeof(int16_t) > pRaw->dataLen) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
return -1;
|
||||
code = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*val = *(int16_t *)(pRaw->pData + dataPos);
|
||||
|
@ -235,14 +247,15 @@ int32_t sdbGetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t *val) {
|
|||
}
|
||||
|
||||
int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (dataPos + sizeof(int64_t) > pRaw->dataLen) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
return -1;
|
||||
code = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*val = *(int64_t *)(pRaw->pData + dataPos);
|
||||
|
@ -250,14 +263,15 @@ int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val) {
|
|||
}
|
||||
|
||||
int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valLen) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (dataPos + valLen > pRaw->dataLen) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
return -1;
|
||||
code = TSDB_CODE_SDB_INVALID_DATA_LEN;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (pVal != NULL) {
|
||||
memcpy(pVal, pRaw->pData + dataPos, valLen);
|
||||
|
@ -266,9 +280,10 @@ int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valL
|
|||
}
|
||||
|
||||
int32_t sdbGetRawSoftVer(SSdbRaw *pRaw, int8_t *sver) {
|
||||
int32_t code = 0;
|
||||
if (pRaw == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_PTR;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
*sver = pRaw->sver;
|
||||
|
|
|
@ -1307,186 +1307,6 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
|
||||
static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols,
|
||||
int nCols, int16_t *slotIds);
|
||||
#ifdef BUILD_NO_CALL
|
||||
static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) {
|
||||
SLastCol *pLastCol = NULL;
|
||||
|
||||
char *err = NULL;
|
||||
size_t vlen = 0;
|
||||
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
|
||||
size_t klen = ROCKS_KEY_LEN;
|
||||
char *value = NULL;
|
||||
value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err);
|
||||
if (NULL != err) {
|
||||
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
|
||||
rocksdb_free(err);
|
||||
}
|
||||
|
||||
pLastCol = tsdbCacheDeserialize(value);
|
||||
|
||||
return pLastCol;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheGetSlow(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int8_t ltype) {
|
||||
rocksdb_writebatch_t *wb = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
SArray *pCidList = pr->pCidList;
|
||||
int num_keys = TARRAY_SIZE(pCidList);
|
||||
|
||||
char **keys_list = taosMemoryMalloc(num_keys * sizeof(char *));
|
||||
size_t *keys_list_sizes = taosMemoryMalloc(num_keys * sizeof(size_t));
|
||||
char *key_list = taosMemoryMalloc(num_keys * ROCKS_KEY_LEN);
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
|
||||
|
||||
memcpy(key_list + i * ROCKS_KEY_LEN, &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid}, ROCKS_KEY_LEN);
|
||||
keys_list[i] = key_list + i * ROCKS_KEY_LEN;
|
||||
keys_list_sizes[i] = ROCKS_KEY_LEN;
|
||||
}
|
||||
|
||||
char **values_list = taosMemoryCalloc(num_keys, sizeof(char *));
|
||||
size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
|
||||
char **errs = taosMemoryMalloc(num_keys * sizeof(char *));
|
||||
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list,
|
||||
keys_list_sizes, values_list, values_list_sizes, errs);
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
if (errs[i]) {
|
||||
rocksdb_free(errs[i]);
|
||||
}
|
||||
}
|
||||
taosMemoryFree(key_list);
|
||||
taosMemoryFree(keys_list);
|
||||
taosMemoryFree(keys_list_sizes);
|
||||
taosMemoryFree(errs);
|
||||
|
||||
for (int i = 0; i < num_keys; ++i) {
|
||||
bool freeCol = true;
|
||||
SArray *pTmpColArray = NULL;
|
||||
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
|
||||
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
|
||||
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)};
|
||||
if (pLastCol) {
|
||||
reallocVarData(&pLastCol->colVal);
|
||||
} else {
|
||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||
|
||||
pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype);
|
||||
if (!pLastCol) {
|
||||
// recalc: load from tsdb
|
||||
int16_t aCols[1] = {cid};
|
||||
int16_t slotIds[1] = {pr->pSlotIds[i]};
|
||||
pTmpColArray = NULL;
|
||||
|
||||
if (ltype) {
|
||||
mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
||||
} else {
|
||||
mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
||||
}
|
||||
|
||||
if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) {
|
||||
pLastCol = taosArrayGet(pTmpColArray, 0);
|
||||
freeCol = false;
|
||||
}
|
||||
|
||||
// still null, then make up a none col value
|
||||
if (!pLastCol) {
|
||||
pLastCol = &noneCol;
|
||||
freeCol = false;
|
||||
}
|
||||
|
||||
// store result back to rocks cache
|
||||
wb = pTsdb->rCache.rwritebatch;
|
||||
char *value = NULL;
|
||||
size_t vlen = 0;
|
||||
tsdbCacheSerialize(pLastCol, &value, &vlen);
|
||||
|
||||
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid};
|
||||
size_t klen = ROCKS_KEY_LEN;
|
||||
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
|
||||
|
||||
taosMemoryFree(value);
|
||||
} else {
|
||||
reallocVarData(&pLastCol->colVal);
|
||||
}
|
||||
|
||||
if (wb) {
|
||||
rocksMayWrite(pTsdb, false, true, false);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
||||
}
|
||||
|
||||
taosArrayPush(pLastArray, pLastCol);
|
||||
taosArrayDestroy(pTmpColArray);
|
||||
if (freeCol) {
|
||||
taosMemoryFree(pLastCol);
|
||||
}
|
||||
}
|
||||
taosMemoryFree(values_list);
|
||||
taosMemoryFree(values_list_sizes);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static SLastCol *tsdbCacheLoadCol(STsdb *pTsdb, SCacheRowsReader *pr, int16_t slotid, tb_uid_t uid, int16_t cid,
|
||||
int8_t ltype) {
|
||||
SLastCol *pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype);
|
||||
if (!pLastCol) {
|
||||
rocksdb_writebatch_t *wb = NULL;
|
||||
|
||||
taosThreadMutexLock(&pTsdb->rCache.rMutex);
|
||||
pLastCol = tsdbCacheLookup(pTsdb, uid, cid, ltype);
|
||||
if (!pLastCol) {
|
||||
// recalc: load from tsdb
|
||||
int16_t aCols[1] = {cid};
|
||||
int16_t slotIds[1] = {slotid};
|
||||
SArray *pTmpColArray = NULL;
|
||||
|
||||
if (ltype) {
|
||||
mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
||||
} else {
|
||||
mergeLastRowCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds);
|
||||
}
|
||||
|
||||
if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) {
|
||||
pLastCol = taosArrayGet(pTmpColArray, 0);
|
||||
}
|
||||
|
||||
// still null, then make up a none col value
|
||||
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[slotid].type)};
|
||||
if (!pLastCol) {
|
||||
pLastCol = &noneCol;
|
||||
}
|
||||
|
||||
// store result back to rocks cache
|
||||
wb = pTsdb->rCache.rwritebatch;
|
||||
char *value = NULL;
|
||||
size_t vlen = 0;
|
||||
tsdbCacheSerialize(pLastCol, &value, &vlen);
|
||||
|
||||
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = pLastCol->colVal.cid};
|
||||
size_t klen = ROCKS_KEY_LEN;
|
||||
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
|
||||
taosMemoryFree(value);
|
||||
|
||||
SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
|
||||
*pTmpLastCol = *pLastCol;
|
||||
pLastCol = pTmpLastCol;
|
||||
|
||||
taosArrayDestroy(pTmpColArray);
|
||||
}
|
||||
|
||||
if (wb) {
|
||||
rocksMayWrite(pTsdb, false, true, false);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pTsdb->rCache.rMutex);
|
||||
}
|
||||
|
||||
return pLastCol;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t tsdbCacheLoadFromRaw(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SArray *remainCols,
|
||||
SCacheRowsReader *pr, int8_t ltype) {
|
||||
|
@ -3453,8 +3273,9 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
taosArrayPush(aColArray, &aCols[i]);
|
||||
}
|
||||
|
||||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
STsdbRowKey lastRowKey = {.key.ts = TSKEY_MAX};
|
||||
|
||||
// inverse iterator
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
|
||||
|
||||
|
@ -3478,10 +3299,11 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
}
|
||||
// int16_t nCol = pTSchema->numOfCols;
|
||||
|
||||
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||
STsdbRowKey rowKey = {0};
|
||||
tsdbRowGetKey(pRow, &rowKey);
|
||||
|
||||
if (lastRowTs == TSKEY_MAX) {
|
||||
lastRowTs = rowTs;
|
||||
if (lastRowKey.key.ts == TSKEY_MAX) { // first time
|
||||
lastRowKey = rowKey;
|
||||
|
||||
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||
if (iCol >= nLastCol) {
|
||||
|
@ -3501,13 +3323,13 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
if (slotIds[iCol] == 0) {
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
|
||||
taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal});
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
|
||||
taosArraySet(pColArray, 0, &(SLastCol){.rowKey = rowKey.key, .colVal = *pColVal});
|
||||
continue;
|
||||
}
|
||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||
|
||||
*pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal};
|
||||
*pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
|
||||
if (pColVal->value.nData > 0) {
|
||||
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
|
||||
|
@ -3554,10 +3376,11 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
continue;
|
||||
}
|
||||
SColVal *tColVal = &lastColVal->colVal;
|
||||
if (COL_VAL_IS_VALUE(tColVal)) continue;
|
||||
|
||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||
if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) {
|
||||
SLastCol lastCol = {.rowKey.ts = rowTs, .colVal = *pColVal};
|
||||
if (COL_VAL_IS_VALUE(pColVal)) {
|
||||
SLastCol lastCol = {.rowKey = rowKey.key, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->value.type) /* && pColVal->value.nData > 0 */) {
|
||||
SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol);
|
||||
taosMemoryFree(pLastCol->colVal.value.pData);
|
||||
|
@ -3580,7 +3403,7 @@ static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SC
|
|||
if (aColIndex >= 0) {
|
||||
taosArrayRemove(aColArray, aColIndex);
|
||||
}
|
||||
} else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
|
||||
} else if (!COL_VAL_IS_VALUE(pColVal) && !setNoneCol) {
|
||||
noneCol = iCol;
|
||||
setNoneCol = true;
|
||||
}
|
||||
|
@ -3637,8 +3460,7 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
|||
taosArrayPush(aColArray, &aCols[i]);
|
||||
}
|
||||
|
||||
TSKEY lastRowTs = TSKEY_MAX;
|
||||
|
||||
// inverse iterator
|
||||
CacheNextRowIter iter = {0};
|
||||
nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->info.suid, pr->pLDataIterArray, pr->pReadSnap, pr->lastTs, pr);
|
||||
|
||||
|
@ -3662,9 +3484,8 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
|||
}
|
||||
// int16_t nCol = pTSchema->numOfCols;
|
||||
|
||||
TSKEY rowTs = TSDBROW_TS(pRow);
|
||||
|
||||
lastRowTs = rowTs;
|
||||
STsdbRowKey rowKey = {0};
|
||||
tsdbRowGetKey(pRow, &rowKey);
|
||||
|
||||
for (int16_t iCol = noneCol; iCol < nCols; ++iCol) {
|
||||
if (iCol >= nLastCol) {
|
||||
|
@ -3680,13 +3501,13 @@ static int32_t mergeLastRowCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray,
|
|||
if (slotIds[iCol] == 0) {
|
||||
STColumn *pTColumn = &pTSchema->columns[0];
|
||||
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowTs}));
|
||||
taosArraySet(pColArray, 0, &(SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal});
|
||||
*pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = rowKey.key.ts}));
|
||||
taosArraySet(pColArray, 0, &(SLastCol){.rowKey = rowKey.key, .colVal = *pColVal});
|
||||
continue;
|
||||
}
|
||||
tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal);
|
||||
|
||||
*pCol = (SLastCol){.rowKey.ts = rowTs, .colVal = *pColVal};
|
||||
*pCol = (SLastCol){.rowKey = rowKey.key, .colVal = *pColVal};
|
||||
if (IS_VAR_DATA_TYPE(pColVal->value.type) /*&& pColVal->value.nData > 0*/) {
|
||||
if (pColVal->value.nData > 0) {
|
||||
pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData);
|
||||
|
|
|
@ -38,15 +38,16 @@ class TDSimClient:
|
|||
"asyncLog": "0",
|
||||
"rpcDebugFlag": "135",
|
||||
"tmrDebugFlag": "131",
|
||||
"cDebugFlag": "135",
|
||||
"uDebugFlag": "135",
|
||||
"jniDebugFlag": "135",
|
||||
"dDebugFlag":"131",
|
||||
"cDebugFlag": "131",
|
||||
"uDebugFlag": "131",
|
||||
"jniDebugFlag": "131",
|
||||
"qDebugFlag": "135",
|
||||
"supportVnodes": "1024",
|
||||
"enableQueryHb": "1",
|
||||
"telemetryReporting": "0",
|
||||
"tqDebugflag": "135",
|
||||
"wDebugflag":"135",
|
||||
"stDebugflag":"135",
|
||||
}
|
||||
|
||||
def getLogDir(self):
|
||||
|
@ -134,9 +135,9 @@ class TDDnode:
|
|||
"dDebugFlag": "131",
|
||||
"vDebugFlag": "131",
|
||||
"tqDebugFlag": "135",
|
||||
"cDebugFlag": "135",
|
||||
"cDebugFlag": "131",
|
||||
"stDebugFlag": "135",
|
||||
"smaDebugFlag": "135",
|
||||
"smaDebugFlag": "131",
|
||||
"jniDebugFlag": "131",
|
||||
"qDebugFlag": "131",
|
||||
"rpcDebugFlag": "135",
|
||||
|
|
|
@ -307,8 +307,8 @@ class TDSql:
|
|||
return col_name_list, col_type_list
|
||||
return col_name_list
|
||||
|
||||
def waitedQuery(self, sql, expectRows, timeout):
|
||||
tdLog.info("sql: %s, try to retrieve %d rows in %d seconds" % (sql, expectRows, timeout))
|
||||
def waitedQuery(self, sql, expectedRows, timeout):
|
||||
tdLog.info("sql: %s, try to retrieve %d rows in %d seconds" % (sql, expectedRows, timeout))
|
||||
self.sql = sql
|
||||
try:
|
||||
for i in range(timeout):
|
||||
|
@ -316,8 +316,8 @@ class TDSql:
|
|||
self.queryResult = self.cursor.fetchall()
|
||||
self.queryRows = len(self.queryResult)
|
||||
self.queryCols = len(self.cursor.description)
|
||||
tdLog.info("sql: %s, try to retrieve %d rows,get %d rows" % (sql, expectRows, self.queryRows))
|
||||
if self.queryRows >= expectRows:
|
||||
tdLog.info("sql: %s, try to retrieve %d rows,get %d rows" % (sql, expectedRows, self.queryRows))
|
||||
if self.queryRows >= expectedRows:
|
||||
return (self.queryRows, i)
|
||||
time.sleep(1)
|
||||
except Exception as e:
|
||||
|
@ -330,15 +330,26 @@ class TDSql:
|
|||
def getRows(self):
|
||||
return self.queryRows
|
||||
|
||||
def checkRows(self, expectRows):
|
||||
if self.queryRows == expectRows:
|
||||
tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectRows))
|
||||
def checkRows(self, expectedRows):
|
||||
if self.queryRows == expectedRows:
|
||||
tdLog.info("sql:%s, queryRows:%d == expect:%d" % (self.sql, self.queryRows, expectedRows))
|
||||
return True
|
||||
else:
|
||||
caller = inspect.getframeinfo(inspect.stack()[1][0])
|
||||
args = (caller.filename, caller.lineno, self.sql, self.queryRows, expectRows)
|
||||
args = (caller.filename, caller.lineno, self.sql, self.queryRows, expectedRows)
|
||||
tdLog.exit("%s(%d) failed: sql:%s, queryRows:%d != expect:%d" % args)
|
||||
|
||||
def checkRows_not_exited(self, expectedRows):
|
||||
"""
|
||||
Check if the query rows is equal to the expected rows
|
||||
:param expectedRows: The expected number of rows.
|
||||
:return: Returns True if the actual number of rows matches the expected number, otherwise returns False.
|
||||
"""
|
||||
if self.queryRows == expectedRows:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def checkRows_range(self, excepte_row_list):
|
||||
if self.queryRows in excepte_row_list:
|
||||
tdLog.info(f"sql:{self.sql}, queryRows:{self.queryRows} in expect:{excepte_row_list}")
|
||||
|
@ -508,7 +519,7 @@ class TDSql:
|
|||
|
||||
|
||||
# return true or false replace exit, no print out
|
||||
def checkDataNoExit(self, row, col, data):
|
||||
def checkDataNotExit(self, row, col, data):
|
||||
if self.checkRowColNoExit(row, col) == False:
|
||||
return False
|
||||
if self.queryResult[row][col] != data:
|
||||
|
@ -542,7 +553,7 @@ class TDSql:
|
|||
# loop check util checkData return true
|
||||
for i in range(loopCount):
|
||||
self.query(sql)
|
||||
if self.checkDataNoExit(row, col, data) :
|
||||
if self.checkDataNotExit(row, col, data) :
|
||||
self.checkData(row, col, data)
|
||||
return
|
||||
time.sleep(waitTime)
|
||||
|
@ -551,6 +562,19 @@ class TDSql:
|
|||
self.query(sql)
|
||||
self.checkData(row, col, data)
|
||||
|
||||
def check_rows_loop(self, expectedRows, sql, loopCount, waitTime):
|
||||
# loop check util checkData return true
|
||||
for i in range(loopCount):
|
||||
self.query(sql)
|
||||
if self.checkRows_not_exited(expectedRows):
|
||||
return
|
||||
else:
|
||||
time.sleep(waitTime)
|
||||
continue
|
||||
# last check
|
||||
self.query(sql)
|
||||
self.checkRows(expectedRows)
|
||||
|
||||
|
||||
def getData(self, row, col):
|
||||
self.checkRowCol(row, col)
|
||||
|
|
|
@ -177,21 +177,15 @@ class TDTestCase:
|
|||
if stopRole == "mnode":
|
||||
for i in range(mnodeNums):
|
||||
tdDnodes[i].stoptaosd()
|
||||
# sleep(10)
|
||||
tdDnodes[i].starttaosd()
|
||||
# sleep(10)
|
||||
elif stopRole == "vnode":
|
||||
for i in range(vnodeNumbers):
|
||||
tdDnodes[i+mnodeNums].stoptaosd()
|
||||
# sleep(10)
|
||||
tdDnodes[i+mnodeNums].starttaosd()
|
||||
# sleep(10)
|
||||
elif stopRole == "dnode":
|
||||
for i in range(dnodeNumbers):
|
||||
tdDnodes[i].stoptaosd()
|
||||
# sleep(10)
|
||||
tdDnodes[i].starttaosd()
|
||||
# sleep(10)
|
||||
|
||||
# dnodeNumbers don't include database of schema
|
||||
if clusterComCheck.checkDnodes(dnodeNumbers):
|
||||
|
@ -219,7 +213,7 @@ class TDTestCase:
|
|||
tdSql.checkRows(rowsPerStb)
|
||||
def run(self):
|
||||
# print(self.master_dnode.cfgDict)
|
||||
self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=2,stopRole='dnode')
|
||||
self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='dnode')
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
|
@ -227,7 +227,7 @@ class TDTestCase:
|
|||
|
||||
def run(self):
|
||||
# print(self.master_dnode.cfgDict)
|
||||
self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=2,stopRole='dnode')
|
||||
self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=1,stopRole='dnode')
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
|
@ -168,7 +168,7 @@ class TDTestCase:
|
|||
|
||||
def run(self):
|
||||
# print(self.master_dnode.cfgDict)
|
||||
self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=2,stopRole='dnode')
|
||||
self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=1,stopRole='dnode')
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
|
@ -170,7 +170,7 @@ class TDTestCase:
|
|||
|
||||
def run(self):
|
||||
# print(self.master_dnode.cfgDict)
|
||||
self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=4,stopRole='mnode')
|
||||
self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=2,stopRole='mnode')
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
|
@ -93,7 +93,7 @@ class TDTestCase:
|
|||
# seperate vnode and mnode in different dnodes.
|
||||
# create database and stable
|
||||
stopcount =0
|
||||
while stopcount <= 2:
|
||||
while stopcount <= 1:
|
||||
tdLog.info(" restart loop: %d"%stopcount )
|
||||
for i in range(dnodenumbers):
|
||||
tdDnodes[i].stoptaosd()
|
||||
|
|
|
@ -37,7 +37,7 @@ class ClusterComCheck:
|
|||
tdSql.init(conn.cursor())
|
||||
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
def checkDnodes(self,dnodeNumbers, timeout=30):
|
||||
def checkDnodes(self,dnodeNumbers, timeout=100):
|
||||
count=0
|
||||
# print(tdSql)
|
||||
while count < timeout:
|
||||
|
|
|
@ -37,26 +37,26 @@ class TDTestCase:
|
|||
def case1(self):
|
||||
tdLog.debug("========case1 start========")
|
||||
|
||||
os.system("nohup taosBenchmark -y -B 1 -t 40 -S 1000 -n 10 -i 1000 -v 5 > /dev/null 2>&1 &")
|
||||
os.system(" taosBenchmark -y -B 1 -t 10 -S 1000 -n 10 -i 1000 -v 5 ")
|
||||
time.sleep(10)
|
||||
tdSql.execute("use test", queryTimes=100)
|
||||
tdSql.query("create stream if not exists s1 trigger at_once ignore expired 0 ignore update 0 fill_history 1 into st1 as select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s)")
|
||||
tdLog.debug("========create stream and insert data ok========")
|
||||
time.sleep(20)
|
||||
tdLog.debug("========create stream and insert data ok========")
|
||||
|
||||
tdSql.query("select _wstart,sum(voltage),groupid from meters partition by groupid interval(2s) order by groupid,_wstart")
|
||||
rowCnt = tdSql.getRows()
|
||||
results = []
|
||||
for i in range(rowCnt):
|
||||
results.append(tdSql.getData(i,1))
|
||||
results_meters = tdSql.queryResult
|
||||
|
||||
tdSql.query("select * from st1 order by groupid,_wstart")
|
||||
tdSql.checkRows(rowCnt)
|
||||
sql = "select _wstart,`sum(voltage)`,groupid from st1 order by groupid,_wstart"
|
||||
tdSql.check_rows_loop(rowCnt, sql, loopCount=100, waitTime=0.5)
|
||||
|
||||
tdSql.query(sql)
|
||||
results_st1 = tdSql.queryResult
|
||||
for i in range(rowCnt):
|
||||
data1 = tdSql.getData(i,1)
|
||||
data2 = results[i]
|
||||
data1 = results_st1[i]
|
||||
data2 = results_meters[i]
|
||||
if data1 != data2:
|
||||
tdLog.info("num: %d, act data: %d, expect data: %d"%(i, data1, data2))
|
||||
tdLog.info(f"num: {i}, act data: {data1}, expect data: {data2}")
|
||||
tdLog.exit("check data error!")
|
||||
|
||||
tdLog.debug("case1 end")
|
||||
|
@ -64,7 +64,7 @@ class TDTestCase:
|
|||
def case2(self):
|
||||
tdLog.debug("========case2 start========")
|
||||
|
||||
os.system("taosBenchmark -d db -t 20 -v 6 -n 1000 -y > /dev/null 2>&1")
|
||||
os.system("taosBenchmark -d db -t 20 -v 6 -n 1000 -y")
|
||||
# create stream
|
||||
tdSql.execute("use db", queryTimes=100)
|
||||
tdSql.execute("create stream stream1 fill_history 1 into sta as select count(*) as cnt from meters interval(10a);",show=True)
|
||||
|
@ -73,7 +73,7 @@ class TDTestCase:
|
|||
sql = "select count(*) from sta"
|
||||
# loop wait max 60s to check count is ok
|
||||
tdLog.info("loop wait result ...")
|
||||
tdSql.checkDataLoop(0, 0, 100, sql, loopCount=10, waitTime=0.5)
|
||||
tdSql.checkDataLoop(0, 0, 100, sql, loopCount=100, waitTime=0.5)
|
||||
|
||||
# check all data is correct
|
||||
sql = "select * from sta where cnt != 200;"
|
||||
|
|
Loading…
Reference in New Issue