steam interp
This commit is contained in:
parent
063b492f72
commit
3ff0256145
|
@ -208,6 +208,8 @@ typedef struct SInterpFuncLogicNode {
|
||||||
SNodeList* pFuncs;
|
SNodeList* pFuncs;
|
||||||
STimeWindow timeRange;
|
STimeWindow timeRange;
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
|
char intervalUnit;
|
||||||
|
int8_t precision;
|
||||||
EFillMode fillMode;
|
EFillMode fillMode;
|
||||||
SNode* pFillValues; // SNodeListNode
|
SNode* pFillValues; // SNodeListNode
|
||||||
SNode* pTimeSeries; // SColumnNode
|
SNode* pTimeSeries; // SColumnNode
|
||||||
|
@ -511,7 +513,8 @@ typedef struct SInterpFuncPhysiNode {
|
||||||
SNodeList* pFuncs;
|
SNodeList* pFuncs;
|
||||||
STimeWindow timeRange;
|
STimeWindow timeRange;
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int8_t intervalUnit;
|
char intervalUnit;
|
||||||
|
int8_t precision;
|
||||||
EFillMode fillMode;
|
EFillMode fillMode;
|
||||||
SNode* pFillValues; // SNodeListNode
|
SNode* pFillValues; // SNodeListNode
|
||||||
SNode* pTimeSeries; // SColumnNode
|
SNode* pTimeSeries; // SColumnNode
|
||||||
|
|
|
@ -123,7 +123,7 @@ int32_t getSessionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyL
|
||||||
int32_t* pWinCode);
|
int32_t* pWinCode);
|
||||||
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
int32_t getFunctionRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen);
|
||||||
|
|
||||||
// fill
|
// time slice
|
||||||
int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen);
|
int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen);
|
||||||
int32_t hashSortFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen);
|
int32_t hashSortFileGetFn(SStreamFileState* pFileState, void* pKey, void** data, int32_t* pDataLen);
|
||||||
int32_t hashSortFileRemoveFn(SStreamFileState* pFileState, const void* pKey);
|
int32_t hashSortFileRemoveFn(SStreamFileState* pFileState, const void* pKey);
|
||||||
|
@ -133,6 +133,7 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
|
||||||
int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal,
|
int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal,
|
||||||
int32_t* pVLen);
|
int32_t* pVLen);
|
||||||
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId);
|
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId);
|
||||||
|
void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -828,6 +828,7 @@ typedef struct SStreamTimeSliceOperatorInfo {
|
||||||
bool destHasPrimaryKey;
|
bool destHasPrimaryKey;
|
||||||
SArray* historyPoints;
|
SArray* historyPoints;
|
||||||
SArray* pUpdated; // SWinKey
|
SArray* pUpdated; // SWinKey
|
||||||
|
SArray* historyWins;
|
||||||
SSHashObj* pUpdatedMap;
|
SSHashObj* pUpdatedMap;
|
||||||
int32_t delIndex;
|
int32_t delIndex;
|
||||||
SArray* pDelWins; // SWinKey
|
SArray* pDelWins; // SWinKey
|
||||||
|
@ -835,6 +836,7 @@ typedef struct SStreamTimeSliceOperatorInfo {
|
||||||
uint64_t numOfDatapack;
|
uint64_t numOfDatapack;
|
||||||
SGroupResInfo groupResInfo;
|
SGroupResInfo groupResInfo;
|
||||||
bool ignoreNull;
|
bool ignoreNull;
|
||||||
|
bool isHistoryOp;
|
||||||
} SStreamTimeSliceOperatorInfo;
|
} SStreamTimeSliceOperatorInfo;
|
||||||
|
|
||||||
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
|
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
|
||||||
|
@ -952,56 +954,56 @@ void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultR
|
||||||
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
|
void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset);
|
||||||
void doClearBufferedBlocks(SStreamScanInfo* pInfo);
|
void doClearBufferedBlocks(SStreamScanInfo* pInfo);
|
||||||
|
|
||||||
void streamOpReleaseState(struct SOperatorInfo* pOperator);
|
void streamOpReleaseState(struct SOperatorInfo* pOperator);
|
||||||
void streamOpReloadState(struct SOperatorInfo* pOperator);
|
void streamOpReloadState(struct SOperatorInfo* pOperator);
|
||||||
void destroyStreamAggSupporter(SStreamAggSupporter* pSup);
|
void destroyStreamAggSupporter(SStreamAggSupporter* pSup);
|
||||||
void clearGroupResInfo(SGroupResInfo* pGroupResInfo);
|
void clearGroupResInfo(SGroupResInfo* pGroupResInfo);
|
||||||
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, SFunctionStateStore* pStore);
|
SSDataBlock* pResultBlock, SFunctionStateStore* pStore);
|
||||||
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
|
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
|
||||||
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
|
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
|
||||||
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
||||||
SStorageAPI* pApi, int32_t tsIndex);
|
SStorageAPI* pApi, int32_t tsIndex);
|
||||||
int32_t initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type,
|
int32_t initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type,
|
||||||
int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic);
|
int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic);
|
||||||
int32_t getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
|
int32_t getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
|
||||||
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
||||||
void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey);
|
void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey);
|
||||||
int32_t deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
|
int32_t deleteSessionWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pMapUpdate,
|
||||||
SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd);
|
SSHashObj* pMapDelete, SSHashObj* pPkDelete, bool needAdd);
|
||||||
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated);
|
int32_t getAllSessionWindow(SSHashObj* pHashMap, SSHashObj* pStUpdated);
|
||||||
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed);
|
int32_t closeSessionWindow(SSHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SSHashObj* pClosed);
|
||||||
int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar);
|
int32_t copyUpdateResult(SSHashObj** ppWinUpdated, SArray* pUpdated, __compar_fn_t compar);
|
||||||
int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2);
|
int32_t sessionKeyCompareAsc(const void* pKey1, const void* pKey2);
|
||||||
void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins);
|
void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins);
|
||||||
int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
|
int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
|
||||||
int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
|
int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
|
||||||
struct SOperatorInfo* pOperator, int64_t winDelta);
|
struct SOperatorInfo* pOperator, int64_t winDelta);
|
||||||
void setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo);
|
void setSessionWinOutputInfo(SSHashObj* pStUpdated, SResultWindowInfo* pWinInfo);
|
||||||
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo);
|
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo);
|
||||||
int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated);
|
int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated);
|
||||||
int32_t saveDeleteRes(SSHashObj* pStDelete, SSessionKey key);
|
int32_t saveDeleteRes(SSHashObj* pStDelete, SSessionKey key);
|
||||||
void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey);
|
void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey);
|
||||||
void doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite);
|
void doBuildDeleteDataBlock(struct SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite);
|
||||||
void doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo,
|
void doBuildSessionResult(struct SOperatorInfo* pOperator, void* pState, SGroupResInfo* pGroupResInfo,
|
||||||
SSDataBlock* pBlock);
|
SSDataBlock* pBlock);
|
||||||
int32_t getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo);
|
int32_t getSessionWindowInfoByKey(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SResultWindowInfo* pWinInfo);
|
||||||
void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
|
void getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pStUpdated, SResultWindowInfo* pCurWin,
|
||||||
SResultWindowInfo* pNextWin);
|
SResultWindowInfo* pNextWin);
|
||||||
int32_t compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup,
|
int32_t compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeWindowAggSupp* pTwAggSup,
|
||||||
SExecTaskInfo* pTaskInfo, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin,
|
SExecTaskInfo* pTaskInfo, SResultWindowInfo* pCurWin, SResultWindowInfo* pNextWin,
|
||||||
SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap);
|
SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap);
|
||||||
void releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
|
void releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
|
||||||
void resetWinRange(STimeWindow* winRange);
|
void resetWinRange(STimeWindow* winRange);
|
||||||
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId,
|
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId,
|
||||||
TSKEY ts, void* pPkVal, int32_t len);
|
TSKEY ts, void* pPkVal, int32_t len);
|
||||||
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
|
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
|
||||||
void resetUnCloseSessionWinInfo(SSHashObj* winMap);
|
void resetUnCloseSessionWinInfo(SSHashObj* winMap);
|
||||||
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
|
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
|
||||||
void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup);
|
void reloadAggSupFromDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup);
|
||||||
void destroyFlusedPos(void* pRes);
|
void destroyFlusedPos(void* pRes);
|
||||||
bool isIrowtsPseudoColumn(SExprInfo* pExprInfo);
|
bool isIrowtsPseudoColumn(SExprInfo* pExprInfo);
|
||||||
bool isIsfilledPseudoColumn(SExprInfo* pExprInfo);
|
bool isIsfilledPseudoColumn(SExprInfo* pExprInfo);
|
||||||
|
|
||||||
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
|
int32_t encodeSSessionKey(void** buf, SSessionKey* key);
|
||||||
void* decodeSSessionKey(void* buf, SSessionKey* key);
|
void* decodeSSessionKey(void* buf, SSessionKey* key);
|
||||||
|
|
|
@ -2065,9 +2065,117 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t generateTimeSliceScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock, EStreamType mode) {
|
void getTimeSliceWinRange(SStreamAggSupporter* pAggSup, SInterval* pInterval, TSKEY start, TSKEY end, int64_t groupId,
|
||||||
// todo(liuyao) add code 获取delete range的左邻居和右邻居,作为range
|
STimeWindow* pScanRange, STimeWindow* pDelRange) {
|
||||||
return TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SResultRowInfo dumyInfo = {0};
|
||||||
|
dumyInfo.cur.pageId = -1;
|
||||||
|
STimeWindow sWin = getActiveTimeWindow(NULL, &dumyInfo, start, pInterval, TSDB_ORDER_ASC);
|
||||||
|
SWinKey startKey = {.groupId = groupId, .ts = sWin.skey};
|
||||||
|
pDelRange->skey = sWin.skey;
|
||||||
|
|
||||||
|
sWin = getActiveTimeWindow(NULL, &dumyInfo, end, pInterval, TSDB_ORDER_ASC);
|
||||||
|
SWinKey endKey = {.groupId = groupId, .ts = sWin.ekey};
|
||||||
|
pDelRange->ekey = sWin.ekey;
|
||||||
|
|
||||||
|
SWinKey preKey = {.groupId = groupId};
|
||||||
|
code = pAggSup->stateStore.streamStateFillGetPrev(pAggSup->pState, &startKey, &preKey, NULL, NULL);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
pScanRange->skey = preKey.ts;
|
||||||
|
} else {
|
||||||
|
pScanRange->skey = startKey.ts;
|
||||||
|
}
|
||||||
|
|
||||||
|
SWinKey nextKey = {.groupId = groupId};
|
||||||
|
code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &endKey, &nextKey, NULL, NULL);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
pScanRange->ekey = nextKey.ts;
|
||||||
|
} else {
|
||||||
|
pScanRange->ekey = endKey.ts;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t generateTimeSliceScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock,
|
||||||
|
EStreamType mode) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
|
blockDataCleanup(pDestBlock);
|
||||||
|
if (pSrcBlock->info.rows == 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
|
||||||
|
SColumnInfoData* pStartTsCol = taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
|
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
|
||||||
|
SColumnInfoData* pEndTsCol = taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
|
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
|
||||||
|
SColumnInfoData* pUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||||
|
uint64_t* uidCol = (uint64_t*)pUidCol->pData;
|
||||||
|
SColumnInfoData* pGpCol = taosArrayGet(pSrcBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
|
uint64_t* pSrcGp = (uint64_t*)pGpCol->pData;
|
||||||
|
SColumnInfoData* pSrcPkCol = NULL;
|
||||||
|
if (taosArrayGetSize(pSrcBlock->pDataBlock) > PRIMARY_KEY_COLUMN_INDEX) {
|
||||||
|
pSrcPkCol = taosArrayGet(pSrcBlock->pDataBlock, PRIMARY_KEY_COLUMN_INDEX);
|
||||||
|
}
|
||||||
|
int64_t ver = pSrcBlock->info.version - 1;
|
||||||
|
|
||||||
|
if (pInfo->partitionSup.needCalc &&
|
||||||
|
(startData[0] != endData[0] || (hasPrimaryKeyCol(pInfo) && mode == STREAM_DELETE_DATA))) {
|
||||||
|
code = getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
startData = (TSKEY*)pStartTsCol->pData;
|
||||||
|
endData = (TSKEY*)pEndTsCol->pData;
|
||||||
|
uidCol = (uint64_t*)pUidCol->pData;
|
||||||
|
pSrcGp = (uint64_t*)pGpCol->pData;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||||
|
SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||||
|
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
||||||
|
uint64_t groupId = pSrcGp[i];
|
||||||
|
if (groupId == 0) {
|
||||||
|
void* pVal = NULL;
|
||||||
|
if (hasPrimaryKeyCol(pInfo) && pSrcPkCol) {
|
||||||
|
pVal = colDataGetData(pSrcPkCol, i);
|
||||||
|
}
|
||||||
|
groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal);
|
||||||
|
}
|
||||||
|
|
||||||
|
STimeWindow scanRange = {0};
|
||||||
|
STimeWindow delRange = {0};
|
||||||
|
ASSERT(mode == STREAM_DELETE_RESULT || mode == STREAM_DELETE_DATA);
|
||||||
|
getTimeSliceWinRange(pInfo->windowSup.pStreamAggSup, &pInfo->interval, startData[i], endData[i], groupId, &scanRange, &delRange);
|
||||||
|
|
||||||
|
code = colDataSetVal(pDestStartCol, i, (const char*)&scanRange.skey, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
code = colDataSetVal(pDestEndCol, i, (const char*)&scanRange.ekey, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
colDataSetNULL(pDestUidCol, i);
|
||||||
|
code = colDataSetVal(pDestGpCol, i, (const char*)&groupId, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
code = colDataSetVal(pDestCalStartTsCol, i, (const char*)&delRange.skey, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
code = colDataSetVal(pDestCalEndTsCol, i, (const char*)&delRange.ekey, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
pDestBlock->info.rows++;
|
||||||
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock,
|
static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock,
|
||||||
|
@ -3033,7 +3141,8 @@ static bool hasScanRange(SStreamScanInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isStreamWindow(SStreamScanInfo* pInfo) {
|
static bool isStreamWindow(SStreamScanInfo* pInfo) {
|
||||||
return isIntervalWindow(pInfo) || isSessionWindow(pInfo) || isStateWindow(pInfo) || isCountWindow(pInfo);
|
return isIntervalWindow(pInfo) || isSessionWindow(pInfo) || isStateWindow(pInfo) || isCountWindow(pInfo) ||
|
||||||
|
isTimeSlice(pInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
static int32_t doStreamScanNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
|
@ -4596,7 +4705,7 @@ int32_t createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* p
|
||||||
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&pInfo->filterCtx);
|
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&pInfo->filterCtx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//TODO wjm check pInfo->filterCtx.code
|
// TODO wjm check pInfo->filterCtx.code
|
||||||
__optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry;
|
__optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry;
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "querytask.h"
|
#include "querytask.h"
|
||||||
#include "storageapi.h"
|
#include "storageapi.h"
|
||||||
#include "streamexecutorInt.h"
|
#include "streamexecutorInt.h"
|
||||||
|
#include "tchecksum.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
@ -28,7 +29,7 @@
|
||||||
|
|
||||||
#define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState"
|
#define STREAM_TIME_SLICE_OP_STATE_NAME "StreamTimeSliceHistoryState"
|
||||||
#define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint"
|
#define STREAM_TIME_SLICE_OP_CHECKPOINT_NAME "StreamTimeSliceOperator_Checkpoint"
|
||||||
#define HAS_NON_ROW_DATA(pRowData) (pRowData->key == INT64_MIN)
|
#define HAS_NON_ROW_DATA(pRowData) (pRowData == NULL || pRowData->key == INT64_MIN)
|
||||||
|
|
||||||
typedef struct SSlicePoint {
|
typedef struct SSlicePoint {
|
||||||
SWinKey key;
|
SWinKey key;
|
||||||
|
@ -37,12 +38,91 @@ typedef struct SSlicePoint {
|
||||||
SRowBuffPos* pResPos;
|
SRowBuffPos* pResPos;
|
||||||
} SSlicePoint;
|
} SSlicePoint;
|
||||||
|
|
||||||
|
int32_t saveTimeSliceWinResult(SWinKey* pKey, SSHashObj* pUpdatedMap) {
|
||||||
|
return tSimpleHashPut(pUpdatedMap, pKey, sizeof(SWinKey), NULL, 0);
|
||||||
|
}
|
||||||
|
|
||||||
void streamTimeSliceReleaseState(SOperatorInfo* pOperator) {
|
void streamTimeSliceReleaseState(SOperatorInfo* pOperator) {
|
||||||
// todo(liuyao) add
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
|
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
|
||||||
|
int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SWinKey);
|
||||||
|
int32_t resSize = winSize + sizeof(TSKEY);
|
||||||
|
char* pBuff = taosMemoryCalloc(1, resSize);
|
||||||
|
QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);
|
||||||
|
|
||||||
|
memcpy(pBuff, pInfo->historyWins->pData, winSize);
|
||||||
|
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
|
||||||
|
qDebug("===stream=== time slice operator relase state. save result count:%d",
|
||||||
|
(int32_t)taosArrayGetSize(pInfo->historyWins));
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_TIME_SLICE_OP_STATE_NAME,
|
||||||
|
strlen(STREAM_TIME_SLICE_OP_STATE_NAME), pBuff, resSize);
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
|
||||||
|
taosMemoryFreeClear(pBuff);
|
||||||
|
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
if (downstream->fpSet.releaseStreamStateFn) {
|
||||||
|
downstream->fpSet.releaseStreamStateFn(downstream);
|
||||||
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamTimeSliceReloadState(SOperatorInfo* pOperator) {
|
void streamTimeSliceReloadState(SOperatorInfo* pOperator) {
|
||||||
// todo(liuyao) add
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
|
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
resetWinRange(&pAggSup->winRange);
|
||||||
|
|
||||||
|
int32_t size = 0;
|
||||||
|
void* pBuf = NULL;
|
||||||
|
code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_TIME_SLICE_OP_STATE_NAME,
|
||||||
|
strlen(STREAM_TIME_SLICE_OP_STATE_NAME), &pBuf, &size);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
int32_t num = (size - sizeof(TSKEY)) / sizeof(SWinKey);
|
||||||
|
qDebug("===stream=== time slice operator reload state. get result count:%d", num);
|
||||||
|
SWinKey* pKeyBuf = (SWinKey*)pBuf;
|
||||||
|
ASSERT(size == num * sizeof(SWinKey) + sizeof(TSKEY));
|
||||||
|
|
||||||
|
TSKEY ts = *(TSKEY*)((char*)pBuf + size - sizeof(TSKEY));
|
||||||
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts);
|
||||||
|
pAggSup->stateStore.streamStateReloadInfo(pAggSup->pState, ts);
|
||||||
|
|
||||||
|
if (!pInfo->pUpdatedMap && num > 0) {
|
||||||
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
pInfo->pUpdatedMap = tSimpleHashInit(64, hashFn);
|
||||||
|
QUERY_CHECK_NULL(pInfo->pUpdatedMap, code, lino, _end, terrno);
|
||||||
|
}
|
||||||
|
if (!pInfo->pDeletedMap && num > 0) {
|
||||||
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
pInfo->pDeletedMap = tSimpleHashInit(64, hashFn);
|
||||||
|
QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _end, terrno);
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < num; i++) {
|
||||||
|
SWinKey* pKey = pKeyBuf + i;
|
||||||
|
qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pKey->ts, pKey->groupId,
|
||||||
|
i);
|
||||||
|
code = saveTimeSliceWinResult(pKey, pInfo->pUpdatedMap);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
}
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
|
downstream->fpSet.reloadStreamStateFn(downstream);
|
||||||
|
}
|
||||||
|
reloadAggSupFromDownStream(downstream, &pInfo->streamAggSup);
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyStreamTimeSliceOperatorInfo(void* param) {
|
void destroyStreamTimeSliceOperatorInfo(void* param) {
|
||||||
|
@ -75,13 +155,165 @@ void destroyStreamTimeSliceOperatorInfo(void* param) {
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doStreamTimeSliceSaveCheckpoint(SOperatorInfo* pOperator) {
|
int32_t doStreamTimeSliceEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, int32_t* pLen) {
|
||||||
// todo(liuyao) add
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
|
||||||
|
if (!pInfo) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* pData = (buf == NULL) ? NULL : *buf;
|
||||||
|
|
||||||
|
// 1.streamAggSup.pResultRows
|
||||||
|
int32_t tlen = 0;
|
||||||
|
int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows);
|
||||||
|
tlen += taosEncodeFixedI32(buf, mapSize);
|
||||||
|
void* pIte = NULL;
|
||||||
|
size_t keyLen = 0;
|
||||||
|
int32_t iter = 0;
|
||||||
|
while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) {
|
||||||
|
void* key = tSimpleHashGetKey(pIte, &keyLen);
|
||||||
|
tlen += encodeSSessionKey(buf, key);
|
||||||
|
tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2.twAggSup
|
||||||
|
tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
|
||||||
|
|
||||||
|
// 3.checksum
|
||||||
|
if (buf) {
|
||||||
|
uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t));
|
||||||
|
tlen += taosEncodeFixedU32(buf, cksum);
|
||||||
|
} else {
|
||||||
|
tlen += sizeof(uint32_t);
|
||||||
|
}
|
||||||
|
|
||||||
|
(*pLen) = tlen;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pResRow, TSKEY ts, SSDataBlock* pBlock) {
|
int32_t doStreamTimeSliceDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
|
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
if (!pInfo) {
|
||||||
|
code = TSDB_CODE_FAILED;
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
}
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
|
||||||
|
// 3.checksum
|
||||||
|
int32_t dataLen = len - sizeof(uint32_t);
|
||||||
|
void* pCksum = POINTER_SHIFT(buf, dataLen);
|
||||||
|
if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("stream event state is invalid");
|
||||||
|
code = TSDB_CODE_FAILED;
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1.streamAggSup.pResultRows
|
||||||
|
int32_t mapSize = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &mapSize);
|
||||||
|
for (int32_t i = 0; i < mapSize; i++) {
|
||||||
|
SResultWindowInfo winfo = {0};
|
||||||
|
buf = decodeSSessionKey(buf, &winfo.sessionWin);
|
||||||
|
int32_t winCode = TSDB_CODE_SUCCESS;
|
||||||
|
code = pAggSup->stateStore.streamStateSessionAddIfNotExist(
|
||||||
|
pAggSup->pState, &winfo.sessionWin, pAggSup->gap, (void**)&winfo.pStatePos, &pAggSup->resultRowSize, &winCode);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
|
||||||
|
code = tSimpleHashPut(pInfo->streamAggSup.pResultRows, &winfo.sessionWin, sizeof(SSessionKey), &winfo,
|
||||||
|
sizeof(SResultWindowInfo));
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2.twAggSup
|
||||||
|
buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup);
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprInfo* pExprInfo, int32_t numOfExprs,
|
||||||
|
SStreamFillSupporter** ppResFillSup) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
|
SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
|
||||||
|
QUERY_CHECK_NULL(pFillSup, code, lino, _end, terrno);
|
||||||
|
|
||||||
|
pFillSup->numOfFillCols = numOfExprs;
|
||||||
|
int32_t numOfNotFillCols = 0;
|
||||||
|
pFillSup->pAllColInfo = createFillColInfo(pExprInfo, pFillSup->numOfFillCols, NULL, numOfNotFillCols,
|
||||||
|
(const SNodeListNode*)(pPhyFillNode->pFillValues));
|
||||||
|
QUERY_CHECK_NULL(pFillSup->pAllColInfo, code, lino, _end, terrno);
|
||||||
|
|
||||||
|
pFillSup->type = convertFillType(pPhyFillNode->fillMode);
|
||||||
|
pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols;
|
||||||
|
pFillSup->interval.interval = pPhyFillNode->interval;
|
||||||
|
pFillSup->interval.intervalUnit = pPhyFillNode->intervalUnit;
|
||||||
|
pFillSup->interval.offset = 0;
|
||||||
|
pFillSup->interval.offsetUnit = pPhyFillNode->intervalUnit;
|
||||||
|
pFillSup->interval.precision = pPhyFillNode->precision;
|
||||||
|
pFillSup->interval.sliding = pPhyFillNode->interval;
|
||||||
|
pFillSup->interval.slidingUnit = pPhyFillNode->intervalUnit;
|
||||||
|
pFillSup->pAPI = NULL;
|
||||||
|
|
||||||
|
code = initResultBuf(pFillSup);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
pFillSup->pResMap = NULL;
|
||||||
|
pFillSup->hasDelete = false;
|
||||||
|
(*ppResFillSup) = pFillSup;
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
destroyStreamFillSupporter(pFillSup);
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doStreamTimeSliceSaveCheckpoint(SOperatorInfo* pOperator) {
|
||||||
|
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
|
void* buf = NULL;
|
||||||
|
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
|
||||||
|
int32_t len = 0;
|
||||||
|
code = doStreamTimeSliceEncodeOpState(NULL, 0, pOperator, &len);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
buf = taosMemoryCalloc(1, len);
|
||||||
|
QUERY_CHECK_NULL(buf, code, lino, _end, terrno);
|
||||||
|
|
||||||
|
void* pBuf = buf;
|
||||||
|
code = doStreamTimeSliceEncodeOpState(&pBuf, len, pOperator, &len);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_TIME_SLICE_OP_CHECKPOINT_NAME,
|
||||||
|
strlen(STREAM_TIME_SLICE_OP_CHECKPOINT_NAME), buf, len);
|
||||||
|
saveStreamOperatorStateComplete(&pInfo->basic);
|
||||||
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
taosMemoryFreeClear(buf);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pResRow, TSKEY ts, SSDataBlock* pBlock,
|
||||||
|
bool* pRes) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
if (pBlock->info.rows >= pBlock->info.capacity) {
|
if (pBlock->info.rows >= pBlock->info.capacity) {
|
||||||
return false;
|
(*pRes) = false;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
|
for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
|
||||||
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
|
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
|
||||||
|
@ -89,33 +321,53 @@ static bool fillPointResult(SStreamFillSupporter* pFillSup, SResultRowData* pRes
|
||||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||||
|
|
||||||
if (isIrowtsPseudoColumn(pFillCol->pExpr)) {
|
if (isIrowtsPseudoColumn(pFillCol->pExpr)) {
|
||||||
colDataSetVal(pDstCol, pBlock->info.rows, (char*)&ts, false);
|
code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&ts, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
} else if (isIsfilledPseudoColumn(pFillCol->pExpr)) {
|
} else if (isIsfilledPseudoColumn(pFillCol->pExpr)) {
|
||||||
bool isFilled = false;
|
bool isFilled = false;
|
||||||
colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false);
|
code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
} else {
|
} else {
|
||||||
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
|
int32_t srcSlot = pFillCol->pExpr->base.pParam[0].pCol->slotId;
|
||||||
SResultCellData* pCell = getResultCell(pResRow, srcSlot);
|
SResultCellData* pCell = getResultCell(pResRow, srcSlot);
|
||||||
setRowCell(pDstCol, pBlock->info.rows, pCell);
|
code = setRowCell(pDstCol, pBlock->info.rows, pCell);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pBlock->info.rows += 1;
|
pBlock->info.rows += 1;
|
||||||
return true;
|
(*pRes) = true;
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void fillNormalRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
|
static void fillNormalRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
|
while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
|
||||||
STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
|
STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
|
||||||
if (inWinRange(&pFillSup->winRange, &st)) {
|
if (inWinRange(&pFillSup->winRange, &st)) {
|
||||||
fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock);
|
bool res = true;
|
||||||
|
code = fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock, &res);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
|
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
|
||||||
pFillSup->interval.precision);
|
pFillSup->interval.precision);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
|
static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
|
while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
|
||||||
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
|
for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
|
||||||
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
|
SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
|
||||||
|
@ -126,10 +378,12 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
|
||||||
SResultCellData* pCell = getResultCell(pFillInfo->pResRow, srcSlot);
|
SResultCellData* pCell = getResultCell(pFillInfo->pResRow, srcSlot);
|
||||||
int32_t index = pBlock->info.rows;
|
int32_t index = pBlock->info.rows;
|
||||||
if (isIrowtsPseudoColumn(pFillCol->pExpr)) {
|
if (isIrowtsPseudoColumn(pFillCol->pExpr)) {
|
||||||
colDataSetVal(pDstCol, pBlock->info.rows, (char*)&pFillInfo->current, false);
|
code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&pFillInfo->current, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
} else if (isIsfilledPseudoColumn(pFillCol->pExpr)) {
|
} else if (isIsfilledPseudoColumn(pFillCol->pExpr)) {
|
||||||
bool isFilled = true;
|
bool isFilled = true;
|
||||||
colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false);
|
code = colDataSetVal(pDstCol, pBlock->info.rows, (char*)&isFilled, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
} else {
|
} else {
|
||||||
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
|
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
|
||||||
colDataSetNULL(pDstCol, index);
|
colDataSetNULL(pDstCol, index);
|
||||||
|
@ -144,8 +398,12 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
|
||||||
SPoint cur = {0};
|
SPoint cur = {0};
|
||||||
cur.key = pFillInfo->current;
|
cur.key = pFillInfo->current;
|
||||||
cur.val = taosMemoryCalloc(1, pCell->bytes);
|
cur.val = taosMemoryCalloc(1, pCell->bytes);
|
||||||
|
QUERY_CHECK_NULL(cur.val, code, lino, _end, terrno);
|
||||||
|
|
||||||
taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
|
taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
|
||||||
colDataSetVal(pDstCol, index, (const char*)cur.val, false);
|
code = colDataSetVal(pDstCol, index, (const char*)cur.val, false);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
destroySPoint(&cur);
|
destroySPoint(&cur);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -153,16 +411,27 @@ static void fillLinearRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
|
||||||
pFillSup->interval.precision);
|
pFillSup->interval.precision);
|
||||||
pBlock->info.rows++;
|
pBlock->info.rows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pRes) {
|
static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pRes) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
|
bool res = true;
|
||||||
if (pFillInfo->needFill == false) {
|
if (pFillInfo->needFill == false) {
|
||||||
fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes);
|
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pFillInfo->pos == FILL_POS_START) {
|
if (pFillInfo->pos == FILL_POS_START) {
|
||||||
if (fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes)) {
|
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
if (res) {
|
||||||
pFillInfo->pos = FILL_POS_INVALID;
|
pFillInfo->pos = FILL_POS_INVALID;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -172,7 +441,9 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
|
||||||
fillLinearRange(pFillSup, pFillInfo, pRes);
|
fillLinearRange(pFillSup, pFillInfo, pRes);
|
||||||
|
|
||||||
if (pFillInfo->pos == FILL_POS_MID) {
|
if (pFillInfo->pos == FILL_POS_MID) {
|
||||||
if (fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes)) {
|
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
if (res) {
|
||||||
pFillInfo->pos = FILL_POS_INVALID;
|
pFillInfo->pos = FILL_POS_INVALID;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -186,10 +457,17 @@ static void doStreamFillRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* p
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pFillInfo->pos == FILL_POS_END) {
|
if (pFillInfo->pos == FILL_POS_END) {
|
||||||
if (fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes)) {
|
code = fillPointResult(pFillSup, &pFillSup->cur, pFillSup->cur.key, pRes, &res);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
if (res) {
|
||||||
pFillInfo->pos = FILL_POS_INVALID;
|
pFillInfo->pos = FILL_POS_INVALID;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getQualifiedRowNumAsc(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId, bool ignoreNull) {
|
static int32_t getQualifiedRowNumAsc(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId, bool ignoreNull) {
|
||||||
|
@ -226,53 +504,59 @@ static int32_t getQualifiedRowNumDesc(SExprSupp* pExprSup, SSDataBlock* pBlock,
|
||||||
return resRow;
|
return resRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getPointRowDataFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup,
|
|
||||||
SSlicePoint* pPoint) {
|
|
||||||
int32_t curVLen = 0;
|
|
||||||
int32_t code =
|
|
||||||
pAggSup->stateStore.streamStateFillGet(pAggSup->pState, &pPoint->key, (void**)&pPoint->pResPos, &curVLen);
|
|
||||||
pPoint->pLeftRow = pPoint->pResPos->pRowBuff;
|
|
||||||
if (pFillSup->type == TSDB_FILL_LINEAR) {
|
|
||||||
pPoint->pRightRow = POINTER_SHIFT(pPoint->pResPos->pRowBuff, pFillSup->rowSize);
|
|
||||||
} else {
|
|
||||||
pPoint->pRightRow = NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts,
|
static void getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, TSKEY ts,
|
||||||
int64_t groupId) {
|
int64_t groupId, SSlicePoint* pCurPoint, SSlicePoint* pPrevPoint,
|
||||||
|
SSlicePoint* pNextPoint) {
|
||||||
void* pState = pAggSup->pState;
|
void* pState = pAggSup->pState;
|
||||||
resetPrevAndNextWindow(pFillSup);
|
resetPrevAndNextWindow(pFillSup);
|
||||||
|
pCurPoint->pResPos = NULL;
|
||||||
|
pPrevPoint->pResPos = NULL;
|
||||||
|
pNextPoint->pResPos = NULL;
|
||||||
|
|
||||||
SWinKey key = {.ts = ts, .groupId = groupId};
|
pCurPoint->key.groupId = groupId;
|
||||||
void* curVal = NULL;
|
pCurPoint->key.ts = ts;
|
||||||
int32_t curVLen = 0;
|
int32_t curVLen = 0;
|
||||||
int32_t code = pAggSup->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen);
|
int32_t code = pAggSup->stateStore.streamStateFillGet(pState, &pCurPoint->key, (void**)&pCurPoint->pResPos, &curVLen);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
pCurPoint->pLeftRow = pCurPoint->pResPos->pRowBuff;
|
||||||
pFillSup->cur.key = key.ts;
|
if (pFillSup->type == TSDB_FILL_LINEAR) {
|
||||||
pFillSup->cur.pRowVal = curVal;
|
pCurPoint->pRightRow = POINTER_SHIFT(pCurPoint->pResPos->pRowBuff, pFillSup->rowSize);
|
||||||
} else {
|
} else {
|
||||||
qDebug("streamStateFillGet key failed, Data may be deleted. ts:%" PRId64 ", groupId:%" PRId64, ts, groupId);
|
pCurPoint->pRightRow = NULL;
|
||||||
pFillSup->cur.key = ts;
|
|
||||||
pFillSup->cur.pRowVal = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SWinKey preKey = {.ts = INT64_MIN, .groupId = groupId};
|
if (pCurPoint->pLeftRow->key == pCurPoint->key.ts) {
|
||||||
void* preVal = NULL;
|
pFillSup->cur.key = pCurPoint->key.ts;
|
||||||
int32_t preVLen = 0;
|
pFillSup->cur.pRowVal = pCurPoint->pResPos->pRowBuff;
|
||||||
code = pAggSup->stateStore.streamStateFillGetPrev(pState, &key, &preKey, &preVal, &preVLen);
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
|
||||||
pFillSup->prev.key = preKey.ts;
|
|
||||||
pFillSup->prev.pRowVal = preVal;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SWinKey nextKey = {.ts = INT64_MIN, .groupId = groupId};
|
if (pPrevPoint) {
|
||||||
void* nextVal = NULL;
|
if (HAS_NON_ROW_DATA(pCurPoint->pLeftRow)) {
|
||||||
int32_t nextVLen = 0;
|
pPrevPoint->key.groupId = groupId;
|
||||||
code = pAggSup->stateStore.streamStateFillGetNext(pState, &key, &nextKey, &nextVal, &nextVLen);
|
int32_t preVLen = 0;
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pCurPoint->key, &pPrevPoint->key,
|
||||||
pFillSup->next.key = nextKey.ts;
|
(void**)&pPrevPoint->pResPos, &preVLen);
|
||||||
pFillSup->next.pRowVal = nextVal;
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
pFillSup->prev.key = pPrevPoint->key.ts;
|
||||||
|
pFillSup->prev.pRowVal = pPrevPoint->pResPos->pRowBuff;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pFillSup->prev.key = pPrevPoint->pLeftRow->key;
|
||||||
|
pFillSup->prev.pRowVal = pPrevPoint->pLeftRow->pRowVal;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (HAS_NON_ROW_DATA(pCurPoint->pRightRow)) {
|
||||||
|
pNextPoint->key.groupId = groupId;
|
||||||
|
int32_t nextVLen = 0;
|
||||||
|
code = pAggSup->stateStore.streamStateFillGetNext(pState, &pCurPoint->key, &pNextPoint->key,
|
||||||
|
(void**)&pNextPoint->pResPos, &nextVLen);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
pFillSup->next.key = pNextPoint->key.ts;
|
||||||
|
pFillSup->next.pRowVal = pNextPoint->pResPos->pRowBuff;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pFillSup->next.key = pCurPoint->pRightRow->key;
|
||||||
|
pFillSup->next.pRowVal = pCurPoint->pRightRow->pRowVal;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -460,36 +744,44 @@ static void doStreamTimeSliceImpl(SOperatorInfo* pOperator, SSDataBlock* pBlock)
|
||||||
SResultRowInfo dumyInfo = {0};
|
SResultRowInfo dumyInfo = {0};
|
||||||
dumyInfo.cur.pageId = -1;
|
dumyInfo.cur.pageId = -1;
|
||||||
STimeWindow curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC);
|
STimeWindow curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC);
|
||||||
SSlicePoint point = {.key.ts = curWin.skey, .key.groupId = groupId};
|
SSlicePoint curPoint = {.key.ts = curWin.skey, .key.groupId = groupId};
|
||||||
getPointRowDataFromState(pAggSup, pFillSup, &point);
|
SSlicePoint prevPoint = {0};
|
||||||
if (needAdjValue(&point, tsCols[startPos], true, pFillSup->type)) {
|
SSlicePoint nextPoint = {0};
|
||||||
transBlockToResultRow(pBlock, startPos, tsCols[startPos], point.pLeftRow);
|
bool left = false;
|
||||||
saveWinResult(&point.key, point.pResPos, pInfo->pUpdatedMap);
|
bool right = false;
|
||||||
|
getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint);
|
||||||
|
right = needAdjValue(&curPoint, tsCols[startPos], false, pFillSup->type);
|
||||||
|
if (right) {
|
||||||
|
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow);
|
||||||
|
saveTimeSliceWinResult(&curPoint.key, pInfo->pUpdatedMap);
|
||||||
}
|
}
|
||||||
|
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
|
||||||
|
|
||||||
while (startPos < pBlock->info.rows) {
|
while (startPos < pBlock->info.rows) {
|
||||||
int32_t numOfWin = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL,
|
int32_t numOfWin = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, curWin.ekey, binarySearchForKey, NULL,
|
||||||
TSDB_ORDER_ASC);
|
TSDB_ORDER_ASC);
|
||||||
startPos += numOfWin;
|
startPos += numOfWin;
|
||||||
int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull);
|
int32_t leftRowId = getQualifiedRowNumDesc(pExprSup, pBlock, tsCols, startPos - 1, pInfo->ignoreNull);
|
||||||
|
left = needAdjValue(&nextPoint, tsCols[leftRowId], true, pFillSup->type);
|
||||||
|
if (left) {
|
||||||
|
transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], nextPoint.pLeftRow);
|
||||||
|
saveTimeSliceWinResult(&nextPoint.key, pInfo->pUpdatedMap);
|
||||||
|
}
|
||||||
|
releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore);
|
||||||
|
|
||||||
startPos = getQualifiedRowNumAsc(pExprSup, pBlock, startPos, pInfo->ignoreNull);
|
startPos = getQualifiedRowNumAsc(pExprSup, pBlock, startPos, pInfo->ignoreNull);
|
||||||
if (startPos < 0) {
|
if (startPos < 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC);
|
curWin = getActiveTimeWindow(NULL, &dumyInfo, tsCols[startPos], &pFillSup->interval, TSDB_ORDER_ASC);
|
||||||
getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId);
|
getPointInfoFromState(pAggSup, pFillSup, curWin.skey, groupId, &curPoint, &prevPoint, &nextPoint);
|
||||||
bool left = needAdjValue(&point, tsCols[leftRowId], true, pFillSup->type);
|
|
||||||
if (left) {
|
|
||||||
transBlockToResultRow(pBlock, leftRowId, tsCols[leftRowId], point.pLeftRow);
|
|
||||||
}
|
|
||||||
bool right = needAdjValue(&point, tsCols[startPos], false, pFillSup->type);
|
|
||||||
if (right) {
|
|
||||||
transBlockToResultRow(pBlock, startPos, tsCols[startPos], point.pRightRow);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (left || right) {
|
right = needAdjValue(&curPoint, tsCols[startPos], false, pFillSup->type);
|
||||||
saveWinResult(&point.key, point.pResPos, pInfo->pUpdatedMap);
|
if (right) {
|
||||||
|
transBlockToResultRow(pBlock, startPos, tsCols[startPos], curPoint.pRightRow);
|
||||||
|
saveTimeSliceWinResult(&curPoint.key, pInfo->pUpdatedMap);
|
||||||
}
|
}
|
||||||
|
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -505,21 +797,28 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor
|
||||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||||
for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
|
for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
|
||||||
SRowBuffPos* pPos = *(SRowBuffPos**)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index);
|
SRowBuffPos* pPos = *(SRowBuffPos**)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index);
|
||||||
// todo(liuyao) fill 增加接口,get buff from pos设置pFillSup->cur
|
SWinKey* pKey = (SWinKey*)pPos->pKey;
|
||||||
SWinKey* pKey = (SWinKey*)pPos->pKey;
|
|
||||||
if (pBlock->info.id.groupId == 0) {
|
if (pBlock->info.id.groupId == 0) {
|
||||||
pBlock->info.id.groupId = pKey->groupId;
|
pBlock->info.id.groupId = pKey->groupId;
|
||||||
} else if (pBlock->info.id.groupId != pKey->groupId) {
|
} else if (pBlock->info.id.groupId != pKey->groupId) {
|
||||||
pGroupResInfo->index--;
|
pGroupResInfo->index--;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
getPointInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId);
|
SSlicePoint curPoint = {.key.ts = pKey->ts, .key.groupId = pKey->groupId};
|
||||||
|
SSlicePoint prevPoint = {0};
|
||||||
|
SSlicePoint nextPoint = {0};
|
||||||
|
getPointInfoFromState(pAggSup, pFillSup, pKey->ts, pKey->groupId, &curPoint, &prevPoint, &nextPoint);
|
||||||
setTimeSliceFillInfo(pFillSup, pFillInfo, pKey->ts);
|
setTimeSliceFillInfo(pFillSup, pFillInfo, pKey->ts);
|
||||||
doStreamFillRange(pFillSup, pFillInfo, pBlock);
|
doStreamFillRange(pFillSup, pFillInfo, pBlock);
|
||||||
|
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
|
||||||
|
releaseOutputBuf(pAggSup->pState, prevPoint.pResPos, &pAggSup->stateStore);
|
||||||
|
releaseOutputBuf(pAggSup->pState, nextPoint.pResPos, &pAggSup->stateStore);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* buildTimeSliceResult(SOperatorInfo* pOperator) {
|
static int32_t buildTimeSliceResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
|
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
uint16_t opType = pOperator->operatorType;
|
uint16_t opType = pOperator->operatorType;
|
||||||
|
@ -529,25 +828,97 @@ static SSDataBlock* buildTimeSliceResult(SOperatorInfo* pOperator) {
|
||||||
if (pInfo->pDelRes->info.rows != 0) {
|
if (pInfo->pDelRes->info.rows != 0) {
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
|
printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
|
||||||
return pInfo->pDelRes;
|
(*ppRes) = pInfo->pDelRes;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo);
|
doBuildTimeSlicePointResult(pAggSup, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo);
|
||||||
if (pInfo->pRes->info.rows != 0) {
|
if (pInfo->pRes->info.rows != 0) {
|
||||||
printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
|
printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
|
||||||
return pInfo->pRes;
|
(*ppRes) = pInfo->pRes;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
(*ppRes) = NULL;
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) {
|
int32_t getSliceMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
|
int32_t size = taosArrayGetSize(pAllWins);
|
||||||
|
if (size == 0) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
SWinKey* pKey = taosArrayGet(pAllWins, size - 1);
|
||||||
|
void* tmp = taosArrayPush(pMaxWins, pKey);
|
||||||
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
|
||||||
|
if (pKey->groupId == 0) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
uint64_t preGpId = pKey->groupId;
|
||||||
|
for (int32_t i = size - 2; i >= 0; i--) {
|
||||||
|
pKey = taosArrayGet(pAllWins, i);
|
||||||
|
if (preGpId != pKey->groupId) {
|
||||||
|
void* tmp = taosArrayPush(pMaxWins, pKey);
|
||||||
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
preGpId = pKey->groupId;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHashObj* pUpdatedMap) {
|
||||||
|
int32_t winCode = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||||
|
uint64_t* groupIds = (uint64_t*)pGroupCol->pData;
|
||||||
|
SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
|
||||||
|
TSKEY* tsCalStarts = (TSKEY*)pCalStartCol->pData;
|
||||||
|
SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||||
|
TSKEY* tsCalEnds = (TSKEY*)pCalEndCol->pData;
|
||||||
|
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||||
|
while (1) {
|
||||||
|
TSKEY ts = tsCalStarts[i];
|
||||||
|
TSKEY endTs = tsCalEnds[i];
|
||||||
|
uint64_t groupId = groupIds[i];
|
||||||
|
SWinKey key = {.ts = ts, .groupId = groupId};
|
||||||
|
SWinKey nextKey = {.groupId = groupId};
|
||||||
|
winCode = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &key, &nextKey, NULL, NULL);
|
||||||
|
if (key.ts > endTs) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
(void)tSimpleHashRemove(pUpdatedMap, &key, sizeof(SWinKey));
|
||||||
|
pAggSup->stateStore.streamStateDel(pAggSup->pState, &key);
|
||||||
|
if (winCode != TSDB_CODE_SUCCESS) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
key = nextKey;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t lino = 0;
|
||||||
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
|
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
(*ppRes) = NULL;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
|
@ -557,23 +928,30 @@ static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) {
|
||||||
doStreamFillRange(pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes);
|
doStreamFillRange(pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes);
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
return pInfo->pRes;
|
(*ppRes) = pInfo->pRes;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* resBlock = buildTimeSliceResult(pOperator);
|
SSDataBlock* resBlock = NULL;
|
||||||
|
code = buildTimeSliceResult(pOperator, &resBlock);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
if (resBlock != NULL) {
|
if (resBlock != NULL) {
|
||||||
return resBlock;
|
(*ppRes) = resBlock;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->recvCkBlock) {
|
if (pInfo->recvCkBlock) {
|
||||||
pInfo->recvCkBlock = false;
|
pInfo->recvCkBlock = false;
|
||||||
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
|
||||||
return pInfo->pCheckpointRes;
|
(*ppRes) = pInfo->pCheckpointRes;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
setStreamOperatorCompleted(pOperator);
|
setStreamOperatorCompleted(pOperator);
|
||||||
return NULL;
|
(*ppRes) = NULL;
|
||||||
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* fillResult = NULL;
|
SSDataBlock* fillResult = NULL;
|
||||||
|
@ -589,10 +967,11 @@ static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
pInfo->numOfDatapack++;
|
pInfo->numOfDatapack++;
|
||||||
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
|
||||||
|
setStreamOperatorState(&pInfo->basic, pBlock->info.type);
|
||||||
|
|
||||||
switch (pBlock->info.type) {
|
switch (pBlock->info.type) {
|
||||||
case STREAM_DELETE_RESULT: {
|
case STREAM_DELETE_RESULT: {
|
||||||
// todo(liuyao) add
|
doDeleteTimeSliceResult(pAggSup, pBlock, pInfo->pUpdatedMap);
|
||||||
} break;
|
} break;
|
||||||
case STREAM_NORMAL:
|
case STREAM_NORMAL:
|
||||||
case STREAM_INVALID: {
|
case STREAM_INVALID: {
|
||||||
|
@ -610,7 +989,8 @@ static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) {
|
||||||
continue;
|
continue;
|
||||||
} break;
|
} break;
|
||||||
case STREAM_CREATE_CHILD_TABLE: {
|
case STREAM_CREATE_CHILD_TABLE: {
|
||||||
return pBlock;
|
(*ppRes) = pBlock;
|
||||||
|
goto _end;
|
||||||
} break;
|
} break;
|
||||||
default:
|
default:
|
||||||
ASSERTS(false, "invalid SSDataBlock type");
|
ASSERTS(false, "invalid SSDataBlock type");
|
||||||
|
@ -628,53 +1008,38 @@ static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) {
|
||||||
void* pIte = NULL;
|
void* pIte = NULL;
|
||||||
int32_t iter = 0;
|
int32_t iter = 0;
|
||||||
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
|
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
|
||||||
taosArrayPush(pInfo->pUpdated, pIte);
|
void* tmp = taosArrayPush(pInfo->pUpdated, pIte);
|
||||||
|
QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
|
||||||
|
}
|
||||||
|
taosArraySort(pInfo->pUpdated, winKeyCmprImpl);
|
||||||
|
|
||||||
|
if (pInfo->isHistoryOp) {
|
||||||
|
code = getSliceMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
}
|
}
|
||||||
taosArraySort(pInfo->pUpdated, winPosCmprImpl);
|
|
||||||
|
|
||||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||||
pInfo->pUpdated = NULL;
|
pInfo->pUpdated = NULL;
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
||||||
pInfo->pUpdatedMap = NULL;
|
pInfo->pUpdatedMap = NULL;
|
||||||
|
|
||||||
return buildTimeSliceResult(pOperator);
|
code = buildTimeSliceResult(pOperator, ppRes);
|
||||||
}
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
int32_t doStreamTimeSliceEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator) {
|
_end:
|
||||||
// todo(liuyao) add
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void* doStreamTimeSliceDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) {
|
|
||||||
// todo(liuyao) add
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static SStreamFillSupporter* initTimeSliceFillSup(SStreamInterpFuncPhysiNode* pPhyFillNode, SExprInfo* pExprInfo,
|
|
||||||
int32_t numOfExprs) {
|
|
||||||
SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
|
|
||||||
if (!pFillSup) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
pFillSup->numOfFillCols = numOfExprs;
|
|
||||||
int32_t numOfNotFillCols = 0;
|
|
||||||
pFillSup->pAllColInfo = createFillColInfo(pExprInfo, pFillSup->numOfFillCols, NULL, numOfNotFillCols,
|
|
||||||
(const SNodeListNode*)(pPhyFillNode->pFillValues));
|
|
||||||
pFillSup->type = convertFillType(pPhyFillNode->fillMode);
|
|
||||||
pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols;
|
|
||||||
pFillSup->interval.interval = pPhyFillNode->interval;
|
|
||||||
// todo(liuyao) 初始化 pFillSup->interval其他属性
|
|
||||||
pFillSup->pAPI = NULL;
|
|
||||||
|
|
||||||
int32_t code = initResultBuf(pFillSup);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
destroyStreamFillSupporter(pFillSup);
|
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
pFillSup->pResMap = NULL;
|
return code;
|
||||||
pFillSup->hasDelete = false;
|
}
|
||||||
return pFillSup;
|
|
||||||
|
static SSDataBlock* doStreamTimeSlice(SOperatorInfo* pOperator) {
|
||||||
|
SSDataBlock* pRes = NULL;
|
||||||
|
(void)doStreamTimeSliceNext(pOperator, &pRes);
|
||||||
|
return pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
|
int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo,
|
||||||
|
@ -682,10 +1047,11 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
SStreamTimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTimeSliceOperatorInfo));
|
SStreamTimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamTimeSliceOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
QUERY_CHECK_NULL(pInfo, code, lino, _error, terrno);
|
||||||
if (pOperator == NULL || pInfo == NULL) {
|
|
||||||
goto _error;
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
}
|
QUERY_CHECK_NULL(pOperator, code, lino, _error, terrno);
|
||||||
|
|
||||||
SStreamInterpFuncPhysiNode* pInterpPhyNode = (SStreamInterpFuncPhysiNode*)pPhyNode;
|
SStreamInterpFuncPhysiNode* pInterpPhyNode = (SStreamInterpFuncPhysiNode*)pPhyNode;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
|
@ -724,12 +1090,15 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
||||||
pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||||
pInfo->delIndex = 0;
|
pInfo->delIndex = 0;
|
||||||
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
|
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
|
||||||
|
QUERY_CHECK_NULL(pInfo->pDelWins, code, lino, _error, terrno);
|
||||||
|
|
||||||
pInfo->pDelRes = NULL;
|
pInfo->pDelRes = NULL;
|
||||||
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
|
||||||
QUERY_CHECK_CODE(code, lino, _error);
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pInfo->pDeletedMap = tSimpleHashInit(4096, hashFn);
|
pInfo->pDeletedMap = tSimpleHashInit(1024, hashFn);
|
||||||
|
QUERY_CHECK_NULL(pInfo->pDeletedMap, code, lino, _error, terrno);
|
||||||
|
|
||||||
pInfo->ignoreExpiredData = pInterpPhyNode->streamNodeOption.igExpired;
|
pInfo->ignoreExpiredData = pInterpPhyNode->streamNodeOption.igExpired;
|
||||||
pInfo->ignoreExpiredDataSaved = false;
|
pInfo->ignoreExpiredDataSaved = false;
|
||||||
|
@ -745,7 +1114,13 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
||||||
|
|
||||||
pInfo->destHasPrimaryKey = pInterpPhyNode->streamNodeOption.destHasPrimaryKey;
|
pInfo->destHasPrimaryKey = pInterpPhyNode->streamNodeOption.destHasPrimaryKey;
|
||||||
pInfo->numOfDatapack = 0;
|
pInfo->numOfDatapack = 0;
|
||||||
pInfo->pFillSup = initTimeSliceFillSup(pInterpPhyNode, pExprInfo, numOfExprs);
|
pInfo->pFillSup = NULL;
|
||||||
|
code = initTimeSliceFillSup(pInterpPhyNode, pExprInfo, numOfExprs, &pInfo->pFillSup);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
|
|
||||||
|
if (pHandle) {
|
||||||
|
pInfo->isHistoryOp = pHandle->fillHistory;
|
||||||
|
}
|
||||||
|
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC;
|
||||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC,
|
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC,
|
||||||
|
@ -757,8 +1132,9 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
||||||
pTaskInfo->streamInfo.pState, STREAM_TIME_SLICE_OP_CHECKPOINT_NAME, strlen(STREAM_TIME_SLICE_OP_CHECKPOINT_NAME),
|
pTaskInfo->streamInfo.pState, STREAM_TIME_SLICE_OP_CHECKPOINT_NAME, strlen(STREAM_TIME_SLICE_OP_CHECKPOINT_NAME),
|
||||||
&buff, &len);
|
&buff, &len);
|
||||||
if (res == TSDB_CODE_SUCCESS) {
|
if (res == TSDB_CODE_SUCCESS) {
|
||||||
doStreamTimeSliceDecodeOpState(buff, len, pOperator);
|
code = doStreamTimeSliceDecodeOpState(buff, len, pOperator);
|
||||||
taosMemoryFree(buff);
|
taosMemoryFree(buff);
|
||||||
|
QUERY_CHECK_CODE(code, lino, _error);
|
||||||
}
|
}
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamTimeSlice, NULL, destroyStreamTimeSliceOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamTimeSlice, NULL, destroyStreamTimeSliceOperatorInfo,
|
||||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||||
|
|
|
@ -1332,7 +1332,7 @@ static int32_t jsonToLogicInterpFuncNode(const SJson* pJson, void* pObj) {
|
||||||
code = tjsonToObject(pJson, jkInterpFuncLogicPlanFillValues, jsonToNode, pNode->pFillValues);
|
code = tjsonToObject(pJson, jkInterpFuncLogicPlanFillValues, jsonToNode, pNode->pFillValues);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonToObject(pJson, jkInterpFuncLogicPlanTimeSeries, jsonToNode, pNode->pTimeSeries);
|
code = jsonToNodeObject(pJson, jkInterpFuncLogicPlanTimeSeries, &pNode->pTimeSeries);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonToObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption);
|
code = tjsonToObject(pJson, jkInterpFuncLogicPlanStreamNodeOption, jsonToStreamNodeOption, &pNode->streamNodeOption);
|
||||||
|
|
|
@ -961,6 +961,8 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pEvery) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pEvery) {
|
||||||
pInterpFunc->interval = ((SValueNode*)pSelect->pEvery)->datum.i;
|
pInterpFunc->interval = ((SValueNode*)pSelect->pEvery)->datum.i;
|
||||||
|
pInterpFunc->intervalUnit = ((SValueNode*)pSelect->pEvery)->unit;
|
||||||
|
pInterpFunc->precision = pSelect->precision;
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the output
|
// set the output
|
||||||
|
|
|
@ -1851,6 +1851,8 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
|
||||||
pInterpFunc->timeRange = pFuncLogicNode->timeRange;
|
pInterpFunc->timeRange = pFuncLogicNode->timeRange;
|
||||||
pInterpFunc->interval = pFuncLogicNode->interval;
|
pInterpFunc->interval = pFuncLogicNode->interval;
|
||||||
pInterpFunc->fillMode = pFuncLogicNode->fillMode;
|
pInterpFunc->fillMode = pFuncLogicNode->fillMode;
|
||||||
|
pInterpFunc->intervalUnit = pFuncLogicNode->intervalUnit;
|
||||||
|
pInterpFunc->precision = pFuncLogicNode->node.precision;
|
||||||
pInterpFunc->pFillValues = NULL;
|
pInterpFunc->pFillValues = NULL;
|
||||||
code = nodesCloneNode(pFuncLogicNode->pFillValues, &pInterpFunc->pFillValues);
|
code = nodesCloneNode(pFuncLogicNode->pFillValues, &pInterpFunc->pFillValues);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
|
|
@ -30,6 +30,7 @@ int fillStateKeyCompare(const void* pWin1, const void* pDatas, int pos) {
|
||||||
int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen) {
|
int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, void** pVal, int32_t* pVLen) {
|
||||||
int32_t winCode = TSDB_CODE_SUCCESS;
|
int32_t winCode = TSDB_CODE_SUCCESS;
|
||||||
int32_t code = getRowBuff(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, &winCode);
|
int32_t code = getRowBuff(pFileState, (void*)pKey, sizeof(SWinKey), pVal, pVLen, &winCode);
|
||||||
|
|
||||||
SArray* pWinStates = NULL;
|
SArray* pWinStates = NULL;
|
||||||
SSHashObj* pSearchBuff = getSearchBuff(pFileState);
|
SSHashObj* pSearchBuff = getSearchBuff(pFileState);
|
||||||
void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
|
void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
|
||||||
|
@ -39,16 +40,18 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo
|
||||||
pWinStates = taosArrayInit(16, sizeof(SWinKey));
|
pWinStates = taosArrayInit(16, sizeof(SWinKey));
|
||||||
tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
|
tSimpleHashPut(pSearchBuff, &pKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//recover
|
||||||
if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
|
if (taosArrayGetSize(pWinStates) == 0 && needClearDiskBuff(pFileState)) {
|
||||||
TSKEY ts = getFlushMark(pFileState);
|
TSKEY ts = getFlushMark(pFileState);
|
||||||
SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX};
|
SWinKey start = {.groupId = pKey->groupId, .ts = INT64_MAX};
|
||||||
void* pState = getStateFileStore(pFileState);
|
void* pState = getStateFileStore(pFileState);
|
||||||
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start);
|
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, &start);
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t winCode = TSDB_CODE_SUCCESS;
|
||||||
for(int32_t i = 0; i < NUM_OF_FLUSED_WIN && code == TSDB_CODE_SUCCESS; i++) {
|
for(int32_t i = 0; i < NUM_OF_FLUSED_WIN && winCode == TSDB_CODE_SUCCESS; i++) {
|
||||||
SWinKey tmp = {.groupId = pKey->groupId};
|
SWinKey tmp = {.groupId = pKey->groupId};
|
||||||
code = streamStateGetGroupKVByCur_rocksdb(pCur, &tmp, NULL, 0);
|
winCode = streamStateGetGroupKVByCur_rocksdb(pCur, &tmp, NULL, 0);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (winCode != TSDB_CODE_SUCCESS) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
taosArrayPush(pWinStates, &tmp);
|
taosArrayPush(pWinStates, &tmp);
|
||||||
|
@ -65,7 +68,10 @@ int32_t getHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey, vo
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
index = 0;
|
index = 0;
|
||||||
}
|
}
|
||||||
taosArrayInsert(pWinStates, index, pKey);
|
SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
|
||||||
|
if (winKeyCmprImpl(pTmpKey, pKey) != 0) {
|
||||||
|
taosArrayInsert(pWinStates, index, pKey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -163,3 +169,25 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey) {
|
||||||
|
deleteRowBuff(pFileState, pKey, sizeof(SWinKey));
|
||||||
|
SSHashObj* pSearchBuff = getSearchBuff(pFileState);
|
||||||
|
void** ppBuff = tSimpleHashGet(pSearchBuff, &pKey->groupId, sizeof(uint64_t));
|
||||||
|
if (!ppBuff) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
SArray* pWinStates = *ppBuff;
|
||||||
|
int32_t size = taosArrayGetSize(pWinStates);
|
||||||
|
if (!isFlushedState(pFileState, pKey->ts, 0)) {
|
||||||
|
// find the first position which is smaller than the pKey
|
||||||
|
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
|
||||||
|
if (index == -1) {
|
||||||
|
index = 0;
|
||||||
|
}
|
||||||
|
SWinKey* pTmpKey = taosArrayGet(pWinStates, index);
|
||||||
|
if (winKeyCmprImpl(pTmpKey, pKey) == 0) {
|
||||||
|
taosArrayRemove(pWinStates, index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -249,6 +249,10 @@ int32_t streamStateFillGetPrev(SStreamState* pState, const SWinKey* pKey, SWinKe
|
||||||
|
|
||||||
// todo refactor
|
// todo refactor
|
||||||
void streamStateFillDel(SStreamState* pState, const SWinKey* key) {
|
void streamStateFillDel(SStreamState* pState, const SWinKey* key) {
|
||||||
|
if (pState->pFileState) {
|
||||||
|
deleteHashSortRowBuff(pState->pFileState, key);
|
||||||
|
return;
|
||||||
|
}
|
||||||
int32_t code = streamStateFillDel_rocksdb(pState, key);
|
int32_t code = streamStateFillDel_rocksdb(pState, key);
|
||||||
qTrace("%s at line %d res %d", __func__, __LINE__, code);
|
qTrace("%s at line %d res %d", __func__, __LINE__, code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -574,7 +574,7 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
|
||||||
if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
|
if (!isDeteled(pFileState, ts) && isFlushedState(pFileState, ts, 0)) {
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
void* p = NULL;
|
void* p = NULL;
|
||||||
(*pWinCode) = pFileState->stateFileGetFn(pFileState->pFileStore, pKey, &p, &len);
|
(*pWinCode) = pFileState->stateFileGetFn(pFileState, pKey, &p, &len);
|
||||||
qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
|
qDebug("===stream===get %" PRId64 " from disc, res %d", ts, (*pWinCode));
|
||||||
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
|
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
|
||||||
memcpy(pNewPos->pRowBuff, p, len);
|
memcpy(pNewPos->pRowBuff, p, len);
|
||||||
|
|
Loading…
Reference in New Issue