partition interval and limimt, dataload error

This commit is contained in:
factosea 2024-05-31 18:58:35 +08:00
parent c48db6b212
commit a6217eec03
2 changed files with 94 additions and 55 deletions

View File

@ -53,6 +53,8 @@ typedef struct SPartitionOperatorInfo {
int32_t rowCapacity; // maximum number of rows for each buffer page int32_t rowCapacity; // maximum number of rows for each buffer page
int32_t* columnOffset; // start position for each column data int32_t* columnOffset; // start position for each column data
SArray* sortedGroupArray; // SDataGroupInfo sorted by group id SArray* sortedGroupArray; // SDataGroupInfo sorted by group id
SArray* blockForNotLoaded; // SSDataBlock that data is not loaded
int32_t offsetForNotLoaded;// read offset for SSDataBlock that data is not loaded
int32_t groupIndex; // group index int32_t groupIndex; // group index
int32_t pageIndex; // page index of current group int32_t pageIndex; // page index of current group
SExprSupp scalarSup; SExprSupp scalarSup;
@ -584,71 +586,86 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len); pGroupInfo->groupId = calcGroupId(pInfo->keyBuf, len);
} }
// number of rows if (pBlock->info.dataLoad) {
int32_t* rows = (int32_t*)pPage; // number of rows
int32_t* rows = (int32_t*)pPage;
size_t numOfCols = pOperator->exprSupp.numOfExprs; size_t numOfCols = pOperator->exprSupp.numOfExprs;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i]; SExprInfo* pExpr = &pOperator->exprSupp.pExprInfo[i];
int32_t slotId = pExpr->base.pParam[0].pCol->slotId; int32_t slotId = pExpr->base.pParam[0].pCol->slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
int32_t bytes = pColInfoData->info.bytes; int32_t bytes = pColInfoData->info.bytes;
int32_t startOffset = pInfo->columnOffset[i]; int32_t startOffset = pInfo->columnOffset[i];
int32_t* columnLen = NULL; int32_t* columnLen = NULL;
int32_t contentLen = 0; int32_t contentLen = 0;
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
int32_t* offset = (int32_t*)((char*)pPage + startOffset); int32_t* offset = (int32_t*)((char*)pPage + startOffset);
columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity); columnLen = (int32_t*)((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
char* data = (char*)((char*)columnLen + sizeof(int32_t)); char* data = (char*)((char*)columnLen + sizeof(int32_t));
if (colDataIsNull_s(pColInfoData, j)) { if (colDataIsNull_s(pColInfoData, j)) {
offset[(*rows)] = -1; offset[(*rows)] = -1;
contentLen = 0; contentLen = 0;
} else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) { } else if (pColInfoData->info.type == TSDB_DATA_TYPE_JSON) {
offset[*rows] = (*columnLen); offset[*rows] = (*columnLen);
char* src = colDataGetData(pColInfoData, j); char* src = colDataGetData(pColInfoData, j);
int32_t dataLen = getJsonValueLen(src); int32_t dataLen = getJsonValueLen(src);
memcpy(data + (*columnLen), src, dataLen); memcpy(data + (*columnLen), src, dataLen);
int32_t v = (data + (*columnLen) + dataLen - (char*)pPage); int32_t v = (data + (*columnLen) + dataLen - (char*)pPage);
ASSERT(v > 0); ASSERT(v > 0);
contentLen = dataLen; contentLen = dataLen;
} else {
offset[*rows] = (*columnLen);
char* src = colDataGetData(pColInfoData, j);
memcpy(data + (*columnLen), src, varDataTLen(src));
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
ASSERT(v > 0);
contentLen = varDataTLen(src);
}
} else { } else {
offset[*rows] = (*columnLen); char* bitmap = (char*)pPage + startOffset;
char* src = colDataGetData(pColInfoData, j); columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
memcpy(data + (*columnLen), src, varDataTLen(src)); char* data = (char*)columnLen + sizeof(int32_t);
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
ASSERT(v > 0);
contentLen = varDataTLen(src); bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
if (isNull) {
colDataSetNull_f(bitmap, (*rows));
} else {
memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf));
}
contentLen = bytes;
} }
} else {
char* bitmap = (char*)pPage + startOffset;
columnLen = (int32_t*)((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
char* data = (char*)columnLen + sizeof(int32_t);
bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); (*columnLen) += contentLen;
if (isNull) {
colDataSetNull_f(bitmap, (*rows));
} else {
memcpy(data + (*columnLen), colDataGetData(pColInfoData, j), bytes);
ASSERT((data + (*columnLen) + bytes - (char*)pPage) <= getBufPageSize(pInfo->pBuf));
}
contentLen = bytes;
} }
(*columnLen) += contentLen; (*rows) += 1;
setBufPageDirty(pPage, true);
releaseBufPage(pInfo->pBuf, pPage);
} else {
SSDataBlock* dataNotLoadBlock = createOneDataBlock(pBlock, true);
if (dataNotLoadBlock == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
if (pInfo->blockForNotLoaded == NULL) {
pInfo->blockForNotLoaded = taosArrayInit(1, sizeof(SSDataBlock));
pInfo->offsetForNotLoaded = 0;
}
dataNotLoadBlock->info.id.groupId = pGroupInfo->groupId;
dataNotLoadBlock->info.dataLoad = 0;
taosArrayInsert(pInfo->blockForNotLoaded, pInfo->blockForNotLoaded->size, &dataNotLoadBlock);
break;
} }
(*rows) += 1;
setBufPageDirty(pPage, true);
releaseBufPage(pInfo->pBuf, pPage);
} }
} }
@ -734,6 +751,14 @@ static void clearPartitionOperator(SPartitionOperatorInfo* pInfo) {
} }
taosArrayClear(pInfo->sortedGroupArray); taosArrayClear(pInfo->sortedGroupArray);
clearDiskbasedBuf(pInfo->pBuf); clearDiskbasedBuf(pInfo->pBuf);
if (pInfo->blockForNotLoaded) {
for (int32_t i = 0; i < pInfo->blockForNotLoaded->size; i++) {
SSDataBlock** pBlock = taosArrayGet(pInfo->blockForNotLoaded, i);
blockDataDestroy(*pBlock);
}
taosArrayClear(pInfo->blockForNotLoaded);
pInfo->offsetForNotLoaded = 0;
}
} }
static int compareDataGroupInfo(const void* group1, const void* group2) { static int compareDataGroupInfo(const void* group1, const void* group2) {
@ -747,6 +772,21 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1; return (pGroupInfo1->groupId < pGroupInfo2->groupId) ? -1 : 1;
} }
static SSDataBlock* buildPartitionResultForNotLoadBlock(SOperatorInfo* pOperator) {
SPartitionOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pInfo->blockForNotLoaded && pInfo->offsetForNotLoaded < pInfo->blockForNotLoaded->size) {
SSDataBlock** pBlock = taosArrayGet(pInfo->blockForNotLoaded, pInfo->offsetForNotLoaded);
pInfo->offsetForNotLoaded++;
return *pBlock;
} else {
setOperatorCompleted(pOperator);
clearPartitionOperator(pInfo);
return NULL;
}
}
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
SPartitionOperatorInfo* pInfo = pOperator->info; SPartitionOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -757,12 +797,10 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
(pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL; (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
// try next group data // try next group data
++pInfo->groupIndex; if (pInfo->groupIndex + 1 >= taosArrayGetSize(pInfo->sortedGroupArray)) {
if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) { return buildPartitionResultForNotLoadBlock(pOperator);
setOperatorCompleted(pOperator);
clearPartitionOperator(pInfo);
return NULL;
} }
++pInfo->groupIndex;
pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex); pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
pInfo->pageIndex = 0; pInfo->pageIndex = 0;

View File

@ -769,6 +769,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = pTableScanInfo->pResBlock; SSDataBlock* pBlock = pTableScanInfo->pResBlock;
bool hasNext = false; bool hasNext = false;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
pBlock->info.dataLoad = false;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();