Merge pull request #21597 from taosdata/enh/TD-24586
enh: optimize filter performance
This commit is contained in:
commit
6c74282946
|
@ -231,6 +231,7 @@ typedef struct SColumnInfoData {
|
||||||
};
|
};
|
||||||
SColumnInfo info; // column info
|
SColumnInfo info; // column info
|
||||||
bool hasNull; // if current column data has null value.
|
bool hasNull; // if current column data has null value.
|
||||||
|
bool reassigned; // if current column data is reassigned.
|
||||||
} SColumnInfoData;
|
} SColumnInfoData;
|
||||||
|
|
||||||
typedef struct SQueryTableDataCond {
|
typedef struct SQueryTableDataCond {
|
||||||
|
|
|
@ -178,6 +178,7 @@ int32_t getJsonValueLen(const char* data);
|
||||||
|
|
||||||
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
|
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
|
||||||
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
|
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
|
||||||
|
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData);
|
||||||
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue);
|
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue);
|
||||||
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
|
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
|
||||||
const SColumnInfoData* pSource, int32_t numOfRow2);
|
const SColumnInfoData* pSource, int32_t numOfRow2);
|
||||||
|
|
|
@ -23,6 +23,20 @@
|
||||||
|
|
||||||
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
|
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
|
||||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
|
if (pColumnInfoData->reassigned) {
|
||||||
|
int32_t totalSize = 0;
|
||||||
|
for (int32_t row = 0; row < numOfRows; ++row) {
|
||||||
|
char* pColData = pColumnInfoData->pData + pColumnInfoData->varmeta.offset[row];
|
||||||
|
int32_t colSize = 0;
|
||||||
|
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
colSize = getJsonValueLen(pColData);
|
||||||
|
} else {
|
||||||
|
colSize = varDataTLen(pColData);
|
||||||
|
}
|
||||||
|
totalSize += colSize;
|
||||||
|
}
|
||||||
|
return totalSize;
|
||||||
|
}
|
||||||
return pColumnInfoData->varmeta.length;
|
return pColumnInfoData->varmeta.length;
|
||||||
} else {
|
} else {
|
||||||
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) {
|
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) {
|
||||||
|
@ -126,6 +140,29 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData) {
|
||||||
|
int32_t type = pColumnInfoData->info.type;
|
||||||
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
|
int32_t dataLen = 0;
|
||||||
|
if (type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
dataLen = getJsonValueLen(pData);
|
||||||
|
} else {
|
||||||
|
dataLen = varDataTLen(pData);
|
||||||
|
}
|
||||||
|
|
||||||
|
SVarColAttr* pAttr = &pColumnInfoData->varmeta;
|
||||||
|
|
||||||
|
pColumnInfoData->varmeta.offset[dstRowIdx] = pColumnInfoData->varmeta.offset[srcRowIdx];
|
||||||
|
pColumnInfoData->reassigned = true;
|
||||||
|
} else {
|
||||||
|
memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * dstRowIdx, pData, pColumnInfoData->info.bytes);
|
||||||
|
colDataClearNull_f(pColumnInfoData->nullbitmap, dstRowIdx);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
|
int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
|
||||||
if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -580,9 +617,23 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
|
||||||
*(int32_t*)pStart = dataSize;
|
*(int32_t*)pStart = dataSize;
|
||||||
pStart += sizeof(int32_t);
|
pStart += sizeof(int32_t);
|
||||||
|
|
||||||
|
if (pCol->reassigned && IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||||
|
for (int32_t row = 0; row < numOfRows; ++row) {
|
||||||
|
char* pColData = pCol->pData + pCol->varmeta.offset[row];
|
||||||
|
int32_t colSize = 0;
|
||||||
|
if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
colSize = getJsonValueLen(pColData);
|
||||||
|
} else {
|
||||||
|
colSize = varDataTLen(pColData);
|
||||||
|
}
|
||||||
|
memcpy(pStart, pColData, colSize);
|
||||||
|
pStart += colSize;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
memcpy(pStart, pCol->pData, dataSize);
|
memcpy(pStart, pCol->pData, dataSize);
|
||||||
pStart += dataSize;
|
pStart += dataSize;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1741,8 +1792,21 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
||||||
int32_t len = colDataGetLength(pColData, rows);
|
int32_t len = colDataGetLength(pColData, rows);
|
||||||
tlen += taosEncodeFixedI32(buf, len);
|
tlen += taosEncodeFixedI32(buf, len);
|
||||||
|
|
||||||
|
if (pColData->reassigned && IS_VAR_DATA_TYPE(pColData->info.type)) {
|
||||||
|
for (int32_t row = 0; row < rows; ++row) {
|
||||||
|
char* pData = pColData->pData + pColData->varmeta.offset[row];
|
||||||
|
int32_t colSize = 0;
|
||||||
|
if (pColData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
colSize = getJsonValueLen(pData);
|
||||||
|
} else {
|
||||||
|
colSize = varDataTLen(pData);
|
||||||
|
}
|
||||||
|
tlen += taosEncodeBinary(buf, pData, colSize);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
tlen += taosEncodeBinary(buf, pColData->pData, len);
|
tlen += taosEncodeBinary(buf, pColData->pData, len);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2502,12 +2566,29 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
||||||
data += metaSize;
|
data += metaSize;
|
||||||
dataLen += metaSize;
|
dataLen += metaSize;
|
||||||
|
|
||||||
|
if (pColRes->reassigned && IS_VAR_DATA_TYPE(pColRes->info.type)) {
|
||||||
|
colSizes[col] = 0;
|
||||||
|
for (int32_t row = 0; row < numOfRows; ++row) {
|
||||||
|
char* pColData = pColRes->pData + pColRes->varmeta.offset[row];
|
||||||
|
int32_t colSize = 0;
|
||||||
|
if (pColRes->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
colSize = getJsonValueLen(pColData);
|
||||||
|
} else {
|
||||||
|
colSize = varDataTLen(pColData);
|
||||||
|
}
|
||||||
|
colSizes[col] += colSize;
|
||||||
|
dataLen += colSize;
|
||||||
|
memmove(data, pColData, colSize);
|
||||||
|
data += colSize;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
colSizes[col] = colDataGetLength(pColRes, numOfRows);
|
colSizes[col] = colDataGetLength(pColRes, numOfRows);
|
||||||
dataLen += colSizes[col];
|
dataLen += colSizes[col];
|
||||||
if (pColRes->pData != NULL) {
|
if (pColRes->pData != NULL) {
|
||||||
memmove(data, pColRes->pData, colSizes[col]);
|
memmove(data, pColRes->pData, colSizes[col]);
|
||||||
}
|
}
|
||||||
data += colSizes[col];
|
data += colSizes[col];
|
||||||
|
}
|
||||||
|
|
||||||
colSizes[col] = htonl(colSizes[col]);
|
colSizes[col] = htonl(colSizes[col]);
|
||||||
// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, htonl(colSizes[col]), colSizes[col]);
|
// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, htonl(colSizes[col]), colSizes[col]);
|
||||||
|
|
|
@ -562,7 +562,6 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
|
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
|
||||||
int32_t j = 0;
|
int32_t j = 0;
|
||||||
pDst->varmeta.length = 0;
|
|
||||||
|
|
||||||
while (j < totalRows) {
|
while (j < totalRows) {
|
||||||
if (pIndicator[j] == 0) {
|
if (pIndicator[j] == 0) {
|
||||||
|
@ -573,18 +572,8 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD
|
||||||
if (colDataIsNull_var(pDst, j)) {
|
if (colDataIsNull_var(pDst, j)) {
|
||||||
colDataSetNull_var(pDst, numOfRows);
|
colDataSetNull_var(pDst, numOfRows);
|
||||||
} else {
|
} else {
|
||||||
// fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first copy it to p2
|
|
||||||
char* p1 = colDataGetVarData(pDst, j);
|
char* p1 = colDataGetVarData(pDst, j);
|
||||||
int32_t len = 0;
|
colDataReassignVal(pDst, numOfRows, j, p1);
|
||||||
if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
|
|
||||||
len = getJsonValueLen(p1);
|
|
||||||
} else {
|
|
||||||
len = varDataTLen(p1);
|
|
||||||
}
|
|
||||||
char* p2 = taosMemoryMalloc(len);
|
|
||||||
memcpy(p2, p1, len);
|
|
||||||
colDataSetVal(pDst, numOfRows, p2, false);
|
|
||||||
taosMemoryFree(p2);
|
|
||||||
}
|
}
|
||||||
numOfRows += 1;
|
numOfRows += 1;
|
||||||
j += 1;
|
j += 1;
|
||||||
|
|
|
@ -791,7 +791,21 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
|
||||||
memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
|
memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
|
||||||
udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
|
udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
|
||||||
udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
|
udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
|
||||||
|
if (col->reassigned) {
|
||||||
|
for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) {
|
||||||
|
char* pColData = col->pData + col->varmeta.offset[row];
|
||||||
|
int32_t colSize = 0;
|
||||||
|
if (col->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
colSize = getJsonValueLen(pColData);
|
||||||
|
} else {
|
||||||
|
colSize = varDataTLen(pColData);
|
||||||
|
}
|
||||||
|
memcpy(udfCol->colData.varLenCol.payload, pColData, colSize);
|
||||||
|
udfCol->colData.varLenCol.payload += colSize;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
|
memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
|
udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
|
||||||
int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
|
int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
|
||||||
|
|
Loading…
Reference in New Issue