Merge pull request #13812 from taosdata/enh/avg_stddev_split
enh(query): avg function enable splitting
This commit is contained in:
commit
464d72ab38
|
@ -142,6 +142,8 @@ typedef enum EFunctionType {
|
||||||
FUNCTION_TYPE_FIRST_MERGE,
|
FUNCTION_TYPE_FIRST_MERGE,
|
||||||
FUNCTION_TYPE_LAST_PARTIAL,
|
FUNCTION_TYPE_LAST_PARTIAL,
|
||||||
FUNCTION_TYPE_LAST_MERGE,
|
FUNCTION_TYPE_LAST_MERGE,
|
||||||
|
FUNCTION_TYPE_AVG_PARTIAL,
|
||||||
|
FUNCTION_TYPE_AVG_MERGE,
|
||||||
|
|
||||||
// user defined funcion
|
// user defined funcion
|
||||||
FUNCTION_TYPE_UDF = 10000
|
FUNCTION_TYPE_UDF = 10000
|
||||||
|
|
|
@ -55,9 +55,12 @@ int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t avgFunction(SqlFunctionCtx* pCtx);
|
int32_t avgFunction(SqlFunctionCtx* pCtx);
|
||||||
|
int32_t avgFunctionMerge(SqlFunctionCtx* pCtx);
|
||||||
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
int32_t avgInvertFunction(SqlFunctionCtx* pCtx);
|
int32_t avgInvertFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
|
int32_t getAvgInfoSize();
|
||||||
|
|
||||||
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
|
|
@ -155,6 +155,35 @@ static int32_t translateSum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateAvgPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
if (!IS_NUMERIC_TYPE(paraType) && !IS_NULL_TYPE(paraType)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = getAvgInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateAvgMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
if (TSDB_DATA_TYPE_BINARY != paraType) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateWduration(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// pseudo column do not need to check parameters
|
// pseudo column do not need to check parameters
|
||||||
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
||||||
|
@ -1517,6 +1546,32 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.finalizeFunc = avgFinalize,
|
.finalizeFunc = avgFinalize,
|
||||||
.invertFunc = avgInvertFunction,
|
.invertFunc = avgInvertFunction,
|
||||||
.combineFunc = avgCombine,
|
.combineFunc = avgCombine,
|
||||||
|
.pPartialFunc = "_avg_partial",
|
||||||
|
.pMergeFunc = "_avg_merge"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_avg_partial",
|
||||||
|
.type = FUNCTION_TYPE_AVG_PARTIAL,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.translateFunc = translateAvgPartial,
|
||||||
|
.getEnvFunc = getAvgFuncEnv,
|
||||||
|
.initFunc = avgFunctionSetup,
|
||||||
|
.processFunc = avgFunction,
|
||||||
|
.finalizeFunc = avgPartialFinalize,
|
||||||
|
.invertFunc = avgInvertFunction,
|
||||||
|
.combineFunc = avgCombine,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_avg_merge",
|
||||||
|
.type = FUNCTION_TYPE_AVG_MERGE,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.translateFunc = translateAvgMerge,
|
||||||
|
.getEnvFunc = getAvgFuncEnv,
|
||||||
|
.initFunc = avgFunctionSetup,
|
||||||
|
.processFunc = avgFunctionMerge,
|
||||||
|
.finalizeFunc = avgFinalize,
|
||||||
|
.invertFunc = avgInvertFunction,
|
||||||
|
.combineFunc = avgCombine,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "percentile",
|
.name = "percentile",
|
||||||
|
|
|
@ -51,6 +51,7 @@ typedef struct SAvgRes {
|
||||||
double result;
|
double result;
|
||||||
SSumRes sum;
|
SSumRes sum;
|
||||||
int64_t count;
|
int64_t count;
|
||||||
|
int16_t type; // store the original input type, used in merge function
|
||||||
} SAvgRes;
|
} SAvgRes;
|
||||||
|
|
||||||
typedef struct STuplePos {
|
typedef struct STuplePos {
|
||||||
|
@ -624,6 +625,8 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getAvgInfoSize() { return (int32_t)sizeof(SAvgRes); }
|
||||||
|
|
||||||
bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SAvgRes);
|
pEnv->calcMemSize = sizeof(SAvgRes);
|
||||||
return true;
|
return true;
|
||||||
|
@ -642,11 +645,12 @@ bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElem = 0;
|
int32_t numOfElem = 0;
|
||||||
|
|
||||||
// Only the pre-computing information loaded and actual data does not loaded
|
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0];
|
||||||
int32_t type = pInput->pData[0]->info.type;
|
int32_t type = pInput->pData[0]->info.type;
|
||||||
|
|
||||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
pAvgRes->type = type;
|
||||||
|
|
||||||
// computing based on the true data block
|
// computing based on the true data block
|
||||||
SColumnInfoData* pCol = pInput->pData[0];
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
@ -660,95 +664,107 @@ int32_t avgFunction(SqlFunctionCtx* pCtx) {
|
||||||
goto _avg_over;
|
goto _avg_over;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (type) {
|
if (pInput->colDataAggIsSet) {
|
||||||
case TSDB_DATA_TYPE_TINYINT: {
|
numOfElem = numOfRows - pAgg->numOfNull;
|
||||||
int8_t* plist = (int8_t*)pCol->pData;
|
ASSERT(numOfElem >= 0);
|
||||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
|
||||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
pAvgRes->count += numOfElem;
|
||||||
continue;
|
if (IS_INTEGER_TYPE(type)) {
|
||||||
|
pAvgRes->sum.isum += pAgg->sum;
|
||||||
|
} else if (IS_FLOAT_TYPE(type)) {
|
||||||
|
pAvgRes->sum.dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum));
|
||||||
|
}
|
||||||
|
} else { // computing based on the true data block
|
||||||
|
switch (type) {
|
||||||
|
case TSDB_DATA_TYPE_TINYINT: {
|
||||||
|
int8_t* plist = (int8_t*)pCol->pData;
|
||||||
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElem += 1;
|
||||||
|
pAvgRes->count += 1;
|
||||||
|
pAvgRes->sum.isum += plist[i];
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElem += 1;
|
break;
|
||||||
pAvgRes->count += 1;
|
|
||||||
pAvgRes->sum.isum += plist[i];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
case TSDB_DATA_TYPE_SMALLINT: {
|
||||||
}
|
int16_t* plist = (int16_t*)pCol->pData;
|
||||||
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_SMALLINT: {
|
numOfElem += 1;
|
||||||
int16_t* plist = (int16_t*)pCol->pData;
|
pAvgRes->count += 1;
|
||||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
pAvgRes->sum.isum += plist[i];
|
||||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
}
|
||||||
continue;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case TSDB_DATA_TYPE_INT: {
|
||||||
|
int32_t* plist = (int32_t*)pCol->pData;
|
||||||
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElem += 1;
|
||||||
|
pAvgRes->count += 1;
|
||||||
|
pAvgRes->sum.isum += plist[i];
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfElem += 1;
|
break;
|
||||||
pAvgRes->count += 1;
|
|
||||||
pAvgRes->sum.isum += plist[i];
|
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_INT: {
|
case TSDB_DATA_TYPE_BIGINT: {
|
||||||
int32_t* plist = (int32_t*)pCol->pData;
|
int64_t* plist = (int64_t*)pCol->pData;
|
||||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
continue;
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElem += 1;
|
||||||
|
pAvgRes->count += 1;
|
||||||
|
pAvgRes->sum.isum += plist[i];
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
numOfElem += 1;
|
|
||||||
pAvgRes->count += 1;
|
|
||||||
pAvgRes->sum.isum += plist[i];
|
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
case TSDB_DATA_TYPE_FLOAT: {
|
||||||
}
|
float* plist = (float*)pCol->pData;
|
||||||
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_BIGINT: {
|
numOfElem += 1;
|
||||||
int64_t* plist = (int64_t*)pCol->pData;
|
pAvgRes->count += 1;
|
||||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
pAvgRes->sum.dsum += plist[i];
|
||||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
numOfElem += 1;
|
|
||||||
pAvgRes->count += 1;
|
|
||||||
pAvgRes->sum.isum += plist[i];
|
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_FLOAT: {
|
case TSDB_DATA_TYPE_DOUBLE: {
|
||||||
float* plist = (float*)pCol->pData;
|
double* plist = (double*)pCol->pData;
|
||||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
||||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||||
continue;
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
numOfElem += 1;
|
||||||
|
pAvgRes->count += 1;
|
||||||
|
pAvgRes->sum.dsum += plist[i];
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
numOfElem += 1;
|
|
||||||
pAvgRes->count += 1;
|
|
||||||
pAvgRes->sum.dsum += plist[i];
|
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case TSDB_DATA_TYPE_DOUBLE: {
|
|
||||||
double* plist = (double*)pCol->pData;
|
|
||||||
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) {
|
|
||||||
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
numOfElem += 1;
|
|
||||||
pAvgRes->count += 1;
|
|
||||||
pAvgRes->sum.dsum += plist[i];
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_avg_over:
|
_avg_over:
|
||||||
|
@ -757,6 +773,37 @@ _avg_over:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) {
|
||||||
|
pOutput->type = pInput->type;
|
||||||
|
if (IS_INTEGER_TYPE(pOutput->type)) {
|
||||||
|
pOutput->sum.isum += pInput->sum.isum;
|
||||||
|
} else {
|
||||||
|
pOutput->sum.dsum += pInput->sum.dsum;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOutput->count += pInput->count;
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
|
SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
char* data = colDataGetData(pCol, start);
|
||||||
|
SAvgRes* pInputInfo = (SAvgRes*)varDataVal(data);
|
||||||
|
|
||||||
|
avgTransferInfo(pInputInfo, pInfo);
|
||||||
|
|
||||||
|
SET_VAL(GET_RES_INFO(pCtx), 1, 1);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
#define LIST_AVG_N(sumT, T) \
|
#define LIST_AVG_N(sumT, T) \
|
||||||
do { \
|
do { \
|
||||||
T* plist = (T*)pCol->pData; \
|
T* plist = (T*)pCol->pData; \
|
||||||
|
@ -841,8 +888,8 @@ int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
|
||||||
int32_t type = pInput->pData[0]->info.type;
|
|
||||||
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
int32_t type = pAvgRes->type;
|
||||||
|
|
||||||
if (IS_INTEGER_TYPE(type)) {
|
if (IS_INTEGER_TYPE(type)) {
|
||||||
pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
|
pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count);
|
||||||
|
@ -858,6 +905,24 @@ int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return functionFinalize(pCtx, pBlock);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
|
int32_t resultBytes = getAvgInfoSize();
|
||||||
|
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
|
memcpy(varDataVal(res), pInfo, resultBytes);
|
||||||
|
varDataSetLen(res, resultBytes);
|
||||||
|
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||||
|
|
||||||
|
taosMemoryFree(res);
|
||||||
|
return pResInfo->numOfRes;
|
||||||
|
}
|
||||||
|
|
||||||
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
|
||||||
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
return FUNC_DATA_REQUIRED_STATIS_LOAD;
|
||||||
}
|
}
|
||||||
|
@ -3226,11 +3291,10 @@ int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||||
SSpreadInfo* pInputInfo;
|
|
||||||
|
|
||||||
int32_t start = pInput->startRowIndex;
|
int32_t start = pInput->startRowIndex;
|
||||||
char* data = colDataGetData(pCol, start);
|
char* data = colDataGetData(pCol, start);
|
||||||
pInputInfo = (SSpreadInfo*)varDataVal(data);
|
SSpreadInfo* pInputInfo = (SSpreadInfo*)varDataVal(data);
|
||||||
|
|
||||||
spreadTransferInfo(pInputInfo, pInfo);
|
spreadTransferInfo(pInputInfo, pInfo);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue