enh: reassign data column in blocks
This commit is contained in:
parent
8aa2e96d4e
commit
02b1257b01
|
@ -231,6 +231,7 @@ typedef struct SColumnInfoData {
|
||||||
};
|
};
|
||||||
SColumnInfo info; // column info
|
SColumnInfo info; // column info
|
||||||
bool hasNull; // if current column data has null value.
|
bool hasNull; // if current column data has null value.
|
||||||
|
bool reassigned; // if current column data is reassigned.
|
||||||
} SColumnInfoData;
|
} SColumnInfoData;
|
||||||
|
|
||||||
typedef struct SQueryTableDataCond {
|
typedef struct SQueryTableDataCond {
|
||||||
|
|
|
@ -23,6 +23,20 @@
|
||||||
|
|
||||||
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
|
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
|
||||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
|
if (pColumnInfoData->reassigned) {
|
||||||
|
int32_t totalSize = 0;
|
||||||
|
for (int32_t row = 0; row < numOfRows; ++row) {
|
||||||
|
char* pColData = pColumnInfoData->pData + pColumnInfoData->varmeta.offset[row];
|
||||||
|
int32_t colSize = 0;
|
||||||
|
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
colSize = getJsonValueLen(pColData);
|
||||||
|
} else {
|
||||||
|
colSize = varDataTLen(pColData);
|
||||||
|
}
|
||||||
|
totalSize += colSize;
|
||||||
|
}
|
||||||
|
return totalSize;
|
||||||
|
}
|
||||||
return pColumnInfoData->varmeta.length;
|
return pColumnInfoData->varmeta.length;
|
||||||
} else {
|
} else {
|
||||||
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) {
|
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_NULL) {
|
||||||
|
@ -138,8 +152,8 @@ int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx,
|
||||||
|
|
||||||
SVarColAttr* pAttr = &pColumnInfoData->varmeta;
|
SVarColAttr* pAttr = &pColumnInfoData->varmeta;
|
||||||
|
|
||||||
uint32_t len = pColumnInfoData->varmeta.length;
|
|
||||||
pColumnInfoData->varmeta.offset[dstRowIdx] = pColumnInfoData->varmeta.offset[srcRowIdx];
|
pColumnInfoData->varmeta.offset[dstRowIdx] = pColumnInfoData->varmeta.offset[srcRowIdx];
|
||||||
|
pColumnInfoData->reassigned = true;
|
||||||
} else {
|
} else {
|
||||||
memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * dstRowIdx, pData, pColumnInfoData->info.bytes);
|
memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * dstRowIdx, pData, pColumnInfoData->info.bytes);
|
||||||
colDataClearNull_f(pColumnInfoData->nullbitmap, dstRowIdx);
|
colDataClearNull_f(pColumnInfoData->nullbitmap, dstRowIdx);
|
||||||
|
@ -603,8 +617,22 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) {
|
||||||
*(int32_t*)pStart = dataSize;
|
*(int32_t*)pStart = dataSize;
|
||||||
pStart += sizeof(int32_t);
|
pStart += sizeof(int32_t);
|
||||||
|
|
||||||
memcpy(pStart, pCol->pData, dataSize);
|
if (pCol->reassigned && IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||||
pStart += dataSize;
|
for (int32_t row = 0; row < numOfRows; ++row) {
|
||||||
|
char* pColData = pCol->pData + pCol->varmeta.offset[row];
|
||||||
|
int32_t colSize = 0;
|
||||||
|
if (pCol->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
colSize = getJsonValueLen(pColData);
|
||||||
|
} else {
|
||||||
|
colSize = varDataTLen(pColData);
|
||||||
|
}
|
||||||
|
memcpy(pStart, pColData, colSize);
|
||||||
|
pStart += colSize;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
memcpy(pStart, pCol->pData, dataSize);
|
||||||
|
pStart += dataSize;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1764,7 +1792,20 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
||||||
int32_t len = colDataGetLength(pColData, rows);
|
int32_t len = colDataGetLength(pColData, rows);
|
||||||
tlen += taosEncodeFixedI32(buf, len);
|
tlen += taosEncodeFixedI32(buf, len);
|
||||||
|
|
||||||
tlen += taosEncodeBinary(buf, pColData->pData, len);
|
if (pColData->reassigned && IS_VAR_DATA_TYPE(pColData->info.type)) {
|
||||||
|
for (int32_t row = 0; row < rows; ++row) {
|
||||||
|
char* pData = pColData->pData + pColData->varmeta.offset[row];
|
||||||
|
int32_t colSize = 0;
|
||||||
|
if (pColData->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
colSize = getJsonValueLen(pData);
|
||||||
|
} else {
|
||||||
|
colSize = varDataTLen(pData);
|
||||||
|
}
|
||||||
|
tlen += taosEncodeBinary(buf, pData, colSize);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tlen += taosEncodeBinary(buf, pColData->pData, len);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
@ -2525,12 +2566,29 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
||||||
data += metaSize;
|
data += metaSize;
|
||||||
dataLen += metaSize;
|
dataLen += metaSize;
|
||||||
|
|
||||||
colSizes[col] = colDataGetLength(pColRes, numOfRows);
|
if (pColRes->reassigned && IS_VAR_DATA_TYPE(pColRes->info.type)) {
|
||||||
dataLen += colSizes[col];
|
colSizes[col] = 0;
|
||||||
if (pColRes->pData != NULL) {
|
for (int32_t row = 0; row < numOfRows; ++row) {
|
||||||
memmove(data, pColRes->pData, colSizes[col]);
|
char* pColData = pColRes->pData + pColRes->varmeta.offset[row];
|
||||||
|
int32_t colSize = 0;
|
||||||
|
if (pColRes->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
colSize = getJsonValueLen(pColData);
|
||||||
|
} else {
|
||||||
|
colSize = varDataTLen(pColData);
|
||||||
|
}
|
||||||
|
colSizes[col] += colSize;
|
||||||
|
dataLen += colSize;
|
||||||
|
memmove(data, pColData, colSize);
|
||||||
|
data += colSize;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
colSizes[col] = colDataGetLength(pColRes, numOfRows);
|
||||||
|
dataLen += colSizes[col];
|
||||||
|
if (pColRes->pData != NULL) {
|
||||||
|
memmove(data, pColRes->pData, colSizes[col]);
|
||||||
|
}
|
||||||
|
data += colSizes[col];
|
||||||
}
|
}
|
||||||
data += colSizes[col];
|
|
||||||
|
|
||||||
colSizes[col] = htonl(colSizes[col]);
|
colSizes[col] = htonl(colSizes[col]);
|
||||||
// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, htonl(colSizes[col]), colSizes[col]);
|
// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, htonl(colSizes[col]), colSizes[col]);
|
||||||
|
|
|
@ -562,7 +562,6 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
|
if (IS_VAR_DATA_TYPE(pDst->info.type)) {
|
||||||
int32_t j = 0;
|
int32_t j = 0;
|
||||||
pDst->varmeta.length = 0;
|
|
||||||
|
|
||||||
while (j < totalRows) {
|
while (j < totalRows) {
|
||||||
if (pIndicator[j] == 0) {
|
if (pIndicator[j] == 0) {
|
||||||
|
|
|
@ -791,7 +791,21 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
|
||||||
memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
|
memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
|
||||||
udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
|
udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
|
||||||
udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
|
udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
|
||||||
memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
|
if (col->reassigned) {
|
||||||
|
for (int32_t row = 0; row < udfCol->colData.numOfRows; ++row) {
|
||||||
|
char* pColData = col->pData + col->varmeta.offset[row];
|
||||||
|
int32_t colSize = 0;
|
||||||
|
if (col->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
colSize = getJsonValueLen(pColData);
|
||||||
|
} else {
|
||||||
|
colSize = varDataTLen(pColData);
|
||||||
|
}
|
||||||
|
memcpy(udfCol->colData.varLenCol.payload, pColData, colSize);
|
||||||
|
udfCol->colData.varLenCol.payload += colSize;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
|
udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
|
||||||
int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
|
int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
|
||||||
|
|
Loading…
Reference in New Issue