[td-13039] refactor.
This commit is contained in:
parent
da48a6522b
commit
03629aabaf
|
@ -44,6 +44,7 @@ extern "C" {
|
|||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/utsname.h>
|
||||
#include <sys/param.h>
|
||||
#include <unistd.h>
|
||||
#include <wchar.h>
|
||||
#include <wctype.h>
|
||||
|
|
|
@ -28,11 +28,6 @@ typedef int32_t (*_equal_fn_t)(const void *, const void *, size_t len);
|
|||
typedef void (*_hash_before_fn_t)(void *);
|
||||
typedef void (*_hash_free_fn_t)(void *);
|
||||
|
||||
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
|
||||
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
|
||||
|
||||
#define HASH_INDEX(v, c) ((v) & ((c)-1))
|
||||
|
||||
#define HASH_NODE_EXIST(code) (code == -2)
|
||||
|
||||
/**
|
||||
|
@ -62,41 +57,17 @@ typedef struct SHashNode {
|
|||
uint32_t hashVal; // the hash value of key
|
||||
uint32_t dataLen; // length of data
|
||||
uint32_t keyLen; // length of the key
|
||||
uint16_t count; // reference count
|
||||
uint16_t refCount; // reference count
|
||||
int8_t removed; // flag to indicate removed
|
||||
char data[];
|
||||
} SHashNode;
|
||||
|
||||
#define GET_HASH_NODE_KEY(_n) ((char *)(_n) + sizeof(SHashNode) + (_n)->dataLen)
|
||||
#define GET_HASH_NODE_DATA(_n) ((char *)(_n) + sizeof(SHashNode))
|
||||
#define GET_HASH_PNODE(_n) ((SHashNode *)((char *)(_n) - sizeof(SHashNode)))
|
||||
|
||||
typedef enum SHashLockTypeE {
|
||||
HASH_NO_LOCK = 0,
|
||||
HASH_ENTRY_LOCK = 1,
|
||||
} SHashLockTypeE;
|
||||
|
||||
typedef struct SHashEntry {
|
||||
int32_t num; // number of elements in current entry
|
||||
SRWLatch latch; // entry latch
|
||||
SHashNode *next;
|
||||
} SHashEntry;
|
||||
|
||||
typedef struct SHashObj {
|
||||
SHashEntry **hashList;
|
||||
uint32_t capacity; // number of slots
|
||||
uint32_t size; // number of elements in hash table
|
||||
|
||||
_hash_fn_t hashFp; // hash function
|
||||
_hash_free_fn_t freeFp; // hash node free callback function
|
||||
_equal_fn_t equalFp; // equal function
|
||||
_hash_before_fn_t callbackFp; // function invoked before return the value to caller
|
||||
|
||||
SRWLatch lock; // read-write spin lock
|
||||
SHashLockTypeE type; // lock type
|
||||
bool enableUpdate; // enable update
|
||||
SArray *pMemBlock; // memory block allocated for SHashEntry
|
||||
} SHashObj;
|
||||
typedef struct SHashObj SHashObj;
|
||||
|
||||
/**
|
||||
* init the hash table
|
||||
|
@ -126,8 +97,6 @@ int32_t taosHashGetSize(const SHashObj *pHashObj);
|
|||
*/
|
||||
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size);
|
||||
|
||||
int32_t taosHashPutExt(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size, bool *newAdded);
|
||||
|
||||
/**
|
||||
* return the payload data with the specified key
|
||||
*
|
||||
|
@ -146,17 +115,18 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
|
|||
* @param destBuf
|
||||
* @return
|
||||
*/
|
||||
void *taosHashGetClone(SHashObj *pHashObj, const void *key, size_t keyLen, void *destBuf);
|
||||
int32_t taosHashGetDup(SHashObj *pHashObj, const void *key, size_t keyLen, void *destBuf);
|
||||
|
||||
/**
|
||||
* Clone the result to interval allocated buffer
|
||||
*
|
||||
* @param pHashObj
|
||||
* @param key
|
||||
* @param keyLen
|
||||
* @param destBuf
|
||||
* @param size
|
||||
* @return
|
||||
*/
|
||||
void *taosHashGetCloneExt(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *), void **d, size_t *sz);
|
||||
int32_t taosHashGetDup_m(SHashObj* pHashObj, const void* key, size_t keyLen, void** destBuf, int32_t* size);
|
||||
|
||||
/**
|
||||
* remove item with the specified key
|
||||
|
@ -210,34 +180,10 @@ void taosHashCancelIterate(SHashObj *pHashObj, void *p);
|
|||
/**
|
||||
* Get the corresponding key information for a given data in hash table
|
||||
* @param data
|
||||
* @param keyLen
|
||||
* @return
|
||||
*/
|
||||
int32_t taosHashGetKey(void *data, void **key, size_t *keyLen);
|
||||
|
||||
/**
|
||||
* Get the corresponding key information for a given data in hash table, using memcpy
|
||||
* @param data
|
||||
* @param dst
|
||||
* @return
|
||||
*/
|
||||
static FORCE_INLINE int32_t taosHashCopyKey(void *data, void *dst) {
|
||||
if (NULL == data || NULL == dst) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SHashNode *node = GET_HASH_PNODE(data);
|
||||
void *key = GET_HASH_NODE_KEY(node);
|
||||
memcpy(dst, key, node->keyLen);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the corresponding data length for a given data in hash table
|
||||
* @param data
|
||||
* @return
|
||||
*/
|
||||
int32_t taosHashGetDataLen(void *data);
|
||||
void *taosHashGetKey(void *data, size_t* keyLen);
|
||||
|
||||
/**
|
||||
* return the payload data with the specified key(reference number added)
|
||||
|
@ -258,8 +204,20 @@ void *taosHashAcquire(SHashObj *pHashObj, const void *key, size_t keyLen);
|
|||
*/
|
||||
void taosHashRelease(SHashObj *pHashObj, void *p);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pHashObj
|
||||
* @param fp
|
||||
*/
|
||||
void taosHashSetEqualFp(SHashObj *pHashObj, _equal_fn_t fp);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pHashObj
|
||||
* @param fp
|
||||
*/
|
||||
void taosHashSetFreeFp(SHashObj *pHashObj, _hash_free_fn_t fp);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -53,7 +53,7 @@ typedef struct SDiskbasedBufStatis {
|
|||
* @param handle
|
||||
* @return
|
||||
*/
|
||||
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir);
|
||||
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id, const char* dir);
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
|
@ -482,7 +482,8 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
|
|||
free(pAppHbMgr);
|
||||
return NULL;
|
||||
}
|
||||
pAppHbMgr->activeInfo->freeFp = tFreeClientHbReq;
|
||||
|
||||
taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq);
|
||||
// init getInfoFunc
|
||||
pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||
|
||||
|
|
|
@ -85,7 +85,7 @@ static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
|
|||
int32_t refCount = 0;
|
||||
|
||||
taosRLockLatch(&pMgmt->latch);
|
||||
taosHashGetClone(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
||||
taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
||||
if (pVnode == NULL) {
|
||||
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||
} else {
|
||||
|
|
|
@ -162,8 +162,7 @@ void ctgDbgShowDBCache(SHashObj *dbHash) {
|
|||
size_t len = 0;
|
||||
|
||||
dbCache = (SCtgDBCache *)pIter;
|
||||
|
||||
taosHashGetKey(dbCache, (void **)&dbFName, &len);
|
||||
dbFName = taosHashGetKey(dbCache, &len);
|
||||
|
||||
CTG_CACHE_DEBUG("** %dth db [%.*s][%"PRIx64"] **", i, (int32_t)len, dbFName, dbCache->dbId);
|
||||
|
||||
|
@ -532,9 +531,9 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
size_t sz = 0;
|
||||
int32_t sz = 0;
|
||||
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
|
||||
STableMeta *tbMeta = taosHashGetCloneExt(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), NULL, (void **)pTableMeta, &sz);
|
||||
int32_t code = taosHashGetDup_m(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), (void **)pTableMeta, &sz);
|
||||
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
|
||||
|
||||
if (NULL == *pTableMeta) {
|
||||
|
@ -545,8 +544,7 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
|
|||
}
|
||||
|
||||
*exist = 1;
|
||||
|
||||
tbMeta = *pTableMeta;
|
||||
STableMeta* tbMeta = *pTableMeta;
|
||||
|
||||
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
|
@ -1110,7 +1108,7 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
|
|||
void *pIter = taosHashIterate(cache->stbCache, NULL);
|
||||
while (pIter) {
|
||||
uint64_t *suid = NULL;
|
||||
taosHashGetKey(pIter, (void **)&suid, NULL);
|
||||
suid = taosHashGetKey(pIter, NULL);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionCompare)) {
|
||||
ctgDebug("stb removed from rent, suid:%"PRIx64, *suid);
|
||||
|
@ -1305,7 +1303,7 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
|
|||
if (taosHashPut(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES) != 0) {
|
||||
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
|
||||
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
|
||||
ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, meta->suid);
|
||||
ctgError("taosHashPut stable to stable cache failed, suid:%"PRIx64, meta->suid);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
|
@ -1343,7 +1341,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
|
|||
int32_t *vgId = NULL;
|
||||
void *pIter = taosHashIterate(src->vgHash, NULL);
|
||||
while (pIter) {
|
||||
taosHashGetKey(pIter, (void **)&vgId, NULL);
|
||||
vgId = taosHashGetKey(pIter, NULL);
|
||||
|
||||
if (taosHashPut((*dst)->vgHash, (void *)vgId, sizeof(int32_t), pIter, sizeof(SVgroupInfo))) {
|
||||
qError("taosHashPut failed, hashSize:%d", (int32_t)hashSize);
|
||||
|
@ -2296,7 +2294,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm
|
|||
CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList));
|
||||
} else {
|
||||
int32_t vgId = tbMeta->vgId;
|
||||
if (NULL == taosHashGetClone(vgHash, &vgId, sizeof(vgId), &vgroupInfo)) {
|
||||
if (taosHashGetDup(vgHash, &vgId, sizeof(vgId), &vgroupInfo) != 0) {
|
||||
ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
|
|
@ -4619,7 +4619,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
|
|||
getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize);
|
||||
|
||||
int32_t TENMB = 1024*1024*10;
|
||||
int32_t code = createDiskbasedBuf(&pRuntimeEnv->pResultBuf, ps, TENMB, pQInfo->qId, "/tmp");
|
||||
int32_t code = createDiskbasedBuf(&pRuntimeEnv->pResultBuf, ps, TENMB, "", "/tmp");
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -255,7 +255,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
|
|||
|
||||
resetSlotInfo(pBucket);
|
||||
|
||||
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, 1, "/tmp");
|
||||
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", "/tmp");
|
||||
if (ret != 0) {
|
||||
tMemBucketDestroy(pBucket);
|
||||
return NULL;
|
||||
|
|
|
@ -153,7 +153,7 @@ static int32_t buildOutput(SInsertParseContext* pCxt) {
|
|||
if (NULL == dst) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
taosHashGetClone(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
|
||||
taosHashGetDup(pCxt->pVgroupsHashObj, (const char*)&src->vgId, sizeof(src->vgId), &dst->vg);
|
||||
dst->numOfTables = src->numOfTables;
|
||||
dst->size = src->size;
|
||||
TSWAP(dst->pData, src->pData, char*);
|
||||
|
|
|
@ -305,8 +305,9 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// TODO remove it
|
||||
SCacheDataNode *ptNode = NULL;
|
||||
taosHashGetClone(pCacheObj->pHashTable, key, keyLen, &ptNode);
|
||||
ptNode = taosHashAcquire(pCacheObj->pHashTable, key, keyLen);
|
||||
// taosHashGetClone(pCacheObj->pHashTable, key, keyLen, incRefFn, &ptNode);
|
||||
|
||||
void *pData = (ptNode != NULL) ? ptNode->data : NULL;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -42,7 +42,7 @@ struct SDiskbasedBuf {
|
|||
bool comp; // compressed before flushed to disk
|
||||
uint64_t nextPos; // next page flush position
|
||||
|
||||
uint64_t qId; // for debug purpose
|
||||
char* id; // for debug purpose
|
||||
bool printStatis; // Print statistics info when closing this buffer.
|
||||
SDiskbasedBufStatis statis;
|
||||
};
|
||||
|
@ -356,7 +356,7 @@ static SPageInfo* getPageInfoFromPayload(void* page) {
|
|||
return ppi;
|
||||
}
|
||||
|
||||
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId,
|
||||
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id,
|
||||
const char* dir) {
|
||||
*pBuf = calloc(1, sizeof(SDiskbasedBuf));
|
||||
|
||||
|
@ -372,7 +372,7 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
|
|||
pPBuf->allocateId = -1;
|
||||
pPBuf->comp = true;
|
||||
pPBuf->pFile = NULL;
|
||||
pPBuf->qId = qId;
|
||||
pPBuf->id = strdup(id);
|
||||
pPBuf->fileSize = 0;
|
||||
pPBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem));
|
||||
pPBuf->freePgList = tdListNew(POINTER_BYTES);
|
||||
|
@ -540,13 +540,13 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
|
|||
if (pBuf->pFile != NULL) {
|
||||
uDebug(
|
||||
"Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page "
|
||||
"size:%.2f Kb, %" PRIx64 "\n",
|
||||
"size:%.2f Kb, %s\n",
|
||||
pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
|
||||
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->qId);
|
||||
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
|
||||
|
||||
taosCloseFile(&pBuf->pFile);
|
||||
} else {
|
||||
uDebug("Paged buffer closed, total:%.2f Kb, no file created, %" PRIx64, pBuf->totalBufSize / 1024.0, pBuf->qId);
|
||||
uDebug("Paged buffer closed, total:%.2f Kb, no file created, %s", pBuf->totalBufSize / 1024.0, pBuf->id);
|
||||
}
|
||||
|
||||
// print the statistics information
|
||||
|
@ -584,6 +584,7 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
|
|||
taosHashCleanup(pBuf->groupSet);
|
||||
taosHashCleanup(pBuf->all);
|
||||
|
||||
tfree(pBuf->id);
|
||||
tfree(pBuf->assistBuf);
|
||||
tfree(pBuf);
|
||||
}
|
||||
|
@ -639,9 +640,9 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
|
|||
|
||||
printf(
|
||||
"Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f "
|
||||
"Kb, %" PRIx64 "\n",
|
||||
"Kb, %s\n",
|
||||
pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0,
|
||||
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->qId);
|
||||
listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->id);
|
||||
|
||||
printf(
|
||||
"Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n",
|
||||
|
|
|
@ -13,7 +13,7 @@ namespace {
|
|||
// simple test
|
||||
void simpleTest() {
|
||||
SDiskbasedBuf* pResultBuf = NULL;
|
||||
int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4096, 1, "/tmp/");
|
||||
int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4096, "", "/tmp/");
|
||||
|
||||
int32_t pageId = 0;
|
||||
int32_t groupId = 0;
|
||||
|
@ -55,7 +55,7 @@ void simpleTest() {
|
|||
|
||||
void writeDownTest() {
|
||||
SDiskbasedBuf* pResultBuf = NULL;
|
||||
int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, 1, "/tmp/");
|
||||
int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, "1", "/tmp/");
|
||||
|
||||
int32_t pageId = 0;
|
||||
int32_t writePageId = 0;
|
||||
|
@ -102,7 +102,7 @@ void writeDownTest() {
|
|||
|
||||
void recyclePageTest() {
|
||||
SDiskbasedBuf* pResultBuf = NULL;
|
||||
int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, 1, "/tmp/");
|
||||
int32_t ret = createDiskbasedBuf(&pResultBuf, 1024, 4*1024, "1", "/tmp/");
|
||||
|
||||
int32_t pageId = 0;
|
||||
int32_t writePageId = 0;
|
||||
|
|
Loading…
Reference in New Issue