stream final interval operator
This commit is contained in:
parent
fba964778b
commit
5ca0e81bba
|
@ -208,6 +208,7 @@ typedef enum ENodeType {
|
|||
QUERY_NODE_PHYSICAL_PLAN_SORT,
|
||||
QUERY_NODE_PHYSICAL_PLAN_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_FILL,
|
||||
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW,
|
||||
QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW,
|
||||
|
|
|
@ -378,6 +378,13 @@ typedef enum EStreamScanMode {
|
|||
STREAM_SCAN_FROM_DATAREADER,
|
||||
} EStreamScanMode;
|
||||
|
||||
typedef struct SCatchSupporter {
|
||||
SHashObj* pWindowHashTable; // quick locate the window object for each window
|
||||
SDiskbasedBuf* pDataBuf; // buffer based on blocked-wised disk file
|
||||
int32_t keySize;
|
||||
int64_t* pKeyBuf;
|
||||
} SCatchSupporter;
|
||||
|
||||
typedef struct SStreamBlockScanInfo {
|
||||
SArray* pBlockLists; // multiple SSDatablock.
|
||||
SSDataBlock* pRes; // result SSDataBlock
|
||||
|
@ -400,6 +407,8 @@ typedef struct SStreamBlockScanInfo {
|
|||
EStreamScanMode scanMode;
|
||||
SOperatorInfo* pOperatorDumy;
|
||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
|
||||
SCatchSupporter childAggSup;
|
||||
SArray* childIds;
|
||||
} SStreamBlockScanInfo;
|
||||
|
||||
typedef struct SSysTableScanInfo {
|
||||
|
@ -460,6 +469,16 @@ typedef struct SIntervalAggOperatorInfo {
|
|||
bool invertible;
|
||||
} SIntervalAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamFinalIntervalOperatorInfo {
|
||||
SOptrBasicInfo binfo; // basic info
|
||||
SGroupResInfo groupResInfo; // multiple results build supporter
|
||||
SInterval interval; // interval info
|
||||
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
|
||||
SAggSupporter aggSup; // aggregate supporter
|
||||
int32_t order; // current SSDataBlock scan order
|
||||
STimeWindowAggSupp twAggSup;
|
||||
} SStreamFinalIntervalOperatorInfo;
|
||||
|
||||
typedef struct SAggOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SAggSupporter aggSup;
|
||||
|
@ -696,6 +715,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
|
|||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
|
@ -771,9 +793,8 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
|
|||
TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
|
||||
int32_t order);
|
||||
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||
|
||||
void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes,
|
||||
uint64_t groupId, int32_t numOfOutput);
|
||||
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize,
|
||||
const char* pKey, const char* pDir);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -342,28 +342,6 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
|
|||
return pResultRow;
|
||||
}
|
||||
|
||||
void doClearWindow(SIntervalAggOperatorInfo* pInfo, char* pData, int16_t bytes,
|
||||
uint64_t groupId, int32_t numOfOutput) {
|
||||
SAggSupporter* pSup = &pInfo->aggSup;
|
||||
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
|
||||
GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1);
|
||||
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pCtx[i].resultInfo = getResultCell(pResult, i, pInfo->binfo.rowCellInfoOffset);
|
||||
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
|
||||
if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
|
||||
continue;
|
||||
}
|
||||
pResInfo->initialized = false;
|
||||
if (pCtx[i].functionId != -1) {
|
||||
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* the struct of key in hash table
|
||||
* +----------+---------------+
|
||||
|
@ -5321,3 +5299,16 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t initCatchSupporter(SCatchSupporter* pCatchSup, size_t rowSize, size_t keyBufSize,
|
||||
const char* pKey, const char* pDir) {
|
||||
pCatchSup->keySize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
|
||||
pCatchSup->pKeyBuf = taosMemoryCalloc(1, pCatchSup->keySize);
|
||||
int32_t pageSize = rowSize * 32;
|
||||
int32_t bufSize = pageSize * 4096;
|
||||
createDiskbasedBuf(&pCatchSup->pDataBuf, pageSize, bufSize, pKey, pDir);
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pCatchSup->pWindowHashTable = taosHashInit(10000, hashFn, true, HASH_NO_LOCK);;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
@ -36,6 +36,11 @@
|
|||
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||
|
||||
typedef struct SWindowPosition {
|
||||
int32_t pageId;
|
||||
int32_t rowId;
|
||||
} SWindowPosition;
|
||||
|
||||
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
|
||||
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
|
||||
const char* dbName);
|
||||
|
@ -675,6 +680,96 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void static setSupKeyBuf(SCatchSupporter* pSup, int64_t groupId, int64_t childId, TSKEY ts) {
|
||||
int64_t* pKey = (int64_t*)pSup->pKeyBuf;
|
||||
pKey[0] = groupId;
|
||||
pKey[1] = childId;
|
||||
pKey[2] = ts;
|
||||
}
|
||||
|
||||
static int32_t catchWidonwInfo(SSDataBlock* pDataBlock, SCatchSupporter* pSup,
|
||||
int32_t pageId, int32_t tsIndex, int64_t childId) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pDataBlock->pDataBlock, tsIndex);
|
||||
TSKEY* tsCols = (int64_t*)pColDataInfo->pData;
|
||||
for (int32_t i = 0; i < pDataBlock->info.rows; i++) {
|
||||
setSupKeyBuf(pSup, pDataBlock->info.groupId, childId, tsCols[i]);
|
||||
SWindowPosition* p1 = (SWindowPosition*)taosHashGet(pSup->pWindowHashTable,
|
||||
pSup->pKeyBuf, pSup->keySize);
|
||||
if (p1 == NULL) {
|
||||
SWindowPosition pos = {.pageId = pageId, .rowId = i};
|
||||
int32_t code = taosHashPut(pSup->pWindowHashTable, pSup->pKeyBuf, pSup->keySize, &pos,
|
||||
sizeof(SWindowPosition));
|
||||
if (code != TSDB_CODE_SUCCESS ) {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
p1->pageId = pageId;
|
||||
p1->rowId = i;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t catchDatablock(SSDataBlock* pDataBlock, SCatchSupporter* pSup,
|
||||
int32_t tsIndex, int64_t childId) {
|
||||
int32_t start = 0;
|
||||
int32_t stop = 0;
|
||||
int32_t pageSize = getBufPageSize(pSup->pDataBuf);
|
||||
while(start < pDataBlock->info.rows) {
|
||||
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pageSize);
|
||||
SSDataBlock* pDB = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
|
||||
if (pDB == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
int32_t pageId = -1;
|
||||
void* pPage = getNewBufPage(pSup->pDataBuf, pDataBlock->info.groupId, &pageId);
|
||||
if (pPage == NULL) {
|
||||
blockDataDestroy(pDB);
|
||||
return terrno;
|
||||
}
|
||||
int32_t size = blockDataGetSize(pDB) + sizeof(int32_t) + pDB->info.numOfCols * sizeof(int32_t);
|
||||
assert(size <= pageSize);
|
||||
blockDataToBuf(pPage, pDB);
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pSup->pDataBuf, pPage);
|
||||
blockDataDestroy(pDB);
|
||||
start = stop + 1;
|
||||
int32_t code = catchWidonwInfo(pDB, pSup, pageId, tsIndex, childId);
|
||||
if (code != TSDB_CODE_SUCCESS ) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSDataBlock* getDataFromCatch(SStreamBlockScanInfo* pInfo) {
|
||||
SSDataBlock* pBlock = pInfo->pUpdateRes;
|
||||
if (pInfo->updateResIndex < pBlock->info.rows) {
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
SCatchSupporter* pCSup = &pInfo->childAggSup;
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, 0);
|
||||
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
|
||||
int32_t size = taosArrayGetSize(pInfo->childIds);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
int64_t id = *(int64_t *)taosArrayGet(pInfo->childIds, i);
|
||||
setSupKeyBuf(pCSup, pBlock->info.groupId, id,
|
||||
tsCols[pInfo->updateResIndex]);
|
||||
SWindowPosition* pos = (SWindowPosition*)taosHashGet(pCSup->pWindowHashTable,
|
||||
pCSup->pKeyBuf, pCSup->keySize);
|
||||
void* buf = getBufPage(pCSup->pDataBuf, pos->pageId);
|
||||
SSDataBlock* pDB = createOneDataBlock(pInfo->pRes, false);
|
||||
blockDataFromBuf(pDB, buf);
|
||||
SSDataBlock* pSub = blockDataExtractBlock(pDB, pos->rowId, 1);
|
||||
blockDataMerge(pInfo->pRes, pSub, NULL);
|
||||
blockDataDestroy(pDB);
|
||||
blockDataDestroy(pSub);
|
||||
}
|
||||
pInfo->updateResIndex++;
|
||||
return pInfo->pRes;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||
// NOTE: this operator does never check if current status is done or not
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
@ -688,6 +783,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
|
||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||
if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) {
|
||||
if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
|
||||
SSDataBlock* pDB = getDataFromCatch(pInfo);
|
||||
if (pDB != NULL) {
|
||||
return pDB;
|
||||
} else {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
}
|
||||
}
|
||||
|
||||
if (pInfo->validBlockIndex >= total) {
|
||||
doClearBufferedBlocks(pInfo);
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
|
@ -695,7 +799,17 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
int32_t current = pInfo->validBlockIndex++;
|
||||
return taosArrayGetP(pInfo->pBlockLists, current);
|
||||
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
|
||||
if (pBlock->info.type == STREAM_REPROCESS) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_UPDATERES;
|
||||
} else {
|
||||
int32_t code = catchDatablock(pBlock, &pInfo->childAggSup, pInfo->primaryTsIndex, 0);
|
||||
if (code != TDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
longjmp(pTaskInfo->env, code);
|
||||
}
|
||||
}
|
||||
return pBlock;
|
||||
} else {
|
||||
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
|
||||
blockDataDestroy(pInfo->pUpdateRes);
|
||||
|
@ -857,6 +971,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, void* pDataR
|
|||
pInfo->pOperatorDumy = pOperatorDumy;
|
||||
pInfo->interval = pSTInfo->interval;
|
||||
|
||||
size_t childKeyBufSize = sizeof(int64_t) + sizeof(int64_t) + sizeof(TSKEY);
|
||||
initCatchSupporter(&pInfo->childAggSup, 1024, childKeyBufSize,
|
||||
"StreamFinalInterval", "/tmp/"); // TODO(liuyao) get row size from phy plan
|
||||
|
||||
pOperator->name = "StreamBlockScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||
pOperator->blocking = false;
|
||||
|
|
|
@ -8,6 +8,8 @@ typedef enum SResultTsInterpType {
|
|||
RESULT_ROW_END_INTERP = 2,
|
||||
} SResultTsInterpType;
|
||||
|
||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator);
|
||||
|
||||
/*
|
||||
* There are two cases to handle:
|
||||
*
|
||||
|
@ -473,8 +475,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun
|
|||
}
|
||||
|
||||
static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
|
||||
TSKEY* primaryKeys, int32_t prevPosition, SIntervalAggOperatorInfo* pInfo) {
|
||||
int32_t order = pInfo->order;
|
||||
TSKEY* primaryKeys, int32_t prevPosition, int32_t order) {
|
||||
bool ascQuery = (order == TSDB_ORDER_ASC);
|
||||
|
||||
int32_t precision = pInterval->precision;
|
||||
|
@ -723,7 +724,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
STimeWindow nextWin = win;
|
||||
while (1) {
|
||||
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
|
||||
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo);
|
||||
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
|
@ -1031,18 +1032,41 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
|
|||
}
|
||||
}
|
||||
}
|
||||
static void doClearWindows(SIntervalAggOperatorInfo* pInfo, int32_t numOfOutput, SSDataBlock* pBlock) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
|
||||
void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData,
|
||||
int16_t bytes, uint64_t groupId, int32_t numOfOutput) {
|
||||
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
|
||||
SResultRowPosition* p1 =
|
||||
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf,
|
||||
GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
SResultRow* pResult = getResultRowByPos(pSup->pResultBuf, p1);
|
||||
SqlFunctionCtx* pCtx = pBinfo->pCtx;
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pCtx[i].resultInfo = getResultCell(pResult, i, pBinfo->rowCellInfoOffset);
|
||||
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
|
||||
if (fmIsWindowPseudoColumnFunc(pCtx[i].functionId)) {
|
||||
continue;
|
||||
}
|
||||
pResInfo->initialized = false;
|
||||
if (pCtx[i].functionId != -1) {
|
||||
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void doClearWindows(SAggSupporter* pSup, SOptrBasicInfo* pBinfo,
|
||||
SInterval* pIntrerval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
||||
TSKEY *tsCols = (TSKEY*)pColDataInfo->pData;
|
||||
int32_t step = 0;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i += step) {
|
||||
SResultRowInfo dumyInfo;
|
||||
dumyInfo.cur.pageId = -1;
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], &pInfo->interval,
|
||||
pInfo->interval.precision, NULL);
|
||||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pIntrerval,
|
||||
pIntrerval->precision, NULL);
|
||||
step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i,
|
||||
win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||
doClearWindow(pInfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput);
|
||||
doClearWindow(pSup, pBinfo, (char*)&win.skey, sizeof(TKEY), pBlock->info.groupId, numOfOutput);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1084,7 +1108,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
setInverFunction(pInfo->binfo.pCtx, pOperator->numOfExprs, pBlock->info.type);
|
||||
}
|
||||
if (pBlock->info.type == STREAM_REPROCESS) {
|
||||
doClearWindows(pInfo, pOperator->numOfExprs, pBlock);
|
||||
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval,
|
||||
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock);
|
||||
continue;
|
||||
}
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
|
@ -1097,8 +1122,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
|
||||
// TODO: remove for stream
|
||||
/*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
|
@ -1116,6 +1139,12 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
cleanupAggSup(&pInfo->aggSup);
|
||||
}
|
||||
|
||||
void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo *)param;
|
||||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
}
|
||||
|
||||
bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
|
||||
for (int32_t i = 0; i < numOfCols; i++) {
|
||||
if (!fmIsInvertible(pFCtx[i].functionId)) {
|
||||
|
@ -1185,6 +1214,63 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->interval = *pInterval;
|
||||
pInfo->twAggSup = *pTwAggSupp;
|
||||
pInfo->primaryTsIndex = primaryTsSlotId;
|
||||
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(pOperator, 4096);
|
||||
|
||||
int32_t code =
|
||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock,
|
||||
keyBufSize, pTaskInfo->id.str);
|
||||
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
|
||||
|
||||
pOperator->name = "StreamFinalIntervalOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
pOperator->pExpr = pExprInfo;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->numOfExprs = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL,
|
||||
destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow,
|
||||
NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols);
|
||||
taosMemoryFreeClear(pInfo);
|
||||
taosMemoryFreeClear(pOperator);
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
|
||||
|
@ -1548,3 +1634,91 @@ _error:
|
|||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock,
|
||||
int32_t tableGroupId) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||
int32_t numOfOutput = pOperatorInfo->numOfExprs;
|
||||
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
|
||||
int32_t step = 1;
|
||||
bool ascScan = true;
|
||||
TSKEY* tsCols = NULL;
|
||||
SResultRow* pResult = NULL;
|
||||
int32_t forwardStep = 0;
|
||||
|
||||
if (pSDataBlock->pDataBlock != NULL) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
tsCols = (int64_t*)pColDataInfo->pData;
|
||||
}
|
||||
int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1);
|
||||
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascScan);
|
||||
STimeWindow nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts,
|
||||
&pInfo->interval, pInfo->interval.precision, NULL);
|
||||
while (1) {
|
||||
int32_t code =
|
||||
setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pInfo->binfo.pCtx,
|
||||
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
|
||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
|
||||
pos->groupId = tableGroupId;
|
||||
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
||||
*(int64_t*)pos->key = pResult->win.skey;
|
||||
taosArrayPush(pUpdated, &pos);
|
||||
forwardStep =
|
||||
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||
// window start(end) key interpolation
|
||||
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep,
|
||||
pInfo->order, false);
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
|
||||
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
|
||||
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
|
||||
if (startPos < 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return pUpdated;
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||
SStreamFinalIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
SArray* pUpdated = NULL;
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
while (1) {
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
|
||||
if (pBlock->info.type == STREAM_REPROCESS) {
|
||||
doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval,
|
||||
pInfo->primaryTsIndex, pOperator->numOfExprs, pBlock);
|
||||
continue;
|
||||
}
|
||||
pUpdated = doHashInterval(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
||||
}
|
||||
|
||||
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue