fix(query): fix the invalid column length value during spill out the group data into disk.
This commit is contained in:
parent
52f6a66ba5
commit
5c2c2b378b
|
@ -600,10 +600,11 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
|
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
|
||||||
pBlock->info.rows = *(int32_t*)buf;
|
pBlock->info.rows = *(int32_t*) buf;
|
||||||
|
pBlock->info.groupId = *(uint64_t*) (buf + sizeof(int32_t));
|
||||||
|
|
||||||
int32_t numOfCols = pBlock->info.numOfCols;
|
int32_t numOfCols = pBlock->info.numOfCols;
|
||||||
const char* pStart = buf + sizeof(uint32_t);
|
const char* pStart = buf + sizeof(uint32_t) + sizeof(uint64_t);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
@ -669,7 +670,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
|
||||||
return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t);
|
return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t);
|
||||||
}
|
}
|
||||||
|
|
||||||
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
|
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
|
||||||
ASSERT(pBlock != NULL);
|
ASSERT(pBlock != NULL);
|
||||||
double rowSize = 0;
|
double rowSize = 0;
|
||||||
|
|
||||||
|
@ -1224,7 +1225,27 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
|
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
|
||||||
return (int32_t)((pageSize - blockDataGetSerialMetaSize(pBlock)) / blockDataGetSerialRowSize(pBlock));
|
int32_t payloadSize = pageSize - blockDataGetSerialMetaSize(pBlock);
|
||||||
|
|
||||||
|
int32_t rowSize = pBlock->info.rowSize;
|
||||||
|
|
||||||
|
int32_t nRows = payloadSize / rowSize;
|
||||||
|
|
||||||
|
// the true value must be less than the value of nRows
|
||||||
|
int32_t additional = 0;
|
||||||
|
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||||
|
additional += nRows * sizeof(int32_t);
|
||||||
|
} else {
|
||||||
|
additional += BitmapLen(nRows);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t newRows = (payloadSize - additional) / rowSize;
|
||||||
|
ASSERT(newRows <= nRows && newRows > 1);
|
||||||
|
|
||||||
|
return newRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
void colDataDestroy(SColumnInfoData* pColData) {
|
void colDataDestroy(SColumnInfoData* pColData) {
|
||||||
|
|
|
@ -396,8 +396,11 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
pGInfo->groupId = calcGroupId(pInfo->keyBuf, len);
|
pGInfo->groupId = calcGroupId(pInfo->keyBuf, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// number of rows
|
||||||
int32_t* rows = (int32_t*) pPage;
|
int32_t* rows = (int32_t*) pPage;
|
||||||
|
|
||||||
|
// group id
|
||||||
|
|
||||||
size_t numOfCols = pOperator->numOfExprs;
|
size_t numOfCols = pOperator->numOfExprs;
|
||||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SExprInfo* pExpr = &pOperator->pExpr[i];
|
SExprInfo* pExpr = &pOperator->pExpr[i];
|
||||||
|
@ -408,13 +411,13 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
int32_t bytes = pColInfoData->info.bytes;
|
int32_t bytes = pColInfoData->info.bytes;
|
||||||
int32_t startOffset = pInfo->columnOffset[i];
|
int32_t startOffset = pInfo->columnOffset[i];
|
||||||
|
|
||||||
char* columnLen = NULL;
|
int32_t* columnLen = NULL;
|
||||||
int32_t contentLen = 0;
|
int32_t contentLen = 0;
|
||||||
|
|
||||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
int32_t* offset = (int32_t*)((char*)pPage + startOffset);
|
int32_t* offset = (int32_t*)((char*)pPage + startOffset);
|
||||||
columnLen = (char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity;
|
columnLen = (int32_t*) ((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
|
||||||
char* data = (char*)(columnLen + sizeof(int32_t));
|
char* data = (char*)((char*) columnLen + sizeof(int32_t));
|
||||||
|
|
||||||
if (colDataIsNull_s(pColInfoData, j)) {
|
if (colDataIsNull_s(pColInfoData, j)) {
|
||||||
offset[(*rows)] = -1;
|
offset[(*rows)] = -1;
|
||||||
|
@ -423,11 +426,15 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
offset[*rows] = (*columnLen);
|
offset[*rows] = (*columnLen);
|
||||||
char* src = colDataGetData(pColInfoData, j);
|
char* src = colDataGetData(pColInfoData, j);
|
||||||
memcpy(data + (*columnLen), src, varDataTLen(src));
|
memcpy(data + (*columnLen), src, varDataTLen(src));
|
||||||
|
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
|
||||||
|
ASSERT(v > 0);
|
||||||
|
printf("len:%d\n", v);
|
||||||
|
|
||||||
contentLen = varDataTLen(src);
|
contentLen = varDataTLen(src);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
char* bitmap = (char*)pPage + startOffset;
|
char* bitmap = (char*)pPage + startOffset;
|
||||||
columnLen = (char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity);
|
columnLen = (int32_t*) ((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
|
||||||
char* data = (char*) columnLen + sizeof(int32_t);
|
char* data = (char*) columnLen + sizeof(int32_t);
|
||||||
|
|
||||||
bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
|
bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
|
||||||
|
@ -440,6 +447,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
|
|
||||||
(*columnLen) += contentLen;
|
(*columnLen) += contentLen;
|
||||||
|
ASSERT(*columnLen >= 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
(*rows) += 1;
|
(*rows) += 1;
|
||||||
|
@ -476,7 +484,12 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
|
||||||
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
|
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
|
||||||
taosArrayPush(p->pPageList, &pageId);
|
taosArrayPush(p->pPageList, &pageId);
|
||||||
|
|
||||||
*(int32_t*) pPage = 0;
|
// // number of rows
|
||||||
|
// *(int32_t*) pPage = 0;
|
||||||
|
//
|
||||||
|
// uint64_t* groupId = (pPage + sizeof(int32_t));
|
||||||
|
// *groupId = 0;
|
||||||
|
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -500,7 +513,7 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
|
||||||
size_t numOfCols = pBlock->info.numOfCols;
|
size_t numOfCols = pBlock->info.numOfCols;
|
||||||
int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t));
|
int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t));
|
||||||
|
|
||||||
offset[0] = sizeof(int32_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
|
offset[0] = sizeof(int32_t) + sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfCols - 1; ++i) {
|
for(int32_t i = 0; i < numOfCols - 1; ++i) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
@ -571,7 +584,6 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
|
|
||||||
doHashPartition(pOperator, pBlock);
|
doHashPartition(pOperator, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue