diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0409decaf8..dbca38526a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1730,13 +1730,14 @@ typedef struct { char name[TSDB_STREAM_FNAME_LEN]; char sourceDB[TSDB_DB_FNAME_LEN]; char targetStbFullName[TSDB_TABLE_FNAME_LEN]; - int8_t igExists; char* sql; char* ast; + int8_t igExists; int8_t triggerType; + int8_t igExpired; + int8_t fillHistory; // process data inserted before creating stream int64_t maxDelay; int64_t watermark; - int8_t igExpired; int32_t numOfTags; SArray* pTags; // array of SField } SCMCreateStreamReq; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bdc12f7e3f..2ab0dc828e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -36,6 +36,7 @@ typedef struct SStreamTask SStreamTask; enum { STREAM_STATUS__NORMAL = 0, STREAM_STATUS__STOP, + STREAM_STATUS__INIT, STREAM_STATUS__FAILED, STREAM_STATUS__RECOVER, }; @@ -291,6 +292,9 @@ typedef struct SStreamTask { int64_t recoverSnapVer; int64_t startVer; + // fill history + int8_t fillHistory; + // children info SArray* childEpInfo; // SArray int32_t nextCheckId; @@ -534,7 +538,7 @@ typedef struct SStreamMeta { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc); void streamMetaClose(SStreamMeta* streamMeta); -int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); +// int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen); int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index c186430f3f..1c490852f9 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -17,6 +17,7 @@ #include "taosdef.h" #include "tarray.h" +#include "tcommon.h" #include "tmsg.h" #include "tscalablebf.h" @@ -24,6 +25,11 @@ extern "C" { #endif +typedef struct SUpdateKey { + int64_t tbUid; + TSKEY ts; +} SUpdateKey; + typedef struct SUpdateInfo { SArray *pTsBuckets; uint64_t numBuckets; @@ -41,6 +47,7 @@ typedef struct SUpdateInfo { SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); +void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); diff --git a/include/util/thash.h b/include/util/thash.h index 2be5f4a047..08caad495d 100644 --- a/include/util/thash.h +++ b/include/util/thash.h @@ -50,6 +50,9 @@ uint64_t MurmurHash3_64(const char *key, uint32_t len); uint32_t taosIntHash_32(const char *key, uint32_t len); uint32_t taosIntHash_64(const char *key, uint32_t len); +uint32_t taosFastHash(const char *key, uint32_t len); +uint32_t taosDJB2Hash(const char *key, uint32_t len); + _hash_fn_t taosGetDefaultHashFunction(int32_t type); _equal_fn_t taosGetDefaultEqualFunction(int32_t type); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 2ee732e797..74a92c9fcd 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -613,6 +613,7 @@ typedef struct { // config int8_t igExpired; int8_t trigger; + int8_t fillHistory; int64_t triggerParam; int64_t watermark; // source and target diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 462b068a73..143131bac8 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -18,6 +18,7 @@ #include "mndConsumer.h" int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; @@ -31,6 +32,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1; if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->fillHistory) < 0) return -1; if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1; if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1; @@ -74,10 +76,12 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1; + tEndEncode(pEncoder); return pEncoder->pos; } int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { + if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; @@ -91,6 +95,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->fillHistory) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1; @@ -134,6 +139,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1; + tEndDecode(pDecoder); return 0; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 1c8ec0a302..6c54d41818 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -239,6 +239,7 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + pTask->fillHistory = pStream->fillHistory; mndAddTaskToTaskSet(tasks, pTask); pTask->nodeId = pVgroup->vgId; @@ -270,6 +271,7 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + pTask->fillHistory = pStream->fillHistory; mndAddTaskToTaskSet(tasks, pTask); ASSERT(pStream->fixedSinkVg.vgId == pStream->fixedSinkVgId); @@ -356,6 +358,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { qDestroyQueryPlan(pPlan); return -1; } + pInnerTask->fillHistory = pStream->fillHistory; mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*)); @@ -422,6 +425,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { qDestroyQueryPlan(pPlan); return -1; } + pTask->fillHistory = pStream->fillHistory; mndAddTaskToTaskSet(taskSourceLevel, pTask); pTask->triggerParam = 0; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ce6b3b0656..3ff989e670 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -143,8 +143,10 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { SDecoder decoder; tDecoderInit(&decoder, buf, tlen + 1); if (tDecodeSStreamObj(&decoder, pStream) < 0) { + tDecoderClear(&decoder); goto STREAM_DECODE_OVER; } + tDecoderClear(&decoder); terrno = TSDB_CODE_SUCCESS; @@ -280,6 +282,7 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pObj->trigger = pCreate->triggerType; pObj->triggerParam = pCreate->maxDelay; pObj->watermark = pCreate->watermark; + pObj->fillHistory = pCreate->fillHistory; memcpy(pObj->sourceDb, pCreate->sourceDB, TSDB_DB_FNAME_LEN); SDbObj *pSourceDb = mndAcquireDb(pMnode, pCreate->sourceDB); @@ -686,7 +689,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); goto _OVER; } - mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); // hack way + mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); // create stb for stream diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1d0960dc93..20ca8b7112 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -162,6 +162,8 @@ typedef struct { SQueryTableDataCond tableCond; int64_t recoverStartVer; int64_t recoverEndVer; + int64_t fillHistoryVer1; + int64_t fillHistoryVer2; SStreamState* pState; } SStreamTaskInfo; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f703ebb355..1d99a32cf9 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -569,8 +569,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc return TSDB_CODE_SUCCESS; } - pResult->info.groupId = pSrcBlock->info.groupId; - memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); + if (pResult != pSrcBlock) { + pResult->info.groupId = pSrcBlock->info.groupId; + memcpy(pResult->info.parTbName, pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN); + } // if the source equals to the destination, it is to create a new column as the result of scalar // function or some operators. diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index cdc59deee1..fa71e8efa6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -284,7 +284,6 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, return true; } - static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) { SExprSupp* pSup = &pTableScanInfo->pseudoSup; @@ -743,7 +742,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; int32_t numOfCols = 0; - pInfo->pColMatchInfo = extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); + pInfo->pColMatchInfo = + extractColMatchInfo(pTableScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID); int32_t code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { @@ -1713,9 +1713,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } #endif +#if 1 if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond)); + pTSInfo->cond.startVersion = -1; + pTSInfo->cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1; pTSInfo->scanTimes = 0; pTSInfo->currentGroupId = -1; pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN; @@ -1724,12 +1727,14 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) { SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp); if (pBlock != NULL) { + calBlockTbName(&pInfo->tbnameCalSup, pBlock); + updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex); return pBlock; } - // TODO fill in bloom filter pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; return NULL; } +#endif size_t total = taosArrayGetSize(pInfo->pBlockLists); // TODO: refactor @@ -2115,7 +2120,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL); return pOperator; - _end: +_end: taosMemoryFree(pInfo); taosMemoryFree(pOperator); pTaskInfo->code = code; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 823b457f89..13ec8505fd 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3149,6 +3149,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SOperatorInfo* downstream = pOperator->pDownstream[0]; TSKEY maxTs = INT64_MIN; @@ -3191,6 +3192,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } else { deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval, &pInfo->delKey); + streamStateCommit(pTaskInfo->streamInfo.pState); } return NULL; } else { @@ -5387,6 +5389,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval, &pInfo->delKey); doSetOperatorCompleted(pOperator); + streamStateCommit(pTaskInfo->streamInfo.pState); return NULL; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 08defcd671..df28e4a62d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -94,7 +94,7 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); if (tDecodeSStreamTask(&decoder, pTask) < 0) { - ASSERT(0); + tDecoderClear(&decoder); goto FAIL; } tDecoderClear(&decoder); @@ -113,6 +113,13 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* ASSERT(0); goto FAIL; } + + if (pTask->fillHistory) { + // pipeline exec + // if finished, dispatch a stream-prepare-finished msg to downstream task + // set status normal + } + return 0; FAIL: @@ -120,6 +127,7 @@ FAIL: return -1; } +#if 0 int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { void* buf = NULL; if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) { @@ -149,6 +157,7 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { return 0; } +#endif SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 5304938195..f7252ed8a0 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -49,7 +49,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { } int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { - /*if (tStartEncode(pEncoder) < 0) return -1;*/ + if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1; @@ -64,6 +64,10 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->recoverSnapVer) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->startVer) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->fillHistory) < 0) return -1; + int32_t epSz = taosArrayGetSize(pTask->childEpInfo); if (tEncodeI32(pEncoder, epSz) < 0) return -1; for (int32_t i = 0; i < epSz; i++) { @@ -93,12 +97,12 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { } if (tEncodeI64(pEncoder, pTask->triggerParam) < 0) return -1; - /*tEndEncode(pEncoder);*/ + tEndEncode(pEncoder); return pEncoder->pos; } int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { - /*if (tStartDecode(pDecoder) < 0) return -1;*/ + if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1; @@ -113,6 +117,10 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->recoverSnapVer) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->startVer) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->fillHistory) < 0) return -1; + int32_t epSz; if (tDecodeI32(pDecoder, &epSz) < 0) return -1; pTask->childEpInfo = taosArrayInit(epSz, sizeof(void*)); @@ -150,7 +158,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { } if (tDecodeI64(pDecoder, &pTask->triggerParam) < 0) return -1; - /*tEndDecode(pDecoder);*/ + tEndDecode(pDecoder); return 0; } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 5aadb599a3..80410568e5 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -14,6 +14,7 @@ */ #include "query.h" +#include "tdatablock.h" #include "tencode.h" #include "tstreamUpdate.h" #include "ttime.h" @@ -162,15 +163,42 @@ bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) { return false; } +void updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol) { + if (pBlock == NULL || pBlock->info.rows == 0) return; + TSKEY maxTs = -1; + int64_t tbUid = pBlock->info.uid; + + SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol); + + for (int32_t i = 0; i < pBlock->info.rows; i++) { + TSKEY ts = ((TSKEY *)pColDataInfo->pData)[i]; + maxTs = TMAX(maxTs, ts); + SScalableBf *pSBf = getSBf(pInfo, ts); + if (pSBf) { + tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); + } + } + TSKEY *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); + if (pMaxTs == NULL || *pMaxTs > tbUid) { + taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), &maxTs, sizeof(TSKEY)); + } +} + bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { - int32_t res = TSDB_CODE_FAILED; + int32_t res = TSDB_CODE_FAILED; + + SUpdateKey updateKey = { + .tbUid = tableId, + .ts = ts, + }; + TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); if (ts < maxTs - pInfo->watermark) { // this window has been closed. if (pInfo->pCloseWinSBF) { - res = tScalableBfPut(pInfo->pCloseWinSBF, &ts, sizeof(TSKEY)); + res = tScalableBfPut(pInfo->pCloseWinSBF, &updateKey, sizeof(SUpdateKey)); if (res == TSDB_CODE_SUCCESS) { return false; } else { @@ -183,7 +211,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { SScalableBf *pSBf = getSBf(pInfo, ts); // pSBf may be a null pointer if (pSBf) { - res = tScalableBfPut(pSBf, &ts, sizeof(TSKEY)); + res = tScalableBfPut(pSBf, &updateKey, sizeof(SUpdateKey)); } int32_t size = taosHashGetSize(pInfo->pMap); diff --git a/source/util/src/tbloomfilter.c b/source/util/src/tbloomfilter.c index f3ccbb0aac..b9c96dd606 100644 --- a/source/util/src/tbloomfilter.c +++ b/source/util/src/tbloomfilter.c @@ -57,8 +57,10 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) { // ln(2) = 0.693147180559945 pBF->hashFunctions = (uint32_t)ceil(lnRate / 0.693147180559945); - pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP); - pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR); + /*pBF->hashFn1 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);*/ + /*pBF->hashFn2 = taosGetDefaultHashFunction(TSDB_DATA_TYPE_NCHAR);*/ + pBF->hashFn1 = taosFastHash; + pBF->hashFn2 = taosDJB2Hash; pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t)); if (pBF->buffer == NULL) { tBloomFilterDestroy(pBF); @@ -144,4 +146,4 @@ _error: return NULL; } -bool tBloomFilterIsFull(const SBloomFilter *pBF) { return pBF->size >= pBF->expectedEntries; } \ No newline at end of file +bool tBloomFilterIsFull(const SBloomFilter *pBF) { return pBF->size >= pBF->expectedEntries; } diff --git a/source/util/src/thashutil.c b/source/util/src/thashutil.c index e646e9aa36..59f7d389c2 100644 --- a/source/util/src/thashutil.c +++ b/source/util/src/thashutil.c @@ -32,6 +32,23 @@ (h) ^= (h) >> 16; \ } while (0) +uint32_t taosFastHash(const char *key, uint32_t len) { + uint32_t result = 0x55555555; + for (uint32_t i = 0; i < len; i++) { + result ^= (uint8_t)key[i]; + result = ROTL32(result, 5); + } + return result; +} + +uint32_t taosDJB2Hash(const char *key, uint32_t len) { + uint32_t hash = 5381; + for (uint32_t i = 0; i < len; i++) { + hash = ((hash << 5) + hash) + (uint8_t)key[i]; /* hash * 33 + c */ + } + return hash; +} + uint32_t MurmurHash3_32(const char *key, uint32_t len) { const uint8_t *data = (const uint8_t *)key; const int32_t nblocks = len >> 2u;