commit
8c042f0eef
|
@ -108,7 +108,7 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsortClearOrderdSource(SArray *pOrderedSource) {
|
void tsortClearOrderdSource(SArray* pOrderedSource) {
|
||||||
for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
|
for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
|
||||||
SSortSource** pSource = taosArrayGet(pOrderedSource, i);
|
SSortSource** pSource = taosArrayGet(pOrderedSource, i);
|
||||||
if (NULL == *pSource) {
|
if (NULL == *pSource) {
|
||||||
|
@ -121,6 +121,12 @@ void tsortClearOrderdSource(SArray *pOrderedSource) {
|
||||||
if ((*pSource)->param && !(*pSource)->onlyRef) {
|
if ((*pSource)->param && !(*pSource)->onlyRef) {
|
||||||
taosMemoryFree((*pSource)->param);
|
taosMemoryFree((*pSource)->param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!(*pSource)->onlyRef && (*pSource)->src.pBlock) {
|
||||||
|
blockDataDestroy((*pSource)->src.pBlock);
|
||||||
|
(*pSource)->src.pBlock = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(*pSource);
|
taosMemoryFreeClear(*pSource);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -629,9 +635,9 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
|
|
||||||
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
||||||
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
|
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
|
||||||
SSortSource* source = *pSource;
|
SSortSource* source = *pSource;
|
||||||
*pSource = NULL;
|
*pSource = NULL;
|
||||||
|
|
||||||
tsortClearOrderdSource(pHandle->pOrderedSource);
|
tsortClearOrderdSource(pHandle->pOrderedSource);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -659,6 +665,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
if (source->param && !source->onlyRef) {
|
if (source->param && !source->onlyRef) {
|
||||||
taosMemoryFree(source->param);
|
taosMemoryFree(source->param);
|
||||||
}
|
}
|
||||||
|
if (!source->onlyRef && source->src.pBlock) {
|
||||||
|
blockDataDestroy(source->src.pBlock);
|
||||||
|
source->src.pBlock = NULL;
|
||||||
|
}
|
||||||
taosMemoryFree(source);
|
taosMemoryFree(source);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -672,6 +682,10 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
if (source->param && !source->onlyRef) {
|
if (source->param && !source->onlyRef) {
|
||||||
taosMemoryFree(source->param);
|
taosMemoryFree(source->param);
|
||||||
}
|
}
|
||||||
|
if (!source->onlyRef && source->src.pBlock) {
|
||||||
|
blockDataDestroy(source->src.pBlock);
|
||||||
|
source->src.pBlock = NULL;
|
||||||
|
}
|
||||||
taosMemoryFree(source);
|
taosMemoryFree(source);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -849,8 +863,8 @@ SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
|
||||||
SSortExecInfo info = {0};
|
SSortExecInfo info = {0};
|
||||||
|
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
info.sortMethod = SORT_QSORT_T; // by default
|
info.sortMethod = SORT_QSORT_T; // by default
|
||||||
info.sortBuffer = 2 * 1048576; // 2mb by default
|
info.sortBuffer = 2 * 1048576; // 2mb by default
|
||||||
} else {
|
} else {
|
||||||
info.sortBuffer = pHandle->pageSize * pHandle->numOfPages;
|
info.sortBuffer = pHandle->pageSize * pHandle->numOfPages;
|
||||||
info.sortMethod = pHandle->inMemSort ? SORT_QSORT_T : SORT_SPILLED_MERGE_SORT_T;
|
info.sortMethod = pHandle->inMemSort ? SORT_QSORT_T : SORT_SPILLED_MERGE_SORT_T;
|
||||||
|
|
|
@ -100,14 +100,7 @@ typedef void* queue[2];
|
||||||
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
|
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
|
||||||
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
|
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
|
||||||
|
|
||||||
#define TRANS_MAGIC_NUM 0x5f375a86
|
#define TRANS_MAGIC_NUM 0x5f375a86
|
||||||
|
|
||||||
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
|
||||||
|
|
||||||
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512
|
|
||||||
|
|
||||||
#define TRANS_MAGIC_NUM 0x5f375a86
|
|
||||||
|
|
||||||
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
#define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0)
|
||||||
|
|
||||||
typedef SRpcMsg STransMsg;
|
typedef SRpcMsg STransMsg;
|
||||||
|
|
Loading…
Reference in New Issue