refactor: add alter-confirm while alter db

This commit is contained in:
Shengliang Guan 2022-06-05 16:34:06 +08:00
parent 980e289cdf
commit 36fd0dac68
5 changed files with 52 additions and 14 deletions

View File

@ -40,9 +40,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error #define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
//common & util //common & util
#define TSDB_CODE_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001) #define TSDB_CODE_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0003)
#define TSDB_CODE_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0002) #define TSDB_CODE_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0004)
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0003) #define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0005)
#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0010) #define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0010)
#define TSDB_CODE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0011) #define TSDB_CODE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0011)
#define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x0012) #define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x0012)

View File

@ -118,25 +118,36 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
pMsg = *(SRpcMsg **)taosArrayGet(pArray, m); pMsg = *(SRpcMsg **)taosArrayGet(pArray, m);
code = vnodePreprocessReq(pVnode->pImpl, pMsg); code = vnodePreprocessReq(pVnode->pImpl, pMsg);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) continue; if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
if (code != 0) { dTrace("vgId:%d, msg:%p in progress and no rsp", pVnode->vgId, pMsg);
dError("vgId:%d, msg:%p failed to write since %s", pVnode->vgId, pMsg, tstrerror(code));
vmSendRsp(pMsg, code);
continue; continue;
} }
if (pMsg->msgType != TDMT_VND_ALTER_REPLICA) {
code = syncPropose(sync, pMsg, false); code = syncPropose(sync, pMsg, false);
}
if (code == TAOS_SYNC_PROPOSE_SUCCESS) { if (code == TAOS_SYNC_PROPOSE_SUCCESS) {
dTrace("vgId:%d, msg:%p is proposed and no rsp", pVnode->vgId, pMsg);
continue; continue;
} else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) { } else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
dTrace("vgId:%d, msg:%p is redirect since not leader", pVnode->vgId, pMsg);
SEpSet newEpSet = {0}; SEpSet newEpSet = {0};
syncGetEpSet(sync, &newEpSet); syncGetEpSet(sync, &newEpSet);
SEp *pEp = &newEpSet.eps[newEpSet.inUse];
if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) {
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps; newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
}
dTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->vgId, pMsg,
newEpSet.numOfEps, newEpSet.inUse);
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
dTrace("vgId:%d, msg:%p ep:%s:%u", pVnode->vgId, pMsg, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
}
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
tmsgSendRedirectRsp(&rsp, &newEpSet); tmsgSendRedirectRsp(&rsp, &newEpSet);
} else { } else {
dError("vgId:%d, msg:%p failed to write since %s", pVnode->vgId, pMsg, tstrerror(code)); dError("vgId:%d, msg:%p failed to propose write since %s, code:0x%x", pVnode->vgId, pMsg, tstrerror(code), code);
vmSendRsp(pMsg, code); vmSendRsp(pMsg, code);
} }
} }
@ -251,7 +262,6 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
switch (qtype) { switch (qtype) {
case QUERY_QUEUE: case QUERY_QUEUE:
vnodePreprocessQueryMsg(pVnode->pImpl, pMsg); vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
dTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg); dTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pQueryQ, pMsg); taosWriteQitem(pVnode->pQueryQ, pMsg);
break; break;

View File

@ -870,9 +870,16 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
pAction->rawWritten = 0; pAction->rawWritten = 0;
pAction->msgSent = 0; pAction->msgSent = 0;
pAction->msgReceived = 0; pAction->msgReceived = 0;
pAction->errCode = 0; if (pAction->errCode == TSDB_CODE_RPC_REDIRECT) {
pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
action, pAction->epSet.inUse);
} else {
mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action); mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action);
} }
pAction->errCode = 0;
}
} }
static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {

View File

@ -69,7 +69,7 @@ int32_t vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = syncReconfig(pVnode->sync, &cfg); int32_t code = syncReconfig(pVnode->sync, &cfg);
if (code == TAOS_SYNC_PROPOSE_SUCCESS) { if (code == TAOS_SYNC_PROPOSE_SUCCESS) {
// todo refactor // todo refactor
SRpcMsg rsp = {.info = pMsg->info, .code = terrno}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }

View File

@ -78,6 +78,15 @@ if $data(4)[2] != 1 then
endi endi
# v1_dnode # v1_dnode
$hasleader = 0
$x = 0
step2:
$x = $x + 1
sleep 1000
if $x == 20 then
print ====> dnode not ready!
return -1
endi
sql show db.vgroups sql show db.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08
if $data(2)[3] != 4 then if $data(2)[3] != 4 then
@ -89,6 +98,18 @@ endi
if $data(2)[7] != 2 then if $data(2)[7] != 2 then
return -1 return -1
endi endi
if $data(2)[4] == leader then
$hasleader = 1
endi
if $data(2)[6] == leader then
$hasleader = 1
endi
if $data(2)[8] == leader then
$hasleader = 1
endi
if $hasleader != 1 then
goto step2
endi
sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 binary(16)) comment "abd" sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 binary(16)) comment "abd"
sql create table db.ctb using db.stb tags(101, "102") sql create table db.ctb using db.stb tags(101, "102")