[td-5376]<enhance>: improve last_row query performance.
This commit is contained in:
parent
e3046f62f6
commit
f3fd5d1a66
|
@ -3002,7 +3002,7 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g
|
|||
}
|
||||
|
||||
// update the tsdb query time range
|
||||
if (pQueryHandle->cachelastrow) {
|
||||
if (pQueryHandle->cachelastrow != TSDB_CACHED_TYPE_NONE) {
|
||||
pQueryHandle->window = TSWINDOW_INITIALIZER;
|
||||
pQueryHandle->checkFiles = false;
|
||||
pQueryHandle->activeIndex = -1; // start from -1
|
||||
|
@ -3034,6 +3034,7 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
|
|||
STimeWindow window = {INT64_MAX, INT64_MIN};
|
||||
|
||||
int32_t totalNumOfTable = 0;
|
||||
SArray* emptyGroup = taosArrayInit(16, sizeof(int32_t));
|
||||
|
||||
// NOTE: starts from the buffer in case of descending timestamp order check data blocks
|
||||
size_t numOfGroups = taosArrayGetSize(groupList->pGroupList);
|
||||
|
@ -3076,18 +3077,18 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
|
|||
}
|
||||
}
|
||||
|
||||
taosArrayClear(pGroup);
|
||||
|
||||
// more than one table in each group, only one table left for each group
|
||||
if (keyInfo.pTable != NULL) {
|
||||
totalNumOfTable++;
|
||||
taosArrayPush(pGroup, &keyInfo);
|
||||
} else {
|
||||
if (taosArrayGetSize(pGroup) == 1) {
|
||||
// do nothing
|
||||
} else {
|
||||
taosArrayClear(pGroup);
|
||||
taosArrayPush(pGroup, &keyInfo);
|
||||
}
|
||||
} else { // mark all the empty groups, and remove it later
|
||||
taosArrayDestroy(pGroup);
|
||||
|
||||
taosArrayRemove(groupList->pGroupList, j);
|
||||
numOfGroups -= 1;
|
||||
j -= 1;
|
||||
taosArrayPush(emptyGroup, &j);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3097,6 +3098,9 @@ STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList) {
|
|||
assert(totalNumOfTable == 0 && taosArrayGetSize(groupList->pGroupList) == 0);
|
||||
}
|
||||
|
||||
taosArrayRemoveBatch(groupList->pGroupList, TARRAY_GET_START(emptyGroup), taosArrayGetSize(emptyGroup));
|
||||
taosArrayDestroy(emptyGroup);
|
||||
|
||||
groupList->numOfTables = totalNumOfTable;
|
||||
return window;
|
||||
}
|
||||
|
|
|
@ -4,13 +4,14 @@ PROJECT(TDengine)
|
|||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/rpc/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/sync/inc)
|
||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/rmonotonic/inc)
|
||||
|
||||
AUX_SOURCE_DIRECTORY(src SRC)
|
||||
ADD_LIBRARY(tutil ${SRC})
|
||||
ADD_LIBRARY(tutil ${SRC} tests/arrayTest.cpp)
|
||||
TARGET_LINK_LIBRARIES(tutil pthread os lz4 z rmonotonic)
|
||||
|
||||
IF (TD_LINUX)
|
||||
TARGET_LINK_LIBRARIES(tutil m rt)
|
||||
# ADD_SUBDIRECTORY(tests)
|
||||
ADD_SUBDIRECTORY(tests)
|
||||
|
||||
FIND_PATH(ICONV_INCLUDE_EXIST iconv.h /usr/include/ /usr/local/include/)
|
||||
IF (ICONV_INCLUDE_EXIST)
|
||||
|
|
|
@ -52,6 +52,15 @@ void* taosArrayInit(size_t size, size_t elemSize);
|
|||
*/
|
||||
void *taosArrayAddBatch(SArray *pArray, const void *pData, int nEles);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pArray
|
||||
* @param pData position array list
|
||||
* @param numOfElems the number of removed position
|
||||
*/
|
||||
void taosArrayRemoveBatch(SArray *pArray, const int32_t* pData, int32_t numOfElems);
|
||||
|
||||
|
||||
/**
|
||||
* add all element from the source array list into the destination
|
||||
* @param pArray
|
||||
|
|
|
@ -83,6 +83,47 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int nEles) {
|
|||
return dst;
|
||||
}
|
||||
|
||||
void taosArrayRemoveBatch(SArray *pArray, const int32_t* pData, int32_t numOfElems) {
|
||||
assert(pArray != NULL && pData != NULL);
|
||||
if (numOfElems <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
size_t size = taosArrayGetSize(pArray);
|
||||
if (numOfElems >= size) {
|
||||
taosArrayClear(pArray);
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t i = pData[0] + 1, j = 0;
|
||||
while(i < size) {
|
||||
if (j == numOfElems - 1) {
|
||||
break;
|
||||
}
|
||||
|
||||
char* p = TARRAY_GET_ELEM(pArray, i);
|
||||
if (i > pData[j] && i < pData[j + 1]) {
|
||||
char* dst = TARRAY_GET_ELEM(pArray, i - (j + 1));
|
||||
memmove(dst, p, pArray->elemSize);
|
||||
} else if (i == pData[j + 1]) {
|
||||
j += 1;
|
||||
}
|
||||
|
||||
i += 1;
|
||||
}
|
||||
|
||||
assert(i == pData[numOfElems - 1] + 1);
|
||||
|
||||
int32_t dstIndex = pData[numOfElems - 1] - numOfElems + 1;
|
||||
int32_t srcIndex = pData[numOfElems - 1] + 1;
|
||||
|
||||
char* dst = TARRAY_GET_ELEM(pArray, dstIndex);
|
||||
char* src = TARRAY_GET_ELEM(pArray, srcIndex);
|
||||
memmove(dst, src, pArray->elemSize * (pArray->size - numOfElems));
|
||||
|
||||
pArray->size -= numOfElems;
|
||||
}
|
||||
|
||||
void* taosArrayAddAll(SArray* pArray, const SArray* pInput) {
|
||||
return taosArrayAddBatch(pArray, pInput->pData, (int32_t) taosArrayGetSize(pInput));
|
||||
}
|
||||
|
|
|
@ -681,143 +681,3 @@ static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipL
|
|||
|
||||
return pNode;
|
||||
}
|
||||
|
||||
// static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey,
|
||||
// int32_t cond, SSkipListNode ***pRes) {
|
||||
// pthread_rwlock_rdlock(&pSkipList->lock);
|
||||
// SSkipListNode *p = pStartNode;
|
||||
// int32_t numOfRes = 0;
|
||||
//
|
||||
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pEndKey->nType);
|
||||
// while (p != NULL) {
|
||||
// int32_t ret = filterComparFn(&p->key, pEndKey);
|
||||
// if (ret > 0) {
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// if (ret < 0) {
|
||||
// numOfRes++;
|
||||
// p = p->pForward[0];
|
||||
// } else if (ret == 0) {
|
||||
// if (cond == TSDB_RELATION_LESS_EQUAL) {
|
||||
// numOfRes++;
|
||||
// p = p->pForward[0];
|
||||
// } else {
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// (*pRes) = (SSkipListNode **)malloc(POINTER_BYTES * numOfRes);
|
||||
// for (int32_t i = 0; i < numOfRes; ++i) {
|
||||
// (*pRes)[i] = pStartNode;
|
||||
// pStartNode = pStartNode->pForward[0];
|
||||
// }
|
||||
// pthread_rwlock_unlock(&pSkipList->lock);
|
||||
//
|
||||
// return numOfRes;
|
||||
//}
|
||||
//
|
||||
///*
|
||||
// * maybe return the copy of SSkipListNode would be better
|
||||
// */
|
||||
// int32_t tSkipListGets(SSkipList *pSkipList, SSkipListKey *pKey, SSkipListNode ***pRes) {
|
||||
// (*pRes) = NULL;
|
||||
//
|
||||
// SSkipListNode *pNode = tSkipListGet(pSkipList, pKey);
|
||||
// if (pNode == NULL) {
|
||||
// return 0;
|
||||
// }
|
||||
//
|
||||
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType);
|
||||
//
|
||||
// // backward check if previous nodes are with the same value.
|
||||
// SSkipListNode *pPrev = pNode->pBackward[0];
|
||||
// while ((pPrev != &pSkipList->pHead) && filterComparFn(&pPrev->key, pKey) == 0) {
|
||||
// pPrev = pPrev->pBackward[0];
|
||||
// }
|
||||
//
|
||||
// return tSkipListEndParQuery(pSkipList, pPrev->pForward[0], &pNode->key, TSDB_RELATION_LESS_EQUAL, pRes);
|
||||
//}
|
||||
//
|
||||
// static SSkipListNode *tSkipListParQuery(SSkipList *pSkipList, SSkipListKey *pKey, int32_t cond) {
|
||||
// int32_t sLevel = pSkipList->level - 1;
|
||||
// int32_t ret = -1;
|
||||
//
|
||||
// SSkipListNode *x = &pSkipList->pHead;
|
||||
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType);
|
||||
//
|
||||
// pthread_rwlock_rdlock(&pSkipList->lock);
|
||||
//
|
||||
// if (cond == TSDB_RELATION_GREATER_EQUAL || cond == TSDB_RELATION_GREATER) {
|
||||
// for (int32_t i = sLevel; i >= 0; --i) {
|
||||
// while (x->pForward[i] != NULL && (ret = filterComparFn(&x->pForward[i]->key, pKey)) < 0) {
|
||||
// x = x->pForward[i];
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// // backward check if previous nodes are with the same value.
|
||||
// if (cond == TSDB_RELATION_GREATER_EQUAL && ret == 0) {
|
||||
// SSkipListNode *pNode = x->pForward[0];
|
||||
// while ((pNode->pBackward[0] != &pSkipList->pHead) && (filterComparFn(&pNode->pBackward[0]->key, pKey) == 0)) {
|
||||
// pNode = pNode->pBackward[0];
|
||||
// }
|
||||
// pthread_rwlock_unlock(&pSkipList->lock);
|
||||
// return pNode;
|
||||
// }
|
||||
//
|
||||
// if (ret > 0 || cond == TSDB_RELATION_GREATER_EQUAL) {
|
||||
// pthread_rwlock_unlock(&pSkipList->lock);
|
||||
// return x->pForward[0];
|
||||
// } else { // cond == TSDB_RELATION_GREATER && ret == 0
|
||||
// SSkipListNode *pn = x->pForward[0];
|
||||
// while (pn != NULL && filterComparFn(&pn->key, pKey) == 0) {
|
||||
// pn = pn->pForward[0];
|
||||
// }
|
||||
// pthread_rwlock_unlock(&pSkipList->lock);
|
||||
// return pn;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// pthread_rwlock_unlock(&pSkipList->lock);
|
||||
// return NULL;
|
||||
//}
|
||||
//
|
||||
//
|
||||
// static bool removeSupport(SSkipList *pSkipList, SSkipListNode **forward, SSkipListKey *pKey) {
|
||||
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType);
|
||||
//
|
||||
// if (filterComparFn(&forward[0]->pForward[0]->key, pKey) == 0) {
|
||||
// SSkipListNode *p = forward[0]->pForward[0];
|
||||
// doRemove(pSkipList, p, forward);
|
||||
// } else { // failed to find the node of specified value,abort
|
||||
// return false;
|
||||
// }
|
||||
//
|
||||
// // compress the minimum level of skip list
|
||||
// while (pSkipList->level > 0 && SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, pSkipList->level - 1) == NULL) {
|
||||
// pSkipList->level -= 1;
|
||||
// }
|
||||
//
|
||||
// return true;
|
||||
//}
|
||||
//
|
||||
// bool tSkipListRemove(SSkipList *pSkipList, SSkipListKey *pKey) {
|
||||
// SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
|
||||
// __compar_fn_t filterComparFn = getComparFunc(pSkipList, pKey->nType);
|
||||
//
|
||||
// pthread_rwlock_rdlock(&pSkipList->lock);
|
||||
//
|
||||
// SSkipListNode *x = &pSkipList->pHead;
|
||||
// for (int32_t i = pSkipList->level - 1; i >= 0; --i) {
|
||||
// while (x->pForward[i] != NULL && (filterComparFn(&x->pForward[i]->key, pKey) < 0)) {
|
||||
// x = x->pForward[i];
|
||||
// }
|
||||
// forward[i] = x;
|
||||
// }
|
||||
//
|
||||
// bool ret = removeSupport(pSkipList, forward, pKey);
|
||||
// pthread_rwlock_unlock(&pSkipList->lock);
|
||||
//
|
||||
// return ret;
|
||||
//}
|
||||
|
|
|
@ -5,10 +5,6 @@
|
|||
#include "taos.h"
|
||||
#include "tcache.h"
|
||||
|
||||
namespace {
|
||||
int32_t tsMaxMgmtConnections = 10000;
|
||||
int32_t tsMaxMeterConnections = 200;
|
||||
}
|
||||
// test cache
|
||||
TEST(testCase, client_cache_test) {
|
||||
const int32_t REFRESH_TIME_IN_SEC = 2;
|
||||
|
@ -43,7 +39,7 @@ TEST(testCase, client_cache_test) {
|
|||
|
||||
sleep(3);
|
||||
char* d = (char*) taosCacheAcquireByKey(tscMetaCache, key3, strlen(key3));
|
||||
// assert(d == NULL);
|
||||
assert(d == NULL);
|
||||
|
||||
char key5[] = "test5";
|
||||
char data5[] = "data5kkkkk";
|
||||
|
@ -102,7 +98,7 @@ TEST(testCase, cache_resize_test) {
|
|||
|
||||
char key[256] = {0};
|
||||
char data[1024] = "abcdefghijk";
|
||||
int32_t len = strlen(data);
|
||||
// int32_t len = strlen(data);
|
||||
|
||||
uint64_t startTime = taosGetTimestampUs();
|
||||
int32_t num = 10000;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <limits.h>
|
||||
#include <taosdef.h>
|
||||
#include <tcompare.h>
|
||||
#include <iostream>
|
||||
|
||||
#include "os.h"
|
||||
|
@ -8,12 +9,14 @@
|
|||
#include "tskiplist.h"
|
||||
#include "tutil.h"
|
||||
|
||||
#if 0
|
||||
namespace {
|
||||
|
||||
char* getkey(const void* data) { return (char*)(data); }
|
||||
|
||||
void doubleSkipListTest() {
|
||||
SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_DOUBLE, sizeof(double), 0, false, true, getkey);
|
||||
SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_DOUBLE, sizeof(double),
|
||||
getKeyComparFunc(TSDB_DATA_TYPE_DOUBLE), false, getkey);
|
||||
|
||||
double doubleVal[1000] = {0};
|
||||
int32_t size = 20000;
|
||||
|
@ -25,18 +28,15 @@ void doubleSkipListTest() {
|
|||
doubleVal[i] = i * 0.997;
|
||||
}
|
||||
|
||||
int32_t level = 0;
|
||||
int32_t size = 0;
|
||||
// int32_t level = 0;
|
||||
size = 0;
|
||||
|
||||
tSkipListNewNodeInfo(pSkipList, &level, &size);
|
||||
auto d = (SSkipListNode*)calloc(1, size + sizeof(double) * 2);
|
||||
d->level = level;
|
||||
// tSkipListNewNodeInfo(pSkipList, &level, &size);
|
||||
// auto d = (SSkipListNode*)calloc(1, size + sizeof(double) * 2);
|
||||
// d->level = level;
|
||||
|
||||
double* key = (double*)SL_GET_NODE_KEY(pSkipList, d);
|
||||
key[0] = i * 0.997;
|
||||
key[1] = i * 0.997;
|
||||
|
||||
tSkipListPut(pSkipList, d);
|
||||
double key = 0.997;
|
||||
tSkipListPut(pSkipList, &key);
|
||||
}
|
||||
|
||||
printf("the first level of skip list is:\n");
|
||||
|
@ -70,7 +70,8 @@ void doubleSkipListTest() {
|
|||
}
|
||||
|
||||
void randKeyTest() {
|
||||
SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_INT, sizeof(int32_t), 0, false, true, getkey);
|
||||
SSkipList* pSkipList = tSkipListCreate(10, TSDB_DATA_TYPE_INT, sizeof(int32_t), getKeyComparFunc(TSDB_DATA_TYPE_INT),
|
||||
false, getkey);
|
||||
|
||||
int32_t size = 200000;
|
||||
srand(time(NULL));
|
||||
|
@ -375,3 +376,5 @@ TEST(testCase, skiplist_test) {
|
|||
|
||||
free(pKeys);*/
|
||||
}
|
||||
|
||||
#endif
|
|
@ -14,18 +14,18 @@ typedef struct {
|
|||
int refNum;
|
||||
int steps;
|
||||
int rsetId;
|
||||
int64_t rid;
|
||||
int64_t*rid;
|
||||
void **p;
|
||||
} SRefSpace;
|
||||
|
||||
void iterateRefs(int rsetId) {
|
||||
int count = 0;
|
||||
|
||||
void *p = taosIterateRef(rsetId, NULL);
|
||||
void *p = taosIterateRef(rsetId, 0);
|
||||
while (p) {
|
||||
// process P
|
||||
count++;
|
||||
p = taosIterateRef(rsetId, p);
|
||||
p = taosIterateRef(rsetId, (int64_t) p);
|
||||
}
|
||||
|
||||
printf(" %d ", count);
|
||||
|
@ -34,7 +34,6 @@ void iterateRefs(int rsetId) {
|
|||
void *addRef(void *param) {
|
||||
SRefSpace *pSpace = (SRefSpace *)param;
|
||||
int id;
|
||||
int64_t rid;
|
||||
|
||||
for (int i=0; i < pSpace->steps; ++i) {
|
||||
printf("a");
|
||||
|
@ -51,8 +50,7 @@ void *addRef(void *param) {
|
|||
|
||||
void *removeRef(void *param) {
|
||||
SRefSpace *pSpace = (SRefSpace *)param;
|
||||
int id;
|
||||
int64_t rid;
|
||||
int id, code;
|
||||
|
||||
for (int i=0; i < pSpace->steps; ++i) {
|
||||
printf("d");
|
||||
|
@ -71,16 +69,15 @@ void *removeRef(void *param) {
|
|||
void *acquireRelease(void *param) {
|
||||
SRefSpace *pSpace = (SRefSpace *)param;
|
||||
int id;
|
||||
int64_t rid;
|
||||
|
||||
for (int i=0; i < pSpace->steps; ++i) {
|
||||
printf("a");
|
||||
|
||||
id = random() % pSpace->refNum;
|
||||
void *p = taosAcquireRef(pSpace->rsetId, pSpace->p[id]);
|
||||
void *p = taosAcquireRef(pSpace->rsetId, (int64_t) pSpace->p[id]);
|
||||
if (p) {
|
||||
usleep(id % 5 + 1);
|
||||
taosReleaseRef(pSpace->rsetId, pSpace->p[id]);
|
||||
taosReleaseRef(pSpace->rsetId, (int64_t) pSpace->p[id]);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -103,6 +100,7 @@ void *openRefSpace(void *param) {
|
|||
}
|
||||
|
||||
pSpace->p = (void **) calloc(sizeof(void *), pSpace->refNum);
|
||||
pSpace->rid = calloc(pSpace->refNum, sizeof(int64_t));
|
||||
|
||||
pthread_attr_t thattr;
|
||||
pthread_attr_init(&thattr);
|
||||
|
|
Loading…
Reference in New Issue