Merge pull request #11460 from taosdata/feature/TD-14243
feat(query): add timediff function
This commit is contained in:
commit
4e9a9318c5
|
@ -31,8 +31,8 @@ pNode will be freed in API;
|
|||
*/
|
||||
int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes);
|
||||
|
||||
/*
|
||||
pDst need to freed in caller
|
||||
/*
|
||||
pDst need to freed in caller
|
||||
*/
|
||||
int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst);
|
||||
|
||||
|
@ -77,6 +77,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp
|
|||
int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
|
||||
bool getTimePseudoFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||
|
||||
|
|
|
@ -423,6 +423,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.sprocessFunc = timeTruncateFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "timediff",
|
||||
.type = FUNCTION_TYPE_TIMEDIFF,
|
||||
.classification = FUNC_MGT_SCALAR_FUNC,
|
||||
.checkFunc = checkAndGetResultType,
|
||||
.getEnvFunc = NULL,
|
||||
.initFunc = NULL,
|
||||
.sprocessFunc = timeDiffFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
.name = "_rowts",
|
||||
.type = FUNCTION_TYPE_ROWTS,
|
||||
|
@ -651,6 +661,10 @@ int32_t checkAndGetResultType(SFunctionNode* pFunc) {
|
|||
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes, .type = TSDB_DATA_TYPE_TIMESTAMP};
|
||||
break;
|
||||
}
|
||||
case FUNCTION_TYPE_TIMEDIFF: {
|
||||
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
|
||||
break;
|
||||
}
|
||||
|
||||
case FUNCTION_TYPE_TBNAME: {
|
||||
// todo
|
||||
|
|
|
@ -1104,6 +1104,144 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
if (inputNum != 2 && inputNum != 3) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int32_t timePrec = GET_PARAM_PRECISON(&pInput[0]);
|
||||
int64_t timeUnit = -1, timeVal[2] = {0};
|
||||
if (inputNum == 3) {
|
||||
if (GET_PARAM_TYPE(&pInput[2]) != TSDB_DATA_TYPE_BIGINT) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
GET_TYPED_DATA(timeUnit, int64_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData);
|
||||
}
|
||||
|
||||
char *input[2];
|
||||
for (int32_t k = 0; k < 2; ++k) {
|
||||
int32_t type = GET_PARAM_TYPE(&pInput[k]);
|
||||
if (type != TSDB_DATA_TYPE_BIGINT && type != TSDB_DATA_TYPE_TIMESTAMP &&
|
||||
type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
input[k] = pInput[k].columnData->pData + pInput[k].columnData->varmeta.offset[0];
|
||||
} else {
|
||||
input[k] = pInput[k].columnData->pData;
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
|
||||
for (int32_t k = 0; k < 2; ++k) {
|
||||
if (colDataIsNull_s(pInput[0].columnData, i)) {
|
||||
colDataAppendNULL(pOutput->columnData, i);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t type = GET_PARAM_TYPE(&pInput[k]);
|
||||
if (IS_VAR_DATA_TYPE(type)) { /* datetime format strings */
|
||||
convertStringToTimestamp(type, input[k], TSDB_TIME_PRECISION_NANO, &timeVal[k]);
|
||||
} else if (type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_TIMESTAMP) { /* unix timestamp or ts column*/
|
||||
GET_TYPED_DATA(timeVal[k], int64_t, type, input[k]);
|
||||
if (type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
|
||||
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
|
||||
int64_t timeValSec = timeVal[k] / factor;
|
||||
if (timeValSec < 1000000000) {
|
||||
timeVal[k] = timeValSec;
|
||||
}
|
||||
}
|
||||
|
||||
char buf[20] = {0};
|
||||
NUM_TO_STRING(TSDB_DATA_TYPE_BIGINT, &timeVal[k], sizeof(buf), buf);
|
||||
int32_t tsDigits = (int32_t)strlen(buf);
|
||||
if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) {
|
||||
timeVal[k] = timeVal[k] * 1000000000;
|
||||
} else if (tsDigits == TSDB_TIME_PRECISION_MILLI_DIGITS) {
|
||||
timeVal[k] = timeVal[k] * 1000000;
|
||||
} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) {
|
||||
timeVal[k] = timeVal[k] * 1000;
|
||||
} else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) {
|
||||
timeVal[k] = timeVal[k];
|
||||
}
|
||||
}
|
||||
|
||||
if (pInput[k].numOfRows != 1) {
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
input[k] += varDataTLen(input[k]);
|
||||
} else {
|
||||
input[k] += tDataTypes[type].bytes;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int64_t result = (timeVal[0] >= timeVal[1]) ? (timeVal[0] - timeVal[1]) :
|
||||
(timeVal[1] - timeVal[0]);
|
||||
|
||||
if (timeUnit < 0) { // if no time unit given use db precision
|
||||
switch(timePrec) {
|
||||
case TSDB_TIME_PRECISION_MILLI: {
|
||||
result = result / 1000000;
|
||||
break;
|
||||
}
|
||||
case TSDB_TIME_PRECISION_MICRO: {
|
||||
result = result / 1000;
|
||||
break;
|
||||
}
|
||||
case TSDB_TIME_PRECISION_NANO: {
|
||||
result = result / 1;
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 :
|
||||
(timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000);
|
||||
timeUnit = timeUnit * 1000 / factor;
|
||||
switch(timeUnit) {
|
||||
case 0: { /* 1u */
|
||||
result = result / 1000;
|
||||
break;
|
||||
}
|
||||
case 1: { /* 1a */
|
||||
result = result / 1000000;
|
||||
break;
|
||||
}
|
||||
case 1000: { /* 1s */
|
||||
result = result / 1000000000;
|
||||
break;
|
||||
}
|
||||
case 60000: { /* 1m */
|
||||
result = result / 1000000000 / 60;
|
||||
break;
|
||||
}
|
||||
case 3600000: { /* 1h */
|
||||
result = result / 1000000000 / 3600;
|
||||
break;
|
||||
}
|
||||
case 86400000: { /* 1d */
|
||||
result = result / 1000000000 / 86400;
|
||||
break;
|
||||
}
|
||||
case 604800000: { /* 1w */
|
||||
result = result / 1000000000 / 604800;
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
colDataAppend(pOutput->columnData, i, (char *)&result, false);
|
||||
}
|
||||
|
||||
pOutput->numOfRows = pInput->numOfRows;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t atanFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
return doScalarFunctionUnique(pInput, inputNum, pOutput, atan);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue