stream time slice

This commit is contained in:
54liuyao 2024-07-12 11:03:45 +08:00
parent 2dbedaf88b
commit 1c9011e820
34 changed files with 1286 additions and 105 deletions

View File

@ -458,6 +458,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT,
QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT,
QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC,
} ENodeType;
typedef struct {

View File

@ -39,8 +39,9 @@ extern "C" {
#define META_READER_LOCK 0x0
#define META_READER_NOLOCK 0x1
#define STREAM_STATE_BUFF_HASH 1
#define STREAM_STATE_BUFF_SORT 2
#define STREAM_STATE_BUFF_HASH 1
#define STREAM_STATE_BUFF_SORT 2
#define STREAM_STATE_BUFF_HASH_SORT 3
typedef struct SMeta SMeta;
typedef TSKEY (*GetTsFun)(void*);
@ -349,6 +350,8 @@ typedef struct SStateStore {
int32_t (*streamStateFillPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t (*streamStateFillGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t (*streamStateFillDel)(SStreamState* pState, const SWinKey* key);
int32_t (*streamStateFillGetNext)(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen);
int32_t (*streamStateFillGetPrev)(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen);
int32_t (*streamStateCurNext)(SStreamState* pState, SStreamStateCur* pCur);
int32_t (*streamStateCurPrev)(SStreamState* pState, SStreamStateCur* pCur);

View File

@ -194,14 +194,25 @@ typedef struct SIndefRowsFuncLogicNode {
bool isTimeLineFunc;
} SIndefRowsFuncLogicNode;
typedef struct SStreamOption {
int8_t triggerType;
int64_t watermark;
int64_t deleteMark;
int8_t igExpired;
int8_t igCheckUpdate;
int8_t destHasPrimaryKey;
} SStreamOption;
typedef struct SInterpFuncLogicNode {
SLogicNode node;
SNodeList* pFuncs;
STimeWindow timeRange;
int64_t interval;
EFillMode fillMode;
SNode* pFillValues; // SNodeListNode
SNode* pTimeSeries; // SColumnNode
SLogicNode node;
SNodeList* pFuncs;
STimeWindow timeRange;
int64_t interval;
EFillMode fillMode;
SNode* pFillValues; // SNodeListNode
SNode* pTimeSeries; // SColumnNode
//todo(liuyao) 补充clone和json等
SStreamOption streamOption;
} SInterpFuncLogicNode;
typedef struct SGroupCacheLogicNode {
@ -496,17 +507,20 @@ typedef struct SIndefRowsFuncPhysiNode {
} SIndefRowsFuncPhysiNode;
typedef struct SInterpFuncPhysiNode {
SPhysiNode node;
SNodeList* pExprs;
SNodeList* pFuncs;
STimeWindow timeRange;
int64_t interval;
int8_t intervalUnit;
EFillMode fillMode;
SNode* pFillValues; // SNodeListNode
SNode* pTimeSeries; // SColumnNode
SPhysiNode node;
SNodeList* pExprs;
SNodeList* pFuncs;
STimeWindow timeRange;
int64_t interval;
int8_t intervalUnit;
EFillMode fillMode;
SNode* pFillValues; // SNodeListNode
SNode* pTimeSeries; // SColumnNode
SStreamOption streamOption; //todo(liuyao) 补充clone和json等
} SInterpFuncPhysiNode;
typedef SInterpFuncPhysiNode SStreamInterpFuncPhysiNode;
typedef struct SSortMergeJoinPhysiNode {
SPhysiNode node;
EJoinType joinType;
@ -635,7 +649,7 @@ typedef struct SWindowPhysiNode {
int64_t watermark;
int64_t deleteMark;
int8_t igExpired;
int8_t destHasPrimayKey;
int8_t destHasPrimaryKey;
bool mergeDataBlock;
} SWindowPhysiNode;

View File

@ -76,6 +76,8 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key);
int32_t streamStateFillGetNext(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen);
int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen);
int32_t streamStateAddIfNotExist(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateReleaseBuf(SStreamState* pState, void* pVal, bool used);

View File

@ -37,13 +37,15 @@ typedef void (*_state_buff_cleanup_fn)(void* pRowBuff);
typedef void* (*_state_buff_create_statekey_fn)(SRowBuffPos* pPos, int64_t num);
typedef int32_t (*_state_file_remove_fn)(SStreamFileState* pFileState, const void* pKey);
typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen);
typedef int32_t (*_state_file_get_fn)(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen);
typedef int32_t (*_state_file_clear_fn)(SStreamState* pState);
typedef int32_t (*_state_fun_get_fn) (SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
typedef int32_t (*range_cmpr_fn)(const SSessionKey* pWin1, const SSessionKey* pWin2);
typedef int (*__session_compare_fn_t)(const void* pWin, const void* pDatas, int pos);
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId,
int64_t checkpointId, int8_t type);
@ -69,9 +71,11 @@ int32_t streamFileStateGetSelectRowSize(SStreamFileState* pFileState);
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);
void* getRowStateBuff(SStreamFileState* pFileState);
void* getSearchBuff(SStreamFileState* pFileState);
void* getStateFileStore(SStreamFileState* pFileState);
bool isDeteled(SStreamFileState* pFileState, TSKEY ts);
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap);
TSKEY getFlushMark(SStreamFileState* pFileState);
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState);
int32_t getRowStateRowSize(SStreamFileState* pFileState);
@ -98,6 +102,8 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur);
int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey, range_cmpr_fn cmpFn);
int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_fn_t cmpFn);
// state window
int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
@ -110,6 +116,15 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey
int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
//fill
int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen);
int32_t hashSortFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen);
int32_t hashSortFileRemoveFn(SStreamFileState* pFileState, const void* pKey);
void clearSearchBuff(SStreamFileState* pFileState);
int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen);
int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen);
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId);
#ifdef __cplusplus
}
#endif

View File

@ -50,6 +50,8 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillPut = streamStateFillPut;
pStore->streamStateFillGet = streamStateFillGet;
pStore->streamStateFillDel = streamStateFillDel;
pStore->streamStateFillGetNext = streamStateFillGetNext;
pStore->streamStateFillGetPrev = streamStateFillGetPrev;
pStore->streamStateCurNext = streamStateCurNext;
pStore->streamStateCurPrev = streamStateCurPrev;

View File

@ -166,6 +166,8 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillPut = streamStateFillPut;
pStore->streamStateFillGet = streamStateFillGet;
pStore->streamStateFillDel = streamStateFillDel;
pStore->streamStateFillGetNext = streamStateFillGetNext;
pStore->streamStateFillGetPrev = streamStateFillGetPrev;
pStore->streamStateCurNext = streamStateCurNext;
pStore->streamStateCurPrev = streamStateCurPrev;

View File

@ -806,6 +806,36 @@ typedef struct SStreamFillOperatorInfo {
SStreamFillInfo* pFillInfo;
} SStreamFillOperatorInfo;
typedef struct SStreamTimeSliceOperatorInfo {
SSteamOpBasicInfo basic;
STimeWindowAggSupp twAggSup;
SStreamAggSupporter streamAggSup;
SStreamFillSupporter* pFillSup;
SStreamFillInfo* pFillInfo;
SSDataBlock* pRes;
SSDataBlock* pDelRes;
bool recvCkBlock;
SSDataBlock* pCheckpointRes;
int32_t fillType;
SResultRowData leftRow;
SResultRowData valueRow;
SResultRowData rightRow;
int32_t primaryTsIndex;
SExprSupp scalarSup; // scalar calculation
bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
bool destHasPrimaryKey;
SArray* historyPoints;
SArray* pUpdated; // SWinKey
SSHashObj* pUpdatedMap;
int32_t delIndex;
SArray* pDelWins; // SWinKey
SSHashObj* pDeletedMap;
uint64_t numOfDatapack;
SGroupResInfo groupResInfo;
bool ignoreNull;
} SStreamTimeSliceOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
@ -963,6 +993,8 @@ void resetUnCloseSessionWinInfo(SSHashObj* winMap);
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup);
void destroyFlusedPos(void* pRes);
bool isIrowtsPseudoColumn(SExprInfo* pExprInfo);
bool isIsfilledPseudoColumn(SExprInfo* pExprInfo);
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
void* decodeSSessionKey(void* buf, SSessionKey* key);
@ -995,6 +1027,8 @@ void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArr
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, int32_t prevPosition, int32_t order);
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, int32_t status);
bool getIgoreNullRes(SExprSupp* pExprSup);
bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull);

View File

@ -166,6 +166,8 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t
SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle);
// clang-format on
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,

View File

@ -21,10 +21,42 @@ extern "C" {
#include "executorInt.h"
#define FILL_POS_INVALID 0
#define FILL_POS_START 1
#define FILL_POS_MID 2
#define FILL_POS_END 3
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type);
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo);
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo);
int64_t getDeleteMarkFromOption(SStreamOption* pOption);
void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins);
void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins);
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo);
void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal);
int32_t getNexWindowPos(SInterval* pInterval, SDataBlockInfo* pBlockInfo, TSKEY* tsCols, int32_t startPos, TSKEY eKey,
STimeWindow* pNextWin);
int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap);
void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup);
void doBuildDeleteResultImpl(SStateStore* pAPI, SStreamState* pState, SArray* pWins, int32_t* index,
SSDataBlock* pBlock);
SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index);
int32_t initResultBuf(SStreamFillSupporter* pFillSup);
void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup);
void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol);
void resetFillWindow(SResultRowData* pRowData);
bool hasPrevWindow(SStreamFillSupporter* pFillSup);
bool hasNextWindow(SStreamFillSupporter* pFillSup);
void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo);
void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo);
void setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell);
bool hasRemainCalc(SStreamFillInfo* pFillInfo);
void destroySPoint(void* ptr);
int winPosCmprImpl(const void* pKey1, const void* pKey2);
#ifdef __cplusplus
}
#endif

View File

@ -2317,6 +2317,8 @@ char* getStreamOpName(uint16_t opType) {
return "stream event";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT:
return "stream count";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC:
return "stream interp";
}
return "";
}

View File

@ -546,6 +546,8 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
pOptr = createStreamCountAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_COUNT == type) {
pOptr = createCountwindowOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC == type) {
pOptr = createStreamTimeSliceOperatorInfo(ops[0], pPhyNode, pTaskInfo, pHandle);
} else {
terrno = TSDB_CODE_INVALID_PARA;
pTaskInfo->code = terrno;

View File

@ -1299,6 +1299,10 @@ static bool isCountWindow(SStreamScanInfo* pInfo) {
return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
}
static bool isTimeSlice(SStreamScanInfo* pInfo) {
return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC;
}
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
uint64_t* groupCol = (uint64_t*)pColInfo->pData;
@ -1795,6 +1799,11 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
return TSDB_CODE_SUCCESS;
}
static int32_t generateTimeSliceScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) {
// todo(liuyao) add code 获取delete range的左邻居和右邻居作为range
return TSDB_CODE_SUCCESS;
}
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) {
blockDataCleanup(pDestBlock);
if (pSrcBlock->info.rows == 0) {
@ -1980,6 +1989,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
code = generateSessionScanRange(pInfo, pSrcBlock, pDestBlock, type);
} else if (isCountWindow(pInfo)) {
code = generateCountScanRange(pInfo, pSrcBlock, pDestBlock, type);
} else if (isTimeSlice(pInfo)) {
code = generateTimeSliceScanRange(pInfo, pSrcBlock, pDestBlock, type);
} else {
code = generateDeleteResultBlock(pInfo, pSrcBlock, pDestBlock);
}

View File

@ -715,7 +715,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->recvGetAll = false;
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimayKey;
pInfo->destHasPrimaryKey = pCountNode->window.destHasPrimaryKey;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT;
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT, true,

View File

@ -765,7 +765,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->reCkBlock = false;
pInfo->recvGetAll = false;
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimayKey;
pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimaryKey;
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
pInfo, pTaskInfo);

View File

