ehn(query): remove redundant code.
This commit is contained in:
parent
69c2cf11d3
commit
7c61aa17d4
|
@ -136,7 +136,6 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "mnode-kill-conn", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "mnode-kill-conn", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "mnode-heartbeat", SClientHbBatchReq, SClientHbBatchRsp)
|
TD_DEF_MSG_TYPE(TDMT_MND_HEARTBEAT, "mnode-heartbeat", SClientHbBatchReq, SClientHbBatchRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SHOW, "mnode-show", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SHOW_RETRIEVE, "mnode-retrieve", NULL, NULL)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "mnode-systable-retrieve", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_SYSTABLE_RETRIEVE, "mnode-systable-retrieve", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_STATUS, "mnode-status", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "mnode-trans-tmr", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "mnode-trans-tmr", NULL, NULL)
|
||||||
|
@ -189,8 +188,8 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp)
|
// TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES, "vnode-show-tables", SVShowTablesReq, SVShowTablesRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp)
|
// TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
|
||||||
|
|
|
@ -193,13 +193,6 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
|
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
|
||||||
|
|
||||||
if (pMsgInfo->msgType == TDMT_VND_SHOW_TABLES) {
|
|
||||||
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
|
|
||||||
if (pShowReqInfo->pArray == NULL) {
|
|
||||||
pShowReqInfo->currentIndex = 0; // set the first vnode/ then iterate the next vnode
|
|
||||||
pShowReqInfo->pArray = pMsgInfo->pExtension;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
|
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
|
||||||
|
|
||||||
|
@ -615,12 +608,9 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
|
||||||
|
|
||||||
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
||||||
assert(pRequest != NULL);
|
assert(pRequest != NULL);
|
||||||
|
|
||||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||||
|
|
||||||
SEpSet epSet = {0};
|
|
||||||
|
|
||||||
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
|
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
|
||||||
if (pRequest->type == TDMT_VND_QUERY) {
|
|
||||||
// All data has returned to App already, no need to try again
|
// All data has returned to App already, no need to try again
|
||||||
if (pResultInfo->completed) {
|
if (pResultInfo->completed) {
|
||||||
pResultInfo->numOfRows = 0;
|
pResultInfo->numOfRows = 0;
|
||||||
|
@ -646,69 +636,8 @@ void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
||||||
if (pResultInfo->numOfRows == 0) {
|
if (pResultInfo->numOfRows == 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
goto _return;
|
|
||||||
} else if (pRequest->type == TDMT_MND_SHOW) {
|
|
||||||
pRequest->type = TDMT_MND_SHOW_RETRIEVE;
|
|
||||||
epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
|
||||||
} else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
|
|
||||||
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
|
|
||||||
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
|
|
||||||
SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
|
|
||||||
|
|
||||||
epSet = pVgroupInfo->epSet;
|
|
||||||
} else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
|
|
||||||
pRequest->type = TDMT_VND_SHOW_TABLES;
|
|
||||||
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
|
|
||||||
pShowReqInfo->currentIndex += 1;
|
|
||||||
if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
|
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
|
|
||||||
SVShowTablesReq* pShowReq = taosMemoryCalloc(1, sizeof(SVShowTablesReq));
|
|
||||||
pShowReq->head.vgId = htonl(pVgroupInfo->vgId);
|
|
||||||
|
|
||||||
pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
|
|
||||||
pRequest->body.requestMsg.pData = pShowReq;
|
|
||||||
|
|
||||||
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
|
|
||||||
epSet = pVgroupInfo->epSet;
|
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
|
||||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
|
|
||||||
tsem_wait(&pRequest->body.rspSem);
|
|
||||||
|
|
||||||
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
|
|
||||||
} else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
|
|
||||||
epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
|
|
||||||
|
|
||||||
if (pResultInfo->completed) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pResultInfo->completed) {
|
|
||||||
pResultInfo->numOfRows = 0;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
|
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
|
||||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
|
|
||||||
|
|
||||||
tsem_wait(&pRequest->body.rspSem);
|
|
||||||
|
|
||||||
pResultInfo->current = 0;
|
|
||||||
if (pResultInfo->numOfRows <= pResultInfo->current) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_return:
|
|
||||||
if (setupOneRowPtr) {
|
if (setupOneRowPtr) {
|
||||||
doSetOneRowPtr(pResultInfo);
|
doSetOneRowPtr(pResultInfo);
|
||||||
pResultInfo->current += 1;
|
pResultInfo->current += 1;
|
||||||
|
|
|
@ -90,150 +90,13 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
|
||||||
pMsgSendInfo->param = pRequest;
|
pMsgSendInfo->param = pRequest;
|
||||||
pMsgSendInfo->msgType = pRequest->type;
|
pMsgSendInfo->msgType = pRequest->type;
|
||||||
|
|
||||||
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE || pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
|
|
||||||
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
|
|
||||||
SRetrieveTableReq retrieveReq = {0};
|
|
||||||
retrieveReq.showId = pRequest->body.showInfo.execId;
|
|
||||||
|
|
||||||
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &retrieveReq);
|
|
||||||
void* pReq = taosMemoryMalloc(contLen);
|
|
||||||
tSerializeSRetrieveTableReq(pReq, contLen, &retrieveReq);
|
|
||||||
pMsgSendInfo->msgInfo.pData = pReq;
|
|
||||||
pMsgSendInfo->msgInfo.len = contLen;
|
|
||||||
pMsgSendInfo->msgInfo.handle = NULL;
|
|
||||||
} else {
|
|
||||||
SVShowTablesFetchReq* pFetchMsg = taosMemoryCalloc(1, sizeof(SVShowTablesFetchReq));
|
|
||||||
if (pFetchMsg == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pFetchMsg->id = htobe64(pRequest->body.showInfo.execId);
|
|
||||||
pFetchMsg->head.vgId = htonl(pRequest->body.showInfo.vgId);
|
|
||||||
|
|
||||||
pMsgSendInfo->msgInfo.pData = pFetchMsg;
|
|
||||||
pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq);
|
|
||||||
pMsgSendInfo->msgInfo.handle = NULL;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
assert(pRequest != NULL);
|
assert(pRequest != NULL);
|
||||||
pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
|
pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
|
||||||
}
|
|
||||||
|
|
||||||
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)];
|
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)];
|
||||||
return pMsgSendInfo;
|
return pMsgSendInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|
||||||
SRequestObj* pRequest = param;
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
setErrno(pRequest, code);
|
|
||||||
tsem_post(&pRequest->body.rspSem);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
SShowRsp showRsp = {0};
|
|
||||||
tDeserializeSShowRsp(pMsg->pData, pMsg->len, &showRsp);
|
|
||||||
STableMetaRsp *pMetaMsg = &showRsp.tableMeta;
|
|
||||||
|
|
||||||
taosMemoryFreeClear(pRequest->body.resInfo.pRspMsg);
|
|
||||||
pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
|
||||||
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
|
||||||
|
|
||||||
if (pResInfo->fields == NULL) {
|
|
||||||
TAOS_FIELD* pFields = taosMemoryCalloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD));
|
|
||||||
for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
|
|
||||||
SSchema* pSchema = &pMetaMsg->pSchemas[i];
|
|
||||||
tstrncpy(pFields[i].name, pSchema->name, tListLen(pFields[i].name));
|
|
||||||
pFields[i].type = pSchema->type;
|
|
||||||
pFields[i].bytes = pSchema->bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
pResInfo->fields = pFields;
|
|
||||||
}
|
|
||||||
|
|
||||||
pResInfo->numOfCols = pMetaMsg->numOfColumns;
|
|
||||||
pRequest->body.showInfo.execId = showRsp.showId;
|
|
||||||
tFreeSShowRsp(&showRsp);
|
|
||||||
|
|
||||||
// todo
|
|
||||||
if (pRequest->type == TDMT_VND_SHOW_TABLES) {
|
|
||||||
SShowReqInfo* pShowInfo = &pRequest->body.showInfo;
|
|
||||||
|
|
||||||
int32_t index = pShowInfo->currentIndex;
|
|
||||||
SVgroupInfo* pInfo = taosArrayGet(pShowInfo->pArray, index);
|
|
||||||
pShowInfo->vgId = pInfo->vgId;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsem_post(&pRequest->body.rspSem);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|
||||||
SRequestObj *pRequest = param;
|
|
||||||
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
|
|
||||||
taosMemoryFreeClear(pResInfo->pRspMsg);
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
setErrno(pRequest, code);
|
|
||||||
tsem_post(&pRequest->body.rspSem);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(pMsg->len >= sizeof(SRetrieveTableRsp));
|
|
||||||
|
|
||||||
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData;
|
|
||||||
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
|
|
||||||
pRetrieve->precision = htons(pRetrieve->precision);
|
|
||||||
|
|
||||||
pResInfo->pRspMsg = pMsg->pData;
|
|
||||||
pResInfo->numOfRows = pRetrieve->numOfRows;
|
|
||||||
pResInfo->pData = pRetrieve->data;
|
|
||||||
pResInfo->completed = pRetrieve->completed;
|
|
||||||
|
|
||||||
pResInfo->current = 0;
|
|
||||||
// setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
|
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows,
|
|
||||||
pRetrieve->completed, pRequest->body.showInfo.execId);
|
|
||||||
|
|
||||||
tsem_post(&pRequest->body.rspSem);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|
||||||
SRequestObj* pRequest = param;
|
|
||||||
|
|
||||||
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
|
||||||
taosMemoryFreeClear(pResInfo->pRspMsg);
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
setErrno(pRequest, code);
|
|
||||||
tsem_post(&pRequest->body.rspSem);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(pMsg->len >= sizeof(SRetrieveTableRsp));
|
|
||||||
|
|
||||||
pResInfo->pRspMsg = pMsg->pData;
|
|
||||||
|
|
||||||
SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *) pMsg->pData;
|
|
||||||
pFetchRsp->numOfRows = htonl(pFetchRsp->numOfRows);
|
|
||||||
pFetchRsp->precision = htons(pFetchRsp->precision);
|
|
||||||
|
|
||||||
pResInfo->pRspMsg = pMsg->pData;
|
|
||||||
pResInfo->numOfRows = pFetchRsp->numOfRows;
|
|
||||||
pResInfo->pData = pFetchRsp->data;
|
|
||||||
|
|
||||||
pResInfo->current = 0;
|
|
||||||
// setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
|
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows,
|
|
||||||
pFetchRsp->completed, pRequest->body.showInfo.execId);
|
|
||||||
|
|
||||||
tsem_post(&pRequest->body.rspSem);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
// todo rsp with the vnode id list
|
// todo rsp with the vnode id list
|
||||||
SRequestObj* pRequest = param;
|
SRequestObj* pRequest = param;
|
||||||
|
@ -420,13 +283,8 @@ void initMsgHandleFp() {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp;
|
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp;
|
||||||
handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW)] = processShowRsp;
|
|
||||||
handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = processRetrieveMnodeRsp;
|
|
||||||
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp;
|
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp;
|
||||||
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
|
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
|
||||||
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
|
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
|
||||||
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
|
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
|
||||||
|
|
||||||
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = processShowRsp;
|
|
||||||
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = processRetrieveVndRsp;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,7 +164,6 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE);
|
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, mmProcessReadMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, mmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
|
|
@ -95,5 +95,4 @@ void qmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, QNODE_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, qmProcessFetchMsg, QNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, qmProcessFetchMsg, QNODE_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, qmProcessFetchMsg, QNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, qmProcessFetchMsg, QNODE_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, qmProcessFetchMsg, QNODE_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, qmProcessFetchMsg, QNODE_HANDLE);
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -139,11 +139,11 @@ void Testbase::SendShowRetrieveReq() {
|
||||||
void* pReq = rpcMallocCont(contLen);
|
void* pReq = rpcMallocCont(contLen);
|
||||||
tSerializeSRetrieveTableReq(pReq, contLen, &retrieveReq);
|
tSerializeSRetrieveTableReq(pReq, contLen, &retrieveReq);
|
||||||
|
|
||||||
SRpcMsg* pRsp = SendReq(TDMT_MND_SHOW_RETRIEVE, pReq, contLen);
|
// SRpcMsg* pRsp = SendReq(TDMT_MND_SHOW_RETRIEVE, pReq, contLen);
|
||||||
pRetrieveRsp = (SRetrieveTableRsp*)pRsp->pCont;
|
// pRetrieveRsp = (SRetrieveTableRsp*)pRsp->pCont;
|
||||||
pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
|
// pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows);
|
||||||
pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
|
// pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds);
|
||||||
pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
|
// pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen);
|
||||||
|
|
||||||
pData = pRetrieveRsp->data;
|
pData = pRetrieveRsp->data;
|
||||||
pos = 0;
|
pos = 0;
|
||||||
|
|
|
@ -335,8 +335,6 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, DEFAULT_HANDLE);
|
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
|
||||||
|
|
|
@ -73,10 +73,6 @@ int32_t qndProcessFetchMsg(SQnode *pQnode, SRpcMsg *pMsg) {
|
||||||
return qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg);
|
return qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg);
|
||||||
case TDMT_VND_DROP_TASK:
|
case TDMT_VND_DROP_TASK:
|
||||||
return qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg);
|
return qWorkerProcessDropMsg(pQnode, pQnode->pQuery, pMsg);
|
||||||
case TDMT_VND_SHOW_TABLES:
|
|
||||||
return qWorkerProcessShowMsg(pQnode, pQnode->pQuery, pMsg);
|
|
||||||
case TDMT_VND_SHOW_TABLES_FETCH:
|
|
||||||
// return vnodeGetTableList(pQnode, pMsg);
|
|
||||||
case TDMT_VND_TABLE_META:
|
case TDMT_VND_TABLE_META:
|
||||||
// return vnodeGetTableMeta(pQnode, pMsg);
|
// return vnodeGetTableMeta(pQnode, pMsg);
|
||||||
case TDMT_VND_CONSUME:
|
case TDMT_VND_CONSUME:
|
||||||
|
|
|
@ -16,7 +16,6 @@
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);
|
|
||||||
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg);
|
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
|
||||||
int vnodeQueryOpen(SVnode *pVnode) {
|
int vnodeQueryOpen(SVnode *pVnode) {
|
||||||
|
@ -57,11 +56,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
|
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
|
||||||
case TDMT_VND_DROP_TASK:
|
case TDMT_VND_DROP_TASK:
|
||||||
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
|
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
|
||||||
case TDMT_VND_SHOW_TABLES:
|
|
||||||
return qWorkerProcessShowMsg(pVnode, pVnode->pQuery, pMsg);
|
|
||||||
case TDMT_VND_SHOW_TABLES_FETCH:
|
|
||||||
return vnodeGetTableList(pVnode, pMsg);
|
|
||||||
// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg);
|
|
||||||
case TDMT_VND_TABLE_META:
|
case TDMT_VND_TABLE_META:
|
||||||
return vnodeGetTableMeta(pVnode, pMsg);
|
return vnodeGetTableMeta(pVnode, pMsg);
|
||||||
case TDMT_VND_CONSUME:
|
case TDMT_VND_CONSUME:
|
||||||
|
@ -207,74 +201,3 @@ _exit:
|
||||||
tmsgSendRsp(&rpcMsg);
|
tmsgSendRsp(&rpcMsg);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void freeItemHelper(void *pItem) {
|
|
||||||
char *p = *(char **)pItem;
|
|
||||||
taosMemoryFree(p);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param pVnode
|
|
||||||
* @param pMsg
|
|
||||||
* @param pRsp
|
|
||||||
*/
|
|
||||||
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
|
|
||||||
SMTbCursor *pCur = metaOpenTbCursor(pVnode->pMeta);
|
|
||||||
SArray *pArray = taosArrayInit(10, POINTER_BYTES);
|
|
||||||
|
|
||||||
char *name = NULL;
|
|
||||||
int32_t totalLen = 0;
|
|
||||||
int32_t numOfTables = 0;
|
|
||||||
while ((name = metaTbCursorNext(pCur)) != NULL) {
|
|
||||||
if (numOfTables < 10000) { // TODO: temp get tables of vnode, and should del when show tables commad ok.
|
|
||||||
taosArrayPush(pArray, &name);
|
|
||||||
totalLen += strlen(name);
|
|
||||||
} else {
|
|
||||||
taosMemoryFreeClear(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
numOfTables++;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: temp debug, and should del when show tables command ok
|
|
||||||
vInfo("====vgId:%d, numOfTables: %d", pVnode->vgId, numOfTables);
|
|
||||||
if (numOfTables > 10000) {
|
|
||||||
numOfTables = 10000;
|
|
||||||
}
|
|
||||||
|
|
||||||
metaCloseTbCursor(pCur);
|
|
||||||
|
|
||||||
int32_t rowLen =
|
|
||||||
(TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 2 + (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 4;
|
|
||||||
// int32_t numOfTables = (int32_t)taosArrayGetSize(pArray);
|
|
||||||
|
|
||||||
int32_t payloadLen = rowLen * numOfTables;
|
|
||||||
// SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
|
|
||||||
|
|
||||||
SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp) + payloadLen);
|
|
||||||
memset(pFetchRsp, 0, sizeof(SVShowTablesFetchRsp) + payloadLen);
|
|
||||||
|
|
||||||
char *p = pFetchRsp->data;
|
|
||||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
|
||||||
char *n = taosArrayGetP(pArray, i);
|
|
||||||
STR_TO_VARSTR(p, n);
|
|
||||||
|
|
||||||
p += (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
|
|
||||||
// taosMemoryFree(n);
|
|
||||||
}
|
|
||||||
|
|
||||||
pFetchRsp->numOfRows = htonl(numOfTables);
|
|
||||||
pFetchRsp->precision = 0;
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {
|
|
||||||
.handle = pMsg->handle,
|
|
||||||
.ahandle = pMsg->ahandle,
|
|
||||||
.pCont = pFetchRsp,
|
|
||||||
.contLen = sizeof(SVShowTablesFetchRsp) + payloadLen,
|
|
||||||
.code = 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
tmsgSendRsp(&rpcMsg);
|
|
||||||
taosArrayDestroyEx(pArray, freeItemHelper);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue