commit
c1d7a1a7ba
|
@ -92,33 +92,23 @@ void dnodeDispatchToVReadQueue(SRpcMsg *pMsg) {
|
|||
pHead->vgId = htonl(pHead->vgId);
|
||||
pHead->contLen = htonl(pHead->contLen);
|
||||
|
||||
taos_queue queue = vnodeAcquireRqueue(pHead->vgId);
|
||||
|
||||
if (queue == NULL) {
|
||||
leftLen -= pHead->contLen;
|
||||
pCont -= pHead->contLen;
|
||||
continue;
|
||||
void *pVnode = vnodeAcquire(pHead->vgId);
|
||||
if (pVnode != NULL) {
|
||||
int32_t code = vnodeWriteToRQueue(pVnode, pCont, pHead->contLen, TAOS_QTYPE_RPC, pMsg);
|
||||
if (code == TSDB_CODE_SUCCESS) queuedMsgNum++;
|
||||
vnodeRelease(pVnode);
|
||||
}
|
||||
|
||||
// put message into queue
|
||||
SVReadMsg *pRead = taosAllocateQitem(sizeof(SVReadMsg));
|
||||
pRead->rpcMsg = *pMsg;
|
||||
pRead->pCont = pCont;
|
||||
pRead->contLen = pHead->contLen;
|
||||
|
||||
// next vnode
|
||||
leftLen -= pHead->contLen;
|
||||
pCont -= pHead->contLen;
|
||||
queuedMsgNum++;
|
||||
|
||||
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
|
||||
}
|
||||
|
||||
if (queuedMsgNum == 0) {
|
||||
SRpcMsg rpcRsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID};
|
||||
rpcSendResponse(&rpcRsp);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
}
|
||||
|
||||
void *dnodeAllocVReadQueue(void *pVnode) {
|
||||
|
@ -162,50 +152,48 @@ void dnodeFreeVReadQueue(void *rqueue) {
|
|||
|
||||
void dnodeSendRpcVReadRsp(void *pVnode, SVReadMsg *pRead, int32_t code) {
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pRead->rpcMsg.handle,
|
||||
.handle = pRead->rpcHandle,
|
||||
.pCont = pRead->rspRet.rsp,
|
||||
.contLen = pRead->rspRet.len,
|
||||
.code = code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
rpcFreeCont(pRead->rpcMsg.pCont);
|
||||
vnodeRelease(pVnode);
|
||||
}
|
||||
|
||||
void dnodeDispatchNonRspMsg(void *pVnode, SVReadMsg *pRead, int32_t code) {
|
||||
rpcFreeCont(pRead->rpcMsg.pCont);
|
||||
vnodeRelease(pVnode);
|
||||
}
|
||||
|
||||
static void *dnodeProcessReadQueue(void *param) {
|
||||
SVReadMsg *pReadMsg;
|
||||
SVReadMsg *pRead;
|
||||
int32_t qtype;
|
||||
void * pVnode;
|
||||
|
||||
while (1) {
|
||||
if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pReadMsg, &pVnode) == 0) {
|
||||
if (taosReadQitemFromQset(tsVReadQset, &qtype, (void **)&pRead, &pVnode) == 0) {
|
||||
dDebug("qset:%p dnode vread got no message from qset, exiting", tsVReadQset);
|
||||
break;
|
||||
}
|
||||
|
||||
dDebug("%p, msg:%s will be processed in vread queue, qtype:%d, msg:%p", pReadMsg->rpcMsg.ahandle,
|
||||
taosMsg[pReadMsg->rpcMsg.msgType], qtype, pReadMsg);
|
||||
dDebug("%p, msg:%p:%s will be processed in vread queue, qtype:%d", pRead->rpcAhandle, pRead,
|
||||
taosMsg[pRead->msgType], qtype);
|
||||
|
||||
int32_t code = vnodeProcessRead(pVnode, pReadMsg);
|
||||
int32_t code = vnodeProcessRead(pVnode, pRead);
|
||||
|
||||
if (qtype == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) {
|
||||
dnodeSendRpcVReadRsp(pVnode, pReadMsg, code);
|
||||
dnodeSendRpcVReadRsp(pVnode, pRead, code);
|
||||
} else {
|
||||
if (code == TSDB_CODE_QRY_HAS_RSP) {
|
||||
dnodeSendRpcVReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code);
|
||||
dnodeSendRpcVReadRsp(pVnode, pRead, pRead->code);
|
||||
} else { // code == TSDB_CODE_QRY_NOT_READY, do not return msg to client
|
||||
assert(pReadMsg->rpcMsg.handle == NULL || (pReadMsg->rpcMsg.handle != NULL && pReadMsg->rpcMsg.msgType == 5));
|
||||
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
|
||||
assert(pRead->rpcHandle == NULL || (pRead->rpcHandle != NULL && pRead->msgType == 5));
|
||||
dnodeDispatchNonRspMsg(pVnode, pRead, code);
|
||||
}
|
||||
}
|
||||
|
||||
taosFreeQitem(pReadMsg);
|
||||
taosFreeQitem(pRead);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
|
|
@ -37,10 +37,15 @@ typedef struct {
|
|||
} SRspRet;
|
||||
|
||||
typedef struct {
|
||||
SRspRet rspRet;
|
||||
void * pCont;
|
||||
int32_t code;
|
||||
int32_t contLen;
|
||||
SRpcMsg rpcMsg;
|
||||
void * rpcHandle;
|
||||
void * rpcAhandle;
|
||||
void * qhandle;
|
||||
int8_t qtype;
|
||||
int8_t msgType;
|
||||
SRspRet rspRet;
|
||||
char pCont[];
|
||||
} SVReadMsg;
|
||||
|
||||
typedef struct {
|
||||
|
@ -62,13 +67,11 @@ int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
|
|||
int32_t vnodeClose(int32_t vgId);
|
||||
|
||||
void* vnodeAcquire(int32_t vgId); // add refcount
|
||||
void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue
|
||||
void vnodeRelease(void *pVnode); // dec refCount
|
||||
void* vnodeGetWal(void *pVnode);
|
||||
|
||||
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam);
|
||||
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam);
|
||||
int32_t vnodeCheckWrite(void *pVnode);
|
||||
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
|
||||
void vnodeBuildStatusMsg(void *param);
|
||||
void vnodeConfirmForward(void *param, uint64_t version, int32_t code);
|
||||
|
@ -77,8 +80,8 @@ void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
|
|||
int32_t vnodeInitResources();
|
||||
void vnodeCleanupResources();
|
||||
|
||||
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pReadMsg);
|
||||
int32_t vnodeCheckRead(void *pVnode);
|
||||
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
|
||||
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -468,21 +468,6 @@ void *vnodeAcquire(int32_t vgId) {
|
|||
return *ppVnode;
|
||||
}
|
||||
|
||||
void *vnodeAcquireRqueue(int32_t vgId) {
|
||||
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||
if (pVnode == NULL) return NULL;
|
||||
|
||||
int32_t code = vnodeCheckRead(pVnode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
vInfo("vgId:%d, can not provide read service, status is %s", vgId, vnodeStatus[pVnode->status]);
|
||||
vnodeRelease(pVnode);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pVnode->rqueue;
|
||||
}
|
||||
|
||||
void *vnodeGetWal(void *pVnode) {
|
||||
return ((SVnodeObj *)pVnode)->wal;
|
||||
}
|
||||
|
|
|
@ -46,7 +46,7 @@ void vnodeInitReadFp(void) {
|
|||
//
|
||||
int32_t vnodeProcessRead(void *param, SVReadMsg *pReadMsg) {
|
||||
SVnodeObj *pVnode = (SVnodeObj *)param;
|
||||
int msgType = pReadMsg->rpcMsg.msgType;
|
||||
int32_t msgType = pReadMsg->msgType;
|
||||
|
||||
if (vnodeProcessReadMsgFp[msgType] == NULL) {
|
||||
vDebug("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
|
||||
|
@ -56,7 +56,7 @@ int32_t vnodeProcessRead(void *param, SVReadMsg *pReadMsg) {
|
|||
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
|
||||
}
|
||||
|
||||
int32_t vnodeCheckRead(void *param) {
|
||||
static int32_t vnodeCheckRead(void *param) {
|
||||
SVnodeObj *pVnode = param;
|
||||
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
||||
vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
|
||||
|
@ -78,24 +78,58 @@ int32_t vnodeCheckRead(void *param) {
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
|
||||
|
||||
int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qtype, void *rparam) {
|
||||
SVnodeObj *pVnode = vparam;
|
||||
|
||||
if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_QUERY) {
|
||||
int32_t code = vnodeCheckRead(pVnode);
|
||||
if (code != TSDB_CODE_SUCCESS) return code;
|
||||
}
|
||||
|
||||
SVReadMsg *pRead = (SVReadMsg *)taosAllocateQitem(sizeof(SVReadMsg));
|
||||
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
|
||||
pRead->pCont = qhandle;
|
||||
pRead->contLen = 0;
|
||||
pRead->rpcMsg.ahandle = ahandle;
|
||||
int32_t size = sizeof(SVReadMsg) + contLen;
|
||||
SVReadMsg *pRead = taosAllocateQitem(size);
|
||||
if (pRead == NULL) {
|
||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (rparam != NULL) {
|
||||
SRpcMsg *pRpcMsg = rparam;
|
||||
pRead->rpcHandle = pRpcMsg->handle;
|
||||
pRead->rpcAhandle = pRpcMsg->ahandle;
|
||||
pRead->msgType = pRpcMsg->msgType;
|
||||
pRead->code = pRpcMsg->code;
|
||||
}
|
||||
|
||||
if (contLen != 0) {
|
||||
pRead->contLen = contLen;
|
||||
memcpy(pRead->pCont, pCont, contLen);
|
||||
} else {
|
||||
pRead->qhandle = pCont;
|
||||
}
|
||||
|
||||
pRead->qtype = qtype;
|
||||
|
||||
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||
vTrace("vgId:%d, get vnode rqueue, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||
|
||||
vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead);
|
||||
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
|
||||
|
||||
taosWriteQitem(pVnode->rqueue, qtype, pRead);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle, void *ahandle) {
|
||||
SRpcMsg rpcMsg = {0};
|
||||
rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
|
||||
rpcMsg.ahandle = ahandle;
|
||||
|
||||
int32_t code = vnodeWriteToRQueue(pVnode, qhandle, 0, TAOS_QTYPE_QUERY, &rpcMsg);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
vDebug("QInfo:%p add to vread queue for exec query", *qhandle);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pRet response message object
|
||||
|
@ -155,18 +189,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
|
|||
memset(pRet, 0, sizeof(SRspRet));
|
||||
|
||||
// qHandle needs to be freed correctly
|
||||
if (pReadMsg->rpcMsg.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||
if (pReadMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||
SRetrieveTableMsg *killQueryMsg = (SRetrieveTableMsg *)pReadMsg->pCont;
|
||||
killQueryMsg->free = htons(killQueryMsg->free);
|
||||
killQueryMsg->qhandle = htobe64(killQueryMsg->qhandle);
|
||||
|
||||
vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcMsg.handle);
|
||||
assert(pReadMsg->rpcMsg.contLen > 0 && killQueryMsg->free == 1);
|
||||
vWarn("QInfo:%p connection %p broken, kill query", (void *)killQueryMsg->qhandle, pReadMsg->rpcHandle);
|
||||
assert(pReadMsg->contLen > 0 && killQueryMsg->free == 1);
|
||||
|
||||
void **qhandle = qAcquireQInfo(pVnode->qMgmt, (uint64_t)killQueryMsg->qhandle);
|
||||
if (qhandle == NULL || *qhandle == NULL) {
|
||||
vWarn("QInfo:%p invalid qhandle, no matched query handle, conn:%p", (void *)killQueryMsg->qhandle,
|
||||
pReadMsg->rpcMsg.handle);
|
||||
pReadMsg->rpcHandle);
|
||||
} else {
|
||||
assert(*qhandle == (void *)killQueryMsg->qhandle);
|
||||
|
||||
|
@ -208,9 +242,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
|
|||
}
|
||||
|
||||
if (handle != NULL &&
|
||||
vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||
vnodeNotifyCurrentQhandle(pReadMsg->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||
vError("vgId:%d, QInfo:%p, query discarded since link is broken, %p", pVnode->vgId, *handle,
|
||||
pReadMsg->rpcMsg.handle);
|
||||
pReadMsg->rpcHandle);
|
||||
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||
return pRsp->code;
|
||||
|
@ -221,7 +255,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
|
|||
|
||||
if (handle != NULL) {
|
||||
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
|
||||
code = vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcMsg.ahandle);
|
||||
code = vnodePutItemIntoReadQueue(pVnode, handle, pReadMsg->rpcHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pRsp->code = code;
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||
|
@ -230,7 +264,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
|
|||
}
|
||||
} else {
|
||||
assert(pCont != NULL);
|
||||
void **qhandle = (void **)pCont;
|
||||
void **qhandle = (void **)pReadMsg->qhandle;
|
||||
|
||||
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);
|
||||
|
||||
|
@ -242,14 +276,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
|
|||
// build query rsp, the retrieve request has reached here already
|
||||
if (buildRes) {
|
||||
// update the connection info according to the retrieve connection
|
||||
pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*qhandle);
|
||||
assert(pReadMsg->rpcMsg.handle != NULL);
|
||||
pReadMsg->rpcHandle = qGetResultRetrieveMsg(*qhandle);
|
||||
assert(pReadMsg->rpcHandle != NULL);
|
||||
|
||||
vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
|
||||
pReadMsg->rpcMsg.handle);
|
||||
pReadMsg->rpcHandle);
|
||||
|
||||
// set the real rsp error code
|
||||
pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle, pReadMsg->rpcMsg.ahandle);
|
||||
pReadMsg->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qhandle, &freehandle, pReadMsg->rpcHandle);
|
||||
|
||||
// NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
|
||||
code = TSDB_CODE_QRY_HAS_RSP;
|
||||
|
@ -283,7 +317,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
|
|||
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
|
||||
|
||||
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void *)pRetrieve->qhandle,
|
||||
pRetrieve->free, pReadMsg->rpcMsg.handle);
|
||||
pRetrieve->free, pReadMsg->rpcHandle);
|
||||
|
||||
memset(pRet, 0, sizeof(SRspRet));
|
||||
|
||||
|
@ -314,9 +348,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
|
|||
}
|
||||
|
||||
// register the qhandle to connect to quit query immediate if connection is broken
|
||||
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle,
|
||||
pReadMsg->rpcMsg.handle);
|
||||
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcHandle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcHandle);
|
||||
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
qKillQuery(*handle);
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||
|
@ -326,7 +359,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
|
|||
bool freeHandle = true;
|
||||
bool buildRes = false;
|
||||
|
||||
code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcMsg.handle);
|
||||
code = qRetrieveQueryResultInfo(*handle, &buildRes, pReadMsg->rpcHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// TODO handle malloc failure
|
||||
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||
|
@ -337,7 +370,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
|
|||
assert(buildRes == true);
|
||||
#if _NON_BLOCKING_RETRIEVE
|
||||
if (!buildRes) {
|
||||
assert(pReadMsg->rpcMsg.handle != NULL);
|
||||
assert(pReadMsg->rpcHandle != NULL);
|
||||
|
||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false);
|
||||
return TSDB_CODE_QRY_NOT_READY;
|
||||
|
@ -345,7 +378,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pReadMsg) {
|
|||
#endif
|
||||
|
||||
// ahandle is the sqlObj pointer
|
||||
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcMsg.ahandle);
|
||||
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcHandle);
|
||||
}
|
||||
|
||||
// If qhandle is not added into vread queue, the query should be completed already or paused with error.
|
||||
|
|
|
@ -97,7 +97,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
|
|||
return syncCode;
|
||||
}
|
||||
|
||||
int32_t vnodeCheckWrite(void *param) {
|
||||
static int32_t vnodeCheckWrite(void *param) {
|
||||
SVnodeObj *pVnode = param;
|
||||
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
|
||||
vDebug("vgId:%d, no write auth, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||
|
|
Loading…
Reference in New Issue