refactor: do some internal refactor.
This commit is contained in:
parent
9f5b21a841
commit
9bee5e2cee
|
@ -206,7 +206,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
|
||||||
size_t blockDataGetSize(const SSDataBlock* pBlock);
|
size_t blockDataGetSize(const SSDataBlock* pBlock);
|
||||||
size_t blockDataGetRowSize(SSDataBlock* pBlock);
|
size_t blockDataGetRowSize(SSDataBlock* pBlock);
|
||||||
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
|
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
|
||||||
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
|
size_t blockDataGetSerialMetaSize(uint32_t numOfCols);
|
||||||
|
|
||||||
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
||||||
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
|
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
|
||||||
|
@ -238,7 +238,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool
|
||||||
const char* stbFullName, int32_t vgId);
|
const char* stbFullName, int32_t vgId);
|
||||||
|
|
||||||
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||||
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
|
return blockDataGetSerialMetaSize(pBlock->info.numOfCols) + blockDataGetSize(pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data,
|
static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data,
|
||||||
|
|
|
@ -1340,6 +1340,17 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
|
||||||
uint64_t groupId = *(uint64_t*)p;
|
uint64_t groupId = *(uint64_t*)p;
|
||||||
p += sizeof(uint64_t);
|
p += sizeof(uint64_t);
|
||||||
|
|
||||||
|
// check fields
|
||||||
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
int16_t type = *(int16_t*) p;
|
||||||
|
p += sizeof(int16_t);
|
||||||
|
|
||||||
|
int32_t bytes = *(int32_t*) p;
|
||||||
|
p += sizeof(int32_t);
|
||||||
|
|
||||||
|
ASSERT(type == pFields[i].type && bytes == pFields[i].bytes);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t* colLength = (int32_t*)p;
|
int32_t* colLength = (int32_t*)p;
|
||||||
p += sizeof(int32_t) * numOfCols;
|
p += sizeof(int32_t) * numOfCols;
|
||||||
|
|
||||||
|
|
|
@ -682,9 +682,9 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
|
||||||
* @param pBlock
|
* @param pBlock
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
|
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
|
||||||
// | total rows/total length | block group id | each column length |
|
// | total rows/total length | block group id | column schema | each column length |
|
||||||
return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t);
|
return sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t);
|
||||||
}
|
}
|
||||||
|
|
||||||
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
|
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
|
||||||
|
@ -1244,7 +1244,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
|
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
|
||||||
int32_t payloadSize = pageSize - blockDataGetSerialMetaSize(pBlock);
|
int32_t payloadSize = pageSize - blockDataGetSerialMetaSize(pBlock->info.numOfCols);
|
||||||
|
|
||||||
int32_t rowSize = pBlock->info.rowSize;
|
int32_t rowSize = pBlock->info.rowSize;
|
||||||
|
|
||||||
|
@ -1883,33 +1883,43 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
||||||
|
|
||||||
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
|
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
|
||||||
int8_t needCompress) {
|
int8_t needCompress) {
|
||||||
|
// todo extract method
|
||||||
int32_t* actualLen = (int32_t*)data;
|
int32_t* actualLen = (int32_t*)data;
|
||||||
data += sizeof(int32_t);
|
data += sizeof(int32_t);
|
||||||
|
|
||||||
uint64_t* groupId = (uint64_t*)data;
|
uint64_t* groupId = (uint64_t*)data;
|
||||||
data += sizeof(uint64_t);
|
data += sizeof(uint64_t);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
*((int16_t*) data) = pColInfoData->info.type;
|
||||||
|
data += sizeof(int16_t);
|
||||||
|
|
||||||
|
*((int32_t*) data) = pColInfoData->info.bytes;
|
||||||
|
data += sizeof(int32_t);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t* colSizes = (int32_t*)data;
|
int32_t* colSizes = (int32_t*)data;
|
||||||
data += numOfCols * sizeof(int32_t);
|
data += numOfCols * sizeof(int32_t);
|
||||||
|
|
||||||
*dataLen = (numOfCols * sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t));
|
*dataLen = blockDataGetSerialMetaSize(numOfCols);;
|
||||||
|
|
||||||
int32_t numOfRows = pBlock->info.rows;
|
int32_t numOfRows = pBlock->info.rows;
|
||||||
for (int32_t col = 0; col < numOfCols; ++col) {
|
for (int32_t col = 0; col < numOfCols; ++col) {
|
||||||
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
|
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
|
||||||
|
|
||||||
// copy the null bitmap
|
// copy the null bitmap
|
||||||
|
size_t metaSize = 0;
|
||||||
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
|
||||||
size_t metaSize = numOfRows * sizeof(int32_t);
|
metaSize = numOfRows * sizeof(int32_t);
|
||||||
memcpy(data, pColRes->varmeta.offset, metaSize);
|
memcpy(data, pColRes->varmeta.offset, metaSize);
|
||||||
|
} else {
|
||||||
|
metaSize = BitmapLen(numOfRows);
|
||||||
|
memcpy(data, pColRes->nullbitmap, metaSize);
|
||||||
|
}
|
||||||
|
|
||||||
data += metaSize;
|
data += metaSize;
|
||||||
(*dataLen) += metaSize;
|
(*dataLen) += metaSize;
|
||||||
} else {
|
|
||||||
int32_t len = BitmapLen(numOfRows);
|
|
||||||
memcpy(data, pColRes->nullbitmap, len);
|
|
||||||
data += len;
|
|
||||||
(*dataLen) += len;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (needCompress) {
|
if (needCompress) {
|
||||||
colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress);
|
colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress);
|
||||||
|
@ -1939,6 +1949,17 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t
|
||||||
pBlock->info.groupId = *(uint64_t*)pStart;
|
pBlock->info.groupId = *(uint64_t*)pStart;
|
||||||
pStart += sizeof(uint64_t);
|
pStart += sizeof(uint64_t);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
pColInfoData->info.type = *(int16_t*)pStart;
|
||||||
|
pStart += sizeof(int16_t);
|
||||||
|
|
||||||
|
pColInfoData->info.bytes = *(int32_t*)pStart;
|
||||||
|
pStart += sizeof(int32_t);
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(pBlock, numOfRows);
|
||||||
|
|
||||||
int32_t* colLen = (int32_t*)pStart;
|
int32_t* colLen = (int32_t*)pStart;
|
||||||
pStart += sizeof(int32_t) * numOfCols;
|
pStart += sizeof(int32_t) * numOfCols;
|
||||||
|
|
||||||
|
|
|
@ -266,7 +266,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
|
size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
|
||||||
blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(pBlock);
|
blockDataGetSize(pBlock) + blockDataGetSerialMetaSize(pBlock->info.numOfCols);
|
||||||
|
|
||||||
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
|
SRetrieveMetaTableRsp *pRsp = rpcMallocCont(size);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
|
|
|
@ -68,10 +68,10 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// data format:
|
// data format:
|
||||||
// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
|
// +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+
|
||||||
// |SDataCacheEntry | total length | group id | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
|
// |SDataCacheEntry | total length | group id | col1_schema | col2_schema | col3_schema ...| column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
|
||||||
// | | (4 bytes) |(8 bytes) | sizeof(int32_t) * numOfCols | actual size | | actual size | |
|
// | | (4 bytes) |(8 bytes) |(sizeof(int16_t)+sizeof(int32_t))*numOfCols | sizeof(int32_t) * numOfCols | actual size | | actual size | |
|
||||||
// +----------------+--------------+----------+--------------------------------------+-------------+-----------+-------------+-----------+
|
// +----------------+--------------+----------+--------------------------------------------+--------------------------------------+-------------+-----------+-------------+-----------+
|
||||||
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
|
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
|
||||||
// recorded in the first segment, next to the struct header
|
// recorded in the first segment, next to the struct header
|
||||||
static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
|
||||||
|
|
|
@ -2559,37 +2559,7 @@ int32_t setDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadIn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataEnsureCapacity(pBlock, numOfRows);
|
blockCompressDecode(pBlock, numOfCols, numOfRows, pStart);
|
||||||
|
|
||||||
int32_t dataLen = *(int32_t*)pStart;
|
|
||||||
uint64_t groupId = *(uint64_t*)(pStart + sizeof(int32_t));
|
|
||||||
pStart += sizeof(int32_t) + sizeof(uint64_t);
|
|
||||||
|
|
||||||
int32_t* colLen = (int32_t*)(pStart);
|
|
||||||
pStart += sizeof(int32_t) * numOfCols;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
colLen[i] = htonl(colLen[i]);
|
|
||||||
ASSERT(colLen[i] >= 0);
|
|
||||||
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
|
||||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
|
||||||
pColInfoData->varmeta.length = colLen[i];
|
|
||||||
pColInfoData->varmeta.allocLen = colLen[i];
|
|
||||||
|
|
||||||
memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
|
|
||||||
pStart += sizeof(int32_t) * numOfRows;
|
|
||||||
|
|
||||||
pColInfoData->pData = taosMemoryMalloc(colLen[i]);
|
|
||||||
} else {
|
|
||||||
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
|
|
||||||
pStart += BitmapLen(numOfRows);
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(pColInfoData->pData, pStart, colLen[i]);
|
|
||||||
pStart += colLen[i];
|
|
||||||
}
|
|
||||||
|
|
||||||
// data from mnode
|
// data from mnode
|
||||||
relocateColumnData(pRes, pColList, pBlock->pDataBlock);
|
relocateColumnData(pRes, pColList, pBlock->pDataBlock);
|
||||||
taosArrayDestroy(pBlock->pDataBlock);
|
taosArrayDestroy(pBlock->pDataBlock);
|
||||||
|
|
|
@ -144,7 +144,9 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf *pBuf, SArray* pAllSource
|
||||||
(*sourceId) += 1;
|
(*sourceId) += 1;
|
||||||
|
|
||||||
int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);
|
int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);
|
||||||
int32_t numOfRows = (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(pBlock))/rowSize; // The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
|
|
||||||
|
// The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
|
||||||
|
int32_t numOfRows = (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(pBlock->info.numOfCols))/rowSize;
|
||||||
ASSERT(numOfRows > 0);
|
ASSERT(numOfRows > 0);
|
||||||
return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
|
return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue