diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index c14704ae19..4d6d69b178 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -193,7 +193,7 @@ int32_t tColDataDecompress(void *input, SColDataCompressInfo *info, SColData *co // for stmt bind int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind, int32_t buffMaxLen); -int32_t tColDataSortMerge(SArray *colDataArr); +int32_t tColDataSortMerge(SArray **arr); // for raw block int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap, diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 8e44c9c4b4..29bc13c81e 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -3302,229 +3302,68 @@ static int32_t tColDataSort(SColData *aColData, int32_t nColData) { return tColDataMergeSort(aColData, 0, nVal - 1, nColData); } -static void tColDataMergeImpl(SColData *pColData, int32_t iStart, int32_t iEnd /* not included */) { - switch (pColData->flag) { - case HAS_NONE: - case HAS_NULL: { - pColData->nVal -= (iEnd - iStart - 1); - } break; - case (HAS_NULL | HAS_NONE): { - if (GET_BIT1(pColData->pBitMap, iStart) == 0) { - for (int32_t i = iStart + 1; i < iEnd; ++i) { - if (GET_BIT1(pColData->pBitMap, i) == 1) { - SET_BIT1(pColData->pBitMap, iStart, 1); - break; - } - } - } - for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) { - SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i)); - } - pColData->nVal -= (iEnd - iStart - 1); +static int32_t tColDataMerge(SArray **colArr) { + int32_t code = 0; + SArray *src = *colArr; + SArray *dst = NULL; - uint8_t flag = 0; - for (int32_t i = 0; i < pColData->nVal; ++i) { - uint8_t bv = GET_BIT1(pColData->pBitMap, i); - if (bv == BIT_FLG_NONE) { - flag |= HAS_NONE; - } else if (bv == BIT_FLG_NULL) { - flag |= HAS_NULL; - } else { - uError("invalid bit value:%d", bv); - return; 
- } - - if (flag == pColData->flag) break; - } - pColData->flag = flag; - } break; - case HAS_VALUE: { - if (IS_VAR_DATA_TYPE(pColData->type)) { - int32_t nDiff = pColData->aOffset[iEnd - 1] - pColData->aOffset[iStart]; - - memmove(pColData->pData + pColData->aOffset[iStart], pColData->pData + pColData->aOffset[iEnd - 1], - pColData->nData - pColData->aOffset[iEnd - 1]); - pColData->nData -= nDiff; - - for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) { - pColData->aOffset[j] = pColData->aOffset[i] - nDiff; - } - } else { - memmove(pColData->pData + TYPE_BYTES[pColData->type] * iStart, - pColData->pData + TYPE_BYTES[pColData->type] * (iEnd - 1), - TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd + 1)); - pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1)); - } - - pColData->nVal -= (iEnd - iStart - 1); - } break; - case (HAS_VALUE | HAS_NONE): { - uint8_t bv; - int32_t iv; - for (int32_t i = iEnd - 1; i >= iStart; --i) { - bv = GET_BIT1(pColData->pBitMap, i); - if (bv) { - iv = i; - break; - } - } - - if (bv) { // has a value - if (IS_VAR_DATA_TYPE(pColData->type)) { - if (iv != iStart) { - memmove(&pColData->pData[pColData->aOffset[iStart]], &pColData->pData[pColData->aOffset[iv]], - iv < (pColData->nVal - 1) ? 
pColData->aOffset[iv + 1] - pColData->aOffset[iv] - : pColData->nData - pColData->aOffset[iv]); - } - } else { - if (iv != iStart) { - (void)memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * iStart], - &pColData->pData[TYPE_BYTES[pColData->type] * iv], TYPE_BYTES[pColData->type]); - } - memmove(&pColData->pData[TYPE_BYTES[pColData->type] * (iStart + 1)], - &pColData->pData[TYPE_BYTES[pColData->type] * iEnd], - TYPE_BYTES[pColData->type] * (iEnd - iStart - 1)); - pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1)); - } - - SET_BIT1(pColData->pBitMap, iStart, 1); - for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) { - SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i)); - } - - uint8_t flag = HAS_VALUE; - for (int32_t i = 0; i < pColData->nVal - (iEnd - iStart - 1); ++i) { - if (GET_BIT1(pColData->pBitMap, i) == 0) { - flag |= HAS_NONE; - } - - if (flag == pColData->flag) break; - } - pColData->flag = flag; - } else { // all NONE - if (IS_VAR_DATA_TYPE(pColData->type)) { - int32_t nDiff = pColData->aOffset[iEnd - 1] - pColData->aOffset[iStart]; - - memmove(&pColData->pData[pColData->aOffset[iStart]], &pColData->pData[pColData->aOffset[iEnd - 1]], - pColData->nData - pColData->aOffset[iEnd - 1]); - pColData->nData -= nDiff; - - for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) { - pColData->aOffset[j] = pColData->aOffset[i] - nDiff; - } - } else { - memmove(pColData->pData + TYPE_BYTES[pColData->type] * (iStart + 1), - pColData->pData + TYPE_BYTES[pColData->type] * iEnd, - TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd + 1)); - pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1)); - } - - for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) { - SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i)); - } - } - pColData->nVal -= (iEnd - iStart - 1); - } break; - case (HAS_VALUE | HAS_NULL): { - if (IS_VAR_DATA_TYPE(pColData->type)) { - int32_t nDiff = 
pColData->aOffset[iEnd - 1] - pColData->aOffset[iStart]; - - memmove(pColData->pData + pColData->aOffset[iStart], pColData->pData + pColData->aOffset[iEnd - 1], - pColData->nData - pColData->aOffset[iEnd - 1]); - pColData->nData -= nDiff; - - for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) { - pColData->aOffset[j] = pColData->aOffset[i] - nDiff; - } - } else { - memmove(pColData->pData + TYPE_BYTES[pColData->type] * iStart, - pColData->pData + TYPE_BYTES[pColData->type] * (iEnd - 1), - TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd + 1)); - pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1)); - } - - for (int32_t i = iEnd - 1, j = iStart; i < pColData->nVal; ++i, ++j) { - SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i)); - } - - pColData->nVal -= (iEnd - iStart - 1); - - uint8_t flag = 0; - for (int32_t i = 0; i < pColData->nVal; ++i) { - if (GET_BIT1(pColData->pBitMap, i)) { - flag |= HAS_VALUE; - } else { - flag |= HAS_NULL; - } - - if (flag == pColData->flag) break; - } - pColData->flag = flag; - } break; - case (HAS_VALUE | HAS_NULL | HAS_NONE): { - uint8_t bv; - int32_t iv; - for (int32_t i = iEnd - 1; i >= iStart; --i) { - bv = GET_BIT2(pColData->pBitMap, i); - if (bv) { - iv = i; - break; - } - } - - if (bv) { - // TODO - ASSERT(0); - } else { // ALL NONE - if (IS_VAR_DATA_TYPE(pColData->type)) { - // TODO - ASSERT(0); - } else { - memmove(pColData->pData + TYPE_BYTES[pColData->type] * (iStart + 1), - pColData->pData + TYPE_BYTES[pColData->type] * iEnd, - TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd)); - pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1)); - } - - for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) { - SET_BIT2(pColData->pBitMap, j, GET_BIT2(pColData->pBitMap, i)); - } - } - pColData->nVal -= (iEnd - iStart - 1); - } break; - default: - ASSERT(0); - break; + dst = taosArrayInit(taosArrayGetSize(src), sizeof(SColData)); + if (dst == NULL) { + 
return TSDB_CODE_OUT_OF_MEMORY; } -} -static void tColDataMerge(SColData *aColData, int32_t nColData) { - int32_t iStart = 0; - SRowKey keyStart, keyEnd; - for (;;) { - if (iStart >= aColData[0].nVal - 1) break; - tColDataArrGetRowKey(aColData, nColData, iStart, &keyStart); + for (int32_t i = 0; i < taosArrayGetSize(src); i++) { + SColData *srcCol = taosArrayGet(src, i); - int32_t iEnd = iStart + 1; - while (iEnd < aColData[0].nVal) { - tColDataArrGetRowKey(aColData, nColData, iEnd, &keyEnd); - if (tRowKeyCompare(&keyStart, &keyEnd) != 0) break; - - iEnd++; + SColData *dstCol = taosArrayReserve(dst, 1); + if (dstCol == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; } + tColDataInit(dstCol, srcCol->cid, srcCol->type, srcCol->cflag); + } - if (iEnd - iStart > 1) { - for (int32_t i = 0; i < nColData; i++) { - tColDataMergeImpl(&aColData[i], iStart, iEnd); + int32_t numRows = ((SColData *)TARRAY_DATA(src))->nVal; + SRowKey lastKey; + for (int32_t i = 0; i < numRows; i++) { + SRowKey key; + tColDataArrGetRowKey((SColData *)TARRAY_DATA(src), taosArrayGetSize(src), i, &key); // row i indexes src (numRows comes from src); dst holds fewer rows, so the key must be read from src + + if (i == 0 || tRowKeyCompare(&key, &lastKey) != 0) { // append new row + for (int32_t j = 0; j < taosArrayGetSize(src); j++) { + SColData *srcCol = taosArrayGet(src, j); + SColData *dstCol = taosArrayGet(dst, j); + + SColVal cv; + tColDataGetValue(srcCol, i, &cv); + tColDataAppendValue(dstCol, &cv); + } + lastKey = key; + } else { // update existing row + for (int32_t j = 0; j < taosArrayGetSize(src); j++) { + SColData *srcCol = taosArrayGet(src, j); + SColData *dstCol = taosArrayGet(dst, j); + + SColVal cv; + tColDataGetValue(srcCol, i, &cv); + tColDataUpdateValue(dstCol, &cv, true); } } - - iStart++; } + +_exit: + if (code) { + taosArrayDestroyEx(dst, tColDataDestroy); // failure: discard the partial result, *colArr still points at src + } else { + taosArrayDestroyEx(src, tColDataDestroy); // success: release the source columns, the merged array replaces them + *colArr = dst; + } + return code; } -int32_t tColDataSortMerge(SArray *colDataArr) { +int32_t tColDataSortMerge(SArray **arr) { + SArray *colDataArr = *arr; int32_t 
nColData = TARRAY_SIZE(colDataArr); SColData *aColData = (SColData *)TARRAY_DATA(colDataArr); @@ -3583,7 +3422,8 @@ int32_t tColDataSortMerge(SArray *colDataArr) { // merge ------- if (doMerge) { - tColDataMerge(aColData, nColData); + int32_t code = tColDataMerge(arr); + if (code) return code; } _exit: diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 280a5c9e71..b0581d2fd3 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -745,7 +745,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool taosArraySort(pTableCxt->pData->aCol, insColDataComp); - code = tColDataSortMerge(pTableCxt->pData->aCol); + code = tColDataSortMerge(&pTableCxt->pData->aCol); } else { // skip the table has no data to insert // eg: import a csv without valid data