diff --git a/include/util/talgo.h b/include/util/talgo.h index b065ea3705..29c51cc16b 100644 --- a/include/util/talgo.h +++ b/include/util/talgo.h @@ -54,6 +54,12 @@ typedef int32_t (*__ext_compar_fn_t)(const void *p1, const void *p2, const void */ void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __ext_compar_fn_t comparFn); +/** + * Non-recursive quick sort. + * + */ +void taosqsort_r(void *src, int64_t nelem, int64_t size, const void *arg, __ext_compar_fn_t cmp); + /** * merge sort, with the compare function requiring additional parameters support * diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 09e13939a4..6630ad59b1 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -190,30 +190,29 @@ static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t cur } } - size_t start = 1; - int32_t t = 0; - int32_t count = log(numOfRows) / log(2); - uint32_t startOffset = (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) ? pColumnInfoData->varmeta.length : (currentRow * itemLen); - + size_t start = 1; + int32_t t = 0; + int32_t count = log(numOfRows) / log(2); + uint32_t startOffset = + (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) ? pColumnInfoData->varmeta.length : (currentRow * itemLen); + // the first item memcpy(pColumnInfoData->pData + startOffset, pData, itemLen); - + while (t < count) { int32_t xlen = 1 << t; - memcpy(pColumnInfoData->pData + start * itemLen + startOffset, - pColumnInfoData->pData + startOffset, + memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset, xlen * itemLen); t += 1; start += xlen; } - + // the tail part if (numOfRows > start) { - memcpy(pColumnInfoData->pData + start * itemLen + startOffset, - pColumnInfoData->pData + startOffset, + memcpy(pColumnInfoData->pData + start * itemLen + startOffset, pColumnInfoData->pData + startOffset, (numOfRows - start) * itemLen); } - + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { for (int32_t i = 0; i < numOfRows; ++i) { pColumnInfoData->varmeta.offset[i + currentRow] = pColumnInfoData->varmeta.length + i * itemLen; @@ -233,7 +232,7 @@ int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, len = getJsonValueLen(pData); } else { len = varDataTLen(pData); - } + } if (pColumnInfoData->varmeta.allocLen < (numOfRows * len + pColumnInfoData->varmeta.length)) { int32_t code = colDataReserve(pColumnInfoData, (numOfRows * len + pColumnInfoData->varmeta.length)); if (code != TSDB_CODE_SUCCESS) { @@ -247,7 +246,7 @@ int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, uint32_t numOfRows) { pColumnInfoData->hasNull = true; - + if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { memset(&pColumnInfoData->varmeta.offset[currentRow], -1, sizeof(int32_t) * numOfRows); } else { @@ -268,7 +267,7 @@ void colDataSetNItemsNull(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int32_t bytes = (numOfRows - i) / 8; memset(&BMCharPos(pColumnInfoData->nullbitmap, currentRow + i), 0xFF, bytes); i += bytes * 8; - + for (; i < numOfRows; ++i) { colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow + i); } @@ -491,7 +490,8 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p return 0; } -int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, int32_t numOfRows) { +int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnInfoData* pSrc, int32_t srcIdx, + int32_t numOfRows) { if (pDst->info.type != pSrc->info.type || pDst->info.bytes != pSrc->info.bytes || pSrc->reassigned) { return TSDB_CODE_FAILED; } @@ -502,7 +502,7 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI if (IS_VAR_DATA_TYPE(pDst->info.type)) { int32_t allLen = 0; - void* srcAddr = NULL; + void* srcAddr = NULL; if (pSrc->hasNull) { for (int32_t i = 0; i < numOfRows; ++i) { if (colDataIsNull_var(pSrc, srcIdx + i)) { @@ -526,7 +526,7 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI } } else { for (int32_t i = 0; i < numOfRows; ++i) { - char* pData = colDataGetVarData(pSrc, srcIdx + i); + char* pData = colDataGetVarData(pSrc, srcIdx + i); int32_t dataLen = 0; if (pSrc->info.type == TSDB_DATA_TYPE_JSON) { dataLen = getJsonValueLen(pData); @@ -545,7 +545,7 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - + pDst->pData = tmp; pDst->varmeta.allocLen = pDst->varmeta.length + allLen; } @@ -585,17 +585,17 @@ int32_t colDataAssignNRows(SColumnInfoData* pDst, int32_t dstIdx, const SColumnI } } } - } + } if (pSrc->pData != NULL) { - memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx, pDst->info.bytes * numOfRows); + memcpy(pDst->pData + pDst->info.bytes * dstIdx, pSrc->pData + pSrc->info.bytes * srcIdx, + pDst->info.bytes * numOfRows); } } return 0; } - size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return taosArrayGetSize(pBlock->pDataBlock); } size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; } @@ -636,7 +636,7 @@ int32_t blockDataUpdatePkRange(SSDataBlock* pDataBlock, int32_t pkColumnIndex, b return -1; } - SDataBlockInfo* pInfo = &pDataBlock->info; + SDataBlockInfo* pInfo = &pDataBlock->info; SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pkColumnIndex); if (!IS_NUMERIC_TYPE(pColInfoData->info.type) && (pColInfoData->info.type != TSDB_DATA_TYPE_VARCHAR)) { return 0; @@ -732,7 +732,7 @@ void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) { int32_t bytes = (pBlock->info.rows - i) / 8; memset(&BMCharPos(pCol->nullbitmap, i), 0, bytes); i += bytes * 8; - + for (; i < pBlock->info.rows; ++i) { colDataClearNull_f(pCol->nullbitmap, i); } @@ -742,7 +742,6 @@ void blockDataShrinkNRows(SSDataBlock* pBlock, int32_t numOfRows) { pBlock->info.rows -= numOfRows; } - size_t blockDataGetSize(const SSDataBlock* pBlock) { size_t total = 0; size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -827,19 +826,16 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 return NULL; } - blockDataEnsureCapacity(pDst, rowCount); + /* may have disorder varchar data, TODO + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); -/* may have disorder varchar data, TODO - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); - SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); - - colDataAssignNRows(pDstCol, 0, pColData, startIndex, rowCount); - } -*/ - + colDataAssignNRows(pDstCol, 0, pColData, startIndex, rowCount); + } + */ size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { @@ -1322,7 +1318,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { } terrno = 0; - taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar); + taosqsort_r(index, rows, sizeof(int32_t), &helper, dataBlockCompar); if (terrno) return terrno; int64_t p1 = taosGetTimestampUs(); @@ -1400,14 +1396,13 @@ void blockDataReset(SSDataBlock* pDataBlock) { pInfo->id.groupId = 0; } - /* * NOTE: the type of the input column may be TSDB_DATA_TYPE_NULL, which is used to denote * the all NULL value in this column. It is an internal representation of all NULL value column, and no visible to * any users. The length of TSDB_DATA_TYPE_NULL is 0, and it is an special case. */ int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* pBlockInfo, uint32_t numOfRows, - bool clearPayload) { + bool clearPayload) { if (numOfRows <= 0 || pBlockInfo && numOfRows <= pBlockInfo->capacity) { return TSDB_CODE_SUCCESS; } @@ -2402,12 +2397,12 @@ _end: return TSDB_CODE_SUCCESS; } -void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId){ +void buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId) { char tmp[TSDB_TABLE_NAME_LEN] = {0}; - if (stbName == NULL){ - snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%"PRIu64, groupId); - }else{ - snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%"PRIu64, stbName, groupId); + if (stbName == NULL) { + snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%" PRIu64, groupId); + } else { + snprintf(tmp, TSDB_TABLE_NAME_LEN, "_%s_%" PRIu64, stbName, groupId); } ctbName[TSDB_TABLE_NAME_LEN - strlen(tmp) - 1] = 0; // put stbname + groupId to the end strcat(ctbName, tmp); diff --git a/source/util/src/talgo.c b/source/util/src/talgo.c index 8d83a70c11..3e5a86588d 100644 --- a/source/util/src/talgo.c +++ b/source/util/src/talgo.c @@ -153,6 +153,130 @@ void taosqsort(void *src, int64_t numOfElem, int64_t size, const void *param, __ taosMemoryFreeClear(buf); } +#define DOSWAP(a, b, size) \ + do { \ + size_t __size = (size); \ + char *__a = (a), *__b = (b); \ + do { \ + char __tmp = *__a; \ + *__a++ = *__b; \ + *__b++ = __tmp; \ + } while (--__size > 0); \ + } while (0) + +typedef struct { + char *lo; + char *hi; +} stack_node; + +#define STACK_SIZE (CHAR_BIT * sizeof(size_t)) +#define PUSH(low, high) ((void)((top->lo = (low)), (top->hi = (high)), ++top)) +#define POP(low, high) ((void)(--top, (low = top->lo), (high = top->hi))) +#define STACK_NOT_EMPTY (stack < top) + +void taosqsort_r(void *src, int64_t nelem, int64_t size, const void *arg, __ext_compar_fn_t cmp) { + const int32_t MAX_THRESH = 6; + char *base_ptr = (char *)src; + + const size_t max_thresh = MAX_THRESH * size; + + if (nelem == 0) return; + + if (nelem > MAX_THRESH) { + char *lo = base_ptr; + char *hi = &lo[size * (nelem - 1)]; + stack_node stack[STACK_SIZE]; + stack_node *top = stack; + + PUSH(NULL, NULL); + + while (STACK_NOT_EMPTY) { + char *left_ptr; + char *right_ptr; + + char *mid = lo + size * ((hi - lo) / size >> 1); + + if ((*cmp)((void *)mid, (void *)lo, arg) < 0) DOSWAP(mid, lo, size); + if ((*cmp)((void *)hi, (void *)mid, arg) < 0) + DOSWAP(mid, hi, size); + else + goto jump_over; + if ((*cmp)((void *)mid, (void *)lo, arg) < 0) DOSWAP(mid, lo, size); + jump_over:; + + left_ptr = lo + size; + right_ptr = hi - size; + do { + while ((*cmp)((void *)left_ptr, (void *)mid, arg) < 0) left_ptr += size; + + while ((*cmp)((void *)mid, (void *)right_ptr, arg) < 0) right_ptr -= size; + + if (left_ptr < right_ptr) { + DOSWAP(left_ptr, right_ptr, size); + if (mid == left_ptr) + mid = right_ptr; + else if (mid == right_ptr) + mid = left_ptr; + left_ptr += size; + right_ptr -= size; + } else if (left_ptr == right_ptr) { + left_ptr += size; + right_ptr -= size; + break; + } + } while (left_ptr <= right_ptr); + + if ((size_t)(right_ptr - lo) <= max_thresh) { + if ((size_t)(hi - left_ptr) <= max_thresh) + POP(lo, hi); + else + lo = left_ptr; + } else if ((size_t)(hi - left_ptr) <= max_thresh) + hi = right_ptr; + else if ((right_ptr - lo) > (hi - left_ptr)) { + PUSH(lo, right_ptr); + lo = left_ptr; + } else { + PUSH(left_ptr, hi); + hi = right_ptr; + } + } + } +#define min(x, y) ((x) < (y) ? (x) : (y)) + + { + char *const end_ptr = &base_ptr[size * (nelem - 1)]; + char *tmp_ptr = base_ptr; + char *thresh = min(end_ptr, base_ptr + max_thresh); + char *run_ptr; + + for (run_ptr = tmp_ptr + size; run_ptr <= thresh; run_ptr += size) + if ((*cmp)((void *)run_ptr, (void *)tmp_ptr, arg) < 0) tmp_ptr = run_ptr; + + if (tmp_ptr != base_ptr) DOSWAP(tmp_ptr, base_ptr, size); + + run_ptr = base_ptr + size; + while ((run_ptr += size) <= end_ptr) { + tmp_ptr = run_ptr - size; + while ((*cmp)((void *)run_ptr, (void *)tmp_ptr, arg) < 0) tmp_ptr -= size; + + tmp_ptr += size; + if (tmp_ptr != run_ptr) { + char *trav; + + trav = run_ptr + size; + while (--trav >= run_ptr) { + char c = *trav; + char *hi, *lo; + + for (hi = lo = trav; (lo -= size) >= tmp_ptr; hi = lo) *hi = *lo; + *hi = c; + } + } + } + } +} + void *taosbsearch(const void *key, const void *base, int32_t nmemb, int32_t size, __compar_fn_t compar, int32_t flags) { uint8_t *p; int32_t lidx; @@ -275,7 +399,7 @@ void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, } static void taosMerge(void *src, int32_t start, int32_t leftend, int32_t end, int64_t size, const void *param, - __ext_compar_fn_t comparFn, void *tmp) { + __ext_compar_fn_t comparFn, void *tmp) { int32_t leftSize = leftend - start + 1; int32_t rightSize = end - leftend; @@ -326,7 +450,7 @@ static int32_t taosMergeSortHelper(void *src, int64_t numOfElem, int64_t size, c if (numOfElem > THRESHOLD_SIZE) { int32_t currSize; - void *tmp = taosMemoryMalloc(numOfElem * size); + void *tmp = taosMemoryMalloc(numOfElem * size); if (tmp == NULL) return TSDB_CODE_OUT_OF_MEMORY; for (currSize = THRESHOLD_SIZE; currSize <= numOfElem - 1; currSize = 2 * currSize) { @@ -351,7 +475,6 @@ int32_t msortHelper(const void *p1, const void *p2, const void *param) { return comparFn(p1, p2); } - int32_t taosMergeSort(void *src, int64_t numOfElem, int64_t size, __compar_fn_t comparFn) { void *param = comparFn; return taosMergeSortHelper(src, numOfElem, size, param, msortHelper);