Merge pull request #6648 from junli1026/jun/vNodeWrite
Fix SVWriteMsg memory allocation size bug
This commit is contained in:
commit
1e11d8d722
|
@ -202,12 +202,12 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
|
|||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
|
||||
dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite,
|
||||
pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead.msgType], qtypeStr[qtype], pWrite->pHead.version);
|
||||
pWrite->rpcMsg.ahandle, taosMsg[pWrite->walHead.msgType], qtypeStr[qtype], pWrite->walHead.version);
|
||||
|
||||
pWrite->code = vnodeProcessWrite(pVnode, &pWrite->pHead, qtype, pWrite);
|
||||
pWrite->code = vnodeProcessWrite(pVnode, &pWrite->walHead, qtype, pWrite);
|
||||
if (pWrite->code <= 0) atomic_add_fetch_32(&pWrite->processedCount, 1);
|
||||
if (pWrite->code > 0) pWrite->code = 0;
|
||||
if (pWrite->code == 0 && pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
|
||||
if (pWrite->code == 0 && pWrite->walHead.msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
|
||||
|
||||
dTrace("msg:%p is processed in vwrite queue, code:0x%x", pWrite, pWrite->code);
|
||||
}
|
||||
|
@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
|
|||
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
|
||||
} else {
|
||||
if (qtype == TAOS_QTYPE_FWD) {
|
||||
vnodeConfirmForward(pVnode, pWrite->pHead.version, pWrite->code, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
|
||||
vnodeConfirmForward(pVnode, pWrite->walHead.version, pWrite->code, pWrite->walHead.msgType != TSDB_MSG_TYPE_SUBMIT);
|
||||
}
|
||||
if (pWrite->rspRet.rsp) {
|
||||
rpcFreeCont(pWrite->rspRet.rsp);
|
||||
|
|
|
@ -49,7 +49,7 @@ typedef struct {
|
|||
SRpcMsg rpcMsg;
|
||||
SRspRet rspRet;
|
||||
char reserveForSync[24];
|
||||
SWalHead pHead;
|
||||
SWalHead walHead;
|
||||
} SVWriteMsg;
|
||||
|
||||
// vnodeStatus
|
||||
|
|
|
@ -90,7 +90,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
|
|||
|
||||
// forward to peers, even it is WAL/FWD, it shall be called to update version in sync
|
||||
int32_t syncCode = 0;
|
||||
bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
|
||||
bool force = (pWrite == NULL ? false : pWrite->walHead.msgType != TSDB_MSG_TYPE_SUBMIT);
|
||||
syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force);
|
||||
if (syncCode < 0) {
|
||||
pHead->version = 0;
|
||||
|
@ -237,7 +237,7 @@ static SVWriteMsg *vnodeBuildVWriteMsg(SVnodeObj *pVnode, SWalHead *pHead, int32
|
|||
return NULL;
|
||||
}
|
||||
|
||||
int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
|
||||
int32_t size = sizeof(SVWriteMsg) + pHead->len;
|
||||
SVWriteMsg *pWrite = taosAllocateQitem(size);
|
||||
if (pWrite == NULL) {
|
||||
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
|
@ -248,7 +248,7 @@ static SVWriteMsg *vnodeBuildVWriteMsg(SVnodeObj *pVnode, SWalHead *pHead, int32
|
|||
pWrite->rpcMsg = *pRpcMsg;
|
||||
}
|
||||
|
||||
memcpy(&pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
|
||||
memcpy(&pWrite->walHead, pHead, sizeof(SWalHead) + pHead->len);
|
||||
pWrite->pVnode = pVnode;
|
||||
pWrite->qtype = qtype;
|
||||
|
||||
|
@ -286,7 +286,7 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
|
|||
}
|
||||
|
||||
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
|
||||
int64_t queuedSize = atomic_add_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
|
||||
int64_t queuedSize = atomic_add_fetch_64(&pVnode->queuedWMsgSize, pWrite->walHead.len);
|
||||
|
||||
if (queued > MAX_QUEUED_MSG_NUM || queuedSize > MAX_QUEUED_MSG_SIZE) {
|
||||
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
|
||||
|
@ -330,7 +330,7 @@ void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
|
|||
SVnodeObj *pVnode = vparam;
|
||||
if (pVnode) {
|
||||
int32_t queued = atomic_sub_fetch_32(&pVnode->queuedWMsg, 1);
|
||||
int64_t queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->pHead.len);
|
||||
int64_t queuedSize = atomic_sub_fetch_64(&pVnode->queuedWMsgSize, pWrite->walHead.len);
|
||||
|
||||
vTrace("vgId:%d, msg:%p, app:%p, free from vwqueue, queued:%d size:%" PRId64, pVnode->vgId, pWrite,
|
||||
pWrite->rpcMsg.ahandle, queued, queuedSize);
|
||||
|
|
Loading…
Reference in New Issue