Merge pull request #25474 from taosdata/fix/TD-29749
fix(stream):opt bloom filter
This commit is contained in:
commit
740115e117
|
@ -308,6 +308,13 @@ typedef struct SUpdateInfo {
|
|||
SScalableBf* pCloseWinSBF;
|
||||
SHashObj* pMap;
|
||||
uint64_t maxDataVersion;
|
||||
int8_t pkColType;
|
||||
int32_t pkColLen;
|
||||
char* pKeyBuff;
|
||||
char* pValueBuff;
|
||||
|
||||
int (*comparePkRowFn)(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn);
|
||||
__compar_fn_t comparePkCol;
|
||||
} SUpdateInfo;
|
||||
|
||||
typedef struct {
|
||||
|
@ -375,17 +382,17 @@ typedef struct SStateStore {
|
|||
void** ppVal, int32_t* pVLen);
|
||||
int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
|
||||
|
||||
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp);
|
||||
TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol);
|
||||
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
|
||||
SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen);
|
||||
TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol);
|
||||
bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
|
||||
bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid);
|
||||
bool (*isIncrementalTimeStamp)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts);
|
||||
bool (*isIncrementalTimeStamp)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
|
||||
|
||||
void (*updateInfoDestroy)(SUpdateInfo* pInfo);
|
||||
void (*windowSBfDelete)(SUpdateInfo* pInfo, uint64_t count);
|
||||
void (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count);
|
||||
|
||||
SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp);
|
||||
SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen);
|
||||
void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo);
|
||||
void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo);
|
||||
int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo);
|
||||
|
|
|
@ -25,28 +25,10 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct SUpdateKey {
|
||||
int64_t tbUid;
|
||||
TSKEY ts;
|
||||
} SUpdateKey;
|
||||
|
||||
//typedef struct SUpdateInfo {
|
||||
// SArray *pTsBuckets;
|
||||
// uint64_t numBuckets;
|
||||
// SArray *pTsSBFs;
|
||||
// uint64_t numSBFs;
|
||||
// int64_t interval;
|
||||
// int64_t watermark;
|
||||
// TSKEY minTS;
|
||||
// SScalableBf *pCloseWinSBF;
|
||||
// SHashObj *pMap;
|
||||
// uint64_t maxDataVersion;
|
||||
//} SUpdateInfo;
|
||||
|
||||
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp);
|
||||
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp);
|
||||
TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol);
|
||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
||||
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen);
|
||||
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen);
|
||||
TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol, int32_t primaryKeyCol);
|
||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
|
||||
bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid);
|
||||
void updateInfoDestroy(SUpdateInfo *pInfo);
|
||||
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
|
||||
|
@ -55,7 +37,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *p
|
|||
int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo);
|
||||
void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count);
|
||||
void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count);
|
||||
bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
||||
bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -180,6 +180,13 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p);
|
|||
*/
|
||||
void *taosHashGetKey(void *data, size_t *keyLen);
|
||||
|
||||
/**
|
||||
* Get the corresponding value length for a given data in hash table
|
||||
* @param data
|
||||
* @return
|
||||
*/
|
||||
int32_t taosHashGetValueSize(void *data);
|
||||
|
||||
/**
|
||||
* return the payload data with the specified key(reference number added)
|
||||
*
|
||||
|
|
|
@ -325,7 +325,7 @@ static int32_t createSchemaByFields(const SArray* pFields, SSchemaWrapper* pWrap
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool hasPrimaryKey(SSchemaWrapper* pWrapper) {
|
||||
static bool hasDestPrimaryKey(SSchemaWrapper* pWrapper) {
|
||||
if (pWrapper->nCols < 2) {
|
||||
return false;
|
||||
}
|
||||
|
@ -442,7 +442,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
|
|||
pObj->outputSchema.pSchema = pFullSchema;
|
||||
}
|
||||
|
||||
bool hasKey = hasPrimaryKey(&pObj->outputSchema);
|
||||
bool hasKey = hasDestPrimaryKey(&pObj->outputSchema);
|
||||
SPlanContext cxt = {
|
||||
.pAstRoot = pAst,
|
||||
.topicQuery = false,
|
||||
|
|
|
@ -500,6 +500,8 @@ typedef struct SStreamScanInfo {
|
|||
SStoreTqReader readerFn;
|
||||
SStateStore stateStore;
|
||||
SSDataBlock* pCheckpointRes;
|
||||
int8_t pkColType;
|
||||
int32_t pkColLen;
|
||||
} SStreamScanInfo;
|
||||
|
||||
typedef struct {
|
||||
|
@ -566,8 +568,13 @@ typedef struct SOpCheckPointInfo {
|
|||
SHashObj* children; // key:child id
|
||||
} SOpCheckPointInfo;
|
||||
|
||||
typedef struct SSteamOpBasicInfo {
|
||||
int32_t primaryPkIndex;
|
||||
} SSteamOpBasicInfo;
|
||||
|
||||
typedef struct SStreamIntervalOperatorInfo {
|
||||
SOptrBasicInfo binfo; // basic info
|
||||
SSteamOpBasicInfo basic;
|
||||
SAggSupporter aggSup; // aggregate supporter
|
||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||
SGroupResInfo groupResInfo; // multiple results build supporter
|
||||
|
@ -633,6 +640,7 @@ typedef struct SResultWindowInfo {
|
|||
|
||||
typedef struct SStreamSessionAggOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SSteamOpBasicInfo basic;
|
||||
SStreamAggSupporter streamAggSup;
|
||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||
SGroupResInfo groupResInfo;
|
||||
|
@ -665,6 +673,7 @@ typedef struct SStreamSessionAggOperatorInfo {
|
|||
|
||||
typedef struct SStreamStateAggOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SSteamOpBasicInfo basic;
|
||||
SStreamAggSupporter streamAggSup;
|
||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||
SGroupResInfo groupResInfo;
|
||||
|
@ -691,6 +700,7 @@ typedef struct SStreamStateAggOperatorInfo {
|
|||
|
||||
typedef struct SStreamEventAggOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SSteamOpBasicInfo basic;
|
||||
SStreamAggSupporter streamAggSup;
|
||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||
SGroupResInfo groupResInfo;
|
||||
|
@ -719,6 +729,7 @@ typedef struct SStreamEventAggOperatorInfo {
|
|||
|
||||
typedef struct SStreamCountAggOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SSteamOpBasicInfo basic;
|
||||
SStreamAggSupporter streamAggSup;
|
||||
SExprSupp scalarSupp; // supporter for perform scalar function
|
||||
SGroupResInfo groupResInfo;
|
||||
|
@ -742,6 +753,7 @@ typedef struct SStreamCountAggOperatorInfo {
|
|||
|
||||
typedef struct SStreamPartitionOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
SSteamOpBasicInfo basic;
|
||||
SPartitionBySupporter partitionSup;
|
||||
SExprSupp scalarSup;
|
||||
SExprSupp tbnameCalSup;
|
||||
|
@ -775,6 +787,7 @@ typedef struct SStreamFillSupporter {
|
|||
} SStreamFillSupporter;
|
||||
|
||||
typedef struct SStreamFillOperatorInfo {
|
||||
SSteamOpBasicInfo basic;
|
||||
SStreamFillSupporter* pFillSup;
|
||||
SSDataBlock* pRes;
|
||||
SSDataBlock* pSrcBlock;
|
||||
|
@ -911,7 +924,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, i
|
|||
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
|
||||
SStorageAPI* pApi, int32_t tsIndex);
|
||||
void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
|
||||
STimeWindowAggSupp* pTwSup);
|
||||
STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic);
|
||||
void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins);
|
||||
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
||||
void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey);
|
||||
|
@ -939,7 +952,7 @@ void compactTimeWindow(SExprSupp* pSup, SStreamAggSupporter* pAggSup, STimeW
|
|||
SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap);
|
||||
int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI);
|
||||
void resetWinRange(STimeWindow* winRange);
|
||||
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts);
|
||||
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len);
|
||||
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
|
||||
void resetUnCloseSessionWinInfo(SSHashObj* winMap);
|
||||
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
|
||||
|
|
|
@ -1267,7 +1267,7 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup
|
|||
pScanInfo->pPartScalarSup = pExpr;
|
||||
pScanInfo->pPartTbnameSup = pTbnameExpr;
|
||||
if (!pScanInfo->pUpdateInfo) {
|
||||
pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate);
|
||||
pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1380,7 +1380,7 @@ bool comparePrimaryKey(SColumnInfoData* pCol, int32_t rowId, void* pVal) {
|
|||
return false;
|
||||
}
|
||||
|
||||
bool hasPrimaryKey(SStreamScanInfo* pInfo) {
|
||||
bool hasPrimaryKeyCol(SStreamScanInfo* pInfo) {
|
||||
return pInfo->primaryKeyIndex != -1;
|
||||
}
|
||||
|
||||
|
@ -1391,7 +1391,7 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
|
|||
}
|
||||
|
||||
int32_t rowId = 0;
|
||||
if (hasPrimaryKey(pInfo)) {
|
||||
if (hasPrimaryKeyCol(pInfo)) {
|
||||
SColumnInfoData* pPkCol = taosArrayGet(pPreRes->pDataBlock, pInfo->primaryKeyIndex);
|
||||
for (; rowId < pPreRes->info.rows; rowId++) {
|
||||
if (comparePrimaryKey(pPkCol, rowId, pVal)) {
|
||||
|
@ -1630,7 +1630,7 @@ static void getPreVersionDataBlock(uint64_t uid, TSKEY startTs, TSKEY endTs, int
|
|||
|
||||
SColumnInfoData* pTsCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryTsIndex);
|
||||
SColumnInfoData* pPkCol = NULL;
|
||||
if (hasPrimaryKey(pInfo)) {
|
||||
if (hasPrimaryKeyCol(pInfo)) {
|
||||
pPkCol = (SColumnInfoData*)taosArrayGet(pPreRes->pDataBlock, pInfo->primaryKeyIndex);
|
||||
}
|
||||
for (int32_t i = 0; i < pPreRes->info.rows; i++) {
|
||||
|
@ -1659,7 +1659,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
|||
}
|
||||
int64_t ver = pSrcBlock->info.version - 1;
|
||||
|
||||
if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) {
|
||||
if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKeyCol(pInfo) && mode == STREAM_DELETE_DATA) )) {
|
||||
getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock);
|
||||
startData = (TSKEY*)pStartTsCol->pData;
|
||||
endData = (TSKEY*)pEndTsCol->pData;
|
||||
|
@ -1682,7 +1682,7 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr
|
|||
uint64_t groupId = pSrcGp[i];
|
||||
if (groupId == 0) {
|
||||
void* pVal = NULL;
|
||||
if (hasPrimaryKey(pInfo) && pSrcPkCol) {
|
||||
if (hasPrimaryKeyCol(pInfo) && pSrcPkCol) {
|
||||
pVal = colDataGetData(pSrcPkCol, i);
|
||||
}
|
||||
groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal);
|
||||
|
@ -1736,7 +1736,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
|
|||
}
|
||||
int64_t ver = pSrcBlock->info.version - 1;
|
||||
|
||||
if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) {
|
||||
if (pInfo->partitionSup.needCalc && ( startData[0] != endData[0] || (hasPrimaryKeyCol(pInfo) && mode == STREAM_DELETE_DATA) )) {
|
||||
getPreVersionDataBlock(uidCol[0], startData[0], endData[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock);
|
||||
startData = (TSKEY*)pStartTsCol->pData;
|
||||
endData = (TSKEY*)pEndTsCol->pData;
|
||||
|
@ -1759,7 +1759,7 @@ static int32_t generateCountScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcB
|
|||
uint64_t groupId = pSrcGp[i];
|
||||
if (groupId == 0) {
|
||||
void* pVal = NULL;
|
||||
if (hasPrimaryKey(pInfo) && pSrcPkCol) {
|
||||
if (hasPrimaryKeyCol(pInfo) && pSrcPkCol) {
|
||||
pVal = colDataGetData(pSrcPkCol, i);
|
||||
}
|
||||
groupId = getGroupIdByData(pInfo, uidCol[i], startData[i], ver, pVal);
|
||||
|
@ -1800,7 +1800,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
TSKEY* srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
||||
int64_t ver = pSrcBlock->info.version - 1;
|
||||
|
||||
if (pInfo->partitionSup.needCalc && ( srcStartTsCol[0] != srcEndTsCol[0] || (hasPrimaryKey(pInfo) && mode == STREAM_DELETE_DATA) )) {
|
||||
if (pInfo->partitionSup.needCalc && ( srcStartTsCol[0] != srcEndTsCol[0] || (hasPrimaryKeyCol(pInfo) && mode == STREAM_DELETE_DATA) )) {
|
||||
getPreVersionDataBlock(srcUidData[0], srcStartTsCol[0], srcEndTsCol[0], ver, GET_TASKID(pTaskInfo), pInfo, pSrcBlock);
|
||||
srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
|
||||
srcEndTsCol = (TSKEY*)pSrcEndTsCol->pData;
|
||||
|
@ -1824,7 +1824,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
uint64_t groupId = srcGp[i];
|
||||
if (groupId == 0) {
|
||||
void* pVal = NULL;
|
||||
if (hasPrimaryKey(pInfo) && pSrcPkCol) {
|
||||
if (hasPrimaryKeyCol(pInfo) && pSrcPkCol) {
|
||||
pVal = colDataGetData(pSrcPkCol, i);
|
||||
}
|
||||
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver, pVal);
|
||||
|
@ -1915,7 +1915,7 @@ static int32_t generateDeleteResultBlockImpl(SStreamScanInfo* pInfo, SSDataBlock
|
|||
char tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
|
||||
if (groupId == 0) {
|
||||
void* pVal = NULL;
|
||||
if (hasPrimaryKey(pInfo) && pSrcPkCol) {
|
||||
if (hasPrimaryKeyCol(pInfo) && pSrcPkCol) {
|
||||
pVal = colDataGetData(pSrcPkCol, i);
|
||||
}
|
||||
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], ver, pVal);
|
||||
|
@ -1979,9 +1979,9 @@ void appendDataToSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndT
|
|||
appendOneRowToSpecialBlockImpl(pBlock, pStartTs, pEndTs, pStartTs, pEndTs, pUid, pGp, pTbName, NULL);
|
||||
}
|
||||
|
||||
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts) {
|
||||
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) {
|
||||
bool isExpired = false;
|
||||
bool isInc = pAPI->isIncrementalTimeStamp(pUpdateInfo, tableId, ts);
|
||||
bool isInc = pAPI->isIncrementalTimeStamp(pUpdateInfo, tableId, ts, pPkVal, len);
|
||||
if (!isInc) {
|
||||
isExpired = isOverdue(ts, pTwSup);
|
||||
}
|
||||
|
@ -1997,7 +1997,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
|
|||
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
TSKEY* tsCol = (TSKEY*)pColDataInfo->pData;
|
||||
SColumnInfoData* pPkColDataInfo = NULL;
|
||||
if (hasPrimaryKey(pInfo)) {
|
||||
if (hasPrimaryKeyCol(pInfo)) {
|
||||
pPkColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryKeyIndex);
|
||||
}
|
||||
|
||||
|
@ -2017,7 +2017,13 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
|
|||
isClosed = isCloseWindow(&win, &pInfo->twAggSup);
|
||||
}
|
||||
// must check update info first.
|
||||
bool update = pInfo->stateStore.updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]);
|
||||
void* pPkVal = NULL;
|
||||
int32_t pkLen = 0;
|
||||
if (hasPrimaryKeyCol(pInfo)) {
|
||||
pPkVal = colDataGetData(pPkColDataInfo, rowId);
|
||||
pkLen = colDataGetRowLength(pPkColDataInfo, rowId);
|
||||
}
|
||||
bool update = pInfo->stateStore.updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId], pPkVal, pkLen);
|
||||
bool isDeleted = isClosed && isSignleIntervalWindow(pInfo) &&
|
||||
isDeletedStreamWindow(&win, pBlock->info.id.groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore);
|
||||
if ((update || isDeleted) && out) {
|
||||
|
@ -2517,7 +2523,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
if (pInfo->pRecoverRes != NULL) {
|
||||
calBlockTbName(pInfo, pInfo->pRecoverRes, 0);
|
||||
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
|
||||
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
|
||||
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex, pInfo->primaryKeyIndex);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
}
|
||||
if (pInfo->pCreateTbRes->info.rows > 0) {
|
||||
|
@ -3202,8 +3208,10 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA);
|
||||
pInfo->updateWin = (STimeWindow){.skey = INT64_MAX, .ekey = INT64_MAX};
|
||||
pInfo->pUpdateDataRes = createSpecialDataBlock(STREAM_CLEAR);
|
||||
if (hasPrimaryKey(pInfo)) {
|
||||
if (hasPrimaryKeyCol(pInfo)) {
|
||||
addPrimaryKeyCol(pInfo->pUpdateDataRes, pkType.type, pkType.bytes);
|
||||
pInfo->pkColType = pkType.type;
|
||||
pInfo->pkColLen = pkType.bytes;
|
||||
}
|
||||
pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
|
||||
pInfo->partitionSup.needCalc = false;
|
||||
|
|
|
@ -243,7 +243,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
for (int32_t i = 0; i < rows;) {
|
||||
if (pInfo->ignoreExpiredData &&
|
||||
checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup,
|
||||
pSDataBlock->info.id.uid, startTsCols[i])) {
|
||||
pSDataBlock->info.id.uid, startTsCols[i], NULL, 0)) {
|
||||
i++;
|
||||
continue;
|
||||
}
|
||||
|
@ -729,7 +729,7 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState);
|
||||
|
||||
if (downstream) {
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
}
|
||||
return pOperator;
|
||||
|
|
|
@ -312,7 +312,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
blockDataEnsureCapacity(pAggSup->pScanBlock, rows);
|
||||
for (int32_t i = 0; i < rows; i += winRows) {
|
||||
if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo,
|
||||
&pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i])) {
|
||||
&pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i], NULL, 0)) {
|
||||
i++;
|
||||
continue;
|
||||
}
|
||||
|
@ -778,7 +778,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState);
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
|
|
@ -487,13 +487,14 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt
|
|||
pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup;
|
||||
if (!pScanInfo->pUpdateInfo) {
|
||||
pScanInfo->pUpdateInfo =
|
||||
pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate);
|
||||
pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen);
|
||||
}
|
||||
|
||||
pScanInfo->interval = pInfo->interval;
|
||||
pScanInfo->twAggSup = pInfo->twAggSup;
|
||||
pScanInfo->pState = pInfo->pState;
|
||||
pInfo->pUpdateInfo = pScanInfo->pUpdateInfo;
|
||||
pInfo->basic.primaryPkIndex = pScanInfo->primaryKeyIndex;
|
||||
}
|
||||
|
||||
void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int32_t numOfOutput,
|
||||
|
@ -826,6 +827,10 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
|
|||
return startPos;
|
||||
}
|
||||
|
||||
bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) {
|
||||
return pInfo->primaryPkIndex != -1;
|
||||
}
|
||||
|
||||
static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId,
|
||||
SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) {
|
||||
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
|
||||
|
@ -845,6 +850,13 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
|
|||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
tsCols = (int64_t*)pColDataInfo->pData;
|
||||
|
||||
void* pPkVal = NULL;
|
||||
int32_t pkLen = 0;
|
||||
SColumnInfoData* pPkColDataInfo = NULL;
|
||||
if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
|
||||
pPkColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
}
|
||||
|
||||
if (pSDataBlock->info.window.skey != tsCols[0] || pSDataBlock->info.window.ekey != tsCols[endRowId]) {
|
||||
qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64
|
||||
",maxKey %" PRId64,
|
||||
|
@ -868,9 +880,15 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
|
|||
}
|
||||
while (1) {
|
||||
bool isClosed = isCloseWindow(&nextWin, &pInfo->twAggSup);
|
||||
if (hasSrcPrimaryKeyCol(&pInfo->basic) && !IS_FINAL_INTERVAL_OP(pOperator) && pInfo->ignoreExpiredData &&
|
||||
pSDataBlock->info.type != STREAM_PULL_DATA) {
|
||||
pPkVal = colDataGetData(pPkColDataInfo, startPos);
|
||||
pkLen = colDataGetRowLength(pPkColDataInfo, startPos);
|
||||
}
|
||||
|
||||
if ((!IS_FINAL_INTERVAL_OP(pOperator) && pInfo->ignoreExpiredData && pSDataBlock->info.type != STREAM_PULL_DATA &&
|
||||
checkExpiredData(&pInfo->stateStore, pInfo->pUpdateInfo, &pInfo->twAggSup, pSDataBlock->info.id.uid,
|
||||
nextWin.ekey)) ||
|
||||
nextWin.ekey, pPkVal, pkLen)) ||
|
||||
!inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
|
||||
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin);
|
||||
if (startPos < 0) {
|
||||
|
@ -1722,14 +1740,14 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
|
|||
}
|
||||
|
||||
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex,
|
||||
STimeWindowAggSupp* pTwSup) {
|
||||
STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic) {
|
||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) {
|
||||
SStreamPartitionOperatorInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->tsColIndex = tsColIndex;
|
||||
}
|
||||
|
||||
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup);
|
||||
initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic);
|
||||
return;
|
||||
}
|
||||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
|
@ -1737,10 +1755,11 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
|
|||
pScanInfo->pState = pAggSup->pState;
|
||||
if (!pScanInfo->pUpdateInfo) {
|
||||
pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark,
|
||||
pScanInfo->igCheckUpdate);
|
||||
pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen);
|
||||
}
|
||||
pScanInfo->twAggSup = *pTwSup;
|
||||
pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo;
|
||||
pBasic->primaryPkIndex = pScanInfo->primaryKeyIndex;
|
||||
}
|
||||
|
||||
static TSKEY sesionTs(void* pKey) {
|
||||
|
@ -2113,10 +2132,22 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
|||
}
|
||||
|
||||
TSKEY* endTsCols = (int64_t*)pEndTsCol->pData;
|
||||
|
||||
void* pPkVal = NULL;
|
||||
int32_t pkLen = 0;
|
||||
SColumnInfoData* pPkColDataInfo = NULL;
|
||||
if (hasSrcPrimaryKeyCol(&pInfo->basic)) {
|
||||
pPkColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < rows;) {
|
||||
if (hasSrcPrimaryKeyCol(&pInfo->basic) && !IS_FINAL_SESSION_OP(pOperator) && pInfo->ignoreExpiredData) {
|
||||
pPkVal = colDataGetData(pPkColDataInfo, i);
|
||||
pkLen = colDataGetRowLength(pPkColDataInfo, i);
|
||||
}
|
||||
if (!IS_FINAL_SESSION_OP(pOperator) && pInfo->ignoreExpiredData &&
|
||||
checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo, &pInfo->twAggSup,
|
||||
pSDataBlock->info.id.uid, endTsCols[i])) {
|
||||
pSDataBlock->info.id.uid, endTsCols[i], pPkVal, pkLen)) {
|
||||
i++;
|
||||
continue;
|
||||
}
|
||||
|
@ -3058,7 +3089,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
||||
|
||||
if (downstream) {
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
}
|
||||
return pOperator;
|
||||
|
@ -3490,7 +3521,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
SColumnInfoData* pKeyColInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->stateCol.slotId);
|
||||
for (int32_t i = 0; i < rows; i += winRows) {
|
||||
if (pInfo->ignoreExpiredData && checkExpiredData(&pInfo->streamAggSup.stateStore, pInfo->streamAggSup.pUpdateInfo,
|
||||
&pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i]) ||
|
||||
&pInfo->twAggSup, pSDataBlock->info.id.uid, tsCols[i], NULL, 0) ||
|
||||
colDataIsNull_s(pKeyColInfo, i)) {
|
||||
i++;
|
||||
continue;
|
||||
|
@ -3957,7 +3988,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo,
|
||||
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
|
||||
setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState);
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup);
|
||||
initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tcompare.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tencode.h"
|
||||
#include "tstreamUpdate.h"
|
||||
#include "ttime.h"
|
||||
|
@ -31,6 +33,39 @@
|
|||
|
||||
static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); }
|
||||
|
||||
int compareKeyTs(void* pTs1, void* pTs2, void* pPkVal, __compar_fn_t cmpPkFn) {
|
||||
return compareInt64Val(pTs1, pTs2);;
|
||||
}
|
||||
|
||||
int compareKeyTsAndPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn) {
|
||||
int res = compareInt64Val(pValue1, pTs);
|
||||
if (res != 0) {
|
||||
return res;
|
||||
} else {
|
||||
void* pk1 = (char*)pValue1 + sizeof(TSKEY);
|
||||
return cmpPkFn(pk1, pPkVal);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t getKeyBuff(TSKEY ts, int64_t tbUid, void* pVal, int32_t len, char* buff) {
|
||||
*(TSKEY*)buff = ts;
|
||||
memcpy(buff+ sizeof(TSKEY), &tbUid, sizeof(int64_t));
|
||||
if (len == 0) {
|
||||
return sizeof(TSKEY) + sizeof(int64_t);
|
||||
}
|
||||
memcpy(buff, pVal, len);
|
||||
return sizeof(TSKEY) + sizeof(int64_t) + len;
|
||||
}
|
||||
|
||||
int32_t getValueBuff(TSKEY ts, char* pVal, int32_t len, char* buff) {
|
||||
*(TSKEY*)buff = ts;
|
||||
if (len == 0) {
|
||||
return sizeof(TSKEY);
|
||||
}
|
||||
memcpy(buff + sizeof(TSKEY), pVal, len);
|
||||
return sizeof(TSKEY) + len;
|
||||
}
|
||||
|
||||
void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) {
|
||||
if (pInfo->numSBFs < count) {
|
||||
count = pInfo->numSBFs;
|
||||
|
@ -89,11 +124,11 @@ static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t w
|
|||
return watermark;
|
||||
}
|
||||
|
||||
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp) {
|
||||
return updateInfoInit(pInterval->interval, pInterval->precision, watermark, igUp);
|
||||
SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen) {
|
||||
return updateInfoInit(pInterval->interval, pInterval->precision, watermark, igUp, pkType, pkLen);
|
||||
}
|
||||
|
||||
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp) {
|
||||
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen) {
|
||||
SUpdateInfo *pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo));
|
||||
if (pInfo == NULL) {
|
||||
return NULL;
|
||||
|
@ -133,6 +168,17 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
|
|||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
|
||||
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
|
||||
pInfo->maxDataVersion = 0;
|
||||
pInfo->pkColLen = pkLen;
|
||||
pInfo->pkColType = pkType;
|
||||
pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pkLen);
|
||||
pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pkLen);
|
||||
if (pkLen != 0) {
|
||||
pInfo->comparePkRowFn = compareKeyTsAndPk;
|
||||
pInfo->comparePkCol = getKeyComparFunc(pkType, TSDB_ORDER_ASC);;
|
||||
} else {
|
||||
pInfo->comparePkRowFn = compareKeyTs;
|
||||
pInfo->comparePkCol = NULL;
|
||||
}
|
||||
return pInfo;
|
||||
}
|
||||
|
||||
|
@ -168,47 +214,60 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) {
|
|||
return false;
|
||||
}
|
||||
|
||||
TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) {
|
||||
TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol, int32_t primaryKeyCol) {
|
||||
if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN;
|
||||
TSKEY maxTs = INT64_MIN;
|
||||
void* pPkVal = NULL;
|
||||
void* pMaxPkVal = NULL;
|
||||
int32_t maxLen = 0;
|
||||
int32_t len = 0;
|
||||
int64_t tbUid = pBlock->info.id.uid;
|
||||
|
||||
SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol);
|
||||
SColumnInfoData *pPkDataInfo = NULL;
|
||||
if (primaryKeyCol >= 0) {
|
||||
pPkDataInfo = taosArrayGet(pBlock->pDataBlock, primaryKeyCol);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
TSKEY ts = ((TSKEY *)pColDataInfo->pData)[i];
|
||||
maxTs = TMAX(maxTs, ts);
|
||||
if (maxTs < ts) {
|
||||
maxTs = ts;
|
||||
if (primaryKeyCol >= 0) {
|
||||
pMaxPkVal = colDataGetData(pPkDataInfo, i);
|
||||
maxLen = colDataGetRowLength(pPkDataInfo, i);
|
||||
}
|
||||
}
|
||||
SScalableBf *pSBf = getSBf(pInfo, ts);
|
||||
if (pSBf) {
|
||||
SUpdateKey updateKey = {
|
||||
.tbUid = tbUid,
|
||||
.ts = ts,
|
||||
};
|
||||
tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey));
|
||||
if (primaryKeyCol >= 0) {
|
||||
pPkVal = colDataGetData(pPkDataInfo, i);
|
||||
len = colDataGetRowLength(pPkDataInfo, i);
|
||||
}
|
||||
int32_t buffLen = getKeyBuff(ts, tbUid, pPkVal, len, pInfo->pKeyBuff);
|
||||
tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen);
|
||||
}
|
||||
}
|
||||
TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
|
||||
if (pMaxTs == NULL || *pMaxTs > maxTs) {
|
||||
taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY));
|
||||
void *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t));
|
||||
if (pMaxTs == NULL || pInfo->comparePkRowFn(pMaxTs, &maxTs, pMaxPkVal, pInfo->comparePkCol) == -1) {
|
||||
int32_t valueLen = getValueBuff(maxTs, pMaxPkVal, maxLen, pInfo->pValueBuff);
|
||||
taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), pInfo->pValueBuff, valueLen);
|
||||
}
|
||||
return maxTs;
|
||||
}
|
||||
|
||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) {
|
||||
int32_t res = TSDB_CODE_FAILED;
|
||||
int32_t buffLen = 0;
|
||||
|
||||
SUpdateKey updateKey = {
|
||||
.tbUid = tableId,
|
||||
.ts = ts,
|
||||
};
|
||||
|
||||
TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
||||
buffLen = getKeyBuff(ts, tableId, pPkVal, len, pInfo->pKeyBuff);
|
||||
void* *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
||||
uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets;
|
||||
TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index);
|
||||
if (ts < maxTs - pInfo->watermark) {
|
||||
// this window has been closed.
|
||||
if (pInfo->pCloseWinSBF) {
|
||||
res = tScalableBfPut(pInfo->pCloseWinSBF, &updateKey, sizeof(SUpdateKey));
|
||||
res = tScalableBfPut(pInfo->pCloseWinSBF, pInfo->pKeyBuff, buffLen);
|
||||
if (res == TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
} else {
|
||||
|
@ -221,18 +280,19 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
|||
SScalableBf *pSBf = getSBf(pInfo, ts);
|
||||
|
||||
int32_t size = taosHashGetSize(pInfo->pMap);
|
||||
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && *pMapMaxTs < ts)) {
|
||||
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
||||
if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1 )) {
|
||||
int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);
|
||||
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
|
||||
// pSBf may be a null pointer
|
||||
if (pSBf) {
|
||||
res = tScalableBfPutNoCheck(pSBf, &updateKey, sizeof(SUpdateKey));
|
||||
res = tScalableBfPutNoCheck(pSBf, pInfo->pKeyBuff, buffLen);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// pSBf may be a null pointer
|
||||
if (pSBf) {
|
||||
res = tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey));
|
||||
res = tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen);
|
||||
}
|
||||
|
||||
if (!pMapMaxTs && maxTs < ts) {
|
||||
|
@ -262,6 +322,8 @@ void updateInfoDestroy(SUpdateInfo *pInfo) {
|
|||
}
|
||||
|
||||
taosArrayDestroy(pInfo->pTsSBFs);
|
||||
taosMemoryFreeClear(pInfo->pKeyBuff);
|
||||
taosMemoryFreeClear(pInfo->pValueBuff);
|
||||
taosHashCleanup(pInfo->pMap);
|
||||
updateInfoDestoryColseWinSBF(pInfo);
|
||||
taosMemoryFree(pInfo);
|
||||
|
@ -322,11 +384,15 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo)
|
|||
while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) {
|
||||
void *key = taosHashGetKey(pIte, &keyLen);
|
||||
if (tEncodeU64(&encoder, *(uint64_t *)key) < 0) return -1;
|
||||
if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1;
|
||||
int32_t valueSize = taosHashGetValueSize(pIte);
|
||||
if (tEncodeBinary(&encoder, (const uint8_t *)pIte, valueSize) < 0) return -1;
|
||||
}
|
||||
|
||||
if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) return -1;
|
||||
|
||||
if (tEncodeI32(&encoder, pInfo->pkColLen) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pInfo->pkColType) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -371,28 +437,43 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
|
|||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT);
|
||||
pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK);
|
||||
uint64_t uid = 0;
|
||||
ts = INT64_MIN;
|
||||
void* pVal = NULL;
|
||||
int32_t valSize = 0;
|
||||
for (int32_t i = 0; i < mapSize; i++) {
|
||||
if (tDecodeU64(&decoder, &uid) < 0) return -1;
|
||||
if (tDecodeI64(&decoder, &ts) < 0) return -1;
|
||||
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
||||
if (tDecodeBinary(&decoder, (uint8_t**)&pVal, &valSize) < 0) return -1;
|
||||
taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize);
|
||||
}
|
||||
ASSERT(mapSize == taosHashGetSize(pInfo->pMap));
|
||||
if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1;
|
||||
|
||||
if (tDecodeI32(&decoder, &pInfo->pkColLen) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pInfo->pkColType) < 0) return -1;
|
||||
|
||||
pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pInfo->pkColLen);
|
||||
pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pInfo->pkColLen);
|
||||
if (pInfo->pkColLen != 0) {
|
||||
pInfo->comparePkRowFn = compareKeyTsAndPk;
|
||||
pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);;
|
||||
} else {
|
||||
pInfo->comparePkRowFn = compareKeyTs;
|
||||
pInfo->comparePkCol = NULL;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
||||
bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) {
|
||||
TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
||||
bool res = true;
|
||||
if (pMapMaxTs && ts < *pMapMaxTs) {
|
||||
if (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == 1) {
|
||||
res = false;
|
||||
} else {
|
||||
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
||||
int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff);
|
||||
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
|
|
@ -719,6 +719,11 @@ void *taosHashGetKey(void *data, size_t *keyLen) {
|
|||
return GET_HASH_NODE_KEY(node);
|
||||
}
|
||||
|
||||
int32_t taosHashGetValueSize(void *data) {
|
||||
SHashNode *node = GET_HASH_PNODE(data);
|
||||
return node->dataLen;
|
||||
}
|
||||
|
||||
// release the pNode, return next pNode, and lock the current entry
|
||||
static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
|
||||
SHashNode *pOld = (SHashNode *)GET_HASH_PNODE(p);
|
||||
|
|
Loading…
Reference in New Issue