refactor: do some internal refactor.
This commit is contained in:
parent
124fb5fc50
commit
2c8b962b5a
|
@ -160,13 +160,6 @@ int32_t qAsyncKillTask(qTaskInfo_t tinfo);
|
|||
*/
|
||||
void qDestroyTask(qTaskInfo_t tinfo);
|
||||
|
||||
/**
|
||||
* Get the queried table uid
|
||||
* @param qHandle
|
||||
* @return
|
||||
*/
|
||||
int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
|
||||
|
||||
/**
|
||||
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
|
||||
*
|
||||
|
|
|
@ -161,4 +161,6 @@ int32_t convertFillType(int32_t mode);
|
|||
int32_t resultrowComparAsc(const void* p1, const void* p2);
|
||||
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified);
|
||||
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||
|
||||
#endif // TDENGINE_QUERYUTIL_H
|
||||
|
|
|
@ -537,23 +537,6 @@ typedef struct SStreamIntervalOperatorInfo {
|
|||
SWinKey delKey;
|
||||
} SStreamIntervalOperatorInfo;
|
||||
|
||||
typedef struct SFillOperatorInfo {
|
||||
struct SFillInfo* pFillInfo;
|
||||
SSDataBlock* pRes;
|
||||
SSDataBlock* pFinalRes;
|
||||
int64_t totalInputRows;
|
||||
void** p;
|
||||
SSDataBlock* existNewGroupBlock;
|
||||
STimeWindow win;
|
||||
SColMatchInfo matchInfo;
|
||||
int32_t primaryTsCol;
|
||||
int32_t primarySrcSlotId;
|
||||
uint64_t curGroupId; // current handled group id
|
||||
SExprInfo* pExprInfo;
|
||||
int32_t numOfExpr;
|
||||
SExprSupp noFillExprSupp;
|
||||
} SFillOperatorInfo;
|
||||
|
||||
typedef struct SDataGroupInfo {
|
||||
uint64_t groupId;
|
||||
int64_t numOfRows;
|
||||
|
@ -805,8 +788,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
|||
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
|
||||
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
|
||||
|
||||
void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo);
|
||||
|
||||
int32_t getMaximumIdleDurationSec();
|
||||
|
||||
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
|
||||
|
@ -824,9 +805,8 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
|
|||
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
|
||||
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
|
||||
uint64_t* pGp, void* pTbName);
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
|
||||
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
|
||||
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
|
||||
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
|
||||
|
||||
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
|
||||
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||
|
|
|
@ -25,6 +25,8 @@ extern "C" {
|
|||
#include "tcommon.h"
|
||||
#include "tsimplehash.h"
|
||||
|
||||
#define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId)
|
||||
|
||||
struct SSDataBlock;
|
||||
|
||||
typedef struct SFillColInfo {
|
||||
|
@ -113,12 +115,12 @@ typedef struct SStreamFillInfo {
|
|||
|
||||
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
|
||||
|
||||
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
|
||||
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
|
||||
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
|
||||
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
|
||||
int32_t numOfNotFillCols, const struct SNodeListNode* val);
|
||||
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
||||
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
|
||||
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
|
||||
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
|
||||
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
|
||||
int32_t numOfNotFillCols, const struct SNodeListNode* val);
|
||||
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
||||
|
||||
SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
|
||||
|
@ -128,6 +130,8 @@ void* taosDestroyFillInfo(struct SFillInfo* pFillInfo);
|
|||
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
|
||||
int64_t getFillInfoStart(struct SFillInfo* pFillInfo);
|
||||
|
||||
bool fillIfWindowPseudoColumn(SFillInfo* pFillInfo, SFillColInfo* pCol, SColumnInfoData* pDstColInfoData,
|
||||
int32_t rowIndex);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -15,26 +15,15 @@
|
|||
|
||||
#include "filter.h"
|
||||
#include "function.h"
|
||||
#include "functionMgt.h"
|
||||
#include "os.h"
|
||||
#include "querynodes.h"
|
||||
#include "tfill.h"
|
||||
#include "tname.h"
|
||||
#include "tref.h"
|
||||
|
||||
#include "tdatablock.h"
|
||||
#include "tglobal.h"
|
||||
#include "tmsg.h"
|
||||
#include "tsort.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#include "executorimpl.h"
|
||||
#include "index.h"
|
||||
#include "query.h"
|
||||
#include "tcompare.h"
|
||||
#include "thash.h"
|
||||
#include "ttypes.h"
|
||||
#include "vnode.h"
|
||||
|
||||
typedef struct SFetchRspHandleWrapper {
|
||||
uint32_t exchangeId;
|
||||
|
|
|
@ -2002,3 +2002,13 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
|
||||
if (!pBlock || pBlock->info.rows == 0) {
|
||||
qDebug("===stream===printDataBlock: Block is Null or Empty");
|
||||
return;
|
||||
}
|
||||
char* pBuf = NULL;
|
||||
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
|
||||
taosMemoryFree(pBuf);
|
||||
}
|
|
@ -704,6 +704,20 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
|
||||
STaskCostInfo* pSummary = &pTaskInfo->cost;
|
||||
|
||||
SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
|
||||
if (pSummary->pRecoder != NULL) {
|
||||
qDebug(
|
||||
"%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
|
||||
"load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
|
||||
GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, pSummary->extractListTime, pSummary->groupIdMapTime,
|
||||
pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows,
|
||||
pRecorder->totalCheckedRows);
|
||||
}
|
||||
}
|
||||
|
||||
void qDestroyTask(qTaskInfo_t qTaskHandle) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
|
||||
if (pTaskInfo == NULL) {
|
||||
|
|
|
@ -91,7 +91,6 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock*
|
|||
|
||||
static void releaseQueryBuf(size_t numOfTables);
|
||||
|
||||
static void destroyFillOperatorInfo(void* param);
|
||||
static void destroyAggOperatorInfo(void* param);
|
||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
|
||||
|
@ -1157,20 +1156,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
|||
}
|
||||
}
|
||||
|
||||
void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
|
||||
STaskCostInfo* pSummary = &pTaskInfo->cost;
|
||||
|
||||
SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
|
||||
if (pSummary->pRecoder != NULL) {
|
||||
qDebug(
|
||||
"%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
|
||||
"load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
|
||||
GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, pSummary->extractListTime, pSummary->groupIdMapTime,
|
||||
pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows,
|
||||
pRecorder->totalCheckedRows);
|
||||
}
|
||||
}
|
||||
|
||||
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
|
||||
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
//
|
||||
|
@ -1513,179 +1498,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
|||
return (rows == 0) ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
|
||||
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
|
||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t scanFlag = MAIN_SCAN;
|
||||
getTableScanInfo(pOperator, &order, &scanFlag);
|
||||
|
||||
int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey;
|
||||
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
|
||||
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
|
||||
|
||||
int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
|
||||
|
||||
pInfo->curGroupId = pInfo->existNewGroupBlock->info.id.groupId;
|
||||
pInfo->existNewGroupBlock = NULL;
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
|
||||
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
|
||||
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows);
|
||||
pInfo->pRes->info.id.groupId = pInfo->curGroupId;
|
||||
return;
|
||||
}
|
||||
|
||||
// handle the cached new group data block
|
||||
if (pInfo->existNewGroupBlock) {
|
||||
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
|
||||
SFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
setInputDataBlock(pSup, pBlock, order, scanFlag, false);
|
||||
projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
|
||||
|
||||
// reset the row value before applying the no-fill functions to the input data block, which is "pBlock" in this case.
|
||||
pInfo->pRes->info.rows = 0;
|
||||
SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
|
||||
setInputDataBlock(pNoFillSupp, pBlock, order, scanFlag, false);
|
||||
|
||||
projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs, NULL);
|
||||
pInfo->pRes->info.id.groupId = pBlock->info.id.groupId;
|
||||
}
|
||||
|
||||
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
||||
SFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t scanFlag = MAIN_SCAN;
|
||||
getTableScanInfo(pOperator, &order, &scanFlag);
|
||||
|
||||
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows > 0) {
|
||||
pResBlock->info.id.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
|
||||
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
|
||||
if (pBlock == NULL) {
|
||||
if (pInfo->totalInputRows == 0) {
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
|
||||
} else {
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);
|
||||
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
|
||||
blockDataEnsureCapacity(pInfo->pFinalRes, pBlock->info.rows);
|
||||
doApplyScalarCalculation(pOperator, pBlock, order, scanFlag);
|
||||
|
||||
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.id.groupId) {
|
||||
pInfo->curGroupId = pInfo->pRes->info.id.groupId; // the first data block
|
||||
pInfo->totalInputRows += pInfo->pRes->info.rows;
|
||||
|
||||
if (order == pInfo->pFillInfo->order) {
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.ekey);
|
||||
} else {
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.skey);
|
||||
}
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
|
||||
} else if (pInfo->curGroupId != pBlock->info.id.groupId) { // the new group data block
|
||||
pInfo->existNewGroupBlock = pBlock;
|
||||
|
||||
// Fill the previous group data block, before handle the data block of new group.
|
||||
// Close the fill operation for previous group data block
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
|
||||
|
||||
// current group has no more result to return
|
||||
if (pResBlock->info.rows > 0) {
|
||||
// 1. The result in current group not reach the threshold of output result, continue
|
||||
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
|
||||
if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
|
||||
pResBlock->info.id.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
|
||||
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
|
||||
pResBlock->info.id.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||
assert(pBlock != NULL);
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
|
||||
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows > pResultInfo->threshold) {
|
||||
pResBlock->info.id.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
|
||||
SFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* fillResult = NULL;
|
||||
while (true) {
|
||||
fillResult = doFillImpl(pOperator);
|
||||
if (fillResult == NULL) {
|
||||
setOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
|
||||
if (fillResult->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (fillResult != NULL) {
|
||||
pOperator->resultInfo.totalRows += fillResult->info.rows;
|
||||
}
|
||||
|
||||
return fillResult;
|
||||
}
|
||||
|
||||
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
|
||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||
SExprInfo* pExprInfo = &pExpr[i];
|
||||
|
@ -1955,167 +1767,6 @@ void destroyAggOperatorInfo(void* param) {
|
|||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
void destroyFillOperatorInfo(void* param) {
|
||||
SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
|
||||
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
|
||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
pInfo->pFinalRes = blockDataDestroy(pInfo->pFinalRes);
|
||||
|
||||
cleanupExprSupp(&pInfo->noFillExprSupp);
|
||||
|
||||
taosMemoryFreeClear(pInfo->p);
|
||||
taosArrayDestroy(pInfo->matchInfo.pList);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
|
||||
int32_t numOfNotFillCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity,
|
||||
const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
|
||||
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
|
||||
|
||||
int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
|
||||
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
|
||||
w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);
|
||||
|
||||
pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
|
||||
pInfo->primaryTsCol, order, id);
|
||||
|
||||
if (order == TSDB_ORDER_ASC) {
|
||||
pInfo->win.skey = win.skey;
|
||||
pInfo->win.ekey = win.ekey;
|
||||
} else {
|
||||
pInfo->win.skey = win.ekey;
|
||||
pInfo->win.ekey = win.skey;
|
||||
}
|
||||
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||
|
||||
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
|
||||
taosMemoryFree(pInfo->pFillInfo);
|
||||
taosMemoryFree(pInfo->p);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
|
||||
if (pInfo->noFillExprSupp.numOfExprs == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pInfo->noFillExprSupp.numOfExprs; ++i) {
|
||||
SExprInfo* exprInfo = pInfo->noFillExprSupp.pExprInfo + i;
|
||||
if (exprInfo->pExpr->nodeType == QUERY_NODE_COLUMN && exprInfo->base.numOfParams == 1 &&
|
||||
exprInfo->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode, SExprSupp* pExprSupp,
|
||||
const char* idStr) {
|
||||
bool wstartExist = isWstartColumnExist(pInfo);
|
||||
|
||||
if (wstartExist == false) {
|
||||
if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) {
|
||||
qError("pWStartTs of fill physical node is not a target node, %s", idStr);
|
||||
return TSDB_CODE_QRY_SYS_ERROR;
|
||||
}
|
||||
|
||||
SExprInfo* pExpr = taosMemoryRealloc(pExprSupp->pExprInfo, (pExprSupp->numOfExprs + 1) * sizeof(SExprInfo));
|
||||
if (pExpr == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
createExprFromTargetNode(&pExpr[pExprSupp->numOfExprs], (STargetNode*)pPhyFillNode->pWStartTs);
|
||||
pExprSupp->numOfExprs += 1;
|
||||
pExprSupp->pExprInfo = pExpr;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
|
||||
SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
|
||||
pNoFillSupp->pExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->numOfExprs);
|
||||
int32_t code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
code = initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SInterval* pInterval =
|
||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
||||
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
||||
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
|
||||
|
||||
int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
int32_t type = convertFillType(pPhyFillNode->mode);
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
|
||||
pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
|
||||
|
||||
int32_t numOfOutputCols = 0;
|
||||
code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
|
||||
COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
|
||||
|
||||
code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
|
||||
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
|
||||
pTaskInfo->id.str, pInterval, type, order);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false);
|
||||
blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
|
||||
|
||||
code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||
pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
if (pInfo != NULL) {
|
||||
destroyFillOperatorInfo(pInfo);
|
||||
}
|
||||
|
||||
pTaskInfo->code = code;
|
||||
taosMemoryFreeClear(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
|
||||
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
||||
if (pTaskInfo == NULL) {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1335,30 +1335,6 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
|
|||
return code;
|
||||
}
|
||||
|
||||
static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock* pResBlock) {
|
||||
if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return;
|
||||
if (pBlock == NULL || pBlock->info.rows == 0) return;
|
||||
|
||||
SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
|
||||
ASSERT(pSrcBlock->info.rows == 1);
|
||||
|
||||
blockDataEnsureCapacity(pResBlock, 1);
|
||||
|
||||
projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, 1, NULL);
|
||||
ASSERT(pResBlock->info.rows == 1);
|
||||
|
||||
// build tagArray
|
||||
/*SArray* tagArray = taosArrayInit(0, sizeof(void*));*/
|
||||
/*STagVal tagVal = {*/
|
||||
/*.cid = 0,*/
|
||||
/*.type = 0,*/
|
||||
/*};*/
|
||||
// build STag
|
||||
// set STag
|
||||
|
||||
blockDataDestroy(pSrcBlock);
|
||||
}
|
||||
|
||||
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
||||
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
|
||||
SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -677,16 +677,6 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
|
|||
}
|
||||
}
|
||||
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
|
||||
if (!pBlock || pBlock->info.rows == 0) {
|
||||
qDebug("===stream===printDataBlock: Block is Null or Empty");
|
||||
return;
|
||||
}
|
||||
char* pBuf = NULL;
|
||||
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
|
||||
taosMemoryFree(pBuf);
|
||||
}
|
||||
|
||||
typedef int32_t (*__compare_fn_t)(void* pKey, void* data, int32_t index);
|
||||
|
||||
int32_t binarySearchCom(void* keyList, int num, void* pKey, int order, __compare_fn_t comparefn) {
|
||||
|
@ -3854,7 +3844,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
|
||||
int64_t groupId = pSDataBlock->info.id.groupId;
|
||||
uint64_t groupId = pSDataBlock->info.id.groupId;
|
||||
int64_t code = TSDB_CODE_SUCCESS;
|
||||
TSKEY* tsCols = NULL;
|
||||
SResultRow* pResult = NULL;
|
||||
|
|
Loading…
Reference in New Issue