[TD-3682]<fix>: Insufficient disk space may cause oom
This commit is contained in:
parent
590b901518
commit
b7579334de
|
@ -212,7 +212,7 @@ float tsAvailTmpDirectorySpace = 0;
|
|||
float tsAvailDataDirGB = 0;
|
||||
float tsUsedDataDirGB = 0;
|
||||
float tsReservedTmpDirectorySpace = 1.0f;
|
||||
float tsMinimalDataDirGB = 1.0f;
|
||||
float tsMinimalDataDirGB = 2.0f;
|
||||
int32_t tsTotalMemoryMB = 0;
|
||||
uint32_t tsVersion = 0;
|
||||
|
||||
|
|
|
@ -997,17 +997,24 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
|
|||
|
||||
sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
|
||||
|
||||
int32_t code = 0;
|
||||
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
|
||||
// nodeVersion = pHead->version;
|
||||
(*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
|
||||
code = (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
|
||||
} else {
|
||||
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
|
||||
syncSaveIntoBuffer(pPeer, pHead);
|
||||
code = syncSaveIntoBuffer(pPeer, pHead);
|
||||
} else {
|
||||
sError("%s, forward discarded since sstatus:%s, hver:%" PRIu64, pPeer->id, syncStatus[nodeSStatus],
|
||||
pHead->version);
|
||||
code = -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (code != 0) {
|
||||
sError("%s, failed to process fwd msg, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
|
||||
syncRestartConnection(pPeer);
|
||||
}
|
||||
}
|
||||
|
||||
static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) {
|
||||
|
|
|
@ -37,6 +37,7 @@ extern int32_t vDebugFlag;
|
|||
typedef struct {
|
||||
int32_t vgId; // global vnode group ID
|
||||
int32_t refCount; // reference count
|
||||
int64_t queuedWMsgSize;
|
||||
int32_t queuedWMsg;
|
||||
int32_t queuedRMsg;
|
||||
int32_t flowctrlLevel;
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "vnodeStatus.h"
|
||||
|
||||
#define MAX_QUEUED_MSG_NUM 100000
|
||||
#define MAX_QUEUED_MSG_SIZE 1024*1024*1024 //1GB
|
||||
|
||||
extern void * tsDnodeTmr;
|
||||
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
|
||||
|
@ -269,6 +270,13 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
|
|||
}
|
||||
}
|
||||
|
||||
if (tsAvailDataDirGB <= tsMinimalDataDirGB) {
|
||||
vError("vgId:%d, failed to write into vwqueue since no diskspace, avail:%fGB", pVnode->vgId, tsAvailDataDirGB);
|
||||
taosFreeQitem(pWrite);
|
||||
vnodeRelease(pVnode);
|
||||
return TSDB_CODE_VND_NO_DISKSPACE;
|
||||
}
|
||||
|
||||
if (!vnodeInReadyOrUpdatingStatus(pVnode)) {
|
||||
vError("vgId:%d, failed to write into vwqueue, vstatus is %s, refCount:%d pVnode:%p", pVnode->vgId,
|
||||
vnodeStatus[pVnode->status], pVnode->refCount, pVnode);
|
||||
|
@ -278,14 +286,17 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
|
|||
}
|
||||
|
||||
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
|
||||
if (queued > MAX_QUEUED_MSG_NUM) {
|
||||
int64_t queuedSize = atomic_add_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
|
||||
|
||||
if (queued > MAX_QUEUED_MSG_NUM || queuedSize > MAX_QUEUED_MSG_SIZE) {
|
||||
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
|
||||
if (ms > 100) ms = 100;
|
||||
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control %dms", pVnode->vgId, queued, ms);
|
||||
taosMsleep(ms);
|
||||
}
|
||||
|
||||
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
|
||||
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d size:%" PRId64, pVnode->vgId, pVnode->refCount,
|
||||
pVnode->queuedWMsg, pVnode->queuedWMsgSize);
|
||||
|
||||
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -308,7 +319,10 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
|
|||
SVnodeObj *pVnode = vparam;
|
||||
|
||||
int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
|
||||
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, queued);
|
||||
int64_t queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
|
||||
|
||||
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d size:%" PRId64, pVnode->vgId, pWrite,
|
||||
pWrite->rpcMsg.ahandle, queued, queuedSize);
|
||||
|
||||
taosFreeQitem(pWrite);
|
||||
vnodeRelease(pVnode);
|
||||
|
@ -344,7 +358,9 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
|
|||
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
|
||||
SVnodeObj *pVnode = pWrite->pVnode;
|
||||
if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;
|
||||
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->flowctrlLevel <= 0) return 0;
|
||||
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->queuedWMsgSize < MAX_QUEUED_MSG_SIZE &&
|
||||
pVnode->flowctrlLevel <= 0)
|
||||
return 0;
|
||||
|
||||
if (tsEnableFlowCtrl == 0) {
|
||||
int32_t ms = (int32_t)pow(2, pVnode->flowctrlLevel + 2);
|
||||
|
|
Loading…
Reference in New Issue