@ -21,6 +21,7 @@
#include "ttypes.h"
#include "executorInt.h"
#include "streamexecutorInt.h"
#include "tcommon.h"
#include "thash.h"
#include "ttime.h"
@ -32,12 +33,6 @@
#include "operator.h"
#include "querytask.h"
#define FILL_POS_INVALID 0
#define FILL_POS_START 1
#define FILL_POS_MID 2
#define FILL_POS_END 3
typedef struct STimeRange {
TSKEY skey;
TSKEY ekey;
@ -133,12 +128,12 @@ static void destroyStreamFillOperatorInfo(void* param) {
taosMemoryFree(pInfo);
}
static void resetFillWindow(SResultRowData* pRowData) {
void resetFillWindow(SResultRowData* pRowData) {
pRowData->key = INT64_MIN;
taosMemoryFreeClear(pRowData->pRowVal);
}
void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup, void* pState, SStorageAPI* pAPI) {
void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup) {
if (pFillSup->cur.pRowVal != pFillSup->prev.pRowVal && pFillSup->cur.pRowVal != pFillSup->next.pRowVal) {
resetFillWindow(&pFillSup->cur);
} else {
@ -154,7 +149,7 @@ void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupI
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
void* pState = pOperator->pTaskInfo->streamInfo.pState;
resetPrevAndNextWindow(pFillSup, pState, pAPI);
resetPrevAndNextWindow(pFillSup);
SWinKey key = {.ts = ts, .groupId = groupId};
int32_t curVLen = 0;
@ -167,7 +162,7 @@ void getCurWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupI
void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
void* pState = pOperator->pTaskInfo->streamInfo.pState;
resetPrevAndNextWindow(pFillSup, pState, pAPI);
resetPrevAndNextWindow(pFillSup);
SWinKey key = {.ts = ts, .groupId = groupId};
void* curVal = NULL;
@ -233,14 +228,14 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
pAPI->stateStore.streamStateFreeCur(pCur);
}
static bool hasPrevWindow(SStreamFillSupporter* pFillSup) { return pFillSup->prev.key != INT64_MIN; }
static bool hasNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->next.key != INT64_MIN; }
bool hasPrevWindow(SStreamFillSupporter* pFillSup) { return pFillSup->prev.key != INT64_MIN; }
bool hasNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->next.key != INT64_MIN; }
static bool hasNextNextWindow(SStreamFillSupporter* pFillSup) {
return pFillSup->nextNext.key != INT64_MIN;
return false;
}
static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal) {
void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
@ -262,30 +257,7 @@ static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKE
pRowVal->key = ts;
}
static void calcDeltaData(SSDataBlock* pBlock, int32_t rowId, SResultRowData* pRowVal, SArray* pDelta,
SFillColInfo* pFillCol, int32_t numOfCol, int32_t winCount, int32_t order) {
for (int32_t i = 0; i < numOfCol; i++) {
if (!pFillCol[i].notFillCol) {
int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i);
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
char* var = colDataGetData(pCol, rowId);
double start = 0;
GET_TYPED_DATA(start, double, pCol->info.type, var);
SResultCellData* pCell = getResultCell(pRowVal, slotId);
double end = 0;
GET_TYPED_DATA(end, double, pCell->type, pCell->pData);
double delta = 0;
if (order == TSDB_ORDER_ASC) {
delta = (end - start) / winCount;
} else {
delta = (start - end) / winCount;
}
taosArraySet(pDelta, slotId, &delta);
}
}
}
static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol,
void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol,
int32_t numOfCol) {
for (int32_t i = 0; i < numOfCol; i++) {
if (!pFillCol[i].notFillCol) {
@ -308,7 +280,7 @@ static void setFillInfoEnd(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFil
pFillInfo->end = ts;
}
static void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
setFillInfoStart(start, pInterval, pFillInfo);
pFillInfo->current = pFillInfo->start;
setFillInfoEnd(end, pInterval, pFillInfo);
@ -519,7 +491,7 @@ static bool buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFill
return true;
}
static bool hasRemainCalc(SStreamFillInfo* pFillInfo) {
bool hasRemainCalc(SStreamFillInfo* pFillInfo) {
if (pFillInfo->current != INT64_MIN && pFillInfo->current <= pFillInfo->end) {
return true;
}
@ -1004,7 +976,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
return pInfo->pRes;
}
static int32_t initResultBuf(SStreamFillSupporter* pFillSup) {
int32_t initResultBuf(SStreamFillSupporter* pFillSup) {
pFillSup->rowSize = sizeof(SResultCellData) * pFillSup->numOfAllCols;
for (int i = 0; i < pFillSup->numOfAllCols; i++) {
SFillColInfo* pCol = &pFillSup->pAllColInfo[i];

View File

@ -0,0 +1,778 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "filter.h"
#include "function.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "storageapi.h"
#include "streamexecutorInt.h"
#include "tcommon.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tfill.h"
#include "ttime.h"
#define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState"
#define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint"
#define HAS_NON_ROW_DATA(pRowData) (pRowData->key == INT64_MIN)
typedef struct SSlicePoint {
SWinKey key;
SResultRowData* pLeftRow;
SResultRowData* pRightRow;
SRowBuffPos* pResPos;
} SSlicePoint;
void streamTimeSliceReleaseState(SOperatorInfo* pOperator) {
// todo(liuyao) add
}
void streamTimeSliceReloadState(SOperatorInfo* pOperator) {
// todo(liuyao) add
}
void destroyStreamTimeSliceOperatorInfo(void* param) {
SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)param;
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
taosArrayDestroy(pInfo->pDelWins);
blockDataDestroy(pInfo->pDelRes);
destroyStreamAggSupporter(&pInfo->streamAggSup);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
tSimpleHashCleanup(pInfo->pDeletedMap);
blockDataDestroy(pInfo->pCheckpointRes);
// todo(liuyao) 看是否有遗漏
taosMemoryFreeClear(param);
}
static void doStreamTimeSliceSaveCheckpoint(SOperatorInfo* pOperator) {
// todo(liuyao) add
}
static bool fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pResRow, TSKEY ts, SSDataBlock* pBlock) {
if (pBlock->info.rows >= pBlock->info.capacity) {
return false;
}
for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
int32_t dstSlotId = GET_DEST_SLOT_ID(pFillCol);
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
if (isIrowtsPseudoColumn(pFillCol->pExpr)) {
colDataSetVal(pDstCol, pBlock->info.rows, (char*)&ts, false);
} else if (isIsfilledPseudoColumn(pFillCol->pExpr)) {
bool isFilled = false;
colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false);
} else {
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SResultCellData* pCell = getResultCell(pResRow, srcSlot);
setRowCell(pDstCol, pBlock->info.rows, pCell);
}
}
pBlock->info.rows += 1;
return true;
}
static void fillNormalRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
if (inWinRange(&pFillSup->winRange, &st)) {
fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock);
}
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
pFillSup->interval.precision);
}
}
static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
int32_t dstSlotId = GET_DEST_SLOT_ID(pFillCol);
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
int16_t type = pDstCol->info.type;
SResultCellData* pCell = getResultCell(pFillInfo->pResRow, srcSlot);
int32_t index = pBlock->info.rows;
if (isIrowtsPseudoColumn(pFillCol->pExpr)) {
colDataSetVal(pDstCol, pBlock->info.rows, (char*)&pFillInfo->current, false);
} else if (isIsfilledPseudoColumn(pFillCol->pExpr)) {
bool isFilled = true;
colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false);
} else {
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
colDataSetNULL(pDstCol, index);
continue;
}
SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, srcSlot);
double vCell = 0;
SPoint start = {0};
start.key = pFillInfo->pResRow->key;
start.val = pCell->pData;
SPoint cur = {0};
cur.key = pFillInfo->current;
cur.val = taosMemoryCalloc(1, pCell->bytes);
taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
colDataSetVal(pDstCol, index, (const char*)cur.val, false);
destroySPoint(&cur);
}
}
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
pFillSup->interval.precision);
pBlock->info.rows++;
}
}
static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo,
SSDataBlock* pRes) {
if (pFillInfo->needFill == false) {
fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes);
return;
}
if (pFillInfo->pos == FILL_POS_START) {
if (fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes)) {
pFillInfo->pos = FILL_POS_INVALID;
}
}
if (pFillInfo->type != TSDB_FILL_LINEAR) {
fillNormalRange(pFillSup, pFillInfo, pRes);
} else {
fillLinearRange(pFillSup, pFillInfo, pRes);
if (pFillInfo->pos == FILL_POS_MID) {
if (fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes)) {
pFillInfo->pos = FILL_POS_INVALID;
}
}
if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) {
pFillInfo->pLinearInfo->hasNext = false;
taosArraySwap(pFillInfo->pLinearInfo->pEndPoints, pFillInfo->pLinearInfo->pNextEndPoints);
pFillInfo->pResRow = &pFillSup->cur;
setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo);
fillLinearRange(pFillSup, pFillInfo, pRes);
}
}
if (pFillInfo->pos == FILL_POS_END) {
if (fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes)) {
pFillInfo->pos = FILL_POS_INVALID;
}
}
}
static int32_t getQualifiedRowNumAsc(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId, bool ignoreNull) {
if (!ignoreNull) {
return rowId;
}
for (int32_t i = rowId; rowId < pBlock->info.rows; i++) {
if (!checkNullRow(pExprSup, pBlock, rowId, ignoreNull)) {
return i;
}
}
return -1;
}
static int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId,
bool ignoreNull) {
TSKEY ts = tsCols[rowId];
int32_t resRow = -1;
for (; rowId >= 0; rowId--) {
if (checkNullRow(pExprSup, pBlock, rowId, ignoreNull)) {
continue;
}
if (ts != tsCols[rowId]) {
if (resRow >= 0) {
break;
} else {
ts = tsCols[rowId];
}
}
resRow = rowId;
}
return resRow;
}
static void getPointRowDataFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup,
SSlicePoint* pPoint) {
int32_t curVLen = 0;
int32_t code =
pAggSup->stateStore.streamStateFillGet(pAggSup->pState, &pPoint->key, (void**)&pPoint->pResPos, &curVLen);
pPoint->pLeftRow = pPoint->pResPos->pRowBuff;
if (pFillSup->type == TSDB_FILL_LINEAR) {
pPoint->pRightRow = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pFillSup->rowSize);
} else {
pPoint->pRightRow = NULL;
}
}
static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts,
int64_t groupId) {
void* pState = pAggSup->pState;
resetPrevAndNextWindow(pFillSup);
SWinKey key = {.ts = ts, .groupId = groupId};
void* curVal = NULL;
int32_t curVLen = 0;
int32_t code = pAggSup->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen);
if (code == TSDB_CODE_SUCCESS) {
pFillSup->cur.key = key.ts;
pFillSup->cur.pRowVal = curVal;
} else {
qDebug("streamStateFillGet key failed, Data may be deleted. ts:%" PRId64 ", groupId:%" PRId64, ts, groupId);
pFillSup->cur.key = ts;
pFillSup->cur.pRowVal = NULL;
}
SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId};
void* preVal = NULL;
int32_t preVLen = 0;
code = pAggSup->stateStore.streamStateFillGetPrev(pState, &key, &preKey, &preVal, &preVLen);
if (code == TSDB_CODE_SUCCESS) {
pFillSup->prev.key = preKey.ts;
pFillSup->prev.pRowVal = preVal;
}
SWinKey nextKey = {.ts = INT64_MIN, .groupId = groupId};
void* nextVal = NULL;
int32_t nextVLen = 0;
code = pAggSup->stateStore.streamStateFillGetNext(pState, &key, &nextKey, &nextVal, &nextVLen);
if (code == TSDB_CODE_SUCCESS) {
pFillSup->next.key = nextKey.ts;
pFillSup->next.pRowVal = nextVal;
}
}
static void setTimeSliceFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) {
if (!hasPrevWindow(pFillSup) && !hasNextWindow(pFillSup)) {
pFillInfo->needFill = false;
pFillInfo->pos = FILL_POS_START;
return;
}
TSKEY prevWKey = INT64_MIN;
TSKEY nextWKey = INT64_MIN;
if (hasPrevWindow(pFillSup)) {
prevWKey = pFillSup->prev.key;
}
if (hasNextWindow(pFillSup)) {
nextWKey = pFillSup->next.key;
}
pFillInfo->needFill = true;
pFillInfo->pos = FILL_POS_INVALID;
switch (pFillInfo->type) {
case TSDB_FILL_NULL:
case TSDB_FILL_NULL_F:
case TSDB_FILL_SET_VALUE:
case TSDB_FILL_SET_VALUE_F: {
if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) {
pFillInfo->needFill = false;
pFillInfo->pos = FILL_POS_START;
} else if (hasPrevWindow(pFillSup)) {
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END;
} else {
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START;
}
copyNotFillExpData(pFillSup, pFillInfo);
} break;
case TSDB_FILL_PREV: {
if (hasNextWindow(pFillSup)) {
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START;
resetFillWindow(&pFillSup->prev);
pFillSup->prev.key = pFillSup->cur.key;
pFillSup->prev.pRowVal = pFillSup->cur.pRowVal;
} else {
ASSERT(hasPrevWindow(pFillSup));
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END;
pFillInfo->preRowKey = INT64_MIN;
}
pFillInfo->pResRow = &pFillSup->prev;
} break;
case TSDB_FILL_NEXT: {
if (hasPrevWindow(pFillSup)) {
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END;
resetFillWindow(&pFillSup->next);
pFillSup->next.key = pFillSup->cur.key;
pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
pFillInfo->preRowKey = INT64_MIN;
} else {
ASSERT(hasNextWindow(pFillSup));
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START;
}
pFillInfo->pResRow = &pFillSup->next;
} break;
case TSDB_FILL_LINEAR: {
if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) {
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_MID;
pFillInfo->pLinearInfo->nextEnd = nextWKey;
calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->numOfAllCols);
pFillInfo->pResRow = &pFillSup->prev;
calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo,
pFillSup->numOfAllCols);
pFillInfo->pLinearInfo->hasNext = true;
} else if (hasPrevWindow(pFillSup)) {
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->numOfAllCols);
pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->hasNext = false;
} else {
ASSERT(hasNextWindow(pFillSup));
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->numOfAllCols);
pFillInfo->pResRow = &pFillSup->cur;
pFillInfo->pLinearInfo->hasNext = false;
}
} break;
default:
ASSERT(0);
break;
}
ASSERT(pFillInfo->pos != FILL_POS_INVALID);
}
static bool needAdjValue(SSlicePoint* pPoint, TSKEY ts, bool isLeft, int32_t fillType) {
switch (fillType) {
case TSDB_FILL_NULL:
case TSDB_FILL_NULL_F:
case TSDB_FILL_SET_VALUE:
case TSDB_FILL_SET_VALUE_F: {
if (HAS_NON_ROW_DATA(pPoint->pRightRow) && HAS_NON_ROW_DATA(pPoint->pLeftRow)) {
return true;
}
} break;
case TSDB_FILL_PREV: {
if (isLeft && (HAS_NON_ROW_DATA(pPoint->pLeftRow) || pPoint->pLeftRow->key < ts)) {
return true;
}
} break;
case TSDB_FILL_NEXT: {
if (!isLeft && (HAS_NON_ROW_DATA(pPoint->pRightRow) || pPoint->pRightRow->key > ts)) {
return true;
}
} break;
case TSDB_FILL_LINEAR: {
if (isLeft && (HAS_NON_ROW_DATA(pPoint->pLeftRow) || pPoint->pLeftRow->key < ts)) {
return true;
} else if (!isLeft && (HAS_NON_ROW_DATA(pPoint->pRightRow) || pPoint->pRightRow->key > ts)) {
return true;
}
} break;
default:
ASSERT(0);
}
return false;
}
static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
SExprSupp* pExprSup = &pOperator->exprSupp;
int32_t numOfOutput = pExprSup->numOfExprs;
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY* tsCols = (int64_t*)pColDataInfo->pData;
void* pPkVal = NULL;
int32_t pkLen = 0;
int64_t groupId = pBlock->info.id.groupId;
SColumnInfoData* pPkColDataInfo = NULL;
SStreamFillSupporter* pFillSup = pInfo->pFillSup;
SStreamFillInfo* pFillInfo = pInfo->pFillInfo;
if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
pPkColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->basic.primaryPkIndex);
}
pFillSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
if (pFillSup->winRange.ekey <= 0) {
pFillSup->winRange.ekey = INT64_MIN;
}
int32_t startPos = 0;
for (; startPos < pBlock->info.rows; startPos++) {
if (hasSrcPrimaryKeyCol(&pInfo->basic) && pInfo->ignoreExpiredData) {
pPkVal = colDataGetData(pPkColDataInfo, startPos);
pkLen = colDataGetRowLength(pPkColDataInfo, startPos);
}
if (pInfo->ignoreExpiredData && checkExpiredData(&pAggSup->stateStore, pAggSup->pUpdateInfo, &pInfo->twAggSup,
pBlock->info.id.uid, tsCols[startPos], pPkVal, pkLen)) {
qDebug("===stream===ignore expired data, window end ts:%" PRId64 ", maxts - wartermak:%" PRId64, tsCols[startPos],
pInfo->twAggSup.maxTs - pInfo->twAggSup.waterMark);
continue;
}
if (checkNullRow(pExprSup, pBlock, startPos, pInfo->ignoreNull)) {
continue;
}
}
if (startPos >= pBlock->info.rows) {
return;
}
SResultRowInfo dumyInfo = {0};
dumyInfo.cur.pageId = -1;
STimeWindow curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC);
SSlicePoint point = {.key.ts = curWin.skey, .key.groupId = groupId};
getPointRowDataFromState(pAggSup, pFillSup, &point);
if (needAdjValue(&point, tsCols[startPos], true, pFillSup->type)) {
transBlockToResultRow(pBlock, startPos, tsCols[startPos], point.pLeftRow);
saveWinResult(&point.key, point.pResPos, pInfo->pUpdatedMap);
}
while (startPos < pBlock->info.rows) {
int32_t numOfWin = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
startPos += numOfWin;
int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull);
startPos = getQualifiedRowNumAsc(pExprSup, pBlock, startPos, pInfo->ignoreNull);
if (startPos < 0) {
break;
}
curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC);
getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId);
bool left = needAdjValue(&point, tsCols[leftRowId], true, pFillSup->type);
if (left) {
transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], point.pLeftRow);
}
bool right = needAdjValue(&point, tsCols[startPos], false, pFillSup->type);
if (right) {
transBlockToResultRow(pBlock, startPos, tsCols[startPos], point.pRightRow);
}
if (left || right) {
saveWinResult(&point.key, point.pResPos, pInfo->pUpdatedMap);
}
}
}
void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup,
SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
blockDataCleanup(pBlock);
if (!hasRemainResults(pGroupResInfo)) {
return;
}
// clear the existed group id
pBlock->info.id.groupId = 0;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
SRowBuffPos* pPos = *(SRowBuffPos**)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index);
// todo(liuyao) fill 增加接口get buff from pos设置pFillSup->cur
SWinKey* pKey = (SWinKey*)pPos->pKey;
if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = pKey->groupId;
} else if (pBlock->info.id.groupId != pKey->groupId) {
pGroupResInfo->index--;
break;
}
getPointInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId);
setTimeSliceFillInfo(pFillSup, pFillInfo, pKey->ts);
doStreamFillRange(pFillSup, pFillInfo, pBlock);
}
}
static SSDataBlock* buildTimeSliceResult(SOperatorInfo* pOperator) {
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
uint16_t opType = pOperator->operatorType;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
doBuildDeleteResultImpl(&pAggSup->stateStore, pAggSup->pState, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo);
if (pInfo->pRes->info.rows != 0) {
printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
return NULL;
}
static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) {
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
if (pOperator->status == OP_RES_TO_RETURN) {
if (hasRemainCalc(pInfo->pFillInfo) ||
(pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
blockDataCleanup(pInfo->pRes);
doStreamFillRange(pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
}
SSDataBlock* resBlock = buildTimeSliceResult(pOperator);
if (resBlock != NULL) {
return resBlock;
}
if (pInfo->recvCkBlock) {
pInfo->recvCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes;
}
setStreamOperatorCompleted(pOperator);
return NULL;
}
SSDataBlock* fillResult = NULL;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN;
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, getStreamOpName(pOperator->operatorType),
pInfo->numOfDatapack);
pInfo->numOfDatapack = 0;
break;
}
pInfo->numOfDatapack++;
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
switch (pBlock->info.type) {
case STREAM_DELETE_RESULT: {
// todo(liuyao) add
} break;
case STREAM_NORMAL:
case STREAM_INVALID: {
SExprSupp* pExprSup = &pInfo->scalarSup;
if (pExprSup->pExprInfo != NULL) {
projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
}
} break;
case STREAM_CHECKPOINT: {
pInfo->recvCkBlock = true;
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
doStreamTimeSliceSaveCheckpoint(pOperator);
pInfo->recvCkBlock = true;
copyDataBlock(pInfo->pCheckpointRes, pBlock);
continue;
} break;
case STREAM_CREATE_CHILD_TABLE: {
return pBlock;
} break;
default:
ASSERTS(false, "invalid SSDataBlock type");
}
doStreamTimeSliceImpl(pOperator, pBlock);
}
if (!pInfo->destHasPrimaryKey) {
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
} else {
copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins);
}
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
taosArrayPush(pInfo->pUpdated, pIte);
}
taosArraySort(pInfo->pUpdated, winPosCmprImpl);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
return buildTimeSliceResult(pOperator);
}
int32_t doStreamTimeSliceEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator) {
// todo(liuyao) add
return 0;
}
void* doStreamTimeSliceDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) {
// todo(liuyao) add
return NULL;
}
static SStreamFillSupporter* initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprInfo* pExprInfo,
int32_t numOfExprs) {
SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
if (!pFillSup) {
return NULL;
}
pFillSup->numOfFillCols = numOfExprs;
int32_t numOfNotFillCols = 0;
pFillSup->pAllColInfo = createFillColInfo(pExprInfo, pFillSup->numOfFillCols, NULL, numOfNotFillCols,
(const SNodeListNode*)(pPhyFillNode->pFillValues));
pFillSup->type = convertFillType(pPhyFillNode->fillMode);
pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols;
pFillSup->interval.interval = pPhyFillNode->interval;
// todo(liuyao) 初始化 pFillSup->interval其他属性
pFillSup->pAPI = NULL;
int32_t code = initResultBuf(pFillSup);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamFillSupporter(pFillSup);
return NULL;
}
pFillSup->pResMap = NULL;
pFillSup->hasDelete = false;
return pFillSup;
}
SOperatorInfo* createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) {
SStreamTimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTimeSliceOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pOperator == NULL || pInfo == NULL) {
goto _error;
}
SStreamInterpFuncPhysiNode* pInterpPhyNode = (SStreamInterpFuncPhysiNode*)pPhyNode;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(&pOperator->resultInfo, 4096);
SExprSupp* pExpSup = &pOperator->exprSupp;
int32_t numOfExprs = 0;
SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs);
int32_t code = initExprSupp(pExpSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (pInterpPhyNode->pExprs != NULL) {
int32_t num = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
code = filterInitFromNode((SNode*)pInterpPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pInterpPhyNode->streamOption.watermark,
.calTrigger = pInterpPhyNode->streamOption.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
.deleteMark = getDeleteMarkFromOption(&pInterpPhyNode->streamOption),
};
pInfo->primaryTsIndex = ((SColumnNode*)pInterpPhyNode->pTimeSeries)->slotId;
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfExprs, 0, pTaskInfo->streamInfo.pState,
sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
pInfo->delIndex = 0;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
pInfo->ignoreExpiredData = pInterpPhyNode->streamOption.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL;
pInfo->pUpdatedMap = NULL;
pInfo->historyPoints = taosArrayInit(4, sizeof(SWinKey));
if (!pInfo->historyPoints) {
goto _error;
}
pInfo->recvCkBlock = false;
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->destHasPrimaryKey = pInterpPhyNode->streamOption.destHasPrimaryKey;
pInfo->numOfDatapack = 0;
pInfo->pFillSup = initTimeSliceFillSup(pInterpPhyNode, pExprInfo, numOfExprs);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC;
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC,
true, OP_NOT_OPENED, pInfo, pTaskInfo);
// for stream
void* buff = NULL;
int32_t len = 0;
int32_t res = pTaskInfo->storageAPI.stateStore.streamStateGetInfo(
pTaskInfo->streamInfo.pState, STREAM_TIME_SLICE_OP_CHECKPOINT_NAME, strlen(STREAM_TIME_SLICE_OP_CHECKPOINT_NAME),
&buff, &len);
if (res == TSDB_CODE_SUCCESS) {
doStreamTimeSliceDecodeOpState(buff, len, pOperator);
taosMemoryFree(buff);
}
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamTimeSlice, NULL, destroyStreamTimeSliceOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamTimeSliceReleaseState, streamTimeSliceReloadState);
if (downstream) {
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN ) {
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->igCheckUpdate = true;
}
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup,
&pInfo->basic);
code = appendDownstream(pOperator, &downstream, 1);
}
return pOperator;
_error:
if (pInfo != NULL) {
destroyStreamTimeSliceOperatorInfo(pInfo);
}
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
return NULL;
}

View File

@ -161,7 +161,7 @@ int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
return tSimpleHashPut(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
}
static int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) {
int32_t saveWinResult(SWinKey* pKey, SRowBuffPos* pPos, SSHashObj* pUpdatedMap) {
return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), &pPos, POINTER_BYTES);
}
@ -189,7 +189,7 @@ static int32_t compareWinKey(void* pKey, void* data, int32_t index) {
return winKeyCmprImpl(pKey, pDataPos);
}
static void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins) {
void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins) {
taosArraySort(pDelWins, winKeyCmprImpl);
taosArrayRemoveDuplicate(pDelWins, winKeyCmprImpl, NULL);
int32_t delSize = taosArrayGetSize(pDelWins);
@ -364,6 +364,10 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWins, int32_t* index,
SSDataBlock* pBlock) {
doBuildDeleteResultImpl(&pInfo->stateStore, pInfo->pState, pWins, index, pBlock);
}
void doBuildDeleteResultImpl(SStateStore* pAPI, SStreamState* pState, SArray* pWins, int32_t* index, SSDataBlock* pBlock) {
blockDataCleanup(pBlock);
int32_t size = taosArrayGetSize(pWins);
if (*index == size) {
@ -376,7 +380,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
for (int32_t i = *index; i < size; i++) {
SWinKey* pWin = taosArrayGet(pWins, i);
void* tbname = NULL;
pInfo->stateStore.streamStateGetParName(pInfo->pState, pWin->groupId, &tbname, false);
pAPI->streamStateGetParName(pState, pWin->groupId, &tbname, false);
if (tbname == NULL) {
appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL);
} else {
@ -384,7 +388,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName);
}
pInfo->stateStore.streamStateFreeVal(tbname);
pAPI->streamStateFreeVal(tbname);
(*index)++;
}
}
@ -994,7 +998,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
}
}
static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) {
int winPosCmprImpl(const void* pKey1, const void* pKey2) {
SRowBuffPos* pos1 = *(SRowBuffPos**)pKey1;
SRowBuffPos* pos2 = *(SRowBuffPos**)pKey2;
SWinKey* pWin1 = (SWinKey*)pos1->pKey;
@ -1224,7 +1228,7 @@ void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) {
}
}
static void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) {
void copyIntervalDeleteKey(SSHashObj* pMap, SArray* pWins) {
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pMap, pIte, &iter)) != NULL) {
@ -1468,6 +1472,14 @@ int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) {
return deleteMark;
}
int64_t getDeleteMarkFromOption(SStreamOption* pOption) {
if (pOption->deleteMark <= 0) {
return DEAULT_DELETE_MARK;
}
int64_t deleteMark = TMAX(pOption->deleteMark, pOption->watermark);
return deleteMark;
}
static TSKEY compareTs(void* pKey) {
SWinKey* pWinKey = (SWinKey*)pKey;
return pWinKey->ts;
@ -1633,7 +1645,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->clearState = false;
pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey));
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey;
pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimaryKey;
pOperator->operatorType = pPhyNode->type;
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
@ -3080,7 +3092,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->clearState = false;
pInfo->recvGetAll = false;
pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimayKey;
pInfo->destHasPrimaryKey = pSessionNode->window.destHasPrimaryKey;
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
@ -3999,7 +4011,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->recvGetAll = false;
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimayKey;
pInfo->destHasPrimaryKey = pStateNode->window.destHasPrimaryKey;
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
@ -4272,7 +4284,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimayKey;
pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimaryKey;
// for stream
void* buff = NULL;
@ -4599,6 +4611,6 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
}
void setStreamOperatorCompleted(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
setOperatorCompleted(pOperator);
}

View File

@ -168,12 +168,12 @@ static FORCE_INLINE int32_t timeSliceEnsureBlockCapacity(STimeSliceOperatorInfo*
return TSDB_CODE_SUCCESS;
}
static bool isIrowtsPseudoColumn(SExprInfo* pExprInfo) {
bool isIrowtsPseudoColumn(SExprInfo* pExprInfo) {
char *name = pExprInfo->pExpr->_function.functionName;
return (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_irowts") == 0);
}
static bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) {
bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) {
char *name = pExprInfo->pExpr->_function.functionName;
return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0);
}
@ -233,7 +233,7 @@ static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
return (functionType == FUNCTION_TYPE_GROUP_KEY);
}
static bool getIgoreNullRes(SExprSupp* pExprSup) {
bool getIgoreNullRes(SExprSupp* pExprSup) {
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[i];
@ -250,7 +250,7 @@ static bool getIgoreNullRes(SExprSupp* pExprSup) {
return false;
}
static bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull) {
bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull) {
if (!ignoreNull) {
return false;
}

View File

@ -2893,7 +2893,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "interp",
.type = FUNCTION_TYPE_INTERP,
.classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
.translateFunc = translateInterp,
.getEnvFunc = getSelectivityFuncEnv,
.initFunc = functionSetup,

View File

@ -762,7 +762,7 @@ static int32_t physiWindowCopy(const SWindowPhysiNode* pSrc, SWindowPhysiNode* p
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(destHasPrimayKey);
COPY_SCALAR_FIELD(destHasPrimaryKey);
return TSDB_CODE_SUCCESS;
}

View File

@ -413,6 +413,8 @@ const char* nodesNodeName(ENodeType type) {
return "PhysiIndefRowsFunc";
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
return "PhysiInterpFunc";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC:
return "PhysiStreamInterpFunc";
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return "PhysiDispatch";
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
@ -2740,7 +2742,7 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddBoolToObject(pJson, jkWindowPhysiPlanMergeDataBlock, pNode->mergeDataBlock);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanDestHasPrimaryKey, pNode->destHasPrimayKey);
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanDestHasPrimaryKey, pNode->destHasPrimaryKey);
}
return code;
@ -2778,7 +2780,7 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
code = tjsonGetBoolValue(pJson, jkWindowPhysiPlanMergeDataBlock, &pNode->mergeDataBlock);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkWindowPhysiPlanDestHasPrimaryKey, &pNode->destHasPrimayKey);
code = tjsonGetTinyIntValue(pJson, jkWindowPhysiPlanDestHasPrimaryKey, &pNode->destHasPrimaryKey);
}
return code;
@ -5619,6 +5621,7 @@ static const char* jkSelectStmtLimit = "Limit";
static const char* jkSelectStmtSlimit = "Slimit";
static const char* jkSelectStmtStmtName = "StmtName";
static const char* jkSelectStmtHasAggFuncs = "HasAggFuncs";
static const char* jkSelectStmtInterpFuncs = "HasInterpFuncs";
static int32_t selectStmtToJson(const void* pObj, SJson* pJson) {
const SSelectStmt* pNode = (const SSelectStmt*)pObj;
@ -5666,6 +5669,9 @@ static int32_t selectStmtToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSelectStmtHasAggFuncs, pNode->hasAggFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSelectStmtInterpFuncs, pNode->hasInterpFunc);
}
return code;
}
@ -5716,6 +5722,9 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSelectStmtHasAggFuncs, &pNode->hasAggFuncs);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSelectStmtInterpFuncs, &pNode->hasInterpFunc);
}
return code;
}
@ -7793,6 +7802,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
return physiIndefRowsFuncNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC:
return physiInterpFuncNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return physiDispatchNodeToJson(pObj, pJson);
@ -8151,6 +8161,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC:
return jsonToPhysiIndefRowsFuncNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC:
return jsonToPhysiInterpFuncNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return jsonToPhysiDispatchNode(pJson, pObj);

View File

@ -3183,7 +3183,7 @@ static int32_t physiWindowNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
code = tlvEncodeBool(pEncoder, PHY_WINDOW_CODE_MERGE_DATA_BLOCK, pNode->mergeDataBlock);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_WINDOW_CODE_DEST_HAS_PRIMARY_KEY, pNode->destHasPrimayKey);
code = tlvEncodeI8(pEncoder, PHY_WINDOW_CODE_DEST_HAS_PRIMARY_KEY, pNode->destHasPrimaryKey);
}
return code;
@ -3227,7 +3227,7 @@ static int32_t msgToPhysiWindowNode(STlvDecoder* pDecoder, void* pObj) {
code = tlvDecodeBool(pTlv, &pNode->mergeDataBlock);
break;
case PHY_WINDOW_CODE_DEST_HAS_PRIMARY_KEY:
code = tlvDecodeI8(pTlv, &pNode->destHasPrimayKey);
code = tlvDecodeI8(pTlv, &pNode->destHasPrimaryKey);
break;
default:
break;
@ -4539,6 +4539,7 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
code = physiIndefRowsFuncNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC:
code = physiInterpFuncNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
@ -4701,6 +4702,7 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
code = msgToPhysiIndefRowsFuncNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC:
code = msgToPhysiInterpFuncNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:

View File

@ -723,6 +723,8 @@ SNode* nodesMakeNode(ENodeType type) {
return makeNode(type, sizeof(SSubplan));
case QUERY_NODE_PHYSICAL_PLAN:
return makeNode(type, sizeof(SQueryPlan));
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC:
return makeNode(type, sizeof(SStreamInterpFuncPhysiNode));
default:
break;
}
@ -1652,7 +1654,8 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pPhyNode->pFuncs);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC: {
case QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC: {
SInterpFuncPhysiNode* pPhyNode = (SInterpFuncPhysiNode*)pNode;
destroyPhysiNode((SPhysiNode*)pPhyNode);
nodesDestroyList(pPhyNode->pExprs);

View File

@ -5496,8 +5496,15 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (pSelect->pRange != NULL && QUERY_NODE_OPERATOR == nodeType(pSelect->pRange) && pSelect->pEvery == NULL) {
// single point interp every can be omitted
} else {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE,
"Missing RANGE clause, EVERY clause or FILL clause");
if (pCxt->createStream) {
if (NULL == pSelect->pEvery || NULL == pSelect->pFill) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE,
"Missing EVERY clause or FILL clause");
}
} else {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE,
"Missing RANGE clause, EVERY clause or FILL clause");
}
}
}

View File

@ -894,6 +894,15 @@ static bool isInterpFunc(int32_t funcId) {
return fmIsInterpFunc(funcId) || fmIsInterpPseudoColumnFunc(funcId) || fmIsGroupKeyFunc(funcId);
}
static void initStreamOption(SLogicPlanContext* pCxt, SStreamOption* pOption) {
pOption->triggerType = pCxt->pPlanCxt->triggerType;
pOption->watermark = pCxt->pPlanCxt->watermark;
pOption->deleteMark = pCxt->pPlanCxt->deleteMark;
pOption->igExpired = pCxt->pPlanCxt->igExpired;
pOption->igCheckUpdate = pCxt->pPlanCxt->igCheckUpdate;
pOption->destHasPrimaryKey = pCxt->pPlanCxt->destHasPrimaryKey;
}
static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
if (!pSelect->hasInterpFunc) {
return TSDB_CODE_SUCCESS;
@ -934,6 +943,10 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
code = createColumnByRewriteExprs(pInterpFunc->pFuncs, &pInterpFunc->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
initStreamOption(pCxt, &pInterpFunc->streamOption);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pInterpFunc;
} else {

View File

@ -1684,8 +1684,9 @@ static int32_t createIndefRowsFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList*
static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren,
SInterpFuncLogicNode* pFuncLogicNode, SPhysiNode** pPhyNode) {
SInterpFuncPhysiNode* pInterpFunc =
(SInterpFuncPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pFuncLogicNode, QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC);
SInterpFuncPhysiNode* pInterpFunc = (SInterpFuncPhysiNode*)makePhysiNode(
pCxt, (SLogicNode*)pFuncLogicNode,
pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC : QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC);
if (NULL == pInterpFunc) {
return TSDB_CODE_OUT_OF_MEMORY;
}
@ -1728,6 +1729,10 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
code = setConditionsSlotId(pCxt, (const SLogicNode*)pFuncLogicNode, (SPhysiNode*)pInterpFunc);
}
if (pCxt->pPlanCxt->streamQuery) {
pInterpFunc->streamOption = pFuncLogicNode->streamOption;
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pInterpFunc;
} else {
@ -1869,7 +1874,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
pWindow->deleteMark = pWindowLogicNode->deleteMark;
pWindow->igExpired = pWindowLogicNode->igExpired;
if (pCxt->pPlanCxt->streamQuery) {
pWindow->destHasPrimayKey = pCxt->pPlanCxt->destHasPrimaryKey;
pWindow->destHasPrimaryKey = pCxt->pPlanCxt->destHasPrimaryKey;
}
pWindow->mergeDataBlock = (GROUP_ACTION_KEEP == pWindowLogicNode->node.groupAction ? false : true);
pWindow->node.inputTsOrder = pWindowLogicNode->node.inputTsOrder;

View File

@ -118,6 +118,7 @@ int32_t doValidatePhysiNode(SValidatePlanContext* pCxt, SNode* pNode) {
case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN:
case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE:
case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL:
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC:
break;
case QUERY_NODE_PHYSICAL_SUBPLAN:
return validateSubplanNode(pCxt, (SSubplan*)pNode);

View File

@ -160,7 +160,7 @@ int32_t streamStatePut_rocksdb(SStreamState* pState, const SWinKey* key, const v
int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key);
int32_t streamStateClear_rocksdb(SStreamState* pState);
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurNext_rocksdb(SStreamStateCur* pCur);
int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
@ -205,6 +205,7 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState);
// partag cf
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen);

View File

@ -2817,7 +2817,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
return 0;
}
int32_t streamStateCurNext_rocksdb(SStreamState* pState, SStreamStateCur* pCur) {
int32_t streamStateCurNext_rocksdb(SStreamStateCur* pCur) {
if (!pCur) {
return -1;
}
@ -2982,6 +2982,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
STREAM_STATE_DEL_ROCKSDB(pState, "state", &maxStateKey);
return pCur;
}
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
stDebug("streamStateGetCur_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
@ -3480,6 +3481,12 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
streamStateFreeCur(pCur);
return NULL;
}
SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState) {
SWinKey key = {.groupId = UINT64_MAX, .ts = INT64_MAX};
return streamStateFillSeekKeyNext_rocksdb(pState, &key);
}
#ifdef BUILD_NO_CALL
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
stDebug("streamStateSessionGetKeyByRange_rocksdb");
@ -3518,7 +3525,7 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
}
if (c > 0) {
streamStateCurNext_rocksdb(pState, pCur);
streamStateCurNext_rocksdb(pCur);
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, NULL, 0);
if (code == 0 && sessionRangeKeyCmpr(key, &resKey) == 0) {
*curKey = resKey;
@ -3565,7 +3572,7 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
goto _end;
}
taosMemoryFreeClear(*pVal);
streamStateCurNext_rocksdb(pState, pCur);
streamStateCurNext_rocksdb(pCur);
} else {
*key = originKey;
streamStateFreeCur(pCur);
@ -3611,7 +3618,7 @@ int32_t streamStateSessionClear_rocksdb(SStreamState* pState) {
}
taosMemoryFreeClear(buf);
streamStateCurNext_rocksdb(pState, pCur);
streamStateCurNext_rocksdb(pCur);
}
streamStateFreeCur(pCur);
return -1;
@ -3642,7 +3649,7 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey*
goto _end;
}
streamStateCurNext_rocksdb(pState, pCur);
streamStateCurNext_rocksdb(pCur);
} else {
*key = tmpKey;
streamStateFreeCur(pCur);

View File

@ -20,12 +20,10 @@
#include "tcommon.h"
#include "tsimplehash.h"
typedef int (*__session_compare_fn_t)(const SSessionKey* pWin, const void* pDatas, int pos);
int sessionStateKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) {
int sessionStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
return sessionWinKeyCmpr(pWin1, pWin2);
return sessionWinKeyCmpr((SSessionKey*)pWin1, pWin2);
}
int sessionStateRangeKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) {
@ -595,7 +593,7 @@ int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) {
if (pCur && pCur->buffIndex >= 0) {
pCur->buffIndex++;
} else {
streamStateCurNext_rocksdb(NULL, pCur);
streamStateCurNext_rocksdb(pCur);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -0,0 +1,164 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tstreamFileState.h"
#include "query.h"
#include "streamBackendRocksdb.h"
#include "tcommon.h"
#include "tsimplehash.h"
#define NUM_OF_FLUSED_WIN 64
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
SWinKey* pWin2 = taosArrayGet(pDatas, pos);
return winKeyCmprImpl((SWinKey*)pWin1, pWin2);
}
int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen) {
int32_t code = getRowBuff(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen);
SArray* pWinStates = NULL;
SSHashObj* pSearchBuff = getSearchBuff(pFileState);
void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
pWinStates = taosArrayInit(16, sizeof(SWinKey));
tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
}
if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
TSKEY ts = getFlushMark(pFileState);
SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX};
void* pState = getStateFileStore(pFileState);
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start);
int32_t code = TSDB_CODE_SUCCESS;
for(int32_t i = 0; i < NUM_OF_FLUSED_WIN && code == TSDB_CODE_SUCCESS; i++) {
SWinKey tmp = {.groupId = pKey->groupId};
code = streamStateGetGroupKVByCur_rocksdb(pCur, &tmp, NULL, 0);
if (code != TSDB_CODE_SUCCESS) {
break;
}
taosArrayPush(pWinStates, &tmp);
code = streamStateCurPrev_rocksdb(pCur);
}
taosArraySort(pWinStates, winKeyCmprImpl);
streamStateFreeCur(pCur);
}
int32_t size = taosArrayGetSize(pWinStates);
if (!isFlushedState(pFileState, pKey->ts, 0)) {
// find the first position which is smaller than the pKey
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (index == -1) {
index = 0;
}
taosArrayInsert(pWinStates, index, pKey);
}
return code;
}
int32_t hashSortFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
void* pState = getStateFileStore(pFileState);
return streamStateFillGet_rocksdb(pState, pKey, data, pDataLen);
}
int32_t hashSortFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
void* pState = getStateFileStore(pFileState);
return streamStateFillDel_rocksdb(pState, pKey);
}
void clearSearchBuff(SStreamFileState* pFileState) {
SSHashObj* pSearchBuff = getSearchBuff(pFileState);
if (!pSearchBuff) {
return;
}
void* pIte = NULL;
int32_t iter = 0;
void* pBuff = getRowStateBuff(pFileState);
while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
SArray* pWinStates = *((void**)pIte);
int32_t size = taosArrayGetSize(pWinStates);
if (size > 0) {
TSKEY ts = getFlushMark(pFileState);
SWinKey key = *(SWinKey*)taosArrayGet(pWinStates, 0);
key.ts = ts;
int32_t num = binarySearch(pWinStates, size, &key, fillStateKeyCompare);
if (size > NUM_OF_FLUSED_WIN) {
num = TMIN(num, size - NUM_OF_FLUSED_WIN);
taosArrayRemoveBatch(pWinStates, 0, num, NULL);
}
}
}
}
int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen) {
SArray* pWinStates = NULL;
SSHashObj* pSearchBuff = getSearchBuff(pFileState);
void* pState = getStateFileStore(pFileState);
void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey);
int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void **)pVal, pVLen);
streamStateFreeCur(pCur);
return code;
}
int32_t size = taosArrayGetSize(pWinStates);
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (index == -1) {
SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey);
int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void **)pVal, pVLen);
streamStateFreeCur(pCur);
return code;
} else {
if (index == size - 1) {
return TSDB_CODE_FAILED;
}
SWinKey* pNext = taosArrayGet(pWinStates, index + 1);
*pResKey = *pNext;
return getHashSortRowBuff(pFileState, pResKey, pVal, pVLen);
}
return TSDB_CODE_FAILED;
}
int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen) {
SArray* pWinStates = NULL;
SSHashObj* pSearchBuff = getSearchBuff(pFileState);
void* pState = getStateFileStore(pFileState);
void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey);
int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen);
streamStateFreeCur(pCur);
return code;
}
int32_t size = taosArrayGetSize(pWinStates);
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (index == -1 || index == 0) {
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey);
int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen);
streamStateFreeCur(pCur);
return code;
} else {
SWinKey* pNext = taosArrayGet(pWinStates, index - 1);
*pResKey = *pNext;
return getHashSortRowBuff(pFileState, pResKey, pVal, pVLen);
}
return TSDB_CODE_FAILED;
}

