[td-13039] refactor.
This commit is contained in:
parent
b95e95dc0a
commit
0c5a8dc3ff
|
@ -63,33 +63,33 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
|
|||
* @param type
|
||||
* @return
|
||||
*/
|
||||
SSortHandle* createSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr);
|
||||
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pSortHandle
|
||||
*/
|
||||
void destroySortHandle(SSortHandle* pSortHandle);
|
||||
void tsortDestroySortHandle(SSortHandle* pSortHandle);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pHandle
|
||||
* @return
|
||||
*/
|
||||
int32_t sortOpen(SSortHandle* pHandle);
|
||||
int32_t tsortOpen(SSortHandle* pHandle);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pHandle
|
||||
* @return
|
||||
*/
|
||||
int32_t sortClose(SSortHandle* pHandle);
|
||||
int32_t tsortClose(SSortHandle* pHandle);
|
||||
|
||||
/**
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
int32_t setFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp);
|
||||
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp);
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -97,7 +97,7 @@ int32_t setFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp);
|
|||
* @param fp
|
||||
* @return
|
||||
*/
|
||||
int32_t setComparFn(SSortHandle* pHandle, _sort_merge_compar_fn_t fp);
|
||||
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp);
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -105,14 +105,14 @@ int32_t setComparFn(SSortHandle* pHandle, _sort_merge_compar_fn_t fp);
|
|||
* @param pSource
|
||||
* @return success or failed
|
||||
*/
|
||||
int32_t sortAddSource(SSortHandle* pSortHandle, void* pSource);
|
||||
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param pHandle
|
||||
* @return
|
||||
*/
|
||||
STupleHandle* sortNextTuple(SSortHandle* pHandle);
|
||||
STupleHandle* tsortNextTuple(SSortHandle* pHandle);
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -120,7 +120,7 @@ STupleHandle* sortNextTuple(SSortHandle* pHandle);
|
|||
* @param colIndex
|
||||
* @return
|
||||
*/
|
||||
bool sortIsValueNull(STupleHandle* pVHandle, int32_t colIndex);
|
||||
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex);
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -128,7 +128,7 @@ bool sortIsValueNull(STupleHandle* pVHandle, int32_t colIndex);
|
|||
* @param colIndex
|
||||
* @return
|
||||
*/
|
||||
void* sortGetValue(STupleHandle* pVHandle, int32_t colIndex);
|
||||
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -5586,7 +5586,7 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosArrayDestroy(pInfo->groupInfo);
|
||||
|
||||
if (pInfo->pSortHandle != NULL) {
|
||||
destroySortHandle(pInfo->pSortHandle);
|
||||
tsortDestroySortHandle(pInfo->pSortHandle);
|
||||
}
|
||||
blockDataDestroy(pInfo->binfo.pRes);
|
||||
|
||||
|
@ -5617,11 +5617,11 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHan
|
|||
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
bool isNull = sortIsValueNull(pTupleHandle, i);
|
||||
bool isNull = tsortIsNullVal(pTupleHandle, i);
|
||||
if (isNull) {
|
||||
colDataAppend(pColInfo, pBlock->info.rows, NULL, true);
|
||||
} else {
|
||||
char* pData = sortGetValue(pTupleHandle, i);
|
||||
char* pData = tsortGetValue(pTupleHandle, i);
|
||||
colDataAppend(pColInfo, pBlock->info.rows, pData, false);
|
||||
}
|
||||
}
|
||||
|
@ -5633,7 +5633,7 @@ static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataB
|
|||
blockDataClearup(pDataBlock, hasVarCol);
|
||||
|
||||
while(1) {
|
||||
STupleHandle* pTupleHandle = sortNextTuple(pHandle);
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -5788,7 +5788,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
|
|||
|
||||
blockDataClearup(pDataBlock, pInfo->hasVarCol);
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = sortNextTuple(pHandle);
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -5835,19 +5835,19 @@ static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
|
|||
|
||||
SSchema* p = blockDataExtractSchema(pInfo->binfo.pRes, NULL);
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize,
|
||||
numOfBufPage, p, pInfo->binfo.pRes->info.numOfCols, "GET_TASKID(pTaskInfo)");
|
||||
|
||||
tfree(p);
|
||||
setFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
||||
|
||||
for(int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
|
||||
SGenericSource* ps = calloc(1, sizeof(SGenericSource));
|
||||
ps->param = pOperator->pDownstream[i];
|
||||
sortAddSource(pInfo->pSortHandle, ps);
|
||||
tsortAddSource(pInfo->pSortHandle, ps);
|
||||
}
|
||||
|
||||
int32_t code = sortOpen(pInfo->pSortHandle);
|
||||
int32_t code = tsortOpen(pInfo->pSortHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
@ -6006,18 +6006,18 @@ static SSDataBlock* doSort(void* param, bool* newgroup) {
|
|||
|
||||
SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL);
|
||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||
pInfo->pSortHandle = createSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize,
|
||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize,
|
||||
numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)");
|
||||
|
||||
tfree(p);
|
||||
setFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
||||
|
||||
SGenericSource* ps = calloc(1, sizeof(SGenericSource));
|
||||
ps->param = pOperator;
|
||||
sortAddSource(pInfo->pSortHandle, ps);
|
||||
tsortAddSource(pInfo->pSortHandle, ps);
|
||||
|
||||
// TODO set error code;
|
||||
int32_t code = sortOpen(pInfo->pSortHandle);
|
||||
int32_t code = tsortOpen(pInfo->pSortHandle);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, terrno);
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
|
|||
* @param type
|
||||
* @return
|
||||
*/
|
||||
SSortHandle* createSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr) {
|
||||
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr) {
|
||||
SSortHandle* pSortHandle = calloc(1, sizeof(SSortHandle));
|
||||
|
||||
pSortHandle->type = type;
|
||||
|
@ -99,7 +99,7 @@ SSortHandle* createSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type,
|
|||
pSortHandle->cmpParam.orderInfo = pOrderInfo;
|
||||
|
||||
pSortHandle->pDataBlock = createDataBlock_rv(pSchema, numOfCols);
|
||||
setComparFn(pSortHandle, msortComparFn);
|
||||
tsortSetComparFp(pSortHandle, msortComparFn);
|
||||
|
||||
if (idstr != NULL) {
|
||||
pSortHandle->idStr = strdup(idstr);
|
||||
|
@ -108,8 +108,8 @@ SSortHandle* createSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type,
|
|||
return pSortHandle;
|
||||
}
|
||||
|
||||
void destroySortHandle(SSortHandle* pSortHandle) {
|
||||
sortClose(pSortHandle);
|
||||
void tsortDestroySortHandle(SSortHandle* pSortHandle) {
|
||||
tsortClose(pSortHandle);
|
||||
if (pSortHandle->pMergeTree != NULL) {
|
||||
tMergeTreeDestroy(pSortHandle->pMergeTree);
|
||||
}
|
||||
|
@ -119,7 +119,7 @@ void destroySortHandle(SSortHandle* pSortHandle) {
|
|||
tfree(pSortHandle);
|
||||
}
|
||||
|
||||
int32_t sortAddSource(SSortHandle* pSortHandle, void* pSource) {
|
||||
int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
|
||||
taosArrayPush(pSortHandle->pOrderedSource, &pSource);
|
||||
}
|
||||
|
||||
|
@ -573,7 +573,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t sortOpen(SSortHandle* pHandle) {
|
||||
int32_t tsortOpen(SSortHandle* pHandle) {
|
||||
if (pHandle->opened) {
|
||||
return 0;
|
||||
}
|
||||
|
@ -611,19 +611,19 @@ int32_t sortOpen(SSortHandle* pHandle) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t sortClose(SSortHandle* pHandle) {
|
||||
int32_t tsortClose(SSortHandle* pHandle) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
int32_t setFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp) {
|
||||
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp) {
|
||||
pHandle->fetchfp = fp;
|
||||
}
|
||||
|
||||
int32_t setComparFn(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
|
||||
int32_t tsortSetComparFp(SSortHandle* pHandle, _sort_merge_compar_fn_t fp) {
|
||||
pHandle->comparFn = fp;
|
||||
}
|
||||
|
||||
STupleHandle* sortNextTuple(SSortHandle* pHandle) {
|
||||
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
|
||||
if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -669,11 +669,11 @@ STupleHandle* sortNextTuple(SSortHandle* pHandle) {
|
|||
return &pHandle->tupleHandle;
|
||||
}
|
||||
|
||||
bool sortIsValueNull(STupleHandle* pVHandle, int32_t colIndex) {
|
||||
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void* sortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
|
||||
void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
|
||||
SColumnInfoData* pColInfo = TARRAY_GET_ELEM(pVHandle->pBlock->pDataBlock, colIndex);
|
||||
return colDataGetData(pColInfo, pVHandle->rowIndex);
|
||||
}
|
||||
|
|
|
@ -164,24 +164,24 @@ TEST(testCase, inMem_sort_Test) {
|
|||
taosArrayPush(orderInfo, &oi);
|
||||
|
||||
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4, };
|
||||
SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, &s, 1, "test_abc");
|
||||
setFetchRawDataFp(phandle, getSingleColDummyBlock);
|
||||
sortAddSource(phandle, &numOfRows);
|
||||
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, &s, 1, "test_abc");
|
||||
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
|
||||
tsortAddSource(phandle, &numOfRows);
|
||||
|
||||
int32_t code = sortOpen(phandle);
|
||||
int32_t code = tsortOpen(phandle);
|
||||
int32_t row = 1;
|
||||
|
||||
while(1) {
|
||||
STupleHandle* pTupleHandle = sortNextTuple(phandle);
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(phandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
void* v = sortGetValue(pTupleHandle, 0);
|
||||
void* v = tsortGetValue(pTupleHandle, 0);
|
||||
printf("%d: %d\n", row++, *(int32_t*) v);
|
||||
|
||||
}
|
||||
destroySortHandle(phandle);
|
||||
tsortDestroySortHandle(phandle);
|
||||
}
|
||||
|
||||
TEST(testCase, external_mem_sort_Test) {
|
||||
|
@ -198,8 +198,8 @@ TEST(testCase, external_mem_sort_Test) {
|
|||
taosArrayPush(orderInfo, &oi);
|
||||
|
||||
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4, };
|
||||
SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, &s, 1, "test_abc");
|
||||
setFetchRawDataFp(phandle, getSingleColDummyBlock);
|
||||
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, false, SORT_SINGLESOURCE_SORT, 1024, 5, &s, 1, "test_abc");
|
||||
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
|
||||
|
||||
_info* pInfo = (_info*) calloc(1, sizeof(_info));
|
||||
pInfo->startVal = 100000;
|
||||
|
@ -209,22 +209,22 @@ TEST(testCase, external_mem_sort_Test) {
|
|||
SGenericSource* ps = static_cast<SGenericSource*>(calloc(1, sizeof(SGenericSource)));
|
||||
ps->param = pInfo;
|
||||
|
||||
sortAddSource(phandle, ps);
|
||||
tsortAddSource(phandle, ps);
|
||||
|
||||
int32_t code = sortOpen(phandle);
|
||||
int32_t code = tsortOpen(phandle);
|
||||
int32_t row = 1;
|
||||
|
||||
while(1) {
|
||||
STupleHandle* pTupleHandle = sortNextTuple(phandle);
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(phandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
void* v = sortGetValue(pTupleHandle, 0);
|
||||
void* v = tsortGetValue(pTupleHandle, 0);
|
||||
printf("%d: %d\n", row++, *(int32_t*) v);
|
||||
|
||||
}
|
||||
destroySortHandle(phandle);
|
||||
tsortDestroySortHandle(phandle);
|
||||
}
|
||||
|
||||
TEST(testCase, ordered_merge_sort_Test) {
|
||||
|
@ -242,9 +242,9 @@ TEST(testCase, ordered_merge_sort_Test) {
|
|||
taosArrayPush(orderInfo, &oi);
|
||||
|
||||
SSchema s = {.type = TSDB_DATA_TYPE_INT, .colId = 1, .bytes = 4};
|
||||
SSortHandle* phandle = createSortHandle(orderInfo, false, SORT_MULTISOURCE_MERGE, 1024, 5, &s, 1,"test_abc");
|
||||
setFetchRawDataFp(phandle, getSingleColDummyBlock);
|
||||
setComparFn(phandle, docomp);
|
||||
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, false, SORT_MULTISOURCE_MERGE, 1024, 5, &s, 1,"test_abc");
|
||||
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
|
||||
tsortSetComparFp(phandle, docomp);
|
||||
|
||||
for(int32_t i = 0; i < 10; ++i) {
|
||||
SGenericSource* p = static_cast<SGenericSource*>(calloc(1, sizeof(SGenericSource)));
|
||||
|
@ -254,23 +254,23 @@ TEST(testCase, ordered_merge_sort_Test) {
|
|||
c->startVal = 0;
|
||||
|
||||
p->param = c;
|
||||
sortAddSource(phandle, p);
|
||||
tsortAddSource(phandle, p);
|
||||
}
|
||||
|
||||
int32_t code = sortOpen(phandle);
|
||||
int32_t code = tsortOpen(phandle);
|
||||
int32_t row = 1;
|
||||
|
||||
while(1) {
|
||||
STupleHandle* pTupleHandle = sortNextTuple(phandle);
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(phandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
void* v = sortGetValue(pTupleHandle, 0);
|
||||
void* v = tsortGetValue(pTupleHandle, 0);
|
||||
printf("%d: %d\n", row++, *(int32_t*) v);
|
||||
|
||||
}
|
||||
destroySortHandle(phandle);
|
||||
tsortDestroySortHandle(phandle);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
Loading…
Reference in New Issue