Merge branch '3.0' into test/jcy

This commit is contained in:
jiacy-jcy 2022-11-03 13:53:41 +08:00
commit 1a57b1677d
19 changed files with 214 additions and 25 deletions

View File

@ -1127,6 +1127,7 @@ typedef struct {
SQnodeLoad qload;
SClusterCfg clusterCfg;
SArray* pVloads; // array of SVnodeLoad
int32_t statusSeq;
} SStatusReq;
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
@ -1148,6 +1149,7 @@ typedef struct {
int64_t dnodeVer;
SDnodeCfg dnodeCfg;
SArray* pDnodeEps; // Array of SDnodeEp
int32_t statusSeq;
} SStatusRsp;
int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp);

View File

@ -211,6 +211,7 @@ void syncCleanUp();
int64_t syncOpen(SSyncInfo* pSyncInfo);
void syncStart(int64_t rid);
void syncStop(int64_t rid);
void syncPreStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);

View File

@ -1020,6 +1020,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tEncodeI64(&encoder, pReq->qload.timeInQueryQueue) < 0) return -1;
if (tEncodeI64(&encoder, pReq->qload.timeInFetchQueue) < 0) return -1;
if (tEncodeI32(&encoder, pReq->statusSeq) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1095,6 +1096,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
if (tDecodeI64(&decoder, &pReq->qload.timeInQueryQueue) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->qload.timeInFetchQueue) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->statusSeq) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
@ -1126,6 +1128,7 @@ int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
if (tEncodeU16(&encoder, pDnodeEp->ep.port) < 0) return -1;
}
if (tEncodeI32(&encoder, pRsp->statusSeq) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1167,6 +1170,7 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) {
}
}
if (tDecodeI32(&decoder, &pRsp->statusSeq) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;

View File

@ -36,6 +36,7 @@ typedef struct SDnodeMgmt {
GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp;
GetQnodeLoadsFp getQnodeLoadsFp;
int32_t statusSeq;
} SDnodeMgmt;
// dmHandle.c

View File

@ -32,9 +32,13 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
}
static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
const STraceId *trace = &pRsp->info.traceId;
dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code);
if (pRsp->code != 0) {
if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) {
dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId);
dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId,
pMgmt->statusSeq);
pMgmt->pData->dropped = 1;
dmWriteEps(pMgmt->pData);
}
@ -42,9 +46,9 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
SStatusRsp statusRsp = {0};
if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
dTrace("status msg received from mnode, dnodeVer:%" PRId64 " saved:%" PRId64, statusRsp.dnodeVer,
pMgmt->pData->dnodeVer);
if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq,
statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer);
pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
@ -91,6 +95,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
(*pMgmt->getQnodeLoadsFp)(&req.qload);
pMgmt->statusSeq++;
req.statusSeq = pMgmt->statusSeq;
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
tSerializeSStatusReq(pHead, contLen, &req);
@ -99,13 +106,13 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527};
SRpcMsg rpcRsp = {0};
dTrace("send status msg to mnode, dnodeVer:%" PRId64, req.dnodeVer);
dTrace("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);
SEpSet epSet = {0};
dmGetMnodeEpSet(pMgmt->pData, &epSet);
rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
dError("failed to send status msg since %s, numOfEps:%d inUse:%d", tstrerror(rpcRsp.code), epSet.numOfEps,
dError("failed to send status req since %s, numOfEps:%d inUse:%d", tstrerror(rpcRsp.code), epSet.numOfEps,
epSet.inUse);
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
dDebug("index:%d, mnode ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port);

View File

@ -345,6 +345,19 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
}
}
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pDnode, curMs);
bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool needCheck = !online || dnodeChanged || reboot;
pDnode->accessTimes++;
pDnode->lastAccessTime = curMs;
const STraceId *trace = &pReq->info.traceId;
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);
for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) {
SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);
@ -396,18 +409,6 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
mndReleaseQnode(pMnode, pQnode);
}
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pDnode, curMs);
bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool needCheck = !online || dnodeChanged || reboot;
pDnode->accessTimes++;
pDnode->lastAccessTime = curMs;
mTrace("dnode:%d, status received, access times:%d check:%d online:%d reboot:%d changed:%d", pDnode->id,
pDnode->accessTimes, needCheck, online, reboot, dnodeChanged);
if (needCheck) {
if (statusReq.sver != tsVersion) {
if (pDnode != NULL) {
@ -455,6 +456,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pDnode->memTotal = statusReq.memTotal;
SStatusRsp statusRsp = {0};
statusRsp.statusSeq++;
statusRsp.dnodeVer = dnodeVer;
statusRsp.dnodeCfg.dnodeId = pDnode->id;
statusRsp.dnodeCfg.clusterId = pMnode->clusterId;

View File

@ -429,6 +429,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
void mndPreClose(SMnode *pMnode) {
if (pMnode != NULL) {
syncLeaderTransfer(pMnode->syncMgmt.sync);
syncPreStop(pMnode->syncMgmt.sync);
}
}

View File

@ -338,6 +338,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
tsdbDataFReaderClose(&pr->pDataFReader);
tsdbUntakeReadSnap(pr->pVnode->pTsdb, pr->pReadSnap, "cache-l");
pr->pDataFReaderLast = NULL;
pr->pDataFReader = NULL;
for (int32_t j = 0; j < pr->numOfCols; ++j) {
taosMemoryFree(pRes[j]);

View File

@ -245,6 +245,7 @@ _err:
void vnodePreClose(SVnode *pVnode) {
if (pVnode) {
syncLeaderTransfer(pVnode->sync);
syncPreStop(pVnode->sync);
}
}

View File

@ -925,6 +925,15 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
SArray* pTbList = getTableNameList(pList);
int32_t numOfTables = taosArrayGetSize(pTbList);
SHashObj *uHash = NULL;
size_t listlen = taosArrayGetSize(list); // len > 0 means there already have uids
if (listlen > 0) {
uHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
for (int i = 0; i < listlen; i++) {
int64_t *uid = taosArrayGet(list, i);
taosHashPut(uHash, uid, sizeof(int64_t), &i, sizeof(i));
}
}
for (int i = 0; i < numOfTables; i++) {
char* name = taosArrayGetP(pTbList, i);
@ -933,9 +942,12 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
if (metaGetTableUidByName(metaHandle, name, &uid) == 0) {
ETableType tbType = TSDB_TABLE_MAX;
if (metaGetTableTypeByName(metaHandle, name, &tbType) == 0 && tbType == TSDB_CHILD_TABLE) {
if (NULL == uHash || taosHashGet(uHash, &uid, sizeof(uid)) == NULL) {
taosArrayPush(list, &uid);
}
} else {
taosArrayDestroy(pTbList);
taosHashCleanup(uHash);
return -1;
}
} else {
@ -944,6 +956,7 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray*
}
}
taosHashCleanup(uHash);
taosArrayDestroy(pTbList);
return 0;
}

View File

@ -218,6 +218,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo);
void syncNodeStart(SSyncNode* pSyncNode);
void syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePreClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
// option

View File

@ -729,6 +729,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
typedef enum {
SYNC_LOCAL_CMD_STEP_DOWN = 100,
SYNC_LOCAL_CMD_FOLLOWER_CMT,
} ESyncLocalCmd;
const char* syncLocalCmdGetStr(int32_t cmd);
@ -742,6 +743,7 @@ typedef struct SyncLocalCmd {
int32_t cmd;
SyncTerm sdNewTerm; // step down new term
SyncIndex fcIndex;// follower commit index
} SyncLocalCmd;

View File

@ -90,6 +90,11 @@
//
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
syncNodeEventLog(ths, "can not do follower commit");
return -1;
}
// maybe update commit index, leader notice me
if (newCommitIndex > ths->commitIndex) {
// has commit entry in local

View File

@ -81,6 +81,15 @@ void syncStop(int64_t rid) {
}
}
void syncPreStop(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) return;
syncNodePreClose(pSyncNode);
syncNodeRelease(pSyncNode);
}
static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) {
if (!syncNodeInConfig(pSyncNode, pCfg)) return false;
return abs(pCfg->replicaNum - pSyncNode->replicaNum) <= 1;
@ -435,8 +444,12 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
return -1;
}
int32_t ret = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SNodeInfo newLeader = (pSyncNode->peersNodeInfo)[0];
int32_t ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
ret = syncNodeLeaderTransferTo(pSyncNode, newLeader);
}
return ret;
}
@ -1222,6 +1235,14 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
ASSERT(ret == 0);
}
void syncNodePreClose(SSyncNode* pSyncNode) {
// stop elect timer
syncNodeStopElectTimer(pSyncNode);
// stop heartbeat timer
syncNodeStopHeartbeatTimer(pSyncNode);
}
void syncNodeClose(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) {
return;
@ -2825,11 +2846,25 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
syncNodeResetElectTimer(ths);
ths->minMatchIndex = pMsg->minMatchIndex;
#if 0
if (ths->state == TAOS_SYNC_STATE_FOLLOWER) {
syncNodeFollowerCommit(ths, pMsg->commitIndex);
// syncNodeFollowerCommit(ths, pMsg->commitIndex);
SyncLocalCmd* pSyncMsg = syncLocalCmdBuild(ths->vgId);
pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
pSyncMsg->fcIndex = pMsg->commitIndex;
SRpcMsg rpcMsgLocalCmd;
syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (code != 0) {
sError("vgId:%d, sync enqueue fc-commit msg error, code:%d", ths->vgId, code);
rpcFreeCont(rpcMsgLocalCmd.pCont);
} else {
sTrace("vgId:%d, sync enqueue fc-commit msg, fc-index: %" PRIu64, ths->vgId, pSyncMsg->fcIndex);
}
}
}
#endif
}
if (pMsg->term >= ths->pRaftStore->currentTerm && ths->state != TAOS_SYNC_STATE_FOLLOWER) {
@ -2883,6 +2918,9 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg) {
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->sdNewTerm);
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
syncNodeFollowerCommit(ths, pMsg->fcIndex);
} else {
syncNodeErrorLog(ths, "error local cmd");
}

View File

@ -3400,6 +3400,8 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) {
const char* syncLocalCmdGetStr(int32_t cmd) {
if (cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
return "step-down";
} else if (cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
return "follower-commit";
}
return "unknown-local-cmd";
@ -3511,6 +3513,9 @@ cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->sdNewTerm);
cJSON_AddStringToObject(pRoot, "sd-new-term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pMsg->fcIndex);
cJSON_AddStringToObject(pRoot, "fc-index", u64buf);
}
cJSON* pJson = cJSON_CreateObject();

View File

@ -21,6 +21,7 @@ SyncLocalCmd *createMsg() {
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->sdNewTerm = 123;
pMsg->fcIndex = 456;
pMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
return pMsg;

View File

@ -449,6 +449,7 @@
./test.sh -f tsim/tag/smallint.sim
./test.sh -f tsim/tag/tinyint.sim
./test.sh -f tsim/tag/drop_tag.sim
./test.sh -f tsim/tag/tbNameIn.sim
./test.sh -f tmp/monitor.sim
#======================b1-end===============

View File

@ -0,0 +1,102 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== step1
sql drop database if exists db1;
sql create database db1 vgroups 3;
sql use db1;
sql create stable st1 (ts timestamp, f1 int) tags(tg1 int);
sql create table tb1 using st1 tags(1);
sql create table tb2 using st1 tags(2);
sql create table tb3 using st1 tags(3);
sql create table tb4 using st1 tags(4);
sql create table tb5 using st1 tags(5);
sql create table tb6 using st1 tags(6);
sql create table tb7 using st1 tags(7);
sql create table tb8 using st1 tags(8);
sql insert into tb1 values ('2022-07-10 16:31:01', 1);
sql insert into tb2 values ('2022-07-10 16:31:02', 2);
sql insert into tb3 values ('2022-07-10 16:31:03', 3);
sql insert into tb4 values ('2022-07-10 16:31:04', 4);
sql insert into tb5 values ('2022-07-10 16:31:05', 5);
sql insert into tb6 values ('2022-07-10 16:31:06', 6);
sql insert into tb7 values ('2022-07-10 16:31:07', 7);
sql insert into tb8 values ('2022-07-10 16:31:08', 8);
sql select * from tb1 where tbname in ('tb1');
if $rows != 1 then
return -1
endi
if $data01 != 1 then
return -1
endi
sql select * from tb1 where tbname in ('tb1','tb1');
if $rows != 1 then
return -1
endi
if $data01 != 1 then
return -1
endi
sql select * from tb1 where tbname in ('tb1','tb2','tb1');
if $rows != 1 then
return -1
endi
if $data01 != 1 then
return -1
endi
sql select * from tb1 where tbname in ('tb1','tb2','st1');
if $rows != 1 then
return -1
endi
if $data01 != 1 then
return -1
endi
sql select * from tb1 where tbname = 'tb1';
if $rows != 1 then
return -1
endi
if $data01 != 1 then
return -1
endi
sql select * from tb1 where tbname > 'tb1';
if $rows != 0 then
return -1
endi
sql select * from st1 where tbname in ('tb1');
if $rows != 1 then
return -1
endi
if $data01 != 1 then
return -1
endi
sql select * from st1 where tbname in ('tb1','tb1');
if $rows != 1 then
return -1
endi
if $data01 != 1 then
return -1
endi
sql select * from st1 where tbname in ('tb1','tb2','tb1');
if $rows != 2 then
return -1
endi
sql select * from st1 where tbname in ('tb1','tb2','st1');
if $rows != 2 then
return -1
endi
sql select * from st1 where tbname = 'tb1';
if $rows != 1 then
return -1
endi
if $data01 != 1 then
return -1
endi
sql select * from st1 where tbname > 'tb1';
if $rows != 7 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -172,7 +172,7 @@ class TDTestCase:
def run(self):
# print(self.master_dnode.cfgDict)
self.fiveDnodeThreeMnode(dnodeNumbers=6,mnodeNums=3,restartNumbers=2,stopRole='dnode')
self.fiveDnodeThreeMnode(dnodeNumbers=5,mnodeNums=3,restartNumbers=2,stopRole='dnode')
def stop(self):
tdSql.close()