From c85a57c05505ab571f2a2fe92b8e204f02115440 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Wed, 8 Jan 2025 11:24:24 +0800 Subject: [PATCH] fix: join cols not set --- source/libs/executor/inc/dynqueryctrl.h | 2 +- .../libs/executor/src/dynqueryctrloperator.c | 33 ++++++- source/libs/executor/src/projectoperator.c | 31 +++--- tests/system-test/2-query/join.py | 98 ++++++++++++++++++- 4 files changed, 142 insertions(+), 22 deletions(-) diff --git a/source/libs/executor/inc/dynqueryctrl.h b/source/libs/executor/inc/dynqueryctrl.h index 3df0f6644c..6b524df2c6 100755 --- a/source/libs/executor/inc/dynqueryctrl.h +++ b/source/libs/executor/inc/dynqueryctrl.h @@ -71,7 +71,7 @@ typedef struct SStbJoinDynCtrlInfo { SDynQueryCtrlExecInfo execInfo; SStbJoinDynCtrlBasic basic; SStbJoinDynCtrlCtx ctx; - int16_t outputBlkId; + SDataBlockDescNode* pOutputDataBlockDesc; } SStbJoinDynCtrlInfo; typedef struct SDynQueryCtrlOperatorInfo { diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index 62f199387e..d11876288c 100644 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -16,10 +16,14 @@ #include "executorInt.h" #include "filter.h" #include "function.h" +#include "nodes.h" #include "operator.h" #include "os.h" +#include "plannodes.h" +#include "query.h" #include "querynodes.h" #include "querytask.h" +#include "tarray.h" #include "tcompare.h" #include "tdatablock.h" #include "thash.h" @@ -901,10 +905,29 @@ static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) { - if (pBlock != NULL) { - pBlock->info.id.blockId = pStbJoin->outputBlkId; +static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) { + if (pBlock && pStbJoin && pStbJoin->pOutputDataBlockDesc) { + pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId; + if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS; + + for (int i = pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) { + SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i); + if (pSlot == NULL) { + qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + } + SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId); + colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true); + int32_t code = blockDataAppendColInfo(pBlock, &colInfo); + if (code != TSDB_CODE_SUCCESS) { + return -1; + } + } + } else { + qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL"); + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } + return TSDB_CODE_SUCCESS; } int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) { @@ -947,7 +970,7 @@ _return: pOperator->pTaskInfo->code = code; T_LONG_JMP(pOperator->pTaskInfo->env, code); } else { - seqStableJoinComposeRes(pStbJoin, *pRes); + code = seqStableJoinComposeRes(pStbJoin, *pRes); } return code; } @@ -1011,7 +1034,7 @@ int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numO switch (pInfo->qType) { case DYN_QTYPE_STB_HASH: TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); - pInfo->stbJoin.outputBlkId = pPhyciNode->node.pOutputDataBlockDesc->dataBlockId; + pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc; code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch); if (TSDB_CODE_SUCCESS != code) { goto _error; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 226cde059b..cb91bae691 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -18,6 +18,7 @@ #include "functionMgt.h" #include "operator.h" #include "querytask.h" +#include "taoserror.h" #include "tdatablock.h" typedef struct SProjectOperatorInfo { @@ -875,7 +876,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc } pResult->info.rows = 1; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } if (pResult != pSrcBlock) { @@ -889,7 +890,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc if (createNewColModel) { code = blockDataEnsureCapacity(pResult, pResult->info.rows); if (code) { - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } } @@ -975,21 +976,21 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); if (pBlockList == NULL) { code = terrno; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } void* px = taosArrayPush(pBlockList, &pSrcBlock); if (px == NULL) { code = terrno; taosArrayDestroy(pBlockList); - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); if (pResColData == NULL) { code = terrno; taosArrayDestroy(pBlockList); - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; @@ -998,7 +999,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pBlockList); - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; @@ -1039,7 +1040,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc int32_t* outputColIndex = taosArrayGet(pPseudoList, 0); if (outputColIndex == NULL) { code = terrno; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; @@ -1055,7 +1056,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc if (pCtx[k].fpSet.cleanup != NULL) { pCtx[k].fpSet.cleanup(&pCtx[k]); } - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } numOfRows = pResInfo->numOfRes; @@ -1064,14 +1065,14 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc processByRowFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*)); if (!processByRowFunctionCtx) { code = terrno; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } } void* px = taosArrayPush(processByRowFunctionCtx, &pfCtx); if (px == NULL) { code = terrno; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } } } else if (fmIsAggFunc(pfCtx->functionId)) { @@ -1110,20 +1111,20 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc SArray* pBlockList = taosArrayInit(4, POINTER_BYTES); if (pBlockList == NULL) { code = terrno; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } void* px = taosArrayPush(pBlockList, &pSrcBlock); if (px == NULL) { code = terrno; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId); if (pResColData == NULL) { taosArrayDestroy(pBlockList); code = terrno; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } SColumnInfoData idata = {.info = pResColData->info, .hasNull = true}; @@ -1132,7 +1133,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pBlockList); - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; @@ -1161,7 +1162,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc SqlFunctionCtx** pfCtx = taosArrayGet(processByRowFunctionCtx, 0); if (pfCtx == NULL) { code = terrno; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } code = (*pfCtx)->fpSet.processFuncByRow(processByRowFunctionCtx); diff --git a/tests/system-test/2-query/join.py b/tests/system-test/2-query/join.py index 1c303b6d96..12cbdea484 100644 --- a/tests/system-test/2-query/join.py +++ b/tests/system-test/2-query/join.py @@ -352,7 +352,102 @@ class TDTestCase: tdSql.execute( f"insert into {dbname}.nt1 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 1.2)}, {null_data} )" ) tdSql.execute( f"insert into {dbname}.nt1 values ( {NOW - self.rows * int(TIME_STEP * 0.59)}, {null_data} )" ) - + def ts5863(self, dbname=DBNAME): + tdSql.execute(f"CREATE STABLE {dbname}.`st_quality` (`ts` TIMESTAMP, `quality` INT, `val` NCHAR(64), `rts` TIMESTAMP) \ + TAGS (`cx` VARCHAR(10), `gyd` VARCHAR(10), `gx` VARCHAR(10), `lx` VARCHAR(10)) SMA(`ts`,`quality`,`val`)") + + tdSql.execute(f"create table {dbname}.st_q1 using {dbname}.st_quality tags ('cx', 'gyd', 'gx1', 'lx1')") + + sql1 = f"select t.val as batch_no, a.tbname as sample_point_code, min(cast(a.val as double)) as `min`, \ + max(cast(a.val as double)) as `max`, avg(cast(a.val as double)) as `avg` from {dbname}.st_quality t \ + left join {dbname}.st_quality a on a.ts=t.ts and a.cx=t.cx and a.gyd=t.gyd \ + where t.ts >= 1734574900000 and t.ts <= 1734575000000 \ + and t.tbname = 'st_q1' \ + and a.tbname in ('st_q2', 'st_q3') \ + group by t.val, a.tbname" + tdSql.query(sql1) + tdSql.checkRows(0) + + tdSql.execute(f"create table {dbname}.st_q2 using {dbname}.st_quality tags ('cx2', 'gyd2', 'gx2', 'lx2')") + tdSql.execute(f"create table {dbname}.st_q3 using {dbname}.st_quality tags ('cx', 'gyd', 'gx3', 'lx3')") + tdSql.execute(f"create table {dbname}.st_q4 using {dbname}.st_quality tags ('cx', 'gyd', 'gx4', 'lx4')") + + tdSql.query(sql1) + tdSql.checkRows(0) + + tdSql.execute(f"insert into {dbname}.st_q1 values (1734574900000, 1, '1', 1734574900000)") + tdSql.query(sql1) + tdSql.checkRows(0) + tdSql.execute(f"insert into {dbname}.st_q2 values (1734574900000, 1, '1', 1734574900000)") + tdSql.query(sql1) + tdSql.checkRows(0) + tdSql.execute(f"insert into {dbname}.st_q3 values (1734574900000, 1, '1', 1734574900000)") + tdSql.query(sql1) + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1) + tdSql.checkData(0, 1, 'st_q3') + tdSql.checkData(0, 2, 1) + tdSql.checkData(0, 3, 1) + tdSql.checkData(0, 4, 1) + + tdSql.execute(f"insert into {dbname}.st_q1 values (1734574900001, 2, '2', 1734574900000)") + tdSql.execute(f"insert into {dbname}.st_q3 values (1734574900001, 2, '2', 1734574900000)") + sql2 = f"select t.val as batch_no, a.tbname as sample_point_code, min(cast(a.val as double)) as `min`, \ + max(cast(a.val as double)) as `max`, avg(cast(a.val as double)) as `avg` from {dbname}.st_quality t \ + left join {dbname}.st_quality a on a.ts=t.ts and a.cx=t.cx and a.gyd=t.gyd \ + where t.ts >= 1734574900000 and t.ts <= 1734575000000 \ + and t.tbname = 'st_q1' \ + and a.tbname in ('st_q2', 'st_q3') \ + group by t.val, a.tbname order by batch_no" + tdSql.query(sql2) + tdSql.checkRows(2) + tdSql.checkData(0, 0, 1) + tdSql.checkData(0, 1, 'st_q3') + tdSql.checkData(0, 2, 1) + tdSql.checkData(0, 3, 1) + tdSql.checkData(0, 4, 1) + tdSql.checkData(1, 0, 2) + tdSql.checkData(1, 1, 'st_q3') + tdSql.checkData(1, 2, 2) + tdSql.checkData(1, 3, 2) + tdSql.checkData(1, 4, 2) + sql3 = f"select min(cast(a.val as double)) as `min` from {dbname}.st_quality t left join {dbname}.st_quality \ + a on a.ts=t.ts and a.cx=t.cx where t.tbname = 'st_q3' and a.tbname in ('st_q3', 'st_q2')" + tdSql.execute(f"insert into {dbname}.st_q1 values (1734574900002, 2, '2', 1734574900000)") + tdSql.execute(f"insert into {dbname}.st_q4 values (1734574900002, 2, '2', 1734574900000)") + tdSql.execute(f"insert into {dbname}.st_q1 values (1734574900003, 3, '3', 1734574900000)") + tdSql.execute(f"insert into {dbname}.st_q3 values (1734574900003, 3, '3', 1734574900000)") + tdSql.query(sql3) + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1) + sql3 = f"select min(cast(a.val as double)) as `min`, max(cast(a.val as double)) as `max`, avg(cast(a.val as double)) as `avg` \ + from {dbname}.st_quality t left join {dbname}.st_quality a \ + on a.ts=t.ts and a.cx=t.cx where t.tbname = 'st_q3' and a.tbname in ('st_q3', 'st_q2')" + tdSql.query(sql3) + tdSql.checkRows(1) + tdSql.checkData(0, 0, 1) + tdSql.checkData(0, 1, 3) + tdSql.checkData(0, 2, 2) + tdSql.query(sql1) + tdSql.checkRows(3) + tdSql.query(sql2) + tdSql.checkRows(3) + tdSql.checkData(0, 0, 1) + tdSql.checkData(0, 1, 'st_q3') + tdSql.checkData(0, 2, 1) + tdSql.checkData(0, 3, 1) + tdSql.checkData(0, 4, 1) + tdSql.checkData(1, 0, 2) + tdSql.checkData(1, 1, 'st_q3') + tdSql.checkData(1, 2, 2) + tdSql.checkData(1, 3, 2) + tdSql.checkData(1, 4, 2) + tdSql.checkData(2, 0, 3) + tdSql.checkData(2, 1, 'st_q3') + tdSql.checkData(2, 2, 3) + tdSql.checkData(2, 3, 3) + tdSql.checkData(2, 4, 3) + def run(self): tdSql.prepare() @@ -410,6 +505,7 @@ class TDTestCase: self.all_test() tdSql.query("select count(*) from db.ct1") tdSql.checkData(0, 0, self.rows) + self.ts5863(dbname=dbname1) def stop(self): tdSql.close()