[td-13039] refactor.

This commit is contained in:
Haojun Liao 2022-03-01 13:25:24 +08:00
parent 0d566d17b5
commit 3a6eecbabf
2 changed files with 6 additions and 5 deletions

View File

@ -233,7 +233,7 @@ typedef struct STaskAttr {
SArray* pUdfInfo; // no need to free SArray* pUdfInfo; // no need to free
} STaskAttr; } STaskAttr;
typedef int32_t (*__optr_prepare_fn_t)(void* param); typedef int32_t (*__optr_open_fn_t)(void* param);
typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup); typedef SSDataBlock* (*__operator_fn_t)(void* param, bool* newgroup);
typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num);
@ -318,7 +318,7 @@ typedef struct SOperatorInfo {
struct SOperatorInfo** pDownstream; // downstram pointer list struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_prepare_fn_t prepareFn; __optr_open_fn_t prepareFn;
__operator_fn_t exec; __operator_fn_t exec;
__optr_cleanup_fn_t cleanupFn; __optr_cleanup_fn_t cleanupFn;
} SOperatorInfo; } SOperatorInfo;

View File

@ -29,8 +29,8 @@ typedef struct SLHashBucket {
typedef struct SLHashObj { typedef struct SLHashObj {
SDiskbasedBuf *pBuf; SDiskbasedBuf *pBuf;
_hash_fn_t hashFn; _hash_fn_t hashFn;
int32_t tuplesPerPage;
SLHashBucket **pBucket; // entry list SLHashBucket **pBucket; // entry list
int32_t tuplesPerPage;
int32_t numOfAlloc; // number of allocated bucket ptr slot int32_t numOfAlloc; // number of allocated bucket ptr slot
int32_t bits; // the number of bits used in hash int32_t bits; // the number of bits used in hash
int32_t numOfBuckets; // the number of buckets int32_t numOfBuckets; // the number of buckets
@ -142,7 +142,7 @@ static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket
pBucket->size -= 1; pBucket->size -= 1;
} }
static void doCompressBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) { static void doTrimBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) {
size_t numOfPages = taosArrayGetSize(pBucket->pPageIdList); size_t numOfPages = taosArrayGetSize(pBucket->pPageIdList);
if (numOfPages <= 1) { if (numOfPages <= 1) {
return; return;
@ -253,6 +253,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_
return NULL; return NULL;
} }
// disable compress when flushing to disk
setBufPageCompressOnDisk(pHashObj->pBuf, false); setBufPageCompressOnDisk(pHashObj->pBuf, false);
/** /**
@ -367,7 +368,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data
releaseBufPage(pHashObj->pBuf, p); releaseBufPage(pHashObj->pBuf, p);
} }
doCompressBucketPages(pHashObj, pBucket); doTrimBucketPages(pHashObj, pBucket);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;