From cec0f0d17163f6ed89043fe61f84a9f64c32bd49 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 20 Apr 2022 10:25:35 +0800 Subject: [PATCH] refactor(query): refactor scalar function TD-14802 --- source/libs/scalar/src/sclfunc.c | 87 ++++++++++++-------------------- 1 file changed, 32 insertions(+), 55 deletions(-) diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 5e8eb805e3..d696e7074f 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -304,7 +304,7 @@ static int32_t doLengthFunction(SScalarParam *pInput, int32_t inputNum, SScalarP continue; } - char *in = pInputData->pData + pInputData->varmeta.offset[i]; + char *in = colDataGetData(pInputData, i); out[i] = lenFn(in, type); } @@ -395,11 +395,8 @@ int32_t concatFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu int16_t dataLen = 0; for (int32_t i = 0; i < inputNum; ++i) { - if (pInput[i].numOfRows == 1) { - input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0]; - } else { - input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[k]; - } + int32_t rowIdx = (pInput[i].numOfRows == 1) ? 0 : k; + input[i] = colDataGetData(pInputData[i], rowIdx); ret = concatCopyHelper(input[i], output, hasNchar, GET_PARAM_TYPE(&pInput[i]), &dataLen); if (ret != TSDB_CODE_SUCCESS) { @@ -473,11 +470,8 @@ int32_t concatWsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p break; } - if (pInput[i].numOfRows == 1) { - input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[0]; - } else { - input[i] = pInputData[i]->pData + pInputData[i]->varmeta.offset[k]; - } + int32_t rowIdx = (pInput[i].numOfRows == 1) ? 0 : k; + input[i] = colDataGetData(pInputData[i], rowIdx); ret = concatCopyHelper(input[i], output, hasNchar, GET_PARAM_TYPE(&pInput[i]), &dataLen); if (ret != TSDB_CODE_SUCCESS) { @@ -534,7 +528,7 @@ static int32_t doCaseConvFunction(SScalarParam *pInput, int32_t inputNum, SScala continue; } - char *input = pInputData->pData + pInputData->varmeta.offset[i]; + char *input = colDataGetData(pInput[0].columnData, i); int32_t len = varDataLen(input); if (type == TSDB_DATA_TYPE_VARCHAR) { for (int32_t j = 0; j < len; ++j) { @@ -575,8 +569,8 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar colDataAppendNULL(pOutputData, i); continue; } - char *input = pInputData->pData + pInputData->varmeta.offset[i]; + char *input = colDataGetData(pInput[0].columnData, i); int32_t len = varDataLen(input); int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE; trimFn(input, output, type, charLen); @@ -615,19 +609,16 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu SColumnInfoData *pInputData = pInput->columnData; SColumnInfoData *pOutputData = pOutput->columnData; - char *input = pInputData->pData + pInputData->varmeta.offset[0]; - char *output = NULL; - int32_t outputLen = pInputData->varmeta.length * pInput->numOfRows; char *outputBuf = taosMemoryCalloc(outputLen, 1); - output = outputBuf; + char *output = outputBuf; for (int32_t i = 0; i < pInput->numOfRows; ++i) { if (colDataIsNull_s(pInputData, i)) { colDataAppendNULL(pOutputData, i); continue; } - + char *input = colDataGetData(pInput[0].columnData, i); int32_t len = varDataLen(input); int32_t startPosBytes; @@ -646,7 +637,6 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu varDataSetLen(output, resLen); colDataAppend(pOutputData, i , output, false); - input += varDataTLen(input); output += varDataTLen(output); } @@ -799,13 +789,13 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t type = GET_PARAM_TYPE(pInput); - char *input = pInput[0].columnData->pData; for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { if (colDataIsNull_s(pInput[0].columnData, i)) { colDataAppendNULL(pOutput->columnData, i); continue; } + char *input = colDataGetData(pInput[0].columnData, i); char fraction[20] = {0}; bool hasFraction = false; NUM_TO_STRING(type, input, sizeof(fraction), fraction); @@ -822,7 +812,8 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam * } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { timeVal = timeVal / (1000 * 1000 * 1000); } else { - assert(0); + colDataAppendNULL(pOutput->columnData, i); + continue; } hasFraction = true; memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS); @@ -852,7 +843,6 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam * varDataSetLen(buf, len); colDataAppend(pOutput->columnData, i, buf, false); - input += tDataTypes[type].bytes; } pOutput->numOfRows = pInput->numOfRows; @@ -864,12 +854,12 @@ int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarP int32_t type = GET_PARAM_TYPE(pInput); int32_t timePrec = GET_PARAM_PRECISON(pInput); - char *input = pInput[0].columnData->pData + pInput[0].columnData->varmeta.offset[0]; for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { if (colDataIsNull_s(pInput[0].columnData, i)) { colDataAppendNULL(pOutput->columnData, i); continue; } + char *input = colDataGetData(pInput[0].columnData, i); int64_t timeVal = 0; int32_t ret = convertStringToTimestamp(type, input, timePrec, &timeVal); @@ -879,7 +869,6 @@ int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarP } colDataAppend(pOutput->columnData, i, (char *)&timeVal, false); - input += varDataTLen(input); } pOutput->numOfRows = pInput->numOfRows; @@ -897,19 +886,14 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); - char *input = NULL; - if (IS_VAR_DATA_TYPE(type)) { - input = pInput[0].columnData->pData + pInput[0].columnData->varmeta.offset[0]; - } else { - input = pInput[0].columnData->pData; - } - for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { if (colDataIsNull_s(pInput[0].columnData, i)) { colDataAppendNULL(pOutput->columnData, i); continue; } + char *input = colDataGetData(pInput[0].columnData, i); + if (IS_VAR_DATA_TYPE(type)) { /* datetime format strings */ int32_t ret = convertStringToTimestamp(type, input, TSDB_TIME_PRECISION_NANO, &timeVal); if (ret != TSDB_CODE_SUCCESS) { @@ -959,7 +943,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS){ timeVal = timeVal * factor; } else { - assert(0); + colDataAppendNULL(pOutput->columnData, i); + continue; } break; } @@ -973,7 +958,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { timeVal = timeVal * factor; } else { - assert(0); + colDataAppendNULL(pOutput->columnData, i); + continue; } break; } @@ -987,7 +973,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { timeVal = timeVal * factor / factor / 60 * 60 * factor; } else { - assert(0); + colDataAppendNULL(pOutput->columnData, i); + continue; } break; } @@ -1001,7 +988,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { timeVal = timeVal * factor / factor / 3600 * 3600 * factor; } else { - assert(0); + colDataAppendNULL(pOutput->columnData, i); + continue; } break; } @@ -1015,7 +1003,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { timeVal = timeVal * factor / factor / 86400* 86400 * factor; } else { - assert(0); + colDataAppendNULL(pOutput->columnData, i); + continue; } break; } @@ -1029,7 +1018,8 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { timeVal = timeVal * factor / factor / 604800 * 604800* factor; } else { - assert(0); + colDataAppendNULL(pOutput->columnData, i); + continue; } break; } @@ -1068,11 +1058,6 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara } colDataAppend(pOutput->columnData, i, (char *)&timeVal, false); - if (IS_VAR_DATA_TYPE(type)) { - input += varDataTLen(input); - } else { - input += tDataTypes[type].bytes; - } } pOutput->numOfRows = pInput->numOfRows; @@ -1094,12 +1079,6 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) { return TSDB_CODE_FAILED; } - - if (IS_VAR_DATA_TYPE(type)) { - input[k] = pInput[k].columnData->pData + pInput[k].columnData->varmeta.offset[0]; - } else { - input[k] = pInput[k].columnData->pData; - } } for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { @@ -1109,6 +1088,9 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p continue; } + int32_t rowIdx = (pInput[k].numOfRows == 1) ? 0 : i; + input[k] = colDataGetData(pInput[k].columnData, rowIdx); + int32_t type = GET_PARAM_TYPE(&pInput[k]); if (IS_VAR_DATA_TYPE(type)) { /* datetime format strings */ int32_t ret = convertStringToTimestamp(type, input[k], TSDB_TIME_PRECISION_NANO, &timeVal[k]); @@ -1138,14 +1120,9 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p timeVal[k] = timeVal[k] * 1000; } else if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { timeVal[k] = timeVal[k]; - } - } - - if (pInput[k].numOfRows != 1) { - if (IS_VAR_DATA_TYPE(type)) { - input[k] += varDataTLen(input[k]); } else { - input[k] += tDataTypes[type].bytes; + colDataAppendNULL(pOutput->columnData, i); + continue; } } }