Merge branch 'feature/db_ser' into feature/3.0_liaohj
This commit is contained in:
commit
4a60b0e308
|
@ -62,18 +62,17 @@ typedef struct SDataBlockInfo {
|
||||||
union {int64_t uid; int64_t blockId;};
|
union {int64_t uid; int64_t blockId;};
|
||||||
} SDataBlockInfo;
|
} SDataBlockInfo;
|
||||||
|
|
||||||
typedef struct SConstantItem {
|
//typedef struct SConstantItem {
|
||||||
SColumnInfo info;
|
// SColumnInfo info;
|
||||||
int32_t startRow; // run-length-encoding to save the space for multiple rows
|
// int32_t startRow; // run-length-encoding to save the space for multiple rows
|
||||||
int32_t endRow;
|
// int32_t endRow;
|
||||||
SVariant value;
|
// SVariant value;
|
||||||
} SConstantItem;
|
//} SConstantItem;
|
||||||
|
|
||||||
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
|
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
|
||||||
typedef struct SSDataBlock {
|
typedef struct SSDataBlock {
|
||||||
SColumnDataAgg *pBlockAgg;
|
SColumnDataAgg *pBlockAgg;
|
||||||
SArray *pDataBlock; // SArray<SColumnInfoData>
|
SArray *pDataBlock; // SArray<SColumnInfoData>
|
||||||
SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
|
|
||||||
SDataBlockInfo info;
|
SDataBlockInfo info;
|
||||||
} SSDataBlock;
|
} SSDataBlock;
|
||||||
|
|
||||||
|
@ -95,66 +94,15 @@ typedef struct SColumnInfoData {
|
||||||
};
|
};
|
||||||
} SColumnInfoData;
|
} SColumnInfoData;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||||
int64_t tbUid = pBlock->info.uid;
|
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
|
||||||
int16_t numOfCols = pBlock->info.numOfCols;
|
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
|
||||||
int16_t hasVarCol = pBlock->info.hasVarCol;
|
|
||||||
int32_t rows = pBlock->info.rows;
|
|
||||||
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
|
|
||||||
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI64(buf, tbUid);
|
|
||||||
tlen += taosEncodeFixedI16(buf, numOfCols);
|
|
||||||
tlen += taosEncodeFixedI16(buf, hasVarCol);
|
|
||||||
tlen += taosEncodeFixedI32(buf, rows);
|
|
||||||
tlen += taosEncodeFixedI32(buf, sz);
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
|
|
||||||
tlen += taosEncodeFixedI16(buf, pColData->info.colId);
|
|
||||||
tlen += taosEncodeFixedI16(buf, pColData->info.type);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
|
|
||||||
int32_t colSz = rows * pColData->info.bytes;
|
|
||||||
tlen += taosEncodeBinary(buf, pColData->pData, colSz);
|
|
||||||
}
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
|
|
||||||
int32_t sz;
|
|
||||||
|
|
||||||
buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
|
|
||||||
buf = taosDecodeFixedI16(buf, &pBlock->info.numOfCols);
|
|
||||||
buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
|
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
|
||||||
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SColumnInfoData data = {0};
|
|
||||||
buf = taosDecodeFixedI16(buf, &data.info.colId);
|
|
||||||
buf = taosDecodeFixedI16(buf, &data.info.type);
|
|
||||||
buf = taosDecodeFixedI32(buf, &data.info.bytes);
|
|
||||||
int32_t colSz = pBlock->info.rows * data.info.bytes;
|
|
||||||
buf = taosDecodeBinary(buf, (void**)&data.pData, colSz);
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &data);
|
|
||||||
}
|
|
||||||
return (void*)buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
|
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
blockDataDestroy(pBlock);
|
||||||
// int32_t numOfOutput = pBlock->info.numOfCols;
|
|
||||||
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
|
|
||||||
for (int32_t i = 0; i < sz; ++i) {
|
|
||||||
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
|
|
||||||
tfree(pColInfoData->pData);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pBlock->pDataBlock);
|
|
||||||
tfree(pBlock->pBlockAgg);
|
|
||||||
// tfree(pBlock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) {
|
static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) {
|
||||||
|
|
|
@ -240,10 +240,7 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
|
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
|
||||||
ASSERT(pBlock);
|
ASSERT(pBlock && pBlock->info.numOfCols == taosArrayGetSize(pBlock->pDataBlock));
|
||||||
|
|
||||||
size_t constantCols = (pBlock->pConstantList != NULL)? taosArrayGetSize(pBlock->pConstantList):0;
|
|
||||||
ASSERT( pBlock->info.numOfCols == taosArrayGetSize(pBlock->pDataBlock) + constantCols);
|
|
||||||
return pBlock->info.numOfCols;
|
return pBlock->info.numOfCols;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1170,3 +1167,67 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
|
||||||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
|
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
|
||||||
return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock));
|
return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
||||||
|
int64_t tbUid = pBlock->info.uid;
|
||||||
|
int16_t numOfCols = pBlock->info.numOfCols;
|
||||||
|
int16_t hasVarCol = pBlock->info.hasVarCol;
|
||||||
|
int32_t rows = pBlock->info.rows;
|
||||||
|
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += taosEncodeFixedI64(buf, tbUid);
|
||||||
|
tlen += taosEncodeFixedI16(buf, numOfCols);
|
||||||
|
tlen += taosEncodeFixedI16(buf, hasVarCol);
|
||||||
|
tlen += taosEncodeFixedI32(buf, rows);
|
||||||
|
tlen += taosEncodeFixedI32(buf, sz);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
tlen += taosEncodeFixedI16(buf, pColData->info.colId);
|
||||||
|
tlen += taosEncodeFixedI16(buf, pColData->info.type);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pColData->info.bytes);
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
|
||||||
|
tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows);
|
||||||
|
} else {
|
||||||
|
tlen += taosEncodeBinary(buf, pColData->nullbitmap, BitmapLen(rows));
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = colDataGetLength(pColData, rows);
|
||||||
|
taosEncodeFixedI32(buf, len);
|
||||||
|
|
||||||
|
tlen += taosEncodeBinary(buf, pColData->pData, len);
|
||||||
|
}
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
|
||||||
|
int32_t sz;
|
||||||
|
|
||||||
|
buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
|
||||||
|
buf = taosDecodeFixedI16(buf, &pBlock->info.numOfCols);
|
||||||
|
buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
|
||||||
|
buf = taosDecodeFixedI32(buf, &sz);
|
||||||
|
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SColumnInfoData data = {0};
|
||||||
|
buf = taosDecodeFixedI16(buf, &data.info.colId);
|
||||||
|
buf = taosDecodeFixedI16(buf, &data.info.type);
|
||||||
|
buf = taosDecodeFixedI32(buf, &data.info.bytes);
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(data.info.type)) {
|
||||||
|
buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t));
|
||||||
|
data.varmeta.length = pBlock->info.rows * sizeof(int32_t);
|
||||||
|
data.varmeta.allocLen = data.varmeta.length;
|
||||||
|
} else {
|
||||||
|
buf = taosDecodeBinary(buf, (void**)&data.nullbitmap, BitmapLen(pBlock->info.rows));
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t len = 0;
|
||||||
|
buf = taosDecodeFixedI32(buf, &len);
|
||||||
|
buf = taosDecodeBinary(buf, (void**)&data.pData, len);
|
||||||
|
taosArrayPush(pBlock->pDataBlock, &data);
|
||||||
|
}
|
||||||
|
return (void*)buf;
|
||||||
|
}
|
Loading…
Reference in New Issue