From b7368e2133494dc73834411dd4d06192c5c2f3aa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Mar 2022 10:59:15 +0800 Subject: [PATCH 1/3] [td-13039]add api for stream computing. --- source/dnode/mnode/impl/CMakeLists.txt | 2 +- source/dnode/mnode/impl/src/mndStream.c | 33 ++++++------------------- source/libs/parser/src/parTranslater.c | 25 ++++++++++--------- 3 files changed, 22 insertions(+), 38 deletions(-) diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index 514bba19f4..dd2caf1f7f 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -6,7 +6,7 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) target_link_libraries( - mnode scheduler sdb wal transport cjson sync monitor + mnode scheduler sdb wal transport cjson sync monitor parser ) if(${BUILD_TEST}) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index bb7891ad2d..ca34938faf 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -14,6 +14,7 @@ */ #include "mndStream.h" +#include "parser.h" #include "mndAuth.h" #include "mndDb.h" #include "mndDnode.h" @@ -218,28 +219,6 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } -static SArray *mndExtractNamesFromAst(const SNode *pAst) { - if (pAst->type != QUERY_NODE_SELECT_STMT) return NULL; - - SArray *names = taosArrayInit(0, sizeof(void *)); - if (names == NULL) { - return NULL; - } - SSelectStmt *pSelect = (SSelectStmt *)pAst; - SNodeList *pNodes = pSelect->pProjectionList; - SListCell *pCell = pNodes->pHead; - while (pCell != NULL) { - if (pCell->pNode->type != QUERY_NODE_FUNCTION) { - continue; - } - SFunctionNode *pFunction = (SFunctionNode *)pCell->pNode; - char *name = strdup(pFunction->node.aliasName); - taosArrayPush(names, &name); - pCell = pCell->pNext; - } - return names; -} - static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { if (NULL == ast) { return TSDB_CODE_SUCCESS; @@ -273,14 +252,16 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast return -1; } #if 1 - SArray *names = mndExtractNamesFromAst(pAst); + SSchemaWrapper sw = {0}; + qExtractResultSchema(pAst, (int32_t*)&sw.nCols, &sw.pSchema); + printf("|"); - for (int i = 0; i < taosArrayGetSize(names); i++) { - printf(" %15s |", (char *)taosArrayGetP(names, i)); + for (int i = 0; i < sw.nCols; i++) { + printf(" %15s |", (char *)sw.pSchema[i].name); } printf("\n=======================================================\n"); - pStream->ColAlias = names; + pStream->ColAlias = NULL; #endif if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(ast, &pStream->physicalPlan)) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f82ce2c1b4..afd29d7f74 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1696,24 +1696,27 @@ static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode) { return code; } -static int32_t setReslutSchema(STranslateContext* pCxt, SQuery* pQuery) { - if (QUERY_NODE_SELECT_STMT == nodeType(pQuery->pRoot)) { - SSelectStmt* pSelect = (SSelectStmt*)pQuery->pRoot; - pQuery->numOfResCols = LIST_LENGTH(pSelect->pProjectionList); - pQuery->pResSchema = calloc(pQuery->numOfResCols, sizeof(SSchema)); - if (NULL == pQuery->pResSchema) { - return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_OUT_OF_MEMORY); +int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema) { + if (QUERY_NODE_SELECT_STMT == nodeType(pRoot)) { + SSelectStmt* pSelect = (SSelectStmt*) pRoot; + *numOfCols = LIST_LENGTH(pSelect->pProjectionList); + *pSchema = calloc((*numOfCols), sizeof(SSchema)); + if (NULL == (*pSchema)) { + return TSDB_CODE_OUT_OF_MEMORY; } + SNode* pNode; int32_t index = 0; FOREACH(pNode, pSelect->pProjectionList) { SExprNode* pExpr = (SExprNode*)pNode; - pQuery->pResSchema[index].type = pExpr->resType.type; - pQuery->pResSchema[index].bytes = pExpr->resType.bytes; - strcpy(pQuery->pResSchema[index].name, pExpr->aliasName); + (*pSchema)[index].type = pExpr->resType.type; + (*pSchema)[index].bytes = pExpr->resType.bytes; + (*pSchema)[index].colId = index + 1; + strcpy((*pSchema)[index].name, pExpr->aliasName); index +=1; } } + return TSDB_CODE_SUCCESS; } @@ -2297,7 +2300,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { pQuery->haveResultSet = true; pQuery->directRpc = false; pQuery->msgType = TDMT_VND_QUERY; - code = setReslutSchema(pCxt, pQuery); + code = qExtractResultSchema(pQuery->pRoot, &pQuery->numOfResCols, &pQuery->pResSchema); break; case QUERY_NODE_VNODE_MODIF_STMT: pQuery->haveResultSet = false; From ca9b7c42961269aaa06c33df435db0cfc83a5c34 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Mar 2022 12:46:26 +0800 Subject: [PATCH 2/3] [td-13039] fix compiling error. --- include/libs/parser/parser.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 860c77de15..2254298e5c 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -60,6 +60,8 @@ int32_t qParseQuerySql(SParseContext* pCxt, SQuery** pQuery); void qDestroyQuery(SQuery* pQueryNode); +int32_t qExtractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema); + #ifdef __cplusplus } #endif From b5d011901ef4eb71f7522a5970b0df99fe884562 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 25 Mar 2022 13:45:57 +0800 Subject: [PATCH 3/3] [td-14288] fix bug in first query. --- source/libs/executor/src/executorimpl.c | 2 +- source/libs/function/src/builtinsimpl.c | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 09ffce2837..04a612b374 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2208,7 +2208,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num } for(int32_t i = 1; i < numOfOutput; ++i) { - (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i].resDataInfo.interBufSize); + (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pFuncCtx[i - 1].resDataInfo.interBufSize); } setCtxTagColumnInfo(pFuncCtx, numOfOutput); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 9817020817..610e5a0bb2 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -528,11 +528,10 @@ void firstFunction(SqlFunctionCtx *pCtx) { char* buf = GET_ROWCELL_INTERBUF(pResInfo); SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pInputCol = pInput->pData[0]; // All null data column, return directly. - if (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) { + if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { ASSERT(pInputCol->hasNull == true); return; }