fix(query): set correct source status in merge sort.
This commit is contained in:
parent
1b907e847c
commit
9c6a9f1c92
|
@ -392,10 +392,8 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
|
||||||
|
|
||||||
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
||||||
|
|
||||||
uint32_t numOfSources = taosArrayGetSize(pSortInfo);
|
// one additional is reserved for merged result.
|
||||||
numOfSources = TMAX(4, numOfSources);
|
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1);
|
||||||
|
|
||||||
pInfo->sortBufSize = numOfSources * pInfo->bufPageSize;
|
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
|
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
|
||||||
|
|
|
@ -195,6 +195,11 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
||||||
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
|
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void setCurrentSourceIsDone(SSortSource* pSource, SSortHandle* pHandle) {
|
||||||
|
pSource->src.rowIndex = -1;
|
||||||
|
++pHandle->numOfCompletedSources;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SSortHandle* pHandle) {
|
static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SSortHandle* pHandle) {
|
||||||
cmpParam->pSources = taosArrayGet(pSources, startIndex);
|
cmpParam->pSources = taosArrayGet(pSources, startIndex);
|
||||||
cmpParam->numOfSources = (endIndex - startIndex + 1);
|
cmpParam->numOfSources = (endIndex - startIndex + 1);
|
||||||
|
@ -205,8 +210,10 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
|
||||||
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
|
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
|
||||||
SSortSource* pSource = cmpParam->pSources[i];
|
SSortSource* pSource = cmpParam->pSources[i];
|
||||||
|
|
||||||
|
// set current source is done
|
||||||
if (taosArrayGetSize(pSource->pageIdList) == 0) {
|
if (taosArrayGetSize(pSource->pageIdList) == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
setCurrentSourceIsDone(pSource, pHandle);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
|
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
|
||||||
|
@ -233,10 +240,9 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
|
||||||
SSortSource* pSource = cmpParam->pSources[i];
|
SSortSource* pSource = cmpParam->pSources[i];
|
||||||
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
|
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
|
||||||
|
|
||||||
// set current source id done
|
// set current source is done
|
||||||
if (pSource->src.pBlock == NULL) {
|
if (pSource->src.pBlock == NULL) {
|
||||||
pSource->src.rowIndex = -1;
|
setCurrentSourceIsDone(pSource, pHandle);
|
||||||
++pHandle->numOfCompletedSources;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -946,8 +946,7 @@ int32_t taosGetFqdn(char *fqdn) {
|
||||||
#endif // __APPLE__
|
#endif // __APPLE__
|
||||||
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
|
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
|
||||||
if (!result) {
|
if (!result) {
|
||||||
// printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
|
fprintf(stderr,"failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
|
||||||
assert(0);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue