stream twa

This commit is contained in:
54liuyao 2024-10-16 12:16:59 +08:00
parent 944457d7a6
commit 3e49f40c74
23 changed files with 1359 additions and 69 deletions

View File

@ -459,6 +459,7 @@ typedef enum ENodeType {
QUERY_NODE_PHYSICAL_PLAN_STREAM_COUNT,
QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL,
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC,
QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL,
} ENodeType;
typedef struct {

View File

@ -42,6 +42,7 @@ extern "C" {
#define STREAM_STATE_BUFF_HASH 1
#define STREAM_STATE_BUFF_SORT 2
#define STREAM_STATE_BUFF_HASH_SORT 3
#define STREAM_STATE_BUFF_HASH_SEARCH 4
typedef struct SMeta SMeta;
typedef TSKEY (*GetTsFun)(void*);
@ -341,6 +342,8 @@ typedef struct SStateStore {
void (*streamStateReleaseBuf)(SStreamState* pState, void* pVal, bool used);
void (*streamStateClearBuff)(SStreamState* pState, void* pVal);
void (*streamStateFreeVal)(void* val);
int32_t (*streamStateGetPrev)(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal,
int32_t* pVLen, int32_t* pWinCode);
int32_t (*streamStatePut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t (*streamStateGet)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode);
@ -372,9 +375,11 @@ typedef struct SStateStore {
SStreamStateCur* (*streamStateFillSeekKeyPrev)(SStreamState* pState, const SWinKey* key);
void (*streamStateFreeCur)(SStreamStateCur* pCur);
int32_t (*streamStateGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t (*streamStateFillGetGroupKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t (*streamStateGetKVByCur)(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
void (*streamStateClearExpiredState)(SStreamState* pState);
int32_t (*streamStateSessionAddIfNotExist)(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal,
int32_t* pVLen, int32_t* pWinCode);
int32_t (*streamStateSessionPut)(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen);

View File

@ -448,6 +448,7 @@ typedef struct SSelectStmt {
bool hasCountFunc;
bool hasUdaf;
bool hasStateKey;
bool hasTwaOrElapsedFunc;
bool onlyHasKeepOrderFunc;
bool groupSort;
bool tagScan;

View File

@ -49,6 +49,8 @@ void streamStateClear(SStreamState* pState);
void streamStateSetNumber(SStreamState* pState, int32_t number, int32_t tsIdex);
void streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, void* pVal, int32_t vLen);
int32_t streamStateGetInfo(SStreamState* pState, void* pKey, int32_t keyLen, void** pVal, int32_t* pLen);
int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode);
// session window
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, TSKEY gap, void** pVal, int32_t* pVLen,
@ -102,9 +104,12 @@ SStreamStateCur* streamStateFillSeekKeyPrev(SStreamState* pState, const SWinKey*
void streamStateFreeCur(SStreamStateCur* pCur);
void streamStateResetCur(SStreamStateCur* pCur);
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
// twa
void streamStateClearExpiredState(SStreamState* pState);
void streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
void streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);

View File

@ -142,6 +142,12 @@ int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, v
void streamFileStateGroupCurNext(SStreamStateCur* pCur);
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen);
SSHashObj* getGroupIdCache(SStreamFileState* pFileState);
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos);
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
int32_t* pVLen, int32_t* pWinCode);
//twa
void clearExpiredState(SStreamFileState* pFileState);
#ifdef __cplusplus
}

View File

@ -46,6 +46,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateSaveInfo = streamStateSaveInfo;
pStore->streamStateGetInfo = streamStateGetInfo;
pStore->streamStateSetNumber = streamStateSetNumber;
pStore->streamStateGetPrev = streamStateGetPrev;
pStore->streamStateFillPut = streamStateFillPut;
pStore->streamStateFillGet = streamStateFillGet;
@ -63,9 +64,11 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillSeekKeyPrev = streamStateFillSeekKeyPrev;
pStore->streamStateFreeCur = streamStateFreeCur;
pStore->streamStateGetGroupKVByCur = streamStateGetGroupKVByCur;
pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur;
pStore->streamStateGetKVByCur = streamStateGetKVByCur;
pStore->streamStateClearExpiredState = streamStateClearExpiredState;
pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist;
pStore->streamStateSessionPut = streamStateSessionPut;
pStore->streamStateSessionGet = streamStateSessionGet;
@ -78,11 +81,6 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateCountGetKeyByRange = streamStateCountGetKeyByRange;
pStore->streamStateSessionAllocWinBuffByNextPosition = streamStateSessionAllocWinBuffByNextPosition;
// void initStreamStateAPI(SStorageAPI* pAPI) {
// initStateStoreAPI(&pAPI->stateStore);
// initFunctionStateStore(&pAPI->functionStore);
// }
pStore->updateInfoInit = updateInfoInit;
pStore->updateInfoFillBlockData = updateInfoFillBlockData;
pStore->updateInfoIsUpdated = updateInfoIsUpdated;

View File

@ -162,6 +162,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateSaveInfo = streamStateSaveInfo;
pStore->streamStateGetInfo = streamStateGetInfo;
pStore->streamStateSetNumber = streamStateSetNumber;
pStore->streamStateGetPrev = streamStateGetPrev;
pStore->streamStateFillPut = streamStateFillPut;
pStore->streamStateFillGet = streamStateFillGet;
@ -179,9 +180,11 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateFillSeekKeyPrev = streamStateFillSeekKeyPrev;
pStore->streamStateFreeCur = streamStateFreeCur;
pStore->streamStateGetGroupKVByCur = streamStateGetGroupKVByCur;
pStore->streamStateFillGetGroupKVByCur = streamStateFillGetGroupKVByCur;
pStore->streamStateGetKVByCur = streamStateGetKVByCur;
pStore->streamStateClearExpiredState = streamStateClearExpiredState;
pStore->streamStateSessionAddIfNotExist = streamStateSessionAddIfNotExist;
pStore->streamStateSessionPut = streamStateSessionPut;
pStore->streamStateSessionGet = streamStateSessionGet;

View File

@ -536,6 +536,7 @@ typedef struct SStreamScanInfo {
SSDataBlock* pCheckpointRes;
int8_t pkColType;
int32_t pkColLen;
bool useGetResultRange;
} SStreamScanInfo;
typedef struct {
@ -818,6 +819,10 @@ typedef struct SStreamFillOperatorInfo {
int32_t primaryTsCol;
int32_t primarySrcSlotId;
SStreamFillInfo* pFillInfo;
SStreamAggSupporter* pStreamAggSup;
SArray* pCloseTs;
SArray* pUpdated;
SGroupResInfo groupResInfo;
} SStreamFillOperatorInfo;
typedef struct SStreamTimeSliceOperatorInfo {
@ -850,8 +855,32 @@ typedef struct SStreamTimeSliceOperatorInfo {
SGroupResInfo groupResInfo;
bool ignoreNull;
bool isHistoryOp;
struct SOperatorInfo* pOperator;
} SStreamTimeSliceOperatorInfo;
typedef struct SStreamIntervalSliceOperatorInfo {
SSteamOpBasicInfo basic;
SOptrBasicInfo binfo;
STimeWindowAggSupp twAggSup;
SStreamAggSupporter streamAggSup;
SExprSupp scalarSup;
SInterval interval;
bool recvCkBlock;
SSDataBlock* pCheckpointRes;
int32_t primaryTsIndex;
SSHashObj* pUpdatedMap; // SWinKey
SArray* pUpdated; // SWinKey
SSHashObj* pDeletedMap;
SArray* pDelWins;
SSDataBlock* pDelRes;
int32_t delIndex;
bool destHasPrimaryKey;
int64_t endTs;
SGroupResInfo groupResInfo;
struct SOperatorInfo* pOperator;
bool hasFill;
} SStreamIntervalSliceOperatorInfo;
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)

View File

@ -30,6 +30,16 @@ extern "C" {
#define HAS_NON_ROW_DATA(pRowData) (pRowData->key == INT64_MIN)
#define HAS_ROW_DATA(pRowData) (pRowData && pRowData->key != INT64_MIN)
#define IS_INVALID_WIN_KEY(ts) ((ts) == INT64_MIN)
#define IS_VALID_WIN_KEY(ts) ((ts) != INT64_MIN)
#define SET_WIN_KEY_INVALID(ts) ((ts) = INT64_MIN)
#define IS_NORMAL_INTERVAL_OP(op) \
((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || \
(op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
#define IS_CONTINUE_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL)
typedef struct SSliceRowData {
TSKEY key;
char pRowVal[];
@ -73,9 +83,23 @@ int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId,
void resetStreamFillSup(SStreamFillSupporter* pFillSup);
void setPointBuff(SSlicePoint* pPoint, SStreamFillSupporter* pFillSup);
int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap);
int winPosCmprImpl(const void* pKey1, const void* pKey2);
void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index);
int32_t getDownstreamRes(struct SOperatorInfo* downstream, SSDataBlock** ppRes, SColumnInfo** ppPkCol);
void destroyFlusedppPos(void* ppRes);
void doBuildStreamIntervalResult(struct SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock,
SGroupResInfo* pGroupResInfo);
void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal,
int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol);
int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId, bool ignoreNull);
int32_t createStreamIntervalSliceOperatorInfo(struct SOperatorInfo* downstream, SPhysiNode* pPhyNode,
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
struct SOperatorInfo** ppOptInfo);
#ifdef __cplusplus
}

View File

@ -25,6 +25,7 @@
#include "tdatablock.h"
#include "tmsg.h"
#include "ttime.h"
#include "operator.h"
#include "query.h"
@ -3397,9 +3398,7 @@ static bool isStreamWindow(SStreamScanInfo* pInfo) {
isTimeSlice(pInfo);
}
static int32_t copyGetResultBlock(SSDataBlock* dest, const SSDataBlock* src) {
TSKEY start = src->info.window.skey;
TSKEY end = src->info.window.ekey;
static int32_t copyGetResultBlock(SSDataBlock* dest, TSKEY start, TSKEY end) {
int32_t code = blockDataEnsureCapacity(dest, 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -3635,7 +3634,11 @@ FETCH_NEXT_BLOCK:
case STREAM_GET_RESULT: {
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
pInfo->updateResIndex = 0;
code = copyGetResultBlock(pInfo->pUpdateRes, pBlock);
TSKEY endKey = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval);
if (pInfo->useGetResultRange == true) {
endKey = pBlock->info.window.ekey;
}
code = copyGetResultBlock(pInfo->pUpdateRes, pBlock->info.window.skey, endKey);
QUERY_CHECK_CODE(code, lino, _end);
pInfo->pUpdateInfo->maxDataVersion = -1;
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL);
@ -4445,6 +4448,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn;
pInfo->pFillSup = NULL;
pInfo->useGetResultRange = false;
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);

View File

@ -86,6 +86,9 @@ void* destroyFillColumnInfo(SFillColInfo* pFillCol, int32_t start, int32_t end)
}
void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
if (pFillSup == NULL) {
return;
}
pFillSup->pAllColInfo = destroyFillColumnInfo(pFillSup->pAllColInfo, pFillSup->numOfFillCols, pFillSup->numOfAllCols);
tSimpleHashCleanup(pFillSup->pResMap);
pFillSup->pResMap = NULL;
@ -139,6 +142,10 @@ static void destroyStreamFillOperatorInfo(void* param) {
pInfo->pDelRes = NULL;
taosArrayDestroy(pInfo->matchInfo.pList);
pInfo->matchInfo.pList = NULL;
taosArrayDestroy(pInfo->pUpdated);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->pCloseTs);
taosMemoryFree(pInfo);
}
@ -183,7 +190,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId};
void* preVal = NULL;
int32_t preVLen = 0;
code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);
code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);
if (code == TSDB_CODE_SUCCESS) {
pFillSup->prev.key = preKey.ts;
@ -202,7 +209,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
SWinKey nextKey = {.ts = INT64_MIN, .groupId = groupId};
void* nextVal = NULL;
int32_t nextVLen = 0;
code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextVLen);
code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextVLen);
if (code == TSDB_CODE_SUCCESS) {
pFillSup->next.key = nextKey.ts;
pFillSup->next.pRowVal = nextVal;
@ -211,7 +218,7 @@ void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId,
SWinKey nextNextKey = {.groupId = groupId};
void* nextNextVal = NULL;
int32_t nextNextVLen = 0;
code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextNextKey, (const void**)&nextNextVal, &nextNextVLen);
code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextNextKey, (const void**)&nextNextVal, &nextNextVLen);
if (code == TSDB_CODE_SUCCESS) {
pFillSup->nextNext.key = nextNextKey.ts;
pFillSup->nextNext.pRowVal = nextNextVal;
@ -859,7 +866,7 @@ static void getWindowInfoByKey(SStorageAPI* pAPI, void* pState, TSKEY ts, int64_
qDebug("get window info by key failed, Data may be deleted, try next window. ts:%" PRId64 ", groupId:%" PRId64, ts,
groupId);
SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key);
code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &key, (const void**)&val, &len);
code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &key, (const void**)&val, &len);
pAPI->stateStore.streamStateFreeCur(pCur);
qDebug("get window info by key ts:%" PRId64 ", groupId:%" PRId64 ", res%d", ts, groupId, code);
}
@ -937,7 +944,7 @@ static int32_t doDeleteFillResult(SOperatorInfo* pOperator) {
SWinKey delKey = {.groupId = delGroupId, .ts = delTs};
if (delTs == nextKey.ts) {
pAPI->stateStore.streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur);
winCode = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, NULL, NULL);
winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextKey, NULL, NULL);
// ts will be deleted later
if (delTs != ts) {
pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &delKey);
@ -1148,6 +1155,251 @@ _end:
return code;
}
void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup,
SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
void* pState = pOperator->pTaskInfo->streamInfo.pState;
bool res = false;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index);
if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = pKey->groupId;
} else if (pBlock->info.id.groupId != pKey->groupId) {
break;
}
void* val = NULL;
int32_t len = 0;
int32_t winCode = pAPI->stateStore.streamStateFillGet(pOperator->pTaskInfo->streamInfo.pState, pKey, (void**)&val, &len, NULL);
qDebug("===stream=== build force fill res. key:%" PRId64 ",groupId:%" PRId64".res:%d", pKey->ts, pKey->groupId, winCode);
if (winCode == TSDB_CODE_SUCCESS) {
pFillSup->cur.key = pKey->ts;
pFillSup->cur.pRowVal = val;
buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
resetFillWindow(&pFillSup->cur);
} else {
SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, pKey);
SWinKey preKey = {.ts = INT64_MIN, .groupId = pKey->groupId};
void* preVal = NULL;
int32_t preVLen = 0;
winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);
if (winCode == TSDB_CODE_SUCCESS) {
pFillSup->cur.key = pKey->ts;
pFillSup->cur.pRowVal = preVal;
if (pFillInfo->type == TSDB_FILL_PREV) {
code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
QUERY_CHECK_CODE(code, lino, _end);
} else {
copyNotFillExpData(pFillSup, pFillInfo);
pFillInfo->pResRow->key = pKey->ts;
code = buildFillResult(pFillInfo->pResRow, pFillSup, pKey->ts, pBlock, &res);
QUERY_CHECK_CODE(code, lino, _end);
}
resetFillWindow(&pFillSup->cur);
}
pAPI->stateStore.streamStateFreeCur(pCur);
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
}
void doBuildForceFillResult(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo,
SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
blockDataCleanup(pBlock);
if (!hasRemainResults(pGroupResInfo)) {
return;
}
// clear the existed group id
pBlock->info.id.groupId = 0;
doBuildForceFillResultImpl(pOperator, pFillSup, pFillInfo, pBlock, pGroupResInfo);
}
static int32_t buildForceFillResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamFillOperatorInfo* pInfo = pOperator->info;
uint16_t opType = pOperator->operatorType;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
doBuildForceFillResult(pOperator, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo);
if (pInfo->pRes->info.rows != 0) {
printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pRes;
goto _end;
}
(*ppRes) = NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
// force window close impl
static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamFillOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamFillSupporter* pFillSup = pInfo->pFillSup;
SStreamFillInfo* pFillInfo = pInfo->pFillInfo;
SSDataBlock* pBlock = pInfo->pSrcBlock;
uint64_t groupId = pBlock->info.id.groupId;
SSDataBlock* pRes = pInfo->pRes;
SStreamAggSupporter* pAggSup = pInfo->pStreamAggSup;
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
for (int32_t i = 0; i < pBlock->info.rows; i++){
code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize);
QUERY_CHECK_CODE(code, lino, _end);
}
code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, groupId, NULL, 0);
QUERY_CHECK_CODE(code, lino, _end);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
}
return code;
}
static int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int64_t groupId = 0;
SStreamStateCur* pCur = pAggSup->stateStore.streamStateGroupGetCur(pAggSup->pState);
while (1) {
int32_t winCode = pAggSup->stateStore.streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL);
if (winCode != TSDB_CODE_SUCCESS) {
break;
}
SWinKey key = {.ts = ts, .groupId = groupId};
void* pPushRes = taosArrayPush(pUpdated, &key);
QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
pAggSup->stateStore.streamStateGroupCurNext(pCur);
}
pAggSup->stateStore.streamStateFreeCur(pCur);
pCur = NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
pAggSup->stateStore.streamStateFreeCur(pCur);
pCur = NULL;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
// force window close
static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamFillOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
return code;
}
if (pOperator->status == OP_RES_TO_RETURN) {
SSDataBlock* resBlock = NULL;
code = buildForceFillResult(pOperator, &resBlock);
QUERY_CHECK_CODE(code, lino, _end);
if (resBlock != NULL) {
(*ppRes) = resBlock;
goto _end;
}
pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL;
goto _end;
}
SSDataBlock* fillResult = NULL;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN;
qDebug("===stream===return data:%s.", getStreamOpName(pOperator->operatorType));
break;
}
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
switch (pBlock->info.type) {
case STREAM_NORMAL:
case STREAM_INVALID: {
code = doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
QUERY_CHECK_CODE(code, lino, _end);
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pInfo->srcRowIndex = -1;
} break;
case STREAM_CHECKPOINT:
case STREAM_CREATE_CHILD_TABLE: {
(*ppRes) = pBlock;
goto _end;
} break;
case STREAM_GET_RESULT: {
void* pPushRes = taosArrayPush(pInfo->pCloseTs, &pBlock->info.window.skey);
QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
continue;
}
default:
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
QUERY_CHECK_CODE(code, lino, _end);
}
doStreamForceFillImpl(pOperator);
}
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) {
TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i);
code = buildAllResultKey(pInfo->pStreamAggSup, ts, pInfo->pUpdated);
QUERY_CHECK_CODE(code, lino, _end);
}
taosArrayClear(pInfo->pCloseTs);
taosArraySort(pInfo->pUpdated, winKeyCmprImpl);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->groupResInfo.freeItem = false;
pInfo->pUpdated = taosArrayInit(1024, sizeof(SWinKey));
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _end);
code = buildForceFillResult(pOperator, ppRes);
QUERY_CHECK_CODE(code, lino, _end);
if ((*ppRes) == NULL) {
pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
setStreamOperatorCompleted(pOperator);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
pTaskInfo->code = code;
}
return code;
}
static int32_t initResultBuf(SSDataBlock* pInputRes, SStreamFillSupporter* pFillSup) {
int32_t numOfCols = taosArrayGetSize(pInputRes->pDataBlock);
pFillSup->rowSize = sizeof(SResultCellData) * numOfCols;
@ -1370,6 +1622,50 @@ static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo*
}
}
int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval, SStreamAggSupporter** ppAggSup) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (IS_NORMAL_INTERVAL_OP(downstream)) {
SStreamIntervalOperatorInfo* pInfo = downstream->info;
*triggerType = pInfo->twAggSup.calTrigger;
*pInterval = pInfo->interval;
(*ppAggSup) = NULL;
} else if (IS_CONTINUE_INTERVAL_OP(downstream)) {
SStreamIntervalSliceOperatorInfo* pInfo = downstream->info;
*triggerType = pInfo->twAggSup.calTrigger;
*pInterval = pInfo->interval;
pInfo->hasFill = true;
(*ppAggSup) = &pInfo->streamAggSup;
} else {
code = TSDB_CODE_STREAM_INTERNAL_ERROR;
}
QUERY_CHECK_CODE(code, lino, _end);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t initForceFillDownStream(SOperatorInfo* downstream) {
SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (downstream == NULL) {
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
code = initForceFillDownStream(downstream->pDownstream[0]);
return code;
}
SStreamScanInfo* pInfo = (SStreamScanInfo*) downstream->info;
pInfo->useGetResultRange = true;
return code;
}
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
@ -1383,7 +1679,6 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
QUERY_CHECK_CODE(code, lino, _error);
}
SInterval* pInterval = &((SStreamIntervalOperatorInfo*)downstream->info)->interval;
int32_t numOfFillCols = 0;
SExprInfo* pFillExprInfo = NULL;
@ -1396,7 +1691,12 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno);
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI,
int8_t triggerType = 0;
SInterval interval = {0};
code = getDownStreamInfo(downstream, &triggerType, &interval, &pInfo->pStreamAggSup);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pFillSup = initStreamFillSup(pPhyFillNode, &interval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI,
pInfo->pSrcBlock);
if (!pInfo->pFillSup) {
code = TSDB_CODE_FAILED;
@ -1426,6 +1726,12 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
code = blockDataEnsureCapacity(pInfo->pDelRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pUpdated = taosArrayInit(1024, sizeof(SWinKey));
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _error, terrno);
pInfo->pCloseTs = taosArrayInit(1024, sizeof(TSKEY));
QUERY_CHECK_NULL(pInfo->pCloseTs, code, lino, _error, terrno);
pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
@ -1440,8 +1746,17 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
pInfo->srcRowIndex = -1;
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
code = initForceFillDownStream(downstream);
QUERY_CHECK_CODE(code, lino, _error);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
} else {
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
}
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
code = appendDownstream(pOperator, &downstream, 1);

View File

@ -0,0 +1,591 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executorInt.h"
#include "functionMgt.h"
#include "operator.h"
#include "querytask.h"
#include "storageapi.h"
#include "streamexecutorInt.h"
#include "tcommon.h"
#include "tdatablock.h"
#include "ttime.h"
#define STREAM_INTERVAL_SLICE_OP_CHECKPOINT_NAME "StreamIntervalSliceOperator_Checkpoint"
typedef struct SInervalSlicePoint {
SSessionKey winKey;
SSliceRowData* pLastRow;
SRowBuffPos* pResPos;
} SInervalSlicePoint;
typedef enum SIntervalSliceType {
INTERVAL_SLICE_START = 1,
INTERVAL_SLICE_END = 2,
} SIntervalSliceType;
void streamIntervalSliceReleaseState(SOperatorInfo* pOperator) {
}
void streamIntervalSliceReloadState(SOperatorInfo* pOperator) {
}
void destroyStreamIntervalSliceOperatorInfo(void* param) {
SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)param;
if (param == NULL) {
return;
}
cleanupBasicInfo(&pInfo->binfo);
if (pInfo->pOperator) {
cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
&pInfo->groupResInfo);
pInfo->pOperator = NULL;
}
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
if (pInfo->pUpdatedMap != NULL) {
tSimpleHashSetFreeFp(pInfo->pUpdatedMap, destroyFlusedppPos);
tSimpleHashCleanup(pInfo->pUpdatedMap);
pInfo->pUpdatedMap = NULL;
}
destroyStreamAggSupporter(&pInfo->streamAggSup);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
cleanupExprSupp(&pInfo->scalarSup);
tSimpleHashCleanup(pInfo->pDeletedMap);
taosArrayDestroy(pInfo->pDelWins);
blockDataDestroy(pInfo->pDelRes);
blockDataDestroy(pInfo->pCheckpointRes);
taosMemoryFreeClear(param);
}
static int32_t buildIntervalSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
uint16_t opType = pOperator->operatorType;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
doBuildDeleteResultImpl(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pState, pInfo->pDelWins, &pInfo->delIndex,
pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->pDelRes;
return code;
}
doBuildStreamIntervalResult(pOperator, pInfo->streamAggSup.pState, pInfo->binfo.pRes, &pInfo->groupResInfo);
if (pInfo->binfo.pRes->info.rows != 0) {
printDataBlock(pInfo->binfo.pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->binfo.pRes;
goto _end;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
// static void doStreamIntervalSliceSaveCheckpoint(SOperatorInfo* pOperator) {
// }
void initIntervalSlicePoint(SStreamAggSupporter* pAggSup, STimeWindow* pTWin, int64_t groupId, SInervalSlicePoint* pPoint) {
pPoint->winKey.groupId = groupId;
pPoint->winKey.win = *pTWin;
pPoint->pLastRow = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pAggSup->resultRowSize - pAggSup->stateKeySize);
}
static int32_t getIntervalSliceCurStateBuf(SStreamAggSupporter* pAggSup, SInterval* pInterval, STimeWindow* pTWin, int64_t groupId,
SInervalSlicePoint* pCurPoint, SInervalSlicePoint* pPrevPoint, int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SWinKey curKey = {.ts = pTWin->skey, .groupId = groupId};
int32_t curVLen = 0;
code = pAggSup->stateStore.streamStateAddIfNotExist(pAggSup->pState, &curKey, (void**)&pCurPoint->pResPos,
&curVLen, pWinCode);
QUERY_CHECK_CODE(code, lino, _end);
qDebug("===stream=== set stream twa next point buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d",
curKey.ts, curKey.groupId, *pWinCode);
initIntervalSlicePoint(pAggSup, pTWin, groupId, pCurPoint);
SWinKey prevKey = {.groupId = groupId};
SET_WIN_KEY_INVALID(prevKey.ts);
int32_t prevVLen = 0;
int32_t prevWinCode = TSDB_CODE_SUCCESS;
code = pAggSup->stateStore.streamStateGetPrev(pAggSup->pState, &curKey, &prevKey,
(void**)&pPrevPoint->pResPos, &prevVLen, &prevWinCode);
QUERY_CHECK_CODE(code, lino, _end);
if (prevWinCode == TSDB_CODE_SUCCESS) {
STimeWindow prevSTW = {.skey = prevKey.ts};
prevSTW.ekey = taosTimeGetIntervalEnd(prevSTW.skey, pInterval);
initIntervalSlicePoint(pAggSup, &prevSTW, groupId, pPrevPoint);
} else {
SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.skey);
SET_WIN_KEY_INVALID(pPrevPoint->winKey.win.ekey);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
void doStreamSliceInterpolation(SSliceRowData* pPrevWinVal, TSKEY winKey, TSKEY curTs, SSDataBlock* pDataBlock,
int32_t curRowIndex, SExprSupp* pSup, SIntervalSliceType type) {
SqlFunctionCtx* pCtx = pSup->pCtx;
for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
if (!fmIsIntervalInterpoFunc(pCtx[k].functionId)) {
pCtx[k].start.key = INT64_MIN;
continue;
}
SFunctParam* pParam = &pCtx[k].param[0];
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, pParam->pCol->slotId);
double prevVal = 0, curVal = 0, winVal = 0;
SResultCellData* pCell = getSliceResultCell((SResultCellData*)pPrevWinVal->pRowVal, pParam->pCol->slotId);
GET_TYPED_DATA(prevVal, double, pCell->type, pCell->pData);
GET_TYPED_DATA(curVal, double, pColInfo->info.type, colDataGetData(pColInfo, curRowIndex));
SPoint point1 = (SPoint){.key = pPrevWinVal->key, .val = &prevVal};
SPoint point2 = (SPoint){.key = curTs, .val = &curVal};
SPoint point = (SPoint){.key = winKey, .val = &winVal};
if (!fmIsElapsedFunc(pCtx[k].functionId)) {
taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
}
if (type == INTERVAL_SLICE_START) {
pCtx[k].start.key = point.key;
pCtx[k].start.val = winVal;
} else {
pCtx[k].end.key = point.key;
pCtx[k].end.val = winVal;
}
}
}
static void resetIntervalSliceFunctionKey(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].start.key = INT64_MIN;
pCtx[k].end.key = INT64_MIN;
}
}
int32_t setIntervalSliceOutputBuf(SInervalSlicePoint* pPoint, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t* rowEntryInfoOffset) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SResultRow* res = pPoint->pResPos->pRowBuff;
// set time window for current result
res->win = pPoint->winKey.win;
code = setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset);
QUERY_CHECK_CODE(code, lino, _end);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t doStreamIntervalSliceAggImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock, SSHashObj* pUpdatedMap,
SSHashObj* pDeletedMap) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamIntervalSliceOperatorInfo* pInfo = (SStreamIntervalSliceOperatorInfo*)pOperator->info;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t numOfOutput = pSup->numOfExprs;
TSKEY* tsCols = NULL;
int64_t groupId = pBlock->info.id.groupId;
SResultRow* pResult = NULL;
int32_t forwardRows = 0;
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
int32_t startPos = 0;
TSKEY curTs = getStartTsKey(&pBlock->info.window, tsCols);
SInervalSlicePoint curPoint = {0};
SInervalSlicePoint prevPoint = {0};
STimeWindow curWin =
getActiveTimeWindow(NULL, pResultRowInfo, curTs, &pInfo->interval, TSDB_ORDER_ASC);
while (1) {
if (curTs > pInfo->endTs) {
break;
}
int32_t winCode = TSDB_CODE_SUCCESS;
code = getIntervalSliceCurStateBuf(&pInfo->streamAggSup, &pInfo->interval, &curWin, groupId, &curPoint, &prevPoint, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && prevPoint.pLastRow->key != prevPoint.winKey.win.ekey) {
setIntervalSliceOutputBuf(&prevPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
doStreamSliceInterpolation(prevPoint.pLastRow, prevPoint.winKey.win.ekey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_END);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &prevPoint.winKey.win, 1);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
0, pBlock->info.rows, numOfOutput);
QUERY_CHECK_CODE(code, lino, _end);
SWinKey prevKey = {.ts = prevPoint.winKey.win.skey, .groupId = prevPoint.winKey.groupId};
saveWinResult(&prevKey, prevPoint.pResPos, pInfo->pUpdatedMap);
}
setIntervalSliceOutputBuf(&curPoint, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
resetIntervalSliceFunctionKey(pSup->pCtx, numOfOutput);
if (IS_VALID_WIN_KEY(prevPoint.winKey.win.skey) && curPoint.winKey.win.skey != curTs) {
doStreamSliceInterpolation(prevPoint.pLastRow, curPoint.winKey.win.skey, curTs, pBlock, startPos, &pOperator->exprSupp, INTERVAL_SLICE_START);
}
forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL,
TSDB_ORDER_ASC);
int32_t prevEndPos = (forwardRows - 1) + startPos;
if (winCode != TSDB_CODE_SUCCESS) {
int32_t endRowId = getQualifiedRowNumDesc(pSup, pBlock, tsCols, prevEndPos, false);
TSKEY endRowTs = tsCols[endRowId];
transBlockToSliceResultRow(pBlock, endRowId, endRowTs, curPoint.pLastRow, 0, NULL, NULL);
}
SWinKey curKey = {.ts = curPoint.winKey.win.skey, .groupId = curPoint.winKey.groupId};
if (pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS) {
code = tSimpleHashPut(pDeletedMap, &curKey, sizeof(SWinKey), NULL, 0);
QUERY_CHECK_CODE(code, lino, _end);
}
saveWinResult(&curKey, curPoint.pResPos, pInfo->pUpdatedMap);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &curPoint.winKey.win, 1);
code = applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos,
forwardRows, pBlock->info.rows, numOfOutput);
QUERY_CHECK_CODE(code, lino, _end);
startPos = getNextQualifiedWindow(&pInfo->interval, &curWin, &pBlock->info, tsCols, prevEndPos, TSDB_ORDER_ASC);
if (startPos < 0) {
break;
}
curTs = tsCols[startPos];
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
}
return code;
}
static int32_t doStreamIntervalSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamIntervalSliceOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
if (pOperator->status == OP_EXEC_DONE) {
(*ppRes) = NULL;
goto _end;
}
if (pOperator->status == OP_RES_TO_RETURN) {
SSDataBlock* resBlock = NULL;
code = buildIntervalSliceResult(pOperator, &resBlock);
QUERY_CHECK_CODE(code, lino, _end);
if (resBlock != NULL) {
(*ppRes) = resBlock;
return code;
}
if (pInfo->hasFill == false) {
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
}
setStreamOperatorCompleted(pOperator);
(*ppRes) = NULL;
return code;
}
SOperatorInfo* downstream = pOperator->pDownstream[0];
int32_t numOfDatapack = 0;
while (1) {
SSDataBlock* pBlock = NULL;
code = downstream->fpSet.getNextFn(downstream, &pBlock);
QUERY_CHECK_CODE(code, lino, _end);
if (pBlock == NULL) {
pOperator->status = OP_RES_TO_RETURN;
break;
}
switch (pBlock->info.type) {
case STREAM_NORMAL:
case STREAM_INVALID: {
SExprSupp* pExprSup = &pInfo->scalarSup;
if (pExprSup->pExprInfo != NULL) {
code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL);
QUERY_CHECK_CODE(code, lino, _end);
}
} break;
case STREAM_CHECKPOINT: {
pInfo->recvCkBlock = true;
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
// doStreamIntervalSliceSaveCheckpoint(pOperator);
pInfo->recvCkBlock = true;
code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
continue;
} break;
case STREAM_CREATE_CHILD_TABLE: {
(*ppRes) = pBlock;
goto _end;
} break;
case STREAM_GET_RESULT: {
pInfo->endTs = taosTimeGetIntervalEnd(pBlock->info.window.skey, &pInfo->interval);
if (pInfo->hasFill) {
(*ppRes) = pBlock;
goto _end;
} else {
continue;
}
}
default:
ASSERTS(false, "invalid SSDataBlock type");
}
code = setInputDataBlock(&pOperator->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
QUERY_CHECK_CODE(code, lino, _end);
code = doStreamIntervalSliceAggImpl(pOperator, pBlock, pInfo->pUpdatedMap, pInfo->pDeletedMap);
QUERY_CHECK_CODE(code, lino, _end);
}
if (!pInfo->destHasPrimaryKey) {
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
}
if (pInfo->destHasPrimaryKey) {
code = copyIntervalDeleteKey(pInfo->pDeletedMap, pInfo->pDelWins);
QUERY_CHECK_CODE(code, lino, _end);
}
code = copyUpdateResult(&pInfo->pUpdatedMap, pInfo->pUpdated, winPosCmprImpl);
QUERY_CHECK_CODE(code, lino, _end);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES);
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _end);
(*ppRes) = NULL;
code = buildIntervalSliceResult(pOperator, ppRes);
QUERY_CHECK_CODE(code, lino, _end);
if ((*ppRes) == NULL) {
if (pInfo->hasFill == false) {
pAggSup->stateStore.streamStateClearExpiredState(pAggSup->pState);
}
setStreamOperatorCompleted(pOperator);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t initIntervalSliceDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type,
int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic,
SInterval* pInterval) {
SExecTaskInfo* pTaskInfo = downstream->pTaskInfo;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
SStreamPartitionOperatorInfo* pPartionInfo = downstream->info;
pPartionInfo->tsColIndex = tsColIndex;
pBasic->primaryPkIndex = pPartionInfo->basic.primaryPkIndex;
}
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
code =
initIntervalSliceDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic, pInterval);
return code;
}
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->igCheckUpdate = true;
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
pScanInfo->pState = pAggSup->pState;
if (!pScanInfo->pUpdateInfo) {
code = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen,
&pScanInfo->pUpdateInfo);
QUERY_CHECK_CODE(code, lino, _end);
}
pScanInfo->twAggSup = *pTwSup;
pScanInfo->interval = *pInterval;
pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
if (!hasSrcPrimaryKeyCol(pBasic)) {
pBasic->primaryPkIndex = pScanInfo->basic.primaryPkIndex;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
}
return code;
}
int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SReadHandle* pHandle, SOperatorInfo** ppOptInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamIntervalSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalSliceOperatorInfo));
QUERY_CHECK_NULL(pInfo, code, lino, _error, terrno);
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
QUERY_CHECK_NULL(pOperator, code, lino, _error, terrno)
pInfo->pUpdated = taosArrayInit(1024, POINTER_BYTES);
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _error, terrno);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pUpdatedMap = tSimpleHashInit(1024, hashFn);
QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _error, terrno);
pInfo->pDeletedMap = tSimpleHashInit(1024, hashFn);
QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno);
pInfo->delIndex = 0;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno);
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
QUERY_CHECK_CODE(code, lino, _error);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno);
initBasicInfo(&pInfo->binfo, pResBlock);
code = createSpecialDataBlock(STREAM_CHECKPOINT, &pInfo->pCheckpointRes);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->recvCkBlock = false;
SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(&pOperator->resultInfo, 4096);
SExprSupp* pExpSup = &pOperator->exprSupp;
int32_t numOfExprs = 0;
SExprInfo* pExprInfo = NULL;
code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfExprs);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(pExpSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval,
.sliding = pIntervalPhyNode->sliding,
.intervalUnit = pIntervalPhyNode->intervalUnit,
.slidingUnit = pIntervalPhyNode->slidingUnit,
.offset = pIntervalPhyNode->offset,
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
pInfo->twAggSup =
(STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
.deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)};
code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
QUERY_CHECK_CODE(code, lino, _error);
if (pIntervalPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = NULL;
code = createExprInfo(pIntervalPhyNode->window.pExprs, NULL, &pScalarExprInfo, &numOfScalar);
QUERY_CHECK_CODE(code, lino, _error);
code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
QUERY_CHECK_CODE(code, lino, _error);
}
SSDataBlock* pDownRes = NULL;
SColumnInfo* pPkCol = NULL;
code = getDownstreamRes(downstream, &pDownRes, &pPkCol);
QUERY_CHECK_CODE(code, lino, _error);
int32_t keyBytes = sizeof(TSKEY);
keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock);
if (pPkCol) {
keyBytes += pPkCol->bytes;
}
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfExprs, 0, pTaskInfo->streamInfo.pState, keyBytes, 0,
&pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo),
&pTaskInfo->storageAPI, pInfo->primaryTsIndex, STREAM_STATE_BUFF_HASH_SEARCH, 1);
pInfo->destHasPrimaryKey = pIntervalPhyNode->window.destHasPrimaryKey;
pInfo->pOperator = pOperator;
pInfo->hasFill = false;
setOperatorInfo(pOperator, "StreamIntervalSliceOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_CONTINUE_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalSliceNext, NULL, destroyStreamIntervalSliceOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamIntervalSliceReleaseState, streamIntervalSliceReloadState);
initStreamBasicInfo(&pInfo->basic);
if (downstream) {
code = initIntervalSliceDownStream(downstream, &pInfo->streamAggSup, pPhyNode->type, pInfo->primaryTsIndex,
&pInfo->twAggSup, &pInfo->basic, &pInfo->interval);
QUERY_CHECK_CODE(code, lino, _error);
code = appendDownstream(pOperator, &downstream, 1);
QUERY_CHECK_CODE(code, lino, _error);
}
(*ppOptInfo) = pOperator;
return code;
_error:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (pInfo != NULL) {
destroyStreamIntervalSliceOperatorInfo(pInfo);
}
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code;
(*ppOptInfo) = NULL;
return code;
}

View File

@ -29,8 +29,6 @@
#define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState"
#define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint"
#define IS_INVALID_WIN_KEY(ts) ((ts) == INT64_MIN)
#define SET_WIN_KEY_INVALID(ts) ((ts) = INT64_MIN)
int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap) {
return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), NULL, 0);
@ -146,6 +144,11 @@ static void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup) {
void destroyStreamTimeSliceOperatorInfo(void* param) {
SStreamTimeSliceOperatorInfo* pInfo = (SStreamTimeSliceOperatorInfo*)param;
if (pInfo->pOperator) {
cleanupResultInfoInStream(pInfo->pOperator->pTaskInfo, pInfo->streamAggSup.pState, &pInfo->pOperator->exprSupp,
&pInfo->groupResInfo);
pInfo->pOperator = NULL;
}
colDataDestroy(&pInfo->twAggSup.timeWindowData);
destroyStreamAggSupporter(&pInfo->streamAggSup);
resetPrevAndNextWindow(pInfo->pFillSup);
@ -353,7 +356,7 @@ _end:
}
}
static SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index) {
SResultCellData* getSliceResultCell(SResultCellData* pRowVal, int32_t index) {
if (!pRowVal) {
return NULL;
}
@ -608,7 +611,7 @@ static int32_t getQualifiedRowNumAsc(SExprSupp* pExprSup, SSDataBlock* pBlock, i
return -1;
}
static int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId,
int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock, TSKEY* tsCols, int32_t rowId,
bool ignoreNull) {
TSKEY ts = tsCols[rowId];
int32_t resRow = -1;
@ -1219,7 +1222,7 @@ static bool needAdjustValue(SSlicePoint* pPoint, TSKEY ts, void* pPkVal, SStream
return false;
}
static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal,
void transBlockToSliceResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SSliceRowData* pRowVal,
int32_t rowSize, void* pPkData, SColumnInfoData* pPkCol) {
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
@ -1344,7 +1347,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
}
right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type);
if (right) {
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo);
transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo);
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel,
pInfo->pDeletedMap);
@ -1363,7 +1366,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
}
left = needAdjustValue(&nextPoint, tsCols[leftRowId], pPkVal, pFillSup, true, pFillSup->type);
if (left) {
transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal, pPkColDataInfo);
transBlockToSliceResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow, pFillSup->rowSize, pPkVal, pPkColDataInfo);
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &nextPoint.key, pInfo->pUpdatedMap,
needDel, pInfo->pDeletedMap);
@ -1388,7 +1391,7 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
}
right = needAdjustValue(&curPoint, tsCols[startPos], pPkVal, pFillSup, false, pFillSup->type);
if (right) {
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo);
transBlockToSliceResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow, pFillSup->rowSize, pPkVal, pPkColDataInfo);
bool needDel = pInfo->destHasPrimaryKey && winCode == TSDB_CODE_SUCCESS;
code = saveTimeSliceWinResultInfo(pAggSup, &pInfo->twAggSup, &curPoint.key, pInfo->pUpdatedMap, needDel,
pInfo->pDeletedMap);
@ -1680,7 +1683,7 @@ _end:
return code;
}
static int32_t setAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SSHashObj* pUpdatedMap) {
int32_t setAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SSHashObj* pUpdatedMap) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int64_t groupId = 0;
@ -1793,7 +1796,6 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
pInfo->recvCkBlock = true;
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
doStreamTimeSliceSaveCheckpoint(pOperator);
pInfo->recvCkBlock = true;
code = copyDataBlock(pInfo->pCheckpointRes, pBlock);
QUERY_CHECK_CODE(code, lino, _end);
continue;
@ -2075,6 +2077,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
if (pHandle) {
pInfo->isHistoryOp = pHandle->fillHistory;
}
pInfo->pOperator = pOperator;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC;
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC,

View File

@ -30,9 +30,6 @@
#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
#define IS_MID_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL)
#define IS_NORMAL_INTERVAL_OP(op) \
((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || \
(op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
#define IS_NORMAL_SESSION_OP(op) \
@ -2240,7 +2237,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
code = pSup->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize,
funResSize, sesionTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr,
pHandle->checkpointId, stateType, &pSup->pState->pFileState);
} else if (stateType == STREAM_STATE_BUFF_HASH_SORT) {
} else if (stateType == STREAM_STATE_BUFF_HASH_SORT || stateType == STREAM_STATE_BUFF_HASH_SEARCH) {
pSup->pState->pFileState = NULL;
code = pSup->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pSup->resultRowSize, funResSize,
compareTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr,
@ -5304,7 +5301,7 @@ _end:
return code;
}
int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
static int32_t createStreamSingleIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
QRY_PARAM_CHECK(pOptrInfo);
@ -5458,6 +5455,17 @@ _error:
return code;
}
int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode;
if (pIntervalPhyNode->window.triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
return createStreamIntervalSliceOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, pOptrInfo);
} else {
return createStreamSingleIntervalOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, pOptrInfo);
}
return TSDB_CODE_SUCCESS;
}
static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pUpdatedMap) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;

View File

@ -3252,7 +3252,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "elapsed",
.type = FUNCTION_TYPE_ELAPSED,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_FORBID_STREAM_FUNC |
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC |
FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_SPECIAL_DATA_REQUIRED,
.dataRequiredFunc = statisDataRequired,
.translateFunc = translateElapsed,
@ -3508,7 +3508,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "twa",
.type = FUNCTION_TYPE_TWA,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_FORBID_STREAM_FUNC |
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC |
FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
.translateFunc = translateInNumOutDou,
.dataRequiredFunc = statisDataRequired,

View File

@ -5748,6 +5748,7 @@ static const char* jkSelectStmtHasAggFuncs = "HasAggFuncs";
static const char* jkSelectStmtInterpFuncs = "HasInterpFuncs";
static const char* jkSelectStmtInterpFill = "InterpFill";
static const char* jkSelectStmtInterpEvery = "InterpEvery";
static const char* jkSelectStmtTwaOrElapsedFuncs = "HasTwaOrElapsedFuncs";
static int32_t selectStmtToJson(const void* pObj, SJson* pJson) {
const SSelectStmt* pNode = (const SSelectStmt*)pObj;
@ -5798,6 +5799,9 @@ static int32_t selectStmtToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSelectStmtInterpFuncs, pNode->hasInterpFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSelectStmtTwaOrElapsedFuncs, pNode->hasTwaOrElapsedFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkSelectStmtInterpFill, nodeToJson, pNode->pFill);
}
@ -5857,6 +5861,9 @@ static int32_t jsonToSelectStmt(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSelectStmtInterpFuncs, &pNode->hasInterpFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSelectStmtTwaOrElapsedFuncs, &pNode->hasTwaOrElapsedFunc);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkSelectStmtInterpFill, &pNode->pFill);
}

View File

@ -2753,6 +2753,9 @@ static void setFuncClassification(STranslateContext* pCxt, SFunctionNode* pFunc)
pSelect->hasUniqueFunc = pSelect->hasUniqueFunc ? true : (FUNCTION_TYPE_UNIQUE == pFunc->funcType);
pSelect->hasTailFunc = pSelect->hasTailFunc ? true : (FUNCTION_TYPE_TAIL == pFunc->funcType);
pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType);
pSelect->hasTwaOrElapsedFunc = pSelect->hasTwaOrElapsedFunc ? true
: (FUNCTION_TYPE_TWA == pFunc->funcType ||
FUNCTION_TYPE_ELAPSED == pFunc->funcType);
pSelect->hasInterpPseudoColFunc =
pSelect->hasInterpPseudoColFunc ? true : fmIsInterpPseudoColumnFunc(pFunc->funcId);
pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType);
@ -10690,6 +10693,32 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
}
}
if (pSelect->hasTwaOrElapsedFunc) {
if (pStmt->pOptions->triggerType != STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Stream twa or elapsed function only support force window close");
}
if (pSelect->pWindow->type != QUERY_NODE_INTERVAL_WINDOW) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Stream twa or elapsed function only support interval");
}
if ((SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta &&
TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!hasTbnameFunction(pSelect->pPartitionByList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"twa or elapsed on super table must patitioned by table name");
}
}
if (pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
if (pStmt->pOptions->fillHistory) {
return generateSyntaxErrMsgExt(
&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"When trigger was force window close, Stream unsupported Fill history");
}
}
if (NULL != pSelect->pGroupByList) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported Group by");
}

View File

@ -172,6 +172,7 @@ void streamStateCurPrev_rocksdb(SStreamStateCur* pCur);
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState);
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key);
@ -210,6 +211,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
SStreamStateCur* streamStateFillSeekToLast_rocksdb(SStreamState* pState);
int32_t streamStateFillGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
// partag cf
int32_t streamStatePutParTag_rocksdb(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen);

View File

@ -3359,7 +3359,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
return streamStateDel_rocksdb(pState, &tmp);
}
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
int32_t streamStateFillGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
if (!pCur) {
return -1;
}
@ -3422,7 +3422,7 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key) {
SStreamStateCur* pCur = streamStateFillGetCur_rocksdb(pState, key);
if (pCur) {
int32_t code = streamStateGetGroupKVByCur_rocksdb(pCur, key, NULL, 0);
int32_t code = streamStateFillGetGroupKVByCur_rocksdb(pCur, key, NULL, 0);
if (code == 0) return pCur;
streamStateFreeCur(pCur);
}
@ -4239,6 +4239,16 @@ void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t gr
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_next(pCur->iter);
}
if (rocksdb_iter_valid(pCur->iter)) {
int64_t curGroupId;
size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
TAOS_UNUSED(parKeyDecode((void*)&curGroupId, keyStr));
if (curGroupId > groupId) return ;
rocksdb_iter_next(pCur->iter);
}
}
int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen) {
@ -5148,3 +5158,61 @@ int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname) {
return code;
}
#endif
SStreamStateCur* streamStateSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
stDebug("streamStateSeekKeyPrev_rocksdb");
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
SStreamStateCur* pCur = createStreamStateCursor();
if (pCur == NULL) {
return NULL;
}
pCur->db = wrapper->db;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
if (!streamStateIterSeekAndValid(pCur->iter, buf, len)) {
streamStateFreeCur(pCur);
return NULL;
}
while (rocksdb_iter_valid(pCur->iter) && iterValueIsStale(pCur->iter)) {
rocksdb_iter_prev(pCur->iter);
}
if (rocksdb_iter_valid(pCur->iter)) {
SWinKey curKey;
size_t kLen = 0;
char* keyStr = (char*)rocksdb_iter_key(pCur->iter, &kLen);
TAOS_UNUSED(winKeyDecode((void*)&curKey, keyStr));
if (winKeyCmpr(key, sizeof(*key), &curKey, sizeof(curKey)) > 0) {
return pCur;
}
rocksdb_iter_prev(pCur->iter);
return pCur;
}
streamStateFreeCur(pCur);
return NULL;
}
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
if (!pCur) {
return -1;
}
uint64_t groupId = pKey->groupId;
int32_t code = streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
if (code == 0) {
if (pKey->groupId == groupId) {
return 0;
}
if (pVal != NULL) {
taosMemoryFree((void*)*pVal);
*pVal = NULL;
}
}
return -1;
}

View File

@ -330,9 +330,11 @@ int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerTyp
int64_t now = taosGetTimestampMs();
STimeWindow window = getAlignQueryTimeWindow(&interval, now - trigger);
p->pBlock->info.window = window;
p->pBlock->info.window.skey = window.skey;
p->pBlock->info.window.ekey = TMAX(now, window.ekey);
p->pBlock->info.type = STREAM_GET_RESULT;
stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64, window.skey, window.ekey);
stDebug("force_window_close trigger block generated, window range:%" PRId64 "-%" PRId64,
p->pBlock->info.window.skey, p->pBlock->info.window.ekey);
} else {
p->pBlock->info.type = STREAM_GET_ALL;
}

View File

@ -23,11 +23,6 @@
#define NUM_OF_CACHE_WIN 64
#define MAX_NUM_OF_CACHE_WIN 128
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
SWinKey* pWin2 = taosArrayGet(pDatas, pos);
return winKeyCmprImpl((SWinKey*)pWin1, pWin2);
}
int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
@ -57,7 +52,7 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start);
for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
SWinKey tmpKey = {.groupId = pKey->groupId};
int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0);
int32_t tmpRes = streamStateFillGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0);
if (tmpRes != TSDB_CODE_SUCCESS) {
break;
}
@ -134,7 +129,7 @@ int32_t getStateFromRocksdbByCur(SStreamFileState* pFileState, SStreamStateCur*
int32_t lino = 0;
void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
(*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
@ -167,7 +162,7 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey);
void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
(*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS && ppVal != NULL) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
@ -188,7 +183,7 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey);
void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
(*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
if (ppVal != NULL) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
@ -241,7 +236,7 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey);
void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
(*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
@ -270,7 +265,7 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey);
void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
(*pWinCode) = streamStateFillGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {

View File

@ -229,6 +229,7 @@ int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void*
int32_t streamStateFillGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen, int32_t* pWinCode) {
if (pState->pFileState) {
// todo(liuyao) 改这里
return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen, pWinCode);
}
return streamStateFillGet_rocksdb(pState, key, pVal, pVLen);
@ -316,8 +317,8 @@ int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const vo
return streamStateFillGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
}
int32_t streamStateGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
return streamStateGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
int32_t streamStateFillGetGroupKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
return streamStateFillGetGroupKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
}
SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const SWinKey* key) {
@ -582,3 +583,12 @@ int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void**
}
return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen);
}
void streamStateClearExpiredState(SStreamState* pState) {
clearExpiredState(pState->pFileState);
}
int32_t streamStateGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
return getRowStatePrevRow(pState->pFileState, pKey, pResKey, pVal, pVLen, pWinCode);
}

View File

@ -30,6 +30,8 @@
#define MIN_NUM_SEARCH_BUCKET 128
#define MAX_ARRAY_SIZE 1024
#define MAX_GROUP_ID_NUM 200000
#define NUM_OF_CACHE_WIN 64
#define MAX_NUM_OF_CACHE_WIN 128
#define TASK_KEY "streamFileState"
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
@ -68,6 +70,11 @@ struct SStreamFileState {
typedef SRowBuffPos SRowBuffInfo;
int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
SWinKey* pWin2 = taosArrayGet(pDatas, pos);
return winKeyCmprImpl((SWinKey*)pWin1, pWin2);
}
int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) {
SRowBuffPos** pos = tSimpleHashGet(pBuff, pKey, keyLen);
if (pos) {
@ -111,6 +118,17 @@ void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) {
return pStateKey;
}
void* defaultCreateStateKey(SRowBuffPos* pPos, int64_t num) {
SWinKey* pStateKey = taosMemoryCalloc(1, sizeof(SWinKey));
if (pStateKey == NULL) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
return NULL;
}
SWinKey* pWinKey = pPos->pKey;
*pStateKey = *pWinKey;
return pStateKey;
}
int32_t sessionFileRemoveFn(SStreamFileState* pFileState, const void* pKey) {
return streamStateSessionDel_rocksdb(pFileState->pFileStore, pKey);
}
@ -171,7 +189,7 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize,
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
int32_t cap = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
if (type == STREAM_STATE_BUFF_HASH) {
if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
pFileState->rowStateBuff = tSimpleHashInit(cap, hashFn);
pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
@ -200,7 +218,7 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize,
pFileState->stateBuffCleanupFn = stateHashBuffCleanupFn;
pFileState->stateBuffRemoveFn = stateHashBuffRemoveFn;
pFileState->stateBuffRemoveByPosFn = stateHashBuffRemoveByPosFn;
pFileState->stateBuffCreateStateKeyFn = intervalCreateStateKey;
pFileState->stateBuffCreateStateKeyFn = defaultCreateStateKey;
pFileState->stateFileRemoveFn = hashSortFileRemoveFn;
pFileState->stateFileGetFn = hashSortFileGetFn;
@ -213,6 +231,11 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize,
QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _end, terrno);
QUERY_CHECK_NULL(pFileState->cfName, code, lino, _end, terrno);
if (type == STREAM_STATE_BUFF_HASH_SEARCH) {
pFileState->searchBuff = tSimpleHashInit(MIN_NUM_SEARCH_BUCKET, hashFn);
QUERY_CHECK_NULL(pFileState->searchBuff, code, lino, _end, terrno);
}
pFileState->keyLen = keySize;
pFileState->rowSize = rowSize;
pFileState->selectivityRowSize = selectRowSize;
@ -230,8 +253,7 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize,
pFileState->pGroupIdMap = tSimpleHashInit(1024, hashFn);
QUERY_CHECK_NULL(pFileState->pGroupIdMap, code, lino, _end, terrno);
// todo(liuyao) optimize
if (type == STREAM_STATE_BUFF_HASH) {
if (type == STREAM_STATE_BUFF_HASH || type == STREAM_STATE_BUFF_HASH_SEARCH) {
code = recoverSnapshot(pFileState, checkpointId);
} else if (type == STREAM_STATE_BUFF_SORT) {
code = recoverSesssion(pFileState, checkpointId);
@ -1160,6 +1182,9 @@ void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
return;
}
int64_t gpId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
pCur->minGpId = TMAX(pCur->minGpId, gpId);
SSHashObj* pHash = pFileState->pGroupIdMap;
pCur->pHashData = tSimpleHashIterate(pHash, pCur->pHashData, &pCur->hashIter);
if (!pCur->pHashData) {
@ -1167,8 +1192,6 @@ void streamFileStateGroupCurNext(SStreamStateCur* pCur) {
streamStateParTagSeekKeyNext_rocksdb(pFileState->pFileStore, pCur->minGpId, pCur);
return;
}
int64_t gpId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL);
pCur->minGpId = TMIN(pCur->minGpId, gpId);
}
int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) {
@ -1183,3 +1206,164 @@ int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, voi
SSHashObj* getGroupIdCache(SStreamFileState* pFileState) {
return pFileState->pGroupIdMap;
}
void clearExpiredState(SStreamFileState* pFileState) {
SSHashObj* pSearchBuff = pFileState->searchBuff;
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pFileState->searchBuff, pIte, &iter)) != NULL) {
SArray* pWinStates = *((void**)pIte);
int32_t size = taosArrayGetSize(pWinStates);
for (int32_t i = 0; i < size - 1; i++) {
SWinKey* pKey = taosArrayGet(pWinStates, i);
int32_t code_buff = pFileState->stateBuffRemoveFn(pFileState->rowStateBuff, pKey, sizeof(SWinKey));
qTrace("%s at line %d res:%d", __func__, __LINE__, code_buff);
int32_t code_file = pFileState->stateFileRemoveFn(pFileState, pKey);
qTrace("%s at line %d res:%d", __func__, __LINE__, code_file);
streamStateFillDel_rocksdb(pFileState->pFileStore, pKey);
}
taosArrayRemoveBatch(pWinStates, 0, size - 1, NULL);
}
}
int32_t getStateSearchRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen,
int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
code = addRowBuffIfNotExist(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, pWinCode);
QUERY_CHECK_CODE(code, lino, _end);
SArray* pWinStates = NULL;
SSHashObj* pSearchBuff = getSearchBuff(pFileState);
void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
pWinStates = taosArrayInit(16, sizeof(SWinKey));
QUERY_CHECK_NULL(pWinStates, code, lino, _end, terrno);
code = tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
QUERY_CHECK_CODE(code, lino, _end);
}
// recover
if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
TSKEY ts = getFlushMark(pFileState);
SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX};
void* pState = getStateFileStore(pFileState);
SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, &start);
for (int32_t i = 0; i < NUM_OF_CACHE_WIN; i++) {
SWinKey tmpKey = {.groupId = pKey->groupId};
int32_t tmpRes = streamStateGetGroupKVByCur_rocksdb(pCur, &tmpKey, NULL, 0);
if (tmpRes != TSDB_CODE_SUCCESS) {
break;
}
void* tmp = taosArrayPush(pWinStates, &tmpKey);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
streamStateCurPrev_rocksdb(pCur);
}
taosArraySort(pWinStates, winKeyCmprImpl);
streamStateFreeCur(pCur);
}
int32_t size = taosArrayGetSize(pWinStates);
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (!isFlushedState(pFileState, pKey->ts, 0)|| index >= 0) {
// find the first position which is smaller than the pKey
if (index >= 0) {
SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
goto _end;
}
}
index++;
void* tmp = taosArrayInsert(pWinStates, index, pKey);
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
}
if (size >= MAX_NUM_OF_CACHE_WIN) {
int32_t num = size - NUM_OF_CACHE_WIN;
taosArrayRemoveBatch(pWinStates, 0, num, NULL);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t getRowStatePrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
int32_t* pVLen, int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SArray* pWinStates = NULL;
SSHashObj* pSearchBuff = getSearchBuff(pFileState);
void* pState = getStateFileStore(pFileState);
void** ppBuff = (void**) tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
if (ppBuff) {
pWinStates = (SArray*)(*ppBuff);
} else {
qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);
void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pRowBuff, tmpVal, len);
taosMemoryFreeClear(tmpVal);
*pVLen = getRowStateRowSize(pFileState);
(*ppVal) = pNewPos;
}
streamStateFreeCur(pCur);
return code;
}
int32_t size = taosArrayGetSize(pWinStates);
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (index >= 0) {
SWinKey* pCurKey = taosArrayGet(pWinStates, index);
if (winKeyCmprImpl(pCurKey, pKey) == 0) {
index--;
} else {
qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__, __LINE__);
}
}
if (index == -1) {
SStreamStateCur* pCur = streamStateSeekKeyPrev_rocksdb(pState, pKey);
void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pRowBuff, tmpVal, len);
taosMemoryFreeClear(tmpVal);
*pVLen = getRowStateRowSize(pFileState);
(*ppVal) = pNewPos;
}
streamStateFreeCur(pCur);
return code;
} else {
SWinKey* pPrevKey = taosArrayGet(pWinStates, index);
*pResKey = *pPrevKey;
return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode);
}
(*pWinCode) = TSDB_CODE_FAILED;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}