View File

@ -359,12 +359,23 @@ int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void*
// todo refactor
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) {
#ifdef USE_ROCKSDB
if (pState->pFileState) {
return getHashSortRowBuff(pState->pFileState, key, pVal, pVLen);
}
return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
#else
return tdbTbGet(pState->pTdbState->pFillStateDb, key, sizeof(SWinKey), pVal, pVLen);
#endif
}
int32_t streamStateFillGetNext(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen) {
return getHashSortNextRow(pState->pFileState, pKey, pResKey, pVal, pVLen);
}
int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen) {
return getHashSortPrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen);
}
// todo refactor
int32_t streamStateFillDel(SStreamState* pState, const SWinKey* key) {
#ifdef USE_ROCKSDB

View File

@ -27,6 +27,8 @@
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
#define MIN_NUM_OF_ROW_BUFF 10240
#define MIN_NUM_OF_RECOVER_ROW_BUFF 128
#define MIN_NUM_SEARCH_BUCKET 128
#define MAX_ARRAY_SIZE 1024
#define TASK_KEY "streamFileState"
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
@ -49,6 +51,7 @@ struct SStreamFileState {
GetTsFun getTs;
char* id;
char* cfName;
void* searchBuff;
_state_buff_cleanup_fn stateBuffCleanupFn;
_state_buff_remove_fn stateBuffRemoveFn;
@ -91,7 +94,7 @@ int32_t intervalFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
return streamStateDel_rocksdb(pFileState->pFileStore, pKey);
}
int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen) {
int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
return streamStateGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
}
@ -107,7 +110,7 @@ int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
return streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey);
}
int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, int32_t* pDataLen) {
int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen) {
return streamStateSessionGet_rocksdb(pFileState->pFileStore, pKey, data, pDataLen);
}
@ -160,7 +163,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->stateFileClearFn = streamStateClear_rocksdb;
pFileState->cfName = taosStrdup("state");
pFileState->stateFunctionGetFn = getRowBuff;
} else {
} else if (type == STREAM_STATE_BUFF_SORT) {
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->stateBuffCleanupFn = sessionWinStateCleanup;
pFileState->stateBuffRemoveFn = deleteSessionWinStateBuffFn;
@ -172,6 +175,19 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
pFileState->stateFileClearFn = streamStateSessionClear_rocksdb;
pFileState->cfName = taosStrdup("sess");
pFileState->stateFunctionGetFn = getSessionRowBuff;
} else if (type == STREAM_STATE_BUFF_HASH_SORT) {
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
pFileState->stateFileRemoveFn = hashSortFileRemoveFn;
pFileState->stateFileGetFn = hashSortFileGetFn;
pFileState->stateFileClearFn = NULL;
pFileState->cfName = taosStrdup("fill");
pFileState->stateFunctionGetFn = NULL;
}
if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowStateBuff) {
@ -194,8 +210,10 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
// todo(liuyao) optimize
if (type == STREAM_STATE_BUFF_HASH) {
recoverSnapshot(pFileState, checkpointId);
} else {
} else if (type == STREAM_STATE_BUFF_SORT) {
recoverSesssion(pFileState, checkpointId);
} else if (type == STREAM_STATE_BUFF_HASH_SORT) {
recoverFillSnapshot(pFileState, checkpointId);
}
void* valBuf = NULL;
@ -361,6 +379,11 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
}
}
if (pFileState->searchBuff) {
clearFlushedRowBuff(pFileState, pFlushList, pFileState->curRowCount);
}
flushSnapshot(pFileState, pFlushList, false);
SListIter fIter = {0};
@ -461,7 +484,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
int32_t len = 0;
void* p = NULL;
code = streamStateGet_rocksdb(pFileState->pFileStore, pKey, &p, &len);
code = pFileState->stateFileGetFn(pFileState->pFileStore, pKey, &p, &len);
qDebug("===stream===get %" PRId64 " from disc, res %d", ts, code);
if (code == TSDB_CODE_SUCCESS) {
memcpy(pNewPos->pRowBuff, p, len);
@ -597,6 +620,8 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
streamStateClearBatch(batch);
clearSearchBuff(pFileState);
int64_t elapsed = taosGetTimestampMs() - st;
qDebug("%s flush to disk in batch model completed, rows:%d, batch size:%d, elapsed time:%" PRId64 "ms",
pFileState->id, numOfElems, BATCH_LIMIT, elapsed);
@ -755,6 +780,7 @@ void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts) {
}
void* getRowStateBuff(SStreamFileState* pFileState) { return pFileState->rowStateBuff; }
void* getSearchBuff(SStreamFileState* pFileState) { return pFileState->searchBuff; }
void* getStateFileStore(SStreamFileState* pFileState) { return pFileState->pFileStore; }
@ -764,8 +790,56 @@ bool isDeteled(SStreamFileState* pFileState, TSKEY ts) {
bool isFlushedState(SStreamFileState* pFileState, TSKEY ts, TSKEY gap) { return ts <= (pFileState->flushMark + gap); }
TSKEY getFlushMark(SStreamFileState* pFileState) {return pFileState->flushMark;};
int32_t getRowStateRowSize(SStreamFileState* pFileState) { return pFileState->rowSize; }
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) {
return pFileState->stateFunctionGetFn(pFileState, pKey, keyLen, pVal, pVLen);
}
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId) {
int32_t code = TSDB_CODE_SUCCESS;
if (pFileState->maxTs != INT64_MIN) {
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
? INT64_MIN
: pFileState->maxTs - pFileState->deleteMark;
deleteExpiredCheckPoint(pFileState, mark);
}
SStreamStateCur* pCur = streamStateFillSeekToLast_rocksdb(pFileState->pFileStore);
if (pCur == NULL) {
return -1;
}
int32_t recoverNum = TMIN(MIN_NUM_OF_RECOVER_ROW_BUFF, pFileState->maxRowCount);
while (code == TSDB_CODE_SUCCESS) {
if (pFileState->curRowCount >= recoverNum) {
break;
}
void* pVal = NULL;
int32_t vlen = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
code = streamStateFillGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
if (code != TSDB_CODE_SUCCESS || isFlushedState(pFileState, pFileState->getTs(pNewPos->pKey), 0) ) {
destroyRowBuffPos(pNewPos);
SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
taosMemoryFreeClear(pNode);
taosMemoryFreeClear(pVal);
break;
}
ASSERT(vlen == pFileState->rowSize);
memcpy(pNewPos->pRowBuff, pVal, vlen);
taosMemoryFreeClear(pVal);
pNewPos->beFlushed = true;
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
destroyRowBuffPos(pNewPos);
break;
}
code = streamStateCurPrev_rocksdb(pCur);
}
streamStateFreeCur(pCur);
return TSDB_CODE_SUCCESS;
}