Merge pull request #26551 from taosdata/fix/TD-30967

feat: adj result of function
This commit is contained in:
Haojun Liao 2024-07-17 09:09:23 +08:00 committed by GitHub
commit 665fbd6f22
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 790 additions and 417 deletions

View File

@ -381,7 +381,7 @@ typedef struct SStateStore {
void** ppVal, int32_t* pVLen);
int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen);
int32_t (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo);
TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol);
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
@ -389,12 +389,12 @@ typedef struct SStateStore {
void (*updateInfoDestroy)(SUpdateInfo* pInfo);
void (*windowSBfDelete)(SUpdateInfo* pInfo, uint64_t count);
void (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count);
int32_t (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count);
SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen);
int32_t (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo);
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo);
int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo);
int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen);
int32_t (*updateInfoDeserialize)(void* buf, int32_t bufLen, SUpdateInfo* pInfo);
SStreamStateCur* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key);

View File

@ -15,28 +15,28 @@
#ifndef _TSTREAMUPDATE_H_
#define _TSTREAMUPDATE_H_
#include "storageapi.h"
#include "taosdef.h"
#include "tarray.h"
#include "tcommon.h"
#include "tmsg.h"
#include "storageapi.h"
#ifdef __cplusplus
extern "C" {
#endif
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen);
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen);
int32_t updateInfoInitP(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo);
int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo);
TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol);
bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid);
void updateInfoDestroy(SUpdateInfo* pInfo);
void updateInfoAddCloseWindowSBF(SUpdateInfo* pInfo);
void updateInfoDestoryColseWinSBF(SUpdateInfo* pInfo);
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo);
int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen);
int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo);
void windowSBfDelete(SUpdateInfo* pInfo, uint64_t count);
void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count);
int32_t windowSBfAdd(SUpdateInfo* pInfo, uint64_t count);
bool isIncrementalTimeStamp(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
#ifdef __cplusplus

View File

@ -19,6 +19,8 @@
#include "os.h"
#include "tencode.h"
#include "thash.h"
#include "tlog.h"
#include "tutil.h"
#ifdef __cplusplus
extern "C" {
@ -39,7 +41,7 @@ typedef struct SBloomFilter {
double errorRate;
} SBloomFilter;
SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate);
int32_t tBloomFilterInit(uint64_t expectedEntries, double errorRate, SBloomFilter** ppBF);
int32_t tBloomFilterPutHash(SBloomFilter* pBF, uint64_t hash1, uint64_t hash2);
int32_t tBloomFilterPut(SBloomFilter* pBF, const void* keyBuf, uint32_t len);
int32_t tBloomFilterNoContain(const SBloomFilter* pBF, uint64_t h1, uint64_t h2);
@ -47,7 +49,7 @@ void tBloomFilterDestroy(SBloomFilter *pBF);
void tBloomFilterDump(const SBloomFilter* pBF);
bool tBloomFilterIsFull(const SBloomFilter* pBF);
int32_t tBloomFilterEncode(const SBloomFilter* pBF, SEncoder* pEncoder);
SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder);
int32_t tBloomFilterDecode(SDecoder* pDecoder, SBloomFilter** ppBF);
#ifdef __cplusplus
}

View File

@ -32,13 +32,13 @@ typedef struct SScalableBf {
_hash_fn_t hashFn2;
} SScalableBf;
SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate);
int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf** ppSBf);
int32_t tScalableBfPutNoCheck(SScalableBf* pSBf, const void* keyBuf, uint32_t len);
int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len);
int32_t tScalableBfNoContain(const SScalableBf* pSBf, const void* keyBuf, uint32_t len);
void tScalableBfDestroy(SScalableBf* pSBf);
int32_t tScalableBfEncode(const SScalableBf* pSBf, SEncoder* pEncoder);
SScalableBf *tScalableBfDecode(SDecoder *pDecoder);
int32_t tScalableBfDecode(SDecoder* pDecoder, SScalableBf** ppSBf);
#ifdef __cplusplus
}

View File

@ -21,6 +21,7 @@
#include "tdef.h"
#include "thash.h"
#include "tmd5.h"
#include "tutil.h"
#ifdef __cplusplus
extern "C" {

View File

@ -929,7 +929,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, i
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
SStorageAPI* pApi, int32_t tsIndex);
void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
int32_t initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic);
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);

View File

@ -20,6 +20,7 @@ extern "C" {
#endif
#include "executorInt.h"
#include "tutil.h"
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type);
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo);

View File

@ -17,6 +17,7 @@
#include "function.h"
#include "os.h"
#include "tname.h"
#include "tutil.h"
#include "tdatablock.h"
#include "tmsg.h"
@ -1357,11 +1358,13 @@ static void destroyStreamPartitionOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr, SExprSupp* pTbnameExpr) {
int32_t initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr, SExprSupp* pTbnameExpr) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStorageAPI* pAPI = &downstream->pTaskInfo->storageAPI;
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
return;
return code;
}
SStreamScanInfo* pScanInfo = downstream->info;
@ -1369,8 +1372,9 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
pScanInfo->pPartScalarSup = pExpr;
pScanInfo->pPartTbnameSup = pTbnameExpr;
if (!pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen);
code = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo);
}
return code;
}
SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) {
@ -1421,6 +1425,7 @@ void freePartItem(void* ptr) {
SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode,
SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@ -1511,14 +1516,19 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup);
code = initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup);
TSDB_CHECK_CODE(code, lino, _error);
code = appendDownstream(pOperator, &downstream, 1);
TSDB_CHECK_CODE(code, lino, _error);
return pOperator;
_error:
pTaskInfo->code = code;
destroyStreamPartitionOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return NULL;
}

View File

@ -2483,27 +2483,54 @@ static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBl
}
}
int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) {
int32_t len = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff, int32_t* pLen) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t len = 0;
code = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo, &len);
TSDB_CHECK_CODE(code, lino, _end);
len += encodeSTimeWindowAggSupp(NULL, &pInfo->twAggSup);
*pBuff = taosMemoryCalloc(1, len);
if (!(*pBuff)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
void* buf = *pBuff;
encodeSTimeWindowAggSupp(&buf, &pInfo->twAggSup);
pInfo->stateStore.updateInfoSerialize(buf, len, pInfo->pUpdateInfo);
return len;
int32_t tmp = 0;
code = pInfo->stateStore.updateInfoSerialize(buf, len, pInfo->pUpdateInfo, &tmp);
TSDB_CHECK_CODE(code, lino, _end);
*pLen = len;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (!pInfo->pState) {
return;
}
if (needSaveStreamOperatorInfo(&pInfo->basic)) {
void* pBuf = NULL;
int32_t len = streamScanOperatorEncode(pInfo, &pBuf);
int32_t len = 0;
code = streamScanOperatorEncode(pInfo, &pBuf, &len);
TSDB_CHECK_CODE(code, lino, _end);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len);
taosMemoryFree(pBuf);
saveStreamOperatorStateComplete(&pInfo->basic);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
}
// other properties are recovered from the execution plan
@ -3101,17 +3128,35 @@ static void destroyStreamScanOperatorInfo(void* param) {
}
void streamScanReleaseState(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamScanInfo* pInfo = pOperator->info;
void* pBuff = NULL;
if (!pInfo->pState) {
return;
}
if (!pInfo->pUpdateInfo) {
return;
}
int32_t len = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo);
void* pBuff = taosMemoryCalloc(1, len);
pInfo->stateStore.updateInfoSerialize(pBuff, len, pInfo->pUpdateInfo);
int32_t len = 0;
code = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo, &len);
TSDB_CHECK_CODE(code, lino, _end);
pBuff = taosMemoryCalloc(1, len);
if (!pBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
int32_t tmp = 0;
code = pInfo->stateStore.updateInfoSerialize(pBuff, len, pInfo->pUpdateInfo, &tmp);
TSDB_CHECK_CODE(code, lino, _end);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME), pBuff, len);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
taosMemoryFree(pBuff);
}

View File

@ -68,9 +68,7 @@ void destroyStreamCountAggOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
bool isSlidingCountWindow(SStreamAggSupporter* pAggSup) {
return pAggSup->windowCount != pAggSup->windowSliding;
}
bool isSlidingCountWindow(SStreamAggSupporter* pAggSup) { return pAggSup->windowCount != pAggSup->windowSliding; }
void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, SCountWindowInfo* pCurWin,
SBuffInfo* pBuffInfo) {
@ -95,7 +93,8 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
(void**)&pCurWin->winInfo.pStatePos, &size);
}
} else {
pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount);
pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin,
pAggSup->windowCount);
code = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin,
(void**)&pCurWin->winInfo.pStatePos, &size);
if (code == TSDB_CODE_FAILED) {
@ -107,8 +106,9 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
pBuffInfo->rebuildWindow = true;
}
} else {
code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(
pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size);
code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin,
pAggSup->windowCount,
(void**)&pCurWin->winInfo.pStatePos, &size);
}
if (code == TSDB_CODE_SUCCESS) {
@ -289,8 +289,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) {
code = saveResult(curWin.winInfo, pStUpdated);
if (code != TSDB_CODE_SUCCESS) {
qError("%s do stream count aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo),
tstrerror(code));
qError("%s do stream count aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
break;
}
}
@ -403,7 +402,8 @@ void* doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
SBuffInfo buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL};
setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo);
buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize);
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo,
sizeof(SResultWindowInfo));
}
// 2.twAggSup
@ -650,11 +650,13 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) {
SCountWinodwPhysiNode* pCountNode = (SCountWinodwPhysiNode*)pPhyNode;
int32_t numOfCols = 0;
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamCountAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamCountAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
pOperator->pTaskInfo = pTaskInfo;
@ -664,18 +666,14 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pCountNode->window.pExprs, NULL, &numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
}
SExprSupp* pExpSup = &pOperator->exprSupp;
SExprInfo* pExprInfo = createExprInfo(pCountNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pCountNode->window.watermark,
@ -686,12 +684,11 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
};
pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0,
pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
&pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState,
sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
TSDB_CHECK_CODE(code, lino, _error);
pInfo->streamAggSup.windowCount = pCountNode->windowCount;
pInfo->streamAggSup.windowSliding = pCountNode->windowSliding;
@ -709,7 +706,8 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->dataVersion = 0;
pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
if (!pInfo->historyWins) {
goto _error;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
@ -735,8 +733,12 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic);
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
&pInfo->twAggSup, &pInfo->basic);
TSDB_CHECK_CODE(code, lino, _error);
code = appendDownstream(pOperator, &downstream, 1);
TSDB_CHECK_CODE(code, lino, _error);
}
return pOperator;
@ -747,6 +749,6 @@ _error:
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return NULL;
}

View File

@ -89,7 +89,8 @@ void setEventWindowFlag(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo
pWinInfo->pWinFlag = (SEventWinfowFlag*)pFlagInfo;
}
void setEventWindowInfo(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SRowBuffPos* pPos, SEventWindowInfo* pWinInfo) {
void setEventWindowInfo(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SRowBuffPos* pPos,
SEventWindowInfo* pWinInfo) {
pWinInfo->winInfo.sessionWin = *pKey;
pWinInfo->winInfo.pStatePos = pPos;
setEventWindowFlag(pAggSup, pWinInfo);
@ -122,7 +123,8 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI
pCurWin->winInfo.sessionWin.groupId = groupId;
pCurWin->winInfo.sessionWin.win.skey = ts;
pCurWin->winInfo.sessionWin.win.ekey = ts;
SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin);
SStreamStateCur* pCur =
pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin);
SSessionKey leftWinKey = {.groupId = groupId};
void* pVal = NULL;
int32_t len = 0;
@ -173,8 +175,8 @@ _end:
}
int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo, SSessionKey* pNextWinKey,
TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start, SSHashObj* pResultRows,
SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) {
TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start,
SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) {
*pRebuild = false;
if (!pWinInfo->pWinFlag->startFlag && !(starts[start])) {
return 1;
@ -244,12 +246,14 @@ static int32_t compactEventWindow(SOperatorInfo* pOperator, SEventWindowInfo* pC
}
SEventWindowInfo nextWinInfo = {0};
getNextSessionWinInfo(pAggSup, pStUpdated, &pCurWin->winInfo, &nextWinInfo.winInfo);
if (!IS_VALID_SESSION_WIN(nextWinInfo.winInfo) || !inWinRange(&pAggSup->winRange, &nextWinInfo.winInfo.sessionWin.win)) {
if (!IS_VALID_SESSION_WIN(nextWinInfo.winInfo) ||
!inWinRange(&pAggSup->winRange, &nextWinInfo.winInfo.sessionWin.win)) {
releaseOutputBuf(pAggSup->pState, nextWinInfo.winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore);
break;
}
setEventWindowFlag(pAggSup, &nextWinInfo);
compactTimeWindow(pSup, pAggSup, &pInfo->twAggSup, pTaskInfo, &pCurWin->winInfo, &nextWinInfo.winInfo, pStUpdated, pStDeleted, false);
compactTimeWindow(pSup, pAggSup, &pInfo->twAggSup, pTaskInfo, &pCurWin->winInfo, &nextWinInfo.winInfo, pStUpdated,
pStDeleted, false);
pCurWin->pWinFlag->endFlag = nextWinInfo.pWinFlag->endFlag;
winNum++;
}
@ -290,7 +294,8 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
return;
}
SFilterColumnParam paramStart = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock), .pDataBlock = pSDataBlock->pDataBlock};
SFilterColumnParam paramStart = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock),
.pDataBlock = pSDataBlock->pDataBlock};
code = filterSetDataFromSlotId(pInfo->pStartCondInfo, &paramStart);
if (code != TSDB_CODE_SUCCESS) {
qError("set data from start slotId error.");
@ -299,7 +304,8 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
int32_t statusStart = 0;
filterExecute(pInfo->pStartCondInfo, pSDataBlock, &pColStart, NULL, paramStart.numOfCols, &statusStart);
SFilterColumnParam paramEnd = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock), .pDataBlock = pSDataBlock->pDataBlock};
SFilterColumnParam paramEnd = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock),
.pDataBlock = pSDataBlock->pDataBlock};
code = filterSetDataFromSlotId(pInfo->pEndCondInfo, &paramEnd);
if (code != TSDB_CODE_SUCCESS) {
qError("set data from end slotId error.");
@ -321,11 +327,13 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
bool allEqual = true;
SEventWindowInfo curWin = {0};
SSessionKey nextWinKey = {0};
setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin, &nextWinKey);
setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin,
&nextWinKey);
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
bool rebuild = false;
winRows = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData, rows, i,
pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild);
winRows =
updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData,
rows, i, pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild);
ASSERT(winRows >= 1);
if (rebuild) {
uint64_t uid = 0;
@ -333,7 +341,8 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
&curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL);
tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey));
doDeleteEventWindow(pAggSup, pSeUpdated, &curWin.winInfo.sessionWin);
if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator) && !isWindowIncomplete(&curWin)) {
if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator) &&
!isWindowIncomplete(&curWin)) {
saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin);
}
releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAPI->stateStore);
@ -603,7 +612,8 @@ void streamEventReleaseState(SOperatorInfo* pOperator) {
char* pBuff = taosMemoryCalloc(1, resSize);
memcpy(pBuff, pInfo->historyWins->pData, winSize);
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
qDebug("===stream=== event window operator relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins));
qDebug("===stream=== event window operator relase state. save result count:%d",
(int32_t)taosArrayGetSize(pInfo->historyWins));
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_STATE_NAME,
strlen(STREAM_EVENT_OP_STATE_NAME), pBuff, resSize);
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
@ -693,6 +703,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
SStreamEventWinodwPhysiNode* pEventNode = (SStreamEventWinodwPhysiNode*)pPhyNode;
int32_t tsSlotId = ((SColumnNode*)pEventNode->window.pTspk)->slotId;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamEventAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamEventAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@ -783,20 +794,18 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState);
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic);
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
&pInfo->twAggSup, &pInfo->basic);
TSDB_CHECK_CODE(code, lino, _error);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
code = filterInitFromNode((SNode*)pEventNode->pStartCond, &pInfo->pStartCondInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
code = filterInitFromNode((SNode*)pEventNode->pEndCond, &pInfo->pEndCondInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
return pOperator;
@ -804,5 +813,6 @@ _error:
destroyStreamEventOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return NULL;
}

View File

@ -30,10 +30,14 @@
#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
#define IS_MID_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL)
#define IS_NORMAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || (op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
#define IS_NORMAL_INTERVAL_OP(op) \
((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || \
(op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL)
#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
#define IS_NORMAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || (op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
#define IS_NORMAL_SESSION_OP(op) \
((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || \
(op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION)
#define IS_NORMAL_STATE_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE)
@ -263,14 +267,16 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
if (chIds) {
int32_t childId = getChildIndex(pBlock);
if (pInvalidWins) {
qDebug("===stream===save invalid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId);
qDebug("===stream===save invalid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts,
winRes.groupId, childId);
taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0);
}
SArray* chArray = *(void**)chIds;
int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ);
if (index != -1) {
qDebug("===stream===try push delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d ,continue", win.skey, winGpId, childId);
qDebug("===stream===try push delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d ,continue", win.skey,
winGpId, childId);
getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
continue;
}
@ -488,8 +494,11 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt
pScanInfo->windowSup.parentType = type;
pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup;
if (!pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo =
pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen);
int32_t code = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate,
pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at %d since %s", __func__, __LINE__, tstrerror(code));
}
}
pScanInfo->interval = pInfo->interval;
@ -624,8 +633,8 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
blockDataUpdateTsWindow(pBlock, 0);
}
static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins,
int32_t numOfCh, SOperatorInfo* pOperator) {
static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval,
SArray* pPullWins, int32_t numOfCh, SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
TSKEY* tsData = (TSKEY*)pStartCol->pData;
@ -663,7 +672,8 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina
.calWin.skey = nextWin.skey,
.calWin.ekey = nextWin.skey};
// add pull data request
qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId:%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh);
qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId:%" PRId64 ", size:%d",
winRes.ts, winRes.groupId, numOfCh);
if (IS_MID_INTERVAL_OP(pOperator)) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
taosArrayPush(pInfo->pMidPullDatas, &winRes);
@ -751,7 +761,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDat
if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = groupId;
void* tbname = NULL;
if (pAPI->stateStore.streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname, false) < 0) {
if (pAPI->stateStore.streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname,
false) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
@ -829,9 +840,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
return startPos;
}
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) {
return pInfo->primaryPkIndex != -1;
}
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; }
static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) {
@ -860,8 +869,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
}
if (pSDataBlock->info.window.skey != tsCols[0] || pSDataBlock->info.window.ekey != tsCols[endRowId]) {
qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64
",maxKey %" PRId64,
qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64 ",maxKey %" PRId64,
pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
blockDataUpdateTsWindow(pSDataBlock, pInfo->primaryTsIndex);
@ -1755,27 +1763,37 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
}
}
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
int32_t initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
SStreamPartitionOperatorInfo* pScanInfo = downstream->info;
pScanInfo->tsColIndex = tsColIndex;
}
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic);
return;
code = initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic);
return code;
}
SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
pScanInfo->pState = pAggSup->pState;
if (!pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen);
code = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen,
&pScanInfo->pUpdateInfo);
TSDB_CHECK_CODE(code, lino, _end);
}
pScanInfo->twAggSup = *pTwSup;
pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
pBasic->primaryPkIndex = pScanInfo->primaryKeyIndex;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static TSKEY sesionTs(void* pKey) {
@ -2454,8 +2472,8 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
pBlock->info.id.groupId = pKey->groupId;
void* tbname = NULL;
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId,
&tbname, false) < 0) {
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname,
false) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
@ -3018,6 +3036,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
int32_t numOfCols = 0;
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
int32_t lino = 0;
SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@ -3112,8 +3131,12 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
if (downstream) {
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic);
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
&pInfo->twAggSup, &pInfo->basic);
TSDB_CHECK_CODE(code, lino, _error);
code = appendDownstream(pOperator, &downstream, 1);
TSDB_CHECK_CODE(code, lino, _error);
}
return pOperator;
@ -3124,6 +3147,7 @@ _error:
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return NULL;
}
@ -3681,9 +3705,9 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato
SSessionKey key = {0};
SResultWindowInfo winfo = {0};
buf = decodeSSessionKey(buf, &key);
pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &winfo.sessionWin, NULL,
pAggSup->stateKeySize, compareStateKey,
(void**)&winfo.pStatePos, &pAggSup->resultRowSize);
pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &winfo.sessionWin, NULL, pAggSup->stateKeySize,
compareStateKey, (void**)&winfo.pStatePos,
&pAggSup->resultRowSize);
buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize);
tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo));
}
@ -3941,12 +3965,13 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey);
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
TSDB_CHECK_CODE(code, lino, _error);
}
pInfo->stateCol = extractColumnFromColumnNode(pColNode);
@ -3955,9 +3980,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
int32_t numOfScalar = 0;
SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar);
code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
}
pInfo->twAggSup = (STimeWindowAggSupp){
@ -3984,9 +4007,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize,
type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup,
GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pInfo->pSeDeleted = tSimpleHashInit(64, hashFn);
pInfo->pDelIterator = NULL;
@ -3999,7 +4021,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->dataVersion = 0;
pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey));
if (!pInfo->historyWins) {
goto _error;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
if (pHandle) {
pInfo->isHistoryOp = pHandle->fillHistory;
@ -4026,17 +4049,20 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState);
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic);
code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
&pInfo->twAggSup, &pInfo->basic);
TSDB_CHECK_CODE(code, lino, _error);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
TSDB_CHECK_CODE(code, lino, _error);
return pOperator;
_error:
destroyStreamStateOperatorInfo(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return NULL;
}
@ -4203,7 +4229,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision,
};
pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
pInfo->twAggSup =
(STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
@ -4611,5 +4638,6 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
void setStreamOperatorCompleted(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo),
getStreamOpName(pOperator->operatorType), pOperator->status);
}

View File

@ -18,6 +18,7 @@
#include "tencode.h"
#include "tstreamUpdate.h"
#include "ttime.h"
#include "tutil.h"
#define DEFAULT_FALSE_POSITIVE 0.01
#define DEFAULT_BUCKET_SIZE 131072
@ -34,7 +35,8 @@
static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); }
int compareKeyTs(void* pTs1, void* pTs2, void* pPkVal, __compar_fn_t cmpPkFn) {
return compareInt64Val(pTs1, pTs2);;
return compareInt64Val(pTs1, pTs2);
;
}
int compareKeyTsAndPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn) {
@ -66,17 +68,31 @@ int32_t getValueBuff(TSKEY ts, char* pVal, int32_t len, char* buff) {
return sizeof(TSKEY) + len;
}
void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
int32_t windowSBfAdd(SUpdateInfo* pInfo, uint64_t count) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pInfo->numSBFs < count) {
count = pInfo->numSBFs;
}
for (uint64_t i = 0; i < count; ++i) {
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
taosArrayPush(pInfo->pTsSBFs, &tsSBF);
SScalableBf* tsSBF = NULL;
code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &tsSBF);
TSDB_CHECK_CODE(code, lino, _error);
void* res = taosArrayPush(pInfo->pTsSBFs, &tsSBF);
if (!res) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
}
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static void clearItemHelper(void* p) {
SScalableBf** pBf = p;
tScalableBfDestroy(*pBf);
@ -124,14 +140,19 @@ static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t w
return watermark;
}
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen) {
return updateInfoInit(pInterval->interval, pInterval->precision, watermark, igUp, pkType, pkLen);
int32_t updateInfoInitP(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen,
SUpdateInfo** ppInfo) {
return updateInfoInit(pInterval->interval, pInterval->precision, watermark, igUp, pkType, pkLen, ppInfo);
}
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen) {
int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen,
SUpdateInfo** ppInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SUpdateInfo* pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
if (pInfo == NULL) {
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
pInfo->pTsBuckets = NULL;
pInfo->pTsSBFs = NULL;
@ -148,14 +169,16 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void*));
if (pInfo->pTsSBFs == NULL) {
updateInfoDestroy(pInfo);
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
windowSBfAdd(pInfo, bfSize);
pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY));
if (pInfo->pTsBuckets == NULL) {
updateInfoDestroy(pInfo);
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
TSKEY dumy = 0;
@ -167,31 +190,54 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
if (!pInfo->pMap) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
pInfo->maxDataVersion = 0;
pInfo->pkColLen = pkLen;
pInfo->pkColType = pkType;
pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pkLen);
if (!pInfo->pKeyBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pkLen);
if (!pInfo->pValueBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
if (pkLen != 0) {
pInfo->comparePkRowFn = compareKeyTsAndPk;
pInfo->comparePkCol = getKeyComparFunc(pkType, TSDB_ORDER_ASC);;
pInfo->comparePkCol = getKeyComparFunc(pkType, TSDB_ORDER_ASC);
;
} else {
pInfo->comparePkRowFn = compareKeyTs;
pInfo->comparePkCol = NULL;
}
return pInfo;
(*ppInfo) = pInfo;
_end:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (ts <= 0) {
return NULL;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
if (pInfo->minTS < 0) {
pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval);
}
int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval);
if (index < 0) {
return NULL;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
if (index >= pInfo->numSBFs) {
uint64_t count = index + 1 - pInfo->numSBFs;
@ -202,10 +248,18 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) {
SScalableBf* res = taosArrayGetP(pInfo->pTsSBFs, index);
if (res == NULL) {
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
res = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &res);
TSDB_CHECK_CODE(code, lino, _end);
taosArrayPush(pInfo->pTsSBFs, &res);
}
return res;
(*ppSBf) = res;
_end:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid) {
@ -215,6 +269,7 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) {
}
TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol) {
int32_t code = TSDB_CODE_SUCCESS;
if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN;
TSKEY maxTs = INT64_MIN;
void* pPkVal = NULL;
@ -238,13 +293,18 @@ TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t p
maxLen = colDataGetRowLength(pPkDataInfo, i);
}
}
SScalableBf *pSBf = getSBf(pInfo, ts);
SScalableBf* pSBf = NULL;
int32_t code = getSBf(pInfo, ts, &pSBf);
if (code != TSDB_CODE_SUCCESS) {
uWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
if (pSBf) {
if (primaryKeyCol >= 0) {
pPkVal = colDataGetData(pPkDataInfo, i);
len = colDataGetRowLength(pPkDataInfo, i);
}
int32_t buffLen = getKeyBuff(ts, tbUid, pPkVal, len, pInfo->pKeyBuff);
// we don't care whether the data is updated or not
tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen);
}
}
@ -277,10 +337,15 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* p
return true;
}
SScalableBf *pSBf = getSBf(pInfo, ts);
SScalableBf* pSBf = NULL;
int32_t code = getSBf(pInfo, ts, &pSBf);
if (code != TSDB_CODE_SUCCESS) {
uWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
int32_t size = taosHashGetSize(pInfo->pMap);
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1 )) {
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) ||
(pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1)) {
int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
// pSBf may be a null pointer
@ -334,7 +399,11 @@ void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo) {
return;
}
int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND);
pInfo->pCloseWinSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE);
int32_t code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &pInfo->pCloseWinSBF);
if (code != TSDB_CODE_SUCCESS) {
pInfo->pCloseWinSBF = NULL;
uError("%s failed to add close window SBF since %s", __func__, tstrerror(code));
}
}
void updateInfoDestoryColseWinSBF(SUpdateInfo* pInfo) {
@ -345,62 +414,124 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) {
pInfo->pCloseWinSBF = NULL;
}
int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) {
int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (!pInfo) {
return 0;
return TSDB_CODE_SUCCESS;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tStartEncode(&encoder) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
int32_t size = taosArrayGetSize(pInfo->pTsBuckets);
if (tEncodeI32(&encoder, size) < 0) return -1;
if (tEncodeI32(&encoder, size) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
for (int32_t i = 0; i < size; i++) {
TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i);
if (tEncodeI64(&encoder, *pTs) < 0) return -1;
if (tEncodeI64(&encoder, *pTs) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
}
if (tEncodeU64(&encoder, pInfo->numBuckets) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->numBuckets) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs);
if (tEncodeI32(&encoder, sBfSize) < 0) return -1;
if (tEncodeI32(&encoder, sBfSize) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i);
if (tScalableBfEncode(pSBf, &encoder) < 0) return -1;
if (tScalableBfEncode(pSBf, &encoder) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
}
if (tEncodeU64(&encoder, pInfo->numSBFs) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1;
if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->numSBFs) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
if (tEncodeI64(&encoder, pInfo->interval) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
if (tEncodeI64(&encoder, pInfo->watermark) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
if (tEncodeI64(&encoder, pInfo->minTS) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1;
if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
int32_t mapSize = taosHashGetSize(pInfo->pMap);
if (tEncodeI32(&encoder, mapSize) < 0) return -1;
if (tEncodeI32(&encoder, mapSize) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
void* pIte = NULL;
size_t keyLen = 0;
while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) {
void* key = taosHashGetKey(pIte, &keyLen);
if (tEncodeU64(&encoder, *(uint64_t *)key) < 0) return -1;
if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
int32_t valueSize = taosHashGetValueSize(pIte);
if (tEncodeBinary(&encoder, (const uint8_t *)pIte, valueSize) < 0) return -1;
if (tEncodeBinary(&encoder, (const uint8_t*)pIte, valueSize) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
}
if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) return -1;
if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
if (tEncodeI32(&encoder, pInfo->pkColLen) < 0) return -1;
if (tEncodeI8(&encoder, pInfo->pkColType) < 0) return -1;
if (tEncodeI32(&encoder, pInfo->pkColLen) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
if (tEncodeI8(&encoder, pInfo->pkColType) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _end);
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
*pLen = tlen;
_end:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
ASSERT(pInfo);
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
@ -421,8 +552,10 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
if (tDecodeI32(&decoder, &sBfSize) < 0) return -1;
pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void*));
for (int32_t i = 0; i < sBfSize; i++) {
SScalableBf *pSBf = tScalableBfDecode(&decoder);
if (!pSBf) return -1;
SScalableBf* pSBf = NULL;
code = tScalableBfDecode(&decoder, &pSBf);
TSDB_CHECK_CODE(code, lino, _error);
taosArrayPush(pInfo->pTsSBFs, &pSBf);
}
@ -430,7 +563,12 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
if (tDecodeI64(&decoder, &pInfo->interval) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->watermark) < 0) return -1;
if (tDecodeI64(&decoder, &pInfo->minTS) < 0) return -1;
pInfo->pCloseWinSBF = tScalableBfDecode(&decoder);
code = tScalableBfDecode(&decoder, &pInfo->pCloseWinSBF);
if (code != TSDB_CODE_SUCCESS) {
pInfo->pCloseWinSBF = NULL;
code = TSDB_CODE_SUCCESS;
}
int32_t mapSize = 0;
if (tDecodeI32(&decoder, &mapSize) < 0) return -1;
@ -454,7 +592,8 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pInfo->pkColLen);
if (pInfo->pkColLen != 0) {
pInfo->comparePkRowFn = compareKeyTsAndPk;
pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);;
pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);
;
} else {
pInfo->comparePkRowFn = compareKeyTs;
pInfo->comparePkCol = NULL;
@ -463,7 +602,12 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
bool isIncrementalTimeStamp(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) {

View File

@ -35,13 +35,17 @@ static FORCE_INLINE bool getBit(uint64_t *buf, uint64_t index) {
return buf[unitIndex] & mask;
}
SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) {
int32_t tBloomFilterInit(uint64_t expectedEntries, double errorRate, SBloomFilter** ppBF) {
int32_t code = 0;
int32_t lino = 0;
if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) {
return NULL;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
SBloomFilter* pBF = taosMemoryCalloc(1, sizeof(SBloomFilter));
if (pBF == NULL) {
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
pBF->expectedEntries = expectedEntries;
pBF->errorRate = errorRate;
@ -61,9 +65,16 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) {
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
if (pBF->buffer == NULL) {
tBloomFilterDestroy(pBF);
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
return pBF;
(*ppBF) = pBF;
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t tBloomFilterPutHash(SBloomFilter* pBF, uint64_t hash1, uint64_t hash2) {
@ -133,27 +144,56 @@ int32_t tBloomFilterEncode(const SBloomFilter *pBF, SEncoder *pEncoder) {
return 0;
}
SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder) {
int32_t tBloomFilterDecode(SDecoder* pDecoder, SBloomFilter** ppBF) {
int32_t code = 0;
int32_t lino = 0;
SBloomFilter* pBF = taosMemoryCalloc(1, sizeof(SBloomFilter));
if (!pBF) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
pBF->buffer = NULL;
if (tDecodeU32(pDecoder, &pBF->hashFunctions) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->expectedEntries) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->numUnits) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->numBits) < 0) goto _error;
if (tDecodeU64(pDecoder, &pBF->size) < 0) goto _error;
if (tDecodeU32(pDecoder, &pBF->hashFunctions) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pBF->expectedEntries) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pBF->numUnits) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pBF->numBits) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pBF->size) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t));
for (int32_t i = 0; i < pBF->numUnits; i++) {
uint64_t* pUnits = (uint64_t*)pBF->buffer;
if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error;
if (tDecodeU64(pDecoder, pUnits + i) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
}
if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error;
pBF->hashFn1 = HASH_FUNCTION_1;
pBF->hashFn2 = HASH_FUNCTION_2;
return pBF;
(*ppBF) = pBF;
return TSDB_CODE_SUCCESS;
_error:
tBloomFilterDestroy(pBF);
return NULL;
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}
bool tBloomFilterIsFull(const SBloomFilter* pBF) { return pBF->size >= pBF->expectedEntries; }

View File

@ -17,6 +17,7 @@
#include "tscalablebf.h"
#include "taoserror.h"
#include "tutil.h"
#define DEFAULT_GROWTH 2
#define DEFAULT_TIGHTENING_RATIO 0.5
@ -24,52 +25,84 @@
#define SBF_INVALID -1
#define SBF_VALID 0
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate);
static int32_t tScalableBfAddFilter(SScalableBf* pSBf, uint64_t expectedEntries, double errorRate,
SBloomFilter** ppNormalBf);
SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) {
int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf** ppSBf) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
const uint32_t defaultSize = 8;
if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) {
return NULL;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
SScalableBf* pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
if (pSBf == NULL) {
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
pSBf->maxBloomFilters = DEFAULT_MAX_BLOOMFILTERS;
pSBf->status = SBF_VALID;
pSBf->numBits = 0;
pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void*));
if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL) {
if (!pSBf->bfArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
SBloomFilter* pNormalBf = NULL;
code = tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
if (code != TSDB_CODE_SUCCESS) {
tScalableBfDestroy(pSBf);
return NULL;
TSDB_CHECK_CODE(code, lino, _error);
}
pSBf->growth = DEFAULT_GROWTH;
pSBf->hashFn1 = HASH_FUNCTION_1;
pSBf->hashFn2 = HASH_FUNCTION_2;
return pSBf;
(*ppSBf) = pSBf;
return TSDB_CODE_SUCCESS;
_error:
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code;
}
int32_t tScalableBfPutNoCheck(SScalableBf* pSBf, const void* keyBuf, uint32_t len) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pSBf->status == SBF_INVALID) {
return TSDB_CODE_FAILED;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
int32_t size = taosArrayGetSize(pSBf->bfArray);
SBloomFilter* pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
ASSERT(pNormalBf);
if (!pNormalBf) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tBloomFilterIsFull(pNormalBf)) {
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
if (pNormalBf == NULL) {
code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
if (code != TSDB_CODE_SUCCESS) {
pSBf->status = SBF_INVALID;
return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
}
return tBloomFilterPut(pNormalBf, keyBuf, len);
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pSBf->status == SBF_INVALID) {
return TSDB_CODE_FAILED;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len);
uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len);
@ -83,14 +116,20 @@ int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) {
SBloomFilter* pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1);
ASSERT(pNormalBf);
if (tBloomFilterIsFull(pNormalBf)) {
pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO);
if (pNormalBf == NULL) {
code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth,
pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf);
if (code != TSDB_CODE_SUCCESS) {
pSBf->status = SBF_INVALID;
return TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
}
return tBloomFilterPutHash(pNormalBf, h1, h2);
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t tScalableBfNoContain(const SScalableBf* pSBf, const void* keyBuf, uint32_t len) {
@ -108,21 +147,32 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32
return TSDB_CODE_SUCCESS;
}
static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate) {
static int32_t tScalableBfAddFilter(SScalableBf* pSBf, uint64_t expectedEntries, double errorRate,
SBloomFilter** ppNormalBf) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) {
return NULL;
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate);
if (pNormalBf == NULL) {
return NULL;
}
SBloomFilter* pNormalBf = NULL;
code = tBloomFilterInit(expectedEntries, errorRate, &pNormalBf);
TSDB_CHECK_CODE(code, lino, _error);
if (taosArrayPush(pSBf->bfArray, &pNormalBf) == NULL) {
tBloomFilterDestroy(pNormalBf);
return NULL;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
pSBf->numBits += pNormalBf->numBits;
return pNormalBf;
(*ppNormalBf) = pNormalBf;
_error:
if (code != TSDB_CODE_SUCCESS) {
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
void tScalableBfDestroy(SScalableBf* pSBf) {
@ -153,30 +203,61 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) {
return 0;
}
SScalableBf *tScalableBfDecode(SDecoder *pDecoder) {
int32_t tScalableBfDecode(SDecoder* pDecoder, SScalableBf** ppSBf) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SScalableBf* pSBf = taosMemoryCalloc(1, sizeof(SScalableBf));
if (!pSBf) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
pSBf->hashFn1 = HASH_FUNCTION_1;
pSBf->hashFn2 = HASH_FUNCTION_2;
pSBf->bfArray = NULL;
int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) goto _error;
if (size == 0) {
tScalableBfDestroy(pSBf);
return NULL;
if (tDecodeI32(pDecoder, &size) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
pSBf->bfArray = taosArrayInit(size * 2, sizeof(void *));
if (size == 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
pSBf->bfArray = taosArrayInit(size * 2, POINTER_BYTES);
if (!pSBf->bfArray) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _error);
}
for (int32_t i = 0; i < size; i++) {
SBloomFilter *pBF = tBloomFilterDecode(pDecoder);
if (!pBF) goto _error;
SBloomFilter* pBF = NULL;
code = tBloomFilterDecode(pDecoder, &pBF);
TSDB_CHECK_CODE(code, lino, _error);
taosArrayPush(pSBf->bfArray, &pBF);
}
if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error;
if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error;
if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) goto _error;
if (tDecodeI8(pDecoder, &pSBf->status) < 0) goto _error;
return pSBf;
if (tDecodeU32(pDecoder, &pSBf->growth) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
if (tDecodeI8(pDecoder, &pSBf->status) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _error);
}
(*ppSBf) = pSBf;
_error:
if (code != TSDB_CODE_SUCCESS) {
tScalableBfDestroy(pSBf);
return NULL;
uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}

View File

@ -8,12 +8,15 @@ using namespace std;
TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) {
int64_t ts1 = 1650803518000;
GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, 0));
GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, 1));
GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, -0.1));
GTEST_ASSERT_EQ(NULL, tBloomFilterInit(0, 0.01));
SBloomFilter* pBFTmp = NULL;
GTEST_ASSERT_NE(0, tBloomFilterInit(100, 0, &pBFTmp));
GTEST_ASSERT_NE(0, tBloomFilterInit(100, 1, &pBFTmp));
GTEST_ASSERT_NE(0, tBloomFilterInit(100, -0.1, &pBFTmp));
GTEST_ASSERT_NE(0, tBloomFilterInit(0, 0.01, &pBFTmp));
SBloomFilter *pBF1 = tBloomFilterInit(100, 0.005);
SBloomFilter *pBF1 = NULL;
int32_t code = tBloomFilterInit(100, 0.005, &pBF1);
GTEST_ASSERT_EQ(0, code);
GTEST_ASSERT_EQ(pBF1->numBits, 1152);
GTEST_ASSERT_EQ(pBF1->numUnits, 1152 / 64);
int64_t count = 0;
@ -25,16 +28,19 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) {
}
ASSERT_TRUE(tBloomFilterIsFull(pBF1));
SBloomFilter *pBF2 = tBloomFilterInit(1000 * 10000, 0.1);
SBloomFilter* pBF2 = NULL;
GTEST_ASSERT_EQ(0, tBloomFilterInit(1000 * 10000, 0.1, &pBF2));
GTEST_ASSERT_EQ(pBF2->numBits, 47925312);
GTEST_ASSERT_EQ(pBF2->numUnits, 47925312 / 64);
SBloomFilter *pBF3 = tBloomFilterInit(10000 * 10000, 0.001);
SBloomFilter* pBF3 = NULL;
GTEST_ASSERT_EQ(0, tBloomFilterInit(10000 * 10000, 0.001, &pBF3));
GTEST_ASSERT_EQ(pBF3->numBits, 1437758784);
GTEST_ASSERT_EQ(pBF3->numUnits, 1437758784 / 64);
int64_t size = 10000;
SBloomFilter *pBF4 = tBloomFilterInit(size, 0.001);
SBloomFilter* pBF4 = NULL;
GTEST_ASSERT_EQ(0, tBloomFilterInit(size, 0.001, &pBF4));
for (int64_t i = 0; i < 1000; i++) {
int64_t ts = i + ts1;
GTEST_ASSERT_EQ(tBloomFilterPut(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);
@ -64,12 +70,14 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) {
TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) {
int64_t ts1 = 1650803518000;
GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, 0));
GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, 1));
GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, -0.1));
GTEST_ASSERT_EQ(NULL, tScalableBfInit(0, 0.01));
SScalableBf* tsSBF = NULL;
GTEST_ASSERT_NE(0, tScalableBfInit(100, 0, &tsSBF));
GTEST_ASSERT_NE(0, tScalableBfInit(100, 1, &tsSBF));
GTEST_ASSERT_NE(0, tScalableBfInit(100, -0.1, &tsSBF));
GTEST_ASSERT_NE(0, tScalableBfInit(0, 0.01, &tsSBF));
SScalableBf *pSBF1 = tScalableBfInit(100, 0.01);
SScalableBf* pSBF1 = NULL;
GTEST_ASSERT_EQ(0, tScalableBfInit(100, 0.01, &pSBF1));
GTEST_ASSERT_EQ(pSBF1->numBits, 1152);
int64_t count = 0;
int64_t index = 0;
@ -120,7 +128,8 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) {
}
int64_t size = 10000;
SScalableBf *pSBF4 = tScalableBfInit(size, 0.001);
SScalableBf* pSBF4 = NULL;
GTEST_ASSERT_EQ(0, tScalableBfInit(size, 0.001, &pSBF4));
for (int64_t i = 0; i < 1000; i++) {
int64_t ts = i + ts1;
GTEST_ASSERT_EQ(tScalableBfPut(pSBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);