fix: join cols not set

This commit is contained in:
factosea 2025-01-08 11:24:24 +08:00
parent 756fcd17f7
commit c85a57c055
4 changed files with 142 additions and 22 deletions

View File

@ -71,7 +71,7 @@ typedef struct SStbJoinDynCtrlInfo {
SDynQueryCtrlExecInfo execInfo;
SStbJoinDynCtrlBasic basic;
SStbJoinDynCtrlCtx ctx;
int16_t outputBlkId;
SDataBlockDescNode* pOutputDataBlockDesc;
} SStbJoinDynCtrlInfo;
typedef struct SDynQueryCtrlOperatorInfo {

View File

@ -16,10 +16,14 @@
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "nodes.h"
#include "operator.h"
#include "os.h"
#include "plannodes.h"
#include "query.h"
#include "querynodes.h"
#include "querytask.h"
#include "tarray.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "thash.h"
@ -901,10 +905,29 @@ static int32_t seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock**
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE void seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
if (pBlock != NULL) {
pBlock->info.id.blockId = pStbJoin->outputBlkId;
static int32_t seqStableJoinComposeRes(SStbJoinDynCtrlInfo* pStbJoin, SSDataBlock* pBlock) {
if (pBlock && pStbJoin && pStbJoin->pOutputDataBlockDesc) {
pBlock->info.id.blockId = pStbJoin->pOutputDataBlockDesc->dataBlockId;
if (!pBlock->pDataBlock) return TSDB_CODE_SUCCESS;
for (int i = pBlock->pDataBlock->size; i < pStbJoin->pOutputDataBlockDesc->pSlots->length; i++) {
SSlotDescNode* pSlot = (SSlotDescNode*)nodesListGetNode(pStbJoin->pOutputDataBlockDesc->pSlots, i);
if (pSlot == NULL) {
qError("seqStableJoinComposeRes: pSlot is NULL, i:%d", i);
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
SColumnInfoData colInfo = createColumnInfoData(pSlot->dataType.type, pSlot->dataType.bytes, pSlot->slotId);
colInfoDataEnsureCapacity(&colInfo, pBlock->info.rows, true);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
}
} else {
qError("seqStableJoinComposeRes: pBlock or pStbJoin is NULL");
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
}
return TSDB_CODE_SUCCESS;
}
int32_t seqStableJoin(SOperatorInfo* pOperator, SSDataBlock** pRes) {
@ -947,7 +970,7 @@ _return:
pOperator->pTaskInfo->code = code;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
} else {
seqStableJoinComposeRes(pStbJoin, *pRes);
code = seqStableJoinComposeRes(pStbJoin, *pRes);
}
return code;
}
@ -1011,7 +1034,7 @@ int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numO
switch (pInfo->qType) {
case DYN_QTYPE_STB_HASH:
TAOS_MEMCPY(&pInfo->stbJoin.basic, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin));
pInfo->stbJoin.outputBlkId = pPhyciNode->node.pOutputDataBlockDesc->dataBlockId;
pInfo->stbJoin.pOutputDataBlockDesc = pPhyciNode->node.pOutputDataBlockDesc;
code = initSeqStbJoinTableHash(&pInfo->stbJoin.ctx.prev, pInfo->stbJoin.basic.batchFetch);
if (TSDB_CODE_SUCCESS != code) {
goto _error;

View File

@ -18,6 +18,7 @@
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "taoserror.h"
#include "tdatablock.h"
typedef struct SProjectOperatorInfo {
@ -875,7 +876,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
}
pResult->info.rows = 1;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
if (pResult != pSrcBlock) {
@ -889,7 +890,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
if (createNewColModel) {
code = blockDataEnsureCapacity(pResult, pResult->info.rows);
if (code) {
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
@ -975,21 +976,21 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
if (pBlockList == NULL) {
code = terrno;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
void* px = taosArrayPush(pBlockList, &pSrcBlock);
if (px == NULL) {
code = terrno;
taosArrayDestroy(pBlockList);
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
if (pResColData == NULL) {
code = terrno;
taosArrayDestroy(pBlockList);
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
@ -998,7 +999,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pBlockList);
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
@ -1039,7 +1040,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
if (outputColIndex == NULL) {
code = terrno;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
@ -1055,7 +1056,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
if (pCtx[k].fpSet.cleanup != NULL) {
pCtx[k].fpSet.cleanup(&pCtx[k]);
}
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
numOfRows = pResInfo->numOfRes;
@ -1064,14 +1065,14 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
processByRowFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*));
if (!processByRowFunctionCtx) {
code = terrno;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
void* px = taosArrayPush(processByRowFunctionCtx, &pfCtx);
if (px == NULL) {
code = terrno;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else if (fmIsAggFunc(pfCtx->functionId)) {
@ -1110,20 +1111,20 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
if (pBlockList == NULL) {
code = terrno;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
void* px = taosArrayPush(pBlockList, &pSrcBlock);
if (px == NULL) {
code = terrno;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
SColumnInfoData* pResColData = taosArrayGet(pResult->pDataBlock, outputSlotId);
if (pResColData == NULL) {
taosArrayDestroy(pBlockList);
code = terrno;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
@ -1132,7 +1133,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pBlockList);
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
@ -1161,7 +1162,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
SqlFunctionCtx** pfCtx = taosArrayGet(processByRowFunctionCtx, 0);
if (pfCtx == NULL) {
code = terrno;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
code = (*pfCtx)->fpSet.processFuncByRow(processByRowFunctionCtx);

View File

@ -352,7 +352,102 @@ class TDTestCase:
tdSql.execute( f"insert into {dbname}.nt1 values ( {NOW - (self.rows + 1) * int(TIME_STEP * 1.2)}, {null_data} )" )
tdSql.execute( f"insert into {dbname}.nt1 values ( {NOW - self.rows * int(TIME_STEP * 0.59)}, {null_data} )" )
def ts5863(self, dbname=DBNAME):
tdSql.execute(f"CREATE STABLE {dbname}.`st_quality` (`ts` TIMESTAMP, `quality` INT, `val` NCHAR(64), `rts` TIMESTAMP) \
TAGS (`cx` VARCHAR(10), `gyd` VARCHAR(10), `gx` VARCHAR(10), `lx` VARCHAR(10)) SMA(`ts`,`quality`,`val`)")
tdSql.execute(f"create table {dbname}.st_q1 using {dbname}.st_quality tags ('cx', 'gyd', 'gx1', 'lx1')")
sql1 = f"select t.val as batch_no, a.tbname as sample_point_code, min(cast(a.val as double)) as `min`, \
max(cast(a.val as double)) as `max`, avg(cast(a.val as double)) as `avg` from {dbname}.st_quality t \
left join {dbname}.st_quality a on a.ts=t.ts and a.cx=t.cx and a.gyd=t.gyd \
where t.ts >= 1734574900000 and t.ts <= 1734575000000 \
and t.tbname = 'st_q1' \
and a.tbname in ('st_q2', 'st_q3') \
group by t.val, a.tbname"
tdSql.query(sql1)
tdSql.checkRows(0)
tdSql.execute(f"create table {dbname}.st_q2 using {dbname}.st_quality tags ('cx2', 'gyd2', 'gx2', 'lx2')")
tdSql.execute(f"create table {dbname}.st_q3 using {dbname}.st_quality tags ('cx', 'gyd', 'gx3', 'lx3')")
tdSql.execute(f"create table {dbname}.st_q4 using {dbname}.st_quality tags ('cx', 'gyd', 'gx4', 'lx4')")
tdSql.query(sql1)
tdSql.checkRows(0)
tdSql.execute(f"insert into {dbname}.st_q1 values (1734574900000, 1, '1', 1734574900000)")
tdSql.query(sql1)
tdSql.checkRows(0)
tdSql.execute(f"insert into {dbname}.st_q2 values (1734574900000, 1, '1', 1734574900000)")
tdSql.query(sql1)
tdSql.checkRows(0)
tdSql.execute(f"insert into {dbname}.st_q3 values (1734574900000, 1, '1', 1734574900000)")
tdSql.query(sql1)
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 'st_q3')
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 3, 1)
tdSql.checkData(0, 4, 1)
tdSql.execute(f"insert into {dbname}.st_q1 values (1734574900001, 2, '2', 1734574900000)")
tdSql.execute(f"insert into {dbname}.st_q3 values (1734574900001, 2, '2', 1734574900000)")
sql2 = f"select t.val as batch_no, a.tbname as sample_point_code, min(cast(a.val as double)) as `min`, \
max(cast(a.val as double)) as `max`, avg(cast(a.val as double)) as `avg` from {dbname}.st_quality t \
left join {dbname}.st_quality a on a.ts=t.ts and a.cx=t.cx and a.gyd=t.gyd \
where t.ts >= 1734574900000 and t.ts <= 1734575000000 \
and t.tbname = 'st_q1' \
and a.tbname in ('st_q2', 'st_q3') \
group by t.val, a.tbname order by batch_no"
tdSql.query(sql2)
tdSql.checkRows(2)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 'st_q3')
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 3, 1)
tdSql.checkData(0, 4, 1)
tdSql.checkData(1, 0, 2)
tdSql.checkData(1, 1, 'st_q3')
tdSql.checkData(1, 2, 2)
tdSql.checkData(1, 3, 2)
tdSql.checkData(1, 4, 2)
sql3 = f"select min(cast(a.val as double)) as `min` from {dbname}.st_quality t left join {dbname}.st_quality \
a on a.ts=t.ts and a.cx=t.cx where t.tbname = 'st_q3' and a.tbname in ('st_q3', 'st_q2')"
tdSql.execute(f"insert into {dbname}.st_q1 values (1734574900002, 2, '2', 1734574900000)")
tdSql.execute(f"insert into {dbname}.st_q4 values (1734574900002, 2, '2', 1734574900000)")
tdSql.execute(f"insert into {dbname}.st_q1 values (1734574900003, 3, '3', 1734574900000)")
tdSql.execute(f"insert into {dbname}.st_q3 values (1734574900003, 3, '3', 1734574900000)")
tdSql.query(sql3)
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
sql3 = f"select min(cast(a.val as double)) as `min`, max(cast(a.val as double)) as `max`, avg(cast(a.val as double)) as `avg` \
from {dbname}.st_quality t left join {dbname}.st_quality a \
on a.ts=t.ts and a.cx=t.cx where t.tbname = 'st_q3' and a.tbname in ('st_q3', 'st_q2')"
tdSql.query(sql3)
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 3)
tdSql.checkData(0, 2, 2)
tdSql.query(sql1)
tdSql.checkRows(3)
tdSql.query(sql2)
tdSql.checkRows(3)
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 1, 'st_q3')
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 3, 1)
tdSql.checkData(0, 4, 1)
tdSql.checkData(1, 0, 2)
tdSql.checkData(1, 1, 'st_q3')
tdSql.checkData(1, 2, 2)
tdSql.checkData(1, 3, 2)
tdSql.checkData(1, 4, 2)
tdSql.checkData(2, 0, 3)
tdSql.checkData(2, 1, 'st_q3')
tdSql.checkData(2, 2, 3)
tdSql.checkData(2, 3, 3)
tdSql.checkData(2, 4, 3)
def run(self):
tdSql.prepare()
@ -410,6 +505,7 @@ class TDTestCase:
self.all_test()
tdSql.query("select count(*) from db.ct1")
tdSql.checkData(0, 0, self.rows)
self.ts5863(dbname=dbname1)
def stop(self):
tdSql.close()