serialize retrieve request
This commit is contained in:
parent
b509911795
commit
fb34af9106
|
@ -97,14 +97,15 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
|
|||
|
||||
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE || pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
|
||||
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
|
||||
SRetrieveTableReq* pRetrieveMsg = calloc(1, sizeof(SRetrieveTableReq));
|
||||
if (pRetrieveMsg == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
SRetrieveTableReq retrieveReq = {0};
|
||||
retrieveReq.showId = pRequest->body.showInfo.execId;
|
||||
|
||||
pRetrieveMsg->showId = htobe64(pRequest->body.showInfo.execId);
|
||||
pMsgSendInfo->msgInfo.pData = pRetrieveMsg;
|
||||
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq);
|
||||
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &retrieveReq);
|
||||
void* pReq = malloc(contLen);
|
||||
tSerializeSRetrieveTableReq(pReq, contLen, &retrieveReq);
|
||||
|
||||
pMsgSendInfo->msgInfo.pData = pReq;
|
||||
pMsgSendInfo->msgInfo.len = contLen;
|
||||
} else {
|
||||
SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq));
|
||||
if (pFetchMsg == NULL) {
|
||||
|
|
|
@ -132,13 +132,15 @@ int32_t Testbase::GetMetaNum() { return pMeta->numOfColumns; }
|
|||
const char* Testbase::GetMetaTbName() { return pMeta->tbName; }
|
||||
|
||||
void Testbase::SendShowRetrieveReq() {
|
||||
int32_t contLen = sizeof(SRetrieveTableReq);
|
||||
SRetrieveTableReq retrieveReq = {0};
|
||||
retrieveReq.showId = showId;
|
||||
retrieveReq.free = 0;
|
||||
|
||||
SRetrieveTableReq* pRetrieve = (SRetrieveTableReq*)rpcMallocCont(contLen);
|
||||
pRetrieve->showId = htobe64(showId);
|
||||
pRetrieve->free = 0;
|
||||
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &retrieveReq);
|
||||
void* pReq = rpcMallocCont(contLen);
|
||||
tSerializeSRetrieveTableReq(pReq, contLen, &retrieveReq);
|
||||
|
||||
SRpcMsg* pRsp = SendReq(TDMT_MND_SHOW_RETRIEVE, pRetrieve, contLen);
|
||||
SRpcMsg* pRsp = SendReq(TDMT_MND_SHOW_RETRIEVE, pReq, contLen);
|
||||
pRetrieveRsp = (SRetrieveTableRsp*)pRsp->pCont;
|
||||
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
|
||||
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
|
||||
|
|
|
@ -180,10 +180,13 @@ static int32_t mndProcessRetrieveReq(SMnodeMsg *pReq) {
|
|||
int32_t size = 0;
|
||||
int32_t rowsRead = 0;
|
||||
|
||||
SRetrieveTableReq *pRetrieve = pReq->rpcMsg.pCont;
|
||||
int64_t showId = htobe64(pRetrieve->showId);
|
||||
SRetrieveTableReq retrieveReq = {0};
|
||||
if (tDeserializeSRetrieveTableReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &retrieveReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SShowObj *pShow = mndAcquireShowObj(pMnode, showId);
|
||||
SShowObj *pShow = mndAcquireShowObj(pMnode, retrieveReq.showId);
|
||||
if (pShow == NULL) {
|
||||
terrno = TSDB_CODE_MND_INVALID_SHOWOBJ;
|
||||
mError("failed to process show-retrieve req:%p since %s", pShow, terrstr());
|
||||
|
@ -207,7 +210,7 @@ static int32_t mndProcessRetrieveReq(SMnodeMsg *pReq) {
|
|||
pShow->numOfReads = pShow->numOfRows;
|
||||
}
|
||||
|
||||
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||
if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||
rowsToRead = pShow->numOfRows - pShow->numOfReads;
|
||||
}
|
||||
|
||||
|
@ -231,7 +234,7 @@ static int32_t mndProcessRetrieveReq(SMnodeMsg *pReq) {
|
|||
}
|
||||
|
||||
// if free flag is set, client wants to clean the resources
|
||||
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||
if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||
rowsRead = (*retrieveFp)(pReq, pShow, pRsp->data, rowsToRead);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue