From cfce13407f5e4b290f2a21a201a52c112e8167ab Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 16 Dec 2021 15:45:09 +0800 Subject: [PATCH] [td-11818] implementate the taos_fetch_row . --- include/common/taosmsg.h | 2 +- include/libs/parser/parser.h | 2 +- source/client/inc/clientInt.h | 17 ++++- source/client/src/clientImpl.c | 22 +++++- source/client/src/clientMain.c | 14 ++++ source/client/src/clientMsgHandler.c | 72 ++++++++++++++++++- source/client/test/clientTests.cpp | 16 +++-- source/dnode/mnode/impl/src/mndUser.c | 2 +- source/libs/parser/inc/parserInt.h | 2 +- source/libs/parser/inc/parserUtil.h | 2 +- source/libs/parser/src/astValidate.c | 95 ++++++++++++++++++++++++- source/libs/parser/src/parser.c | 6 +- source/libs/parser/src/parserUtil.c | 7 +- source/libs/parser/test/parserTests.cpp | 20 +++--- 14 files changed, 242 insertions(+), 37 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 1d44255f6d..b76f82d0a2 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -804,7 +804,7 @@ typedef struct { } SVgroupsMsg, SVgroupsInfo; typedef struct { - char tbFname[TSDB_TABLE_FNAME_LEN]; // table id + char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name char stbFname[TSDB_TABLE_FNAME_LEN]; int32_t numOfTags; int32_t numOfColumns; diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index c766bd99b2..32d0d03d80 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -151,7 +151,7 @@ typedef struct SParseContext { * @param msg extended error message if exists. * @return error code */ -int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t* type, void** pOutput, char* msg, int32_t msgLen); +int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t* type, void** pOutput, int32_t* outputLen, char* msg, int32_t msgLen); typedef enum { PAYLOAD_TYPE_KV = 0, diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 64dce1e5b1..b7cdc5d3d3 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -20,14 +20,15 @@ extern "C" { #endif +#include #include "taos.h" #include "taosmsg.h" +#include "tdef.h" +#include "tep.h" #include "thash.h" #include "tlist.h" -#include "trpc.h" -#include "tdef.h" #include "tmsgtype.h" -#include "tep.h" +#include "trpc.h" typedef struct SQueryExecMetric { int64_t start; // start timestamp @@ -86,10 +87,18 @@ typedef struct STscObj { SAppInstInfo *pAppInfo; } STscObj; +typedef struct SClientResultInfo { + SSDataBlock *pData; + TAOS_FIELD *resultFields; + int32_t current; +} SClientResultInfo; + typedef struct SReqBody { tsem_t rspSem; // not used now void* fp; void* param; + int32_t paramLen; + SClientResultInfo* pResInfo; } SRequestBody; #define ERROR_MSG_BUF_DEFAULT_SIZE 512 @@ -142,6 +151,8 @@ void initMsgHandleFp(); TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port); TAOS_RES *taos_query_l(TAOS *taos, const char *sql, size_t sqlLen); +void* doFetchRow(SRequestObj* pRequest); + #ifdef __cplusplus } #endif diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a0b7d5e9a7..de7fc78c4a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -149,11 +149,12 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, size_t sqlLen) { } else { int32_t type = 0; void* output = NULL; - - code = qParseQuerySql(pRequest->sqlstr, sqlLen, pRequest->requestId, &type, &output, pRequest->msgBuf, ERROR_MSG_BUF_DEFAULT_SIZE); + int32_t outputLen = 0; + code = qParseQuerySql(pRequest->sqlstr, sqlLen, pRequest->requestId, &type, &output, &outputLen, pRequest->msgBuf, ERROR_MSG_BUF_DEFAULT_SIZE); if (type == TSDB_SQL_CREATE_USER || type == TSDB_SQL_SHOW) { pRequest->type = type; pRequest->body.param = output; + pRequest->body.paramLen = outputLen; SRequestMsgBody body = {0}; buildRequestMsgFp[type](pRequest, &body); @@ -374,4 +375,21 @@ TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, c strncpy(passStr, pass, MIN(TSDB_PASSWORD_LEN - 1, passLen)); strncpy(dbStr, db, MIN(TSDB_DB_NAME_LEN - 1, dbLen)); return taos_connect(ipStr, userStr, passStr, dbStr, port); +} + +void* doFetchRow(SRequestObj* pRequest) { + assert(pRequest != NULL); + SClientResultInfo* pResultInfo = pRequest->body.pResInfo; + + if (pResultInfo == NULL || pResultInfo->current >= pResultInfo->pData->info.rows) { + if (pResultInfo == NULL) { + pRequest->body.pResInfo = calloc(1, sizeof(SClientResultInfo)); +// pRequest->body.pResInfo. + } + // current data set are exhausted, fetch more result from node +// if (pRes->row >= pRes->numOfRows && needToFetchNewBlock(pSql)) { +// taos_fetch_rows_a(res, waitForRetrieveRsp, pSql->pTscObj); +// tsem_wait(&pSql->rspSem); +// } + } } \ No newline at end of file diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 0e71356e51..5a1f55e573 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -117,3 +117,17 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { return taos_query_l(taos, sql, strlen(sql)); } + +TAOS_ROW taos_fetch_row(TAOS_RES *pRes) { + if (pRes == NULL) { + return NULL; + } + + SRequestObj *pRequest = (SRequestObj *) pRes; + if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || + pRequest->type == TSDB_SQL_INSERT) { + return NULL; + } + + return doFetchRow(pRequest); +} diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index d7d92ae3d1..6501aff21c 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "clientInt.h" #include "clientLog.h" #include "os.h" @@ -3165,7 +3166,74 @@ int32_t buildCreateUserMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { pMsgBody->msgType = TSDB_MSG_TYPE_CREATE_USER; pMsgBody->msgLen = sizeof(SCreateUserMsg); pMsgBody->requestObjRefId = pRequest->self; - pMsgBody->pData = pRequest->body.param; + pMsgBody->pData = pRequest->body.param; + return 0; +} + +int32_t buildShowMsg(SRequestObj* pRequest, SRequestMsgBody* pMsgBody) { + pMsgBody->msgType = TSDB_MSG_TYPE_SHOW; + pMsgBody->msgLen = pRequest->body.paramLen; + pMsgBody->requestObjRefId = pRequest->self; + pMsgBody->pData = pRequest->body.param; +} + +STableMeta* createTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) { + assert(pTableMetaMsg != NULL && pTableMetaMsg->numOfColumns >= 2); + + size_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema); + STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize); + + pTableMeta->tableType = pTableMetaMsg->tableType; + pTableMeta->vgId = pTableMetaMsg->vgroup.vgId; + pTableMeta->suid = pTableMetaMsg->suid; + pTableMeta->uid = pTableMetaMsg->tuid; + + pTableMeta->tableInfo = (STableComInfo) { + .numOfTags = pTableMetaMsg->numOfTags, + .precision = pTableMetaMsg->precision, + .numOfColumns = pTableMetaMsg->numOfColumns, + }; + + pTableMeta->sversion = pTableMetaMsg->sversion; + pTableMeta->tversion = pTableMetaMsg->tversion; + + memcpy(pTableMeta->schema, pTableMetaMsg->pSchema, schemaSize); + + int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns; + for(int32_t i = 0; i < numOfTotalCols; ++i) { + pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; + } + + return pTableMeta; +} + +int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { + SShowRsp* pShow = (SShowRsp *)pMsg; + pShow->showId = htonl(pShow->showId); + + STableMetaMsg *pMetaMsg = &(pShow->tableMeta); + pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns); + + SSchema* pSchema = pMetaMsg->pSchema; + pMetaMsg->tuid = htobe64(pMetaMsg->tuid); + for (int i = 0; i < pMetaMsg->numOfColumns; ++i) { + pSchema->bytes = htons(pSchema->bytes); + pSchema++; + } + + STableMeta* pTableMeta = createTableMetaFromMsg(pMetaMsg); + SSchema *pTableSchema = pTableMeta->schema; + + TAOS_FIELD* pFields = calloc(1, pTableMeta->tableInfo.numOfColumns); + for (int16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; ++i, ++pSchema) { + tstrncpy(pFields[i].name, pTableSchema[i].name, tListLen(pFields[i].name)); + pFields[i].type = pTableSchema[i].type; + pFields[i].bytes = pTableSchema[i].bytes; + } + +// pRequest->body.resultFields = pFields; +// pRequest->body.numOfFields = pTableMeta->tableInfo.numOfColumns; + return 0; } @@ -3250,5 +3318,7 @@ void initMsgHandleFp() { buildRequestMsgFp[TSDB_SQL_CREATE_USER] = buildCreateUserMsg; + buildRequestMsgFp[TSDB_SQL_SHOW] = buildShowMsg; + handleRequestRspFp[TSDB_SQL_SHOW] = processShowRsp; } \ No newline at end of file diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 3e520e6ce6..62d2cb5b18 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -37,14 +37,16 @@ TEST(testCase, driverInit_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); assert(pConn != NULL); - TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); - if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - printf("failed to create user, reason:%s\n", taos_errstr(pRes)); - } +// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); +// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { +// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); +// } +// +// taos_free_result(pRes); - taos_free_result(pRes); - - pRes = taos_query(pConn, "show users"); + TAOS_RES* pRes = taos_query(pConn, "show users"); + TAOS_ROW pRow = taos_fetch_row(pRes); + assert(pRow != NULL); taos_close(pConn); } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 2111047219..419936f14c 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -471,7 +471,7 @@ static int32_t mndGetUserMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *p pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; - pMeta->numOfColumns = htons(cols); + pMeta->numOfColumns = htonl(cols); pShow->numOfColumns = cols; pShow->offset[0] = 0; diff --git a/source/libs/parser/inc/parserInt.h b/source/libs/parser/inc/parserInt.h index 16e5a3c629..8e8dc27c4f 100644 --- a/source/libs/parser/inc/parserInt.h +++ b/source/libs/parser/inc/parserInt.h @@ -60,7 +60,7 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pSqlInfo, SQ * @param type * @return */ -int32_t qParserValidateDdlSqlNode(SSqlInfo* pInfo, int64_t id, void** output, int32_t* type, char* msgBuf, int32_t msgBufLen); +int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, int64_t id, void** output, int32_t* outputLen, int32_t* type, char* msgBuf, int32_t msgBufLen); /** * Evaluate the numeric and timestamp arithmetic expression in the WHERE clause. diff --git a/source/libs/parser/inc/parserUtil.h b/source/libs/parser/inc/parserUtil.h index bd1d5ac633..b402621903 100644 --- a/source/libs/parser/inc/parserUtil.h +++ b/source/libs/parser/inc/parserUtil.h @@ -65,7 +65,7 @@ int32_t getExprFunctionId(SExprInfo *pExprInfo); STableMeta* tableMetaDup(const STableMeta* pTableMeta); -bool isDdlSql(SSqlInfo* pSqlInfo); +bool isDclSqlStatement(SSqlInfo* pSqlInfo); #ifdef __cplusplus } diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index 26ca3c52e4..300dd53df3 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -4069,7 +4069,93 @@ int32_t qParserValidateSqlNode(struct SCatalog* pCatalog, SSqlInfo* pInfo, SQuer return code; } -int32_t qParserValidateDdlSqlNode(SSqlInfo* pInfo, int64_t id, void** output, int32_t* type, char* msgBuf, int32_t msgBufLen) { +// todo remove it +static int32_t setShowInfo(struct SSqlInfo* pInfo, void** output, int32_t* msgLen, SMsgBuf* pMsgBuf) { + const char* msg1 = "invalid name"; + const char* msg2 = "wildcard string should be less than %d characters"; + const char* msg3 = "database name too long"; + const char* msg4 = "pattern is invalid"; + const char* msg5 = "database name is empty"; + const char* msg6 = "pattern string is empty"; + + /* + * database prefix in pInfo->pMiscInfo->a[0] + * wildcard in like clause in pInfo->pMiscInfo->a[1] + */ + SShowInfo* pShowInfo = &pInfo->pMiscInfo->showOpt; + int16_t showType = pShowInfo->showType; + if (showType == TSDB_MGMT_TABLE_TABLE || showType == TSDB_MGMT_TABLE_VGROUP) { + SToken* pDbPrefixToken = &pShowInfo->prefix; + if (pDbPrefixToken->type != 0) { + if (pDbPrefixToken->n >= TSDB_DB_NAME_LEN) { // db name is too long + return buildInvalidOperationMsg(pMsgBuf, msg3); + } + + if (pDbPrefixToken->n <= 0) { + return buildInvalidOperationMsg(pMsgBuf, msg5); + } + + if (parserValidateIdToken(pDbPrefixToken) != TSDB_CODE_SUCCESS) { + return buildInvalidOperationMsg(pMsgBuf, msg1); + } + + // int32_t ret = tNameSetDbName(&pTableMetaInfo->name, getAccountId(pRequest->pTsc), pDbPrefixToken); + // if (ret != TSDB_CODE_SUCCESS) { + // return buildInvalidOperationMsg(pMsgBuf, msg1); + // } + } + + // show table/stable like 'xxxx', set the like pattern for show tables + SToken* pPattern = &pShowInfo->pattern; + if (pPattern->type != 0) { + if (pPattern->type == TK_ID && pPattern->z[0] == TS_ESCAPE_CHAR) { + return buildInvalidOperationMsg(pMsgBuf, msg4); + } + + pPattern->n = strdequote(pPattern->z); + if (pPattern->n <= 0) { + return buildInvalidOperationMsg(pMsgBuf, msg6); + } + + if (pPattern->n > tsMaxWildCardsLen) { + char tmp[64] = {0}; + sprintf(tmp, msg2, tsMaxWildCardsLen); + return buildInvalidOperationMsg(pMsgBuf, tmp); + } + } + } else if (showType == TSDB_MGMT_TABLE_VNODES) { + if (pShowInfo->prefix.type == 0) { + return buildInvalidOperationMsg(pMsgBuf, "No specified dnode ep"); + } + + if (pShowInfo->prefix.type == TK_STRING) { + pShowInfo->prefix.n = strdequote(pShowInfo->prefix.z); + } + } + + SShowMsg* pShowMsg = calloc(1, sizeof(SShowMsg)); + pShowMsg->type = pShowInfo->showType; + + if (pShowInfo->showType != TSDB_MGMT_TABLE_VNODES) { + SToken* pPattern = &pShowInfo->pattern; + if (pPattern->type > 0) { // only show tables support wildcard query + strncpy(pShowMsg->payload, pPattern->z, pPattern->n); + pShowMsg->payloadLen = htons(pPattern->n); + } + } else { + SToken* pEpAddr = &pShowInfo->prefix; + assert(pEpAddr->n > 0 && pEpAddr->type > 0); + + strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n); + pShowMsg->payloadLen = htons(pEpAddr->n); + } + + *output = pShowMsg; + *msgLen = sizeof(SShowMsg) + htons(pShowMsg->payloadLen); + return TSDB_CODE_SUCCESS; +} + +int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, int64_t id, void** output, int32_t* outputLen, int32_t* type, char* msgBuf, int32_t msgBufLen) { int32_t code = 0; SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; @@ -4125,9 +4211,14 @@ int32_t qParserValidateDdlSqlNode(SSqlInfo* pInfo, int64_t id, void** output, in *output = buildUserManipulationMsg(pInfo, id, msgBuf, msgBufLen); break; } + + case TSDB_SQL_SHOW: { + code = setShowInfo(pInfo, output, outputLen, pMsgBuf); + break; + } default: break; } - return 0; + return code; } diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 78ffa7a98e..4d08ddba96 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -31,7 +31,7 @@ bool qIsInsertSql(const char* pStr, size_t length) { } while (1); } -int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *type, void** pOutput, char* msg, int32_t msgLen) { +int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *type, void** pOutput, int32_t* outputLen, char* msg, int32_t msgLen) { SQueryStmtInfo* pQueryInfo = calloc(1, sizeof(SQueryStmtInfo)); if (pQueryInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; // set correct error code. @@ -45,8 +45,8 @@ int32_t qParseQuerySql(const char* pStr, size_t length, int64_t id, int32_t *typ return terrno; } - if (isDdlSql(&info)) { - int32_t code = qParserValidateDdlSqlNode(&info, id, pOutput, type, msg, msgLen); + if (isDclSqlStatement(&info)) { + int32_t code = qParserValidateDclSqlNode(&info, id, pOutput, outputLen, type, msg, msgLen); if (code == TSDB_CODE_SUCCESS) { // do nothing } diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index b9d31b1985..28c3bf20ef 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -12,8 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "parserUtil.h" -#include + #include "taosmsg.h" #include "parser.h" @@ -23,6 +22,8 @@ #include "thash.h" #include "tbuffer.h" #include "parserInt.h" +#include "parserUtil.h" +#include "tmsgtype.h" #include "queryInfoUtil.h" #include "function.h" @@ -1632,7 +1633,7 @@ uint32_t convertRelationalOperator(SToken *pToken) { } } -bool isDdlSql(SSqlInfo* pSqlInfo) { +bool isDclSqlStatement(SSqlInfo* pSqlInfo) { return (pSqlInfo->type != TSDB_SQL_SELECT); } diff --git a/source/libs/parser/test/parserTests.cpp b/source/libs/parser/test/parserTests.cpp index 07637d27cf..3a929c32f2 100644 --- a/source/libs/parser/test/parserTests.cpp +++ b/source/libs/parser/test/parserTests.cpp @@ -714,16 +714,14 @@ TEST(testCase, show_user_Test) { SSqlInfo info1 = doGenerateAST(sql1); ASSERT_EQ(info1.valid, true); + void* output = NULL; + int32_t type = 0; + int32_t len = 0; + int32_t code = qParserValidateDclSqlNode(&info1, 1, &output, &len, &type, msg, buf.len); + ASSERT_EQ(code, 0); + // convert the show command to be the select query // select name, privilege, create_time, account from information_schema.users; - - - SQueryStmtInfo* pQueryInfo = createQueryInfo(); -// setTableMetaInfo(pQueryInfo, &req); - -// SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.sub.node, 0); -// ret = validateSqlNode(pSqlNode, pQueryInfo, &buf); -// ASSERT_NE(ret, 0); } TEST(testCase, create_user_Test) { @@ -737,12 +735,12 @@ TEST(testCase, create_user_Test) { SSqlInfo info1 = doGenerateAST(sql); ASSERT_EQ(info1.valid, true); - ASSERT_EQ(isDdlSql(&info1), true); + ASSERT_EQ(isDclSqlStatement(&info1), true); void* output = NULL; int32_t type = 0; - - int32_t code = qParserValidateDdlSqlNode(&info1, 1, &output, &type, msg, buf.len); + int32_t len = 0; + int32_t code = qParserValidateDclSqlNode(&info1, 1, &output, &len, &type, msg, buf.len); ASSERT_EQ(code, 0); destroySqlInfo(&info1);