Merge pull request #5737 from taosdata/hotfix/TD-3682
[TD-3682]<fix>: Insufficient disk space may cause oom
This commit is contained in:
commit
fc7579cd20
|
@ -212,7 +212,7 @@ float tsAvailTmpDirectorySpace = 0;
|
||||||
float tsAvailDataDirGB = 0;
|
float tsAvailDataDirGB = 0;
|
||||||
float tsUsedDataDirGB = 0;
|
float tsUsedDataDirGB = 0;
|
||||||
float tsReservedTmpDirectorySpace = 1.0f;
|
float tsReservedTmpDirectorySpace = 1.0f;
|
||||||
float tsMinimalDataDirGB = 1.0f;
|
float tsMinimalDataDirGB = 2.0f;
|
||||||
int32_t tsTotalMemoryMB = 0;
|
int32_t tsTotalMemoryMB = 0;
|
||||||
uint32_t tsVersion = 0;
|
uint32_t tsVersion = 0;
|
||||||
|
|
||||||
|
|
|
@ -997,17 +997,24 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
|
||||||
|
|
||||||
sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
|
sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
|
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
|
||||||
// nodeVersion = pHead->version;
|
// nodeVersion = pHead->version;
|
||||||
(*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
|
code = (*pNode->writeToCacheFp)(pNode->vgId, pHead, TAOS_QTYPE_FWD, NULL);
|
||||||
} else {
|
} else {
|
||||||
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
|
if (nodeSStatus != TAOS_SYNC_STATUS_INIT) {
|
||||||
syncSaveIntoBuffer(pPeer, pHead);
|
code = syncSaveIntoBuffer(pPeer, pHead);
|
||||||
} else {
|
} else {
|
||||||
sError("%s, forward discarded since sstatus:%s, hver:%" PRIu64, pPeer->id, syncStatus[nodeSStatus],
|
sError("%s, forward discarded since sstatus:%s, hver:%" PRIu64, pPeer->id, syncStatus[nodeSStatus],
|
||||||
pHead->version);
|
pHead->version);
|
||||||
|
code = -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
sError("%s, failed to process fwd msg, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
|
||||||
|
syncRestartConnection(pPeer);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) {
|
static void syncProcessPeersStatusMsg(SPeersStatus *pPeersStatus, SSyncPeer *pPeer) {
|
||||||
|
|
|
@ -37,6 +37,7 @@ extern int32_t vDebugFlag;
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId; // global vnode group ID
|
int32_t vgId; // global vnode group ID
|
||||||
int32_t refCount; // reference count
|
int32_t refCount; // reference count
|
||||||
|
int64_t queuedWMsgSize;
|
||||||
int32_t queuedWMsg;
|
int32_t queuedWMsg;
|
||||||
int32_t queuedRMsg;
|
int32_t queuedRMsg;
|
||||||
int32_t flowctrlLevel;
|
int32_t flowctrlLevel;
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include "vnodeStatus.h"
|
#include "vnodeStatus.h"
|
||||||
|
|
||||||
#define MAX_QUEUED_MSG_NUM 100000
|
#define MAX_QUEUED_MSG_NUM 100000
|
||||||
|
#define MAX_QUEUED_MSG_SIZE 1024*1024*1024 //1GB
|
||||||
|
|
||||||
extern void * tsDnodeTmr;
|
extern void * tsDnodeTmr;
|
||||||
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
|
static int32_t (*vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *, void *pCont, SRspRet *);
|
||||||
|
@ -269,6 +270,13 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tsAvailDataDirGB <= tsMinimalDataDirGB) {
|
||||||
|
vError("vgId:%d, failed to write into vwqueue since no diskspace, avail:%fGB", pVnode->vgId, tsAvailDataDirGB);
|
||||||
|
taosFreeQitem(pWrite);
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
return TSDB_CODE_VND_NO_DISKSPACE;
|
||||||
|
}
|
||||||
|
|
||||||
if (!vnodeInReadyOrUpdatingStatus(pVnode)) {
|
if (!vnodeInReadyOrUpdatingStatus(pVnode)) {
|
||||||
vError("vgId:%d, failed to write into vwqueue, vstatus is %s, refCount:%d pVnode:%p", pVnode->vgId,
|
vError("vgId:%d, failed to write into vwqueue, vstatus is %s, refCount:%d pVnode:%p", pVnode->vgId,
|
||||||
vnodeStatus[pVnode->status], pVnode->refCount, pVnode);
|
vnodeStatus[pVnode->status], pVnode->refCount, pVnode);
|
||||||
|
@ -278,14 +286,17 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
|
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
|
||||||
if (queued > MAX_QUEUED_MSG_NUM) {
|
int64_t queuedSize = atomic_add_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
|
||||||
|
|
||||||
|
if (queued > MAX_QUEUED_MSG_NUM || queuedSize > MAX_QUEUED_MSG_SIZE) {
|
||||||
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
|
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
|
||||||
if (ms > 100) ms = 100;
|
if (ms > 100) ms = 100;
|
||||||
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control %dms", pVnode->vgId, queued, ms);
|
vDebug("vgId:%d, too many msg:%d in vwqueue, flow control %dms", pVnode->vgId, queued, ms);
|
||||||
taosMsleep(ms);
|
taosMsleep(ms);
|
||||||
}
|
}
|
||||||
|
|
||||||
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
|
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d size:%" PRId64, pVnode->vgId, pVnode->refCount,
|
||||||
|
pVnode->queuedWMsg, pVnode->queuedWMsgSize);
|
||||||
|
|
||||||
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
|
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -308,7 +319,10 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
|
||||||
SVnodeObj *pVnode = vparam;
|
SVnodeObj *pVnode = vparam;
|
||||||
|
|
||||||
int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
|
int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
|
||||||
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d", pVnode->vgId, pWrite, pWrite->rpcMsg.ahandle, queued);
|
int64_t queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
|
||||||
|
|
||||||
|
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d size:%" PRId64, pVnode->vgId, pWrite,
|
||||||
|
pWrite->rpcMsg.ahandle, queued, queuedSize);
|
||||||
|
|
||||||
taosFreeQitem(pWrite);
|
taosFreeQitem(pWrite);
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
|
@ -344,7 +358,9 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
|
||||||
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
|
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
|
||||||
SVnodeObj *pVnode = pWrite->pVnode;
|
SVnodeObj *pVnode = pWrite->pVnode;
|
||||||
if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;
|
if (pWrite->qtype != TAOS_QTYPE_RPC) return 0;
|
||||||
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->flowctrlLevel <= 0) return 0;
|
if (pVnode->queuedWMsg < MAX_QUEUED_MSG_NUM && pVnode->queuedWMsgSize < MAX_QUEUED_MSG_SIZE &&
|
||||||
|
pVnode->flowctrlLevel <= 0)
|
||||||
|
return 0;
|
||||||
|
|
||||||
if (tsEnableFlowCtrl == 0) {
|
if (tsEnableFlowCtrl == 0) {
|
||||||
int32_t ms = (int32_t)pow(2, pVnode->flowctrlLevel + 2);
|
int32_t ms = (int32_t)pow(2, pVnode->flowctrlLevel + 2);
|
||||||
|
|
Loading…
Reference in New Issue