<fix>: fix index map error from table scan to sort output
This commit is contained in:
parent
8df8c60f60
commit
1f8d2ea446
|
@ -36,10 +36,10 @@ IF (TD_WINDOWS)
|
||||||
ENDIF ()
|
ENDIF ()
|
||||||
|
|
||||||
ELSE ()
|
ELSE ()
|
||||||
#SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
|
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
|
||||||
#SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
|
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
|
||||||
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
#SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
||||||
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
#SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -g3")
|
||||||
|
|
||||||
MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}")
|
MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}")
|
||||||
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64")
|
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64" OR ${CMAKE_SYSTEM_PROCESSOR} MATCHES "aarch64")
|
||||||
|
|
|
@ -177,7 +177,7 @@ void colDataTrim(SColumnInfoData* pColumnInfoData);
|
||||||
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
|
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
|
||||||
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
|
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);
|
||||||
|
|
||||||
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
|
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pIndexMap);
|
||||||
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
|
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
|
||||||
int32_t pageSize);
|
int32_t pageSize);
|
||||||
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
|
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
|
||||||
|
|
|
@ -315,13 +315,17 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
|
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pIndexMap) {
|
||||||
assert(pSrc != NULL && pDest != NULL);
|
assert(pSrc != NULL && pDest != NULL);
|
||||||
|
|
||||||
int32_t numOfCols = pDest->info.numOfCols;
|
int32_t numOfCols = pDest->info.numOfCols;
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
int32_t mapIndex = i;
|
||||||
|
if(pIndexMap) {
|
||||||
|
mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i);
|
||||||
|
}
|
||||||
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
|
||||||
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
|
SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, mapIndex);
|
||||||
|
|
||||||
uint32_t oldLen = colDataGetLength(pCol2, pDest->info.rows);
|
uint32_t oldLen = colDataGetLength(pCol2, pDest->info.rows);
|
||||||
uint32_t newLen = colDataGetLength(pCol1, pSrc->info.rows);
|
uint32_t newLen = colDataGetLength(pCol1, pSrc->info.rows);
|
||||||
|
|
|
@ -1437,6 +1437,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
|
||||||
|
|
||||||
tdDestroyTSma(&vCreateSmaReq.tSma);
|
tdDestroyTSma(&vCreateSmaReq.tSma);
|
||||||
// TODO: return directly or go on follow steps?
|
// TODO: return directly or go on follow steps?
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
|
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
|
||||||
|
@ -1466,6 +1467,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
|
||||||
tsdbTSmaSub(pTsdb, 1);
|
tsdbTSmaSub(pTsdb, 1);
|
||||||
|
|
||||||
// TODO: return directly or go on follow steps?
|
// TODO: return directly or go on follow steps?
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
|
|
@ -614,6 +614,7 @@ typedef struct SSortOperatorInfo {
|
||||||
SSDataBlock *pDataBlock;
|
SSDataBlock *pDataBlock;
|
||||||
SArray* pSortInfo;
|
SArray* pSortInfo;
|
||||||
SSortHandle *pSortHandle;
|
SSortHandle *pSortHandle;
|
||||||
|
SArray* inputSlotMap; // for index map from table scan output
|
||||||
int32_t bufPageSize;
|
int32_t bufPageSize;
|
||||||
int32_t numOfRowsInRes;
|
int32_t numOfRowsInRes;
|
||||||
|
|
||||||
|
@ -651,7 +652,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo);
|
||||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
|
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
|
||||||
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
|
SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
|
||||||
|
|
|
@ -68,7 +68,7 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
|
||||||
* @param type
|
* @param type
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr);
|
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, SArray* pIndexMap, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
|
|
@ -4953,7 +4953,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, NULL, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||||
pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
|
pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)");
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
||||||
|
@ -5100,7 +5100,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage,
|
||||||
pInfo->pDataBlock, pTaskInfo->id.str);
|
pInfo->pDataBlock, pTaskInfo->id.str);
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
||||||
|
@ -5119,7 +5119,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
|
return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) {
|
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo) {
|
||||||
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
int32_t rowSize = pResBlock->info.rowSize;
|
int32_t rowSize = pResBlock->info.rowSize;
|
||||||
|
@ -5137,6 +5137,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
||||||
pInfo->numOfRowsInRes = 1024;
|
pInfo->numOfRowsInRes = 1024;
|
||||||
pInfo->pDataBlock = pResBlock;
|
pInfo->pDataBlock = pResBlock;
|
||||||
pInfo->pSortInfo = pSortInfo;
|
pInfo->pSortInfo = pSortInfo;
|
||||||
|
pInfo->inputSlotMap = pIndexMap;
|
||||||
|
|
||||||
pOperator->name = "SortOperator";
|
pOperator->name = "SortOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT;
|
||||||
|
@ -6265,6 +6266,7 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
|
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->pSortInfo);
|
taosArrayDestroy(pInfo->pSortInfo);
|
||||||
|
taosArrayDestroy(pInfo->inputSlotMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
|
void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
|
@ -7065,6 +7067,7 @@ static SArray* extractScanColumnId(SNodeList* pNodeList);
|
||||||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||||
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
|
static SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols);
|
||||||
static SArray* createSortInfo(SNodeList* pNodeList);
|
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||||
|
static SArray* createIndexMap(SNodeList* pNodeList);
|
||||||
|
|
||||||
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||||
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
||||||
|
@ -7183,7 +7186,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||||
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
|
SArray* info = createSortInfo(pSortPhyNode->pSortKeys);
|
||||||
return createSortOperatorInfo(op, pResBlock, info, pTaskInfo);
|
SArray* slotMap = createIndexMap(pSortPhyNode->pTargets);
|
||||||
|
return createSortOperatorInfo(op, pResBlock, info, slotMap, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == nodeType(pPhyNode)) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == nodeType(pPhyNode)) {
|
||||||
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
size_t size = LIST_LENGTH(pPhyNode->pChildren);
|
||||||
assert(size == 1);
|
assert(size == 1);
|
||||||
|
@ -7303,8 +7307,7 @@ SArray* createSortInfo(SNodeList* pNodeList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i);
|
SOrderByExprNode* pSortKey = (SOrderByExprNode*)nodesListGetNode(pNodeList, i);
|
||||||
SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode->pExpr;
|
|
||||||
SBlockOrderInfo bi = {0};
|
SBlockOrderInfo bi = {0};
|
||||||
bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
bi.order = (pSortKey->order == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||||
bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
|
bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST);
|
||||||
|
@ -7326,6 +7329,24 @@ SArray* createSortInfo(SNodeList* pNodeList) {
|
||||||
return pList;
|
return pList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SArray* createIndexMap(SNodeList* pNodeList) {
|
||||||
|
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||||
|
SArray* pList = taosArrayInit(numOfCols, sizeof(int32_t));
|
||||||
|
if (pList == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return pList;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
STargetNode* pTarget = (STargetNode*)nodesListGetNode(pNodeList, i);
|
||||||
|
|
||||||
|
SColumnNode* pColNode = (SColumnNode*)pTarget->pExpr;
|
||||||
|
taosArrayPush(pList, &pColNode->slotId);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pList;
|
||||||
|
}
|
||||||
|
|
||||||
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols) {
|
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols) {
|
||||||
size_t numOfCols = LIST_LENGTH(pNodeList);
|
size_t numOfCols = LIST_LENGTH(pNodeList);
|
||||||
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
|
SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo));
|
||||||
|
|
|
@ -365,12 +365,12 @@ static SSDataBlock* doPartitionData(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage,
|
||||||
pInfo->pDataBlock, pTaskInfo->id.str);
|
pInfo->pDataBlock, pTaskInfo->id.str);
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock);
|
||||||
|
|
||||||
SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource));
|
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||||
ps->param = pOperator->pDownstream[0];
|
ps->param = pOperator->pDownstream[0];
|
||||||
tsortAddSource(pInfo->pSortHandle, ps);
|
tsortAddSource(pInfo->pSortHandle, ps);
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ struct SSortHandle {
|
||||||
SDiskbasedBuf *pBuf;
|
SDiskbasedBuf *pBuf;
|
||||||
|
|
||||||
SArray *pSortInfo;
|
SArray *pSortInfo;
|
||||||
|
SArray *pIndexMap;
|
||||||
SArray *pOrderedSource;
|
SArray *pOrderedSource;
|
||||||
|
|
||||||
_sort_fetch_block_fn_t fetchfp;
|
_sort_fetch_block_fn_t fetchfp;
|
||||||
|
@ -85,13 +86,14 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
|
||||||
* @param type
|
* @param type
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) {
|
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, SArray* pIndexMap, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) {
|
||||||
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
|
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
|
||||||
|
|
||||||
pSortHandle->type = type;
|
pSortHandle->type = type;
|
||||||
pSortHandle->pageSize = pageSize;
|
pSortHandle->pageSize = pageSize;
|
||||||
pSortHandle->numOfPages = numOfPages;
|
pSortHandle->numOfPages = numOfPages;
|
||||||
pSortHandle->pSortInfo = pSortInfo;
|
pSortHandle->pSortInfo = pSortInfo;
|
||||||
|
pSortHandle->pIndexMap = pIndexMap;
|
||||||
pSortHandle->pDataBlock = createOneDataBlock(pBlock);
|
pSortHandle->pDataBlock = createOneDataBlock(pBlock);
|
||||||
|
|
||||||
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
|
pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
@ -529,7 +531,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) {
|
||||||
pHandle->pDataBlock = createOneDataBlock(pBlock);
|
pHandle->pDataBlock = createOneDataBlock(pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -209,7 +209,7 @@ TEST(testCase, inMem_sort_Test) {
|
||||||
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||||
taosArrayPush(orderInfo, &oi);
|
taosArrayPush(orderInfo, &oi);
|
||||||
|
|
||||||
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_SINGLESOURCE_SORT, 1024, 5, NULL, "test_abc");
|
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 1024, 5, NULL, "test_abc");
|
||||||
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
|
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
|
||||||
|
|
||||||
_info* pInfo = (_info*) taosMemoryCalloc(1, sizeof(_info));
|
_info* pInfo = (_info*) taosMemoryCalloc(1, sizeof(_info));
|
||||||
|
@ -298,7 +298,7 @@ TEST(testCase, external_mem_sort_Test) {
|
||||||
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
SArray* orderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||||
taosArrayPush(orderInfo, &oi);
|
taosArrayPush(orderInfo, &oi);
|
||||||
|
|
||||||
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_SINGLESOURCE_SORT, 128, 3, NULL, "test_abc");
|
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 128, 3, NULL, "test_abc");
|
||||||
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
|
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
|
||||||
|
|
||||||
SSortSource* ps = static_cast<SSortSource*>(taosMemoryCalloc(1, sizeof(SSortSource)));
|
SSortSource* ps = static_cast<SSortSource*>(taosMemoryCalloc(1, sizeof(SSortSource)));
|
||||||
|
@ -365,7 +365,7 @@ TEST(testCase, ordered_merge_sort_Test) {
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfo);
|
taosArrayPush(pBlock->pDataBlock, &colInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, SORT_MULTISOURCE_MERGE, 1024, 5, pBlock,"test_abc");
|
SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_MULTISOURCE_MERGE, 1024, 5, pBlock,"test_abc");
|
||||||
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
|
tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock);
|
||||||
tsortSetComparFp(phandle, docomp);
|
tsortSetComparFp(phandle, docomp);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue