From 350c3406dda6930435667cea4204edf51b1cced8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 11 Mar 2022 13:23:06 +0800 Subject: [PATCH 01/10] [td-13039] fix compiler error. --- source/libs/planner/test/plannerTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 3748d37d74..1af89d71c9 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -56,7 +56,7 @@ protected: const string syntaxTreeStr = toString(query_->pRoot, false); SLogicNode* pLogicPlan = nullptr; - SPlanContext cxt = { .queryId = 1, .pAstRoot = query_->pRoot }; + SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot }; code = createLogicPlan(&cxt, &pLogicPlan); if (code != TSDB_CODE_SUCCESS) { cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl; From 309076de1f929b4633c7782396c23ed5a9d2296a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 11 Mar 2022 14:11:34 +0800 Subject: [PATCH 02/10] [td-13039] add first/last query. --- include/libs/function/function.h | 2 +- source/libs/function/inc/builtinsimpl.h | 4 + source/libs/function/src/builtins.c | 22 ++++ source/libs/function/src/builtinsimpl.c | 164 ++++++++++++++++++++---- 4 files changed, 165 insertions(+), 27 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index c01e267c42..fde09e59da 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -163,7 +163,7 @@ typedef struct SInputColumnInfoData { typedef struct SqlFunctionCtx { SInputColumnInfoData input; SResultDataInfo resDataInfo; - uint32_t order; // asc|desc + uint32_t order; // data block scanner order: asc|desc //////////////////////////////////////////////////////////////// int32_t startRow; // start row index int32_t size; // handled processed row number diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 7ba7d7bdcc..3f28f4de7b 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -37,6 +37,10 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); void minFunction(SqlFunctionCtx* pCtx); void maxFunction(SqlFunctionCtx *pCtx); +bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); +void firstFunction(SqlFunctionCtx *pCtx); +void lastFunction(SqlFunctionCtx *pCtx); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index edb0acf075..1d9db9aa71 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -61,6 +61,26 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = maxFunction, .finalizeFunc = functionFinalizer }, + { + .name = "first", + .type = FUNCTION_TYPE_FIRST, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = firstFunction, + .finalizeFunc = functionFinalizer + }, + { + .name = "last", + .type = FUNCTION_TYPE_LAST, + .classification = FUNC_MGT_AGG_FUNC, + .checkFunc = stubCheckAndGetResultType, + .getEnvFunc = getFirstLastFuncEnv, + .initFunc = functionSetup, + .processFunc = lastFunction, + .finalizeFunc = functionFinalizer + }, { .name = "concat", .type = FUNCTION_TYPE_CONCAT, @@ -98,6 +118,8 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType }; break; } + case FUNCTION_TYPE_FIRST: + case FUNCTION_TYPE_LAST: case FUNCTION_TYPE_MIN: case FUNCTION_TYPE_MAX: { SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index aaaee6d56c..ccac37fd0c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -72,13 +72,12 @@ void countFunction(SqlFunctionCtx *pCtx) { int32_t numOfElem = 0; /* - * 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->isAggSet == true; - * 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->isAggSet == true; - * 3. for primary key column, pCtx->hasNull always be false, pCtx->isAggSet == false; + * 1. column data missing (schema modified) causes pInputCol->hasNull == true. pInput->colDataAggIsSet == true; + * 2. for general non-primary key columns, pInputCol->hasNull may be true or false, pInput->colDataAggIsSet == true; + * 3. for primary key column, pInputCol->hasNull always be false, pInput->colDataAggIsSet == false; */ SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - if (pInput->colDataAggIsSet && pInput->totalRows == pInput->numOfRows) { numOfElem = pInput->numOfRows - pInput->pColumnDataAgg[0]->numOfNull; ASSERT(numOfElem >= 0); @@ -173,7 +172,7 @@ void sumFunction(SqlFunctionCtx *pCtx) { SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); } -bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { +bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SSumRes); return true; } @@ -265,8 +264,7 @@ bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { return true; } -bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - SNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); +bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(int64_t); return true; } @@ -278,34 +276,34 @@ bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { do { \ for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ - __ctx->fpSet.process(__ctx); \ + __ctx->fpSet.process(__ctx); \ } \ } while (0); -#define DO_UPDATE_SUBSID_RES(ctx, ts) \ - do { \ +#define DO_UPDATE_SUBSID_RES(ctx, ts) \ + do { \ for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \ - SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \ - if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ - __ctx->tag.i = (ts); \ - __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ - } \ - __ctx->fpSet.process(__ctx); \ - } \ + SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \ + if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ + __ctx->tag.i = (ts); \ + __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ + } \ + __ctx->fpSet.process(__ctx); \ + } \ } while (0) #define UPDATE_DATA(ctx, left, right, num, sign, _ts) \ - do { \ - if (((left) < (right)) ^ (sign)) { \ - (left) = (right); \ - DO_UPDATE_SUBSID_RES(ctx, _ts); \ - (num) += 1; \ - } \ + do { \ + if (((left) < (right)) ^ (sign)) { \ + (left) = (right); \ + DO_UPDATE_SUBSID_RES(ctx, _ts); \ + (num) += 1; \ + } \ } while (0) -#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \ +#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \ do { \ - _t* d = (_t*)((_col)->pData); \ + _t *d = (_t *)((_col)->pData); \ for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \ if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \ continue; \ @@ -445,4 +443,118 @@ void minFunction(SqlFunctionCtx *pCtx) { void maxFunction(SqlFunctionCtx *pCtx) { int32_t numOfElems = doMinMaxHelper(pCtx, 0); SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); -} \ No newline at end of file +} + +bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); + pEnv->calcMemSize = pNode->node.resType.bytes; + return true; +} + +// TODO fix this +// This ordinary first function only handle the data block in ascending order +void firstFunction(SqlFunctionCtx *pCtx) { + if (pCtx->order == TSDB_ORDER_DESC) { + return; + } + + int32_t numOfElems = 0; + + struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + + SColumnInfoData* pInputCol = pInput->pData[0]; + + // All null data column, return directly. + if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) { + ASSERT(pInputCol->hasNull == true); + return; + } + + // Check for the first not null data + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + memcpy(buf, data, pInputCol->info.bytes); + // TODO handle the subsidary value +// if (pCtx->ptsList != NULL) { +// TSKEY k = GET_TS_DATA(pCtx, i); +// DO_UPDATE_TAG_COLUMNS(pCtx, k); +// } + + pResInfo->hasResult = DATA_SET_FLAG; + pResInfo->complete = true; + + numOfElems++; + break; + } + + SET_VAL(pResInfo, numOfElems, 1); +} + +void lastFunction(SqlFunctionCtx *pCtx) { + if (pCtx->order != TSDB_ORDER_DESC) { + return; + } + + int32_t numOfElems = 0; + + struct SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + + SColumnInfoData* pInputCol = pInput->pData[0]; + + // All null data column, return directly. + if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) { + ASSERT(pInputCol->hasNull == true); + return; + } + + if (pCtx->order == TSDB_ORDER_DESC) { + for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + memcpy(buf, data, pInputCol->info.bytes); + +// TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); + + pResInfo->hasResult = DATA_SET_FLAG; + pResInfo->complete = true; // set query completed on this column + numOfElems++; + break; + } + } else { // ascending order + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, NULL)) { + continue; + } + + char* data = colDataGetData(pInputCol, i); + TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0; + + if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) { + pResInfo->hasResult = DATA_SET_FLAG; + memcpy(buf, data, pCtx->inputBytes); + + *(TSKEY*)buf = ts; +// DO_UPDATE_TAG_COLUMNS(pCtx, ts); + } + + numOfElems++; + break; + } + } + + SET_VAL(pResInfo, numOfElems, 1); +} From d2bda549d021d9bcd32a97aacc11807cd0965d31 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 11 Mar 2022 14:23:04 +0800 Subject: [PATCH 03/10] [td-13039] refactor. --- source/libs/executor/src/executorimpl.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b1b190c816..b0999cdfa7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -7266,16 +7266,15 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo)); - int32_t numOfRows = 4096; - pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, numOfRows); + pInfo->binfo.capacity = 4096; + pInfo->binfo.pRes = createOutputBuf_rv(pExprInfo, pInfo->binfo.capacity); pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, &pInfo->binfo.rowCellInfoOffset); - // initResultRowInfo(&pBInfo->resultRowInfo, 8); // setFunctionResultOutput(pBInfo, MAIN_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ProjectOperator"; - // pOperator->operatorType = OP_Project; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; @@ -7283,6 +7282,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExp pOperator->numOfOutput = taosArrayGetSize(pExprInfo); pOperator->nextDataFn = doProjectOperation; + pOperator->pTaskInfo = pTaskInfo; pOperator->closeFn = destroyProjectOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); From 396c099a77fd2db92b51536097bfe1ab8c938be3 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 11 Mar 2022 01:41:57 -0500 Subject: [PATCH 04/10] TD-13981 show databases rewrite --- include/common/taosdef.h | 3 +- include/libs/nodes/plannodes.h | 2 +- include/util/tdef.h | 2 +- source/dnode/mnode/impl/src/mndInfoSchema.c | 4 +- source/libs/nodes/src/nodesCodeFuncs.c | 14 ++++ source/libs/nodes/src/nodesUtilFuncs.c | 2 + source/libs/parser/src/parAstCreater.c | 12 ++-- source/libs/parser/src/parTranslater.c | 72 +++++++++++++++------ source/libs/planner/src/planLogicCreater.c | 22 ++++++- source/libs/planner/src/planPhysiCreater.c | 17 ++++- source/libs/qcom/src/querymsg.c | 2 +- 11 files changed, 117 insertions(+), 35 deletions(-) diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 9e5e5ebdcf..89329d3c3d 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -35,7 +35,8 @@ typedef enum { TSDB_NORMAL_TABLE = 3, // ordinary table TSDB_STREAM_TABLE = 4, // table created from stream computing TSDB_TEMP_TABLE = 5, // temp table created by nest query - TSDB_TABLE_MAX = 6 + TSDB_SYSTEM_TABLE = 6, + TSDB_TABLE_MAX = 7 } ETableType; typedef enum { diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 717baf24cd..fe476b9da1 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -36,7 +36,7 @@ typedef struct SLogicNode { typedef enum EScanType { SCAN_TYPE_TAG, SCAN_TYPE_TABLE, - SCAN_TYPE_STABLE, + SCAN_TYPE_SYSTEM_TABLE, SCAN_TYPE_STREAM } EScanType; diff --git a/include/util/tdef.h b/include/util/tdef.h index a53b81894a..057725b1ff 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -99,7 +99,7 @@ extern const int32_t TYPE_BYTES[15]; #define TSDB_INS_TABLE_MNODES "mnodes" #define TSDB_INS_TABLE_MODULES "modules" #define TSDB_INS_TABLE_QNODES "qnodes" -#define TSDB_INS_TABLE_USER_DATABASE "user_database" +#define TSDB_INS_TABLE_USER_DATABASES "user_databases" #define TSDB_INS_TABLE_USER_FUNCTIONS "user_functions" #define TSDB_INS_TABLE_USER_INDEXES "user_indexes" #define TSDB_INS_TABLE_USER_STABLES "user_stables" diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index 2c391e93e8..8762204251 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -128,7 +128,7 @@ static const SInfosTableMeta infosMeta[] = {{TSDB_INS_TABLE_DNODES, dnodesSchema {TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)}, {TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)}, {TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)}, - {TSDB_INS_TABLE_USER_DATABASE, userDBSchema, tListLen(userDBSchema)}, + {TSDB_INS_TABLE_USER_DATABASES, userDBSchema, tListLen(userDBSchema)}, {TSDB_INS_TABLE_USER_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)}, {TSDB_INS_TABLE_USER_INDEXES, userIdxSchema, tListLen(userIdxSchema)}, {TSDB_INS_TABLE_USER_STABLES, userStbsSchema, tListLen(userStbsSchema)}, @@ -165,7 +165,7 @@ int32_t mndInsInitMeta(SHashObj *hash) { STableMetaRsp meta = {0}; strcpy(meta.dbFName, TSDB_INFORMATION_SCHEMA_DB); - meta.tableType = TSDB_NORMAL_TABLE; + meta.tableType = TSDB_SYSTEM_TABLE; meta.sversion = 1; meta.tversion = 1; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 7b60218180..44810096d3 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -107,6 +107,8 @@ const char* nodesNodeName(ENodeType type) { return "PhysiTableSeqScan"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: return "PhysiSreamScan"; + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: + return "PhysiSystemTableScan"; case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return "PhysiProject"; case QUERY_NODE_PHYSICAL_PLAN_JOIN: @@ -440,6 +442,14 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { return code; } +static int32_t physiSysTableScanNodeToJson(const void* pObj, SJson* pJson) { + return physiScanNodeToJson(pObj, pJson); +} + +static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) { + return jsonToPhysiScanNode(pJson, pObj); +} + static const char* jkProjectPhysiPlanProjections = "Projections"; static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) { @@ -1492,6 +1502,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: break; + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: + return physiSysTableScanNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return physiProjectNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_JOIN: @@ -1569,6 +1581,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToPhysiTagScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: return jsonToPhysiTableScanNode(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: + return jsonToPhysiSysTableScanNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return jsonToPhysiProjectNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_JOIN: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 4055faf299..a8f7219dc2 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -146,6 +146,8 @@ SNodeptr nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(STableSeqScanPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: return makeNode(type, sizeof(SNode)); + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: + return makeNode(type, sizeof(SSystemTableScanPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_PROJECT: return makeNode(type, sizeof(SProjectPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_JOIN: diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index f21db0f133..9f6b697371 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -396,9 +396,9 @@ static bool checkPort(SAstCreateContext* pCxt, const SToken* pPortToken, int32_t return pCxt->valid; } -static bool checkDbName(SAstCreateContext* pCxt, const SToken* pDbName) { +static bool checkDbName(SAstCreateContext* pCxt, const SToken* pDbName, bool query) { if (NULL == pDbName) { - return true; + return (query ? NULL != pCxt->pQueryCxt->db : true); } pCxt->valid = pDbName->n < TSDB_DB_NAME_LEN ? true : false; return pCxt->valid; @@ -557,7 +557,7 @@ SNode* createNodeListNode(SAstCreateContext* pCxt, SNodeList* pList) { } SNode* createRealTableNode(SAstCreateContext* pCxt, const SToken* pDbName, const SToken* pTableName, const SToken* pTableAlias) { - if (!checkDbName(pCxt, pDbName) || !checkTableName(pCxt, pTableName)) { + if (!checkDbName(pCxt, pDbName, true) || !checkTableName(pCxt, pTableName)) { return NULL; } SRealTableNode* realTable = (SRealTableNode*)nodesMakeNode(QUERY_NODE_REAL_TABLE); @@ -769,7 +769,7 @@ SDatabaseOptions* setDatabaseOption(SAstCreateContext* pCxt, SDatabaseOptions* p } SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, const SToken* pDbName, SDatabaseOptions* pOptions) { - if (!checkDbName(pCxt, pDbName)) { + if (!checkDbName(pCxt, pDbName, false)) { return NULL; } SCreateDatabaseStmt* pStmt = (SCreateDatabaseStmt*)nodesMakeNode(QUERY_NODE_CREATE_DATABASE_STMT); @@ -782,7 +782,7 @@ SNode* createCreateDatabaseStmt(SAstCreateContext* pCxt, bool ignoreExists, cons } SNode* createDropDatabaseStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pDbName) { - if (!checkDbName(pCxt, pDbName)) { + if (!checkDbName(pCxt, pDbName, false)) { return NULL; } SDropDatabaseStmt* pStmt = (SDropDatabaseStmt*)nodesMakeNode(QUERY_NODE_DROP_DATABASE_STMT); @@ -904,7 +904,7 @@ SNode* createUseDatabaseStmt(SAstCreateContext* pCxt, const SToken* pDbName) { } SNode* createShowStmt(SAstCreateContext* pCxt, ENodeType type, const SToken* pDbName) { - if (!checkDbName(pCxt, pDbName)) { + if (!checkDbName(pCxt, pDbName, false)) { return NULL; } SShowStmt* pStmt = nodesMakeNode(type);; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 4be4e25eb8..8917eab326 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -499,25 +499,38 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) return TSDB_CODE_SUCCESS; } -static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNode* pRealTable) { +static int32_t toVgroupsInfo(SArray* pVgs, SVgroupsInfo** pVgsInfo) { + size_t vgroupNum = taosArrayGetSize(pVgs); + *pVgsInfo = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupNum); + if (NULL == *pVgsInfo) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*pVgsInfo)->numOfVgroups = vgroupNum; + for (int32_t i = 0; i < vgroupNum; ++i) { + SVgroupInfo *vg = taosArrayGet(pVgs, i); + (*pVgsInfo)->vgroups[i] = *vg; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t setTableVgroupList(SParseContext* pCxt, SName* pName, SRealTableNode* pRealTable) { + int32_t code = TSDB_CODE_SUCCESS; if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) { SArray* vgroupList = NULL; - int32_t code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, name, &vgroupList); - if (code != TSDB_CODE_SUCCESS) { - return code; + code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, &vgroupList); + if (TSDB_CODE_SUCCESS == code) { + code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList); } - - size_t vgroupNum = taosArrayGetSize(vgroupList); - pRealTable->pVgroupList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo) * vgroupNum); - if (NULL == pRealTable->pVgroupList) { - return TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(vgroupList); + } else if (TSDB_SYSTEM_TABLE == pRealTable->pMeta->tableType) { + SArray* vgroupList = NULL; + char fullDbName[TSDB_DB_FNAME_LEN]; + // tNameGetFullDbName(pName, fullDbName); + snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->acctId, "test"); + code = catalogGetDBVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, fullDbName, false, &vgroupList); + if (TSDB_CODE_SUCCESS == code) { + code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList); } - pRealTable->pVgroupList->numOfVgroups = vgroupNum; - for (int32_t i = 0; i < vgroupNum; ++i) { - SVgroupInfo *vg = taosArrayGet(vgroupList, i); - pRealTable->pVgroupList->vgroups[i] = *vg; - } - taosArrayDestroy(vgroupList); } else { pRealTable->pVgroupList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo)); @@ -525,12 +538,9 @@ static int32_t setTableVgroupList(SParseContext* pCxt, SName* name, SRealTableNo return TSDB_CODE_OUT_OF_MEMORY; } pRealTable->pVgroupList->numOfVgroups = 1; - int32_t code = catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, name, pRealTable->pVgroupList->vgroups); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pRealTable->pVgroupList->vgroups); } - return TSDB_CODE_SUCCESS; + return code; } static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) { @@ -1238,6 +1248,25 @@ static void destroyTranslateContext(STranslateContext* pCxt) { } } +static int32_t rewriteShowDatabase(STranslateContext* pCxt, SQuery* pQuery) { + SSelectStmt* pStmt = nodesMakeNode(QUERY_NODE_SELECT_STMT); + if (NULL == pStmt) { + return TSDB_CODE_OUT_OF_MEMORY; + } + SRealTableNode* pTable = nodesMakeNode(QUERY_NODE_REAL_TABLE); + if (NULL == pTable) { + nodesDestroyNode(pStmt); + return TSDB_CODE_OUT_OF_MEMORY; + } + strcpy(pTable->table.dbName, TSDB_INFORMATION_SCHEMA_DB); + strcpy(pTable->table.tableName, TSDB_INS_TABLE_USER_DATABASES); + pStmt->pFromTable = (SNode*)pTable; + + nodesDestroyNode(pQuery->pRoot); + pQuery->pRoot = (SNode*)pStmt; + return TSDB_CODE_SUCCESS; +} + typedef struct SVgroupTablesBatch { SVCreateTbBatchReq req; SVgroupInfo info; @@ -1608,6 +1637,9 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery) static int32_t rewriteQuery(STranslateContext* pCxt, SQuery* pQuery) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pQuery->pRoot)) { + case QUERY_NODE_SHOW_DATABASES_STMT: + code = rewriteShowDatabase(pCxt, pQuery); + break; case QUERY_NODE_CREATE_TABLE_STMT: if (NULL == ((SCreateTableStmt*)pQuery->pRoot)->pTags) { code = rewriteCreateTable(pCxt, pQuery); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index a93985e8ba..e1d625ff0a 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -123,6 +123,26 @@ error: return pRoot; } +static EScanType getScanType(SNodeList* pScanCols, STableMeta* pMeta) { + if (NULL == pScanCols) { + // select count(*) from t + return SCAN_TYPE_TABLE; + } + + if (TSDB_SYSTEM_TABLE == pMeta->tableType) { + return SCAN_TYPE_SYSTEM_TABLE; + } + + SNode* pCol = NULL; + FOREACH(pCol, pScanCols) { + if (COLUMN_TYPE_COLUMN == ((SColumnNode*)pCol)->colType) { + return SCAN_TYPE_TABLE; + } + } + + return SCAN_TYPE_TAG; +} + static SLogicNode* createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SRealTableNode* pRealTable) { SScanLogicNode* pScan = (SScanLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN); CHECK_ALLOC(pScan, NULL); @@ -145,7 +165,7 @@ static SLogicNode* createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe CHECK_ALLOC(pScan->node.pTargets, (SLogicNode*)pScan); } - pScan->scanType = SCAN_TYPE_TABLE; + pScan->scanType = getScanType(pCols, pScan->pMeta); pScan->scanFlag = MAIN_SCAN; pScan->scanRange = TSWINDOW_INITIALIZER; pScan->tableName.type = TSDB_TABLE_NAME_T; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 5af26b3e32..6b61a5e7a2 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -259,13 +259,26 @@ static SPhysiNode* createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* p return (SPhysiNode*)pTableScan; } +static SPhysiNode* createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SScanLogicNode* pScanLogicNode) { + SSystemTableScanPhysiNode* pScan = (SSystemTableScanPhysiNode*)makePhysiNode(pCxt, QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN); + CHECK_ALLOC(pScan, NULL); + CHECK_CODE(initScanPhysiNode(pCxt, pScanLogicNode, (SScanPhysiNode*)pScan), (SPhysiNode*)pScan); + for (int32_t i = 0; i < pScanLogicNode->pVgroupList->numOfVgroups; ++i) { + SQueryNodeAddr addr; + vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups + i, &addr); + taosArrayPush(pCxt->pExecNodeList, &addr); + } + return (SPhysiNode*)pScan; +} + static SPhysiNode* createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode) { switch (pScanLogicNode->scanType) { case SCAN_TYPE_TAG: return createTagScanPhysiNode(pCxt, pScanLogicNode); case SCAN_TYPE_TABLE: return createTableScanPhysiNode(pCxt, pSubplan, pScanLogicNode); - case SCAN_TYPE_STABLE: + case SCAN_TYPE_SYSTEM_TABLE: + return createSystemTableScanPhysiNode(pCxt, pScanLogicNode); case SCAN_TYPE_STREAM: break; default: @@ -768,7 +781,7 @@ static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) { static int32_t doBuildPhysiPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan, SSubplan* pParent, SQueryPlan* pQueryPlan) { SSubplan* pSubplan = createPhysiSubplan(pCxt, pLogicSubplan); - CHECK_ALLOC(pSubplan, DEAL_RES_ERROR); + CHECK_ALLOC(pSubplan, TSDB_CODE_OUT_OF_MEMORY); CHECK_CODE_EXT(pushSubplan(pCxt, pSubplan, pLogicSubplan->level, pQueryPlan->pSubplans)); ++(pQueryPlan->numOfSubplans); if (NULL != pParent) { diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 37e8b7302e..f2df34b2f1 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -166,7 +166,7 @@ static int32_t queryConvertTableMetaMsg(STableMetaRsp *pMetaMsg) { } if (pMetaMsg->tableType != TSDB_SUPER_TABLE && pMetaMsg->tableType != TSDB_CHILD_TABLE && - pMetaMsg->tableType != TSDB_NORMAL_TABLE) { + pMetaMsg->tableType != TSDB_NORMAL_TABLE && pMetaMsg->tableType != TSDB_SYSTEM_TABLE) { qError("invalid tableType[%d] in table meta rsp msg", pMetaMsg->tableType); return TSDB_CODE_TSC_INVALID_VALUE; } From 028e6d16201ddfec3ff2f7058abaeff9a98710b6 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 11 Mar 2022 02:58:29 -0500 Subject: [PATCH 05/10] TD-13981 show databases rewrite --- include/libs/nodes/plannodes.h | 6 +- include/libs/planner/planner.h | 1 + source/client/src/clientImpl.c | 7 +- source/libs/nodes/src/nodesCodeFuncs.c | 102 +++++++++++++++------ source/libs/planner/src/planPhysiCreater.c | 1 + 5 files changed, 88 insertions(+), 29 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index fe476b9da1..266ab0d77b 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -147,9 +147,13 @@ typedef struct SScanPhysiNode { SName tableName; } SScanPhysiNode; -typedef SScanPhysiNode SSystemTableScanPhysiNode; typedef SScanPhysiNode STagScanPhysiNode; +typedef struct SSystemTableScanPhysiNode { + SScanPhysiNode scan; + SEpSet mgmtEpSet; +} SSystemTableScanPhysiNode; + typedef struct STableScanPhysiNode { SScanPhysiNode scan; uint8_t scanFlag; // denotes reversed scan of data or not diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 07579e0a7d..fe1b2d27f9 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -25,6 +25,7 @@ extern "C" { typedef struct SPlanContext { uint64_t queryId; int32_t acctId; + SEpSet mgmtEpSet; SNode* pAstRoot; } SPlanContext; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 2c83e1e113..74bad09680 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -194,7 +194,12 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) { pRequest->type = pQuery->msgType; - SPlanContext cxt = { .queryId = pRequest->requestId, .pAstRoot = pQuery->pRoot, .acctId = pRequest->pTscObj->acctId }; + SPlanContext cxt = { + .queryId = pRequest->requestId, + .acctId = pRequest->pTscObj->acctId, + .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), + .pAstRoot = pQuery->pRoot + }; int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList); if (code != 0) { return code; diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 44810096d3..5d007db29b 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -442,12 +442,85 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { return code; } +static const char* jkEndPointFqdn = "Fqdn"; +static const char* jkEndPointPort = "Port"; + +static int32_t epToJson(const void* pObj, SJson* pJson) { + const SEp* pNode = (const SEp*)pObj; + + int32_t code = tjsonAddStringToObject(pJson, jkEndPointFqdn, pNode->fqdn); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkEndPointPort, pNode->port); + } + + return code; +} + +static int32_t jsonToEp(const SJson* pJson, void* pObj) { + SEp* pNode = (SEp*)pObj; + + int32_t code = tjsonGetStringValue(pJson, jkEndPointFqdn, pNode->fqdn); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetSmallIntValue(pJson, jkEndPointPort, &pNode->port); + } + + return code; +} + +static const char* jkEpSetInUse = "InUse"; +static const char* jkEpSetNumOfEps = "NumOfEps"; +static const char* jkEpSetEps = "Eps"; + +static int32_t epSetToJson(const void* pObj, SJson* pJson) { + const SEpSet* pNode = (const SEpSet*)pObj; + + int32_t code = tjsonAddIntegerToObject(pJson, jkEpSetInUse, pNode->inUse); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkEpSetNumOfEps, pNode->numOfEps); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddArray(pJson, jkEpSetEps, epToJson, pNode->eps, sizeof(SEp), pNode->numOfEps); + } + + return code; +} + +static int32_t jsonToEpSet(const SJson* pJson, void* pObj) { + SEpSet* pNode = (SEpSet*)pObj; + + int32_t code = tjsonGetTinyIntValue(pJson, jkEpSetInUse, &pNode->inUse); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkEpSetNumOfEps, &pNode->numOfEps); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToArray(pJson, jkEpSetEps, jsonToEp, pNode->eps, sizeof(SEp)); + } + + return code; +} + +static const char* jkSysTableScanPhysiPlanMnodeEpSet = "MnodeEpSet"; + static int32_t physiSysTableScanNodeToJson(const void* pObj, SJson* pJson) { - return physiScanNodeToJson(pObj, pJson); + const SSystemTableScanPhysiNode* pNode = (const SSystemTableScanPhysiNode*)pObj; + + int32_t code = physiScanNodeToJson(pObj, pJson); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkSysTableScanPhysiPlanMnodeEpSet, epSetToJson, &pNode->mgmtEpSet); + } + + return code; } static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) { - return jsonToPhysiScanNode(pJson, pObj); + SSystemTableScanPhysiNode* pNode = (SSystemTableScanPhysiNode*)pObj; + + int32_t code = jsonToPhysiScanNode(pJson, pObj); + if (TSDB_CODE_SUCCESS == code) { + code = tjsonToObject(pJson, jkSysTableScanPhysiPlanMnodeEpSet, jsonToEpSet, &pNode->mgmtEpSet); + } + + return code; } static const char* jkProjectPhysiPlanProjections = "Projections"; @@ -635,31 +708,6 @@ static int32_t jsonToSubplanId(const SJson* pJson, void* pObj) { return code; } -static const char* jkEndPointFqdn = "Fqdn"; -static const char* jkEndPointPort = "Port"; - -static int32_t epToJson(const void* pObj, SJson* pJson) { - const SEp* pNode = (const SEp*)pObj; - - int32_t code = tjsonAddStringToObject(pJson, jkEndPointFqdn, pNode->fqdn); - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkEndPointPort, pNode->port); - } - - return code; -} - -static int32_t jsonToEp(const SJson* pJson, void* pObj) { - SEp* pNode = (SEp*)pObj; - - int32_t code = tjsonGetStringValue(pJson, jkEndPointFqdn, pNode->fqdn); - if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetSmallIntValue(pJson, jkEndPointPort, &pNode->port); - } - - return code; -} - static const char* jkQueryNodeAddrId = "Id"; static const char* jkQueryNodeAddrInUse = "InUse"; static const char* jkQueryNodeAddrNumOfEps = "NumOfEps"; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 6b61a5e7a2..d3e7ba56ed 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -268,6 +268,7 @@ static SPhysiNode* createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SScan vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups + i, &addr); taosArrayPush(pCxt->pExecNodeList, &addr); } + pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet; return (SPhysiNode*)pScan; } From 68a6868310f3c8edd6b59e24b236d74e3285a2f4 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 11 Mar 2022 04:50:04 -0500 Subject: [PATCH 06/10] TD-13981 show databases rewrite --- source/libs/parser/inc/parUtil.h | 7 ++ source/libs/parser/src/parInsert.c | 2 - source/libs/parser/src/parTranslater.c | 124 +++++++++++++++++-------- source/libs/parser/src/parser.c | 7 +- 4 files changed, 97 insertions(+), 43 deletions(-) diff --git a/source/libs/parser/inc/parUtil.h b/source/libs/parser/inc/parUtil.h index 4d7a8e2a18..171b406e18 100644 --- a/source/libs/parser/inc/parUtil.h +++ b/source/libs/parser/inc/parUtil.h @@ -23,6 +23,13 @@ extern "C" { #include "os.h" #include "query.h" +#define parserFatal(param, ...) qFatal("PARSER: " param, __VA_ARGS__) +#define parserError(param, ...) qError("PARSER: " param, __VA_ARGS__) +#define parserWarn(param, ...) qWarn("PARSER: " param, __VA_ARGS__) +#define parserInfo(param, ...) qInfo("PARSER: " param, __VA_ARGS__) +#define parserDebug(param, ...) qDebug("PARSER: " param, __VA_ARGS__) +#define parserTrace(param, ...) qTrace("PARSER: " param, __VA_ARGS__) + typedef struct SMsgBuf { int32_t len; char *buf; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 43cc308483..4e23901b85 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1024,7 +1024,6 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { }; if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pOutput) { - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1043,6 +1042,5 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) { code = parseInsertBody(&context); } destroyInsertParseContext(&context); - terrno = code; return code; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 589dc5bf26..a0c545c24f 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -69,14 +69,78 @@ static int32_t addNamespace(STranslateContext* pCxt, void* pTable) { return TSDB_CODE_SUCCESS; } -static SName* toName(int32_t acctId, const SRealTableNode* pRealTable, SName* pName) { +static SName* toName(int32_t acctId, const char* pDbName, const char* pTableName, SName* pName) { pName->type = TSDB_TABLE_NAME_T; pName->acctId = acctId; - strcpy(pName->dbname, pRealTable->table.dbName); - strcpy(pName->tname, pRealTable->table.tableName); + strcpy(pName->dbname, pDbName); + strcpy(pName->tname, pTableName); return pName; } +static int32_t getTableMetaImpl(SParseContext* pCxt, const SName* pName, STableMeta** pMeta) { + int32_t code = catalogGetTableMeta(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pMeta); + if (TSDB_CODE_SUCCESS != code) { + parserError("catalogGetTableMeta error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname); + } + return code; +} + +static int32_t getTableMeta(SParseContext* pCxt, const char* pDbName, const char* pTableName, STableMeta** pMeta) { + SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId }; + strcpy(name.dbname, pDbName); + strcpy(name.tname, pTableName); + return getTableMetaImpl(pCxt, &name, pMeta); +} + +static int32_t getTableDistVgInfo(SParseContext* pCxt, const SName* pName, SArray** pVgInfo) { + int32_t code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pVgInfo); + if (TSDB_CODE_SUCCESS != code) { + parserError("catalogGetTableDistVgInfo error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname); + } + return code; +} + +static int32_t getDBVgInfoImpl(SParseContext* pCxt, const SName* pName, SArray** pVgInfo) { + char fullDbName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pName, fullDbName); + int32_t code = catalogGetDBVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, fullDbName, pVgInfo); + if (TSDB_CODE_SUCCESS != code) { + parserError("catalogGetDBVgInfo error, code:%s, dbFName:%s", tstrerror(code), fullDbName); + } + return code; +} + +static int32_t getDBVgInfo(SParseContext* pCxt, const char* pDbName, SArray** pVgInfo) { + SName name; + tNameSetDbName(&name, pCxt->acctId, pDbName, strlen(pDbName)); + char dbFname[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(&name, dbFname); + return getDBVgInfoImpl(pCxt, &name, pVgInfo); +} + +static int32_t getTableHashVgroupImpl(SParseContext* pCxt, const SName* pName, SVgroupInfo* pInfo) { + int32_t code = catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pInfo); + if (TSDB_CODE_SUCCESS != code) { + parserError("catalogGetTableHashVgroup error, code:%s, dbName:%s, tbName:%s", tstrerror(code), pName->dbname, pName->tname); + } + return code; +} + +static int32_t getTableHashVgroup(SParseContext* pCxt, const char* pDbName, const char* pTableName, SVgroupInfo* pInfo) { + SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId }; + strcpy(name.dbname, pDbName); + strcpy(name.tname, pTableName); + return getTableHashVgroupImpl(pCxt, &name, pInfo); +} + +static int32_t getDBVgVersion(SParseContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId, int32_t* pTableNum) { + int32_t code = catalogGetDBVgVersion(pCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum); + if (TSDB_CODE_SUCCESS != code) { + parserError("catalogGetDBVgVersion error, code:%s, dbFName:%s", tstrerror(code), pDbFName); + } + return code; +} + static bool belongTable(const char* currentDb, const SColumnNode* pCol, const STableNode* pTable) { int cmp = 0; if ('\0' != pCol->dbName[0]) { @@ -517,17 +581,14 @@ static int32_t setTableVgroupList(SParseContext* pCxt, SName* pName, SRealTableN int32_t code = TSDB_CODE_SUCCESS; if (TSDB_SUPER_TABLE == pRealTable->pMeta->tableType) { SArray* vgroupList = NULL; - code = catalogGetTableDistVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, &vgroupList); + code = getTableDistVgInfo(pCxt, pName, &vgroupList); if (TSDB_CODE_SUCCESS == code) { code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList); } taosArrayDestroy(vgroupList); } else if (TSDB_SYSTEM_TABLE == pRealTable->pMeta->tableType) { SArray* vgroupList = NULL; - char fullDbName[TSDB_DB_FNAME_LEN]; - // tNameGetFullDbName(pName, fullDbName); - snprintf(fullDbName, TSDB_DB_FNAME_LEN, "%d.%s", pCxt->acctId, "test"); - code = catalogGetDBVgInfo(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, fullDbName, false, &vgroupList); + code = getDBVgInfoImpl(pCxt, pName, &vgroupList); if (TSDB_CODE_SUCCESS == code) { code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList); } @@ -538,7 +599,7 @@ static int32_t setTableVgroupList(SParseContext* pCxt, SName* pName, SRealTableN return TSDB_CODE_OUT_OF_MEMORY; } pRealTable->pVgroupList->numOfVgroups = 1; - code = catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pName, pRealTable->pVgroupList->vgroups); + code = getTableHashVgroupImpl(pCxt, pName, pRealTable->pVgroupList->vgroups); } return code; } @@ -549,8 +610,8 @@ static int32_t translateTable(STranslateContext* pCxt, SNode* pTable) { case QUERY_NODE_REAL_TABLE: { SRealTableNode* pRealTable = (SRealTableNode*)pTable; SName name; - code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &(pCxt->pParseCxt->mgmtEpSet), - toName(pCxt->pParseCxt->acctId, pRealTable, &name), &(pRealTable->pMeta)); + code = getTableMetaImpl(pCxt->pParseCxt, + toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name), &(pRealTable->pMeta)); if (TSDB_CODE_SUCCESS != code) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pRealTable->table.tableName); } @@ -906,11 +967,10 @@ static int32_t doTranslateDropSuperTable(STranslateContext* pCxt, const SName* p static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt) { SDropTableClause* pClause = nodesListGetNode(pStmt->pTables, 0); - SName tableName = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId }; - strcpy(tableName.dbname, pClause->dbName); - strcpy(tableName.tname, pClause->tableName); STableMeta* pTableMeta = NULL; - int32_t code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &(pCxt->pParseCxt->mgmtEpSet), &tableName, &pTableMeta); + SName tableName; + int32_t code = getTableMetaImpl( + pCxt->pParseCxt, toName(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, &tableName), &pTableMeta); if (TSDB_CODE_SUCCESS == code) { if (TSDB_SUPER_TABLE == pTableMeta->tableType) { code = doTranslateDropSuperTable(pCxt, &tableName, pClause->ignoreNotExists); @@ -918,8 +978,8 @@ static int32_t translateDropTable(STranslateContext* pCxt, SDropTableStmt* pStmt // todo : drop normal table or child table code = TSDB_CODE_FAILED; } + tfree(pTableMeta); } - tfree(pTableMeta); return code; } @@ -932,13 +992,14 @@ static int32_t translateDropSuperTable(STranslateContext* pCxt, SDropSuperTableS } static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* pStmt) { + SUseDbReq usedbReq = {0}; SName name = {0}; tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName)); - - SUseDbReq usedbReq = {0}; tNameExtractFullName(&name, usedbReq.db); - - catalogGetDBVgVersion(pCxt->pParseCxt->pCatalog, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable); + int32_t code = getDBVgVersion(pCxt->pParseCxt, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable); + if (TSDB_CODE_SUCCESS != code) { + return code; + } pCxt->pCmdMsg = malloc(sizeof(SCmdMsgInfo)); if (NULL== pCxt->pCmdMsg) { @@ -1114,19 +1175,14 @@ static int32_t translateShow(STranslateContext* pCxt, SShowStmt* pStmt) { } static int32_t translateShowTables(STranslateContext* pCxt) { - SName name = {0}; SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq)); if (pCxt->pParseCxt->db == NULL || strlen(pCxt->pParseCxt->db) == 0) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_TSC_INVALID_OPERATION, "db not specified"); } - tNameSetDbName(&name, pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, strlen(pCxt->pParseCxt->db)); - char dbFname[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(&name, dbFname); - SArray* array = NULL; - int32_t code = catalogGetDBVgInfo(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, dbFname, &array); - if (code != TSDB_CODE_SUCCESS) { + int32_t code = getDBVgInfo(pCxt->pParseCxt, pCxt->pParseCxt->db, &array); + if (TSDB_CODE_SUCCESS != code) { return code; } SVgroupInfo* info = taosArrayGet(array, 0); @@ -1353,13 +1409,6 @@ static void destroyCreateTbReqBatch(SVgroupTablesBatch* pTbBatch) { taosArrayDestroy(pTbBatch->req.pArray); } -static int32_t getTableHashVgroup(SParseContext* pCxt, const char* pDbName, const char* pTableName, SVgroupInfo* pInfo) { - SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->acctId }; - strcpy(name.dbname, pDbName); - strcpy(name.tname, pTableName); - return catalogGetTableHashVgroup(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, &name, pInfo); -} - static int32_t rewriteToVnodeModifOpStmt(SQuery* pQuery, SArray* pBufArray) { SVnodeModifOpStmt* pNewStmt = nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); if (pNewStmt == NULL) { @@ -1544,12 +1593,9 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau } static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableClause* pStmt, SHashObj* pVgroupHashmap) { - SName name = { .type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId }; - strcpy(name.dbname, pStmt->useDbName); - strcpy(name.tname, pStmt->useTableName); STableMeta* pSuperTableMeta = NULL; - int32_t code = catalogGetTableMeta(pCxt->pParseCxt->pCatalog, pCxt->pParseCxt->pTransporter, &pCxt->pParseCxt->mgmtEpSet, &name, &pSuperTableMeta); - + int32_t code = getTableMeta(pCxt->pParseCxt, pStmt->useDbName, pStmt->useTableName, &pSuperTableMeta); + SKVRowBuilder kvRowBuilder = {0}; if (TSDB_CODE_SUCCESS == code) { code = tdInitKVRowBuilder(&kvRowBuilder); diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 868bd75520..08f316b539 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -38,11 +38,14 @@ static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) { } int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery) { + int32_t code = TSDB_CODE_SUCCESS; if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) { - return parseInsertSql(pCxt, pQuery); + code = parseInsertSql(pCxt, pQuery); } else { - return parseSqlIntoAst(pCxt, pQuery); + code = parseSqlIntoAst(pCxt, pQuery); } + terrno = code; + return code; } void qDestroyQuery(SQuery* pQueryNode) { From fbf664e27318f782256f71fd3acd8cbacad22203 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 11 Mar 2022 19:29:43 +0800 Subject: [PATCH 07/10] [td-13039] fix bug in show. --- include/common/taosdef.h | 12 +- include/common/tmsg.h | 11 + source/common/src/tmsg.c | 2 + source/dnode/mgmt/impl/src/dndTransport.c | 1 + source/dnode/mnode/impl/src/mndDb.c | 218 ++++++++++---------- source/dnode/mnode/impl/src/mndInfoSchema.c | 24 +-- source/dnode/mnode/impl/src/mndShow.c | 18 +- source/libs/executor/inc/executorimpl.h | 28 ++- source/libs/executor/src/executorimpl.c | 147 ++++++++----- 9 files changed, 283 insertions(+), 178 deletions(-) diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 89329d3c3d..05208b1320 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -30,13 +30,13 @@ typedef int64_t tb_uid_t; #define IS_TSWINDOW_SPECIFIED(win) (((win).skey != INT64_MIN) || ((win).ekey != INT64_MAX)) typedef enum { - TSDB_SUPER_TABLE = 1, // super table - TSDB_CHILD_TABLE = 2, // table created from super table - TSDB_NORMAL_TABLE = 3, // ordinary table - TSDB_STREAM_TABLE = 4, // table created from stream computing - TSDB_TEMP_TABLE = 5, // temp table created by nest query + TSDB_SUPER_TABLE = 1, // super table + TSDB_CHILD_TABLE = 2, // table created from super table + TSDB_NORMAL_TABLE = 3, // ordinary table + TSDB_STREAM_TABLE = 4, // table created from stream computing + TSDB_TEMP_TABLE = 5, // temp table created by nest query TSDB_SYSTEM_TABLE = 6, - TSDB_TABLE_MAX = 7 + TSDB_TABLE_MAX = 7 } ETableType; typedef enum { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0dcf554433..a217aabc5c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -872,6 +872,17 @@ typedef struct { char data[]; } SRetrieveTableRsp; +typedef struct { + int64_t handle; + int64_t useconds; + int8_t completed; // all results are returned to client + int8_t precision; + int8_t compressed; + int32_t compLen; + int32_t numOfRows; + char data[]; +} SRetrieveMetaTableRsp; + typedef struct { char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port int32_t port; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ff853145fa..d84b1b6106 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1732,6 +1732,7 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI64(&encoder, pReq->showId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->type) < 0) return -1; if (tEncodeI8(&encoder, pReq->free) < 0) return -1; tEndEncode(&encoder); @@ -1746,6 +1747,7 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->type) < 0) return -1; if (tDecodeI8(&decoder, &pReq->free) < 0) return -1; tEndDecode(&decoder); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 78bd71b919..40101ecd16 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -102,6 +102,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SYSTABLE_RETRIEVE)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_TRANS)] = dndProcessMnodeWriteMsg; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index a17a45d46a..da6573a7c7 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1324,123 +1324,133 @@ char *mnGetDbStr(char *src) { return pos; } -static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static char* getDataPosition(char* pData, SShowObj* pShow, int32_t cols, int32_t rows, int32_t capacityOfRow) { + return pData + pShow->offset[cols] * capacityOfRow + pShow->bytes[cols] * rows; +} + +static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_t rows, int32_t rowCapacity) { + int32_t cols = 0; + + char* pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + char *name = mnGetDbStr(pDb->name); + if (name != NULL) { + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]); + } else { + STR_TO_VARSTR(pWrite, "NULL"); + } + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int64_t *)pWrite = pDb->createdTime; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int16_t *)pWrite = pDb->cfg.numOfVgroups; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int64_t *)pWrite = 0; // todo: num of Tables + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int16_t *)pWrite = pDb->cfg.replications; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int16_t *)pWrite = pDb->cfg.quorum; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int16_t *)pWrite = pDb->cfg.daysPerFile; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + char tmp[128] = {0}; + if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) { + sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0); + } else { + sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2); + } + STR_WITH_SIZE_TO_VARSTR(pWrite, tmp, strlen(tmp)); + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int32_t *)pWrite = pDb->cfg.cacheBlockSize; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int32_t *)pWrite = pDb->cfg.totalBlocks; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int32_t *)pWrite = pDb->cfg.minRows; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int32_t *)pWrite = pDb->cfg.maxRows; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int8_t *)pWrite = pDb->cfg.walLevel; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int32_t *)pWrite = pDb->cfg.fsyncPeriod; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int8_t *)pWrite = pDb->cfg.compression; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int8_t *)pWrite = pDb->cfg.cacheLastRow; + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + char *prec = NULL; + switch (pDb->cfg.precision) { + case TSDB_TIME_PRECISION_MILLI: + prec = TSDB_TIME_PRECISION_MILLI_STR; + break; + case TSDB_TIME_PRECISION_MICRO: + prec = TSDB_TIME_PRECISION_MICRO_STR; + break; + case TSDB_TIME_PRECISION_NANO: + prec = TSDB_TIME_PRECISION_NANO_STR; + break; + default: + prec = "none"; + break; + } + STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2); + cols++; + + pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); + *(int8_t *)pWrite = pDb->cfg.update; +} + +static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rowsCapacity) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SDbObj *pDb = NULL; - char *pWrite; - int32_t cols = 0; - while (numOfRows < rows) { + while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_DB, pShow->pIter, (void **)&pDb); - if (pShow->pIter == NULL) break; - - cols = 0; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - char *name = mnGetDbStr(pDb->name); - if (name != NULL) { - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, name, pShow->bytes[cols]); - } else { - STR_TO_VARSTR(pWrite, "NULL"); + if (pShow->pIter == NULL) { + break; } - cols++; - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pDb->createdTime; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pDb->cfg.numOfVgroups; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = 0; // todo - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pDb->cfg.replications; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pDb->cfg.quorum; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int16_t *)pWrite = pDb->cfg.daysPerFile; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - char tmp[128] = {0}; - if (pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep1 || pDb->cfg.daysToKeep0 > pDb->cfg.daysToKeep2) { - sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2, pDb->cfg.daysToKeep0); - } else { - sprintf(tmp, "%d,%d,%d", pDb->cfg.daysToKeep0, pDb->cfg.daysToKeep1, pDb->cfg.daysToKeep2); - } - STR_WITH_SIZE_TO_VARSTR(pWrite, tmp, strlen(tmp)); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDb->cfg.cacheBlockSize; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDb->cfg.totalBlocks; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDb->cfg.minRows; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDb->cfg.maxRows; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int8_t *)pWrite = pDb->cfg.walLevel; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int32_t *)pWrite = pDb->cfg.fsyncPeriod; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int8_t *)pWrite = pDb->cfg.compression; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int8_t *)pWrite = pDb->cfg.cacheLastRow; - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - char *prec = NULL; - switch (pDb->cfg.precision) { - case TSDB_TIME_PRECISION_MILLI: - prec = TSDB_TIME_PRECISION_MILLI_STR; - break; - case TSDB_TIME_PRECISION_MICRO: - prec = TSDB_TIME_PRECISION_MICRO_STR; - break; - case TSDB_TIME_PRECISION_NANO: - prec = TSDB_TIME_PRECISION_NANO_STR; - break; - default: - prec = "none"; - break; - } - STR_WITH_SIZE_TO_VARSTR(pWrite, prec, 2); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int8_t *)pWrite = pDb->cfg.update; - cols++; + dumpDbInfoToPayload(data, pDb, pShow, numOfRows, rowsCapacity); numOfRows++; sdbRelease(pSdb, pDb); } - mndVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + // Append the information_schema database into the result. + + + mndVacuumResult(data, pShow->numOfColumns, numOfRows, rowsCapacity, pShow); pShow->numOfReads += numOfRows; return numOfRows; diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index 8762204251..e60e2989be 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -39,24 +39,24 @@ static const SInfosTableSchema qnodesSchema[] = {{.name = "id", .byt {.name = "end_point", .bytes = 134, .type = TSDB_DATA_TYPE_BINARY}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, }; -static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, +static const SInfosTableSchema userDBSchema[] = {{.name = "name", .bytes = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "created_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, - {.name = "ntables", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "vgroups", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "replica", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "quorum", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "days", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "keep", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "vgroups", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, + {.name = "ntables", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, + {.name = "replica", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, + {.name = "quorum", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, + {.name = "days", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, + {.name = "keep", .bytes = 24 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "cache", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "blocks", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "minrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "maxrows", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "wallevel", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "wallevel", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, {.name = "fsync", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "comp", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "cachelast", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "precision", .bytes = 2, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "status", .bytes = 10, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "comp", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "cachelast", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, + {.name = "precision", .bytes = 3 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, }; static const SInfosTableSchema userFuncSchema[] = {{.name = "name", .bytes = 32, .type = TSDB_DATA_TYPE_BINARY}, {.name = "created_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index ad97888ac5..83b0a0669f 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -284,6 +284,20 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) { strncpy(req.db, retrieveReq.db, tListLen(req.db)); pShow = mndCreateShowObj(pMnode, &req); + STableMetaRsp *meta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, TSDB_INS_TABLE_USER_DATABASES, strlen(TSDB_INS_TABLE_USER_DATABASES)); + pShow->numOfRows = 100; + + int32_t offset = 0; + for(int32_t i = 0; i < meta->numOfColumns; ++i) { + pShow->numOfColumns = meta->numOfColumns; + pShow->offset[i] = offset; + + int32_t bytes = meta->pSchemas[i].bytes; + pShow->rowSize += bytes; + pShow->bytes[i] = bytes; + offset += bytes; + } + if (pShow == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to process show-meta req since %s", terrstr()); @@ -330,7 +344,7 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) { size = pShow->rowSize * rowsToRead; size += SHOW_STEP_SIZE; - SRetrieveTableRsp *pRsp = rpcMallocCont(size); + SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size); if (pRsp == NULL) { mndReleaseShowObj((SShowObj*) pShow, false); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -338,6 +352,8 @@ static int32_t mndProcessRetrieveSysTableReq(SMnodeMsg *pReq) { return -1; } + pRsp->handle = htobe64(pShow->id); + // if free flag is set, client wants to clean the resources if ((retrieveReq.free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { rowsRead = (*retrieveFp)(pReq, (SShowObj*) pShow, pRsp->data, rowsToRead); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e95457b91e..30927e962d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -391,6 +391,12 @@ typedef struct SSourceDataInfo { int32_t status; } SSourceDataInfo; +typedef struct SLoadRemoteDataInfo { + uint64_t totalSize; // total load bytes from remote + uint64_t totalRows; // total number of rows + uint64_t totalElapsed; // total elapsed time +} SLoadRemoteDataInfo; + typedef struct SExchangeInfo { SArray* pSources; SArray* pSourceDataInfo; @@ -399,9 +405,7 @@ typedef struct SExchangeInfo { SSDataBlock* pResult; bool seqLoadData; // sequential load data or not, false by default int32_t current; - uint64_t totalSize; // total load bytes from remote - uint64_t totalRows; // total number of rows - uint64_t totalElapsed; // total elapsed time + SLoadRemoteDataInfo loadInfo; } SExchangeInfo; typedef struct STableScanInfo { @@ -440,14 +444,23 @@ typedef struct SStreamBlockScanInfo { void* readerHandle; // stream block reader handle } SStreamBlockScanInfo; + +typedef struct SSysScanResInfo { + struct SSysTableScanInfo *pSysScanInfo; + SRetrieveTableRsp *pRsp; + uint64_t totalRows; +} SSysScanResInfo; + typedef struct SSysTableScanInfo { union { void* pTransporter; void* readHandle; }; + SRetrieveMetaTableRsp *pRsp; + void *pCur; // cursor - SRetrieveTableReq* pReq; + SRetrieveTableReq req; SEpSet epSet; int32_t type; // show type tsem_t ready; @@ -457,8 +470,7 @@ typedef struct SSysTableScanInfo { int32_t capacity; int64_t numOfBlocks; // extract basic running information. int64_t totalRows; - int64_t elapsedTime; - int64_t totalBytes; + SLoadRemoteDataInfo loadInfo; } SSysTableScanInfo; typedef struct SOptrBasicInfo { @@ -639,8 +651,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntim SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, - int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, int32_t tableType, SEpSet epset, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SInterval* pInterval, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b0999cdfa7..e860fe90d9 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4951,34 +4951,37 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf return TSDB_CODE_SUCCESS; } -static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SExchangeInfo *pExchangeInfo, SSourceDataInfo* pDataInfo, int32_t numOfOutput, int64_t startTs) { - char* pData = pDataInfo->pRsp->data; - SRetrieveTableRsp* pRsp = pDataInfo->pRsp; +static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t compLen, + int32_t numOfOutput, int64_t startTs, uint64_t* total) { +// char* pData = pRsp->data; for (int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i); - char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pRsp->numOfRows); + char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * numOfRows); if (tmp == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } - size_t len = pRsp->numOfRows * pColInfoData->info.bytes; + size_t len = numOfRows * pColInfoData->info.bytes; memcpy(tmp, pData, len); pColInfoData->pData = tmp; pData += len; } - pRes->info.rows = pRsp->numOfRows; + pRes->info.rows = numOfRows; int64_t el = taosGetTimestampUs() - startTs; - pExchangeInfo->totalRows += pRsp->numOfRows; - pExchangeInfo->totalSize += pRsp->compLen; - pDataInfo->totalRows += pRsp->numOfRows; + pLoadInfo->totalRows += numOfRows; + pLoadInfo->totalSize += compLen; - pExchangeInfo->totalElapsed += el; + if (total != NULL) { + *total += numOfRows; + } + + pLoadInfo->totalElapsed += el; return TSDB_CODE_SUCCESS; } @@ -4988,11 +4991,12 @@ static void* setAllSourcesCompleted(SOperatorInfo *pOperator, int64_t startTs) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int64_t el = taosGetTimestampUs() - startTs; - pExchangeInfo->totalElapsed += el; + SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; + pLoadInfo->totalElapsed += el; size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, - pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); + pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0); doSetOperatorCompleted(pOperator); return NULL; @@ -5021,17 +5025,19 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i); SSDataBlock* pRes = pExchangeInfo->pResult; - + SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { qDebug("%s vgId:%d, taskID:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, - pExchangeInfo->totalRows); + pExchangeInfo->loadInfo.totalRows); pDataInfo->status = DATA_EXHAUSTED; completed += 1; continue; } - code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs); + SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; + code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, + pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows); if (code != 0) { goto _error; } @@ -5040,13 +5046,13 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo *pOperator, SEx qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, - pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, i + 1, + pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1, totalSources); pDataInfo->status = DATA_EXHAUSTED; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, - pExchangeInfo->totalSize); + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows, + pLoadInfo->totalSize); } if (pDataInfo->status != DATA_EXHAUSTED) { @@ -5118,10 +5124,12 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); SRetrieveTableRsp* pRsp = pDataInfo->pRsp; + SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; + if (pRsp->numOfRows == 0) { qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, - pDataInfo->totalRows, pExchangeInfo->totalRows); + pDataInfo->totalRows, pLoadInfo->totalRows); pDataInfo->status = DATA_EXHAUSTED; pExchangeInfo->current += 1; @@ -5129,20 +5137,22 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo *pOperator) { } SSDataBlock* pRes = pExchangeInfo->pResult; - setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pExchangeInfo, pDataInfo, pOperator->numOfOutput, startTs); + SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp; + int32_t code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, + pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, &pDataInfo->totalRows); if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, - pDataInfo->totalRows, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1, + pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources); pDataInfo->status = DATA_EXHAUSTED; pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->totalSize); + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows, pLoadInfo->totalSize); } return pExchangeInfo->pResult; @@ -5156,10 +5166,11 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pOperator->status == OP_EXEC_DONE) { qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", GET_TASKID(pTaskInfo), totalSources, - pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); + pLoadInfo->totalRows, pLoadInfo->totalSize, pLoadInfo->totalElapsed/1000.0); return NULL; } @@ -5405,18 +5416,16 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SSDataBlock* return pOperator; } - static int32_t loadSysTableContentCb(void* param, const SDataBuf* pMsg, int32_t code) { - SSourceDataInfo* pSourceDataInfo = (SSourceDataInfo*) param; - pSourceDataInfo->pRsp = pMsg->pData; + SSysTableScanInfo* pScanResInfo = (SSysTableScanInfo*) param; + pScanResInfo->pRsp = pMsg->pData; - SRetrieveTableRsp* pRsp = pSourceDataInfo->pRsp; + SRetrieveMetaTableRsp* pRsp = pScanResInfo->pRsp; pRsp->numOfRows = htonl(pRsp->numOfRows); pRsp->useconds = htobe64(pRsp->useconds); + pRsp->handle = htobe64(pRsp->handle); pRsp->compLen = htonl(pRsp->compLen); - - pSourceDataInfo->status = DATA_READY; - tsem_post(&pSourceDataInfo->pEx->ready); + tsem_post(&pScanResInfo->ready); } static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { @@ -5450,15 +5459,12 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { // pInfo->totalBytes; return (pInfo->pRes->info.rows == 0)? NULL:pInfo->pRes; } else { // load the meta from mnode of the given epset - if (pInfo->pReq == NULL) { - pInfo->pReq = calloc(1, sizeof(SRetrieveTableReq)); - if (pInfo->pReq == NULL) { - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } + int64_t startTs = taosGetTimestampUs(); - pInfo->pReq->type = pInfo->type; - } + pInfo->req.type = pInfo->type; + int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req); + char* buf1 = calloc(1, contLen); + tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); // send the fetch remote task result reques SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); @@ -5468,24 +5474,40 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { return NULL; } - pMsgSendInfo->param = NULL; - pMsgSendInfo->msgInfo.pData = pInfo->pReq; - pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq); + pMsgSendInfo->param = pInfo; + pMsgSendInfo->msgInfo.pData = buf1; + pMsgSendInfo->msgInfo.len = contLen; pMsgSendInfo->msgType = TDMT_MND_SYSTABLE_RETRIEVE; - pMsgSendInfo->fp = loadRemoteDataCallback; + pMsgSendInfo->fp = loadSysTableContentCb; int64_t transporterId = 0; int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); tsem_wait(&pInfo->ready); - // handle the response and return to the caller + + SRetrieveMetaTableRsp* pRsp = pInfo->pRsp; + pInfo->req.showId = pRsp->handle; + + if (pRsp->numOfRows == 0) { +// qDebug("%s vgId:%d, taskID:0x%"PRIx64" %d of total completed, rowsOfSource:%"PRIu64", totalRows:%"PRIu64" try next", +// GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, +// pDataInfo->totalRows, pExchangeInfo->totalRows); + return NULL; + } + + SSDataBlock* pRes = pInfo->pRes; + SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; + setSDataBlockFromFetchRsp(pRes, &pInfo->loadInfo, pTableRsp->numOfRows, + pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL); + + return pInfo->pRes; } return NULL; } -SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, - int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, int32_t tableType, + SEpSet epset, SExecTaskInfo* pTaskInfo) { SSysTableScanInfo* pInfo = calloc(1, sizeof(SSysTableScanInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5495,7 +5517,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S return NULL; } - // todo: create the schema of result data block + pInfo->pRes = pResBlock; pInfo->capacity = 4096; pInfo->type = tableType; if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { @@ -5512,11 +5534,34 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S pOperator->blockingOptr = false; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; - pOperator->numOfOutput = taosArrayGetSize(pExprInfo); + pOperator->numOfOutput = pResBlock->info.numOfCols; pOperator->nextDataFn = doSysTableScan; pOperator->closeFn = destroySysTableScannerOperatorInfo; pOperator->pTaskInfo = pTaskInfo; +#if 1 + { // todo refactor + SRpcInit rpcInit; + memset(&rpcInit, 0, sizeof(rpcInit)); + rpcInit.localPort = 0; + rpcInit.label = "DB-META"; + rpcInit.numOfThreads = 1; + rpcInit.cfp = qProcessFetchRsp; + rpcInit.sessions = tsMaxConnections; + rpcInit.connType = TAOS_CONN_CLIENT; + rpcInit.user = (char *)"root"; + rpcInit.idleTime = tsShellActivityTimer * 1000; + rpcInit.ckey = "key"; + rpcInit.spi = 1; + rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6"; + + pInfo->pTransporter = rpcOpen(&rpcInit); + if (pInfo->pTransporter == NULL) { + return NULL; // todo + } + } +#endif + return pOperator; } @@ -8125,6 +8170,14 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa taosArrayDestroy(tableIdList); return pOperator; + } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) { + SSystemTableScanPhysiNode * pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; + SSDataBlock* pResBlock = createOutputBuf_rv1(pSysScanPhyNode->scan.node.pOutputDataBlockDesc); + + SOperatorInfo* pOperator = createSysTableScanOperatorInfo(NULL, pResBlock, TSDB_MGMT_TABLE_DB, pSysScanPhyNode->mgmtEpSet, pTaskInfo); + return pOperator; + } else { + ASSERT(0); } } From f020e4c396bd203122cb522c6077412549883ba6 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 11 Mar 2022 07:17:37 -0500 Subject: [PATCH 08/10] TD-13981 show databases rewrite --- source/libs/planner/src/planPhysiCreater.c | 29 ++++++++++++++-------- tests/script/tsim/db/error1.sim | 5 ++-- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index d3e7ba56ed..93ad586e98 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -199,20 +199,27 @@ static SNodeptr createPrimaryKeyCol(SPhysiPlanContext* pCxt, uint64_t tableId) { } static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhysiNode, SNodeList* pScanCols) { - pScanPhysiNode->pScanCols = nodesMakeList(); - CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY); - CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid))); + if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pScanPhysiNode) + || QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN == nodeType(pScanPhysiNode)) { + pScanPhysiNode->pScanCols = nodesMakeList(); + CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY); + CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, createPrimaryKeyCol(pCxt, pScanPhysiNode->uid))); - SNode* pNode; - FOREACH(pNode, pScanCols) { - if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) { - SColumnNode* pCol = nodesListGetNode(pScanPhysiNode->pScanCols, 0); - strcpy(pCol->tableAlias, ((SColumnNode*)pNode)->tableAlias); - strcpy(pCol->colName, ((SColumnNode*)pNode)->colName); - continue; + SNode* pNode; + FOREACH(pNode, pScanCols) { + if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) { + SColumnNode* pCol = nodesListGetNode(pScanPhysiNode->pScanCols, 0); + strcpy(pCol->tableAlias, ((SColumnNode*)pNode)->tableAlias); + strcpy(pCol->colName, ((SColumnNode*)pNode)->colName); + continue; + } + CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, nodesCloneNode(pNode))); } - CHECK_CODE_EXT(nodesListStrictAppend(pScanPhysiNode->pScanCols, nodesCloneNode(pNode))); + } else { + pScanPhysiNode->pScanCols = nodesCloneList(pScanCols); + CHECK_ALLOC(pScanPhysiNode->pScanCols, TSDB_CODE_OUT_OF_MEMORY); } + return TSDB_CODE_SUCCESS; } diff --git a/tests/script/tsim/db/error1.sim b/tests/script/tsim/db/error1.sim index bf9e04c017..a7f74afc10 100644 --- a/tests/script/tsim/db/error1.sim +++ b/tests/script/tsim/db/error1.sim @@ -24,8 +24,8 @@ endi print ========== stop dnode2 system sh/exec.sh -n dnode2 -s stop -x SIGKILL -print =============== create database -sql_error create database d1 vgroups 4 +#print =============== create database +#sql_error create database d1 vgroups 4 print ========== start dnode2 system sh/exec.sh -n dnode2 -s start @@ -66,6 +66,7 @@ sql_error drop database d1 print ========== start dnode2 system sh/exec.sh -n dnode2 -s start +sleep 1000 print =============== re-create database $x = 0 From d6af35adb861e93082d4272ade160a3a3a76d799 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 11 Mar 2022 23:28:43 +0800 Subject: [PATCH 09/10] [td-13039] fix bug in show db. --- source/dnode/mnode/impl/src/mndDb.c | 26 +++- source/libs/executor/inc/executorimpl.h | 61 ++------ source/libs/executor/src/executorimpl.c | 169 +---------------------- source/libs/planner/test/plannerTest.cpp | 2 +- 4 files changed, 34 insertions(+), 224 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index a6d08b6e6c..39919bc233 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1346,7 +1346,7 @@ static char* getDataPosition(char* pData, SShowObj* pShow, int32_t cols, int32_t return pData + pShow->offset[cols] * capacityOfRow + pShow->bytes[cols] * rows; } -static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_t rows, int32_t rowCapacity) { +static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_t rows, int32_t rowCapacity, int64_t numOfTables) { int32_t cols = 0; char* pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); @@ -1367,7 +1367,7 @@ static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_ cols++; pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); - *(int64_t *)pWrite = 0; // todo: num of Tables + *(int64_t *)pWrite = numOfTables; cols++; pWrite = getDataPosition(data, pShow, cols, rows, rowCapacity); @@ -1447,6 +1447,18 @@ static void dumpDbInfoToPayload(char* data, SDbObj* pDb, SShowObj* pShow, int32_ *(int8_t *)pWrite = pDb->cfg.update; } +static void setInformationSchemaDbCfg(SDbObj* pDbObj) { + ASSERT(pDbObj != NULL); + strncpy(pDbObj->name, TSDB_INFORMATION_SCHEMA_DB, tListLen(pDbObj->name)); + + pDbObj->createdTime = 0; + pDbObj->cfg.numOfVgroups = 0; + pDbObj->cfg.quorum = 1; + pDbObj->cfg.replications = 1; + pDbObj->cfg.update = 1; + pDbObj->cfg.precision = TSDB_TIME_PRECISION_MILLI; +} + static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int32_t rowsCapacity) { SMnode *pMnode = pReq->pMnode; SSdb *pSdb = pMnode->pSdb; @@ -1459,14 +1471,18 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3 break; } - dumpDbInfoToPayload(data, pDb, pShow, numOfRows, rowsCapacity); - + dumpDbInfoToPayload(data, pDb, pShow, numOfRows, rowsCapacity, 0); numOfRows++; sdbRelease(pSdb, pDb); } // Append the information_schema database into the result. - + if (numOfRows < rowsCapacity) { + SDbObj dummyISDb = {0}; + setInformationSchemaDbCfg(&dummyISDb); + dumpDbInfoToPayload(data, &dummyISDb, pShow, numOfRows, rowsCapacity, 14); + numOfRows += 1; + } mndVacuumResult(data, pShow->numOfColumns, numOfRows, rowsCapacity, pShow); pShow->numOfReads += numOfRows; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ab60cc06a0..cdaabe46a7 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -355,28 +355,6 @@ typedef struct SQInfo { STaskCostInfo summary; } SQInfo; -typedef struct STaskParam { - char* sql; - char* tagCond; - char* colCond; - char* tbnameCond; - char* prevResult; - SArray* pTableIdList; - SExprBasicInfo** pExpr; - SExprBasicInfo** pSecExpr; - SExprInfo* pExprs; - SExprInfo* pSecExprs; - - SFilterInfo* pFilters; - - SColIndex* pGroupColIndex; - SColumnInfo* pTagColumnInfo; - SGroupbyExpr* pGroupbyExpr; - int32_t tableScanOperator; - SArray* pOperator; - struct SUdfInfo* pUdfInfo; -} STaskParam; - enum { DATA_NOT_READY = 0x1, DATA_READY = 0x2, @@ -444,13 +422,6 @@ typedef struct SStreamBlockScanInfo { void* readerHandle; // stream block reader handle } SStreamBlockScanInfo; - -typedef struct SSysScanResInfo { - struct SSysTableScanInfo *pSysScanInfo; - SRetrieveTableRsp *pRsp; - uint64_t totalRows; -} SSysScanResInfo; - typedef struct SSysTableScanInfo { union { void* pTransporter; @@ -458,18 +429,15 @@ typedef struct SSysTableScanInfo { }; SRetrieveMetaTableRsp *pRsp; - - void *pCur; // cursor - SRetrieveTableReq req; - SEpSet epSet; - int32_t type; // show type - tsem_t ready; - SSchema* pSchema; - SSDataBlock* pRes; - - int32_t capacity; - int64_t numOfBlocks; // extract basic running information. - int64_t totalRows; + void *pCur; // cursor + SRetrieveTableReq req; + SEpSet epSet; + int32_t type; // show type + tsem_t ready; + SSchema* pSchema; + SSDataBlock* pRes; + int32_t capacity; + int64_t numOfBlocks; // extract basic running information. SLoadRemoteDataInfo loadInfo; } SSysTableScanInfo; @@ -690,8 +658,6 @@ SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SArray* pExprInfo, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); // SSDataBlock* doSLimit(void* param, bool* newgroup); - -// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p); void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); @@ -709,9 +675,6 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters); -int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, - int32_t prevResultLen, void* merger); - int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); @@ -723,11 +686,7 @@ int32_t buildArithmeticExprFromMsg(SExprInfo* pArithExprInfo, void* pQueryMsg); bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); bool checkNeedToCompressQueryCol(SQInfo* pQInfo); -void setQueryStatus(STaskRuntimeEnv* pRuntimeEnv, int8_t status); -int32_t doDumpQueryResult(SQInfo* pQInfo, char* data, int8_t compressed, int32_t* compLen); - -size_t getResultSize(SQInfo* pQInfo, int64_t* numOfRows); void setTaskKilled(SExecTaskInfo* pTaskInfo); void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); @@ -737,8 +696,6 @@ void calculateOperatorProfResults(SQInfo* pQInfo); void queryCostStatis(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo); -void freeQueryAttr(STaskAttr* pQuery); - int32_t getMaximumIdleDurationSec(); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 382f6946ce..6780e6d14d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4965,7 +4965,6 @@ static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* size_t len = numOfRows * pColInfoData->info.bytes; memcpy(tmp, pData, len); - pColInfoData->pData = tmp; pData += len; } @@ -4982,7 +4981,6 @@ static int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* } pLoadInfo->totalElapsed += el; - return TSDB_CODE_SUCCESS; } @@ -5452,7 +5450,7 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { } } - pInfo->totalRows += numOfRows; + pInfo->loadInfo.totalRows += numOfRows; pInfo->pRes->info.rows = numOfRows; // pInfo->elapsedTime; @@ -5482,7 +5480,6 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { int64_t transporterId = 0; int32_t code = asyncSendMsgToServer(pInfo->pTransporter, &pInfo->epSet, &transporterId, pMsgSendInfo); - tsem_wait(&pInfo->ready); SRetrieveMetaTableRsp* pRsp = pInfo->pRsp; @@ -5495,9 +5492,8 @@ static SSDataBlock* doSysTableScan(void* param, bool* newgroup) { return NULL; } - SSDataBlock* pRes = pInfo->pRes; SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; - setSDataBlockFromFetchRsp(pRes, &pInfo->loadInfo, pTableRsp->numOfRows, + setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL); return pInfo->pRes; @@ -5565,64 +5561,6 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB return pOperator; } -void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream) { - assert(pTableScanInfo != NULL && pDownstream != NULL); - - pTableScanInfo->pExpr = pDownstream->pExpr; // TODO refactor to use colId instead of pExpr - pTableScanInfo->numOfOutput = pDownstream->numOfOutput; -#if 0 - if (pDownstream->operatorType == OP_Aggregate || pDownstream->operatorType == OP_MultiTableAggregate) { - SAggOperatorInfo* pAggInfo = pDownstream->info; - - pTableScanInfo->pCtx = pAggInfo->binfo.pCtx; - pTableScanInfo->pResultRowInfo = &pAggInfo->binfo.resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; - } else if (pDownstream->operatorType == OP_TimeWindow || pDownstream->operatorType == OP_AllTimeWindow) { - STableIntervalOperatorInfo *pIntervalInfo = pDownstream->info; - - pTableScanInfo->pCtx = pIntervalInfo->pCtx; - pTableScanInfo->pResultRowInfo = &pIntervalInfo->resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pIntervalInfo->rowCellInfoOffset; - - } else if (pDownstream->operatorType == OP_Groupby) { - SGroupbyOperatorInfo *pGroupbyInfo = pDownstream->info; - - pTableScanInfo->pCtx = pGroupbyInfo->binfo.pCtx; - pTableScanInfo->pResultRowInfo = &pGroupbyInfo->binfo.resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pGroupbyInfo->binfo.rowCellInfoOffset; - - } else if (pDownstream->operatorType == OP_MultiTableTimeInterval || pDownstream->operatorType == OP_AllMultiTableTimeInterval) { - STableIntervalOperatorInfo *pInfo = pDownstream->info; - - pTableScanInfo->pCtx = pInfo->pCtx; - pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset; - - } else if (pDownstream->operatorType == OP_Project) { - SProjectOperatorInfo *pInfo = pDownstream->info; - - pTableScanInfo->pCtx = pInfo->binfo.pCtx; - pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; - } else if (pDownstream->operatorType == OP_SessionWindow) { - SSWindowOperatorInfo* pInfo = pDownstream->info; - - pTableScanInfo->pCtx = pInfo->binfo.pCtx; - pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; - } else if (pDownstream->operatorType == OP_StateWindow) { - SStateWindowOperatorInfo* pInfo = pDownstream->info; - - pTableScanInfo->pCtx = pInfo->binfo.pCtx; - pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; - pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; - } else { - assert(0); - } -#endif - -} - SArray* getOrderCheckColumns(STaskAttr* pQuery) { int32_t numOfCols = (pQuery->pGroupbyExpr == NULL)? 0: taosArrayGetSize(pQuery->pGroupbyExpr->columnInfo); @@ -7995,37 +7933,6 @@ bool validateExprColumnInfo(SQueriedTableInfo *pTableInfo, SExprBasicInfo *pExpr return j != INT32_MIN; } -static bool validateQueryTableCols(SQueriedTableInfo* pTableInfo, SExprBasicInfo** pExpr, int32_t numOfOutput, - SColumnInfo* pTagCols, void* pMsg) { - int32_t numOfTotal = pTableInfo->numOfCols + pTableInfo->numOfTags; - if (pTableInfo->numOfCols < 0 || pTableInfo->numOfTags < 0 || numOfTotal > TSDB_MAX_COLUMNS) { - //qError("qmsg:%p illegal value of numOfCols %d numOfTags:%d", pMsg, pTableInfo->numOfCols, pTableInfo->numOfTags); - return false; - } - - if (numOfTotal == 0) { // table total columns are not required. -// for(int32_t i = 0; i < numOfOutput; ++i) { -// SExprBasicInfo* p = pExpr[i]; -// if ((p->functionId == FUNCTION_TAGPRJ) || -// (p->functionId == FUNCTION_TID_TAG && p->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) || -// (p->functionId == FUNCTION_COUNT && p->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) || -// (p->functionId == FUNCTION_BLKINFO)) { -// continue; -// } -// -// return false; -// } - } - - for(int32_t i = 0; i < numOfOutput; ++i) { - if (!validateExprColumnInfo(pTableInfo, pExpr[i], pTagCols)) { - return TSDB_CODE_QRY_INVALID_MSG; - } - } - - return true; -} - static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t numOfFilters, char** pMsg) { for (int32_t f = 0; f < numOfFilters; ++f) { SColumnFilterInfo *pFilterMsg = (SColumnFilterInfo *)(*pMsg); @@ -8060,10 +7967,10 @@ static int32_t deserializeColFilterInfo(SColumnFilterInfo* pColFilters, int16_t static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, int32_t scale, int32_t precision, const char* name) { SResSchema s = {0}; s.scale = scale; - s.precision = precision; s.type = type; s.bytes = bytes; s.colId = slotId; + s.precision = precision; strncpy(s.name, name, tListLen(s.name)); return s; @@ -8179,7 +8086,6 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SArray* colList = extractScanColumnId(pScanPhyNode->pScanCols); SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, colList, tableIdList, pTaskInfo); - taosArrayDestroy(tableIdList); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == nodeType(pPhyNode)) { @@ -8573,75 +8479,6 @@ void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) { pResultInfo->total = 0; } -FORCE_INLINE bool checkQIdEqual(void *qHandle, uint64_t qId) { - return ((SQInfo *)qHandle)->qId == qId; -} - -int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, - int32_t prevResultLen, void* merger) { - int32_t code = TSDB_CODE_SUCCESS; - - STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - pRuntimeEnv->qinfo = pQInfo; - - STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - - STSBuf *pTsBuf = NULL; - - if (pTsBufInfo->tsLen > 0) { // open new file to save the result - char* tsBlock = start + pTsBufInfo->tsOffset; - pTsBuf = tsBufCreateFromCompBlocks(tsBlock, pTsBufInfo->tsNumOfBlocks, pTsBufInfo->tsLen, pTsBufInfo->tsOrder, - pQueryAttr->vgId); - - if (pTsBuf == NULL) { - code = TSDB_CODE_QRY_NO_DISKSPACE; - goto _error; - } - tsBufResetPos(pTsBuf); - bool ret = tsBufNextPos(pTsBuf); - UNUSED(ret); - } - - SArray* prevResult = NULL; - if (prevResultLen > 0) { - prevResult = interResFromBinary(param->prevResult, prevResultLen); - pRuntimeEnv->prevResult = prevResult; - } - - pRuntimeEnv->currentOffset = pQueryAttr->limit.offset; - if (tsdb != NULL) { -// pQueryAttr->precision = tsdbGetCfg(tsdb)->precision; - } - - if ((QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.skey > pQueryAttr->window.ekey)) || - (!QUERY_IS_ASC_QUERY(pQueryAttr) && (pQueryAttr->window.ekey > pQueryAttr->window.skey))) { - //qDebug("QInfo:0x%"PRIx64" no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo->qId, pQueryAttr->window.skey, -// pQueryAttr->window.ekey, pQueryAttr->order.order); -// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); - pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0; - // todo free memory - return TSDB_CODE_SUCCESS; - } - - if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) { - //qDebug("QInfo:0x%"PRIx64" no table qualified for tag filter, abort query", pQInfo->qId); -// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); - return TSDB_CODE_SUCCESS; - } - - // filter the qualified - if ((code = doInitQInfo(pQInfo, pTsBuf, tsdb, sourceOptr, param->tableScanOperator, param->pOperator, merger)) != TSDB_CODE_SUCCESS) { - goto _error; - } - - return code; - -_error: - // table query ref will be decrease during error handling -// doDestroyTask(pQInfo); - return code; -} - //TODO refactor void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) { if (pFilter == NULL || numOfFilters == 0) { diff --git a/source/libs/planner/test/plannerTest.cpp b/source/libs/planner/test/plannerTest.cpp index 1af89d71c9..51642199f9 100644 --- a/source/libs/planner/test/plannerTest.cpp +++ b/source/libs/planner/test/plannerTest.cpp @@ -56,7 +56,7 @@ protected: const string syntaxTreeStr = toString(query_->pRoot, false); SLogicNode* pLogicPlan = nullptr; - SPlanContext cxt = { .queryId = 1, .acctId = 0, .pAstRoot = query_->pRoot }; + SPlanContext cxt = { .queryId = 1, .acctId = 0, .mgmtEpSet = {0}, .pAstRoot = query_->pRoot }; code = createLogicPlan(&cxt, &pLogicPlan); if (code != TSDB_CODE_SUCCESS) { cout << "sql:[" << cxt_.pSql << "] logic plan code:" << code << ", strerror:" << tstrerror(code) << endl; From cab7e8516b6d3ebb1a019c11abc6d608f0a1a839 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 12 Mar 2022 16:57:17 +0800 Subject: [PATCH 10/10] fix case --- tests/script/tsim/db/basic1.sim | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/script/tsim/db/basic1.sim b/tests/script/tsim/db/basic1.sim index 21e26784bf..c07ebd0400 100644 --- a/tests/script/tsim/db/basic1.sim +++ b/tests/script/tsim/db/basic1.sim @@ -6,7 +6,7 @@ sql connect print =============== create database sql create database d1 vgroups 2 sql show databases -if $rows != 1 then +if $rows != 2 then return -1 endi @@ -40,7 +40,7 @@ endi print =============== drop database sql drop database d1 sql show databases -if $rows != 0 then +if $rows != 1 then return -1 endi @@ -49,7 +49,7 @@ sql create database d2 vgroups 2 sql create database d3 vgroups 3 sql create database d4 vgroups 4 sql show databases -if $rows != 3 then +if $rows != 4 then return -1 endi @@ -111,7 +111,7 @@ print =============== drop database sql drop database d2 sql drop database d3 sql show databases -if $rows != 1 then +if $rows != 2 then return -1 endi @@ -154,7 +154,7 @@ system sh/exec.sh -n dnode1 -s start print =============== show databases sql show databases -if $rows != 1 then +if $rows != 2 then return -1 endi