[TD-14422]<fix> fix error in sort logic & add test case
This commit is contained in:
parent
878e54f143
commit
69a5e55a24
|
@ -1201,6 +1201,9 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
|
||||
if(pDataBlock == NULL){
|
||||
return NULL;
|
||||
}
|
||||
int32_t numOfCols = pDataBlock->info.numOfCols;
|
||||
|
||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
|
|
|
@ -148,7 +148,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
|||
int32_t start = 0;
|
||||
|
||||
if (pHandle->pBuf == NULL) {
|
||||
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp");
|
||||
int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "doAddToBuf", "/tmp");
|
||||
dBufSetPrintInfo(pHandle->pBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -212,7 +212,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
|
|||
} else {
|
||||
// multi-pass internal merge sort is required
|
||||
if (pHandle->pBuf == NULL) {
|
||||
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp");
|
||||
code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, "sortComparInit", "/tmp");
|
||||
dBufSetPrintInfo(pHandle->pBuf);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -411,6 +411,7 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
|
|||
assert(0);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||
|
|
|
@ -152,23 +152,24 @@ int32_t docomp(const void* p1, const void* p2, void* param) {
|
|||
|
||||
#if 1
|
||||
TEST(testCase, inMem_sort_Test) {
|
||||
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
|
||||
SOrder o = {.order = TSDB_ORDER_ASC};
|
||||
o.col.info.colId = 1;
|
||||
o.col.info.type = TSDB_DATA_TYPE_INT;
|
||||
taosArrayPush(pOrderVal, &o);
|
||||
|
||||
int32_t numOfRows = 1000;
|
||||
SBlockOrderInfo oi = {0};
|
||||
oi.order = TSDB_ORDER_ASC;
|
||||
oi.colIndex = 0;
|
||||
oi.slotId = 0;
|
||||
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||
taosArrayPush(orderInfo, &oi);
|
||||
|
||||
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4, };
|
||||
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, &s, 1, "test_abc");
|
||||
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_SINGLESOURCE_SORT, 1024, 5, NULL, "test_abc");
|
||||
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
|
||||
tsortAddSource(phandle, &numOfRows);
|
||||
|
||||
_info* pInfo = (_info*) taosMemoryCalloc(1, sizeof(_info));
|
||||
pInfo->startVal = 0;
|
||||
pInfo->pageRows = 100;
|
||||
pInfo->count = 6;
|
||||
|
||||
SGenericSource* ps = static_cast<SGenericSource*>(taosMemoryCalloc(1, sizeof(SGenericSource)));
|
||||
ps->param = pInfo;
|
||||
tsortAddSource(phandle, ps);
|
||||
|
||||
int32_t code = tsortOpen(phandle);
|
||||
int32_t row = 1;
|
||||
|
@ -180,7 +181,8 @@ TEST(testCase, inMem_sort_Test) {
|
|||
}
|
||||
|
||||
void* v = tsortGetValue(pTupleHandle, 0);
|
||||
printf("%d: %d\n", row++, *(int32_t*) v);
|
||||
printf("%d: %d\n", row, *(int32_t*) v);
|
||||
ASSERT_EQ(row++, *(int32_t*) v);
|
||||
|
||||
}
|
||||
tsortDestroySortHandle(phandle);
|
||||
|
@ -193,13 +195,13 @@ TEST(testCase, external_mem_sort_Test) {
|
|||
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||
taosArrayPush(orderInfo, &oi);
|
||||
|
||||
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_SINGLESOURCE_SORT, 1024, 5, &s, "test_abc");
|
||||
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_SINGLESOURCE_SORT, 32, 6, NULL, "test_abc");
|
||||
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
|
||||
|
||||
_info* pInfo = (_info*) taosMemoryCalloc(1, sizeof(_info));
|
||||
pInfo->startVal = 100000;
|
||||
pInfo->pageRows = 1000;
|
||||
pInfo->count = 50;
|
||||
pInfo->startVal = 0;
|
||||
pInfo->pageRows = 100;
|
||||
pInfo->count = 6;
|
||||
|
||||
SGenericSource* ps = static_cast<SGenericSource*>(taosMemoryCalloc(1, sizeof(SGenericSource)));
|
||||
ps->param = pInfo;
|
||||
|
@ -216,28 +218,22 @@ TEST(testCase, external_mem_sort_Test) {
|
|||
}
|
||||
|
||||
void* v = tsortGetValue(pTupleHandle, 0);
|
||||
printf("%d: %d\n", row++, *(int32_t*) v);
|
||||
printf("%d: %d\n", row, *(int32_t*) v);
|
||||
ASSERT_EQ(row++, *(int32_t*) v);
|
||||
|
||||
}
|
||||
tsortDestroySortHandle(phandle);
|
||||
}
|
||||
|
||||
TEST(testCase, ordered_merge_sort_Test) {
|
||||
SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder));
|
||||
SOrder o = {.order = TSDB_ORDER_ASC};
|
||||
o.col.info.colId = 1;
|
||||
o.col.info.type = TSDB_DATA_TYPE_INT;
|
||||
taosArrayPush(pOrderVal, &o);
|
||||
|
||||
int32_t numOfRows = 1000;
|
||||
SBlockOrderInfo oi = {0};
|
||||
oi.order = TSDB_ORDER_ASC;
|
||||
oi.colIndex = 0;
|
||||
oi.slotId = 0;
|
||||
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||
taosArrayPush(orderInfo, &oi);
|
||||
|
||||
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4};
|
||||
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, false, SORT_MULTISOURCE_MERGE, 1024, 5, &s, 1,"test_abc");
|
||||
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_MULTISOURCE_MERGE, 1024, 5, NULL,"test_abc");
|
||||
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
|
||||
tsortSetComparFp(phandle, docomp);
|
||||
|
||||
|
@ -246,7 +242,7 @@ TEST(testCase, ordered_merge_sort_Test) {
|
|||
_info* c = static_cast<_info*>(taosMemoryCalloc(1, sizeof(_info)));
|
||||
c->count = 1;
|
||||
c->pageRows = 1000;
|
||||
c->startVal = 0;
|
||||
c->startVal = i*1000;
|
||||
|
||||
p->param = c;
|
||||
tsortAddSource(phandle, p);
|
||||
|
@ -262,7 +258,8 @@ TEST(testCase, ordered_merge_sort_Test) {
|
|||
}
|
||||
|
||||
void* v = tsortGetValue(pTupleHandle, 0);
|
||||
printf("%d: %d\n", row++, *(int32_t*) v);
|
||||
printf("%d: %d\n", row, *(int32_t*) v);
|
||||
ASSERT_EQ(row++, *(int32_t*) v);
|
||||
|
||||
}
|
||||
tsortDestroySortHandle(phandle);
|
||||
|
|
|
@ -138,7 +138,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
|
|||
pBuf->nextPos += size;
|
||||
|
||||
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
|
||||
if (ret != 0) {
|
||||
if (ret == -1) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -169,7 +169,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
|
|||
|
||||
// 3. write to disk.
|
||||
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
|
||||
if (ret != 0) {
|
||||
if (ret == -1) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -224,7 +224,7 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
|
|||
// load file block data in disk
|
||||
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
|
||||
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
|
||||
if (ret != 0) {
|
||||
if (ret == -1) {
|
||||
ret = TAOS_SYSTEM_ERROR(errno);
|
||||
return ret;
|
||||
}
|
||||
|
@ -371,7 +371,6 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
|
|||
pPBuf->totalBufSize = 0;
|
||||
pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit.
|
||||
pPBuf->allocateId = -1;
|
||||
pPBuf->comp = true;
|
||||
pPBuf->pFile = NULL;
|
||||
pPBuf->id = strdup(id);
|
||||
pPBuf->fileSize = 0;
|
||||
|
|
Loading…
Reference in New Issue