Merge pull request #9491 from taosdata/feature/3.0_liaohj
[td-11818] support show tables;
This commit is contained in:
commit
0656624db7
|
@ -1330,13 +1330,13 @@ typedef struct SVShowTablesReq {
|
||||||
} SVShowTablesReq;
|
} SVShowTablesReq;
|
||||||
|
|
||||||
typedef struct SVShowTablesRsp {
|
typedef struct SVShowTablesRsp {
|
||||||
int64_t id;
|
int32_t id;
|
||||||
STableMetaMsg metaInfo;
|
STableMetaMsg metaInfo;
|
||||||
} SVShowTablesRsp;
|
} SVShowTablesRsp;
|
||||||
|
|
||||||
typedef struct SVShowTablesFetchReq {
|
typedef struct SVShowTablesFetchReq {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
int64_t id;
|
int32_t id;
|
||||||
} SVShowTablesFetchReq;
|
} SVShowTablesFetchReq;
|
||||||
|
|
||||||
typedef struct SVShowTablesFetchRsp {
|
typedef struct SVShowTablesFetchRsp {
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#define TDENGINE_TNAME_H
|
#define TDENGINE_TNAME_H
|
||||||
|
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
|
||||||
#define TSDB_DB_NAME_T 1
|
#define TSDB_DB_NAME_T 1
|
||||||
#define TSDB_TABLE_NAME_T 2
|
#define TSDB_TABLE_NAME_T 2
|
||||||
|
@ -58,4 +59,6 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type);
|
||||||
|
|
||||||
int32_t tNameSetAcctId(SName* dst, int32_t acctId);
|
int32_t tNameSetAcctId(SName* dst, int32_t acctId);
|
||||||
|
|
||||||
|
SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name);
|
||||||
|
|
||||||
#endif // TDENGINE_TNAME_H
|
#endif // TDENGINE_TNAME_H
|
||||||
|
|
|
@ -74,7 +74,6 @@ int32_t getExprFunctionLevel(const SQueryStmtInfo* pQueryInfo);
|
||||||
|
|
||||||
STableMetaInfo* getMetaInfo(const SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
|
STableMetaInfo* getMetaInfo(const SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
|
||||||
SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex);
|
SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex);
|
||||||
SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name);
|
|
||||||
|
|
||||||
int32_t getNewResColId();
|
int32_t getNewResColId();
|
||||||
void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn);
|
void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn);
|
||||||
|
|
|
@ -54,8 +54,11 @@ int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
void qWorkerDestroy(void **qWorkerMgmt);
|
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
|
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||||
|
|
||||||
|
void qWorkerDestroy(void **qWorkerMgmt);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -490,7 +490,13 @@ void* doFetchRow(SRequestObj* pRequest) {
|
||||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||||
|
|
||||||
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
|
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
|
||||||
pRequest->type = TDMT_MND_SHOW_RETRIEVE;
|
if (pRequest->type == TDMT_MND_SHOW) {
|
||||||
|
pRequest->type = TDMT_MND_SHOW_RETRIEVE;
|
||||||
|
} else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
|
||||||
|
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
|
||||||
|
} else {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
|
SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest);
|
||||||
|
|
||||||
|
|
|
@ -74,39 +74,41 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SMsgSendInfo* pMsgSendInfo) {
|
|
||||||
pMsgSendInfo->msgType = TDMT_MND_SHOW_RETRIEVE;
|
|
||||||
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg);
|
|
||||||
pMsgSendInfo->requestObjRefId = pRequest->self;
|
|
||||||
pMsgSendInfo->param = pRequest;
|
|
||||||
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
|
|
||||||
|
|
||||||
SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
|
|
||||||
if (pRetrieveMsg == NULL) {
|
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
pRetrieveMsg->showId = htonl(pRequest->body.execId);
|
|
||||||
pMsgSendInfo->msgInfo.pData = pRetrieveMsg;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) {
|
SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) {
|
||||||
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
|
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
|
||||||
|
|
||||||
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
|
pMsgSendInfo->requestObjRefId = pRequest->self;
|
||||||
buildRetrieveMnodeMsg(pRequest, pMsgSendInfo);
|
pMsgSendInfo->requestId = pRequest->requestId;
|
||||||
|
pMsgSendInfo->param = pRequest;
|
||||||
|
pMsgSendInfo->msgType = pRequest->type;
|
||||||
|
|
||||||
|
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE || pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
|
||||||
|
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
|
||||||
|
SRetrieveTableMsg* pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
|
||||||
|
if (pRetrieveMsg == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRetrieveMsg->showId = htonl(pRequest->body.execId);
|
||||||
|
pMsgSendInfo->msgInfo.pData = pRetrieveMsg;
|
||||||
|
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg);
|
||||||
|
} else {
|
||||||
|
SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq));
|
||||||
|
if (pFetchMsg == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pFetchMsg->id = htonl(pRequest->body.execId);
|
||||||
|
pFetchMsg->head.vgId = htonl(13);
|
||||||
|
pMsgSendInfo->msgInfo.pData = pFetchMsg;
|
||||||
|
pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
assert(pRequest != NULL);
|
assert(pRequest != NULL);
|
||||||
pMsgSendInfo->requestObjRefId = pRequest->self;
|
|
||||||
pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
|
pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
|
||||||
pMsgSendInfo->msgType = pRequest->type;
|
|
||||||
pMsgSendInfo->requestId = pRequest->requestId;
|
|
||||||
pMsgSendInfo->param = pRequest;
|
|
||||||
|
|
||||||
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)];
|
||||||
return pMsgSendInfo;
|
return pMsgSendInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,6 +130,7 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
|
pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
|
||||||
for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
|
for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
|
||||||
pSchema->bytes = htonl(pSchema->bytes);
|
pSchema->bytes = htonl(pSchema->bytes);
|
||||||
|
pSchema->colId = htonl(pSchema->colId);
|
||||||
pSchema++;
|
pSchema++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -157,19 +160,23 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code)
|
||||||
assert(pMsg->len >= sizeof(SRetrieveTableRsp));
|
assert(pMsg->len >= sizeof(SRetrieveTableRsp));
|
||||||
|
|
||||||
SRequestObj* pRequest = param;
|
SRequestObj* pRequest = param;
|
||||||
// tfree(pRequest->body.resInfo.pRspMsg);
|
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
||||||
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
|
||||||
|
tfree(pResInfo->pRspMsg);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
pRequest->code = code;
|
||||||
|
terrno = code;
|
||||||
|
tsem_post(&pRequest->body.rspSem);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData;
|
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData;
|
||||||
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
|
pRetrieve->numOfRows = htonl(pRetrieve->numOfRows);
|
||||||
pRetrieve->precision = htons(pRetrieve->precision);
|
pRetrieve->precision = htons(pRetrieve->precision);
|
||||||
|
|
||||||
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
|
||||||
|
|
||||||
tfree(pResInfo->pRspMsg);
|
|
||||||
pResInfo->pRspMsg = pMsg->pData;
|
pResInfo->pRspMsg = pMsg->pData;
|
||||||
pResInfo->numOfRows = pRetrieve->numOfRows;
|
pResInfo->numOfRows = pRetrieve->numOfRows;
|
||||||
pResInfo->pData = pRetrieve->data; // todo fix this in async model
|
pResInfo->pData = pRetrieve->data;
|
||||||
|
|
||||||
pResInfo->current = 0;
|
pResInfo->current = 0;
|
||||||
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
|
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
|
||||||
|
@ -181,6 +188,42 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
|
assert(pMsg->len >= sizeof(SRetrieveTableRsp));
|
||||||
|
|
||||||
|
SRequestObj* pRequest = param;
|
||||||
|
tfree(pRequest->body.resInfo.pRspMsg);
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
pRequest->code = code;
|
||||||
|
terrno = code;
|
||||||
|
tsem_post(&pRequest->body.rspSem);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
||||||
|
|
||||||
|
SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *) pMsg->pData;
|
||||||
|
pFetchRsp->numOfRows = htonl(pFetchRsp->numOfRows);
|
||||||
|
pFetchRsp->precision = htons(pFetchRsp->precision);
|
||||||
|
|
||||||
|
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
||||||
|
|
||||||
|
tfree(pResInfo->pRspMsg);
|
||||||
|
pResInfo->pRspMsg = pMsg->pData;
|
||||||
|
pResInfo->numOfRows = pFetchRsp->numOfRows;
|
||||||
|
pResInfo->pData = pFetchRsp->data;
|
||||||
|
|
||||||
|
pResInfo->current = 0;
|
||||||
|
setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);
|
||||||
|
|
||||||
|
tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows,
|
||||||
|
pFetchRsp->completed, pRequest->body.execId);
|
||||||
|
|
||||||
|
tsem_post(&pRequest->body.rspSem);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
// todo rsp with the vnode id list
|
// todo rsp with the vnode id list
|
||||||
SRequestObj* pRequest = param;
|
SRequestObj* pRequest = param;
|
||||||
|
@ -304,4 +347,7 @@ void initMsgHandleFp() {
|
||||||
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
|
handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
|
||||||
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
|
handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
|
||||||
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
|
handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
|
||||||
|
|
||||||
|
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = processShowRsp;
|
||||||
|
handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = processRetrieveVndRsp;
|
||||||
}
|
}
|
|
@ -227,25 +227,25 @@ TEST(testCase, use_db_test) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(testCase, drop_db_test) {
|
TEST(testCase, drop_db_test) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
assert(pConn != NULL);
|
// assert(pConn != NULL);
|
||||||
|
//
|
||||||
showDB(pConn);
|
// showDB(pConn);
|
||||||
|
//
|
||||||
TAOS_RES* pRes = taos_query(pConn, "drop database abc1");
|
// TAOS_RES* pRes = taos_query(pConn, "drop database abc1");
|
||||||
if (taos_errno(pRes) != 0) {
|
// if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to drop db, reason:%s\n", taos_errstr(pRes));
|
// printf("failed to drop db, reason:%s\n", taos_errstr(pRes));
|
||||||
}
|
// }
|
||||||
taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
|
//
|
||||||
showDB(pConn);
|
// showDB(pConn);
|
||||||
|
//
|
||||||
pRes = taos_query(pConn, "create database abc1");
|
// pRes = taos_query(pConn, "create database abc1");
|
||||||
if (taos_errno(pRes) != 0) {
|
// if (taos_errno(pRes) != 0) {
|
||||||
printf("create to drop db, reason:%s\n", taos_errstr(pRes));
|
// printf("create to drop db, reason:%s\n", taos_errstr(pRes));
|
||||||
}
|
// }
|
||||||
taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
taos_close(pConn);
|
// taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(testCase, create_stable_Test) {
|
TEST(testCase, create_stable_Test) {
|
||||||
|
@ -302,12 +302,12 @@ TEST(testCase, create_ctable_Test) {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
|
// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
|
||||||
if (taos_errno(pRes) != 0) {
|
// if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
|
// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -443,7 +443,23 @@ TEST(testCase, show_table_Test) {
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "show tables");
|
pRes = taos_query(pConn, "show tables");
|
||||||
taos_free_result(pRes);
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes));
|
||||||
|
taos_free_result(pRes);
|
||||||
|
ASSERT_TRUE(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_ROW pRow = NULL;
|
||||||
|
|
||||||
|
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||||
|
int32_t numOfFields = taos_num_fields(pRes);
|
||||||
|
|
||||||
|
char str[512] = {0};
|
||||||
|
while((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||||
|
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||||
|
printf("%s\n", str);
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(pRes);
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
|
@ -259,3 +259,13 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name) {
|
||||||
|
SSchema s = {0};
|
||||||
|
s.type = type;
|
||||||
|
s.bytes = bytes;
|
||||||
|
s.colId = colId;
|
||||||
|
|
||||||
|
tstrncpy(s.name, name, tListLen(s.name));
|
||||||
|
return s;
|
||||||
|
}
|
|
@ -141,6 +141,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_TABLE)] = dndProcessVnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_TABLE)] = dndProcessVnodeWriteMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg;
|
||||||
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg;
|
||||||
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||||
|
|
|
@ -36,12 +36,14 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
|
return qWorkerProcessCancelMsg(pVnode, pVnode->pQuery, pMsg);
|
||||||
case TDMT_VND_DROP_TASK:
|
case TDMT_VND_DROP_TASK:
|
||||||
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
|
return qWorkerProcessDropMsg(pVnode, pVnode->pQuery, pMsg);
|
||||||
|
case TDMT_VND_SHOW_TABLES:
|
||||||
|
return qWorkerProcessShowMsg(pVnode, pVnode->pQuery, pMsg);
|
||||||
|
case TDMT_VND_SHOW_TABLES_FETCH:
|
||||||
|
return qWorkerProcessShowFetchMsg(pVnode, pVnode->pQuery, pMsg);
|
||||||
default:
|
default:
|
||||||
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
vError("unknown msg type:%d in fetch queue", pMsg->msgType);
|
||||||
return TSDB_CODE_VND_APP_ERROR;
|
return TSDB_CODE_VND_APP_ERROR;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
|
|
|
@ -90,7 +90,6 @@ SShowMsg* buildShowMsg(SShowInfo* pShowInfo, SParseBasicCtx *pCtx, char* msgBuf,
|
||||||
SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg));
|
SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg));
|
||||||
|
|
||||||
pShowMsg->type = pShowInfo->showType;
|
pShowMsg->type = pShowInfo->showType;
|
||||||
|
|
||||||
if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
|
if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) {
|
||||||
SToken* pPattern = &pShowInfo->pattern;
|
SToken* pPattern = &pShowInfo->pattern;
|
||||||
if (pPattern->type > 0) { // only show tables support wildcard query
|
if (pPattern->type > 0) { // only show tables support wildcard query
|
||||||
|
|
|
@ -18,7 +18,7 @@ static bool has(SArray* pFieldList, int32_t startIndex, const char* name) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** output, int32_t* outputLen,
|
static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** output, int32_t* outputLen,
|
||||||
SMsgBuf* pMsgBuf) {
|
SEpSet* pEpSet, SMsgBuf* pMsgBuf) {
|
||||||
const char* msg1 = "invalid name";
|
const char* msg1 = "invalid name";
|
||||||
const char* msg2 = "wildcard string should be less than %d characters";
|
const char* msg2 = "wildcard string should be less than %d characters";
|
||||||
const char* msg3 = "database name too long";
|
const char* msg3 = "database name too long";
|
||||||
|
@ -31,57 +31,69 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseBasicCtx* pCtx, void** ou
|
||||||
* wildcard in like clause in pInfo->pMiscInfo->a[1]
|
* wildcard in like clause in pInfo->pMiscInfo->a[1]
|
||||||
*/
|
*/
|
||||||
int16_t showType = pShowInfo->showType;
|
int16_t showType = pShowInfo->showType;
|
||||||
if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) {
|
if (showType == TSDB_MGMT_TABLE_TABLE) {
|
||||||
SToken* pDbPrefixToken = &pShowInfo->prefix;
|
SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
|
||||||
if (pDbPrefixToken->type != 0) {
|
*pEpSet = pCtx->mgmtEpSet;
|
||||||
if (pDbPrefixToken->n >= TSDB_DB_NAME_LEN) { // db name is too long
|
|
||||||
return buildInvalidOperationMsg(pMsgBuf, msg3);
|
// catalogGetDBVgroupVersion()
|
||||||
|
pShowReq->head.vgId = htonl(13);
|
||||||
|
*outputLen = sizeof(SVShowTablesReq);
|
||||||
|
*output = pShowReq;
|
||||||
|
} else {
|
||||||
|
if (showType == TSDB_MGMT_TABLE_STB || showType == TSDB_MGMT_TABLE_VGROUP) {
|
||||||
|
SToken* pDbPrefixToken = &pShowInfo->prefix;
|
||||||
|
if (pDbPrefixToken->type != 0) {
|
||||||
|
if (pDbPrefixToken->n >= TSDB_DB_NAME_LEN) { // db name is too long
|
||||||
|
return buildInvalidOperationMsg(pMsgBuf, msg3);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pDbPrefixToken->n <= 0) {
|
||||||
|
return buildInvalidOperationMsg(pMsgBuf, msg5);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (parserValidateIdToken(pDbPrefixToken) != TSDB_CODE_SUCCESS) {
|
||||||
|
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pRequest->pTsc), pDbPrefixToken);
|
||||||
|
// if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
// return buildInvalidOperationMsg(pMsgBuf, msg1);
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDbPrefixToken->n <= 0) {
|
// show table/stable like 'xxxx', set the like pattern for show tables
|
||||||
return buildInvalidOperationMsg(pMsgBuf, msg5);
|
SToken* pPattern = &pShowInfo->pattern;
|
||||||
|
if (pPattern->type != 0) {
|
||||||
|
if (pPattern->type == TK_ID && pPattern->z[0] == TS_ESCAPE_CHAR) {
|
||||||
|
return buildInvalidOperationMsg(pMsgBuf, msg4);
|
||||||
|
}
|
||||||
|
|
||||||
|
pPattern->n = strdequote(pPattern->z);
|
||||||
|
if (pPattern->n <= 0) {
|
||||||
|
return buildInvalidOperationMsg(pMsgBuf, msg6);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pPattern->n > tsMaxWildCardsLen) {
|
||||||
|
char tmp[64] = {0};
|
||||||
|
sprintf(tmp, msg2, tsMaxWildCardsLen);
|
||||||
|
return buildInvalidOperationMsg(pMsgBuf, tmp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (showType == TSDB_MGMT_TABLE_VNODES) {
|
||||||
|
if (pShowInfo->prefix.type == 0) {
|
||||||
|
return buildInvalidOperationMsg(pMsgBuf, "No specified dnode ep");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (parserValidateIdToken(pDbPrefixToken) != TSDB_CODE_SUCCESS) {
|
if (pShowInfo->prefix.type == TK_STRING) {
|
||||||
return buildInvalidOperationMsg(pMsgBuf, msg1);
|
pShowInfo->prefix.n = strdequote(pShowInfo->prefix.z);
|
||||||
}
|
|
||||||
|
|
||||||
// int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pRequest->pTsc), pDbPrefixToken);
|
|
||||||
// if (ret != TSDB_CODE_SUCCESS) {
|
|
||||||
// return buildInvalidOperationMsg(pMsgBuf, msg1);
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
// show table/stable like 'xxxx', set the like pattern for show tables
|
|
||||||
SToken* pPattern = &pShowInfo->pattern;
|
|
||||||
if (pPattern->type != 0) {
|
|
||||||
if (pPattern->type == TK_ID && pPattern->z[0] == TS_ESCAPE_CHAR) {
|
|
||||||
return buildInvalidOperationMsg(pMsgBuf, msg4);
|
|
||||||
}
|
|
||||||
|
|
||||||
pPattern->n = strdequote(pPattern->z);
|
|
||||||
if (pPattern->n <= 0) {
|
|
||||||
return buildInvalidOperationMsg(pMsgBuf, msg6);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pPattern->n > tsMaxWildCardsLen) {
|
|
||||||
char tmp[64] = {0};
|
|
||||||
sprintf(tmp, msg2, tsMaxWildCardsLen);
|
|
||||||
return buildInvalidOperationMsg(pMsgBuf, tmp);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (showType == TSDB_MGMT_TABLE_VNODES) {
|
|
||||||
if (pShowInfo->prefix.type == 0) {
|
|
||||||
return buildInvalidOperationMsg(pMsgBuf, "No specified dnode ep");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pShowInfo->prefix.type == TK_STRING) {
|
*pEpSet = pCtx->mgmtEpSet;
|
||||||
pShowInfo->prefix.n = strdequote(pShowInfo->prefix.z);
|
*output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len);
|
||||||
}
|
*outputLen = sizeof(SShowMsg) /* + htons(pShowMsg->payloadLen)*/;
|
||||||
}
|
}
|
||||||
|
|
||||||
*output = buildShowMsg(pShowInfo, pCtx, pMsgBuf->buf, pMsgBuf->len);
|
|
||||||
*outputLen = sizeof(SShowMsg) /* + htons(pShowMsg->payloadLen)*/;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -608,8 +620,9 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
|
||||||
}
|
}
|
||||||
|
|
||||||
case TSDB_SQL_SHOW: {
|
case TSDB_SQL_SHOW: {
|
||||||
code = setShowInfo(&pInfo->pMiscInfo->showOpt, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, pMsgBuf);
|
SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt;
|
||||||
pDcl->msgType = TDMT_MND_SHOW;
|
code = setShowInfo(pShowInfo, pCtx, (void**)&pDcl->pMsg, &pDcl->msgLen, &pDcl->epSet, pMsgBuf);
|
||||||
|
pDcl->msgType = (pShowInfo->showType == TSDB_MGMT_TABLE_TABLE)? TDMT_VND_SHOW_TABLES:TDMT_MND_SHOW;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -563,16 +563,6 @@ TAOS_FIELD createField(const SSchema* pSchema) {
|
||||||
return f;
|
return f;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* name) {
|
|
||||||
SSchema s = {0};
|
|
||||||
s.type = type;
|
|
||||||
s.bytes = bytes;
|
|
||||||
s.colId = colId;
|
|
||||||
|
|
||||||
tstrncpy(s.name, name, tListLen(s.name));
|
|
||||||
return s;
|
|
||||||
}
|
|
||||||
|
|
||||||
void setColumn(SColumn* pColumn, uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema) {
|
void setColumn(SColumn* pColumn, uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema) {
|
||||||
pColumn->uid = uid;
|
pColumn->uid = uid;
|
||||||
pColumn->flag = flag;
|
pColumn->flag = flag;
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
#include "tmsg.h"
|
|
||||||
#include "query.h"
|
|
||||||
#include "qworker.h"
|
#include "qworker.h"
|
||||||
#include "qworkerInt.h"
|
#include "tname.h"
|
||||||
#include "planner.h"
|
#include "planner.h"
|
||||||
|
#include "query.h"
|
||||||
|
#include "qworkerInt.h"
|
||||||
|
#include "tmsg.h"
|
||||||
|
|
||||||
int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) {
|
int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -634,7 +635,6 @@ int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
|
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
|
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
|
||||||
pRsp->code = code;
|
pRsp->code = code;
|
||||||
|
@ -665,11 +665,68 @@ int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
};
|
};
|
||||||
|
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
|
int32_t numOfCols = 6;
|
||||||
|
SVShowTablesRsp *pRsp = (SVShowTablesRsp *)rpcMallocCont(sizeof(SVShowTablesRsp) + sizeof(SSchema) * numOfCols);
|
||||||
|
|
||||||
|
int32_t cols = 0;
|
||||||
|
SSchema *pSchema = pRsp->metaInfo.pSchema;
|
||||||
|
|
||||||
|
const SSchema *s = tGetTbnameColumnSchema();
|
||||||
|
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(cols++), "name");
|
||||||
|
pSchema++;
|
||||||
|
|
||||||
|
int32_t type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "created");
|
||||||
|
pSchema++;
|
||||||
|
|
||||||
|
type = TSDB_DATA_TYPE_SMALLINT;
|
||||||
|
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "columns");
|
||||||
|
pSchema++;
|
||||||
|
|
||||||
|
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(cols++), "stable");
|
||||||
|
pSchema++;
|
||||||
|
|
||||||
|
type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "uid");
|
||||||
|
pSchema++;
|
||||||
|
|
||||||
|
type = TSDB_DATA_TYPE_INT;
|
||||||
|
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(cols++), "vgId");
|
||||||
|
|
||||||
|
pRsp->metaInfo.numOfColumns = htonl(cols);
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.handle = pMsg->handle,
|
||||||
|
.ahandle = pMsg->ahandle,
|
||||||
|
.pCont = pRsp,
|
||||||
|
.contLen = sizeof(*pRsp),
|
||||||
|
.code = code,
|
||||||
|
};
|
||||||
|
|
||||||
|
rpcSendResponse(&rpcMsg);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qwBuildAndSendShowFetchRsp(SRpcMsg *pMsg, SVShowTablesFetchReq* pFetchReq) {
|
||||||
|
SVShowTablesFetchRsp *pRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp));
|
||||||
|
int32_t handle = htonl(pFetchReq->id);
|
||||||
|
|
||||||
|
pRsp->numOfRows = 0;
|
||||||
|
SRpcMsg rpcMsg = {
|
||||||
|
.handle = pMsg->handle,
|
||||||
|
.ahandle = pMsg->ahandle,
|
||||||
|
.pCont = pRsp,
|
||||||
|
.contLen = sizeof(*pRsp),
|
||||||
|
.code = 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
rpcSendResponse(&rpcMsg);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) {
|
int32_t qwCheckAndSendReadyRsp(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg, int32_t rspCode) {
|
||||||
SQWSchStatus *sch = NULL;
|
SQWSchStatus *sch = NULL;
|
||||||
|
@ -801,7 +858,6 @@ int32_t qwCheckTaskCancelDrop( SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryI
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
|
int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t sId, uint64_t queryId, uint64_t taskId, SRpcMsg *pMsg) {
|
||||||
SQWSchStatus *sch = NULL;
|
SQWSchStatus *sch = NULL;
|
||||||
SQWTaskStatus *task = NULL;
|
SQWTaskStatus *task = NULL;
|
||||||
|
@ -911,7 +967,6 @@ int32_t qwQueryPostProcess(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint6
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
|
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
|
||||||
SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
|
SQWorkerMgmt *mgmt = calloc(1, sizeof(SQWorkerMgmt));
|
||||||
if (NULL == mgmt) {
|
if (NULL == mgmt) {
|
||||||
|
@ -1157,6 +1212,25 @@ _return:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qWorkerProcessShowMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||||
|
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||||
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
SVShowTablesReq *pReq = pMsg->pCont;
|
||||||
|
QW_ERR_RET(qwBuildAndSendShowRsp(pMsg, code));
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qWorkerProcessShowFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||||
|
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||||
|
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
|
||||||
|
QW_ERR_RET(qwBuildAndSendShowFetchRsp(pMsg, pFetchReq));
|
||||||
|
}
|
||||||
|
|
||||||
int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int8_t status = 0;
|
int8_t status = 0;
|
||||||
|
@ -1182,7 +1256,6 @@ int32_t qWorkerContinueQuery(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void qWorkerDestroy(void **qWorkerMgmt) {
|
void qWorkerDestroy(void **qWorkerMgmt) {
|
||||||
if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
|
if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
|
||||||
return;
|
return;
|
||||||
|
|
Loading…
Reference in New Issue