enh: vnodeCommit on consensus only

This commit is contained in:
Benguang Zhao 2022-12-23 20:16:23 +08:00
parent a09870a981
commit 5159d60f56
17 changed files with 122 additions and 40 deletions

View File

@ -49,10 +49,13 @@ extern "C" {
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
#define SYNC_SNAP_RESEND_MS 1000 * 60 #define SYNC_SNAP_RESEND_MS 1000 * 60
#define SYNC_VND_COMMIT_MIN_MS 200
#define SYNC_VND_COMMIT_MAX_MS 60000
#define SYNC_MAX_BATCH_SIZE 1 #define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1 #define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID -1 // 0xFFFFFFFFFFFFFFFF #define SYNC_TERM_INVALID -1
typedef enum { typedef enum {
SYNC_STRATEGY_NO_SNAPSHOT = 0, SYNC_STRATEGY_NO_SNAPSHOT = 0,

View File

@ -79,6 +79,8 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
char path[TSDB_FILENAME_LEN] = {0}; char path[TSDB_FILENAME_LEN] = {0};
vnodeProposeCommitOnNeed(pVnode->pImpl);
taosThreadRwlockWrlock(&pMgmt->lock); taosThreadRwlockWrlock(&pMgmt->lock);
taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
taosThreadRwlockUnlock(&pMgmt->lock); taosThreadRwlockUnlock(&pMgmt->lock);

View File

@ -88,6 +88,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeProposeCommitOnNeed(SVnode *pVnode);
// meta // meta
typedef struct SMeta SMeta; // todo: remove typedef struct SMeta SMeta; // todo: remove

View File

@ -102,6 +102,7 @@ void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code);
bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsRoleLeader(SVnode* pVnode); bool vnodeIsRoleLeader(SVnode* pVnode);
int vnodeShouldCommit(SVnode* pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -348,6 +348,7 @@ struct SVnode {
STQ* pTq; STQ* pTq;
SSink* pSink; SSink* pSink;
tsem_t canCommit; tsem_t canCommit;
int64_t commitMs;
int64_t sync; int64_t sync;
TdThreadMutex lock; TdThreadMutex lock;
bool blocked; bool blocked;

View File

@ -203,6 +203,7 @@ _err:
int metaClose(SMeta *pMeta) { int metaClose(SMeta *pMeta) {
if (pMeta) { if (pMeta) {
if (pMeta->txn) tdbTxnClose(pMeta->txn);
if (pMeta->pCache) metaCacheClose(pMeta); if (pMeta->pCache) metaCacheClose(pMeta);
if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);

View File

@ -59,6 +59,17 @@ int vnodeBegin(SVnode *pVnode) {
} }
int vnodeShouldCommit(SVnode *pVnode) { int vnodeShouldCommit(SVnode *pVnode) {
if (!pVnode->inUse || !osDataSpaceAvailable()) {
return false;
}
int64_t nowMs = taosGetMonoTimestampMs();
return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pVnode->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
(pVnode->inUse->size > 0 && pVnode->commitMs + SYNC_VND_COMMIT_MAX_MS < nowMs));
}
int vnodeShouldCommitOld(SVnode *pVnode) {
if (pVnode->inUse) { if (pVnode->inUse) {
return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size); return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size);
} }
@ -194,6 +205,7 @@ static void vnodePrepareCommit(SVnode *pVnode) {
vnodeBufPoolUnRef(pVnode->inUse); vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL; pVnode->inUse = NULL;
} }
static int32_t vnodeCommitTask(void *arg) { static int32_t vnodeCommitTask(void *arg) {
int32_t code = 0; int32_t code = 0;
@ -210,6 +222,7 @@ _exit:
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
return code; return code;
} }
int vnodeAsyncCommit(SVnode *pVnode) { int vnodeAsyncCommit(SVnode *pVnode) {
int32_t code = 0; int32_t code = 0;
@ -257,7 +270,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
SVnode *pVnode = pInfo->pVnode; SVnode *pVnode = pInfo->pVnode;
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode), vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
pVnode->state.commitID, pVnode->state.applied, pVnode->state.applyTerm); pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm);
pVnode->commitMs = taosGetMonoTimestampMs();
// persist wal before starting // persist wal before starting
if (walPersist(pVnode->pWal) < 0) { if (walPersist(pVnode->pWal) < 0) {

View File

@ -249,15 +249,18 @@ void vnodePreClose(SVnode *pVnode) {
void vnodeClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
vnodeSyncCommit(pVnode);
vnodeSyncClose(pVnode); vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode); vnodeQueryClose(pVnode);
tsem_wait(&pVnode->canCommit);
walClose(pVnode->pWal); walClose(pVnode->pWal);
tqClose(pVnode->pTq); tqClose(pVnode->pTq);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
smaClose(pVnode->pSma); smaClose(pVnode->pSma);
metaClose(pVnode->pMeta); metaClose(pVnode->pMeta);
vnodeCloseBufPool(pVnode); vnodeCloseBufPool(pVnode);
tsem_post(&pVnode->canCommit);
// destroy handle // destroy handle
tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&(pVnode->canCommit));
tsem_destroy(&pVnode->syncSem); tsem_destroy(&pVnode->syncSem);

View File

@ -200,6 +200,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
// skip header // skip header
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
len = pMsg->contLen - sizeof(SMsgHead); len = pMsg->contLen - sizeof(SMsgHead);
bool needCommit = false;
switch (pMsg->msgType) { switch (pMsg->msgType) {
/* META */ /* META */
@ -296,9 +297,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp); vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
break; break;
case TDMT_VND_COMMIT: case TDMT_VND_COMMIT:
vnodeSyncCommit(pVnode); needCommit = true;
vnodeBegin(pVnode); break;
goto _exit;
default: default:
vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
return -1; return -1;
@ -315,7 +315,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
} }
// commit if need // commit if need
if (vnodeShouldCommit(pVnode)) { if (needCommit) {
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
vnodeAsyncCommit(pVnode); vnodeAsyncCommit(pVnode);

View File

@ -101,6 +101,64 @@ static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code)
} }
} }
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
int64_t seq = 0;
taosThreadMutexLock(&pVnode->lock);
int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
if (wait) {
ASSERT(!pVnode->blocked);
pVnode->blocked = true;
pVnode->blockSec = taosGetTimestampSec();
pVnode->blockSeq = seq;
#if 0
pVnode->blockInfo = pMsg->info;
#endif
}
taosThreadMutexUnlock(&pVnode->lock);
if (code > 0) {
vnodeHandleWriteMsg(pVnode, pMsg);
} else if (code < 0) {
if (terrno != 0) code = terrno;
vnodeHandleProposeError(pVnode, pMsg, code);
}
if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
return code;
}
void vnodeProposeCommitOnNeed(SVnode *pVnode) {
if (!vnodeShouldCommit(pVnode)) {
return;
}
int32_t contLen = sizeof(SMsgHead);
SMsgHead *pHead = rpcMallocCont(contLen);
pHead->contLen = contLen;
pHead->vgId = pVnode->config.vgId;
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_VND_COMMIT;
rpcMsg.contLen = contLen;
rpcMsg.pCont = pHead;
rpcMsg.info.noResp = 1;
bool isWeak = false;
if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
goto _out;
}
vInfo("vgId:%d, proposed vnode commit", pVnode->config.vgId);
_out:
pVnode->commitMs = taosGetMonoTimestampMs();
rpcFreeCont(rpcMsg.pCont);
rpcMsg.pCont = NULL;
}
#if BATCH_ENABLE #if BATCH_ENABLE
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) { static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
@ -178,6 +236,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
continue; continue;
} }
vnodeProposeCommitOnNeed(pVnode);
code = vnodePreProcessWriteMsg(pVnode, pMsg); code = vnodePreProcessWriteMsg(pVnode, pMsg);
if (code != 0) { if (code != 0) {
vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
@ -205,34 +265,6 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
#else #else
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
int64_t seq = 0;
taosThreadMutexLock(&pVnode->lock);
int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
if (wait) {
ASSERT(!pVnode->blocked);
pVnode->blocked = true;
pVnode->blockSec = taosGetTimestampSec();
pVnode->blockSeq = seq;
#if 0
pVnode->blockInfo = pMsg->info;
#endif
}
taosThreadMutexUnlock(&pVnode->lock);
if (code > 0) {
vnodeHandleWriteMsg(pVnode, pMsg);
} else if (code < 0) {
if (terrno != 0) code = terrno;
vnodeHandleProposeError(pVnode, pMsg, code);
}
if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
return code;
}
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode *pVnode = pInfo->ahandle; SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId; int32_t vgId = pVnode->config.vgId;
@ -256,6 +288,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
continue; continue;
} }
vnodeProposeCommitOnNeed(pVnode);
code = vnodePreProcessWriteMsg(pVnode, pMsg); code = vnodePreProcessWriteMsg(pVnode, pMsg);
if (code != 0) { if (code != 0) {
vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());

View File

@ -326,6 +326,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
} }
// update // update
ASSERT(pBuf->startIndex < index);
ASSERT(index - pBuf->startIndex < pBuf->size);
ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL); ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL);
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm}; SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
pEntry = NULL; pEntry = NULL;
@ -454,6 +456,11 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType)); pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
} }
if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
sInfo("vgId:%d, fsm execute vnode commit. index: %" PRId64 ", term: %" PRId64 "", pNode->vgId, pEntry->index,
pEntry->term);
}
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);

View File

@ -219,6 +219,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
ASSERT(pEntry->index == index); ASSERT(pEntry->index == index);
if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
walFsync(pWal, true);
}
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);
return 0; return 0;

View File

@ -107,7 +107,7 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *p
taosThreadMutexUnlock(&pObj->mutex); taosThreadMutexUnlock(&pObj->mutex);
return 1; // get one object return 1; // get one object
} else { } else {
sNError(pObj->data, "get-and-del message handle, no object of seq:%" PRIu64, seq); sNTrace(pObj->data, "get-and-del message handle, no object of seq:%" PRIu64, seq);
} }
taosThreadMutexUnlock(&pObj->mutex); taosThreadMutexUnlock(&pObj->mutex);

View File

@ -74,7 +74,12 @@ int32_t tdbTbcUpsert(TBC *pTbc, const void *pKey, int nKey, const void *pData, i
int32_t tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), int32_t tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *),
void *xArg, int flags); void *xArg, int flags);
int32_t tdbTxnClose(TXN *pTxn); int32_t tdbTxnCloseImpl(TXN *pTxn);
#define tdbTxnClose(pTxn) \
do { \
tdbTxnCloseImpl(pTxn); \
(pTxn) = NULL; \
} while (0)
// other // other
void tdbFree(void *); void tdbFree(void *);

View File

@ -77,7 +77,7 @@ int tdbPageDestroy(SPage *pPage, void (*xFree)(void *arg, void *ptr), void *arg)
u8 *ptr; u8 *ptr;
tdbTrace("page/destroy: %p/%d %p", pPage, pPage->id, xFree); tdbTrace("page/destroy: %p/%d %p", pPage, pPage->id, xFree);
ASSERT(!pPage->isDirty); // ASSERT(!pPage->isDirty);
ASSERT(xFree); ASSERT(xFree);
for (int iOvfl = 0; iOvfl < pPage->nOverflow; iOvfl++) { for (int iOvfl = 0; iOvfl < pPage->nOverflow; iOvfl++) {

View File

@ -28,13 +28,18 @@ int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void
return 0; return 0;
} }
int tdbTxnClose(TXN *pTxn) { int tdbTxnCloseImpl(TXN *pTxn) {
if (pTxn) { if (pTxn) {
if (pTxn->jPageSet) { if (pTxn->jPageSet) {
hashset_destroy(pTxn->jPageSet); hashset_destroy(pTxn->jPageSet);
pTxn->jPageSet = NULL; pTxn->jPageSet = NULL;
} }
if (pTxn->jfd) {
tdbOsClose(pTxn->jfd);
ASSERT(pTxn->jfd == NULL);
}
tdbOsFree(pTxn); tdbOsFree(pTxn);
} }

View File

@ -1124,7 +1124,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) { if (ret != 0) {
tGTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, tGError("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
uv_err_name(ret)); uv_err_name(ret));
uv_timer_stop(conn->timer); uv_timer_stop(conn->timer);