fix: add group id to create initial source

This commit is contained in:
shenglian zhou 2022-06-09 09:36:14 +08:00
parent 1783c971e9
commit 9afcca0355
2 changed files with 39 additions and 23 deletions

View File

@ -1217,6 +1217,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
pBlock->info.numOfCols = numOfCols; pBlock->info.numOfCols = numOfCols;
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
pBlock->info.rowSize = pDataBlock->info.rowSize; pBlock->info.rowSize = pDataBlock->info.rowSize;
pBlock->info.groupId = pDataBlock->info.groupId;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {0}; SColumnInfoData colInfo = {0};

View File

@ -526,16 +526,24 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
if (pHandle->type == SORT_SINGLESOURCE_SORT) { if (pHandle->type == SORT_SINGLESOURCE_SORT) {
SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0); SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0);
taosArrayClear(pHandle->pOrderedSource); taosArrayClear(pHandle->pOrderedSource);
bool hasGroupId = false;
SSDataBlock* prefetchedDataBlock = NULL;
while (1) { while (1) {
SSDataBlock* pBlock = pHandle->fetchfp(source->param); SSDataBlock* pBlock = NULL;
if (prefetchedDataBlock == NULL) {
pBlock = pHandle->fetchfp(source->param);
} else {
pBlock = prefetchedDataBlock;
prefetchedDataBlock = NULL;
}
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
if (pHandle->pDataBlock == NULL) { if (!hasGroupId) {
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
// calculate the buffer pages according to the total available buffers. // calculate the buffer pages according to the total available buffers.
int32_t rowSize = blockDataGetRowSize(pBlock); int32_t rowSize = blockDataGetRowSize(pBlock);
if (rowSize * 4 > 4096) { if (rowSize * 4 > 4096) {
@ -547,29 +555,36 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
// todo!! // todo!!
pHandle->numOfPages = 1024; pHandle->numOfPages = 1024;
sortBufSize = pHandle->numOfPages * pHandle->pageSize; sortBufSize = pHandle->numOfPages * pHandle->pageSize;
hasGroupId = true;
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
} }
// perform the scalar function calculation before apply the sort if (pHandle->pDataBlock->info.groupId == pBlock->info.groupId) {
if (pHandle->beforeFp != NULL) { // perform the scalar function calculation before apply the sort
pHandle->beforeFp(pBlock, pHandle->param); if (pHandle->beforeFp != NULL) {
} pHandle->beforeFp(pBlock, pHandle->param);
}
// todo relocate the columns
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != 0) {
return code;
}
// todo relocate the columns size_t size = blockDataGetSize(pHandle->pDataBlock);
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); if (size > sortBufSize) {
if (code != 0) { // Perform the in-memory sort and then flush data in the buffer into disk.
return code; int64_t p = taosGetTimestampUs();
} blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
size_t size = blockDataGetSize(pHandle->pDataBlock); int64_t el = taosGetTimestampUs() - p;
if (size > sortBufSize) { pHandle->sortElapsed += el;
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
int64_t el = taosGetTimestampUs() - p; doAddToBuf(pHandle->pDataBlock, pHandle);
pHandle->sortElapsed += el; }
} else {
doAddToBuf(pHandle->pDataBlock, pHandle); prefetchedDataBlock = pBlock;
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
} }
} }