From f7325a99bb9a6da49f1bbe8862fc959ec3c24e34 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 27 Nov 2022 15:27:37 +0800 Subject: [PATCH] more code --- include/common/tdataformat.h | 2 +- include/util/tarray.h | 7 +++ source/common/src/tdataformat.c | 106 +++++++++++++++++++++++++++++++- source/util/src/tarray.c | 14 +++++ 4 files changed, 125 insertions(+), 4 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 9e3b9d191c..60acb78140 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -87,7 +87,7 @@ int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData); int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow); void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); void tRowDestroy(SRow *pRow); -int32_t tRowMergeSort(SArray *aRow, STSchema *pTSchema); +int32_t tRowMergeSort(SArray *aRowP, STSchema *pTSchema, int8_t flag); // SRowIter ================================ int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); diff --git a/include/util/tarray.h b/include/util/tarray.h index 0632db3103..3c6fd3d571 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -194,6 +194,13 @@ void taosArrayPopTailBatch(SArray* pArray, size_t cnt); */ void taosArrayRemove(SArray* pArray, size_t index); +/** + * remove batch entry from the given index + * @param pArray + * @param index + */ +void taosArrayRemoveBatch(SArray* pArray, size_t index, size_t num, FDelete fp); + /** * copy the whole array from source to destination * @param pDst diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 55d843f616..b40e48eb96 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -492,10 +492,109 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) { void tRowDestroy(SRow *pRow) { tFree((uint8_t *)pRow); } -int32_t tRowMergeSort(SArray *aRow, STSchema *pTSchema) { - // todo +static int32_t tRowPCmprFn(const void *p1, const void *p2) { + SRow *pRow1 = *(SRow **)p1; + SRow *pRow2 = *(SRow **)p2; + + if (pRow1->ts < pRow2->ts) { + return -1; + } else if (pRow1->ts > pRow2->ts) { + return 1; + } + return 0; } +static void tRowPDestroy(SRow **ppRow) { tFree((uint8_t *)*ppRow); } +static int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int32_t iStart, int32_t iEnd, int8_t flag) { + int32_t code = 0; + + int32_t nRow = iEnd - iStart; + SRowIter **aIter = NULL; + SArray *aColVal = NULL; + SRow *pRow = NULL; + + aIter = taosMemoryCalloc(nRow, sizeof(SRowIter *)); + if (aIter == NULL) { + code = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _exit; + } + + for (int32_t i = 0; i < nRow; i++) { + SRow *pRowT = taosArrayGetP(aRowP, iStart + i); + + code = tRowIterOpen(pRowT, pTSchema, &aIter[i]); + if (code) goto _exit; + } + + // merge + aColVal = taosArrayInit(pTSchema->numOfCols, sizeof(SColVal)); + if (aColVal == NULL) { + code = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _exit; + } + + for (int32_t iCol = 0; iCol < pTSchema->numOfCols; iCol++) { + SColVal *pColVal = NULL; + for (int32_t iRow = 0; iRow < nRow; iRow++) { + SColVal *pColValT = tRowIterNext(aIter[iRow]); + + // todo: take value according to flag + if (pColVal == NULL || COL_VAL_IS_VALUE(pColValT)) { + pColVal = pColValT; + } + } + + taosArrayPush(aColVal, pColVal); + } + + // build + code = tRowBuild(aColVal, pTSchema, &pRow); + if (code) goto _exit; + + taosArrayRemoveBatch(aRowP, iStart, nRow, (FDelete)tRowPDestroy); + taosArrayInsert(aRowP, iStart, &pRow); + +_exit: + if (aIter) { + for (int32_t i = 0; i < nRow; i++) { + tRowIterClose(&aIter[i]); + } + taosMemoryFree(aIter); + } + if (aColVal) taosArrayDestroy(aColVal); + if (code) tRowDestroy(pRow); + return code; +} +int32_t tRowMergeSort(SArray *aRowP, STSchema *pTSchema, int8_t flag) { + int32_t code = 0; + + if (aRowP->size <= 1) return 0; + + taosArraySort(aRowP, tRowPCmprFn); + + int32_t iStart = 0; + while (iStart < aRowP->size) { + SRow *pRow = (SRow *)taosArrayGetP(aRowP, iStart); + + int32_t iEnd = iStart + 1; + while (iEnd < aRowP->size) { + SRow *pRowT = (SRow *)taosArrayGetP(aRowP, iEnd); + + if (pRow->ts != pRowT->ts) break; + + iEnd++; + } + + if (iEnd - iStart > 1) { + code = tRowMerge(aRowP, pTSchema, iStart, iEnd, flag); + } + + // the array is also changing, so the iStart just ++ instead of iEnd + iStart++; + } + + return code; +} // SRowIter ======================================== struct SRowIter { @@ -1505,7 +1604,8 @@ static FORCE_INLINE void tColDataGetValue1(SColData *pColData, int32_t iVal, SCo static FORCE_INLINE void tColDataGetValue2(SColData *pColData, int32_t iVal, SColVal *pColVal) { // HAS_NULL *pColVal = COL_VAL_NULL(pColData->cid, pColData->type); } -static FORCE_INLINE void tColDataGetValue3(SColData *pColData, int32_t iVal, SColVal *pColVal) { // HAS_NULL|HAS_NONE +static FORCE_INLINE void tColDataGetValue3(SColData *pColData, int32_t iVal, + SColVal *pColVal) { // HAS_NULL|HAS_NONE switch (GET_BIT1(pColData->pBitMap, iVal)) { case 0: *pColVal = COL_VAL_NONE(pColData->cid, pColData->type); diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 5703d8f8f4..a068165f6b 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -290,6 +290,20 @@ void taosArrayRemove(SArray* pArray, size_t index) { pArray->size -= 1; } +void taosArrayRemoveBatch(SArray* pArray, size_t index, size_t num, FDelete fp) { + ASSERT(index + num <= pArray->size); + + if (fp) { + for (int32_t i = 0; i < num; i++) { + fp(taosArrayGet(pArray, index + i)); + } + } + + memmove((char*)pArray->pData + index * pArray->elemSize, (char*)pArray->pData + (index + num) * pArray->elemSize, + (pArray->size - index - num) * pArray->elemSize); + pArray->size -= num; +} + SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) { assert(src != NULL && elemSize > 0); SArray* pDst = taosArrayInit(size, elemSize);