optimize sort logic & fix memory leak
This commit is contained in:
parent
411086c811
commit
ff21ecd364
|
@ -131,44 +131,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
|
|||
memcpy(pColumnInfoData->pData + len, pData, varDataTLen(pData));
|
||||
pColumnInfoData->varmeta.length += varDataTLen(pData);
|
||||
} else {
|
||||
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_BOOL: {
|
||||
*(bool*)p = *(bool*)pData;
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_UTINYINT: {
|
||||
*(int8_t*)p = *(int8_t*)pData;
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_USMALLINT: {
|
||||
*(int16_t*)p = *(int16_t*)pData;
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_UINT: {
|
||||
*(int32_t*)p = *(int32_t*)pData;
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_UBIGINT: {
|
||||
*(int64_t*)p = *(int64_t*)pData;
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
*(float*)p = *(float*)pData;
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
*(double*)p = *(double*)pData;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow, pData, pColumnInfoData->info.bytes);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -562,6 +525,11 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
|
|||
|
||||
size_t metaSize = pBlock->info.rows * sizeof(int32_t);
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize);
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pCol->varmeta.offset = (int32_t*)tmp;
|
||||
memcpy(pCol->varmeta.offset, pStart, metaSize);
|
||||
pStart += metaSize;
|
||||
} else {
|
||||
|
@ -738,61 +706,12 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB
|
|||
pDst->varmeta.offset[j] = pSrc->varmeta.offset[index[j]];
|
||||
}
|
||||
} else {
|
||||
switch (pSrc->info.type) {
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
|
||||
int32_t* p = (int32_t*)pDst->pData;
|
||||
int32_t* srclist = (int32_t*)pSrc->pData;
|
||||
|
||||
p[j] = srclist[index[j]];
|
||||
if (colDataIsNull_f(pSrc->nullbitmap, index[j])) {
|
||||
colDataSetNull_f(pDst->nullbitmap, j);
|
||||
}
|
||||
}
|
||||
break;
|
||||
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
|
||||
if (colDataIsNull_f(pSrc->nullbitmap, index[j])) {
|
||||
colDataSetNull_f(pDst->nullbitmap, j);
|
||||
continue;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
|
||||
int32_t* p = (int32_t*)pDst->pData;
|
||||
int32_t* srclist = (int32_t*)pSrc->pData;
|
||||
|
||||
p[j] = srclist[index[j]];
|
||||
if (colDataIsNull_f(pSrc->nullbitmap, index[j])) {
|
||||
colDataSetNull_f(pDst->nullbitmap, j);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
|
||||
int32_t* p = (int32_t*)pDst->pData;
|
||||
int32_t* srclist = (int32_t*)pSrc->pData;
|
||||
|
||||
p[j] = srclist[index[j]];
|
||||
if (colDataIsNull_f(pSrc->nullbitmap, index[j])) {
|
||||
colDataSetNull_f(pDst->nullbitmap, j);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
|
||||
int32_t* p = (int32_t*)pDst->pData;
|
||||
int32_t* srclist = (int32_t*)pSrc->pData;
|
||||
|
||||
p[j] = srclist[index[j]];
|
||||
if (colDataIsNull_f(pSrc->nullbitmap, index[j])) {
|
||||
colDataSetNull_f(pDst->nullbitmap, j);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
assert(0);
|
||||
memcpy(pDst->pData + j * pDst->info.bytes, pSrc->pData + index[j] * pDst->info.bytes, pDst->info.bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -938,12 +857,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
|||
|
||||
int64_t p2 = taosGetTimestampUs();
|
||||
|
||||
int32_t code = blockDataAssign(pCols, pDataBlock, index);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
destroyTupleIndex(index);
|
||||
terrno = code;
|
||||
return code;
|
||||
}
|
||||
blockDataAssign(pCols, pDataBlock, index);
|
||||
|
||||
int64_t p3 = taosGetTimestampUs();
|
||||
|
||||
|
|
|
@ -5769,10 +5769,11 @@ static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHan
|
|||
pBlock->info.rows += 1;
|
||||
}
|
||||
|
||||
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol,
|
||||
int32_t capacity) {
|
||||
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) {
|
||||
blockDataCleanup(pDataBlock);
|
||||
|
||||
blockDataEnsureCapacity(pDataBlock, capacity);
|
||||
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
|
@ -5971,7 +5972,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SSortedMergeOperatorInfo* pInfo = pOperator->info;
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->binfo.capacity);
|
||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->binfo.capacity);
|
||||
}
|
||||
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
|
@ -6116,10 +6117,9 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SSortOperatorInfo* pInfo = pOperator->info;
|
||||
bool hasVarCol = pInfo->pDataBlock->info.hasVarCol;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes);
|
||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
|
||||
}
|
||||
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
|
@ -6139,7 +6139,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
|
|||
}
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes);
|
||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
|
||||
}
|
||||
|
||||
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo,
|
||||
|
|
|
@ -111,6 +111,17 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
|
|||
return pSortHandle;
|
||||
}
|
||||
|
||||
static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
|
||||
for(int32_t i = 0; i < cmpParam->numOfSources; ++i) {
|
||||
SExternalMemSource* pSource = cmpParam->pSources[i];
|
||||
blockDataDestroy(pSource->src.pBlock);
|
||||
taosMemoryFreeClear(pSource);
|
||||
}
|
||||
|
||||
cmpParam->numOfSources = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void tsortDestroySortHandle(SSortHandle* pSortHandle) {
|
||||
tsortClose(pSortHandle);
|
||||
if (pSortHandle->pMergeTree != NULL) {
|
||||
|
@ -119,6 +130,8 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
|
|||
|
||||
destroyDiskbasedBuf(pSortHandle->pBuf);
|
||||
taosMemoryFreeClear(pSortHandle->idStr);
|
||||
blockDataDestroy(pSortHandle->pDataBlock);
|
||||
sortComparClearup(&pSortHandle->cmpParam); // pOrderedSource is in cmpParam
|
||||
taosMemoryFreeClear(pSortHandle);
|
||||
}
|
||||
|
||||
|
@ -168,6 +181,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
|||
int32_t pageId = -1;
|
||||
void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
|
||||
if (pPage == NULL) {
|
||||
blockDataDestroy(p);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
@ -227,17 +241,6 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t sortComparClearup(SMsortComparParam* cmpParam) {
|
||||
for(int32_t i = 0; i < cmpParam->numOfSources; ++i) {
|
||||
SExternalMemSource* pSource = cmpParam->pSources[i];
|
||||
blockDataDestroy(pSource->src.pBlock);
|
||||
taosMemoryFreeClear(pSource);
|
||||
}
|
||||
|
||||
cmpParam->numOfSources = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSource, int32_t* rowIndex) {
|
||||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
@ -312,7 +315,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
|
||||
static SSDataBlock* getSortedBlockDataInner(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
|
||||
blockDataCleanup(pHandle->pDataBlock);
|
||||
|
||||
while(1) {
|
||||
|
@ -454,7 +457,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* pDataBlock = getSortedBlockData(pHandle, &pHandle->cmpParam, numOfRows);
|
||||
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
|
||||
if (pDataBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -198,6 +198,7 @@ TEST(testCase, inMem_sort_Test) {
|
|||
|
||||
int32_t code = tsortOpen(phandle);
|
||||
int32_t row = 1;
|
||||
taosMemoryFreeClear(ps);
|
||||
|
||||
while(1) {
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(phandle);
|
||||
|
@ -235,6 +236,7 @@ TEST(testCase, external_mem_sort_Test) {
|
|||
|
||||
int32_t code = tsortOpen(phandle);
|
||||
int32_t row = 1;
|
||||
taosMemoryFreeClear(ps);
|
||||
|
||||
while(1) {
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(phandle);
|
||||
|
@ -245,8 +247,8 @@ TEST(testCase, external_mem_sort_Test) {
|
|||
void* v = tsortGetValue(pTupleHandle, 0);
|
||||
printf("%d: %d\n", row, *(int32_t*) v);
|
||||
ASSERT_EQ(row++, *(int32_t*) v);
|
||||
//char buf[64] = {0};
|
||||
//snprintf(buf, varDataLen(v), "%s", varDataVal(v));
|
||||
char buf[64] = {0};
|
||||
memcpy(buf, varDataVal(v), varDataLen(v));
|
||||
//printf("%d: %s\n", row, buf);
|
||||
}
|
||||
tsortDestroySortHandle(phandle);
|
||||
|
|
Loading…
Reference in New Issue