From fecd2f5835ddee92ae80445128ca773ec4419c5e Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 31 Jan 2023 14:00:37 +0800 Subject: [PATCH 1/3] fix: the problem of creating a stream without partition by clause --- source/libs/parser/src/parTranslater.c | 10 ++++------ source/libs/planner/src/planLogicCreater.c | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 6e94883849..0e92d66d73 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3281,9 +3281,6 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { } static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelect) { - if (NULL == pSelect->pPartitionByList) { - return TSDB_CODE_SUCCESS; - } pCxt->currClause = SQL_CLAUSE_PARTITION_BY; int32_t code = translateExprList(pCxt, pSelect->pPartitionByList); if (TSDB_CODE_SUCCESS == code) { @@ -5733,12 +5730,13 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta, SCreateStreamStmt* pStmt) { + int32_t code = TSDB_CODE_SUCCESS; SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; if (NULL == pSelect->pPartitionByList) { - return addNullTagsForExistTable(pCxt, pMeta, pSelect); + code = addNullTagsForExistTable(pCxt, pMeta, pSelect); + } else { + code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect); } - - int32_t code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect); if (TSDB_CODE_SUCCESS == code) { code = addSubtableNameToCreateStreamQuery(pCxt, pStmt, pSelect); } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index fef5bd654e..001ec66725 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -374,6 +374,20 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect code = addDefaultScanCol(pRealTable->pMeta, &pScan->pScanCols); } + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags && NULL == pSelect->pPartitionByList) { + pScan->pTags = nodesCloneList(pSelect->pTags); + if (NULL == pScan->pTags) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pSubtable && NULL == pSelect->pPartitionByList) { + pScan->pSubtable = nodesCloneNode(pSelect->pSubtable); + if (NULL == pScan->pSubtable) { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + // set output if (TSDB_CODE_SUCCESS == code) { code = createColumnByRewriteExprs(pScan->pScanCols, &pScan->node.pTargets); From 7532ab9abd7ffafff6de9b9b990868f81ed5368a Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 1 Feb 2023 16:43:34 +0800 Subject: [PATCH 2/3] fix: the problem of creating a stream with tag define --- source/libs/parser/src/parTranslater.c | 76 +++++++++++++++++++++++--- 1 file changed, 68 insertions(+), 8 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 0e92d66d73..0a9adaf60c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5671,10 +5671,6 @@ static SNode* createNullValue() { } static int32_t addNullTagsForExistTable(STranslateContext* pCxt, STableMeta* pMeta, SSelectStmt* pSelect) { - if (NULL == pMeta) { - return TSDB_CODE_SUCCESS; - } - int32_t numOfTags = getNumOfTags(pMeta); int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < numOfTags; ++i) { @@ -5728,12 +5724,27 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea return pCxt->errCode; } +static int32_t addNullTagsForCreateTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { + int32_t code = TSDB_CODE_SUCCESS; + for (int32_t i = 0; TSDB_CODE_SUCCESS == code && i < LIST_LENGTH(pStmt->pTags); ++i) { + code = nodesListMakeStrictAppend(&((SSelectStmt*)pStmt->pQuery)->pTags, createNullValue()); + } + return code; +} + +static int32_t addNullTagsToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta, SCreateStreamStmt* pStmt) { + if (NULL == pMeta) { + return addNullTagsForCreateTable(pCxt, pStmt); + } + return addNullTagsForExistTable(pCxt, pMeta, (SSelectStmt*)pStmt->pQuery); +} + static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STableMeta* pMeta, SCreateStreamStmt* pStmt) { int32_t code = TSDB_CODE_SUCCESS; SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; if (NULL == pSelect->pPartitionByList) { - code = addNullTagsForExistTable(pCxt, pMeta, pSelect); + code = addNullTagsToCreateStreamQuery(pCxt, pMeta, pStmt); } else { code = addTagsToCreateStreamQuery(pCxt, pStmt, pSelect); } @@ -6011,17 +6022,66 @@ static int32_t adjustTagsForExistTable(STranslateContext* pCxt, SCreateStreamStm return adjustOrderOfTags(pCxt, pStmt->pTags, pMeta, &pSelect->pTags, pReq); } +static int32_t adjustTagsForCreateTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { + SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + if (NULL == pSelect->pPartitionByList || NULL == pSelect->pTags) { + return TSDB_CODE_SUCCESS; + } + + SNode* pTagDef = NULL; + SNode* pTagExpr = NULL; + FORBOTH(pTagDef, pStmt->pTags, pTagExpr, pSelect->pTags) { + SColumnDefNode* pDef = (SColumnDefNode*)pTagDef; + if (!dataTypeEqual(&pDef->dataType, &((SExprNode*)pTagExpr)->resType)) { + SNode* pFunc = NULL; + int32_t code = createCastFunc(pCxt, pTagExpr, pDef->dataType, &pFunc); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + REPLACE_LIST2_NODE(pFunc); + } + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t adjustTags(STranslateContext* pCxt, SCreateStreamStmt* pStmt, const STableMeta* pMeta, + SCMCreateStreamReq* pReq) { + if (NULL == pMeta) { + return adjustTagsForCreateTable(pCxt, pStmt, pReq); + } + return adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq); +} + +static bool isTagDef(SNodeList* pTags) { + if (NULL == pTags) { + return false; + } + return QUERY_NODE_COLUMN_DEF == nodeType(nodesListGetNode(pTags, 0)); +} + +static bool isTagBound(SNodeList* pTags) { + if (NULL == pTags) { + return false; + } + return QUERY_NODE_COLUMN == nodeType(nodesListGetNode(pTags, 0)); +} + static int32_t translateStreamTargetTable(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq, STableMeta** pMeta) { int32_t code = getTableMeta(pCxt, pStmt->targetDbName, pStmt->targetTabName, pMeta); if (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) { - if (NULL != pStmt->pCols) { + if (NULL != pStmt->pCols || isTagBound(pStmt->pTags)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TABLE_NOT_EXIST, pStmt->targetTabName); } pReq->createStb = STREAM_CREATE_STABLE_TRUE; pReq->targetStbUid = 0; return TSDB_CODE_SUCCESS; } else { + if (isTagDef(pStmt->pTags)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Table already exist: %s", + pStmt->targetTabName); + } pReq->createStb = STREAM_CREATE_STABLE_FALSE; pReq->targetStbUid = (*pMeta)->suid; } @@ -6047,8 +6107,8 @@ static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt if (TSDB_CODE_SUCCESS == code && NULL != pMeta) { code = adjustProjectionsForExistTable(pCxt, pStmt, pMeta, pReq); } - if (TSDB_CODE_SUCCESS == code && NULL != pMeta) { - code = adjustTagsForExistTable(pCxt, pStmt, pMeta, pReq); + if (TSDB_CODE_SUCCESS == code) { + code = adjustTags(pCxt, pStmt, pMeta, pReq); } if (TSDB_CODE_SUCCESS == code) { getSourceDatabase(pStmt->pQuery, pCxt->pParseCxt->acctId, pReq->sourceDB); From 0ec071726396c30268d82ef983c804437c059e1f Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 8 Feb 2023 10:35:25 +0800 Subject: [PATCH 3/3] fix: invild free --- source/libs/executor/src/dataDeleter.c | 16 +++++++++------- source/libs/parser/src/parTranslater.c | 6 +++--- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index a8051ea7c3..184b9e0148 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -62,8 +62,8 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock); pEntry->dataLen = sizeof(SDeleterRes); -// ASSERT(1 == pEntry->numOfRows); -// ASSERT(3 == pEntry->numOfCols); + // ASSERT(1 == pEntry->numOfRows); + // ASSERT(3 == pEntry->numOfCols); pBuf->useSize = sizeof(SDataCacheEntry); @@ -142,7 +142,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, taosFreeQitem(pBuf); return TSDB_CODE_OUT_OF_MEMORY; } - + toDataCacheEntry(pDeleter, pInput, pBuf); taosWriteQitem(pDeleter->pDataBlocks, pBuf); *pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false); @@ -167,8 +167,10 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE SDataDeleterBuf* pBuf = NULL; taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); - memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf)); - taosFreeQitem(pBuf); + if (pBuf != NULL) { + memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf)); + taosFreeQitem(pBuf); + } SDataCacheEntry* pEntry = (SDataCacheEntry*)pDeleter->nextOutput.pData; *pLen = pEntry->dataLen; @@ -256,7 +258,7 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData deleter->pDeleter = pDeleterNode; deleter->pSchema = pDataSink->pInputDataBlockDesc; - if(pParam == NULL) { + if (pParam == NULL) { code = TSDB_CODE_QRY_INVALID_INPUT; qError("invalid input param in creating data deleter, code%s", tstrerror(code)); goto _end; @@ -275,7 +277,7 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData *pHandle = deleter; return code; - _end: +_end: if (deleter != NULL) { destroyDataSinker((SDataSinkHandle*)deleter); taosMemoryFree(deleter); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ae3f5226f5..5dceec2cfd 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3517,7 +3517,7 @@ static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pS if (comp > 0) { SNode* pRightFunc = NULL; int32_t code = createCastFunc(pCxt, pRight, pLeftExpr->resType, &pRightFunc); - if (TSDB_CODE_SUCCESS != code || NULL == pRightFunc) { // deal scan coverity + if (TSDB_CODE_SUCCESS != code || NULL == pRightFunc) { // deal scan coverity return code; } REPLACE_LIST2_NODE(pRightFunc); @@ -3525,7 +3525,7 @@ static int32_t translateSetOperProject(STranslateContext* pCxt, SSetOperator* pS } else if (comp < 0) { SNode* pLeftFunc = NULL; int32_t code = createCastFunc(pCxt, pLeft, pRightExpr->resType, &pLeftFunc); - if (TSDB_CODE_SUCCESS != code || NULL == pLeftFunc) { // deal scan coverity + if (TSDB_CODE_SUCCESS != code || NULL == pLeftFunc) { // deal scan coverity return code; } REPLACE_LIST1_NODE(pLeftFunc); @@ -5809,7 +5809,7 @@ static int32_t projColPosCompar(const void* l, const void* r) { return ((SProjColPos*)l)->colId > ((SProjColPos*)r)->colId; } -static void projColPosDelete(void* p) { taosMemoryFree(((SProjColPos*)p)->pProj); } +static void projColPosDelete(void* p) { nodesDestroyNode(((SProjColPos*)p)->pProj); } static int32_t addProjToProjColPos(STranslateContext* pCxt, const SSchema* pSchema, SNode* pProj, SArray* pProjColPos) { SNode* pNewProj = nodesCloneNode(pProj);