fix: let certain type of write messages to be executed sequentially

This commit is contained in:
Shengliang Guan 2022-07-18 18:37:58 +08:00
parent 4e048f3614
commit 993f5fe9ec
4 changed files with 39 additions and 32 deletions

View File

@ -179,6 +179,16 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
} else {
dGTrace("vgId:%d, msg:%p put into vnode-write queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pWriteQ, pMsg);
#if 0 // tests for batch writes
if (pMsg->msgType == TDMT_VND_CREATE_TABLE) {
SRpcMsg *pDup = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM);
memcpy(pDup, pMsg, sizeof(SRpcMsg));
pDup->pCont = rpcMallocCont(pMsg->contLen);
memcpy(pDup->pCont, pMsg->pCont, pMsg->contLen);
pDup->info.handle = NULL;
taosWriteQitem(pVnode->pWriteQ, pDup);
}
#endif
}
break;
case SYNC_QUEUE:

View File

@ -53,6 +53,7 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
*(int64_t *)(dc.data + dc.pos) = uid;
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
vTrace("vgId:%d, table:%s uid:%" PRId64 " is generated", pVnode->config.vgId, name, uid);
tEndDecode(&dc);
}
@ -381,7 +382,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p
goto end;
}
vInfo("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp);
vDebug("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp);
int32_t ret = metaTtlDropTable(pVnode->pMeta, ttlReq.timestamp, tbUids);
if (ret != 0) {
goto end;

View File

@ -17,35 +17,22 @@
#include "vnd.h"
static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_ALTER_REPLICA);
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) ||
(type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL);
}
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
static inline void vnodeAccumBlockMsg(SVnode *pVnode, tmsg_t type) {
if (!vnodeIsMsgBlock(type)) return;
int32_t count = atomic_add_fetch_32(&pVnode->blockCount, 1);
vTrace("vgId:%d, accum block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
}
static inline void vnodeWaitBlockMsg(SVnode *pVnode) {
int32_t count = atomic_load_32(&pVnode->blockCount);
if (count <= 0) return;
vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count);
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) {
vTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
tsem_wait(&pVnode->syncSem);
}
}
static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
if (!vnodeIsMsgBlock(type)) return;
int32_t count = atomic_load_32(&pVnode->blockCount);
if (count <= 0) return;
count = atomic_sub_fetch_32(&pVnode->blockCount, 1);
vTrace("vgId:%d, post block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
if (count <= 0) {
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
if (vnodeIsMsgBlock(pMsg->msgType)) {
vTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
tsem_post(&pVnode->syncSem);
}
}
@ -143,6 +130,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
int32_t code = 0;
SRpcMsg *pMsg = NULL;
vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);
for (int32_t m = 0; m < numOfMsgs; m++) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
const STraceId *trace = &pMsg->info.traceId;
@ -165,13 +154,14 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
} else if (code == 0) {
vnodeWaitBlockMsg(pVnode, pMsg);
} else {
}
}
}
if (code == 0) {
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
} else if (code < 0) {
if (code < 0) {
if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
vnodeRedirectRpcMsg(pVnode, pMsg);
} else {
@ -182,15 +172,12 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
tmsgSendRsp(&rsp);
}
}
} else {
}
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
vnodeWaitBlockMsg(pVnode);
}
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
@ -213,7 +200,7 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
}
}
vnodePostBlockMsg(pVnode, pMsg->msgType);
vnodePostBlockMsg(pVnode, pMsg);
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
@ -418,7 +405,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
tmsgSendRsp(&rpcMsg);
}
vnodePostBlockMsg(pVnode, TDMT_VND_ALTER_REPLICA);
vnodePostBlockMsg(pVnode, pMsg);
}
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {

View File

@ -111,6 +111,15 @@ if $hasleader != 1 then
goto step2
endi
# sql use db;
# sql create table stb (ts timestamp, c int) tags (t int);
# sql create table t0 using stb tags (0);
# sql insert into t0 values(now, 1);
# sql show db.stables;
# sql show db.tables;
# sql show db.vgroups;
return
sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 binary(16)) comment "abd"
sql create table db.ctb using db.stb tags(101, "102")
sql insert into db.ctb values(now, 1, "2")