refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2022-05-19 16:49:43 +08:00
parent 90a6ec0917
commit e0b9da9c3d
7 changed files with 26 additions and 56 deletions

View File

@ -14,8 +14,6 @@
*/ */
#include "executor.h" #include "executor.h"
#include <executorimpl.h>
#include <vnode.h>
#include "executorimpl.h" #include "executorimpl.h"
#include "planner.h" #include "planner.h"
#include "tdatablock.h" #include "tdatablock.h"

View File

@ -343,21 +343,24 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
return pResultRow; return pResultRow;
} }
void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes, void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes, uint64_t groupId, int32_t numOfOutput) {
uint64_t groupId, int32_t numOfOutput) {
SAggSupporter* pSup = &pInfo->aggSup; SAggSupporter* pSup = &pInfo->aggSup;
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 = SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
GET_RES_WINDOW_KEY_LEN(bytes));
SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1); SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1);
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultCell(pResult, i, pInfo->binfo.rowCellInfoOffset); pCtx[i].resultInfo = getResultCell(pResult, i, pInfo->binfo.rowCellInfoOffset);
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) { if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
continue; continue;
} }
pResInfo->initialized = false; pResInfo->initialized = false;
if (pCtx[i].functionId != -1) { if (pCtx[i].functionId != -1) {
pCtx[i].fpSet.init(&pCtx[i], pResInfo); pCtx[i].fpSet.init(&pCtx[i], pResInfo);
@ -608,8 +611,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
int32_t numOfRows = pCtx[k].input.numOfRows; int32_t numOfRows = pCtx[k].input.numOfRows;
int32_t startOffset = pCtx[k].input.startRowIndex; int32_t startOffset = pCtx[k].input.startRowIndex;
int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1); pCtx[k].input.startRowIndex = offset;
pCtx[k].input.startRowIndex = pos;
pCtx[k].input.numOfRows = forwardStep; pCtx[k].input.numOfRows = forwardStep;
if (tsCol != NULL) { if (tsCol != NULL) {
@ -780,45 +782,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
} }
} }
} }
// setBlockStatisInfo(&pCtx[i], pBlock, pOperator->pExpr[i].base.pColumns);
// uint32_t flag = pOperator->pExpr[i].base.pParam[0].pCol->flag;
// if (TSDB_COL_IS_NORMAL_COL(flag) /*|| (pCtx[i].functionId == FUNCTION_BLKINFO) ||
// (TSDB_COL_IS_TAG(flag) && pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)*/) {
// SColumn* pCol = pOperator->pExpr[i].base.pParam[0].pCol;
// if (pCtx[i].columnIndex == -1) {
// for(int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
// SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, j);
// if (pColData->info.colId == pCol->colId) {
// pCtx[i].columnIndex = j;
// break;
// }
// }
// }
// uint32_t status = aAggs[pCtx[i].functionId].status;
// if ((status & (FUNCSTATE_SELECTIVITY | FUNCSTATE_NEED_TS)) != 0) {
// SColumnInfoData* tsInfo = taosArrayGet(pBlock->pDataBlock, 0);
// In case of the top/bottom query again the nest query result, which has no timestamp column
// don't set the ptsList attribute.
// if (tsInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
// pCtx[i].ptsList = (int64_t*) tsInfo->pData;
// } else {
// pCtx[i].ptsList = NULL;
// }
// }
// } else if (TSDB_COL_IS_UD_COL(pCol->flag) && (pOperator->pRuntimeEnv->scanFlag == MERGE_STAGE)) {
// SColIndex* pColIndex = &pOperator->pExpr[i].base.colInfo;
// SColumnInfoData* p = taosArrayGet(pBlock->pDataBlock, pColIndex->colIndex);
//
// pCtx[i].pInput = p->pData;
// assert(p->info.colId == pColIndex->info.colId && pCtx[i].inputType == p->info.type);
// for(int32_t j = 0; j < pBlock->info.rows; ++j) {
// char* dst = p->pData + j * p->info.bytes;
// taosVariantDump(&pOperator->pExpr[i].base.param[1], dst, p->info.type, true);
// }
// }
} }
return code; return code;

View File

@ -784,11 +784,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); //TODO(liuyao) get invertible from plan
if (upRes) { if (upRes) {
pInfo->pUpdateRes = upRes; pInfo->pUpdateRes = upRes;
if (upRes->info.type = STREAM_REPROCESS) { if (upRes->info.type == STREAM_REPROCESS) {
pInfo->updateResIndex = 0; pInfo->updateResIndex = 0;
prepareDataScan(pInfo); prepareDataScan(pInfo);
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES; pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
} else if (upRes->info.type = STREAM_INVERT) { } else if (upRes->info.type == STREAM_INVERT) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
return upRes; return upRes;
} }

View File

@ -1048,7 +1048,9 @@ static void doClearWindows(SIntervalAggOperatorInfo* pInfo, int32_t numOfOutput,
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SIntervalAggOperatorInfo* pInfo = pOperator->info; SIntervalAggOperatorInfo* pInfo = pOperator->info;
int32_t order = TSDB_ORDER_ASC; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
pInfo->order = TSDB_ORDER_ASC;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
@ -1062,11 +1064,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
} }
// STimeWindow win = {0};
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = NULL; SArray* pUpdated = NULL;
while (1) { while (1) {
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
@ -1079,15 +1079,17 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
// The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the // The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now. // caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
if (pInfo->invertible) { if (pInfo->invertible) {
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type); setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
} }
if (pBlock->info.type == STREAM_REPROCESS) { if (pBlock->info.type == STREAM_REPROCESS) {
doClearWindows(pInfo, pOperator->numOfExprs, pBlock); doClearWindows(pInfo, pOperator->numOfExprs, pBlock);
qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo));
continue; continue;
} }
pInfo->order = TSDB_ORDER_ASC;
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
} }

View File

@ -203,6 +203,9 @@ bool fmIsInvertible(int32_t funcId) {
case FUNCTION_TYPE_SUM: case FUNCTION_TYPE_SUM:
case FUNCTION_TYPE_STDDEV: case FUNCTION_TYPE_STDDEV:
case FUNCTION_TYPE_AVG: case FUNCTION_TYPE_AVG:
case FUNCTION_TYPE_WSTARTTS:
case FUNCTION_TYPE_WENDTS:
case FUNCTION_TYPE_WDURATION:
res = true; res = true;
break; break;
default: default:

View File

@ -137,7 +137,11 @@ endi
sql insert into t1 values(1648791223001,12,14,13,11.1); sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 500 sleep 500
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; sql select * from streamt;
print count(*) , count(d) , sum(a) , max(b) , min(c)
print 0: $data00 , $data01 , $data02 , $data03 , $data04 , $data05
print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15
if $rows != 4 then if $rows != 4 then
print ======$rows print ======$rows

View File

@ -354,7 +354,7 @@ class TDTestCase:
event.wait() event.wait()
tdLog.info("start consume processor") tdLog.info("start consume processor")
pollDelay = 5 pollDelay = 15
showMsg = 1 showMsg = 1
showRow = 1 showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)