add first function partial/merge
This commit is contained in:
parent
6d95262688
commit
45ef3db78a
|
@ -93,8 +93,10 @@ int32_t diffFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t firstFunction(SqlFunctionCtx *pCtx);
|
int32_t firstFunction(SqlFunctionCtx *pCtx);
|
||||||
|
int32_t firstFunctionMerge(SqlFunctionCtx *pCtx);
|
||||||
int32_t lastFunction(SqlFunctionCtx *pCtx);
|
int32_t lastFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
int32_t getFirstLastInfoSize(int32_t resBytes);
|
int32_t getFirstLastInfoSize(int32_t resBytes);
|
||||||
|
|
|
@ -1749,7 +1749,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getFirstLastFuncEnv,
|
.getEnvFunc = getFirstLastFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = firstFunction,
|
.processFunc = firstFunction,
|
||||||
.finalizeFunc = firstLastFinalize,
|
.finalizeFunc = firstLastPartialFinalize,
|
||||||
.combineFunc = firstCombine,
|
.combineFunc = firstCombine,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -1759,7 +1759,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.translateFunc = translateFirstLastMerge,
|
.translateFunc = translateFirstLastMerge,
|
||||||
.getEnvFunc = getFirstLastFuncEnv,
|
.getEnvFunc = getFirstLastFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = firstFunction,
|
.processFunc = firstFunctionMerge,
|
||||||
.finalizeFunc = firstLastFinalize,
|
.finalizeFunc = firstLastFinalize,
|
||||||
.combineFunc = firstCombine,
|
.combineFunc = firstCombine,
|
||||||
},
|
},
|
||||||
|
|
|
@ -72,6 +72,12 @@ typedef struct STopBotRes {
|
||||||
STopBotResItem* pItems;
|
STopBotResItem* pItems;
|
||||||
} STopBotRes;
|
} STopBotRes;
|
||||||
|
|
||||||
|
typedef struct SFirstLastRes {
|
||||||
|
bool hasResult;
|
||||||
|
int32_t bytes;
|
||||||
|
char buf[];
|
||||||
|
} SFirstLastRes;
|
||||||
|
|
||||||
typedef struct SStddevRes {
|
typedef struct SStddevRes {
|
||||||
double result;
|
double result;
|
||||||
int64_t count;
|
int64_t count;
|
||||||
|
@ -2245,12 +2251,12 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getFirstLastInfoSize(int32_t resBytes) {
|
int32_t getFirstLastInfoSize(int32_t resBytes) {
|
||||||
return resBytes + sizeof(int64_t);
|
return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
|
pEnv->calcMemSize = sizeof(SFirstLastRes) + pNode->node.resType.bytes + sizeof(int64_t);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2274,12 +2280,13 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
|
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
|
SFirstLastRes *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
int32_t bytes = pInputCol->info.bytes;
|
int32_t bytes = pInputCol->info.bytes;
|
||||||
|
pInfo->bytes = bytes;
|
||||||
|
|
||||||
// All null data column, return directly.
|
// All null data column, return directly.
|
||||||
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
|
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
|
||||||
|
@ -2297,7 +2304,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (blockDataOrder == TSDB_ORDER_ASC) {
|
if (blockDataOrder == TSDB_ORDER_ASC) {
|
||||||
// filter according to current result firstly
|
// filter according to current result firstly
|
||||||
if (pResInfo->numOfRes > 0) {
|
if (pResInfo->numOfRes > 0) {
|
||||||
TSKEY ts = *(TSKEY*)(buf + bytes);
|
TSKEY ts = *(TSKEY*)(pInfo->buf + bytes);
|
||||||
if (ts < startKey) {
|
if (ts < startKey) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2313,9 +2320,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||||
|
|
||||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
|
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) {
|
||||||
memcpy(buf, data, bytes);
|
memcpy(pInfo->buf, data, bytes);
|
||||||
*(TSKEY*)(buf + bytes) = cts;
|
*(TSKEY*)(pInfo->buf + bytes) = cts;
|
||||||
|
pInfo->hasResult = true;
|
||||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||||
|
|
||||||
pResInfo->numOfRes = 1;
|
pResInfo->numOfRes = 1;
|
||||||
|
@ -2326,7 +2334,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
// in case of descending order time stamp serial, which usually happens as the results of the nest query,
|
// in case of descending order time stamp serial, which usually happens as the results of the nest query,
|
||||||
// all data needs to be check.
|
// all data needs to be check.
|
||||||
if (pResInfo->numOfRes > 0) {
|
if (pResInfo->numOfRes > 0) {
|
||||||
TSKEY ts = *(TSKEY*)(buf + bytes);
|
TSKEY ts = *(TSKEY*)(pInfo->buf + bytes);
|
||||||
if (ts < endKey) {
|
if (ts < endKey) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2342,9 +2350,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||||
|
|
||||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(buf + bytes) > cts) {
|
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) {
|
||||||
memcpy(buf, data, bytes);
|
memcpy(pInfo->buf, data, bytes);
|
||||||
*(TSKEY*)(buf + bytes) = cts;
|
*(TSKEY*)(pInfo->buf + bytes) = cts;
|
||||||
|
pInfo->hasResult = true;
|
||||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
||||||
pResInfo->numOfRes = 1;
|
pResInfo->numOfRes = 1;
|
||||||
break;
|
break;
|
||||||
|
@ -2422,6 +2431,39 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void firstLastTransferInfo(SFirstLastRes* pInput, SFirstLastRes* pOutput) {
|
||||||
|
pOutput->bytes = pInput->bytes;
|
||||||
|
TSKEY *tsIn = (TSKEY*)(pInput->buf + pInput->bytes);
|
||||||
|
TSKEY *tsOut = (TSKEY*)(pOutput->buf + pInput->bytes);
|
||||||
|
if (pOutput->hasResult) {
|
||||||
|
if (*tsIn > *tsOut) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*tsOut = *tsIn;
|
||||||
|
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
|
||||||
|
pOutput->hasResult = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t firstFunctionMerge(SqlFunctionCtx *pCtx) {
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
|
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
char* data = colDataGetData(pCol, start);
|
||||||
|
SFirstLastRes* pInputInfo = (SFirstLastRes *)varDataVal(data);
|
||||||
|
|
||||||
|
firstLastTransferInfo(pInputInfo, pInfo);
|
||||||
|
|
||||||
|
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
@ -2435,6 +2477,24 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return pResInfo->numOfRes;
|
return pResInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
|
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
int32_t resultBytes = getFirstLastInfoSize(pRes->bytes);
|
||||||
|
char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
|
memcpy(varDataVal(res), pRes, resultBytes);
|
||||||
|
varDataSetLen(res, resultBytes);
|
||||||
|
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||||
|
|
||||||
|
taosMemoryFree(res);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||||
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||||
|
|
Loading…
Reference in New Issue