refactor: do some internal refactor.
This commit is contained in:
parent
9b3d7aa410
commit
5c60c6046d
|
@ -266,6 +266,20 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
int32_t paraLen = LIST_LENGTH(pFunc->pParameterList);
|
||||
if (paraLen == 0 || paraLen > 2) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
|
||||
SExprNode* p1 = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
|
||||
if (!IS_NUMERIC_TYPE(p1->resType.type)) {
|
||||
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
}
|
||||
pFunc->node.resType = p1->resType;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
|
||||
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||
|
@ -613,7 +627,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.name = "diff",
|
||||
.type = FUNCTION_TYPE_DIFF,
|
||||
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
|
||||
.translateFunc = translateInOutNum,
|
||||
.translateFunc = translateDiff,
|
||||
.getEnvFunc = getDiffFuncEnv,
|
||||
.initFunc = diffFunctionSetup,
|
||||
.processFunc = diffFunction,
|
||||
|
|
|
@ -75,7 +75,7 @@ typedef struct SPercentileInfo {
|
|||
typedef struct SDiffInfo {
|
||||
bool hasPrev;
|
||||
bool includeNull;
|
||||
bool ignoreNegative;
|
||||
bool ignoreNegative; // replace the ignore with case when
|
||||
bool firstOutput;
|
||||
union {
|
||||
int64_t i64;
|
||||
|
@ -1208,32 +1208,126 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
|
|||
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
pDiffInfo->hasPrev = false;
|
||||
pDiffInfo->prev.i64 = 0;
|
||||
pDiffInfo->ignoreNegative = false; // TODO set correct param
|
||||
pDiffInfo->ignoreNegative = pCtx->param[1].param.i; // TODO set correct param
|
||||
pDiffInfo->includeNull = false;
|
||||
pDiffInfo->firstOutput = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
static void doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv) {
|
||||
switch(type) {
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
pDiffInfo->prev.i64 = *(int8_t*) pv; break;
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
pDiffInfo->prev.i64 = *(int32_t*) pv; break;
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
pDiffInfo->prev.i64 = *(int16_t*) pv; break;
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
pDiffInfo->prev.i64 = *(int64_t*) pv; break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
pDiffInfo->prev.d64 = *(float *) pv; break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
pDiffInfo->prev.d64 = *(double*) pv; break;
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos, int32_t order) {
|
||||
int32_t factor = (order == TSDB_ORDER_ASC)? 1:-1;
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
int32_t v = *(int32_t*)pv;
|
||||
int32_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null
|
||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
||||
} else {
|
||||
colDataAppendInt32(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.i64 = v;
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t v = *(int8_t*)pv;
|
||||
int8_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null
|
||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
||||
} else {
|
||||
colDataAppendInt8(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.i64 = v;
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int16_t v = *(int16_t*)pv;
|
||||
int16_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null
|
||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
||||
} else {
|
||||
colDataAppendInt16(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.i64 = v;
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
int64_t v = *(int64_t*)pv;
|
||||
int64_t delta = factor*(v - pDiffInfo->prev.i64); // direct previous may be null
|
||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
||||
} else {
|
||||
colDataAppendInt64(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.i64 = v;
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
float v = *(float*)pv;
|
||||
float delta = factor*(v - pDiffInfo->prev.d64); // direct previous may be null
|
||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
||||
} else {
|
||||
colDataAppendFloat(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.d64 = v;
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double v = *(double*)pv;
|
||||
double delta = factor*(v - pDiffInfo->prev.d64); // direct previous may be null
|
||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
||||
} else {
|
||||
colDataAppendDouble(pOutput, pos, &delta);
|
||||
}
|
||||
pDiffInfo->prev.d64 = v;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
||||
SInputColumnInfoData* pInput = &pCtx->input;
|
||||
|
||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||
|
||||
bool isFirstBlock = (pDiffInfo->hasPrev == false);
|
||||
int32_t numOfElems = 0;
|
||||
|
||||
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
|
||||
int32_t numOfElems = 0;
|
||||
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
|
||||
int32_t startOffset = pCtx->offset;
|
||||
switch (pInputCol->info.type) {
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
|
||||
if (pCtx->order == TSDB_ORDER_ASC) {
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
int32_t pos = startOffset + (isFirstBlock ? (numOfElems - 1) : numOfElems);
|
||||
int32_t pos = startOffset + numOfElems;
|
||||
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
if (pDiffInfo->includeNull) {
|
||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
||||
|
@ -1246,210 +1340,60 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) {
|
|||
continue;
|
||||
}
|
||||
|
||||
int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
|
||||
if (pDiffInfo->hasPrev) {
|
||||
int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null
|
||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
||||
} else {
|
||||
colDataAppendInt32(pOutput, pos, &delta);
|
||||
}
|
||||
char* pv = colDataGetData(pInputCol, i);
|
||||
|
||||
if (pDiffInfo->hasPrev) {
|
||||
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
|
||||
if (pTsOutput != NULL) {
|
||||
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
|
||||
}
|
||||
|
||||
numOfElems++;
|
||||
} else {
|
||||
doSetPrevVal(pDiffInfo, pInputCol->info.type, pv);
|
||||
}
|
||||
|
||||
pDiffInfo->prev.i64 = v;
|
||||
pDiffInfo->hasPrev = true;
|
||||
numOfElems++;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
int32_t v = *(int32_t*)colDataGetData(pInputCol, i);
|
||||
int32_t pos = startOffset + numOfElems;
|
||||
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
if (pDiffInfo->includeNull) {
|
||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
||||
if (tsList != NULL) {
|
||||
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
|
||||
}
|
||||
|
||||
numOfElems += 1;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
char* pv = colDataGetData(pInputCol, i);
|
||||
|
||||
// there is a row of previous data block to be handled in the first place.
|
||||
if (pDiffInfo->hasPrev) {
|
||||
int32_t delta = (int32_t)(pDiffInfo->prev.i64 - v); // direct previous may be null
|
||||
if (delta < 0 && pDiffInfo->ignoreNegative) {
|
||||
colDataSetNull_f(pOutput->nullbitmap, pos);
|
||||
} else {
|
||||
colDataAppendInt32(pOutput, pos, &delta);
|
||||
}
|
||||
|
||||
doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order);
|
||||
if (pTsOutput != NULL) {
|
||||
colDataAppendInt64(pTsOutput, pos, &pDiffInfo->prevTs);
|
||||
}
|
||||
pDiffInfo->hasPrev = false;
|
||||
}
|
||||
|
||||
// it is not the last row of current block
|
||||
if (i < pInput->numOfRows + pInput->startRowIndex - 1) {
|
||||
int32_t next = *(int32_t*)colDataGetData(pInputCol, i + 1);
|
||||
|
||||
int32_t delta = v - next; // direct previous may be null
|
||||
colDataAppendInt32(pOutput, pos, &delta);
|
||||
|
||||
if (pTsOutput != NULL) {
|
||||
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
|
||||
}
|
||||
numOfElems++;
|
||||
} else {
|
||||
pDiffInfo->prev.i64 = v;
|
||||
doSetPrevVal(pDiffInfo, pInputCol->info.type, pv);
|
||||
}
|
||||
|
||||
pDiffInfo->hasPrev = true;
|
||||
if (pTsOutput != NULL) {
|
||||
pDiffInfo->prevTs = tsList[i];
|
||||
}
|
||||
pDiffInfo->hasPrev = true;
|
||||
}
|
||||
numOfElems++;
|
||||
}
|
||||
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
|
||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
|
||||
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t v = 0;
|
||||
if (pDiffInfo->hasPrev) {
|
||||
v = *(int64_t*)colDataGetData(pInputCol, i);
|
||||
int64_t delta = (int64_t)(v - pDiffInfo->prev.i64); // direct previous may be null
|
||||
if (pDiffInfo->ignoreNegative) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// *(pOutput++) = delta;
|
||||
// *pTimestamp = (tsList != NULL)? tsList[i]:0;
|
||||
//
|
||||
// pOutput += 1;
|
||||
// pTimestamp += 1;
|
||||
}
|
||||
|
||||
pDiffInfo->prev.i64 = v;
|
||||
pDiffInfo->hasPrev = true;
|
||||
numOfElems++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
#if 0
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
double *pData = (double *)data;
|
||||
double *pOutput = (double *)pCtx->pOutput;
|
||||
|
||||
for (; i < pCtx->size && i >= 0; i += step) {
|
||||
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pDiffInfo->hasPrev) { // initial value is not set yet
|
||||
SET_DOUBLE_VAL(pOutput, pData[i] - pDiffInfo->d64Prev); // direct previous may be null
|
||||
*pTimestamp = (tsList != NULL)? tsList[i]:0;
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
|
||||
pDiffInfo->d64Prev = pData[i];
|
||||
pDiffInfo->hasPrev = true;
|
||||
numOfElems++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
float *pData = (float *)data;
|
||||
float *pOutput = (float *)pCtx->pOutput;
|
||||
|
||||
for (; i < pCtx->size && i >= 0; i += step) {
|
||||
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pDiffInfo->hasPrev) { // initial value is not set yet
|
||||
*pOutput = (float)(pData[i] - pDiffInfo->d64Prev); // direct previous may be null
|
||||
*pTimestamp = (tsList != NULL)? tsList[i]:0;
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
|
||||
pDiffInfo->d64Prev = pData[i];
|
||||
pDiffInfo->hasPrev = true;
|
||||
numOfElems++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
int16_t *pData = (int16_t *)data;
|
||||
int16_t *pOutput = (int16_t *)pCtx->pOutput;
|
||||
|
||||
for (; i < pCtx->size && i >= 0; i += step) {
|
||||
if (pCtx->hasNull && isNull((const char*) &pData[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pDiffInfo->hasPrev) { // initial value is not set yet
|
||||
*pOutput = (int16_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
|
||||
*pTimestamp = (tsList != NULL)? tsList[i]:0;
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
|
||||
pDiffInfo->i64Prev = pData[i];
|
||||
pDiffInfo->hasPrev = true;
|
||||
numOfElems++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
int8_t *pData = (int8_t *)data;
|
||||
int8_t *pOutput = (int8_t *)pCtx->pOutput;
|
||||
|
||||
for (; i < pCtx->size && i >= 0; i += step) {
|
||||
if (pCtx->hasNull && isNull((char *)&pData[i], pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
if ((pDiffInfo->ignoreNegative) && (pData[i] < 0)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pDiffInfo->hasPrev) { // initial value is not set yet
|
||||
*pOutput = (int8_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
|
||||
*pTimestamp = (tsList != NULL)? tsList[i]:0;
|
||||
pOutput += 1;
|
||||
pTimestamp += 1;
|
||||
}
|
||||
|
||||
pDiffInfo->i64Prev = pData[i];
|
||||
pDiffInfo->hasPrev = true;
|
||||
numOfElems++;
|
||||
}
|
||||
break;
|
||||
}
|
||||
#endif
|
||||
default:
|
||||
break;
|
||||
// qError("error input type");
|
||||
}
|
||||
|
||||
// initial value is not set yet
|
||||
if (numOfElems <= 0) {
|
||||
return 0;
|
||||
} else {
|
||||
return (isFirstBlock) ? numOfElems - 1 : numOfElems;
|
||||
}
|
||||
return numOfElems;
|
||||
}
|
||||
|
||||
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||
|
|
Loading…
Reference in New Issue