fix saveTuplePos
This commit is contained in:
parent
003002af1d
commit
d5c92d3788
|
@ -91,7 +91,6 @@ typedef struct STuplePos {
|
||||||
};
|
};
|
||||||
STupleKey streamTupleKey;
|
STupleKey streamTupleKey;
|
||||||
};
|
};
|
||||||
bool isValid;
|
|
||||||
} STuplePos;
|
} STuplePos;
|
||||||
|
|
||||||
typedef struct SFirstLastRes {
|
typedef struct SFirstLastRes {
|
||||||
|
|
|
@ -44,9 +44,10 @@ typedef struct SMinmaxResInfo {
|
||||||
bool nullTupleSaved;
|
bool nullTupleSaved;
|
||||||
int16_t type;
|
int16_t type;
|
||||||
} SMinmaxResInfo;
|
} SMinmaxResInfo;
|
||||||
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc);
|
|
||||||
|
|
||||||
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock);
|
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems);
|
||||||
|
|
||||||
|
int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
||||||
int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
||||||
const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos);
|
const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos);
|
||||||
|
|
||||||
|
|
|
@ -757,13 +757,21 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t minFunction(SqlFunctionCtx* pCtx) {
|
int32_t minFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElems = doMinMaxHelper(pCtx, 1);
|
int32_t numOfElems = 0;
|
||||||
|
int32_t code = doMinMaxHelper(pCtx, 1, &numOfElems);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t maxFunction(SqlFunctionCtx* pCtx) {
|
int32_t maxFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElems = doMinMaxHelper(pCtx, 0);
|
int32_t numOfElems = 0;
|
||||||
|
int32_t code = doMinMaxHelper(pCtx, 0, &numOfElems);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2021,10 +2029,9 @@ static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowI
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pInfo->hasResult) {
|
if (!pInfo->hasResult) {
|
||||||
pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock);
|
int32_t code = saveTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
|
||||||
if (!pInfo->pos.isValid) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
return code;
|
||||||
return terrno;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
|
updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
|
||||||
|
@ -2804,8 +2811,8 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
|
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
|
||||||
|
|
||||||
static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery);
|
static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery);
|
||||||
|
|
||||||
|
@ -2827,11 +2834,17 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
char* data = colDataGetData(pCol, i);
|
char* data = colDataGetData(pCol, i);
|
||||||
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
|
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
|
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
|
||||||
pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
|
int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
pRes->nullTupleSaved = true;
|
pRes->nullTupleSaved = true;
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2855,11 +2868,17 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
char* data = colDataGetData(pCol, i);
|
char* data = colDataGetData(pCol, i);
|
||||||
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
|
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
|
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
|
||||||
pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
|
int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
pRes->nullTupleSaved = true;
|
pRes->nullTupleSaved = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2899,7 +2918,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
|
||||||
return (val1->v.d > val2->v.d) ? 1 : -1;
|
return (val1->v.d > val2->v.d) ? 1 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
|
||||||
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
|
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
|
||||||
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
|
||||||
|
|
||||||
|
@ -2917,7 +2936,10 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
||||||
|
|
||||||
// save the data of this tuple
|
// save the data of this tuple
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
pItem->tuplePos = saveTupleData(pCtx, rowIndex, pSrcBlock);
|
int32_t code = saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
#ifdef BUF_PAGE_DEBUG
|
#ifdef BUF_PAGE_DEBUG
|
||||||
qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId,
|
qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId,
|
||||||
|
@ -2952,6 +2974,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
||||||
topBotResComparFn, NULL, !isTopQuery);
|
topBotResComparFn, NULL, !isTopQuery);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -2991,7 +3015,8 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey key) {
|
static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey key,
|
||||||
|
STuplePos* pPos) {
|
||||||
STuplePos p = {0};
|
STuplePos p = {0};
|
||||||
if (pHandle->pBuf != NULL) {
|
if (pHandle->pBuf != NULL) {
|
||||||
SFilePage* pPage = NULL;
|
SFilePage* pPage = NULL;
|
||||||
|
@ -2999,26 +3024,29 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf
|
||||||
if (pHandle->currentPage == -1) {
|
if (pHandle->currentPage == -1) {
|
||||||
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
|
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
|
||||||
if (pPage == NULL) {
|
if (pPage == NULL) {
|
||||||
return p;
|
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
pPage->num = sizeof(SFilePage);
|
pPage->num = sizeof(SFilePage);
|
||||||
} else {
|
} else {
|
||||||
pPage = getBufPage(pHandle->pBuf, pHandle->currentPage);
|
pPage = getBufPage(pHandle->pBuf, pHandle->currentPage);
|
||||||
if (pPage == NULL) {
|
if (pPage == NULL) {
|
||||||
return p;
|
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
if (pPage->num + length > getBufPageSize(pHandle->pBuf)) {
|
if (pPage->num + length > getBufPageSize(pHandle->pBuf)) {
|
||||||
// current page is all used, let's prepare a new buffer page
|
// current page is all used, let's prepare a new buffer page
|
||||||
releaseBufPage(pHandle->pBuf, pPage);
|
releaseBufPage(pHandle->pBuf, pPage);
|
||||||
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
|
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
|
||||||
if (pPage == NULL) {
|
if (pPage == NULL) {
|
||||||
return p;
|
terrno = TSDB_CODE_NO_AVAIL_DISK;
|
||||||
|
return terrno;
|
||||||
}
|
}
|
||||||
pPage->num = sizeof(SFilePage);
|
pPage->num = sizeof(SFilePage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p = (STuplePos){.pageId = pHandle->currentPage, .offset = pPage->num, .isValid = true};
|
p = (STuplePos){.pageId = pHandle->currentPage, .offset = pPage->num};
|
||||||
memcpy(pPage->data + pPage->num, pBuf, length);
|
memcpy(pPage->data + pPage->num, pBuf, length);
|
||||||
|
|
||||||
pPage->num += length;
|
pPage->num += length;
|
||||||
|
@ -3032,10 +3060,11 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf
|
||||||
p.streamTupleKey = key;
|
p.streamTupleKey = key;
|
||||||
}
|
}
|
||||||
|
|
||||||
return p;
|
*pPos = p;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock) {
|
int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||||
prepareBuf(pCtx);
|
prepareBuf(pCtx);
|
||||||
|
|
||||||
STupleKey key;
|
STupleKey key;
|
||||||
|
@ -3050,7 +3079,7 @@ STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBloc
|
||||||
}
|
}
|
||||||
|
|
||||||
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
|
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
|
||||||
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, key);
|
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, key, pPos);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) {
|
static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) {
|
||||||
|
@ -4490,12 +4519,15 @@ static void sampleAssignResult(SSampleInfo* pInfo, char* data, int32_t index) {
|
||||||
assignVal(pInfo->data + index * pInfo->colBytes, data, pInfo->colBytes, pInfo->colType);
|
assignVal(pInfo->data + index * pInfo->colBytes, data, pInfo->colBytes, pInfo->colType);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* data, int32_t index) {
|
static int32_t doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* data, int32_t index) {
|
||||||
pInfo->totalPoints++;
|
pInfo->totalPoints++;
|
||||||
if (pInfo->numSampled < pInfo->samples) {
|
if (pInfo->numSampled < pInfo->samples) {
|
||||||
sampleAssignResult(pInfo, data, pInfo->numSampled);
|
sampleAssignResult(pInfo, data, pInfo->numSampled);
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
pInfo->tuplePos[pInfo->numSampled] = saveTupleData(pCtx, index, pCtx->pSrcBlock);
|
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pInfo->numSampled++;
|
pInfo->numSampled++;
|
||||||
} else {
|
} else {
|
||||||
|
@ -4507,6 +4539,8 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
@ -4522,11 +4556,17 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
doReservoirSample(pCtx, pInfo, data, i);
|
int32_t code = doReservoirSample(pCtx, pInfo, data, i);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
|
if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
|
||||||
pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
|
int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
pInfo->nullTupleSaved = true;
|
pInfo->nullTupleSaved = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4820,7 +4860,7 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, char* data) {
|
static int32_t doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, char* data) {
|
||||||
int32_t hashKeyBytes = IS_STR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
|
int32_t hashKeyBytes = IS_STR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
|
||||||
SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
|
SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
|
||||||
if (pHashItem == NULL) {
|
if (pHashItem == NULL) {
|
||||||
|
@ -4830,7 +4870,10 @@ static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx,
|
||||||
pItem->count += 1;
|
pItem->count += 1;
|
||||||
|
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
pItem->tuplePos = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock);
|
int32_t code = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &pItem->tuplePos);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*));
|
taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*));
|
||||||
|
@ -4841,6 +4884,8 @@ static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx,
|
||||||
updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &((*pHashItem)->tuplePos));
|
updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &((*pHashItem)->tuplePos));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t modeFunction(SqlFunctionCtx* pCtx) {
|
int32_t modeFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
@ -4861,7 +4906,10 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
doModeAdd(pInfo, i, pCtx, data);
|
int32_t code = doModeAdd(pInfo, i, pCtx, data);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) {
|
if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) {
|
||||||
taosHashCleanup(pInfo->pHash);
|
taosHashCleanup(pInfo->pHash);
|
||||||
|
@ -4870,7 +4918,10 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
|
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
|
||||||
pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
|
int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
pInfo->nullTupleSaved = true;
|
pInfo->nullTupleSaved = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -700,7 +700,7 @@ static void doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunct
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) {
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
|
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
@ -746,7 +746,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||||
if (index >= 0) {
|
if (index >= 0) {
|
||||||
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
|
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -760,7 +763,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||||
if (index >= 0) {
|
if (index >= 0) {
|
||||||
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
|
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -774,7 +780,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||||
if (index >= 0) {
|
if (index >= 0) {
|
||||||
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
|
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -788,7 +797,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||||
if (index >= 0) {
|
if (index >= 0) {
|
||||||
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
|
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -804,7 +816,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||||
if (index >= 0) {
|
if (index >= 0) {
|
||||||
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
|
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -825,7 +840,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
memcpy(&pBuf->v, pCol->pData + (pCol->info.bytes * i), pCol->info.bytes);
|
memcpy(&pBuf->v, pCol->pData + (pCol->info.bytes * i), pCol->info.bytes);
|
||||||
|
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
|
int32_t code = saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
numOfElems = 1;
|
numOfElems = 1;
|
||||||
|
@ -889,9 +907,13 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
|
|
||||||
_over:
|
_over:
|
||||||
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) {
|
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) {
|
||||||
pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
|
int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pBuf->nullTuplePos);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
pBuf->nullTupleSaved = true;
|
pBuf->nullTupleSaved = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return numOfElems;
|
*nElems = numOfElems;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue