diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 40f8a9e383..1cc4d9839e 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -31,13 +31,13 @@ typedef enum EFunctionType { FUNCTION_TYPE_ELAPSED, FUNCTION_TYPE_IRATE, FUNCTION_TYPE_LAST_ROW, - FUNCTION_TYPE_LEASTSQUARES, FUNCTION_TYPE_MAX, FUNCTION_TYPE_MIN, FUNCTION_TYPE_MODE, FUNCTION_TYPE_PERCENTILE, FUNCTION_TYPE_SPREAD, FUNCTION_TYPE_STDDEV, + FUNCTION_TYPE_LEASTSQUARES, FUNCTION_TYPE_SUM, FUNCTION_TYPE_TWA, FUNCTION_TYPE_HISTOGRAM, diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index ce2f0b0651..c62a7f00cc 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -55,6 +55,12 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx); int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t stddevInvertFunction(SqlFunctionCtx* pCtx); +bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +int32_t leastSQRFunction(SqlFunctionCtx* pCtx); +int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx); + bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t percentileFunction(SqlFunctionCtx *pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index b7bd023581..f88d84f50a 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -226,6 +226,23 @@ static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return TSDB_CODE_SUCCESS; } +static int32_t translateLeastSQR(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + if (3 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + for (int32_t i = 0; i < numOfParams; ++i) { + uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type; + if (!IS_NUMERIC_TYPE(colType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + } + + pFunc->node.resType = (SDataType) { .bytes = 64, .type = TSDB_DATA_TYPE_BINARY }; + return TSDB_CODE_SUCCESS; +} + static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (4 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -537,6 +554,17 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = stddevFinalize, .invertFunc = stddevInvertFunction }, + { + .name = "leastsquares", + .type = FUNCTION_TYPE_LEASTSQUARES, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateLeastSQR, + .getEnvFunc = getLeastSQRFuncEnv, + .initFunc = leastSQRFunctionSetup, + .processFunc = leastSQRFunction, + .finalizeFunc = leastSQRFinalize, + .invertFunc = leastSQRInvertFunction + }, { .name = "avg", .type = FUNCTION_TYPE_AVG, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1cb47a0bf1..4ed2677997 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -65,6 +65,13 @@ typedef struct SStddevRes { }; } SStddevRes; +typedef struct SLeastSQRInfo { + double matrix[2][3]; + double startVal; + double stepVal; + int64_t num; +} SLeastSQRInfo; + typedef struct SPercentileInfo { double result; tMemBucket* pMemBucket; @@ -1404,6 +1411,181 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +bool getLeastSQRFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SLeastSQRInfo); + return true; +} + +bool leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + SLeastSQRInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo); + + pInfo->startVal = IS_FLOAT_TYPE(pCtx->param[1].param.nType) ? pCtx->param[1].param.d : + (double)pCtx->param[1].param.i; + pInfo->stepVal = IS_FLOAT_TYPE(pCtx->param[1].param.nType) ? pCtx->param[2].param.d : + (double)pCtx->param[1].param.i; + return true; +} + +#define LEASTSQR_CAL(p, x, y, index, step) \ + do { \ + (p)[0][0] += (double)(x) * (x); \ + (p)[0][1] += (double)(x); \ + (p)[0][2] += (double)(x) * (y)[index]; \ + (p)[1][2] += (y)[index]; \ + (x) += step; \ + } while (0) + +int32_t leastSQRFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElem = 0; + + SInputColumnInfoData* pInput = &pCtx->input; + int32_t type = pInput->pData[0]->info.type; + + SLeastSQRInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + SColumnInfoData* pCol = pInput->pData[0]; + + double(*param)[3] = pInfo->matrix; + double x = pInfo->startVal; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + switch (type) { + case TSDB_DATA_TYPE_TINYINT: { + int8_t* plist = (int8_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + numOfElem++; + LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal); + + break; + } + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t* plist = (int16_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem++; + LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal); + } + break; + } + + case TSDB_DATA_TYPE_INT: { + int32_t* plist = (int32_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem++; + LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal); + } + + break; + } + + case TSDB_DATA_TYPE_BIGINT: { + int64_t* plist = (int64_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem++; + LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal); + } + break; + } + + case TSDB_DATA_TYPE_FLOAT: { + float* plist = (float*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem++; + LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal); + } + break; + } + + case TSDB_DATA_TYPE_DOUBLE: { + double* plist = (double*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem++; + LEASTSQR_CAL(param, x, plist, i, pInfo->stepVal); + } + break; + } + + default: + break; + } + + pInfo->startVal = x; + pInfo->num += numOfElem; + + SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); + + return TSDB_CODE_SUCCESS; +} + +int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SLeastSQRInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + int32_t currentRow = pBlock->info.rows; + + if (0 == pInfo->num) { + return 0; + } + + double(*param)[3] = pInfo->matrix; + + param[1][1] = (double)pInfo->num; + param[1][0] = param[0][1]; + + param[0][0] -= param[1][0] * (param[0][1] / param[1][1]); + param[0][2] -= param[1][2] * (param[0][1] / param[1][1]); + param[0][1] = 0; + param[1][2] -= param[0][2] * (param[1][0] / param[0][0]); + param[1][0] = 0; + param[0][2] /= param[0][0]; + + param[1][2] /= param[1][1]; + + char buf[64] = {0}; + size_t len = snprintf(varDataVal(buf), sizeof(buf) - VARSTR_HEADER_SIZE, "{slop:%.6lf, intercept:%.6lf}", param[0][2], param[1][2]); + varDataSetLen(buf, len); + + colDataAppend(pCol, currentRow, buf, false); + + return pResInfo->numOfRes; +} + +int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx) { + //TODO + return TSDB_CODE_SUCCESS; +} + bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SPercentileInfo); return true; @@ -2475,10 +2657,10 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t len; char buf[512] = {0}; if (!pInfo->normalized) { - len = sprintf(buf + VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%"PRId64"}", + len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%"PRId64"}", pInfo->bins[i].lower, pInfo->bins[i].upper, pInfo->bins[i].count); } else { - len = sprintf(buf + VARSTR_HEADER_SIZE, "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}", + len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}", pInfo->bins[i].lower, pInfo->bins[i].upper, pInfo->bins[i].percentage); } varDataSetLen(buf, len); diff --git a/source/libs/index/inc/indexComm.h b/source/libs/index/inc/indexComm.h index 3b07429089..4cab71f92c 100644 --- a/source/libs/index/inc/indexComm.h +++ b/source/libs/index/inc/indexComm.h @@ -20,11 +20,23 @@ extern "C" { #endif +#include "indexInt.h" +#include "tcompare.h" + extern char JSON_COLUMN[]; extern char JSON_VALUE_DELIM; char* indexPackJsonData(SIndexTerm* itm); char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip); + +typedef enum { MATCH, CONTINUE, BREAK } TExeCond; + +typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type); + +TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b); + +_cache_range_compare indexGetCompare(RangeType ty); + #ifdef __cplusplus } #endif diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 929f33909e..0653c1d1fa 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -60,50 +60,6 @@ static int32_t cacheSearchRange_JSON(void* cache, SIndexTerm* ct, SIdxTempResult static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTempResult* tr, STermValueType* s, RangeType type); -typedef enum { MATCH, CONTINUE, BREAK } TExeCond; -typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type); - -static TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) { - // optime later - int32_t ret = func(a, b); - switch (comType) { - case QUERY_LESS_THAN: { - if (ret < 0) return MATCH; - } break; - case QUERY_LESS_EQUAL: { - if (ret <= 0) return MATCH; - break; - } - case QUERY_GREATER_THAN: { - if (ret > 0) return MATCH; - break; - } - case QUERY_GREATER_EQUAL: { - if (ret >= 0) return MATCH; - } - } - return CONTINUE; -} -static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); - return tDoCommpare(func, QUERY_LESS_THAN, a, b); -} -static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); - return tDoCommpare(func, QUERY_LESS_EQUAL, a, b); -} -static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); - return tDoCommpare(func, QUERY_GREATER_THAN, a, b); -} -static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { - __compar_fn_t func = getComparFunc(type, 0); - return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b); -} - -static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual, - tCompareGreaterThan, tCompareGreaterEqual}; - static int32_t (*cacheSearch[][QUERY_MAX])(void* cache, SIndexTerm* ct, SIdxTempResult* tr, STermValueType* s) = { {cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, cacheSearchLessEqual, cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange}, @@ -169,7 +125,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes return 0; } - _cache_range_compare cmpFn = rangeCompare[type]; + _cache_range_compare cmpFn = indexGetCompare(type); MemTable* mem = cache; IndexCache* pCache = mem->pCache; @@ -295,7 +251,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe if (cache == NULL) { return 0; } - _cache_range_compare cmpFn = rangeCompare[type]; + _cache_range_compare cmpFn = indexGetCompare(type); MemTable* mem = cache; IndexCache* pCache = mem->pCache; diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index cdd7b35675..9e85a6680a 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -13,12 +13,58 @@ * along with this program. If not, see . */ +#include "indexComm.h" #include "index.h" #include "indexInt.h" +#include "tcompare.h" char JSON_COLUMN[] = "JSON"; char JSON_VALUE_DELIM = '&'; +static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { + __compar_fn_t func = getComparFunc(type, 0); + return tDoCommpare(func, QUERY_LESS_THAN, a, b); +} +static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { + __compar_fn_t func = getComparFunc(type, 0); + return tDoCommpare(func, QUERY_LESS_EQUAL, a, b); +} +static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { + __compar_fn_t func = getComparFunc(type, 0); + return tDoCommpare(func, QUERY_GREATER_THAN, a, b); +} +static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { + __compar_fn_t func = getComparFunc(type, 0); + return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b); +} + +TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) { + // optime later + int32_t ret = func(a, b); + switch (comType) { + case QUERY_LESS_THAN: { + if (ret < 0) return MATCH; + } break; + case QUERY_LESS_EQUAL: { + if (ret <= 0) return MATCH; + break; + } + case QUERY_GREATER_THAN: { + if (ret > 0) return MATCH; + break; + } + case QUERY_GREATER_EQUAL: { + if (ret >= 0) return MATCH; + } + } + return CONTINUE; +} + +static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual, + tCompareGreaterThan, tCompareGreaterEqual}; + +_cache_range_compare indexGetCompare(RangeType ty) { return rangeCompare[ty]; } + char* indexPackJsonData(SIndexTerm* itm) { /* * |<-----colname---->|<-----dataType---->|<--------colVal---------->| @@ -46,6 +92,7 @@ char* indexPackJsonData(SIndexTerm* itm) { return buf; } + char* indexPackJsonDataPrefix(SIndexTerm* itm, int32_t* skip) { /* * |<-----colname---->|<-----dataType---->|<--------colVal---------->| diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index b5551e825f..c56d65fc6a 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -72,9 +72,23 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype); -static int32_t (*tfSearch[])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = { - tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, - tfSearchLessEqual, tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange}; +static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr); + +static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype); + +static int32_t (*tfSearch[][QUERY_MAX])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = { + {tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, tfSearchLessEqual, + tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange}, + {tfSearchTerm_JSON, tfSearchPrefix_JSON, tfSearchSuffix_JSON, tfSearchRegex_JSON, tfSearchLessThan_JSON, + tfSearchLessEqual_JSON, tfSearchGreaterThan_JSON, tfSearchGreaterEqual_JSON, tfSearchRange_JSON}}; TFileCache* tfileCacheCreate(const char* path) { TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache)); @@ -202,14 +216,10 @@ void tfileReaderDestroy(TFileReader* reader) { taosMemoryFree(reader); } static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { - bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); int ret = 0; char* p = tem->colVal; uint64_t sz = tem->nColVal; - if (hasJson) { - p = indexPackJsonData(tem); - sz = strlen(p); - } + int64_t st = taosGetTimestampUs(); FstSlice key = fstSliceCreate(p, sz); uint64_t offset; @@ -224,9 +234,6 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid, tem->colName, tem->colVal, cost); } - if (hasJson) { - taosMemoryFree(p); - } fstSliceDestroy(&key); return 0; } @@ -308,14 +315,11 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr) } static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) { - bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); - int ret = 0; - char* p = tem->colVal; - int skip = 0; + int ret = 0; + char* p = tem->colVal; + int skip = 0; + _cache_range_compare cmpFn = indexGetCompare(type); - if (hasJson) { - p = indexPackJsonDataPrefix(tem, &skip); - } SArray* offsets = taosArrayInit(16, sizeof(uint64_t)); AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS); @@ -328,7 +332,16 @@ static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult StreamWithState* st = streamBuilderIntoStream(sb); StreamWithStateResult* rt = NULL; while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { - taosArrayPush(offsets, &(rt->out.out)); + FstSlice* s = &rt->data; + char* ch = (char*)fstSliceData(s, NULL); + TExeCond cond = cmpFn(ch, p, tem->colType); + if (MATCH == cond) { + tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total); + } else if (CONTINUE == cond) { + } else if (BREAK == cond) { + swsResultDestroy(rt); + break; + } swsResultDestroy(rt); } streamWithStateDestroy(st); @@ -376,17 +389,105 @@ static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr) fstSliceDestroy(&key); return 0; } +static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + int ret = 0; + char* p = indexPackJsonData(tem); + int sz = strlen(p); + int64_t st = taosGetTimestampUs(); + FstSlice key = fstSliceCreate(p, sz); + uint64_t offset; + if (fstGet(((TFileReader*)reader)->fst, &key, &offset)) { + int64_t et = taosGetTimestampUs(); + int64_t cost = et - st; + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, found table info in tindex, time cost: %" PRIu64 "us", + tem->suid, tem->colName, tem->colVal, cost); + + ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total); + cost = taosGetTimestampUs() - et; + indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid, + tem->colName, tem->colVal, cost); + } + fstSliceDestroy(&key); + return 0; + // deprecate api + return TSDB_CODE_SUCCESS; +} +static int32_t tfSearchPrefix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t tfSearchSuffix_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t tfSearchRegex_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + // impl later + return TSDB_CODE_SUCCESS; +} +static int32_t tfSearchLessThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + return tfSearchCompareFunc_JSON(reader, tem, tr, LT); +} +static int32_t tfSearchLessEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + return tfSearchCompareFunc_JSON(reader, tem, tr, LE); +} +static int32_t tfSearchGreaterThan_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + return tfSearchCompareFunc_JSON(reader, tem, tr, GT); +} +static int32_t tfSearchGreaterEqual_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + return tfSearchCompareFunc_JSON(reader, tem, tr, GE); +} +static int32_t tfSearchRange_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + // impl later + return TSDB_CODE_SUCCESS; +} + +static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype) { + int ret = 0; + int skip = 0; + + char* p = indexPackJsonDataPrefix(tem, &skip); + + _cache_range_compare cmpFn = indexGetCompare(ctype); + + SArray* offsets = taosArrayInit(16, sizeof(uint64_t)); + + AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_PREFIX); + FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx); + + FstSlice h = fstSliceCreate((uint8_t*)p, skip); + fstStreamBuilderSetRange(sb, &h, ctype); + fstSliceDestroy(&h); + + StreamWithState* st = streamBuilderIntoStream(sb); + StreamWithStateResult* rt = NULL; + while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { + FstSlice* s = &rt->data; + char* ch = (char*)fstSliceData(s, NULL); + TExeCond cond = cmpFn(ch, p, tem->colType); + if (MATCH == cond) { + tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total); + } else if (CONTINUE == cond) { + } else if (BREAK == cond) { + swsResultDestroy(rt); + break; + } + swsResultDestroy(rt); + } + streamWithStateDestroy(st); + fstStreamBuilderDestroy(sb); + return TSDB_CODE_SUCCESS; +} int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr) { SIndexTerm* term = query->term; EIndexQueryType qtype = query->qType; - if (qtype >= sizeof(tfSearch) / sizeof(tfSearch[0])) { - indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found table info in tindex", term->suid, term->colName, - term->colVal); - return -1; + + if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) { + return tfSearch[1][qtype](reader, term, tr); } else { - return tfSearch[qtype](reader, term, tr); + return tfSearch[0][qtype](reader, term, tr); } + tfileReaderUnRef(reader); return 0; } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 323ef43e25..af66d39904 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -1108,7 +1108,7 @@ void transSendResponse(const STransMsg* msg) { SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); srvMsg->msg = tmsg; srvMsg->type = Normal; - tTrace("server conn %p start to send resp (1/2)", exh->handle); + tDebug("server conn %p start to send resp (1/2)", exh->handle); transSendAsync(pThrd->asyncPool, &srvMsg->q); uvReleaseExHandle(refId); return;