enh: remove some assert and handle NONE and NULL in stmt
This commit is contained in:
parent
f2fd5d2e8c
commit
2a1e14ef24
|
@ -193,7 +193,7 @@ int32_t tColDataDecompress(void *input, SColDataCompressInfo *info, SColData *co
|
|||
|
||||
// for stmt bind
|
||||
int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind, int32_t buffMaxLen);
|
||||
int32_t tColDataSortMerge(SArray *colDataArr);
|
||||
int32_t tColDataSortMerge(SArray **arr);
|
||||
|
||||
// for raw block
|
||||
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap,
|
||||
|
|
|
@ -3302,229 +3302,68 @@ static int32_t tColDataSort(SColData *aColData, int32_t nColData) {
|
|||
|
||||
return tColDataMergeSort(aColData, 0, nVal - 1, nColData);
|
||||
}
|
||||
static void tColDataMergeImpl(SColData *pColData, int32_t iStart, int32_t iEnd /* not included */) {
|
||||
switch (pColData->flag) {
|
||||
case HAS_NONE:
|
||||
case HAS_NULL: {
|
||||
pColData->nVal -= (iEnd - iStart - 1);
|
||||
} break;
|
||||
case (HAS_NULL | HAS_NONE): {
|
||||
if (GET_BIT1(pColData->pBitMap, iStart) == 0) {
|
||||
for (int32_t i = iStart + 1; i < iEnd; ++i) {
|
||||
if (GET_BIT1(pColData->pBitMap, i) == 1) {
|
||||
SET_BIT1(pColData->pBitMap, iStart, 1);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
|
||||
SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i));
|
||||
}
|
||||
|
||||
pColData->nVal -= (iEnd - iStart - 1);
|
||||
static int32_t tColDataMerge(SArray **colArr) {
|
||||
int32_t code = 0;
|
||||
SArray *src = *colArr;
|
||||
SArray *dst = NULL;
|
||||
|
||||
uint8_t flag = 0;
|
||||
for (int32_t i = 0; i < pColData->nVal; ++i) {
|
||||
uint8_t bv = GET_BIT1(pColData->pBitMap, i);
|
||||
if (bv == BIT_FLG_NONE) {
|
||||
flag |= HAS_NONE;
|
||||
} else if (bv == BIT_FLG_NULL) {
|
||||
flag |= HAS_NULL;
|
||||
} else {
|
||||
uError("invalid bit value:%d", bv);
|
||||
return;
|
||||
}
|
||||
|
||||
if (flag == pColData->flag) break;
|
||||
}
|
||||
pColData->flag = flag;
|
||||
} break;
|
||||
case HAS_VALUE: {
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
int32_t nDiff = pColData->aOffset[iEnd - 1] - pColData->aOffset[iStart];
|
||||
|
||||
memmove(pColData->pData + pColData->aOffset[iStart], pColData->pData + pColData->aOffset[iEnd - 1],
|
||||
pColData->nData - pColData->aOffset[iEnd - 1]);
|
||||
pColData->nData -= nDiff;
|
||||
|
||||
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
|
||||
pColData->aOffset[j] = pColData->aOffset[i] - nDiff;
|
||||
}
|
||||
} else {
|
||||
memmove(pColData->pData + TYPE_BYTES[pColData->type] * iStart,
|
||||
pColData->pData + TYPE_BYTES[pColData->type] * (iEnd - 1),
|
||||
TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd + 1));
|
||||
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
|
||||
}
|
||||
|
||||
pColData->nVal -= (iEnd - iStart - 1);
|
||||
} break;
|
||||
case (HAS_VALUE | HAS_NONE): {
|
||||
uint8_t bv;
|
||||
int32_t iv;
|
||||
for (int32_t i = iEnd - 1; i >= iStart; --i) {
|
||||
bv = GET_BIT1(pColData->pBitMap, i);
|
||||
if (bv) {
|
||||
iv = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (bv) { // has a value
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
if (iv != iStart) {
|
||||
memmove(&pColData->pData[pColData->aOffset[iStart]], &pColData->pData[pColData->aOffset[iv]],
|
||||
iv < (pColData->nVal - 1) ? pColData->aOffset[iv + 1] - pColData->aOffset[iv]
|
||||
: pColData->nData - pColData->aOffset[iv]);
|
||||
}
|
||||
} else {
|
||||
if (iv != iStart) {
|
||||
(void)memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * iStart],
|
||||
&pColData->pData[TYPE_BYTES[pColData->type] * iv], TYPE_BYTES[pColData->type]);
|
||||
}
|
||||
memmove(&pColData->pData[TYPE_BYTES[pColData->type] * (iStart + 1)],
|
||||
&pColData->pData[TYPE_BYTES[pColData->type] * iEnd],
|
||||
TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
|
||||
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
|
||||
}
|
||||
|
||||
SET_BIT1(pColData->pBitMap, iStart, 1);
|
||||
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
|
||||
SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i));
|
||||
}
|
||||
|
||||
uint8_t flag = HAS_VALUE;
|
||||
for (int32_t i = 0; i < pColData->nVal - (iEnd - iStart - 1); ++i) {
|
||||
if (GET_BIT1(pColData->pBitMap, i) == 0) {
|
||||
flag |= HAS_NONE;
|
||||
}
|
||||
|
||||
if (flag == pColData->flag) break;
|
||||
}
|
||||
pColData->flag = flag;
|
||||
} else { // all NONE
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
int32_t nDiff = pColData->aOffset[iEnd - 1] - pColData->aOffset[iStart];
|
||||
|
||||
memmove(&pColData->pData[pColData->aOffset[iStart]], &pColData->pData[pColData->aOffset[iEnd - 1]],
|
||||
pColData->nData - pColData->aOffset[iEnd - 1]);
|
||||
pColData->nData -= nDiff;
|
||||
|
||||
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
|
||||
pColData->aOffset[j] = pColData->aOffset[i] - nDiff;
|
||||
}
|
||||
} else {
|
||||
memmove(pColData->pData + TYPE_BYTES[pColData->type] * (iStart + 1),
|
||||
pColData->pData + TYPE_BYTES[pColData->type] * iEnd,
|
||||
TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd + 1));
|
||||
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
|
||||
}
|
||||
|
||||
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
|
||||
SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i));
|
||||
}
|
||||
}
|
||||
pColData->nVal -= (iEnd - iStart - 1);
|
||||
} break;
|
||||
case (HAS_VALUE | HAS_NULL): {
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
int32_t nDiff = pColData->aOffset[iEnd - 1] - pColData->aOffset[iStart];
|
||||
|
||||
memmove(pColData->pData + pColData->aOffset[iStart], pColData->pData + pColData->aOffset[iEnd - 1],
|
||||
pColData->nData - pColData->aOffset[iEnd - 1]);
|
||||
pColData->nData -= nDiff;
|
||||
|
||||
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
|
||||
pColData->aOffset[j] = pColData->aOffset[i] - nDiff;
|
||||
}
|
||||
} else {
|
||||
memmove(pColData->pData + TYPE_BYTES[pColData->type] * iStart,
|
||||
pColData->pData + TYPE_BYTES[pColData->type] * (iEnd - 1),
|
||||
TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd + 1));
|
||||
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
|
||||
}
|
||||
|
||||
for (int32_t i = iEnd - 1, j = iStart; i < pColData->nVal; ++i, ++j) {
|
||||
SET_BIT1(pColData->pBitMap, j, GET_BIT1(pColData->pBitMap, i));
|
||||
}
|
||||
|
||||
pColData->nVal -= (iEnd - iStart - 1);
|
||||
|
||||
uint8_t flag = 0;
|
||||
for (int32_t i = 0; i < pColData->nVal; ++i) {
|
||||
if (GET_BIT1(pColData->pBitMap, i)) {
|
||||
flag |= HAS_VALUE;
|
||||
} else {
|
||||
flag |= HAS_NULL;
|
||||
}
|
||||
|
||||
if (flag == pColData->flag) break;
|
||||
}
|
||||
pColData->flag = flag;
|
||||
} break;
|
||||
case (HAS_VALUE | HAS_NULL | HAS_NONE): {
|
||||
uint8_t bv;
|
||||
int32_t iv;
|
||||
for (int32_t i = iEnd - 1; i >= iStart; --i) {
|
||||
bv = GET_BIT2(pColData->pBitMap, i);
|
||||
if (bv) {
|
||||
iv = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (bv) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
} else { // ALL NONE
|
||||
if (IS_VAR_DATA_TYPE(pColData->type)) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
} else {
|
||||
memmove(pColData->pData + TYPE_BYTES[pColData->type] * (iStart + 1),
|
||||
pColData->pData + TYPE_BYTES[pColData->type] * iEnd,
|
||||
TYPE_BYTES[pColData->type] * (pColData->nVal - iEnd));
|
||||
pColData->nData -= (TYPE_BYTES[pColData->type] * (iEnd - iStart - 1));
|
||||
}
|
||||
|
||||
for (int32_t i = iEnd, j = iStart + 1; i < pColData->nVal; ++i, ++j) {
|
||||
SET_BIT2(pColData->pBitMap, j, GET_BIT2(pColData->pBitMap, i));
|
||||
}
|
||||
}
|
||||
pColData->nVal -= (iEnd - iStart - 1);
|
||||
} break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
break;
|
||||
dst = taosArrayInit(taosArrayGetSize(src), sizeof(SColData));
|
||||
if (dst == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
static void tColDataMerge(SColData *aColData, int32_t nColData) {
|
||||
int32_t iStart = 0;
|
||||
SRowKey keyStart, keyEnd;
|
||||
|
||||
for (;;) {
|
||||
if (iStart >= aColData[0].nVal - 1) break;
|
||||
tColDataArrGetRowKey(aColData, nColData, iStart, &keyStart);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(src); i++) {
|
||||
SColData *srcCol = taosArrayGet(src, i);
|
||||
|
||||
int32_t iEnd = iStart + 1;
|
||||
while (iEnd < aColData[0].nVal) {
|
||||
tColDataArrGetRowKey(aColData, nColData, iEnd, &keyEnd);
|
||||
if (tRowKeyCompare(&keyStart, &keyEnd) != 0) break;
|
||||
|
||||
iEnd++;
|
||||
SColData *dstCol = taosArrayReserve(dst, 1);
|
||||
if (dstCol == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
tColDataInit(dstCol, srcCol->cid, srcCol->type, srcCol->cflag);
|
||||
}
|
||||
|
||||
if (iEnd - iStart > 1) {
|
||||
for (int32_t i = 0; i < nColData; i++) {
|
||||
tColDataMergeImpl(&aColData[i], iStart, iEnd);
|
||||
int32_t numRows = ((SColData *)TARRAY_DATA(src))->nVal;
|
||||
SRowKey lastKey;
|
||||
for (int32_t i = 0; i < numRows; i++) {
|
||||
SRowKey key;
|
||||
tColDataArrGetRowKey((SColData *)TARRAY_DATA(dst), taosArrayGetSize(dst), i, &key);
|
||||
|
||||
if (i == 0 || tRowKeyCompare(&key, &lastKey) != 0) { // append new row
|
||||
for (int32_t j = 0; j < taosArrayGetSize(src); j++) {
|
||||
SColData *srcCol = taosArrayGet(src, j);
|
||||
SColData *dstCol = taosArrayGet(dst, j);
|
||||
|
||||
SColVal cv;
|
||||
tColDataGetValue(srcCol, i, &cv);
|
||||
tColDataAppendValue(dstCol, &cv);
|
||||
}
|
||||
lastKey = key;
|
||||
} else { // update existing row
|
||||
for (int32_t j = 0; j < taosArrayGetSize(src); j++) {
|
||||
SColData *srcCol = taosArrayGet(src, j);
|
||||
SColData *dstCol = taosArrayGet(dst, j);
|
||||
|
||||
SColVal cv;
|
||||
tColDataGetValue(srcCol, i, &cv);
|
||||
tColDataUpdateValue(dstCol, &cv, true);
|
||||
}
|
||||
}
|
||||
|
||||
iStart++;
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
taosArrayDestroyEx(dst, tColDataDestroy);
|
||||
} else {
|
||||
taosArrayDestroyEx(src, tColDataDestroy);
|
||||
*colArr = dst;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tColDataSortMerge(SArray *colDataArr) {
|
||||
int32_t tColDataSortMerge(SArray **arr) {
|
||||
SArray *colDataArr = *arr;
|
||||
int32_t nColData = TARRAY_SIZE(colDataArr);
|
||||
SColData *aColData = (SColData *)TARRAY_DATA(colDataArr);
|
||||
|
||||
|
@ -3583,7 +3422,8 @@ int32_t tColDataSortMerge(SArray *colDataArr) {
|
|||
|
||||
// merge -------
|
||||
if (doMerge) {
|
||||
tColDataMerge(aColData, nColData);
|
||||
int32_t code = tColDataMerge(arr);
|
||||
if (code) return code;
|
||||
}
|
||||
|
||||
_exit:
|
||||
|
|
|
@ -745,7 +745,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool
|
|||
|
||||
taosArraySort(pTableCxt->pData->aCol, insColDataComp);
|
||||
|
||||
code = tColDataSortMerge(pTableCxt->pData->aCol);
|
||||
code = tColDataSortMerge(&pTableCxt->pData->aCol);
|
||||
} else {
|
||||
// skip the table has no data to insert
|
||||
// eg: import a csv without valid data
|
||||
|
|
Loading…
Reference in New Issue