Merge branch '2.0' into feature/2.0tsdb
This commit is contained in:
commit
8855a6ac6c
|
@ -15,12 +15,12 @@
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
|
#include "hashfunc.h"
|
||||||
#include "tcache.h"
|
#include "tcache.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "hashutil.h"
|
|
||||||
|
|
||||||
#define HASH_MAX_CAPACITY (1024*1024*16)
|
#define HASH_MAX_CAPACITY (1024*1024*16)
|
||||||
#define HASH_VALUE_IN_TRASH (-1)
|
#define HASH_VALUE_IN_TRASH (-1)
|
||||||
|
|
|
@ -1002,7 +1002,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) {
|
||||||
|| ((NULL != pSql->asyncTblPos) && (NULL != pSql->pTableHashList)));
|
|| ((NULL != pSql->asyncTblPos) && (NULL != pSql->pTableHashList)));
|
||||||
|
|
||||||
if ((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList)) {
|
if ((NULL == pSql->asyncTblPos) && (NULL == pSql->pTableHashList)) {
|
||||||
pSql->pTableHashList = taosInitHashTable(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
pSql->pTableHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
||||||
|
|
||||||
pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
|
pSql->cmd.pDataBlocks = tscCreateBlockArrayList();
|
||||||
if (NULL == pSql->pTableHashList || NULL == pSql->cmd.pDataBlocks) {
|
if (NULL == pSql->pTableHashList || NULL == pSql->cmd.pDataBlocks) {
|
||||||
|
@ -1260,7 +1260,7 @@ _error_clean:
|
||||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||||
|
|
||||||
_clean:
|
_clean:
|
||||||
taosCleanUpHashTable(pSql->pTableHashList);
|
taosHashCleanup(pSql->pTableHashList);
|
||||||
|
|
||||||
pSql->pTableHashList = NULL;
|
pSql->pTableHashList = NULL;
|
||||||
pSql->asyncTblPos = NULL;
|
pSql->asyncTblPos = NULL;
|
||||||
|
|
|
@ -206,7 +206,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) {
|
||||||
|
|
||||||
pSql->asyncTblPos = NULL;
|
pSql->asyncTblPos = NULL;
|
||||||
if (NULL != pSql->pTableHashList) {
|
if (NULL != pSql->pTableHashList) {
|
||||||
taosCleanUpHashTable(pSql->pTableHashList);
|
taosHashCleanup(pSql->pTableHashList);
|
||||||
pSql->pTableHashList = NULL;
|
pSql->pTableHashList = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -705,8 +705,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// current data are exhausted, fetch more data
|
// current data are exhausted, fetch more data
|
||||||
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows && pCmd->command == TSDB_SQL_RETRIEVE)) {
|
if (pRes->data == NULL || (pRes->data != NULL && pRes->row >= pRes->numOfRows &&
|
||||||
|
(pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_METRIC || pCmd->command == TSDB_SQL_FETCH))) {
|
||||||
taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj);
|
taos_fetch_rows_a(res, asyncFetchCallback, pSql->pTscObj);
|
||||||
|
|
||||||
sem_wait(&pSql->rspSem);
|
sem_wait(&pSql->rspSem);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1079,7 +1081,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
|
||||||
|
|
||||||
pSql->asyncTblPos = NULL;
|
pSql->asyncTblPos = NULL;
|
||||||
if (NULL != pSql->pTableHashList) {
|
if (NULL != pSql->pTableHashList) {
|
||||||
taosCleanUpHashTable(pSql->pTableHashList);
|
taosHashCleanup(pSql->pTableHashList);
|
||||||
pSql->pTableHashList = NULL;
|
pSql->pTableHashList = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -677,7 +677,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
|
||||||
STableDataBlocks** dataBlocks) {
|
STableDataBlocks** dataBlocks) {
|
||||||
*dataBlocks = NULL;
|
*dataBlocks = NULL;
|
||||||
|
|
||||||
STableDataBlocks** t1 = (STableDataBlocks**)taosGetDataFromHashTable(pHashList, (const char*)&id, sizeof(id));
|
STableDataBlocks** t1 = (STableDataBlocks**)taosHashGet(pHashList, (const char*)&id, sizeof(id));
|
||||||
if (t1 != NULL) {
|
if (t1 != NULL) {
|
||||||
*dataBlocks = *t1;
|
*dataBlocks = *t1;
|
||||||
}
|
}
|
||||||
|
@ -688,7 +688,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosAddToHashTable(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
|
taosHashPut(pHashList, (const char*)&id, sizeof(int64_t), (char*)dataBlocks, POINTER_BYTES);
|
||||||
tscAppendDataBlock(pDataBlockList, *dataBlocks);
|
tscAppendDataBlock(pDataBlockList, *dataBlocks);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -698,7 +698,7 @@ int32_t tscGetDataBlockFromList(void* pHashList, SDataBlockList* pDataBlockList,
|
||||||
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) {
|
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockList) {
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
|
||||||
void* pVnodeDataBlockHashList = taosInitHashTable(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
||||||
SDataBlockList* pVnodeDataBlockList = tscCreateBlockArrayList();
|
SDataBlockList* pVnodeDataBlockList = tscCreateBlockArrayList();
|
||||||
|
|
||||||
for (int32_t i = 0; i < pTableDataBlockList->nSize; ++i) {
|
for (int32_t i = 0; i < pTableDataBlockList->nSize; ++i) {
|
||||||
|
@ -710,7 +710,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
|
||||||
tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pMeterMeta, &dataBuf);
|
tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pMeterMeta, &dataBuf);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
|
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
|
||||||
taosCleanUpHashTable(pVnodeDataBlockHashList);
|
taosHashCleanup(pVnodeDataBlockHashList);
|
||||||
tscDestroyBlockArrayList(pVnodeDataBlockList);
|
tscDestroyBlockArrayList(pVnodeDataBlockList);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -728,7 +728,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
|
||||||
} else { // failed to allocate memory, free already allocated memory and return error code
|
} else { // failed to allocate memory, free already allocated memory and return error code
|
||||||
tscError("%p failed to allocate memory for merging submit block, size:%d", pSql, dataBuf->nAllocSize);
|
tscError("%p failed to allocate memory for merging submit block, size:%d", pSql, dataBuf->nAllocSize);
|
||||||
|
|
||||||
taosCleanUpHashTable(pVnodeDataBlockHashList);
|
taosHashCleanup(pVnodeDataBlockHashList);
|
||||||
tfree(dataBuf->pData);
|
tfree(dataBuf->pData);
|
||||||
tscDestroyBlockArrayList(pVnodeDataBlockList);
|
tscDestroyBlockArrayList(pVnodeDataBlockList);
|
||||||
|
|
||||||
|
@ -761,7 +761,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SDataBlockList* pTableDataBlockLi
|
||||||
pCmd->pDataBlocks = pVnodeDataBlockList;
|
pCmd->pDataBlocks = pVnodeDataBlockList;
|
||||||
|
|
||||||
tscFreeUnusedDataBlocks(pCmd->pDataBlocks);
|
tscFreeUnusedDataBlocks(pCmd->pDataBlocks);
|
||||||
taosCleanUpHashTable(pVnodeDataBlockHashList);
|
taosHashCleanup(pVnodeDataBlockHashList);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ int32_t createDiskbasedResultBuffer(SQueryDiskbasedResultBuf** pResultBuf, int32
|
||||||
pResBuf->incStep = 4;
|
pResBuf->incStep = 4;
|
||||||
|
|
||||||
// init id hash table
|
// init id hash table
|
||||||
pResBuf->idsTable = taosInitHashTable(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
|
pResBuf->idsTable = taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
|
||||||
pResBuf->list = calloc(size, sizeof(SIDList));
|
pResBuf->list = calloc(size, sizeof(SIDList));
|
||||||
pResBuf->numOfAllocGroupIds = size;
|
pResBuf->numOfAllocGroupIds = size;
|
||||||
|
|
||||||
|
@ -56,7 +56,7 @@ tFilePage* getResultBufferPageById(SQueryDiskbasedResultBuf* pResultBuf, int32_t
|
||||||
return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_SIZE * id);
|
return (tFilePage*)(pResultBuf->pBuf + DEFAULT_INTERN_BUF_SIZE * id);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getNumOfResultBufGroupId(SQueryDiskbasedResultBuf* pResultBuf) { return taosNumElemsInHashTable(pResultBuf->idsTable); }
|
int32_t getNumOfResultBufGroupId(SQueryDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->idsTable); }
|
||||||
|
|
||||||
int32_t getResBufSize(SQueryDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
|
int32_t getResBufSize(SQueryDiskbasedResultBuf* pResultBuf) { return pResultBuf->totalBufSize; }
|
||||||
|
|
||||||
|
@ -95,7 +95,7 @@ static bool noMoreAvailablePages(SQueryDiskbasedResultBuf* pResultBuf) {
|
||||||
static int32_t getGroupIndex(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId) {
|
static int32_t getGroupIndex(SQueryDiskbasedResultBuf* pResultBuf, int32_t groupId) {
|
||||||
assert(pResultBuf != NULL);
|
assert(pResultBuf != NULL);
|
||||||
|
|
||||||
char* p = taosGetDataFromHashTable(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t));
|
char* p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t));
|
||||||
if (p == NULL) { // it is a new group id
|
if (p == NULL) { // it is a new group id
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -121,7 +121,7 @@ static int32_t addNewGroupId(SQueryDiskbasedResultBuf* pResultBuf, int32_t group
|
||||||
pResultBuf->numOfAllocGroupIds = n;
|
pResultBuf->numOfAllocGroupIds = n;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosAddToHashTable(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t));
|
taosHashPut(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t));
|
||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -210,7 +210,7 @@ void destroyResultBuf(SQueryDiskbasedResultBuf* pResultBuf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pResultBuf->list);
|
tfree(pResultBuf->list);
|
||||||
taosCleanUpHashTable(pResultBuf->idsTable);
|
taosHashCleanup(pResultBuf->idsTable);
|
||||||
|
|
||||||
tfree(pResultBuf);
|
tfree(pResultBuf);
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,7 +14,6 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "sskiplist.h"
|
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "tast.h"
|
#include "tast.h"
|
||||||
|
|
|
@ -14,12 +14,12 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "hash.h"
|
#include "hash.h"
|
||||||
#include "hashutil.h"
|
#include "hashfunc.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "shash.h"
|
#include "shash.h"
|
||||||
|
#include "taosdef.h"
|
||||||
#include "tstoken.h"
|
#include "tstoken.h"
|
||||||
#include "ttokendef.h"
|
#include "ttokendef.h"
|
||||||
#include "taosdef.h"
|
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
// All the keywords of the SQL language are stored in a hash table
|
// All the keywords of the SQL language are stored in a hash table
|
||||||
|
@ -253,11 +253,11 @@ static void* KeywordHashTable = NULL;
|
||||||
static void doInitKeywordsTable() {
|
static void doInitKeywordsTable() {
|
||||||
int numOfEntries = tListLen(keywordTable);
|
int numOfEntries = tListLen(keywordTable);
|
||||||
|
|
||||||
KeywordHashTable = taosInitHashTable(numOfEntries, MurmurHash3_32, false);
|
KeywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, false);
|
||||||
for (int32_t i = 0; i < numOfEntries; i++) {
|
for (int32_t i = 0; i < numOfEntries; i++) {
|
||||||
keywordTable[i].len = strlen(keywordTable[i].name);
|
keywordTable[i].len = strlen(keywordTable[i].name);
|
||||||
void* ptr = &keywordTable[i];
|
void* ptr = &keywordTable[i];
|
||||||
taosAddToHashTable(KeywordHashTable, keywordTable[i].name, keywordTable[i].len, (void*)&ptr, POINTER_BYTES);
|
taosHashPut(KeywordHashTable, keywordTable[i].name, keywordTable[i].len, (void*)&ptr, POINTER_BYTES);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -275,7 +275,7 @@ int tSQLKeywordCode(const char* z, int n) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SKeyword** pKey = (SKeyword**)taosGetDataFromHashTable(KeywordHashTable, key, n);
|
SKeyword** pKey = (SKeyword**)taosHashGet(KeywordHashTable, key, n);
|
||||||
if (pKey != NULL) {
|
if (pKey != NULL) {
|
||||||
return (*pKey)->type;
|
return (*pKey)->type;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -13,17 +13,16 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "tvariant.h"
|
||||||
#include "hash.h"
|
#include "hash.h"
|
||||||
#include "hashutil.h"
|
#include "hashfunc.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "shash.h"
|
#include "shash.h"
|
||||||
|
#include "taos.h"
|
||||||
|
#include "taosdef.h"
|
||||||
#include "tstoken.h"
|
#include "tstoken.h"
|
||||||
#include "ttokendef.h"
|
#include "ttokendef.h"
|
||||||
#include "taosdef.h"
|
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "tvariant.h"
|
|
||||||
#include "taosdef.h"
|
|
||||||
#include "taos.h"
|
|
||||||
|
|
||||||
// todo support scientific expression number and oct number
|
// todo support scientific expression number and oct number
|
||||||
void tVariantCreate(tVariant *pVar, SSQLToken *token) { tVariantCreateFromString(pVar, token->z, token->n, token->type); }
|
void tVariantCreate(tVariant *pVar, SSQLToken *token) { tVariantCreateFromString(pVar, token->z, token->n, token->type); }
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include "hashutil.h"
|
#include "hashfunc.h"
|
||||||
|
|
||||||
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
|
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
|
||||||
#define HASH_VALUE_IN_TRASH (-1)
|
#define HASH_VALUE_IN_TRASH (-1)
|
||||||
|
@ -45,32 +45,77 @@ typedef struct SHashEntry {
|
||||||
uint32_t num;
|
uint32_t num;
|
||||||
} SHashEntry;
|
} SHashEntry;
|
||||||
|
|
||||||
typedef struct HashObj {
|
typedef struct SHashObj {
|
||||||
SHashEntry **hashList;
|
SHashEntry **hashList;
|
||||||
uint32_t capacity; // number of slots
|
size_t capacity; // number of slots
|
||||||
int size; // number of elements in hash table
|
size_t size; // number of elements in hash table
|
||||||
_hash_fn_t hashFp; // hash function
|
_hash_fn_t hashFp; // hash function
|
||||||
bool multithreadSafe; // enable lock or not
|
|
||||||
|
|
||||||
#if defined LINUX
|
#if defined (LINUX)
|
||||||
pthread_rwlock_t lock;
|
pthread_rwlock_t* lock;
|
||||||
#else
|
#else
|
||||||
pthread_mutex_t lock;
|
pthread_mutex_t* lock;
|
||||||
#endif
|
#endif
|
||||||
|
} SHashObj;
|
||||||
|
|
||||||
} HashObj;
|
/**
|
||||||
|
* init the hash table
|
||||||
|
*
|
||||||
|
* @param capacity initial capacity of the hash table
|
||||||
|
* @param fn hash function to generate the hash value
|
||||||
|
* @param threadsafe thread safe or not
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe);
|
||||||
|
|
||||||
void *taosInitHashTable(uint32_t capacity, _hash_fn_t fn, bool multithreadSafe);
|
/**
|
||||||
void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen);
|
* return the size of hash table
|
||||||
|
* @param pHashObj
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
size_t taosHashGetSize(const SHashObj *pHashObj);
|
||||||
|
|
||||||
int32_t taosAddToHashTable(HashObj *pObj, const char *key, uint32_t keyLen, void *data, uint32_t size);
|
/**
|
||||||
int32_t taosNumElemsInHashTable(HashObj *pObj);
|
* put element into hash table, if the element with the same key exists, update it
|
||||||
|
* @param pHashObj
|
||||||
|
* @param key
|
||||||
|
* @param keyLen
|
||||||
|
* @param data
|
||||||
|
* @param size
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int32_t taosHashPut(SHashObj *pHashObj, const char *key, size_t keyLen, void *data, size_t size);
|
||||||
|
|
||||||
char *taosGetDataFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen);
|
/**
|
||||||
|
* return the payload data with the specified key
|
||||||
|
*
|
||||||
|
* @param pHashObj
|
||||||
|
* @param key
|
||||||
|
* @param keyLen
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen);
|
||||||
|
|
||||||
void taosCleanUpHashTable(void *handle);
|
/**
|
||||||
|
* remove item with the specified key
|
||||||
|
* @param pHashObj
|
||||||
|
* @param key
|
||||||
|
* @param keyLen
|
||||||
|
*/
|
||||||
|
void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen);
|
||||||
|
|
||||||
int32_t taosGetHashMaxOverflowLength(HashObj *pObj);
|
/**
|
||||||
|
* clean up hash table
|
||||||
|
* @param handle
|
||||||
|
*/
|
||||||
|
void taosHashCleanup(SHashObj *pHashObj);
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param pHashObj
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,207 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
#if 0
|
|
||||||
|
|
||||||
#ifndef TBASE_TSKIPLIST_H
|
|
||||||
#define TBASE_TSKIPLIST_H
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#define MAX_SKIP_LIST_LEVEL 20
|
|
||||||
|
|
||||||
#include <pthread.h>
|
|
||||||
#include <stdint.h>
|
|
||||||
#include <stdlib.h>
|
|
||||||
|
|
||||||
#include "os.h"
|
|
||||||
#include "taosdef.h"
|
|
||||||
|
|
||||||
/*
|
|
||||||
* key of each node
|
|
||||||
* todo move to as the global structure in all search codes...
|
|
||||||
*/
|
|
||||||
|
|
||||||
const static size_t SKIP_LIST_STR_KEY_LENGTH_THRESHOLD = 15;
|
|
||||||
typedef tVariant tSkipListKey;
|
|
||||||
|
|
||||||
typedef enum tSkipListPointQueryType {
|
|
||||||
INCLUDE_POINT_QUERY,
|
|
||||||
EXCLUDE_POINT_QUERY,
|
|
||||||
} tSkipListPointQueryType;
|
|
||||||
|
|
||||||
typedef struct tSkipListNode {
|
|
||||||
uint16_t nLevel;
|
|
||||||
char * pData;
|
|
||||||
|
|
||||||
struct tSkipListNode **pForward;
|
|
||||||
struct tSkipListNode **pBackward;
|
|
||||||
|
|
||||||
tSkipListKey key;
|
|
||||||
} tSkipListNode;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* @version 0.2
|
|
||||||
* @date 2017/11/12
|
|
||||||
* the simple version of SkipList.
|
|
||||||
* for multi-thread safe purpose, we employ pthread_rwlock_t to guarantee to generate
|
|
||||||
* deterministic result. Later, we will remove the lock in SkipList to further
|
|
||||||
* enhance the performance. In this case, one should use the concurrent skip list (by
|
|
||||||
* using michael-scott algorithm) instead of this simple version in a multi-thread
|
|
||||||
* environment, to achieve higher performance of read/write operations.
|
|
||||||
*
|
|
||||||
* Note: Duplicated primary key situation.
|
|
||||||
* In case of duplicated primary key, two ways can be employed to handle this situation:
|
|
||||||
* 1. add as normal insertion with out special process.
|
|
||||||
* 2. add an overflow pointer at each list node, all nodes with the same key will be added
|
|
||||||
* in the overflow pointer. In this case, the total steps of each search will be reduced significantly.
|
|
||||||
* Currently, we implement the skip list in a line with the first means, maybe refactor it soon.
|
|
||||||
*
|
|
||||||
* Memory consumption: the memory alignment causes many memory wasted. So, employ a memory
|
|
||||||
* pool will significantly reduce the total memory consumption, as well as the calloc/malloc operation costs.
|
|
||||||
*
|
|
||||||
* 3. use the iterator pattern to refactor all routines to make it more clean
|
|
||||||
*/
|
|
||||||
|
|
||||||
// state struct, record following information:
|
|
||||||
// number of links in each level.
|
|
||||||
// avg search steps, for latest 1000 queries
|
|
||||||
// avg search rsp time, for latest 1000 queries
|
|
||||||
// total memory size
|
|
||||||
typedef struct tSkipListState {
|
|
||||||
// in bytes, sizeof(tSkipList)+sizeof(tSkipListNode)*tSkipList->nSize
|
|
||||||
uint64_t nTotalMemSize;
|
|
||||||
uint64_t nLevelNodeCnt[MAX_SKIP_LIST_LEVEL];
|
|
||||||
uint64_t queryCount; // total query count
|
|
||||||
|
|
||||||
/*
|
|
||||||
* only record latest 1000 queries
|
|
||||||
* when the value==1000, = 0,
|
|
||||||
* nTotalStepsForQueries = 0,
|
|
||||||
* nTotalElapsedTimeForQueries = 0
|
|
||||||
*/
|
|
||||||
uint64_t nRecQueries;
|
|
||||||
uint16_t nTotalStepsForQueries;
|
|
||||||
uint64_t nTotalElapsedTimeForQueries;
|
|
||||||
|
|
||||||
uint16_t nInsertObjs;
|
|
||||||
uint16_t nTotalStepsForInsert;
|
|
||||||
uint64_t nTotalElapsedTimeForInsert;
|
|
||||||
} tSkipListState;
|
|
||||||
|
|
||||||
typedef struct tSkipList {
|
|
||||||
tSkipListNode pHead;
|
|
||||||
uint64_t nSize;
|
|
||||||
uint16_t nMaxLevel;
|
|
||||||
uint16_t nLevel;
|
|
||||||
uint16_t keyType;
|
|
||||||
uint16_t nMaxKeyLen;
|
|
||||||
|
|
||||||
__compar_fn_t comparator;
|
|
||||||
pthread_rwlock_t lock; // will be removed soon
|
|
||||||
tSkipListState state; // skiplist state
|
|
||||||
} tSkipList;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* iterate the skiplist
|
|
||||||
* this will cause the multi-thread problem, when the skiplist is destroyed, the iterate may
|
|
||||||
* continue iterating the skiplist, so add the reference count for skiplist
|
|
||||||
* TODO add the ref for skiplist when one iterator is created
|
|
||||||
*/
|
|
||||||
typedef struct SSkipListIterator {
|
|
||||||
tSkipList * pSkipList;
|
|
||||||
tSkipListNode *cur;
|
|
||||||
int64_t num;
|
|
||||||
} SSkipListIterator;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* query condition structure to denote the range query
|
|
||||||
* todo merge the point query cond with range query condition
|
|
||||||
*/
|
|
||||||
typedef struct tSKipListQueryCond {
|
|
||||||
// when the upper bounding == lower bounding, it is a point query
|
|
||||||
tSkipListKey lowerBnd;
|
|
||||||
tSkipListKey upperBnd;
|
|
||||||
int32_t lowerBndRelOptr; // relation operator to denote if lower bound is
|
|
||||||
int32_t upperBndRelOptr; // included or not
|
|
||||||
} tSKipListQueryCond;
|
|
||||||
|
|
||||||
tSkipList *SSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen);
|
|
||||||
|
|
||||||
void *SSkipListDestroy(tSkipList *pSkipList);
|
|
||||||
|
|
||||||
// create skip list key
|
|
||||||
tSkipListKey SSkipListCreateKey(int32_t type, char *val, size_t keyLength);
|
|
||||||
|
|
||||||
// destroy skip list key
|
|
||||||
void tSkipListDestroyKey(tSkipListKey *pKey);
|
|
||||||
|
|
||||||
// put data into skiplist
|
|
||||||
tSkipListNode *SSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* get only *one* node of which key is equalled to pKey, even there are more
|
|
||||||
* than one nodes are of the same key
|
|
||||||
*/
|
|
||||||
tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* get all data with the same keys
|
|
||||||
*/
|
|
||||||
int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes);
|
|
||||||
|
|
||||||
int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *),
|
|
||||||
void *param);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* remove only one node of the pKey value.
|
|
||||||
* If more than one node has the same value, any one will be removed
|
|
||||||
*
|
|
||||||
* @Return
|
|
||||||
* true: one node has been removed
|
|
||||||
* false: no node has been removed
|
|
||||||
*/
|
|
||||||
bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* remove the specified node in parameters
|
|
||||||
*/
|
|
||||||
void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode);
|
|
||||||
|
|
||||||
// for debug purpose only
|
|
||||||
void SSkipListPrint(tSkipList *pSkipList, int16_t nlevel);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* range query & single point query function
|
|
||||||
*/
|
|
||||||
int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* include/exclude point query
|
|
||||||
*/
|
|
||||||
int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type,
|
|
||||||
tSkipListNode ***pResult);
|
|
||||||
|
|
||||||
int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator *iter);
|
|
||||||
bool tSkipListIteratorNext(SSkipListIterator *iter);
|
|
||||||
tSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif // TBASE_TSKIPLIST_H
|
|
||||||
#endif
|
|
|
@ -76,7 +76,7 @@ void* taosArrayGetP(SArray* pArray, size_t index);
|
||||||
* @param pArray
|
* @param pArray
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
size_t taosArrayGetSize(SArray* pArray);
|
size_t taosArrayGetSize(const SArray* pArray);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* insert data into array
|
* insert data into array
|
||||||
|
|
|
@ -185,15 +185,39 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, SSkipListNode *pNode);
|
||||||
SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType);
|
SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
* get the size of skip list
|
||||||
* @param pSkipList
|
* @param pSkipList
|
||||||
* @param pRes
|
|
||||||
* @param fp
|
|
||||||
* @param param
|
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t tSkipListIterateList(SSkipList *pSkipList, SSkipListNode ***pRes, bool (*fp)(SSkipListNode *, void *),
|
size_t tSkipListGetSize(const SSkipList* pSkipList);
|
||||||
void *param);
|
|
||||||
|
/**
|
||||||
|
* create skiplist iterator
|
||||||
|
* @param pSkipList
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* forward the skip list iterator
|
||||||
|
* @param iter
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
bool tSkipListIterNext(SSkipListIterator *iter);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get the element of skip list node
|
||||||
|
* @param iter
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
SSkipListNode *tSkipListIterGet(SSkipListIterator *iter);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* destroy the skip list node
|
||||||
|
* @param iter
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
void* tSkipListDestroyIter(SSkipListIterator* iter);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* remove only one node of the pKey value.
|
* remove only one node of the pKey value.
|
||||||
|
@ -210,9 +234,6 @@ bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey);
|
||||||
*/
|
*/
|
||||||
void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode);
|
void tSkipListRemoveNode(SSkipList *pSkipList, SSkipListNode *pNode);
|
||||||
|
|
||||||
int32_t tSkipListIteratorReset(SSkipList *pSkipList, SSkipListIterator *iter);
|
|
||||||
bool tSkipListIteratorNext(SSkipListIterator *iter);
|
|
||||||
SSkipListNode *tSkipListIteratorGet(SSkipListIterator *iter);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,11 @@
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
static FORCE_INLINE void __wr_lock(void *lock) {
|
static FORCE_INLINE void __wr_lock(void *lock) {
|
||||||
#if defined LINUX
|
if (lock == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined (LINUX)
|
||||||
pthread_rwlock_wrlock(lock);
|
pthread_rwlock_wrlock(lock);
|
||||||
#else
|
#else
|
||||||
pthread_mutex_lock(lock);
|
pthread_mutex_lock(lock);
|
||||||
|
@ -29,7 +33,11 @@ static FORCE_INLINE void __wr_lock(void *lock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void __rd_lock(void *lock) {
|
static FORCE_INLINE void __rd_lock(void *lock) {
|
||||||
#if defined LINUX
|
if (lock == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined (LINUX)
|
||||||
pthread_rwlock_rdlock(lock);
|
pthread_rwlock_rdlock(lock);
|
||||||
#else
|
#else
|
||||||
pthread_mutex_lock(lock);
|
pthread_mutex_lock(lock);
|
||||||
|
@ -37,7 +45,11 @@ static FORCE_INLINE void __rd_lock(void *lock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void __unlock(void *lock) {
|
static FORCE_INLINE void __unlock(void *lock) {
|
||||||
#if defined LINUX
|
if (lock == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined (LINUX)
|
||||||
pthread_rwlock_unlock(lock);
|
pthread_rwlock_unlock(lock);
|
||||||
#else
|
#else
|
||||||
pthread_mutex_unlock(lock);
|
pthread_mutex_unlock(lock);
|
||||||
|
@ -45,7 +57,11 @@ static FORCE_INLINE void __unlock(void *lock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t __lock_init(void *lock) {
|
static FORCE_INLINE int32_t __lock_init(void *lock) {
|
||||||
#if defined LINUX
|
if (lock == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined (LINUX)
|
||||||
return pthread_rwlock_init(lock, NULL);
|
return pthread_rwlock_init(lock, NULL);
|
||||||
#else
|
#else
|
||||||
return pthread_mutex_init(lock, NULL);
|
return pthread_mutex_init(lock, NULL);
|
||||||
|
@ -53,7 +69,11 @@ static FORCE_INLINE int32_t __lock_init(void *lock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void __lock_destroy(void *lock) {
|
static FORCE_INLINE void __lock_destroy(void *lock) {
|
||||||
#if defined LINUX
|
if (lock == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if defined (LINUX)
|
||||||
pthread_rwlock_destroy(lock);
|
pthread_rwlock_destroy(lock);
|
||||||
#else
|
#else
|
||||||
pthread_mutex_destroy(lock);
|
pthread_mutex_destroy(lock);
|
||||||
|
@ -68,21 +88,12 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
|
||||||
return i;
|
return i;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* hash key function
|
|
||||||
*
|
|
||||||
* @param key key string
|
|
||||||
* @param len length of key
|
|
||||||
* @return hash value
|
|
||||||
*/
|
|
||||||
static FORCE_INLINE uint32_t taosHashKey(const char *key, uint32_t len) { return MurmurHash3_32(key, len); }
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* inplace update node in hash table
|
* inplace update node in hash table
|
||||||
* @param pObj hash table object
|
* @param pHashObj hash table object
|
||||||
* @param pNode data node
|
* @param pNode data node
|
||||||
*/
|
*/
|
||||||
static void doUpdateHashTable(HashObj *pObj, SHashNode *pNode) {
|
static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode) {
|
||||||
if (pNode->prev1) {
|
if (pNode->prev1) {
|
||||||
pNode->prev1->next = pNode;
|
pNode->prev1->next = pNode;
|
||||||
}
|
}
|
||||||
|
@ -96,16 +107,16 @@ static void doUpdateHashTable(HashObj *pObj, SHashNode *pNode) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* get SHashNode from hashlist, nodes from trash are not included.
|
* get SHashNode from hashlist, nodes from trash are not included.
|
||||||
* @param pObj Cache objection
|
* @param pHashObj Cache objection
|
||||||
* @param key key for hash
|
* @param key key for hash
|
||||||
* @param keyLen key length
|
* @param keyLen key length
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
static SHashNode *doGetNodeFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen, uint32_t *hashVal) {
|
static SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const char *key, uint32_t keyLen, uint32_t *hashVal) {
|
||||||
uint32_t hash = (*pObj->hashFp)(key, keyLen);
|
uint32_t hash = (*pHashObj->hashFp)(key, keyLen);
|
||||||
|
|
||||||
int32_t slot = HASH_INDEX(hash, pObj->capacity);
|
int32_t slot = HASH_INDEX(hash, pHashObj->capacity);
|
||||||
SHashEntry *pEntry = pObj->hashList[slot];
|
SHashEntry *pEntry = pHashObj->hashList[slot];
|
||||||
|
|
||||||
SHashNode *pNode = pEntry->next;
|
SHashNode *pNode = pEntry->next;
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
|
@ -117,7 +128,7 @@ static SHashNode *doGetNodeFromHashTable(HashObj *pObj, const char *key, uint32_
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pNode) {
|
if (pNode) {
|
||||||
assert(HASH_INDEX(pNode->hashVal, pObj->capacity) == slot);
|
assert(HASH_INDEX(pNode->hashVal, pHashObj->capacity) == slot);
|
||||||
}
|
}
|
||||||
|
|
||||||
// return the calculated hash value, to avoid calculating it again in other functions
|
// return the calculated hash value, to avoid calculating it again in other functions
|
||||||
|
@ -131,10 +142,10 @@ static SHashNode *doGetNodeFromHashTable(HashObj *pObj, const char *key, uint32_
|
||||||
/**
|
/**
|
||||||
* resize the hash list if the threshold is reached
|
* resize the hash list if the threshold is reached
|
||||||
*
|
*
|
||||||
* @param pObj
|
* @param pHashObj
|
||||||
*/
|
*/
|
||||||
static void taosHashTableResize(HashObj *pObj) {
|
static void taosHashTableResize(SHashObj *pHashObj) {
|
||||||
if (pObj->size < pObj->capacity * HASH_DEFAULT_LOAD_FACTOR) {
|
if (pHashObj->size < pHashObj->capacity * HASH_DEFAULT_LOAD_FACTOR) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,30 +153,30 @@ static void taosHashTableResize(HashObj *pObj) {
|
||||||
SHashNode *pNode = NULL;
|
SHashNode *pNode = NULL;
|
||||||
SHashNode *pNext = NULL;
|
SHashNode *pNext = NULL;
|
||||||
|
|
||||||
int32_t newSize = pObj->capacity << 1U;
|
int32_t newSize = pHashObj->capacity << 1U;
|
||||||
if (newSize > HASH_MAX_CAPACITY) {
|
if (newSize > HASH_MAX_CAPACITY) {
|
||||||
pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", pObj->capacity,
|
pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", pHashObj->capacity,
|
||||||
HASH_MAX_CAPACITY);
|
HASH_MAX_CAPACITY);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
SHashEntry **pNewEntry = realloc(pObj->hashList, sizeof(SHashEntry*) * newSize);
|
SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry*) * newSize);
|
||||||
if (pNewEntry == NULL) {
|
if (pNewEntry == NULL) {
|
||||||
pTrace("cache resize failed due to out of memory, capacity remain:%d", pObj->capacity);
|
pTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pObj->hashList = pNewEntry;
|
pHashObj->hashList = pNewEntry;
|
||||||
for(int32_t i = pObj->capacity; i < newSize; ++i) {
|
for(int32_t i = pHashObj->capacity; i < newSize; ++i) {
|
||||||
pObj->hashList[i] = calloc(1, sizeof(SHashEntry));
|
pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry));
|
||||||
}
|
}
|
||||||
|
|
||||||
pObj->capacity = newSize;
|
pHashObj->capacity = newSize;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pObj->capacity; ++i) {
|
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||||
SHashEntry *pEntry = pObj->hashList[i];
|
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||||
|
|
||||||
pNode = pEntry->next;
|
pNode = pEntry->next;
|
||||||
if (pNode != NULL) {
|
if (pNode != NULL) {
|
||||||
|
@ -173,7 +184,7 @@ static void taosHashTableResize(HashObj *pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
int32_t j = HASH_INDEX(pNode->hashVal, pObj->capacity);
|
int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
||||||
if (j == i) { // this key resides in the same slot, no need to relocate it
|
if (j == i) { // this key resides in the same slot, no need to relocate it
|
||||||
pNode = pNode->next;
|
pNode = pNode->next;
|
||||||
} else {
|
} else {
|
||||||
|
@ -199,7 +210,7 @@ static void taosHashTableResize(HashObj *pObj) {
|
||||||
pNode->next = NULL;
|
pNode->next = NULL;
|
||||||
pNode->prev1 = NULL;
|
pNode->prev1 = NULL;
|
||||||
|
|
||||||
SHashEntry *pNewIndexEntry = pObj->hashList[j];
|
SHashEntry *pNewIndexEntry = pHashObj->hashList[j];
|
||||||
|
|
||||||
if (pNewIndexEntry->next != NULL) {
|
if (pNewIndexEntry->next != NULL) {
|
||||||
assert(pNewIndexEntry->next->prev1 == pNewIndexEntry);
|
assert(pNewIndexEntry->next->prev1 == pNewIndexEntry);
|
||||||
|
@ -221,8 +232,8 @@ static void taosHashTableResize(HashObj *pObj) {
|
||||||
|
|
||||||
int64_t et = taosGetTimestampUs();
|
int64_t et = taosGetTimestampUs();
|
||||||
|
|
||||||
pTrace("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pObj->capacity,
|
pTrace("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity,
|
||||||
((double)pObj->size) / pObj->capacity, (et - st) / 1000.0);
|
((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -230,43 +241,51 @@ static void taosHashTableResize(HashObj *pObj) {
|
||||||
* @param fn hash function
|
* @param fn hash function
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
void *taosInitHashTable(uint32_t capacity, _hash_fn_t fn, bool multithreadSafe) {
|
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
|
||||||
if (capacity == 0 || fn == NULL) {
|
if (capacity == 0 || fn == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
HashObj *pObj = (HashObj *)calloc(1, sizeof(HashObj));
|
SHashObj *pHashObj = (SHashObj *)calloc(1, sizeof(SHashObj));
|
||||||
if (pObj == NULL) {
|
if (pHashObj == NULL) {
|
||||||
pError("failed to allocate memory, reason:%s", strerror(errno));
|
pError("failed to allocate memory, reason:%s", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// the max slots is not defined by user
|
// the max slots is not defined by user
|
||||||
pObj->capacity = taosHashCapacity(capacity);
|
pHashObj->capacity = taosHashCapacity(capacity);
|
||||||
assert((pObj->capacity & (pObj->capacity - 1)) == 0);
|
assert((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
|
||||||
|
|
||||||
pObj->hashFp = fn;
|
pHashObj->hashFp = fn;
|
||||||
|
|
||||||
pObj->hashList = (SHashEntry **)calloc(pObj->capacity, sizeof(SHashEntry*));
|
pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(SHashEntry*));
|
||||||
if (pObj->hashList == NULL) {
|
if (pHashObj->hashList == NULL) {
|
||||||
free(pObj);
|
free(pHashObj);
|
||||||
pError("failed to allocate memory, reason:%s", strerror(errno));
|
pError("failed to allocate memory, reason:%s", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int32_t i = 0; i < pObj->capacity; ++i) {
|
for(int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||||
pObj->hashList[i] = calloc(1, sizeof(SHashEntry));
|
pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (multithreadSafe && (__lock_init(pObj) != 0)) {
|
if (threadsafe) {
|
||||||
free(pObj->hashList);
|
#if defined(LINUX)
|
||||||
free(pObj);
|
pHashObj->lock = calloc(1, sizeof(pthread_rwlock_t));
|
||||||
|
#else
|
||||||
|
pHashObj->lock = calloc(1, sizeof(pthread_mutex_t));
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
if (__lock_init(pHashObj->lock) != 0) {
|
||||||
|
free(pHashObj->hashList);
|
||||||
|
free(pHashObj);
|
||||||
|
|
||||||
pError("failed to init lock, reason:%s", strerror(errno));
|
pError("failed to init lock, reason:%s", strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return (void *)pObj;
|
return pHashObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -277,7 +296,7 @@ void *taosInitHashTable(uint32_t capacity, _hash_fn_t fn, bool multithreadSafe)
|
||||||
* @param size size of block
|
* @param size size of block
|
||||||
* @return SHashNode
|
* @return SHashNode
|
||||||
*/
|
*/
|
||||||
static SHashNode *doCreateHashNode(const char *key, uint32_t keyLen, const char *pData, size_t dataSize,
|
static SHashNode *doCreateHashNode(const char *key, size_t keyLen, const char *pData, size_t dataSize,
|
||||||
uint32_t hashVal) {
|
uint32_t hashVal) {
|
||||||
size_t totalSize = dataSize + sizeof(SHashNode) + keyLen;
|
size_t totalSize = dataSize + sizeof(SHashNode) + keyLen;
|
||||||
|
|
||||||
|
@ -298,7 +317,7 @@ static SHashNode *doCreateHashNode(const char *key, uint32_t keyLen, const char
|
||||||
return pNewNode;
|
return pNewNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SHashNode *doUpdateHashNode(SHashNode *pNode, const char *key, uint32_t keyLen, const char *pData,
|
static SHashNode *doUpdateHashNode(SHashNode *pNode, const char *key, size_t keyLen, const char *pData,
|
||||||
size_t dataSize) {
|
size_t dataSize) {
|
||||||
size_t size = dataSize + sizeof(SHashNode) + keyLen;
|
size_t size = dataSize + sizeof(SHashNode) + keyLen;
|
||||||
|
|
||||||
|
@ -320,14 +339,14 @@ static SHashNode *doUpdateHashNode(SHashNode *pNode, const char *key, uint32_t k
|
||||||
/**
|
/**
|
||||||
* insert the hash node at the front of the linked list
|
* insert the hash node at the front of the linked list
|
||||||
*
|
*
|
||||||
* @param pObj
|
* @param pHashObj
|
||||||
* @param pNode
|
* @param pNode
|
||||||
*/
|
*/
|
||||||
static void doAddToHashTable(HashObj *pObj, SHashNode *pNode) {
|
static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode) {
|
||||||
assert(pNode != NULL);
|
assert(pNode != NULL);
|
||||||
|
|
||||||
int32_t index = HASH_INDEX(pNode->hashVal, pObj->capacity);
|
int32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
||||||
SHashEntry *pEntry = pObj->hashList[index];
|
SHashEntry *pEntry = pHashObj->hashList[index];
|
||||||
|
|
||||||
pNode->next = pEntry->next;
|
pNode->next = pEntry->next;
|
||||||
|
|
||||||
|
@ -339,74 +358,60 @@ static void doAddToHashTable(HashObj *pObj, SHashNode *pNode) {
|
||||||
pNode->prev1 = pEntry;
|
pNode->prev1 = pEntry;
|
||||||
|
|
||||||
pEntry->num++;
|
pEntry->num++;
|
||||||
pObj->size++;
|
pHashObj->size++;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosNumElemsInHashTable(HashObj *pObj) {
|
size_t taosHashGetSize(const SHashObj *pHashObj) {
|
||||||
if (pObj == NULL) {
|
if (pHashObj == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pObj->size;
|
return pHashObj->size;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* add data node into hash table
|
* add data node into hash table
|
||||||
* @param pObj hash object
|
* @param pHashObj hash object
|
||||||
* @param pNode hash node
|
* @param pNode hash node
|
||||||
*/
|
*/
|
||||||
int32_t taosAddToHashTable(HashObj *pObj, const char *key, uint32_t keyLen, void *data, uint32_t size) {
|
int32_t taosHashPut(SHashObj *pHashObj, const char *key, size_t keyLen, void *data, size_t size) {
|
||||||
if (pObj->multithreadSafe) {
|
__wr_lock(pHashObj->lock);
|
||||||
__wr_lock(&pObj->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t hashVal = 0;
|
uint32_t hashVal = 0;
|
||||||
SHashNode *pNode = doGetNodeFromHashTable(pObj, key, keyLen, &hashVal);
|
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, &hashVal);
|
||||||
|
|
||||||
if (pNode == NULL) { // no data in hash table with the specified key, add it into hash table
|
if (pNode == NULL) { // no data in hash table with the specified key, add it into hash table
|
||||||
taosHashTableResize(pObj);
|
taosHashTableResize(pHashObj);
|
||||||
|
|
||||||
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
|
SHashNode *pNewNode = doCreateHashNode(key, keyLen, data, size, hashVal);
|
||||||
if (pNewNode == NULL) {
|
if (pNewNode == NULL) {
|
||||||
if (pObj->multithreadSafe) {
|
__unlock(pHashObj->lock);
|
||||||
__unlock(&pObj->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
doAddToHashTable(pObj, pNewNode);
|
doAddToHashTable(pHashObj, pNewNode);
|
||||||
} else {
|
} else {
|
||||||
SHashNode *pNewNode = doUpdateHashNode(pNode, key, keyLen, data, size);
|
SHashNode *pNewNode = doUpdateHashNode(pNode, key, keyLen, data, size);
|
||||||
if (pNewNode == NULL) {
|
if (pNewNode == NULL) {
|
||||||
if (pObj->multithreadSafe) {
|
__unlock(pHashObj->lock);
|
||||||
__unlock(&pObj->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
doUpdateHashTable(pObj, pNewNode);
|
doUpdateHashTable(pHashObj, pNewNode);
|
||||||
}
|
|
||||||
|
|
||||||
if (pObj->multithreadSafe) {
|
|
||||||
__unlock(&pObj->lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
__unlock(pHashObj->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *taosGetDataFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) {
|
void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen) {
|
||||||
if (pObj->multithreadSafe) {
|
__rd_lock(pHashObj->lock);
|
||||||
__rd_lock(&pObj->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t hashVal = 0;
|
uint32_t hashVal = 0;
|
||||||
SHashNode *pNode = doGetNodeFromHashTable(pObj, key, keyLen, &hashVal);
|
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, &hashVal);
|
||||||
|
|
||||||
if (pObj->multithreadSafe) {
|
__unlock(pHashObj->lock);
|
||||||
__unlock(&pObj->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pNode != NULL) {
|
if (pNode != NULL) {
|
||||||
assert(pNode->hashVal == hashVal);
|
assert(pNode->hashVal == hashVal);
|
||||||
|
@ -419,29 +424,24 @@ char *taosGetDataFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* remove node in hash list
|
* remove node in hash list
|
||||||
* @param pObj
|
* @param pHashObj
|
||||||
* @param pNode
|
* @param pNode
|
||||||
*/
|
*/
|
||||||
void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) {
|
void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen) {
|
||||||
if (pObj->multithreadSafe) {
|
__wr_lock(pHashObj->lock);
|
||||||
__wr_lock(&pObj->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t val = 0;
|
uint32_t val = 0;
|
||||||
SHashNode *pNode = doGetNodeFromHashTable(pObj, key, keyLen, &val);
|
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, &val);
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
if (pObj->multithreadSafe) {
|
__unlock(pHashObj->lock);
|
||||||
__unlock(&pObj->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SHashNode *pNext = pNode->next;
|
SHashNode *pNext = pNode->next;
|
||||||
if (pNode->prev != NULL) {
|
if (pNode->prev != NULL) {
|
||||||
int32_t slot = HASH_INDEX(val, pObj->capacity);
|
int32_t slot = HASH_INDEX(val, pHashObj->capacity);
|
||||||
if (pObj->hashList[slot]->next == pNode) {
|
if (pHashObj->hashList[slot]->next == pNode) {
|
||||||
pObj->hashList[slot]->next = pNext;
|
pHashObj->hashList[slot]->next = pNext;
|
||||||
} else {
|
} else {
|
||||||
pNode->prev->next = pNext;
|
pNode->prev->next = pNext;
|
||||||
}
|
}
|
||||||
|
@ -451,11 +451,12 @@ void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) {
|
||||||
pNext->prev = pNode->prev;
|
pNext->prev = pNode->prev;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t index = HASH_INDEX(pNode->hashVal, pObj->capacity);
|
uint32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
|
||||||
SHashEntry *pEntry = pObj->hashList[index];
|
|
||||||
|
SHashEntry *pEntry = pHashObj->hashList[index];
|
||||||
pEntry->num--;
|
pEntry->num--;
|
||||||
|
|
||||||
pObj->size--;
|
pHashObj->size--;
|
||||||
|
|
||||||
pNode->next = NULL;
|
pNode->next = NULL;
|
||||||
pNode->prev = NULL;
|
pNode->prev = NULL;
|
||||||
|
@ -463,24 +464,21 @@ void taosDeleteFromHashTable(HashObj *pObj, const char *key, uint32_t keyLen) {
|
||||||
pTrace("key:%s %p remove from hash table", pNode->key, pNode);
|
pTrace("key:%s %p remove from hash table", pNode->key, pNode);
|
||||||
tfree(pNode);
|
tfree(pNode);
|
||||||
|
|
||||||
if (pObj->multithreadSafe) {
|
__unlock(pHashObj->lock);
|
||||||
__unlock(&pObj->lock);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCleanUpHashTable(void *handle) {
|
void taosHashCleanup(SHashObj *pHashObj) {
|
||||||
HashObj *pObj = (HashObj *)handle;
|
if (pHashObj == NULL || pHashObj->capacity <= 0) {
|
||||||
if (pObj == NULL || pObj->capacity <= 0) return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SHashNode *pNode, *pNext;
|
SHashNode *pNode, *pNext;
|
||||||
|
|
||||||
if (pObj->multithreadSafe) {
|
__wr_lock(pHashObj->lock);
|
||||||
__wr_lock(&pObj->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pObj->hashList) {
|
if (pHashObj->hashList) {
|
||||||
for (int32_t i = 0; i < pObj->capacity; ++i) {
|
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
|
||||||
SHashEntry *pEntry = pObj->hashList[i];
|
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||||
pNode = pEntry->next;
|
pNode = pEntry->next;
|
||||||
|
|
||||||
while (pNode) {
|
while (pNode) {
|
||||||
|
@ -492,28 +490,26 @@ void taosCleanUpHashTable(void *handle) {
|
||||||
tfree(pEntry);
|
tfree(pEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
free(pObj->hashList);
|
free(pHashObj->hashList);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pObj->multithreadSafe) {
|
__unlock(pHashObj->lock);
|
||||||
__unlock(&pObj->lock);
|
__lock_destroy(pHashObj->lock);
|
||||||
__lock_destroy(&pObj->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
memset(pObj, 0, sizeof(HashObj));
|
memset(pHashObj, 0, sizeof(SHashObj));
|
||||||
free(pObj);
|
free(pHashObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
// for profile only
|
// for profile only
|
||||||
int32_t taosGetHashMaxOverflowLength(HashObj* pObj) {
|
int32_t taosHashGetMaxOverflowLinkLength(const SHashObj* pHashObj) {
|
||||||
if (pObj == NULL || pObj->size == 0) {
|
if (pHashObj == NULL || pHashObj->size == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
|
|
||||||
for(int32_t i = 0; i < pObj->size; ++i) {
|
for(int32_t i = 0; i < pHashObj->size; ++i) {
|
||||||
SHashEntry *pEntry = pObj->hashList[i];
|
SHashEntry *pEntry = pHashObj->hashList[i];
|
||||||
if (num < pEntry->num) {
|
if (num < pEntry->num) {
|
||||||
num = pEntry->num;
|
num = pEntry->num;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,848 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
#include "os.h"
|
|
||||||
|
|
||||||
#include "tlog.h"
|
|
||||||
#include "taosdef.h"
|
|
||||||
#include "sskiplist.h"
|
|
||||||
#include "tutil.h"
|
|
||||||
|
|
||||||
static FORCE_INLINE void recordNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) { // record link count in each level
|
|
||||||
for (int32_t i = 0; i < nLevel; ++i) {
|
|
||||||
pSkipList->state.nLevelNodeCnt[i]++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void removeNodeEachLevel(tSkipList *pSkipList, int32_t nLevel) {
|
|
||||||
for (int32_t i = 0; i < nLevel; ++i) {
|
|
||||||
pSkipList->state.nLevelNodeCnt[i]--;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t getSkipListNodeRandomHeight(tSkipList *pSkipList) {
|
|
||||||
const uint32_t factor = 4;
|
|
||||||
|
|
||||||
int32_t n = 1;
|
|
||||||
while ((rand() % factor) == 0 && n <= pSkipList->nMaxLevel) {
|
|
||||||
n++;
|
|
||||||
}
|
|
||||||
|
|
||||||
return n;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t getSkipListNodeLevel(tSkipList *pSkipList) {
|
|
||||||
int32_t nLevel = getSkipListNodeRandomHeight(pSkipList);
|
|
||||||
if (pSkipList->nSize == 0) {
|
|
||||||
nLevel = 1;
|
|
||||||
pSkipList->nLevel = 1;
|
|
||||||
} else {
|
|
||||||
if (nLevel > pSkipList->nLevel && pSkipList->nLevel < pSkipList->nMaxLevel) {
|
|
||||||
nLevel = (++pSkipList->nLevel);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nLevel;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode);
|
|
||||||
|
|
||||||
void SSkipListDoRecordPut(tSkipList *pSkipList) {
|
|
||||||
const int32_t MAX_RECORD_NUM = 1000;
|
|
||||||
|
|
||||||
if (pSkipList->state.nInsertObjs == MAX_RECORD_NUM) {
|
|
||||||
pSkipList->state.nInsertObjs = 1;
|
|
||||||
pSkipList->state.nTotalStepsForInsert = 0;
|
|
||||||
pSkipList->state.nTotalElapsedTimeForInsert = 0;
|
|
||||||
} else {
|
|
||||||
pSkipList->state.nInsertObjs++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t compareIntVal(const void *pLeft, const void *pRight) {
|
|
||||||
int64_t lhs = ((tSkipListKey *)pLeft)->i64Key;
|
|
||||||
int64_t rhs = ((tSkipListKey *)pRight)->i64Key;
|
|
||||||
|
|
||||||
DEFAULT_COMP(lhs, rhs);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t scompareIntDoubleVal(const void *pLeft, const void *pRight) {
|
|
||||||
int64_t lhs = ((tSkipListKey *)pLeft)->i64Key;
|
|
||||||
double rhs = ((tSkipListKey *)pRight)->dKey;
|
|
||||||
if (fabs(lhs - rhs) < FLT_EPSILON) {
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
return (lhs > rhs) ? 1 : -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t scompareDoubleIntVal(const void *pLeft, const void *pRight) {
|
|
||||||
double lhs = ((tSkipListKey *)pLeft)->dKey;
|
|
||||||
int64_t rhs = ((tSkipListKey *)pRight)->i64Key;
|
|
||||||
if (fabs(lhs - rhs) < FLT_EPSILON) {
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
return (lhs > rhs) ? 1 : -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t scompareDoubleVal(const void *pLeft, const void *pRight) {
|
|
||||||
double ret = (((tSkipListKey *)pLeft)->dKey - ((tSkipListKey *)pRight)->dKey);
|
|
||||||
if (fabs(ret) < FLT_EPSILON) {
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
return ret > 0 ? 1 : -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t scompareStrVal(const void *pLeft, const void *pRight) {
|
|
||||||
tSkipListKey *pL = (tSkipListKey *)pLeft;
|
|
||||||
tSkipListKey *pR = (tSkipListKey *)pRight;
|
|
||||||
|
|
||||||
if (pL->nLen == 0 && pR->nLen == 0) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
//handle only one-side bound compare situation, there is only lower bound or only upper bound
|
|
||||||
if (pL->nLen == -1) {
|
|
||||||
return 1; // no lower bound, lower bound is minimum, always return -1;
|
|
||||||
} else if (pR->nLen == -1) {
|
|
||||||
return -1; // no upper bound, upper bound is maximum situation, always return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t ret = strcmp(((tSkipListKey *)pLeft)->pz, ((tSkipListKey *)pRight)->pz);
|
|
||||||
|
|
||||||
if (ret == 0) {
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
return ret > 0 ? 1 : -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t scompareWStrVal(const void *pLeft, const void *pRight) {
|
|
||||||
tSkipListKey *pL = (tSkipListKey *)pLeft;
|
|
||||||
tSkipListKey *pR = (tSkipListKey *)pRight;
|
|
||||||
|
|
||||||
if (pL->nLen == 0 && pR->nLen == 0) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
//handle only one-side bound compare situation, there is only lower bound or only upper bound
|
|
||||||
if (pL->nLen == -1) {
|
|
||||||
return 1; // no lower bound, lower bound is minimum, always return -1;
|
|
||||||
} else if (pR->nLen == -1) {
|
|
||||||
return -1; // no upper bound, upper bound is maximum situation, always return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t ret = wcscmp(((tSkipListKey *)pLeft)->wpz, ((tSkipListKey *)pRight)->wpz);
|
|
||||||
|
|
||||||
if (ret == 0) {
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
return ret > 0 ? 1 : -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static __compar_fn_t getKeyFilterComparator(tSkipList *pSkipList, int32_t filterDataType) {
|
|
||||||
__compar_fn_t comparator = NULL;
|
|
||||||
|
|
||||||
switch (pSkipList->keyType) {
|
|
||||||
case TSDB_DATA_TYPE_TINYINT:
|
|
||||||
case TSDB_DATA_TYPE_SMALLINT:
|
|
||||||
case TSDB_DATA_TYPE_INT:
|
|
||||||
case TSDB_DATA_TYPE_BIGINT:
|
|
||||||
case TSDB_DATA_TYPE_BOOL: {
|
|
||||||
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
|
|
||||||
comparator = compareIntVal;
|
|
||||||
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
|
|
||||||
comparator = scompareIntDoubleVal;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case TSDB_DATA_TYPE_FLOAT:
|
|
||||||
case TSDB_DATA_TYPE_DOUBLE: {
|
|
||||||
if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
|
|
||||||
comparator = scompareDoubleIntVal;
|
|
||||||
} else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
|
|
||||||
comparator = scompareDoubleVal;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
|
||||||
comparator = scompareStrVal;
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
|
||||||
comparator = scompareWStrVal;
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
comparator = compareIntVal;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
return comparator;
|
|
||||||
}
|
|
||||||
|
|
||||||
static __compar_fn_t getKeyComparator(int32_t keyType) {
|
|
||||||
__compar_fn_t comparator = NULL;
|
|
||||||
|
|
||||||
switch (keyType) {
|
|
||||||
case TSDB_DATA_TYPE_TINYINT:
|
|
||||||
case TSDB_DATA_TYPE_SMALLINT:
|
|
||||||
case TSDB_DATA_TYPE_INT:
|
|
||||||
case TSDB_DATA_TYPE_BIGINT:
|
|
||||||
case TSDB_DATA_TYPE_BOOL:
|
|
||||||
comparator = compareIntVal;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_FLOAT:
|
|
||||||
case TSDB_DATA_TYPE_DOUBLE:
|
|
||||||
comparator = scompareDoubleVal;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
|
||||||
comparator = scompareStrVal;
|
|
||||||
break;
|
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
|
||||||
comparator = scompareWStrVal;
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
comparator = compareIntVal;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
return comparator;
|
|
||||||
}
|
|
||||||
|
|
||||||
tSkipList* SSkipListCreate(int16_t nMaxLevel, int16_t keyType, int16_t nMaxKeyLen) {
|
|
||||||
tSkipList *pSkipList = (tSkipList *)calloc(1, sizeof(tSkipList));
|
|
||||||
if (pSkipList == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pSkipList->keyType = keyType;
|
|
||||||
|
|
||||||
pSkipList->comparator = getKeyComparator(keyType);
|
|
||||||
pSkipList->pHead.pForward = (tSkipListNode **)calloc(1, POINTER_BYTES * MAX_SKIP_LIST_LEVEL);
|
|
||||||
|
|
||||||
pSkipList->nMaxLevel = MAX_SKIP_LIST_LEVEL;
|
|
||||||
pSkipList->nLevel = 1;
|
|
||||||
|
|
||||||
pSkipList->nMaxKeyLen = nMaxKeyLen;
|
|
||||||
pSkipList->nMaxLevel = nMaxLevel;
|
|
||||||
|
|
||||||
if (pthread_rwlock_init(&pSkipList->lock, NULL) != 0) {
|
|
||||||
tfree(pSkipList->pHead.pForward);
|
|
||||||
tfree(pSkipList);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
srand(time(NULL));
|
|
||||||
pSkipList->state.nTotalMemSize += sizeof(tSkipList);
|
|
||||||
return pSkipList;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void doRemove(tSkipList *pSkipList, tSkipListNode *pNode, tSkipListNode *forward[]) {
|
|
||||||
int32_t level = pNode->nLevel;
|
|
||||||
for (int32_t j = level - 1; j >= 0; --j) {
|
|
||||||
if ((forward[j]->pForward[j] != NULL) && (forward[j]->pForward[j]->pForward[j])) {
|
|
||||||
forward[j]->pForward[j]->pForward[j]->pBackward[j] = forward[j];
|
|
||||||
}
|
|
||||||
|
|
||||||
if (forward[j]->pForward[j] != NULL) {
|
|
||||||
forward[j]->pForward[j] = forward[j]->pForward[j]->pForward[j];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pSkipList->state.nTotalMemSize -= (sizeof(tSkipListNode) + POINTER_BYTES * pNode->nLevel * 2);
|
|
||||||
removeNodeEachLevel(pSkipList, pNode->nLevel);
|
|
||||||
|
|
||||||
tfree(pNode);
|
|
||||||
--pSkipList->nSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
static size_t getOneNodeSize(const tSkipListKey *pKey, int32_t nLevel) {
|
|
||||||
size_t size = sizeof(tSkipListNode) + sizeof(intptr_t) * (nLevel << 1);
|
|
||||||
if (pKey->nType == TSDB_DATA_TYPE_BINARY) {
|
|
||||||
size += pKey->nLen + 1;
|
|
||||||
} else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
size += (pKey->nLen + 1) * TSDB_NCHAR_SIZE;
|
|
||||||
}
|
|
||||||
|
|
||||||
return size;
|
|
||||||
}
|
|
||||||
|
|
||||||
static tSkipListNode *SSkipListCreateNode(void *pData, const tSkipListKey *pKey, int32_t nLevel) {
|
|
||||||
size_t nodeSize = getOneNodeSize(pKey, nLevel);
|
|
||||||
tSkipListNode *pNode = (tSkipListNode *)calloc(1, nodeSize);
|
|
||||||
|
|
||||||
pNode->pForward = (tSkipListNode **)(&pNode[1]);
|
|
||||||
pNode->pBackward = (pNode->pForward + nLevel);
|
|
||||||
|
|
||||||
pNode->pData = pData;
|
|
||||||
|
|
||||||
pNode->key = *pKey;
|
|
||||||
if (pKey->nType == TSDB_DATA_TYPE_BINARY) {
|
|
||||||
pNode->key.pz = (char *)(pNode->pBackward + nLevel);
|
|
||||||
|
|
||||||
strcpy(pNode->key.pz, pKey->pz);
|
|
||||||
pNode->key.pz[pKey->nLen] = 0;
|
|
||||||
} else if (pKey->nType == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
pNode->key.wpz = (wchar_t *)(pNode->pBackward + nLevel);
|
|
||||||
wcsncpy(pNode->key.wpz, pKey->wpz, pKey->nLen);
|
|
||||||
pNode->key.wpz[pKey->nLen] = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
pNode->nLevel = nLevel;
|
|
||||||
return pNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
tSkipListKey SSkipListCreateKey(int32_t type, char *val, size_t keyLength) {
|
|
||||||
tSkipListKey k = {0};
|
|
||||||
tVariantCreateFromBinary(&k, val, (uint32_t) keyLength, (uint32_t) type);
|
|
||||||
return k;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tSkipListDestroyKey(tSkipListKey *pKey) { tVariantDestroy(pKey); }
|
|
||||||
|
|
||||||
void* SSkipListDestroy(tSkipList *pSkipList) {
|
|
||||||
if (pSkipList == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_rwlock_wrlock(&pSkipList->lock);
|
|
||||||
tSkipListNode *pNode = pSkipList->pHead.pForward[0];
|
|
||||||
while (pNode) {
|
|
||||||
tSkipListNode *pTemp = pNode;
|
|
||||||
pNode = pNode->pForward[0];
|
|
||||||
tfree(pTemp);
|
|
||||||
}
|
|
||||||
|
|
||||||
tfree(pSkipList->pHead.pForward);
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
|
|
||||||
pthread_rwlock_destroy(&pSkipList->lock);
|
|
||||||
tfree(pSkipList);
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
tSkipListNode *SSkipListPut(tSkipList *pSkipList, void *pData, tSkipListKey *pKey, int32_t insertIdenticalKey) {
|
|
||||||
if (pSkipList == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_rwlock_wrlock(&pSkipList->lock);
|
|
||||||
|
|
||||||
// record one node is put into skiplist
|
|
||||||
SSkipListDoRecordPut(pSkipList);
|
|
||||||
|
|
||||||
tSkipListNode *px = &pSkipList->pHead;
|
|
||||||
|
|
||||||
tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
|
|
||||||
for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) {
|
|
||||||
while (px->pForward[i] != NULL && (pSkipList->comparator(&px->pForward[i]->key, pKey) < 0)) {
|
|
||||||
px = px->pForward[i];
|
|
||||||
}
|
|
||||||
|
|
||||||
pSkipList->state.nTotalStepsForInsert++;
|
|
||||||
forward[i] = px;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if the skiplist does not allowed identical key inserted, the new data will be discarded.
|
|
||||||
if ((insertIdenticalKey == 0) && forward[0] != &pSkipList->pHead &&
|
|
||||||
(pSkipList->comparator(&forward[0]->key, pKey) == 0)) {
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
return forward[0];
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t nLevel = getSkipListNodeLevel(pSkipList);
|
|
||||||
recordNodeEachLevel(pSkipList, nLevel);
|
|
||||||
|
|
||||||
tSkipListNode *pNode = SSkipListCreateNode(pData, pKey, nLevel);
|
|
||||||
tSkipListDoInsert(pSkipList, forward, nLevel, pNode);
|
|
||||||
|
|
||||||
pSkipList->nSize += 1;
|
|
||||||
|
|
||||||
// char tmpstr[512] = {0};
|
|
||||||
// tVariantToString(&pNode->key, tmpstr);
|
|
||||||
// pTrace("skiplist:%p, node added, key:%s, total list len:%d", pSkipList,
|
|
||||||
// tmpstr, pSkipList->nSize);
|
|
||||||
|
|
||||||
pSkipList->state.nTotalMemSize += getOneNodeSize(pKey, nLevel);
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
|
|
||||||
return pNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tSkipListDoInsert(tSkipList *pSkipList, tSkipListNode **forward, int32_t nLevel, tSkipListNode *pNode) {
|
|
||||||
for (int32_t i = 0; i < nLevel; ++i) {
|
|
||||||
tSkipListNode *x = forward[i];
|
|
||||||
if (x != NULL) {
|
|
||||||
pNode->pBackward[i] = x;
|
|
||||||
if (x->pForward[i]) x->pForward[i]->pBackward[i] = pNode;
|
|
||||||
|
|
||||||
pNode->pForward[i] = x->pForward[i];
|
|
||||||
x->pForward[i] = pNode;
|
|
||||||
} else {
|
|
||||||
pSkipList->pHead.pForward[i] = pNode;
|
|
||||||
pNode->pBackward[i] = &(pSkipList->pHead);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tSkipListNode *tSkipListGetOne(tSkipList *pSkipList, tSkipListKey *pKey) {
|
|
||||||
int32_t sLevel = pSkipList->nLevel - 1;
|
|
||||||
int32_t ret = -1;
|
|
||||||
|
|
||||||
tSkipListNode *x = &pSkipList->pHead;
|
|
||||||
|
|
||||||
pthread_rwlock_rdlock(&pSkipList->lock);
|
|
||||||
pSkipList->state.queryCount++;
|
|
||||||
|
|
||||||
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
|
|
||||||
|
|
||||||
for (int32_t i = sLevel; i >= 0; --i) {
|
|
||||||
while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) {
|
|
||||||
x = x->pForward[i];
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ret == 0) {
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
return x->pForward[i];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tSkipListEndParQuery(tSkipList *pSkipList, tSkipListNode *pStartNode, tSkipListKey *pEndKey,
|
|
||||||
int32_t cond, tSkipListNode ***pRes) {
|
|
||||||
pthread_rwlock_rdlock(&pSkipList->lock);
|
|
||||||
tSkipListNode *p = pStartNode;
|
|
||||||
int32_t numOfRes = 0;
|
|
||||||
|
|
||||||
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pEndKey->nType);
|
|
||||||
while (p != NULL) {
|
|
||||||
int32_t ret = filterComparator(&p->key, pEndKey);
|
|
||||||
if (ret > 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ret < 0) {
|
|
||||||
numOfRes++;
|
|
||||||
p = p->pForward[0];
|
|
||||||
} else if (ret == 0) {
|
|
||||||
if (cond == TSDB_RELATION_LESS_EQUAL) {
|
|
||||||
numOfRes++;
|
|
||||||
p = p->pForward[0];
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(*pRes) = (tSkipListNode **)malloc(POINTER_BYTES * numOfRes);
|
|
||||||
for (int32_t i = 0; i < numOfRes; ++i) {
|
|
||||||
(*pRes)[i] = pStartNode;
|
|
||||||
pStartNode = pStartNode->pForward[0];
|
|
||||||
}
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
|
|
||||||
return numOfRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* maybe return the copy of tSkipListNode would be better
|
|
||||||
*/
|
|
||||||
int32_t tSkipListGets(tSkipList *pSkipList, tSkipListKey *pKey, tSkipListNode ***pRes) {
|
|
||||||
(*pRes) = NULL;
|
|
||||||
|
|
||||||
tSkipListNode *pNode = tSkipListGetOne(pSkipList, pKey);
|
|
||||||
if (pNode == NULL) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
|
|
||||||
|
|
||||||
// backward check if previous nodes are with the same value.
|
|
||||||
tSkipListNode *pPrev = pNode->pBackward[0];
|
|
||||||
while ((pPrev != &pSkipList->pHead) && filterComparator(&pPrev->key, pKey) == 0) {
|
|
||||||
pPrev = pPrev->pBackward[0];
|
|
||||||
}
|
|
||||||
|
|
||||||
return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes);
|
|
||||||
}
|
|
||||||
|
|
||||||
static tSkipListNode *tSkipListParQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t cond) {
|
|
||||||
int32_t sLevel = pSkipList->nLevel - 1;
|
|
||||||
int32_t ret = -1;
|
|
||||||
|
|
||||||
tSkipListNode *x = &pSkipList->pHead;
|
|
||||||
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
|
|
||||||
|
|
||||||
pthread_rwlock_rdlock(&pSkipList->lock);
|
|
||||||
|
|
||||||
if (cond == TSDB_RELATION_LARGE_EQUAL || cond == TSDB_RELATION_LARGE) {
|
|
||||||
for (int32_t i = sLevel; i >= 0; --i) {
|
|
||||||
while (x->pForward[i] != NULL && (ret = filterComparator(&x->pForward[i]->key, pKey)) < 0) {
|
|
||||||
x = x->pForward[i];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// backward check if previous nodes are with the same value.
|
|
||||||
if (cond == TSDB_RELATION_LARGE_EQUAL && ret == 0) {
|
|
||||||
tSkipListNode *pNode = x->pForward[0];
|
|
||||||
while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparator(&pNode->pBackward[0]->key, pKey) == 0)) {
|
|
||||||
pNode = pNode->pBackward[0];
|
|
||||||
}
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
return pNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ret > 0 || cond == TSDB_RELATION_LARGE_EQUAL) {
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
return x->pForward[0];
|
|
||||||
} else { // cond == TSDB_RELATION_LARGE && ret == 0
|
|
||||||
tSkipListNode *pn = x->pForward[0];
|
|
||||||
while (pn != NULL && filterComparator(&pn->key, pKey) == 0) {
|
|
||||||
pn = pn->pForward[0];
|
|
||||||
}
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
return pn;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tSkipListIterateList(tSkipList *pSkipList, tSkipListNode ***pRes, bool (*fp)(tSkipListNode *, void *),
|
|
||||||
void *param) {
|
|
||||||
(*pRes) = (tSkipListNode **)calloc(1, POINTER_BYTES * pSkipList->nSize);
|
|
||||||
if (NULL == *pRes) {
|
|
||||||
pError("error skiplist %p, malloc failed", pSkipList);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_rwlock_rdlock(&pSkipList->lock);
|
|
||||||
tSkipListNode *pStartNode = pSkipList->pHead.pForward[0];
|
|
||||||
int32_t num = 0;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pSkipList->nSize; ++i) {
|
|
||||||
if (pStartNode == NULL) {
|
|
||||||
pError("error skiplist %p, required length:%d, actual length:%d", pSkipList, pSkipList->nSize, i - 1);
|
|
||||||
#ifdef _DEBUG_VIEW
|
|
||||||
SSkipListPrint(pSkipList, 1);
|
|
||||||
#endif
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (fp == NULL || (fp != NULL && fp(pStartNode, param) == true)) {
|
|
||||||
(*pRes)[num++] = pStartNode;
|
|
||||||
}
|
|
||||||
|
|
||||||
pStartNode = pStartNode->pForward[0];
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
|
|
||||||
if (num == 0) {
|
|
||||||
free(*pRes);
|
|
||||||
*pRes = NULL;
|
|
||||||
} else if (num < pSkipList->nSize) { // free unused memory
|
|
||||||
char* tmp = realloc((*pRes), num * POINTER_BYTES);
|
|
||||||
assert(tmp != NULL);
|
|
||||||
|
|
||||||
*pRes = (tSkipListNode**)tmp;
|
|
||||||
}
|
|
||||||
|
|
||||||
return num;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tSkipListIteratorReset(tSkipList *pSkipList, SSkipListIterator* iter) {
|
|
||||||
if (pSkipList == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
iter->pSkipList = pSkipList;
|
|
||||||
|
|
||||||
pthread_rwlock_rdlock(&pSkipList->lock);
|
|
||||||
iter->cur = NULL;//pSkipList->pHead.pForward[0];
|
|
||||||
iter->num = pSkipList->nSize;
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool tSkipListIteratorNext(SSkipListIterator* iter) {
|
|
||||||
if (iter->num == 0 || iter->pSkipList == NULL) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
tSkipList* pSkipList = iter->pSkipList;
|
|
||||||
|
|
||||||
pthread_rwlock_rdlock(&pSkipList->lock);
|
|
||||||
if (iter->cur == NULL) {
|
|
||||||
iter->cur = pSkipList->pHead.pForward[0];
|
|
||||||
} else {
|
|
||||||
iter->cur = iter->cur->pForward[0];
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
|
|
||||||
return iter->cur != NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
tSkipListNode* tSkipListIteratorGet(SSkipListIterator* iter) {
|
|
||||||
return iter->cur;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tSkipListRangeQuery(tSkipList *pSkipList, tSKipListQueryCond *pCond, tSkipListNode ***pRes) {
|
|
||||||
pSkipList->state.queryCount++;
|
|
||||||
tSkipListNode *pStart = tSkipListParQuery(pSkipList, &pCond->lowerBnd, pCond->lowerBndRelOptr);
|
|
||||||
if (pStart == 0) {
|
|
||||||
*pRes = NULL;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return tSkipListEndParQuery(pSkipList, pStart, &pCond->upperBnd, pCond->upperBndRelOptr, pRes);
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool removeSupport(tSkipList *pSkipList, tSkipListNode **forward, tSkipListKey *pKey) {
|
|
||||||
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
|
|
||||||
|
|
||||||
if (filterComparator(&forward[0]->pForward[0]->key, pKey) == 0) {
|
|
||||||
tSkipListNode *p = forward[0]->pForward[0];
|
|
||||||
doRemove(pSkipList, p, forward);
|
|
||||||
} else { // failed to find the node of specified value,abort
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// compress the minimum level of skip list
|
|
||||||
while (pSkipList->nLevel > 0 && pSkipList->pHead.pForward[pSkipList->nLevel - 1] == NULL) {
|
|
||||||
pSkipList->nLevel -= 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tSkipListRemoveNode(tSkipList *pSkipList, tSkipListNode *pNode) {
|
|
||||||
tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
|
|
||||||
|
|
||||||
pthread_rwlock_rdlock(&pSkipList->lock);
|
|
||||||
for (int32_t i = 0; i < pNode->nLevel; ++i) {
|
|
||||||
forward[i] = pNode->pBackward[i];
|
|
||||||
}
|
|
||||||
|
|
||||||
removeSupport(pSkipList, forward, &pNode->key);
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool tSkipListRemove(tSkipList *pSkipList, tSkipListKey *pKey) {
|
|
||||||
tSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
|
|
||||||
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
|
|
||||||
|
|
||||||
pthread_rwlock_rdlock(&pSkipList->lock);
|
|
||||||
|
|
||||||
tSkipListNode *x = &pSkipList->pHead;
|
|
||||||
for (int32_t i = pSkipList->nLevel - 1; i >= 0; --i) {
|
|
||||||
while (x->pForward[i] != NULL && (filterComparator(&x->pForward[i]->key, pKey) < 0)) {
|
|
||||||
x = x->pForward[i];
|
|
||||||
}
|
|
||||||
forward[i] = x;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool ret = removeSupport(pSkipList, forward, pKey);
|
|
||||||
pthread_rwlock_unlock(&pSkipList->lock);
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
void SSkipListPrint(tSkipList *pSkipList, int16_t nlevel) {
|
|
||||||
if (pSkipList == NULL || pSkipList->nLevel < nlevel || nlevel <= 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
tSkipListNode *p = pSkipList->pHead.pForward[nlevel - 1];
|
|
||||||
int32_t id = 1;
|
|
||||||
while (p) {
|
|
||||||
switch (pSkipList->keyType) {
|
|
||||||
case TSDB_DATA_TYPE_INT:
|
|
||||||
case TSDB_DATA_TYPE_SMALLINT:
|
|
||||||
case TSDB_DATA_TYPE_TINYINT:
|
|
||||||
case TSDB_DATA_TYPE_BIGINT:
|
|
||||||
fprintf(stdout, "%d: %" PRId64 " \n", id++, p->key.i64Key);
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
|
||||||
fprintf(stdout, "%d: %s \n", id++, p->key.pz);
|
|
||||||
break;
|
|
||||||
case TSDB_DATA_TYPE_DOUBLE:
|
|
||||||
fprintf(stdout, "%d: %lf \n", id++, p->key.dKey);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
fprintf(stdout, "\n");
|
|
||||||
}
|
|
||||||
p = p->pForward[nlevel - 1];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* query processor based on query condition
|
|
||||||
*/
|
|
||||||
int32_t tSkipListQuery(tSkipList *pSkipList, tSKipListQueryCond *pQueryCond, tSkipListNode ***pResult) {
|
|
||||||
// query condition check
|
|
||||||
int32_t rel = 0;
|
|
||||||
__compar_fn_t comparator = getKeyComparator(pQueryCond->lowerBnd.nType);
|
|
||||||
|
|
||||||
if (pSkipList == NULL || pQueryCond == NULL || pSkipList->nSize == 0 ||
|
|
||||||
(((rel = comparator(&pQueryCond->lowerBnd, &pQueryCond->upperBnd)) > 0 &&
|
|
||||||
pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_NCHAR && pQueryCond->lowerBnd.nType != TSDB_DATA_TYPE_BINARY))) {
|
|
||||||
(*pResult) = NULL;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (rel == 0) {
|
|
||||||
/*
|
|
||||||
* 0 means: pQueryCond->lowerBnd == pQueryCond->upperBnd
|
|
||||||
* point query
|
|
||||||
*/
|
|
||||||
if (pQueryCond->lowerBndRelOptr == TSDB_RELATION_LARGE_EQUAL &&
|
|
||||||
pQueryCond->upperBndRelOptr == TSDB_RELATION_LESS_EQUAL) { // point query
|
|
||||||
return tSkipListGets(pSkipList, &pQueryCond->lowerBnd, pResult);
|
|
||||||
} else {
|
|
||||||
(*pResult) = NULL;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
/* range query, query operation code check */
|
|
||||||
return tSkipListRangeQuery(pSkipList, pQueryCond, pResult);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct MultipleQueryResult {
|
|
||||||
int32_t len;
|
|
||||||
tSkipListNode **pData;
|
|
||||||
} MultipleQueryResult;
|
|
||||||
|
|
||||||
static int32_t mergeQueryResult(MultipleQueryResult *pResults, int32_t numOfResSet, tSkipListNode ***pRes) {
|
|
||||||
int32_t total = 0;
|
|
||||||
for (int32_t i = 0; i < numOfResSet; ++i) {
|
|
||||||
total += pResults[i].len;
|
|
||||||
}
|
|
||||||
|
|
||||||
(*pRes) = malloc(POINTER_BYTES * total);
|
|
||||||
int32_t idx = 0;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfResSet; ++i) {
|
|
||||||
MultipleQueryResult *pOneResult = &pResults[i];
|
|
||||||
for (int32_t j = 0; j < pOneResult->len; ++j) {
|
|
||||||
(*pRes)[idx++] = pOneResult->pData[j];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return total;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void removeDuplicateKey(tSkipListKey *pKey, int32_t *numOfKey, __compar_fn_t comparator) {
|
|
||||||
if (*numOfKey == 1) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
qsort(pKey, *numOfKey, sizeof(pKey[0]), comparator);
|
|
||||||
int32_t i = 0, j = 1;
|
|
||||||
|
|
||||||
while (i < (*numOfKey) && j < (*numOfKey)) {
|
|
||||||
int32_t ret = comparator(&pKey[i], &pKey[j]);
|
|
||||||
if (ret == 0) {
|
|
||||||
j++;
|
|
||||||
} else {
|
|
||||||
pKey[i + 1] = pKey[j];
|
|
||||||
i++;
|
|
||||||
j++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
(*numOfKey) = i + 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mergeResult(const tSkipListKey *pKey, int32_t numOfKey, tSkipListNode ***pRes, __compar_fn_t comparator,
|
|
||||||
tSkipListNode *pNode) {
|
|
||||||
int32_t i = 0, j = 0;
|
|
||||||
// merge two sorted arrays in O(n) time
|
|
||||||
while (i < numOfKey && pNode != NULL) {
|
|
||||||
int32_t ret = comparator(&pNode->key, &pKey[i]);
|
|
||||||
if (ret < 0) {
|
|
||||||
(*pRes)[j++] = pNode;
|
|
||||||
pNode = pNode->pForward[0];
|
|
||||||
} else if (ret == 0) {
|
|
||||||
pNode = pNode->pForward[0];
|
|
||||||
} else { // pNode->key > pkey[i]
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
while (pNode != NULL) {
|
|
||||||
(*pRes)[j++] = pNode;
|
|
||||||
pNode = pNode->pForward[0];
|
|
||||||
}
|
|
||||||
return j;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tSkipListPointQuery(tSkipList *pSkipList, tSkipListKey *pKey, int32_t numOfKey, tSkipListPointQueryType type,
|
|
||||||
tSkipListNode ***pRes) {
|
|
||||||
if (numOfKey == 0 || pKey == NULL || pSkipList == NULL || pSkipList->nSize == 0 ||
|
|
||||||
(type != INCLUDE_POINT_QUERY && type != EXCLUDE_POINT_QUERY)) {
|
|
||||||
(*pRes) = NULL;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
__compar_fn_t comparator = getKeyComparator(pKey->nType);
|
|
||||||
removeDuplicateKey(pKey, &numOfKey, comparator);
|
|
||||||
|
|
||||||
if (type == INCLUDE_POINT_QUERY) {
|
|
||||||
if (numOfKey == 1) {
|
|
||||||
return tSkipListGets(pSkipList, &pKey[0], pRes);
|
|
||||||
} else {
|
|
||||||
MultipleQueryResult *pTempResult = (MultipleQueryResult *)malloc(sizeof(MultipleQueryResult) * numOfKey);
|
|
||||||
for (int32_t i = 0; i < numOfKey; ++i) {
|
|
||||||
pTempResult[i].len = tSkipListGets(pSkipList, &pKey[i], &pTempResult[i].pData);
|
|
||||||
}
|
|
||||||
int32_t num = mergeQueryResult(pTempResult, numOfKey, pRes);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfKey; ++i) {
|
|
||||||
free(pTempResult[i].pData);
|
|
||||||
}
|
|
||||||
free(pTempResult);
|
|
||||||
return num;
|
|
||||||
}
|
|
||||||
} else { // exclude query
|
|
||||||
*pRes = malloc(POINTER_BYTES * pSkipList->nSize);
|
|
||||||
|
|
||||||
__compar_fn_t filterComparator = getKeyFilterComparator(pSkipList, pKey->nType);
|
|
||||||
|
|
||||||
tSkipListNode *pNode = pSkipList->pHead.pForward[0];
|
|
||||||
int32_t retLen = mergeResult(pKey, numOfKey, pRes, filterComparator, pNode);
|
|
||||||
|
|
||||||
if (retLen < pSkipList->nSize) {
|
|
||||||
(*pRes) = realloc(*pRes, POINTER_BYTES * retLen);
|
|
||||||
}
|
|
||||||
return retLen;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
|
|
@ -98,7 +98,7 @@ void* taosArrayGetP(SArray* pArray, size_t index) {
|
||||||
return *(void**)ret;
|
return *(void**)ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t taosArrayGetSize(SArray* pArray) { return pArray->size; }
|
size_t taosArrayGetSize(const SArray* pArray) { return pArray->size; }
|
||||||
|
|
||||||
void* taosArrayInsert(SArray* pArray, size_t index, void* pData) {
|
void* taosArrayInsert(SArray* pArray, size_t index, void* pData) {
|
||||||
if (pArray == NULL || pData == NULL) {
|
if (pArray == NULL || pData == NULL) {
|
||||||
|
|
|
@ -7,8 +7,8 @@
|
||||||
* MurmurHash algorithm
|
* MurmurHash algorithm
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
#include "hashfunc.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "hashutil.h"
|
|
||||||
|
|
||||||
#define ROTL32(x, r) ((x) << (r) | (x) >> (32 - (r)))
|
#define ROTL32(x, r) ((x) << (r) | (x) >> (32 - (r)))
|
||||||
|
|
||||||
|
|
|
@ -524,6 +524,71 @@ SArray* tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey, int16_t keyType) {
|
||||||
return sa;
|
return sa;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t tSkipListGetSize(const SSkipList* pSkipList) {
|
||||||
|
if (pSkipList == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pSkipList->size;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSkipListIterator* tSkipListCreateIter(SSkipList *pSkipList) {
|
||||||
|
if (pSkipList == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSkipListIterator* iter = calloc(1, sizeof(SSkipListIterator));
|
||||||
|
|
||||||
|
iter->pSkipList = pSkipList;
|
||||||
|
if (pSkipList->lock) {
|
||||||
|
pthread_rwlock_rdlock(pSkipList->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
iter->cur = NULL;
|
||||||
|
iter->num = pSkipList->size;
|
||||||
|
|
||||||
|
if (pSkipList->lock) {
|
||||||
|
pthread_rwlock_unlock(pSkipList->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
return iter;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool tSkipListIterNext(SSkipListIterator *iter) {
|
||||||
|
if (iter->num == 0 || iter->pSkipList == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSkipList *pSkipList = iter->pSkipList;
|
||||||
|
|
||||||
|
if (pSkipList->lock) {
|
||||||
|
pthread_rwlock_rdlock(pSkipList->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (iter->cur == NULL) {
|
||||||
|
iter->cur = SL_GET_FORWARD_POINTER(pSkipList->pHead, 0);
|
||||||
|
} else {
|
||||||
|
iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pSkipList->lock) {
|
||||||
|
pthread_rwlock_unlock(pSkipList->lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
return iter->cur != NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSkipListNode *tSkipListIterGet(SSkipListIterator *iter) { return (iter == NULL)? NULL:iter->cur; }
|
||||||
|
|
||||||
|
void* tSkipListDestroyIter(SSkipListIterator* iter) {
|
||||||
|
if (iter == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
tfree(iter);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
// static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey,
|
// static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey,
|
||||||
// int32_t cond, SSkipListNode ***pRes) {
|
// int32_t cond, SSkipListNode ***pRes) {
|
||||||
// pthread_rwlock_rdlock(&pSkipList->lock);
|
// pthread_rwlock_rdlock(&pSkipList->lock);
|
||||||
|
|
|
@ -23,7 +23,7 @@ extern "C" {
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
#include "hash.h"
|
#include "hash.h"
|
||||||
#include "hashutil.h"
|
#include "hashfunc.h"
|
||||||
|
|
||||||
#define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, query))
|
#define GET_QINFO_ADDR(x) ((char*)(x)-offsetof(SQInfo, query))
|
||||||
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0)
|
#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0)
|
||||||
|
@ -119,7 +119,7 @@ typedef enum {
|
||||||
typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order);
|
typedef int (*__block_search_fn_t)(char* data, int num, int64_t key, int order);
|
||||||
|
|
||||||
static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) {
|
static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) {
|
||||||
return *(SMeterObj**)taosGetDataFromHashTable(hashHandle, (const char*)&sid, sizeof(sid));
|
return *(SMeterObj**)taosHashGet(hashHandle, (const char*)&sid, sizeof(sid));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isQueryKilled(SQuery* pQuery);
|
bool isQueryKilled(SQuery* pQuery);
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "hash.h"
|
#include "hash.h"
|
||||||
#include "hashutil.h"
|
#include "hashfunc.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "textbuffer.h"
|
#include "textbuffer.h"
|
||||||
|
@ -1460,7 +1460,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
|
||||||
int16_t bytes) {
|
int16_t bytes) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
int32_t *p1 = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, pData, bytes);
|
int32_t *p1 = (int32_t *)taosHashGet(pWindowResInfo->hashList, pData, bytes);
|
||||||
if (p1 != NULL) {
|
if (p1 != NULL) {
|
||||||
pWindowResInfo->curIndex = *p1;
|
pWindowResInfo->curIndex = *p1;
|
||||||
} else { // more than the capacity, reallocate the resources
|
} else { // more than the capacity, reallocate the resources
|
||||||
|
@ -1485,7 +1485,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
|
||||||
|
|
||||||
// add a new result set for a new group
|
// add a new result set for a new group
|
||||||
pWindowResInfo->curIndex = pWindowResInfo->size++;
|
pWindowResInfo->curIndex = pWindowResInfo->size++;
|
||||||
taosAddToHashTable(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t));
|
taosHashPut(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
return getWindowResult(pWindowResInfo, pWindowResInfo->curIndex);
|
return getWindowResult(pWindowResInfo, pWindowResInfo->curIndex);
|
||||||
|
@ -2018,7 +2018,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
|
||||||
pWindowResInfo->type = type;
|
pWindowResInfo->type = type;
|
||||||
|
|
||||||
_hash_fn_t fn = taosGetDefaultHashFunction(type);
|
_hash_fn_t fn = taosGetDefaultHashFunction(type);
|
||||||
pWindowResInfo->hashList = taosInitHashTable(threshold, fn, false);
|
pWindowResInfo->hashList = taosHashInit(threshold, fn, false);
|
||||||
|
|
||||||
pWindowResInfo->curIndex = -1;
|
pWindowResInfo->curIndex = -1;
|
||||||
pWindowResInfo->size = 0;
|
pWindowResInfo->size = 0;
|
||||||
|
@ -2044,7 +2044,7 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRu
|
||||||
destroyTimeWindowRes(pResult, pRuntimeEnv->pQuery->numOfOutputCols);
|
destroyTimeWindowRes(pResult, pRuntimeEnv->pQuery->numOfOutputCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCleanUpHashTable(pWindowResInfo->hashList);
|
taosHashCleanup(pWindowResInfo->hashList);
|
||||||
tfree(pWindowResInfo->pResult);
|
tfree(pWindowResInfo->pResult);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2059,11 +2059,11 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR
|
||||||
}
|
}
|
||||||
|
|
||||||
pWindowResInfo->curIndex = -1;
|
pWindowResInfo->curIndex = -1;
|
||||||
taosCleanUpHashTable(pWindowResInfo->hashList);
|
taosHashCleanup(pWindowResInfo->hashList);
|
||||||
pWindowResInfo->size = 0;
|
pWindowResInfo->size = 0;
|
||||||
|
|
||||||
_hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type);
|
_hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type);
|
||||||
pWindowResInfo->hashList = taosInitHashTable(pWindowResInfo->capacity, fn, false);
|
pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, false);
|
||||||
|
|
||||||
pWindowResInfo->startTime = 0;
|
pWindowResInfo->startTime = 0;
|
||||||
pWindowResInfo->prevSKey = 0;
|
pWindowResInfo->prevSKey = 0;
|
||||||
|
@ -2081,7 +2081,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
SWindowResult *pResult = &pWindowResInfo->pResult[i];
|
SWindowResult *pResult = &pWindowResInfo->pResult[i];
|
||||||
if (pResult->status.closed) { // remove the window slot from hash table
|
if (pResult->status.closed) { // remove the window slot from hash table
|
||||||
taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
|
taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2104,14 +2104,14 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
||||||
|
|
||||||
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
|
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
|
||||||
SWindowResult *pResult = &pWindowResInfo->pResult[k];
|
SWindowResult *pResult = &pWindowResInfo->pResult[k];
|
||||||
int32_t *p = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
|
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
|
||||||
TSDB_KEYSIZE);
|
TSDB_KEYSIZE);
|
||||||
int32_t v = (*p - num);
|
int32_t v = (*p - num);
|
||||||
assert(v >= 0 && v <= pWindowResInfo->size);
|
assert(v >= 0 && v <= pWindowResInfo->size);
|
||||||
|
|
||||||
// todo add the update function for hash table
|
// todo add the update function for hash table
|
||||||
taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
|
taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
|
||||||
taosAddToHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v,
|
taosHashPut(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v,
|
||||||
sizeof(int32_t));
|
sizeof(int32_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4812,7 +4812,7 @@ void vnodeQueryFreeQInfoEx(SQInfo *pQInfo) {
|
||||||
tfree(pSupporter->pMeterSidExtInfo);
|
tfree(pSupporter->pMeterSidExtInfo);
|
||||||
|
|
||||||
if (pSupporter->pMetersHashTable != NULL) {
|
if (pSupporter->pMetersHashTable != NULL) {
|
||||||
taosCleanUpHashTable(pSupporter->pMetersHashTable);
|
taosHashCleanup(pSupporter->pMetersHashTable);
|
||||||
pSupporter->pMetersHashTable = NULL;
|
pSupporter->pMetersHashTable = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
|
#include "hash.h"
|
||||||
|
#include "hashfunc.h"
|
||||||
#include "ihash.h"
|
#include "ihash.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "tast.h"
|
#include "tast.h"
|
||||||
|
@ -25,8 +27,6 @@
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeRead.h"
|
#include "vnodeRead.h"
|
||||||
#include "vnodeUtil.h"
|
#include "vnodeUtil.h"
|
||||||
#include "hash.h"
|
|
||||||
#include "hashutil.h"
|
|
||||||
|
|
||||||
int (*pQueryFunc[])(SMeterObj *, SQuery *) = {vnodeQueryFromCache, vnodeQueryFromFile};
|
int (*pQueryFunc[])(SMeterObj *, SQuery *) = {vnodeQueryFromCache, vnodeQueryFromFile};
|
||||||
|
|
||||||
|
@ -651,8 +651,8 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
|
||||||
STableQuerySupportObj *pSupporter = (STableQuerySupportObj *)calloc(1, sizeof(STableQuerySupportObj));
|
STableQuerySupportObj *pSupporter = (STableQuerySupportObj *)calloc(1, sizeof(STableQuerySupportObj));
|
||||||
pSupporter->numOfMeters = 1;
|
pSupporter->numOfMeters = 1;
|
||||||
|
|
||||||
pSupporter->pMetersHashTable = taosInitHashTable(pSupporter->numOfMeters, taosIntHash_32, false);
|
pSupporter->pMetersHashTable = taosHashInit(pSupporter->numOfMeters, taosIntHash_32, false);
|
||||||
taosAddToHashTable(pSupporter->pMetersHashTable, (const char*) &pMetersObj[0]->sid, sizeof(pMeterObj[0].sid),
|
taosHashPut(pSupporter->pMetersHashTable, (const char*) &pMetersObj[0]->sid, sizeof(pMeterObj[0].sid),
|
||||||
(char *)&pMetersObj[0], POINTER_BYTES);
|
(char *)&pMetersObj[0], POINTER_BYTES);
|
||||||
|
|
||||||
pSupporter->pSidSet = NULL;
|
pSupporter->pSidSet = NULL;
|
||||||
|
@ -742,9 +742,9 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
|
||||||
STableQuerySupportObj *pSupporter = (STableQuerySupportObj *)calloc(1, sizeof(STableQuerySupportObj));
|
STableQuerySupportObj *pSupporter = (STableQuerySupportObj *)calloc(1, sizeof(STableQuerySupportObj));
|
||||||
pSupporter->numOfMeters = pQueryMsg->numOfSids;
|
pSupporter->numOfMeters = pQueryMsg->numOfSids;
|
||||||
|
|
||||||
pSupporter->pMetersHashTable = taosInitHashTable(pSupporter->numOfMeters, taosIntHash_32, false);
|
pSupporter->pMetersHashTable = taosHashInit(pSupporter->numOfMeters, taosIntHash_32, false);
|
||||||
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {
|
for (int32_t i = 0; i < pSupporter->numOfMeters; ++i) {
|
||||||
taosAddToHashTable(pSupporter->pMetersHashTable, (const char*) &pMetersObj[i]->sid, sizeof(pMetersObj[i]->sid), (char *)&pMetersObj[i],
|
taosHashPut(pSupporter->pMetersHashTable, (const char*) &pMetersObj[i]->sid, sizeof(pMetersObj[i]->sid), (char *)&pMetersObj[i],
|
||||||
POINTER_BYTES);
|
POINTER_BYTES);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -124,7 +124,7 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta) {
|
||||||
tsdbFreeTable(pTemp);
|
tsdbFreeTable(pTemp);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCleanUpHashTable(pMeta->map);
|
taosHashCleanup(pMeta->map);
|
||||||
|
|
||||||
free(pMeta);
|
free(pMeta);
|
||||||
|
|
||||||
|
@ -260,7 +260,7 @@ static int32_t tsdbCheckTableCfg(STableCfg *pCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) {
|
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid) {
|
||||||
void *ptr = taosGetDataFromHashTable(pMeta->map, (char *)(&uid), sizeof(uid));
|
void *ptr = taosHashGet(pMeta->tableMap, (char *)(&uid), sizeof(uid));
|
||||||
|
|
||||||
if (ptr == NULL) return NULL;
|
if (ptr == NULL) return NULL;
|
||||||
|
|
||||||
|
@ -299,7 +299,7 @@ static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
|
||||||
static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) {
|
static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) {
|
||||||
// TODO: add the table to the map
|
// TODO: add the table to the map
|
||||||
int64_t uid = pTable->tableId.uid;
|
int64_t uid = pTable->tableId.uid;
|
||||||
if (taosAddToHashTable(pMeta->map, (char *)(&uid), sizeof(uid), (void *)(&pTable), sizeof(pTable)) < 0) {
|
if (taosHashPut(pMeta->map, (char *)(&uid), sizeof(uid), (void *)(&pTable), sizeof(pTable)) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -1,13 +1,13 @@
|
||||||
PROJECT(TDengine)
|
PROJECT(TDengine)
|
||||||
|
|
||||||
IF (TD_WINDOWS_64)
|
IF (TD_WINDOWS_64)
|
||||||
INCLUDE_DIRECTORIES(${TD_ROOT_DIR}/deps/pthread)
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/pthread)
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
INCLUDE_DIRECTORIES(. ${TD_ROOT_DIR}/src/inc ${TD_ROOT_DIR}/src/client/inc ${TD_OS_DIR}/inc)
|
INCLUDE_DIRECTORIES(. ${TD_COMMUNITY_DIR}/src/inc ${TD_COMMUNITY_DIR}/src/client/inc ${TD_COMMUNITY_DIR}/inc)
|
||||||
AUX_SOURCE_DIRECTORY(. SRC)
|
AUX_SOURCE_DIRECTORY(. SRC)
|
||||||
#ADD_EXECUTABLE(demo ${SRC})
|
ADD_EXECUTABLE(demo demo.c)
|
||||||
#TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
|
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue