fix(qsort): use non-recursive taosqsort_r instead of taosqsort

This commit is contained in:
Minglei Jin 2024-04-23 13:33:30 +08:00
parent 92a7801d09
commit 11dde20995
3 changed files with 170 additions and 46 deletions

View File

@ -54,6 +54,12 @@ typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, const void
*/
void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __ext_compar_fn_t comparFn);
/**
* Non-recursive quick sort.
*
*/
void taosqsort_r(void *src, int64_t nelem, int64_t size, const void *arg, __ext_compar_fn_t cmp);
/**
* merge sort, with the compare function requiring additional parameters support
*

View File

@ -193,15 +193,15 @@ static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t cur
size_t start = 1;
int32_t t = 0;
int32_t count = log(numOfRows) / log(2);
uint32_t startOffset = (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) ? pColumnInfoData->varmeta.length : (currentRow * itemLen);
uint32_t startOffset =
(IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) ? pColumnInfoData->varmeta.length : (currentRow * itemLen);
// the first item
memcpy(pColumnInfoData->pData + startOffset, pData, itemLen);
while (t < count) {
int32_t xlen = 1 << t;
memcpy(pColumnInfoData->pData + start * itemLen + startOffset,
pColumnInfoData->pData + startOffset,
memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
xlen * itemLen);
t += 1;
start += xlen;
@ -209,8 +209,7 @@ static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t cur
// the tail part
if (numOfRows > start) {
memcpy(pColumnInfoData->pData + start * itemLen + startOffset,
pColumnInfoData->pData + startOffset,
memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset,
(numOfRows - start) * itemLen);
}
@ -491,7 +490,8 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
return 0;
}
int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, int32_t numOfRows) {
int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx,
int32_t numOfRows) {
if (pDst->info.type != pSrc->info.type || pDst->info.bytes != pSrc->info.bytes || pSrc->reassigned) {
return TSDB_CODE_FAILED;
}
@ -588,14 +588,14 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI
}
if (pSrc->pData != NULL) {
memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx, pDst->info.bytes * numOfRows);
memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx,
pDst->info.bytes * numOfRows);
}
}
return 0;
}
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSize(pBlock->pDataBlock); }
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }
@ -742,7 +742,6 @@ void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) {
pBlock->info.rows -= numOfRows;
}
size_t blockDataGetSize(const SSDataBlock* pBlock) {
size_t total = 0;
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
@ -827,10 +826,8 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
return NULL;
}
blockDataEnsureCapacity(pDst, rowCount);
/* may have disorder varchar data, TODO
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
@ -840,7 +837,6 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
}
*/
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
@ -1322,7 +1318,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
}
terrno = 0;
taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
taosqsort_r(index, rows, sizeof(int32_t), &helper, dataBlockCompar);
if (terrno) return terrno;
int64_t p1 = taosGetTimestampUs();
@ -1400,7 +1396,6 @@ void blockDataReset(SSDataBlock* pDataBlock) {
pInfo->id.groupId = 0;
}
/*
* NOTE: the type of the input column may be TSDB_DATA_TYPE_NULL, which is used to denote
* the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to

View File

@ -153,6 +153,130 @@ void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __
taosMemoryFreeClear(buf);
}
#define DOSWAP(a, b, size) \
do { \
size_t __size = (size); \
char *__a = (a), *__b = (b); \
do { \
char __tmp = *__a; \
*__a++ = *__b; \
*__b++ = __tmp; \
} while (--__size > 0); \
} while (0)
typedef struct {
char *lo;
char *hi;
} stack_node;
#define STACK_SIZE (CHAR_BIT * sizeof(size_t))
#define PUSH(low, high) ((void)((top->lo = (low)), (top->hi = (high)), ++top))
#define POP(low, high) ((void)(--top, (low = top->lo), (high = top->hi)))
#define STACK_NOT_EMPTY (stack < top)
void taosqsort_r(void *src, int64_t nelem, int64_t size, const void *arg, __ext_compar_fn_t cmp) {
const int32_t MAX_THRESH = 6;
char *base_ptr = (char *)src;
const size_t max_thresh = MAX_THRESH * size;
if (nelem == 0) return;
if (nelem > MAX_THRESH) {
char *lo = base_ptr;
char *hi = &lo[size * (nelem - 1)];
stack_node stack[STACK_SIZE];
stack_node *top = stack;
PUSH(NULL, NULL);
while (STACK_NOT_EMPTY) {
char *left_ptr;
char *right_ptr;
char *mid = lo + size * ((hi - lo) / size >> 1);
if ((*cmp)((void *)mid, (void *)lo, arg) < 0) DOSWAP(mid, lo, size);
if ((*cmp)((void *)hi, (void *)mid, arg) < 0)
DOSWAP(mid, hi, size);
else
goto jump_over;
if ((*cmp)((void *)mid, (void *)lo, arg) < 0) DOSWAP(mid, lo, size);
jump_over:;
left_ptr = lo + size;
right_ptr = hi - size;
do {
while ((*cmp)((void *)left_ptr, (void *)mid, arg) < 0) left_ptr += size;
while ((*cmp)((void *)mid, (void *)right_ptr, arg) < 0) right_ptr -= size;
if (left_ptr < right_ptr) {
DOSWAP(left_ptr, right_ptr, size);
if (mid == left_ptr)
mid = right_ptr;
else if (mid == right_ptr)
mid = left_ptr;
left_ptr += size;
right_ptr -= size;
} else if (left_ptr == right_ptr) {
left_ptr += size;
right_ptr -= size;
break;
}
} while (left_ptr <= right_ptr);
if ((size_t)(right_ptr - lo) <= max_thresh) {
if ((size_t)(hi - left_ptr) <= max_thresh)
POP(lo, hi);
else
lo = left_ptr;
} else if ((size_t)(hi - left_ptr) <= max_thresh)
hi = right_ptr;
else if ((right_ptr - lo) > (hi - left_ptr)) {
PUSH(lo, right_ptr);
lo = left_ptr;
} else {
PUSH(left_ptr, hi);
hi = right_ptr;
}
}
}
#define min(x, y) ((x) < (y) ? (x) : (y))
{
char *const end_ptr = &base_ptr[size * (nelem - 1)];
char *tmp_ptr = base_ptr;
char *thresh = min(end_ptr, base_ptr + max_thresh);
char *run_ptr;
for (run_ptr = tmp_ptr + size; run_ptr <= thresh; run_ptr += size)
if ((*cmp)((void *)run_ptr, (void *)tmp_ptr, arg) < 0) tmp_ptr = run_ptr;
if (tmp_ptr != base_ptr) DOSWAP(tmp_ptr, base_ptr, size);
run_ptr = base_ptr + size;
while ((run_ptr += size) <= end_ptr) {
tmp_ptr = run_ptr - size;
while ((*cmp)((void *)run_ptr, (void *)tmp_ptr, arg) < 0) tmp_ptr -= size;
tmp_ptr += size;
if (tmp_ptr != run_ptr) {
char *trav;
trav = run_ptr + size;
while (--trav >= run_ptr) {
char c = *trav;
char *hi, *lo;
for (hi = lo = trav; (lo -= size) >= tmp_ptr; hi = lo) *hi = *lo;
*hi = c;
}
}
}
}
}
void *taosbsearch(const void *key, const void *base, int32_t nmemb, int32_t size, __compar_fn_t compar, int32_t flags) {
uint8_t *p;
int32_t lidx;
@ -351,7 +475,6 @@ int32_t msortHelper(const void *p1, const void *p2, const void *param) {
return comparFn(p1, p2);
}
int32_t taosMergeSort(void *src, int64_t numOfElem, int64_t size, __compar_fn_t comparFn) {
void *param = comparFn;
return taosMergeSortHelper(src, numOfElem, size, param, msortHelper);