commit
205292ce72
|
@ -202,12 +202,12 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
|
||||||
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
||||||
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
|
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
|
||||||
dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite,
|
dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite,
|
||||||
pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead->msgType], qtypeStr[qtype], pWrite->pHead->version);
|
pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead.msgType], qtypeStr[qtype], pWrite->pHead.version);
|
||||||
|
|
||||||
pWrite->code = vnodeProcessWrite(pVnode, pWrite->pHead, qtype, pWrite);
|
pWrite->code = vnodeProcessWrite(pVnode, &pWrite->pHead, qtype, pWrite);
|
||||||
if (pWrite->code <= 0) pWrite->processedCount = 1;
|
if (pWrite->code <= 0) pWrite->processedCount = 1;
|
||||||
if (pWrite->code > 0) pWrite->code = 0;
|
if (pWrite->code > 0) pWrite->code = 0;
|
||||||
if (pWrite->code == 0 && pWrite->pHead->msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
|
if (pWrite->code == 0 && pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
|
||||||
|
|
||||||
dTrace("msg:%p is processed in vwrite queue, code:0x%x", pWrite, pWrite->code);
|
dTrace("msg:%p is processed in vwrite queue, code:0x%x", pWrite, pWrite->code);
|
||||||
}
|
}
|
||||||
|
@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
|
||||||
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
|
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
|
||||||
} else {
|
} else {
|
||||||
if (qtype == TAOS_QTYPE_FWD) {
|
if (qtype == TAOS_QTYPE_FWD) {
|
||||||
vnodeConfirmForward(pVnode, pWrite->pHead->version, 0);
|
vnodeConfirmForward(pVnode, pWrite->pHead.version, 0);
|
||||||
}
|
}
|
||||||
if (pWrite->rspRet.rsp) {
|
if (pWrite->rspRet.rsp) {
|
||||||
rpcFreeCont(pWrite->rspRet.rsp);
|
rpcFreeCont(pWrite->rspRet.rsp);
|
||||||
|
|
|
@ -49,7 +49,7 @@ typedef struct {
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
SRspRet rspRet;
|
SRspRet rspRet;
|
||||||
char reserveForSync[24];
|
char reserveForSync[24];
|
||||||
SWalHead pHead[];
|
SWalHead pHead;
|
||||||
} SVWriteMsg;
|
} SVWriteMsg;
|
||||||
|
|
||||||
// vnodeStatus
|
// vnodeStatus
|
||||||
|
|
|
@ -233,7 +233,7 @@ static SVWriteMsg *vnodeBuildVWriteMsg(SVnodeObj *pVnode, SWalHead *pHead, int32
|
||||||
pWrite->rpcMsg = *pRpcMsg;
|
pWrite->rpcMsg = *pRpcMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
|
memcpy(&pWrite->pHead, pHead, sizeof(SWalHead) + pHead->len);
|
||||||
pWrite->pVnode = pVnode;
|
pWrite->pVnode = pVnode;
|
||||||
pWrite->qtype = qtype;
|
pWrite->qtype = qtype;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue