Merge pull request #12702 from taosdata/feature/3_liaohj
fix(query): set hasNull flag to be true when initializing columnInfodata by default.
This commit is contained in:
commit
24fb3c9e41
|
@ -420,6 +420,11 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
|
||||||
setQueryTimewindow(pReadHandle, pCond);
|
setQueryTimewindow(pReadHandle, pCond);
|
||||||
|
|
||||||
if (pCond->numOfCols > 0) {
|
if (pCond->numOfCols > 0) {
|
||||||
|
int32_t rowLen = 0;
|
||||||
|
for(int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||||
|
rowLen += pCond->colList[i].bytes;
|
||||||
|
}
|
||||||
|
|
||||||
// allocate buffer in order to load data blocks from file
|
// allocate buffer in order to load data blocks from file
|
||||||
pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
|
pReadHandle->suppInfo.pstatis = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnDataAgg));
|
||||||
if (pReadHandle->suppInfo.pstatis == NULL) {
|
if (pReadHandle->suppInfo.pstatis == NULL) {
|
||||||
|
|
|
@ -14,8 +14,6 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include <executorimpl.h>
|
|
||||||
#include <vnode.h>
|
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
#include "planner.h"
|
#include "planner.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
|
|
@ -586,8 +586,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
|
||||||
int32_t numOfRows = pCtx[k].input.numOfRows;
|
int32_t numOfRows = pCtx[k].input.numOfRows;
|
||||||
int32_t startOffset = pCtx[k].input.startRowIndex;
|
int32_t startOffset = pCtx[k].input.startRowIndex;
|
||||||
|
|
||||||
int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1);
|
pCtx[k].input.startRowIndex = offset;
|
||||||
pCtx[k].input.startRowIndex = pos;
|
|
||||||
pCtx[k].input.numOfRows = forwardStep;
|
pCtx[k].input.numOfRows = forwardStep;
|
||||||
|
|
||||||
if (tsCol != NULL) {
|
if (tsCol != NULL) {
|
||||||
|
@ -758,45 +757,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns);
|
|
||||||
// uint32_t flag = pOperator->pExpr[i].base.pParam[0].pCol->flag;
|
|
||||||
// if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) ||
|
|
||||||
// (TSDB_COL_IS_TAG(flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)*/) {
|
|
||||||
|
|
||||||
// SColumn* pCol = pOperator->pExpr[i].base.pParam[0].pCol;
|
|
||||||
// if (pCtx[i].columnIndex == -1) {
|
|
||||||
// for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
|
|
||||||
// SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
|
|
||||||
// if (pColData->info.colId == pCol->colId) {
|
|
||||||
// pCtx[i].columnIndex = j;
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// uint32_t status = aAggs[pCtx[i].functionId].status;
|
|
||||||
// if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) {
|
|
||||||
// SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
|
||||||
// In case of the top/bottom query again the nest query result, which has no timestamp column
|
|
||||||
// don't set the ptsList attribute.
|
|
||||||
// if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
|
||||||
// pCtx[i].ptsList = (int64_t*) tsInfo->pData;
|
|
||||||
// } else {
|
|
||||||
// pCtx[i].ptsList = NULL;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
|
|
||||||
// SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
|
|
||||||
// SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
|
|
||||||
//
|
|
||||||
// pCtx[i].pInput = p->pData;
|
|
||||||
// assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type);
|
|
||||||
// for(int32_t j = 0; j < pBlock->info.rows; ++j) {
|
|
||||||
// char* dst = p->pData + j * p->info.bytes;
|
|
||||||
// taosVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -2842,9 +2802,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData idata = {0};
|
SColumnInfoData idata = {0};
|
||||||
|
|
||||||
idata.info.type = pSchema[i].type;
|
idata.info.type = pSchema[i].type;
|
||||||
idata.info.bytes = pSchema[i].bytes;
|
idata.info.bytes = pSchema[i].bytes;
|
||||||
idata.info.colId = pSchema[i].colId;
|
idata.info.colId = pSchema[i].colId;
|
||||||
|
idata.hasNull = true;
|
||||||
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &idata);
|
taosArrayPush(pBlock->pDataBlock, &idata);
|
||||||
if (IS_VAR_DATA_TYPE(idata.info.type)) {
|
if (IS_VAR_DATA_TYPE(idata.info.type)) {
|
||||||
|
|
|
@ -898,11 +898,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan
|
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan
|
||||||
if (upRes) {
|
if (upRes) {
|
||||||
pInfo->pUpdateRes = upRes;
|
pInfo->pUpdateRes = upRes;
|
||||||
if (upRes->info.type = STREAM_REPROCESS) {
|
if (upRes->info.type == STREAM_REPROCESS) {
|
||||||
pInfo->updateResIndex = 0;
|
pInfo->updateResIndex = 0;
|
||||||
prepareDataScan(pInfo);
|
prepareDataScan(pInfo);
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
|
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
|
||||||
} else if (upRes->info.type = STREAM_INVERT) {
|
} else if (upRes->info.type == STREAM_INVERT) {
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||||
return upRes;
|
return upRes;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1072,7 +1072,9 @@ static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo,
|
||||||
|
|
||||||
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
SIntervalAggOperatorInfo* pInfo = pOperator->info;
|
SIntervalAggOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t order = TSDB_ORDER_ASC;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
pInfo->order = TSDB_ORDER_ASC;
|
||||||
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1086,11 +1088,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
// STimeWindow win = {0};
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
SArray* pUpdated = NULL;
|
SArray* pUpdated = NULL;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
|
@ -1103,16 +1103,18 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
// The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the
|
// The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the
|
||||||
// caller. Note that all the time window are not close till now.
|
// caller. Note that all the time window are not close till now.
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
|
||||||
if (pInfo->invertible) {
|
if (pInfo->invertible) {
|
||||||
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
|
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_REPROCESS) {
|
if (pBlock->info.type == STREAM_REPROCESS) {
|
||||||
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval,
|
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval,
|
||||||
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock);
|
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock);
|
||||||
|
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
pInfo->order = TSDB_ORDER_ASC;
|
|
||||||
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -207,6 +207,9 @@ bool fmIsInvertible(int32_t funcId) {
|
||||||
case FUNCTION_TYPE_SUM:
|
case FUNCTION_TYPE_SUM:
|
||||||
case FUNCTION_TYPE_STDDEV:
|
case FUNCTION_TYPE_STDDEV:
|
||||||
case FUNCTION_TYPE_AVG:
|
case FUNCTION_TYPE_AVG:
|
||||||
|
case FUNCTION_TYPE_WSTARTTS:
|
||||||
|
case FUNCTION_TYPE_WENDTS:
|
||||||
|
case FUNCTION_TYPE_WDURATION:
|
||||||
res = true;
|
res = true;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -137,7 +137,11 @@ endi
|
||||||
|
|
||||||
sql insert into t1 values(1648791223001,12,14,13,11.1);
|
sql insert into t1 values(1648791223001,12,14,13,11.1);
|
||||||
sleep 500
|
sleep 500
|
||||||
sql select `_wstartts`, c1, c2 ,c3 ,c4, c5 from streamt;
|
sql select * from streamt;
|
||||||
|
|
||||||
|
print count(*) , count(d) , sum(a) , max(b) , min(c)
|
||||||
|
print 0: $data00 , $data01 , $data02 , $data03 , $data04 , $data05
|
||||||
|
print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15
|
||||||
|
|
||||||
if $rows != 4 then
|
if $rows != 4 then
|
||||||
print ======$rows
|
print ======$rows
|
||||||
|
|
|
@ -354,7 +354,7 @@ class TDTestCase:
|
||||||
event.wait()
|
event.wait()
|
||||||
|
|
||||||
tdLog.info("start consume processor")
|
tdLog.info("start consume processor")
|
||||||
pollDelay = 5
|
pollDelay = 15
|
||||||
showMsg = 1
|
showMsg = 1
|
||||||
showRow = 1
|
showRow = 1
|
||||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
Loading…
Reference in New Issue