From aefa9bd89186464dc02a7a810c4e2f6afed86dcb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 17 May 2022 16:53:55 +0800 Subject: [PATCH 01/16] refactor: do some internal refactor. --- include/libs/function/function.h | 38 +- source/libs/executor/src/executorimpl.c | 16 +- source/libs/function/src/builtinsimpl.c | 23 +- source/libs/function/src/taggfunction.c | 652 +----------------------- 4 files changed, 36 insertions(+), 693 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 616aec8c02..7d3e969c41 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -170,32 +170,18 @@ typedef struct SInputColumnInfoData { // sql function runtime context typedef struct SqlFunctionCtx { - SInputColumnInfoData input; - SResultDataInfo resDataInfo; - uint32_t order; // data block scanner order: asc|desc - uint8_t scanFlag; // record current running step, default: 0 - //////////////////////////////////////////////////////////////// - int32_t startRow; // start row index - int32_t size; // handled processed row number - SColumnInfoData* pInput; - SColumnDataAgg agg; - int16_t inputType; // TODO remove it - int16_t inputBytes; // TODO remove it - bool hasNull; // null value exist in current block, TODO remove it - bool requireNull; // require null in some function, TODO remove it - int32_t columnIndex; // TODO remove it - bool isAggSet; - int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it - bool stableQuery; - ///////////////////////////////////////////////////////////////// - int16_t functionId; // function id - char * pOutput; // final result output buffer, point to sdata->data - int32_t numOfParams; - SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param - int64_t *ptsList; // corresponding timestamp array list - SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ - int32_t offset; - SVariant tag; + SInputColumnInfoData input; + SResultDataInfo resDataInfo; + uint32_t order; // data block scanner order: asc|desc + uint8_t scanFlag; // record current running step, default: 0 + int16_t functionId; // function id + char *pOutput; // final result output buffer, point to sdata->data + int32_t numOfParams; + SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param + int64_t *ptsList; // corresponding timestamp array list + SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ + int32_t offset; + SVariant tag; struct SResultRowEntryInfo *resultInfo; SSubsidiaryResInfo subsidiaries; SPoint1 start; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c261271728..1ba494a041 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -602,8 +602,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) { for (int32_t k = 0; k < numOfOutput; ++k) { - pCtx[k].startTs = pWin->skey; - // keep it temporarily bool hasAgg = pCtx[k].input.colDataAggIsSet; int32_t numOfRows = pCtx[k].input.numOfRows; @@ -619,8 +617,8 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow // not a whole block involved in query processing, statistics data can not be used // NOTE: the original value of isSet have been changed here - if (pCtx[k].isAggSet && forwardStep < numOfTotal) { - pCtx[k].isAggSet = false; + if (pCtx[k].input.colDataAggIsSet && forwardStep < numOfTotal) { + pCtx[k].input.colDataAggIsSet = false; } if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) { @@ -680,7 +678,7 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC int32_t order) { for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { pCtx[i].order = order; - pCtx[i].size = pBlock->info.rows; + pCtx[i].input.numOfRows = pBlock->info.rows; setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock); } } @@ -742,7 +740,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { pCtx[i].order = order; - pCtx[i].size = pBlock->info.rows; + pCtx[i].input.numOfRows = pBlock->info.rows; + pCtx[i].pSrcBlock = pBlock; pCtx[i].scanFlag = scanFlag; @@ -827,7 +826,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { - pCtx[k].startTs = startTs; // todo add a dummy funtion to avoid process check if (pCtx[k].fpSet.process != NULL) { int32_t code = pCtx[k].fpSet.process(&pCtx[k]); @@ -3330,7 +3328,7 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3 static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr, int32_t rowIndex) { for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index - pCtx[j].startRow = rowIndex; +// pCtx[j].startRow = rowIndex; } for (int32_t j = 0; j < numOfExpr; ++j) { @@ -3381,7 +3379,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - pCtx[i].size = 1; +// pCtx[i].size = 1; } for (int32_t i = 0; i < pBlock->info.rows; ++i) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index af86eb4e90..4571631bc0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -287,6 +287,7 @@ static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) { } return numOfElem; } + /* * count function does need the finalize, if data is missing, the default value, which is 0, is used * count function does not use the pCtx->interResBuf to keep the intermediate buffer @@ -781,16 +782,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { int64_t val = GET_INT64_VAL(tval); if ((prev < val) ^ isMinFunc) { pBuf->v = val; - // for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { - // SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; - // if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor - // __ctx->tag.i = key; - // __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; - // } - // - // __ctx->fpSet.process(__ctx); - // } - if (pCtx->subsidiaries.num > 0) { saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } @@ -803,15 +794,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { uint64_t val = GET_UINT64_VAL(tval); if ((prev < val) ^ isMinFunc) { pBuf->v = val; - // for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { - // SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; - // if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor - // __ctx->tag.i = key; - // __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; - // } - // - // __ctx->fpSet.process(__ctx); - // } if (pCtx->subsidiaries.num > 0) { saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } @@ -823,7 +805,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { double val = GET_DOUBLE_VAL(tval); if ((prev < val) ^ isMinFunc) { pBuf->v = val; - if (pCtx->subsidiaries.num > 0) { saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } @@ -1703,7 +1684,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pCol, i); double v = 0; - GET_TYPED_DATA(v, double, pCtx->inputType, data); + GET_TYPED_DATA(v, double, type, data); if (v < GET_DOUBLE_VAL(&pInfo->minval)) { SET_DOUBLE_VAL(&pInfo->minval, v); } diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index b6d5d38c9e..950655e480 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -480,7 +480,7 @@ static bool function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf initResultRowEntry(pResultInfo, pCtx->resDataInfo.interBufSize); return true; } - +#if 0 /** * in handling the stable query, function_finalizer is called after the secondary * merge being completed, during the first merge procedure, which is executed at the @@ -497,53 +497,6 @@ static void function_finalizer(SqlFunctionCtx *pCtx) { doFinalizer(pCtx); } -/* - * count function does need the finalize, if data is missing, the default value, which is 0, is used - * count function does not use the pCtx->interResBuf to keep the intermediate buffer - */ -static void count_function(SqlFunctionCtx *pCtx) { - int32_t numOfElem = 0; - - /* - * 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->isAggSet == true; - * 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->isAggSet == true; - * 3. for primary key column, pCtx->hasNull always be false, pCtx->isAggSet == false; - */ - if (pCtx->isAggSet) { - numOfElem = pCtx->size - pCtx->agg.numOfNull; - } else { - if (pCtx->hasNull) { - for (int32_t i = 0; i < pCtx->size; ++i) { - char *val = GET_INPUT_DATA(pCtx, i); - if (isNull(val, pCtx->inputType)) { - continue; - } - - numOfElem += 1; - } - } else { - //when counting on the primary time stamp column and no statistics data is presented, use the size value directly. - numOfElem = pCtx->size; - } - } - - if (numOfElem > 0) { -// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; - } - - *((int64_t *)pCtx->pOutput) += numOfElem; - SET_VAL(pCtx, numOfElem, 1); -} - -static void count_func_merge(SqlFunctionCtx *pCtx) { - int64_t *pData = (int64_t *)GET_INPUT_DATA_LIST(pCtx); - for (int32_t i = 0; i < pCtx->size; ++i) { - *((int64_t *)pCtx->pOutput) += pData[i]; - } - - SET_VAL(pCtx, pCtx->size, 1); -} - /** * 1. If the column value for filter exists, we need to load the SFields, which serves * as the pre-filter to decide if the actual data block is required or not. @@ -633,113 +586,6 @@ int32_t noDataRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { LOOPCHECK_N(*_data, _list, ctx, tsdbType, sign, notNullElems); \ } while (0) -static void do_sum(SqlFunctionCtx *pCtx) { - int32_t notNullElems = 0; - - // Only the pre-computing information loaded and actual data does not loaded - if (pCtx->isAggSet) { - notNullElems = pCtx->size - pCtx->agg.numOfNull; - assert(pCtx->size >= pCtx->agg.numOfNull); - - if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { - int64_t *retVal = (int64_t *)pCtx->pOutput; - *retVal += pCtx->agg.sum; - } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { - uint64_t *retVal = (uint64_t *)pCtx->pOutput; - *retVal += (uint64_t)pCtx->agg.sum; - } else if (IS_FLOAT_TYPE(pCtx->inputType)) { - double *retVal = (double*) pCtx->pOutput; - SET_DOUBLE_VAL(retVal, *retVal + GET_DOUBLE_VAL((const char*)&(pCtx->agg.sum))); - } - } else { // computing based on the true data block - void *pData = GET_INPUT_DATA_LIST(pCtx); - notNullElems = 0; - - if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { - int64_t *retVal = (int64_t *)pCtx->pOutput; - - if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { - LIST_ADD_N(*retVal, pCtx, pData, int8_t, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { - LIST_ADD_N(*retVal, pCtx, pData, int16_t, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { - LIST_ADD_N(*retVal, pCtx, pData, int32_t, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { - LIST_ADD_N(*retVal, pCtx, pData, int64_t, notNullElems, pCtx->inputType); - } - } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { - uint64_t *retVal = (uint64_t *)pCtx->pOutput; - - if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) { - LIST_ADD_N(*retVal, pCtx, pData, uint8_t, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) { - LIST_ADD_N(*retVal, pCtx, pData, uint16_t, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) { - LIST_ADD_N(*retVal, pCtx, pData, uint32_t, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) { - LIST_ADD_N(*retVal, pCtx, pData, uint64_t, notNullElems, pCtx->inputType); - } - } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { - double *retVal = (double *)pCtx->pOutput; - LIST_ADD_N_DOUBLE(*retVal, pCtx, pData, double, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { - double *retVal = (double *)pCtx->pOutput; - LIST_ADD_N_DOUBLE_FLOAT(*retVal, pCtx, pData, float, notNullElems, pCtx->inputType); - } - } - - // data in the check operation are all null, not output - SET_VAL(pCtx, notNullElems, 1); - - if (notNullElems > 0) { -// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; - } -} - -static void sum_function(SqlFunctionCtx *pCtx) { - do_sum(pCtx); - - // keep the result data in output buffer, not in the intermediate buffer - SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); -// if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) { - // set the flag for super table query - SSumInfo *pSum = (SSumInfo *)pCtx->pOutput; - pSum->hasResult = DATA_SET_FLAG; -// } -} - -static void sum_func_merge(SqlFunctionCtx *pCtx) { - int32_t notNullElems = 0; - - GET_TRUE_DATA_TYPE(); - assert(pCtx->stableQuery); - - for (int32_t i = 0; i < pCtx->size; ++i) { - char * input = GET_INPUT_DATA(pCtx, i); - SSumInfo *pInput = (SSumInfo *)input; - if (pInput->hasResult != DATA_SET_FLAG) { - continue; - } - - notNullElems++; - - if (IS_SIGNED_NUMERIC_TYPE(type)) { - *(int64_t *)pCtx->pOutput += pInput->isum; - } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { - *(uint64_t *) pCtx->pOutput += pInput->usum; - } else { - SET_DOUBLE_VAL((double *)pCtx->pOutput, *(double *)pCtx->pOutput + pInput->dsum); - } - } - - SET_VAL(pCtx, notNullElems, 1); - SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - - if (notNullElems > 0) { - //pResInfo->hasResult = DATA_SET_FLAG; - } -} - static int32_t statisRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { return BLK_DATA_SMA_LOAD; } @@ -822,17 +668,17 @@ static int32_t lastDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_ */ static void avg_function(SqlFunctionCtx *pCtx) { int32_t notNullElems = 0; - + // NOTE: keep the intermediate result into the interResultBuf SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - + SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo); double *pVal = &pAvgInfo->sum; - + if (pCtx->isAggSet) { // Pre-aggregation notNullElems = pCtx->size - pCtx->agg.numOfNull; assert(notNullElems >= 0); - + if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { *pVal += pCtx->agg.sum; } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { @@ -842,7 +688,7 @@ static void avg_function(SqlFunctionCtx *pCtx) { } } else { void *pData = GET_INPUT_DATA_LIST(pCtx); - + if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { LIST_ADD_N(*pVal, pCtx, pData, int8_t, notNullElems, pCtx->inputType); } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { @@ -865,18 +711,18 @@ static void avg_function(SqlFunctionCtx *pCtx) { LIST_ADD_N(*pVal, pCtx, pData, uint64_t, notNullElems, pCtx->inputType); } } - + if (!pCtx->hasNull) { assert(notNullElems == pCtx->size); } - + SET_VAL(pCtx, notNullElems, 1); pAvgInfo->num += notNullElems; - + if (notNullElems > 0) { //pResInfo->hasResult = DATA_SET_FLAG; } - + // keep the data into the final output buffer for super table query since this execution may be the last one if (pCtx->stableQuery) { memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SAvgInfo)); @@ -885,18 +731,18 @@ static void avg_function(SqlFunctionCtx *pCtx) { static void avg_func_merge(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - + double *sum = (double*) pCtx->pOutput; char *input = GET_INPUT_DATA_LIST(pCtx); - + for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { SAvgInfo *pInput = (SAvgInfo *)input; if (pInput->num == 0) { // current input is null continue; } - + SET_DOUBLE_VAL(sum, *sum + pInput->sum); - + // keep the number of data into the temp buffer *(int64_t *)GET_ROWCELL_INTERBUF(pResInfo) += pInput->num; } @@ -2735,18 +2581,6 @@ static void deriv_function(SqlFunctionCtx *pCtx) { GET_RES_INFO(pCtx)->numOfRes += notNullElems; } -#define DIFF_IMPL(ctx, d, type) \ - do { \ - if ((ctx)->param[1].param.nType == INITIAL_VALUE_NOT_ASSIGNED) { \ - (ctx)->param[1].param.nType = (ctx)->inputType; \ - *(type *)&(ctx)->param[1].param.i = *(type *)(d); \ - } else { \ - *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].param.i)); \ - *(type *)(&(ctx)->param[1].param.i) = *(type *)(d); \ - *(int64_t *)(ctx)->pTsOutput = GET_TS_DATA(ctx, index); \ - } \ - } while (0); - // TODO difference in date column static void diff_function(SqlFunctionCtx *pCtx) { void *data = GET_INPUT_DATA_LIST(pCtx); @@ -4072,460 +3906,4 @@ int32_t functionCompatList[] = { // tid_tag, derivative, blk_info 6, 8, 7, }; - -SAggFunctionInfo aggFunc[35] = {{ - // 0, count function does not invoke the finalize function - "count", - FUNCTION_TYPE_AGG, - FUNCTION_COUNT, - FUNCTION_COUNT, - BASIC_FUNC_SO, - function_setup, - count_function, - doFinalizer, - count_func_merge, - countRequired, - }, - { - // 1 - "sum", - FUNCTION_TYPE_AGG, - FUNCTION_SUM, - FUNCTION_SUM, - BASIC_FUNC_SO, - function_setup, - sum_function, - function_finalizer, - sum_func_merge, - statisRequired, - }, - { - // 2 - "avg", - FUNCTION_TYPE_AGG, - FUNCTION_AVG, - FUNCTION_AVG, - BASIC_FUNC_SO, - function_setup, - avg_function, - avg_finalizer, - avg_func_merge, - statisRequired, - }, - { - // 3 - "min", - FUNCTION_TYPE_AGG, - FUNCTION_MIN, - FUNCTION_MIN, - BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, - min_func_setup, - NULL, - function_finalizer, - min_func_merge, - statisRequired, - }, - { - // 4 - "max", - FUNCTION_TYPE_AGG, - FUNCTION_MAX, - FUNCTION_MAX, - BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, - max_func_setup, - NULL, - function_finalizer, - max_func_merge, - statisRequired, - }, - { - // 5 - "stddev", - FUNCTION_TYPE_AGG, - FUNCTION_STDDEV, - FUNCTION_STDDEV_DST, - FUNCSTATE_SO | FUNCSTATE_STREAM, - function_setup, - stddev_function, - stddev_finalizer, - noop1, - dataBlockRequired, - }, - { - // 6 - "percentile", - FUNCTION_TYPE_AGG, - FUNCTION_PERCT, - FUNCTION_INVALID_ID, - FUNCSTATE_SO | FUNCSTATE_STREAM, - percentile_function_setup, - percentile_function, - percentile_finalizer, - noop1, - dataBlockRequired, - }, - { - // 7 - "apercentile", - FUNCTION_TYPE_AGG, - FUNCTION_APERCT, - FUNCTION_APERCT, - FUNCSTATE_SO | FUNCSTATE_STREAM | FUNCSTATE_STABLE, - apercentile_function_setup, - apercentile_function, - apercentile_finalizer, - apercentile_func_merge, - dataBlockRequired, - }, - { - // 8 - "first", - FUNCTION_TYPE_AGG, - FUNCTION_FIRST, - FUNCTION_FIRST_DST, - BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, - function_setup, - first_function, - function_finalizer, - noop1, - firstFuncRequired, - }, - { - // 9 - "last", - FUNCTION_TYPE_AGG, - FUNCTION_LAST, - FUNCTION_LAST_DST, - BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, - function_setup, - last_function, - function_finalizer, - noop1, - lastFuncRequired, - }, - { - // 10 - "last_row", - FUNCTION_TYPE_AGG, - FUNCTION_LAST_ROW, - FUNCTION_LAST_ROW, - FUNCSTATE_SO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - first_last_function_setup, - last_row_function, - last_row_finalizer, - last_dist_func_merge, - dataBlockRequired, - }, - { - // 11 - "top", - FUNCTION_TYPE_AGG, - FUNCTION_TOP, - FUNCTION_TOP, - FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - top_bottom_function_setup, - top_function, - top_bottom_func_finalizer, - top_func_merge, - dataBlockRequired, - }, - { - // 12 - "bottom", - FUNCTION_TYPE_AGG, - FUNCTION_BOTTOM, - FUNCTION_BOTTOM, - FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - top_bottom_function_setup, - bottom_function, - top_bottom_func_finalizer, - bottom_func_merge, - dataBlockRequired, - }, - { - // 13 - "spread", - FUNCTION_TYPE_AGG, - FUNCTION_SPREAD, - FUNCTION_SPREAD, - BASIC_FUNC_SO, - spread_function_setup, - spread_function, - spread_function_finalizer, - spread_func_merge, - countRequired, - }, - { - // 14 - "twa", - FUNCTION_TYPE_AGG, - FUNCTION_TWA, - FUNCTION_TWA, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS, - twa_function_setup, - twa_function, - twa_function_finalizer, - twa_function_copy, - dataBlockRequired, - }, - { - // 15 - "leastsquares", - FUNCTION_TYPE_AGG, - FUNCTION_LEASTSQR, - FUNCTION_INVALID_ID, - FUNCSTATE_SO | FUNCSTATE_STREAM, - leastsquares_function_setup, - leastsquares_function, - leastsquares_finalizer, - noop1, - dataBlockRequired, - }, - { - // 16 - "dummy", - FUNCTION_TYPE_AGG, - FUNCTION_TS, - FUNCTION_TS, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS, - function_setup, - date_col_output_function, - doFinalizer, - copy_function, - noDataRequired, - }, - { - // 17 - "ts", - FUNCTION_TYPE_AGG, - FUNCTION_TS_DUMMY, - FUNCTION_TS_DUMMY, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS, - function_setup, - noop1, - doFinalizer, - copy_function, - dataBlockRequired, - }, - { - // 18 - "tag_dummy", - FUNCTION_TYPE_AGG, - FUNCTION_TAG_DUMMY, - FUNCTION_TAG_DUMMY, - BASIC_FUNC_SO, - function_setup, - tag_function, - doFinalizer, - copy_function, - noDataRequired, - }, - { - // 19 - "ts", - FUNCTION_TYPE_AGG, - FUNCTION_TS_COMP, - FUNCTION_TS_COMP, - FUNCSTATE_MO | FUNCSTATE_NEED_TS, - ts_comp_function_setup, - ts_comp_function, - ts_comp_finalize, - copy_function, - dataBlockRequired, - }, - { - // 20 - "tag", - FUNCTION_TYPE_AGG, - FUNCTION_TAG, - FUNCTION_TAG, - BASIC_FUNC_SO, - function_setup, - tag_function, - doFinalizer, - copy_function, - noDataRequired, - }, - {//TODO this is a scala function - // 21, column project sql function - "colprj", - FUNCTION_TYPE_AGG, - FUNCTION_PRJ, - FUNCTION_PRJ, - BASIC_FUNC_MO | FUNCSTATE_NEED_TS, - function_setup, - col_project_function, - doFinalizer, - copy_function, - dataBlockRequired, - }, - { - // 22, multi-output, tag function has only one result - "tagprj", - FUNCTION_TYPE_AGG, - FUNCTION_TAGPRJ, - FUNCTION_TAGPRJ, - BASIC_FUNC_MO, - function_setup, - tag_project_function, - doFinalizer, - copy_function, - noDataRequired, - }, - { - // 23 - "arithmetic", - FUNCTION_TYPE_AGG, - FUNCTION_ARITHM, - FUNCTION_ARITHM, - FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS, - function_setup, - arithmetic_function, - doFinalizer, - copy_function, - dataBlockRequired, - }, - { - // 24 - "diff", - FUNCTION_TYPE_AGG, - FUNCTION_DIFF, - FUNCTION_INVALID_ID, - FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - diff_function_setup, - diff_function, - doFinalizer, - noop1, - dataBlockRequired, - }, - // distributed version used in two-stage aggregation processes - { - // 25 - "first_dist", - FUNCTION_TYPE_AGG, - FUNCTION_FIRST_DST, - FUNCTION_FIRST_DST, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - first_last_function_setup, - first_dist_function, - function_finalizer, - first_dist_func_merge, - firstDistFuncRequired, - }, - { - // 26 - "last_dist", - FUNCTION_TYPE_AGG, - FUNCTION_LAST_DST, - FUNCTION_LAST_DST, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - first_last_function_setup, - last_dist_function, - function_finalizer, - last_dist_func_merge, - lastDistFuncRequired, - }, - { - // 27 - "stddev", // return table id and the corresponding tags for join match and subscribe - FUNCTION_TYPE_AGG, - FUNCTION_STDDEV_DST, - FUNCTION_AVG, - FUNCSTATE_SO | FUNCSTATE_STABLE, - function_setup, - NULL, - NULL, - NULL, - dataBlockRequired, - }, - { - // 28 - "interp", - FUNCTION_TYPE_AGG, - FUNCTION_INTERP, - FUNCTION_INTERP, - FUNCSTATE_SO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS , - function_setup, - interp_function, - doFinalizer, - copy_function, - dataBlockRequired, - }, - { - // 29 - "rate", - FUNCTION_TYPE_AGG, - FUNCTION_RATE, - FUNCTION_RATE, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS, - rate_function_setup, - rate_function, - rate_finalizer, - rate_func_copy, - dataBlockRequired, - }, - { - // 30 - "irate", - FUNCTION_TYPE_AGG, - FUNCTION_IRATE, - FUNCTION_IRATE, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS, - rate_function_setup, - irate_function, - rate_finalizer, - rate_func_copy, - dataBlockRequired, - }, - { - // 31 - "tbid", // return table id and the corresponding tags for join match and subscribe - FUNCTION_TYPE_AGG, - FUNCTION_TID_TAG, - FUNCTION_TID_TAG, - FUNCSTATE_MO | FUNCSTATE_STABLE, - function_setup, - noop1, - noop1, - noop1, - dataBlockRequired, - }, - { //32 - "derivative", // return table id and the corresponding tags for join match and subscribe - FUNCTION_TYPE_AGG, - FUNCTION_DERIVATIVE, - FUNCTION_INVALID_ID, - FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - deriv_function_setup, - deriv_function, - doFinalizer, - noop1, - dataBlockRequired, - }, - { - // 33 - "block_dist", // return table id and the corresponding tags for join match and subscribe - FUNCTION_TYPE_AGG, - FUNCTION_BLKINFO, - FUNCTION_BLKINFO, - FUNCSTATE_SO | FUNCSTATE_STABLE, - function_setup, - blockInfo_func, - blockinfo_func_finalizer, - block_func_merge, - dataBlockRequired, - }, - { - // 34 - "cov", // return table id and the corresponding tags for join match and subscribe - FUNCTION_TYPE_AGG, - FUNCTION_COV, - FUNCTION_COV, - FUNCSTATE_SO | FUNCSTATE_STABLE, - function_setup, - sum_function, - function_finalizer, - sum_func_merge, - statisRequired, - } - }; +#endif From 82a8f63decd088b53affb45c7aa137eb987b3158 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 17 May 2022 21:20:43 +0800 Subject: [PATCH 02/16] fix: bad code merge --- source/dnode/mnode/impl/src/mndTopic.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 8847bce890..e99046a32d 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -502,6 +502,17 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name); + if (pTopic == NULL) { + if (dropReq.igNotExists) { + mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name); + return 0; + } else { + terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; + mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); + return -1; + } + } + if (pTopic->refConsumerCnt != 0) { mndReleaseTopic(pMnode, pTopic); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; From 7563605418dd035aa07b9875a23949e24851f69c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 17 May 2022 21:24:32 +0800 Subject: [PATCH 03/16] fix: wrong db status --- source/dnode/mnode/impl/src/mndDb.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index bb7e593fea..c666c50864 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1447,8 +1447,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in } char *status = "ready"; - char b[24] = {0}; - STR_WITH_SIZE_TO_VARSTR(b, status, strlen(status)); + char statusB[24] = {0}; + STR_WITH_SIZE_TO_VARSTR(statusB, status, strlen(status)); if (sysDb) { for (int32_t i = 0; i < pShow->numOfColumns; ++i) { @@ -1458,7 +1458,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in } else if (i == 3) { colDataAppend(pColInfo, rows, (const char *)&numOfTables, false); } else if (i == 20) { - colDataAppend(pColInfo, rows, b, false); + colDataAppend(pColInfo, rows, statusB, false); } else { colDataAppendNULL(pColInfo, rows); } @@ -1481,9 +1481,10 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.replications, false); const char *src = pDb->cfg.strict ? "strict" : "nostrict"; - STR_WITH_SIZE_TO_VARSTR(b, src, strlen(src)); + char strict[24] = {0}; + STR_WITH_SIZE_TO_VARSTR(strict, src, strlen(src)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, rows, (const char *)b, false); + colDataAppend(pColInfo, rows, (const char *)strict, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.daysPerFile, false); @@ -1554,7 +1555,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.numOfStables, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols); - colDataAppend(pColInfo, rows, (const char *)b, false); + colDataAppend(pColInfo, rows, (const char *)statusB, false); } } From b858688fab96a667c2d8fd486f0ec470d9afa0f0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 17 May 2022 21:31:59 +0800 Subject: [PATCH 04/16] fix: unreasonable error message when creating db with insufficient dnodes --- source/dnode/mnode/impl/src/mndDb.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c666c50864..c3e5195ad2 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -398,11 +398,14 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) { if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) return -1; if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1; if (pCfg->replications < TSDB_MIN_DB_REPLICA || pCfg->replications > TSDB_MAX_DB_REPLICA) return -1; - if (pCfg->replications > mndGetDnodeSize(pMnode)) return -1; if (pCfg->replications != 1 && pCfg->replications != 3) return -1; if (pCfg->strict < TSDB_DB_STRICT_OFF || pCfg->strict > TSDB_DB_STRICT_ON) return -1; if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1; if (pCfg->hashMethod != 1) return -1; + if (pCfg->replications > mndGetDnodeSize(pMnode)) { + terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; + return -1; + } terrno = 0; return TSDB_CODE_SUCCESS; From a3b0136e3ce6d61951c946045ce1e6b4c9f60cf5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 17 May 2022 22:03:50 +0800 Subject: [PATCH 05/16] stmt query --- source/client/src/clientStmt.c | 8 ++++++ tests/script/api/batchprepare.c | 47 ++++++++++++++++++--------------- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index fa8df28523..f2c5fdec0f 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -218,6 +218,13 @@ int32_t stmtParseSql(STscStmt* pStmt) { pStmt->bInfo.needParse = false; + if (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type) { + pStmt->sql.type = STMT_TYPE_INSERT; + } else if (pStmt->sql.pQuery->pPrepareRoot) { + pStmt->sql.type = STMT_TYPE_QUERY; + } + +/* switch (nodeType(pStmt->sql.pQuery->pRoot)) { case QUERY_NODE_VNODE_MODIF_STMT: if (0 == pStmt->sql.type) { @@ -231,6 +238,7 @@ int32_t stmtParseSql(STscStmt* pStmt) { tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot)); STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR); } +*/ return TSDB_CODE_SUCCESS; } diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index cb914f8d6d..546075a1d9 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -170,11 +170,11 @@ CaseCfg gCase[] = { // 22 {"insert:AUTO1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, true, true, insertAUTOTest1, 10, 10, 2, 0, 0, 0, 1, -1}, -// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2}, -// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2}, + {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2}, + {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2}, - {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2}, - {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2}, +// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2}, +// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2}, }; @@ -209,7 +209,7 @@ typedef struct { int32_t caseRunNum; // total run case num } CaseCtrl; -#if 0 +#if 1 CaseCtrl gCaseCtrl = { // default .bindNullNum = 0, .printCreateTblSql = false, @@ -267,7 +267,7 @@ CaseCtrl gCaseCtrl = { }; #endif -#if 1 +#if 0 CaseCtrl gCaseCtrl = { // query case with specified col&oper .bindNullNum = 1, .printCreateTblSql = false, @@ -292,7 +292,7 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper #if 0 CaseCtrl gCaseCtrl = { // query case with specified col&oper - .bindNullNum = 0, + .bindNullNum = 1, .printCreateTblSql = true, .printQuerySql = true, .printStmtSql = true, @@ -309,10 +309,10 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper .printRes = true, .runTimes = 0, .caseRunIdx = -1, - .optrIdxListNum = tListLen(optrIdxList), - .optrIdxList = optrIdxList, - .bindColTypeNum = tListLen(bindColTypeList), - .bindColTypeList = bindColTypeList, + //.optrIdxListNum = tListLen(optrIdxList), + //.optrIdxList = optrIdxList, + //.bindColTypeNum = tListLen(bindColTypeList), + //.bindColTypeList = bindColTypeList, .caseIdx = 24, .caseNum = 1, .caseRunNum = 1, @@ -665,15 +665,16 @@ void bpGenerateConstInFuncSQL(BindData *data, int32_t tblIdx) { void generateQueryMiscSQL(BindData *data, int32_t tblIdx) { - switch(tblIdx) { - case 0: - //TODO FILL TEST - default: - bpGenerateConstInOpSQL(data, tblIdx); - break; - case FUNCTION_TEST_IDX: - bpGenerateConstInFuncSQL(data, tblIdx); - break; + if (tblIdx == FUNCTION_TEST_IDX && gCurCase->bindNullNum <= 0) { + bpGenerateConstInFuncSQL(data, tblIdx); + } else { + switch(tblIdx) { + case 0: + //TODO FILL TEST + default: + bpGenerateConstInOpSQL(data, tblIdx); + break; + } } if (gCaseCtrl.printStmtSql) { @@ -1064,6 +1065,8 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) { if (tblIdx == FUNCTION_TEST_IDX) { gCaseCtrl.numericParam = true; + } else { + gCaseCtrl.numericParam = false; } for (int b = 0; b < bindNum; b++) { @@ -1072,8 +1075,10 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) { } } + gCaseCtrl.numericParam = false; + generateQueryMiscSQL(data, tblIdx); - + return 0; } From 3f0e89cabdfcc96b9f4dd2563a4cffe79c8f5190 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 17 May 2022 22:16:26 +0800 Subject: [PATCH 06/16] feat(tmq): refine commit offset api --- example/src/tmq.c | 5 +- include/client/taos.h | 4 +- source/client/src/tmq.c | 172 ++++++++++++++++++++---- source/dnode/mnode/impl/src/mndOffset.c | 2 + tests/test/c/tmqSim.c | 74 +++++----- 5 files changed, 189 insertions(+), 68 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index b4013f26ee..b79d21d051 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -167,7 +167,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "td.connect.pass", "taosdata"); /*tmq_conf_set(conf, "td.connect.db", "abc1");*/ tmq_conf_set(conf, "msg.with.table.name", "true"); - tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); assert(tmq); return tmq; @@ -176,6 +176,7 @@ tmq_t* build_consumer() { tmq_list_t* build_topic_list() { tmq_list_t* topic_list = tmq_list_new(); tmq_list_append(topic_list, "topic_ctb_column"); + /*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/ return topic_list; } @@ -190,7 +191,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { int32_t cnt = 0; /*clock_t startTime = clock();*/ while (running) { - TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 500); + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 0); if (tmqmessage) { cnt++; /*printf("get data\n");*/ diff --git a/include/client/taos.h b/include/client/taos.h index 486d5f5fef..0194357841 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -233,6 +233,8 @@ DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t wait_time); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async); +DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param); +DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets); #if 0 DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset); @@ -251,7 +253,7 @@ typedef enum tmq_conf_res_t tmq_conf_res_t; DLL_EXPORT tmq_conf_t *tmq_conf_new(); DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); -DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); +DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 2c3f64c5c1..bc80a01c27 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -182,10 +182,13 @@ typedef struct { typedef struct { tmq_t* tmq; - int32_t async; + int8_t async; + int8_t automatic; + tmq_commit_cb* userCb; tsem_t rspSem; tmq_resp_err_t rspErr; SArray* offsets; + void* userParam; } SMqCommitCbParam; tmq_conf_t* tmq_conf_new() { @@ -314,6 +317,135 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { return sprintf(dst, "%s:%d", topicName, vg); } +int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { + SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; + pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; + if (pParam->async) { + if (pParam->automatic && pParam->tmq->commitCb) { + pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, + pParam->tmq->commitCbUserParam); + } else if (!pParam->automatic && pParam->userCb) { + pParam->userCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, pParam->userParam); + } + + if (pParam->offsets) { + taosArrayDestroy(pParam->offsets); + } + + taosMemoryFree(pParam); + } else { + tsem_post(&pParam->rspSem); + } + return 0; +} + +int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async, + tmq_commit_cb* userCb, void* userParam) { + SMqCMCommitOffsetReq req; + SArray* pOffsets = NULL; + void* buf = NULL; + SMqCommitCbParam* pParam = NULL; + SMsgSendInfo* sendInfo = NULL; + + if (offsets == NULL) { + pOffsets = taosArrayInit(0, sizeof(SMqOffset)); + for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + SMqOffset offset; + tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN); + tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN); + offset.vgId = pVg->vgId; + offset.offset = pVg->currentOffset; + taosArrayPush(pOffsets, &offset); + } + } + } else { + pOffsets = (SArray*)&offsets->container; + } + + req.num = (int32_t)pOffsets->size; + req.offsets = pOffsets->pData; + + int32_t code; + int32_t tlen = 0; + tEncodeSize(tEncodeSMqCMCommitOffsetReq, &req, tlen, code); + if (code < 0) { + goto END; + } + code = -1; + + buf = taosMemoryMalloc(tlen); + if (buf == NULL) { + goto END; + } + SEncoder encoder; + tEncoderInit(&encoder, buf, tlen); + tEncodeSMqCMCommitOffsetReq(&encoder, &req); + tEncoderClear(&encoder); + + pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam)); + if (pParam == NULL) { + goto END; + } + pParam->tmq = tmq; + pParam->automatic = automatic; + pParam->async = async; + pParam->offsets = pOffsets; + pParam->userCb = userCb; + pParam->userParam = userParam; + if (!async) tsem_init(&pParam->rspSem, 0, 0); + + sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); + if (sendInfo == NULL) goto END; + sendInfo->msgInfo = (SDataBuf){ + .pData = buf, + .len = tlen, + .handle = NULL, + }; + + sendInfo->requestId = generateRequestId(); + sendInfo->requestObjRefId = 0; + sendInfo->param = pParam; + sendInfo->fp = tmqCommitCb; + sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET; + + SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + + if (!async) { + tsem_wait(&pParam->rspSem); + code = pParam->rspErr; + tsem_destroy(&pParam->rspSem); + taosMemoryFree(pParam); + } + + // avoid double free if msg is sent + buf = NULL; + + code = 0; +END: + if (buf) taosMemoryFree(buf); + if (pParam) taosMemoryFree(pParam); + if (sendInfo) taosMemoryFree(sendInfo); + + if (code != 0 && async) { + if (automatic) { + tmq->commitCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam); + } else { + userCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, userParam); + } + } + + if (offsets == NULL) { + taosArrayDestroy(pOffsets); + } + return code; +} + void tmqAssignDelayedHbTask(void* param, void* tmrId) { tmq_t* tmq = (tmq_t*)param; int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM); @@ -350,7 +482,8 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { tmqAskEp(tmq, true); taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { - tmq_commit(tmq, NULL, true); + /*tmq_commit(tmq, NULL, true);*/ + tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam); taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { } else { @@ -385,32 +518,11 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; - tmq_t* tmq = pParam->tmq; + /*tmq_t* tmq = pParam->tmq;*/ tsem_post(&pParam->rspSem); return 0; } -int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { - SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; - pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; - if (pParam->tmq->commitCb) { - pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, NULL, pParam->tmq->commitCbUserParam); - } - if (!pParam->async) - tsem_post(&pParam->rspSem); - else { - if (pParam->offsets) { - taosArrayDestroy(pParam->offsets); - } - tsem_destroy(&pParam->rspSem); - /*if (pParam->pArray) {*/ - /*taosArrayDestroy(pParam->pArray);*/ - /*}*/ - taosMemoryFree(pParam); - } - return 0; -} - tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { if (*topics == NULL) { *topics = tmq_list_new(); @@ -541,6 +653,8 @@ FAIL: } tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) { + return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam); +#if 0 // TODO: add read write lock SRequestObj* pRequest = NULL; tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS; @@ -627,6 +741,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in } return resp; +#endif } tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { @@ -723,7 +838,7 @@ FAIL: return code; } -void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) { +void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) { // conf->commitCb = cb; conf->commitCbUserParam = param; @@ -1384,3 +1499,10 @@ const char* tmq_get_table_name(TAOS_RES* res) { } return NULL; } +DLL_EXPORT void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) { + tmqCommitInner(tmq, offsets, 0, 1, cb, param); +} + +DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) { + return tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL); +} diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index a7d174c9c5..469bde0b38 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -196,6 +196,8 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) { } } + tDecoderClear(&decoder); + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("mq-commit-offset-trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index b3dba695a7..5cc3bb345a 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -37,10 +37,10 @@ typedef struct { TdThread thread; int32_t consumerId; - int32_t ifManualCommit; - //int32_t autoCommitIntervalMs; // 1000 ms - //char autoCommit[8]; // true, false - //char autoOffsetRest[16]; // none, earliest, latest + int32_t ifManualCommit; + // int32_t autoCommitIntervalMs; // 1000 ms + // char autoCommit[8]; // true, false + // char autoOffsetRest[16]; // none, earliest, latest int32_t ifCheckData; int64_t expectMsgCnt; @@ -99,21 +99,15 @@ static void printHelp() { } void initLogFile() { - time_t now; - struct tm curTime; - char filename[256]; + time_t now; + struct tm curTime; + char filename[256]; - now = taosTime(NULL); + now = taosTime(NULL); taosLocalTime(&now, &curTime); - sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", - configDir, - curTime.tm_year+1900, - curTime.tm_mon+1, - curTime.tm_mday, - curTime.tm_hour, - curTime.tm_min, - curTime.tm_sec); - //sprintf(filename, "%s/../log/tmqlog.txt", configDir); + sprintf(filename, "%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", configDir, curTime.tm_year + 1900, + curTime.tm_mon + 1, curTime.tm_mday, curTime.tm_hour, curTime.tm_min, curTime.tm_sec); + // sprintf(filename, "%s/../log/tmqlog.txt", configDir); TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); if (NULL == pFile) { fprintf(stderr, "Failed to open %s for save result\n", filename); @@ -137,9 +131,9 @@ void saveConfigToLogFile() { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); - //taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit); - //taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs); - //taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest); + // taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit); + // taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs); + // taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest); taosFprintfFile(g_fp, " Topics: "); for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) { taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); @@ -234,17 +228,17 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) while (1) { TAOS_ROW row = taos_fetch_row(msg); - if (row == NULL) break; + if (row == NULL) break; - TAOS_FIELD* fields = taos_fetch_fields(msg); + TAOS_FIELD* fields = taos_fetch_fields(msg); int32_t numOfFields = taos_field_count(msg); - + taos_print_row(buf, row, fields, numOfFields); - - if (0 != g_stConfInfo.showRowFlag) { + + if (0 != g_stConfInfo.showRowFlag) { taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); } - + totalRows++; } @@ -276,9 +270,9 @@ void build_consumer(SThreadInfo* pInfo) { tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); - //tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); + // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); - tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); // tmq_conf_set(conf, "group.id", "cgrp1"); for (int32_t i = 0; i < pInfo->numOfKey; i++) { @@ -299,7 +293,7 @@ void build_consumer(SThreadInfo* pInfo) { pInfo->tmq = tmq_consumer_new(conf, NULL, 0); tmq_conf_destroy(conf); - + return; } @@ -323,7 +317,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult); taosFprintfFile(g_fp, "== save result sql: %s \n", sqlStr); - + TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); @@ -354,11 +348,11 @@ void loop_consume(SThreadInfo* pInfo) { totalMsgs++; if (totalRows >= pInfo->expectMsgCnt) { - taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n"); + taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n"); break; } - } else { - taosFprintfFile(g_fp, "==== delay over time, so break\n"); + } else { + taosFprintfFile(g_fp, "==== delay over time, so break\n"); break; } } @@ -386,7 +380,7 @@ void* consumeThreadFunc(void* param) { pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } - + tmq_list_destroy(pInfo->topicList); pInfo->topicList = NULL; @@ -394,17 +388,17 @@ void* consumeThreadFunc(void* param) { if (pInfo->ifManualCommit) { taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n"); - pPrint("tmq_commit() manual commit when consume end.\n"); + pPrint("tmq_commit() manual commit when consume end.\n"); tmq_commit(pInfo->tmq, NULL, 0); } - + err = tmq_unsubscribe(pInfo->tmq); if (err) { pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); pInfo->consumeMsgCnt = -1; return NULL; } - + err = tmq_consumer_close(pInfo->tmq); if (err) { pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); @@ -482,9 +476,9 @@ int32_t getConsumeInfo() { int32_t* lengths = taos_fetch_lengths(pRes); // set default value - //g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000; - //memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true")); - //memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast")); + // g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000; + // memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true")); + // memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast")); for (int i = 0; i < num_fields; ++i) { if (row[i] == NULL || 0 == i) { From e96ff087f2ea91a58e229c1916f6062cb7de7300 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 17 May 2022 22:22:39 +0800 Subject: [PATCH 07/16] fix: index failed to filter float/double data --- source/libs/index/src/indexComm.c | 131 +++++++++++++++++------------ source/libs/index/src/indexTfile.c | 7 +- source/libs/index/test/jsonUT.cc | 2 +- 3 files changed, 83 insertions(+), 57 deletions(-) diff --git a/source/libs/index/src/indexComm.c b/source/libs/index/src/indexComm.c index eea30bfb03..9d3e0a5cc9 100644 --- a/source/libs/index/src/indexComm.c +++ b/source/libs/index/src/indexComm.c @@ -20,12 +20,13 @@ #include "tcompare.h" #include "tdataformat.h" #include "ttypes.h" +#include "tvariant.h" char JSON_COLUMN[] = "JSON"; char JSON_VALUE_DELIM = '&'; char* indexInt2str(int64_t val, char* dst, int radix) { - char buffer[65]; + char buffer[65] = {0}; char* p; int64_t new_val; uint64_t uval = (uint64_t)val; @@ -74,28 +75,70 @@ static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { return tCompare(func, QUERY_GREATER_EQUAL, a, b, type); } TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) { - if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR) { + if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR || dtype == TSDB_DATA_TYPE_VARBINARY) { return tDoCompare(func, cmptype, a, b); } #if 1 - int8_t bytes = tDataTypes[dtype].bytes; - if (bytes == 1) { - int8_t va = taosStr2int64(a); - int8_t vb = taosStr2int64(b); - return tDoCompare(func, cmptype, &va, &vb); - } else if (bytes == 2) { - int16_t va = taosStr2int64(a); - int16_t vb = taosStr2int64(b); - return tDoCompare(func, cmptype, &va, &vb); - } else if (bytes == 4) { - int32_t va = taosStr2int64(a); - int32_t vb = taosStr2int64(b); - return tDoCompare(func, cmptype, &va, &vb); - } else { + if (dtype == TSDB_DATA_TYPE_TIMESTAMP) { int64_t va = taosStr2int64(a); int64_t vb = taosStr2int64(b); return tDoCompare(func, cmptype, &va, &vb); + } else if (dtype == TSDB_DATA_TYPE_BOOL || dtype == TSDB_DATA_TYPE_UTINYINT) { + uint8_t va = taosStr2int64(a); + uint8_t vb = taosStr2int64(b); + return tDoCompare(func, cmptype, &va, &vb); + } else if (dtype == TSDB_DATA_TYPE_TINYINT) { + int8_t va = taosStr2int64(a); + int8_t vb = taosStr2int64(b); + return tDoCompare(func, cmptype, &va, &vb); + } else if (dtype == TSDB_DATA_TYPE_SMALLINT) { + int16_t va = taosStr2int64(a); + int16_t vb = taosStr2int64(b); + return tDoCompare(func, cmptype, &va, &vb); + } else if (dtype == TSDB_DATA_TYPE_USMALLINT) { + uint16_t va = taosStr2int64(a); + uint16_t vb = taosStr2int64(b); + return tDoCompare(func, cmptype, &va, &vb); + } else if (dtype == TSDB_DATA_TYPE_INT) { + int32_t va = taosStr2int64(a); + int32_t vb = taosStr2int64(b); + return tDoCompare(func, cmptype, &va, &vb); + } else if (dtype == TSDB_DATA_TYPE_UINT) { + uint32_t va = taosStr2int64(a); + uint32_t vb = taosStr2int64(b); + return tDoCompare(func, cmptype, &va, &vb); + } else if (dtype == TSDB_DATA_TYPE_BIGINT) { + int64_t va = taosStr2int64(a); + int64_t vb = taosStr2int64(b); + return tDoCompare(func, cmptype, &va, &vb); + } else if (dtype == TSDB_DATA_TYPE_UBIGINT) { + uint64_t va, vb; + if (0 != toUInteger(a, strlen(a), 10, &va) || 0 != toUInteger(b, strlen(b), 10, &vb)) { + return CONTINUE; + } + return tDoCompare(func, cmptype, &va, &vb); + } else if (dtype == TSDB_DATA_TYPE_FLOAT) { + float va = strtod(a, NULL); + if (errno == ERANGE && va == -1) { + return CONTINUE; + } + float vb = strtod(b, NULL); + if (errno == ERANGE && va == -1) { + return CONTINUE; + } + return tDoCompare(func, cmptype, &va, &vb); + } else if (dtype == TSDB_DATA_TYPE_DOUBLE) { + double va = strtod(a, NULL); + if (errno == ERANGE && va == -1) { + return CONTINUE; + } + double vb = strtod(b, NULL); + if (errno == ERANGE && va == -1) { + return CONTINUE; + } + return tDoCompare(func, cmptype, &va, &vb); } + assert(0); #endif } TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) { @@ -248,20 +291,16 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) { break; } case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY -#if 1 tlen = taosEncodeBinary(NULL, src, strlen(src)); *dst = taosMemoryCalloc(1, tlen + 1); tlen = taosEncodeBinary(dst, src, strlen(src)); break; -#endif } case TSDB_DATA_TYPE_VARBINARY: -#if 1 tlen = taosEncodeBinary(NULL, src, strlen(src)); *dst = taosMemoryCalloc(1, tlen + 1); tlen = taosEncodeBinary(dst, src, strlen(src)); break; -#endif default: TASSERT(0); break; @@ -271,87 +310,73 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) { return tlen; } int32_t indexConvertDataToStr(void* src, int8_t type, void** dst) { - int tlen = tDataTypes[type].bytes; - + int tlen = tDataTypes[type].bytes; + int32_t bufSize = 64; switch (type) { case TSDB_DATA_TYPE_TIMESTAMP: - *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + *dst = taosMemoryCalloc(1, bufSize + 1); indexInt2str(*(int64_t*)src, *dst, -1); break; case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_UTINYINT: - // tlen = taosEncodeFixedU8(NULL, *(uint8_t*)src); - //*dst = taosMemoryCalloc(1, tlen + 1); - // tlen = taosEncodeFixedU8(dst, *(uint8_t*)src); - *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + *dst = taosMemoryCalloc(1, bufSize + 1); indexInt2str(*(uint8_t*)src, *dst, 1); break; case TSDB_DATA_TYPE_TINYINT: - *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + *dst = taosMemoryCalloc(1, bufSize + 1); indexInt2str(*(int8_t*)src, *dst, 1); break; case TSDB_DATA_TYPE_SMALLINT: - *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + *dst = taosMemoryCalloc(1, bufSize + 1); indexInt2str(*(int16_t*)src, *dst, -1); break; case TSDB_DATA_TYPE_USMALLINT: - *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + *dst = taosMemoryCalloc(1, bufSize + 1); indexInt2str(*(uint16_t*)src, *dst, -1); break; case TSDB_DATA_TYPE_INT: - *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + *dst = taosMemoryCalloc(1, bufSize + 1); indexInt2str(*(int32_t*)src, *dst, -1); break; - case TSDB_DATA_TYPE_FLOAT: - tlen = taosEncodeBinary(NULL, src, sizeof(float)); - *dst = taosMemoryCalloc(1, tlen + 1); - tlen = taosEncodeBinary(dst, src, sizeof(float)); - *dst = *dst - tlen; - break; case TSDB_DATA_TYPE_UINT: - *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + *dst = taosMemoryCalloc(1, bufSize + 1); indexInt2str(*(uint32_t*)src, *dst, 1); break; case TSDB_DATA_TYPE_BIGINT: - *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); - indexInt2str(*(int64_t*)src, *dst, 1); - break; - case TSDB_DATA_TYPE_DOUBLE: - tlen = taosEncodeBinary(NULL, src, sizeof(double)); - *dst = taosMemoryCalloc(1, tlen + 1); - tlen = taosEncodeBinary(dst, src, sizeof(double)); - *dst = *dst - tlen; + *dst = taosMemoryCalloc(1, bufSize + 1); + sprintf(*dst, "%" PRIu64, *(uint64_t*)src); break; case TSDB_DATA_TYPE_UBIGINT: - assert(0); - *dst = taosMemoryCalloc(1, sizeof(int64_t) + 1); + *dst = taosMemoryCalloc(1, bufSize + 1); indexInt2str(*(uint64_t*)src, *dst, 1); + case TSDB_DATA_TYPE_FLOAT: + *dst = taosMemoryCalloc(1, bufSize + 1); + sprintf(*dst, "%.9lf", *(float*)src); + break; + case TSDB_DATA_TYPE_DOUBLE: + *dst = taosMemoryCalloc(1, bufSize + 1); + sprintf(*dst, "%.9lf", *(double*)src); break; case TSDB_DATA_TYPE_NCHAR: { tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src)); *dst = taosMemoryCalloc(1, tlen + 1); tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src)); *dst = *dst - tlen; - break; } case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY -#if 1 tlen = taosEncodeBinary(NULL, src, strlen(src)); *dst = taosMemoryCalloc(1, tlen + 1); tlen = taosEncodeBinary(dst, src, strlen(src)); *dst = *dst - tlen; break; -#endif } case TSDB_DATA_TYPE_VARBINARY: -#if 1 tlen = taosEncodeBinary(NULL, src, strlen(src)); *dst = taosMemoryCalloc(1, tlen + 1); tlen = taosEncodeBinary(dst, src, strlen(src)); *dst = *dst - tlen; break; -#endif default: TASSERT(0); break; diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 6c59986744..83ffe13f8c 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -20,6 +20,7 @@ p * #include "indexFstCountingWriter.h" #include "indexUtil.h" #include "taosdef.h" +#include "taoserror.h" #include "tcoding.h" #include "tcompare.h" @@ -533,10 +534,12 @@ TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const c tfileGenFileFullName(fullname, path, suid, colName, version); WriterCtx* wc = writerCtxCreate(TFile, fullname, true, 1024 * 1024 * 1024); - indexInfo("open read file name:%s, file size: %d", wc->file.buf, wc->file.size); if (wc == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + indexError("failed to open readonly file: %s, reason: %s", fullname, terrstr()); return NULL; } + indexInfo("open read file name:%s, file size: %d", wc->file.buf, wc->file.size); TFileReader* reader = tfileReaderCreate(wc); return reader; @@ -613,9 +616,7 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { if (tfileWriteData(tw, v) != 0) { indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId)); - // printf("write faile\n"); } else { - // printf("write sucee\n"); // indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset, // (int)taosArrayGetSize(v->tableId)); diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index ff349b9b24..135ae61e83 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -553,7 +553,7 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache_FLOAT) { float val = 2.0; std::string colName("test1"); for (int i = 0; i < 1000; i++) { - WriteData(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), i); + WriteData(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), i + 1000); } } { From 3a76f8303251157edc78d9cf601cee0ea4873da8 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 17 May 2022 22:31:11 +0800 Subject: [PATCH 08/16] fix: index failed to filter float/double data --- source/libs/index/src/indexTfile.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 83ffe13f8c..9533d3429e 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -473,16 +473,16 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR int32_t sz = 0; char* ch = (char*)fstSliceData(s, &sz); - char* tmp = taosMemoryCalloc(1, sz + 1); - memcpy(tmp, ch, sz); + // char* tmp = taosMemoryCalloc(1, sz + 1); + // memcpy(tmp, ch, sz); - if (0 != strncmp(tmp, p, skip)) { + if (0 != strncmp(ch, p, skip)) { swsResultDestroy(rt); - taosMemoryFree(tmp); + // taosMemoryFree(tmp); break; } - TExeCond cond = cmpFn(tmp + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType)); + TExeCond cond = cmpFn(ch + skip, tem->colVal, INDEX_TYPE_GET_TYPE(tem->colType)); if (MATCH == cond) { tfileReaderLoadTableIds((TFileReader*)reader, rt->out.out, tr->total); @@ -491,7 +491,7 @@ static int32_t tfSearchCompareFunc_JSON(void* reader, SIndexTerm* tem, SIdxTempR swsResultDestroy(rt); break; } - taosMemoryFree(tmp); + // taosMemoryFree(tmp); swsResultDestroy(rt); } streamWithStateDestroy(st); From 1cc0708d856afa2d670b046dcb27fc79c2139112 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 17 May 2022 22:42:11 +0800 Subject: [PATCH 09/16] fix(query): expand the capacity for aggregate operator. --- source/libs/executor/src/executorimpl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1ba494a041..ae9f2822d4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4246,7 +4246,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - int32_t numOfRows = 10; + int32_t numOfRows = 1024; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, numOfRows); From 52f6a66ba550a9abd98758de3d4f00f2e4a2310e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 17 May 2022 22:58:17 +0800 Subject: [PATCH 10/16] fix(query): fix some syntax error. --- source/libs/function/src/builtinsimpl.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index cf67bcc7ec..de96526a25 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2533,7 +2533,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) { } } } else { // computing based on the true data block - if (0 == pCtx->size) { + if (0 == pInput->numOfRows) { if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->end.key != INT64_MIN) { pInfo->min = pCtx->end.key; @@ -2552,7 +2552,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) { TSKEY* ptsList = (int64_t*)colDataGetData(pCol, start); if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->start.key == INT64_MIN) { - pInfo->max = (pInfo->max < ptsList[pCtx->size - 1]) ? ptsList[pCtx->size - 1] : pInfo->max; + pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max; } else { pInfo->max = pCtx->start.key + 1; } @@ -2572,7 +2572,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) { if (pCtx->end.key != INT64_MIN) { pInfo->max = pCtx->end.key + 1; } else { - pInfo->max = ptsList[pCtx->size - 1]; + pInfo->max = ptsList[start + pInput->numOfRows - 1]; } } } From 156b0cfebf94863a7edc96ebde6b62ab3b801d77 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 17 May 2022 23:33:59 +0800 Subject: [PATCH 11/16] feat: tsma logic optimization --- include/common/tmsg.h | 13 +++ source/dnode/vnode/src/inc/sma.h | 2 +- source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/meta/metaEntry.c | 8 +- source/dnode/vnode/src/meta/metaQuery.c | 7 -- source/dnode/vnode/src/sma/smaTDBImpl.c | 14 +-- source/dnode/vnode/src/sma/smaTimeRange.c | 119 +++++++++------------- source/dnode/vnode/src/tsdb/tsdbOpen.c | 5 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +- source/dnode/vnode/src/tsdb/tsdbSma.c | 10 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 2 +- 11 files changed, 89 insertions(+), 99 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1a46a462b1..6af4325371 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -403,6 +403,19 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWra return 0; } +static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaWrapper* pSW) { + if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; + + pSW->pSchema = (SSchema*)tDecoderMalloc(pDecoder, pSW->nCols * sizeof(SSchema)); + if (pSW->pSchema == NULL) return -1; + for (int32_t i = 0; i < pSW->nCols; i++) { + if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1; + } + + return 0; +} + STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols); typedef struct { diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index a58e1808a6..2efe600b3d 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -70,7 +70,7 @@ struct SSmaStatItem { * N.B. only applicable to tsma */ int8_t state; // ETsdbSmaStat - SHashObj *expiredWindows; // key: skey of time window, value: N/A + SHashObj *expiredWindows; // key: skey of time window, value: version STSma *pTSma; // cache schema }; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 93a25da0a8..1195f9e2b3 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -60,7 +60,7 @@ typedef struct { TSKEY minKey; } SRtn; -#define TSDB_DATA_DIR_LEN 6 +#define TSDB_DATA_DIR_LEN 6 // adapt accordingly struct STsdb { char *path; SVnode *pVnode; diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index 84a8957771..ae915b26f9 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -56,8 +56,8 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { if (tDecodeCStr(pCoder, &pME->name) < 0) return -1; if (pME->type == TSDB_SUPER_TABLE) { - if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schema) < 0) return -1; - if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1; + if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schema) < 0) return -1; + if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1; } else if (pME->type == TSDB_CHILD_TABLE) { if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1; if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1; @@ -67,9 +67,9 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1; - if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1; + if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schema) < 0) return -1; } else if (pME->type == TSDB_TSMA_TABLE) { - pME->smaEntry.tsma = taosMemoryCalloc(1, sizeof(STSma)); + pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma)); if(!pME->smaEntry.tsma) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 369f16b430..b76258035e 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -394,11 +394,6 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { goto _err; } - SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid); - if (pCur == NULL) { - goto _err; - } - SMetaReader mr = {0}; metaReaderInit(&mr, pMeta, 0); int64_t smaId; @@ -442,12 +437,10 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { metaReaderClear(&mr); taosArrayDestroy(pSmaIds); - metaCloseSmaCursor(pCur); return pSW; _err: metaReaderClear(&mr); taosArrayDestroy(pSmaIds); - metaCloseSmaCursor(pCur); tdFreeTSmaWrapper(pSW, deepCopy); return NULL; } diff --git a/source/dnode/vnode/src/sma/smaTDBImpl.c b/source/dnode/vnode/src/sma/smaTDBImpl.c index 821ec44aa5..cb58d9c083 100644 --- a/source/dnode/vnode/src/sma/smaTDBImpl.c +++ b/source/dnode/vnode/src/sma/smaTDBImpl.c @@ -55,12 +55,13 @@ static inline int tdSmaKeyCmpr(const void *arg1, int len1, const void *arg2, int } static int32_t smaOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) { - int ret; tdb_cmpr_fn_t compFunc; // Create a database compFunc = tdSmaKeyCmpr; - ret = tdbDbOpen(pFName, -1, -1, compFunc, pEnv, ppDB); + if(tdbDbOpen(pFName, -1, -1, compFunc, pEnv, ppDB) < 0) { + return -1; + } return 0; } @@ -76,7 +77,7 @@ int32_t smaOpenDBF(TENV *pEnv, SDBFile *pDBF) { // Open DBF if (smaOpenDBDb(&(pDBF->pDB), pEnv, pDBF->path) < 0) { - terrno = TSDB_CODE_TDB_INIT_FAILED; + smaError("failed to open DBF: %s", pDBF->path); smaCloseDBDb(pDBF->pDB); return -1; } @@ -97,9 +98,10 @@ int32_t smaCloseDBF(SDBFile *pDBF) { int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) { int32_t ret; - ret = tdbDbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn); + printf("save tsma data into %s, keyLen:%d valLen:%d txn:%p\n", pDBF->path, keyLen, valLen, txn); + ret = tdbDbUpsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn); if (ret < 0) { - smaError("failed to create insert sma data into db, ret = %d", ret); + smaError("failed to upsert tsma data into db, ret = %d", ret); return -1; } @@ -113,7 +115,7 @@ void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_ ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen); if (ret < 0) { - smaError("failed to get sma data from db, ret = %d", ret); + smaError("failed to get tsma data from db, ret = %d", ret); return NULL; } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 357cf710a2..1d54d75ad5 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -16,21 +16,22 @@ #include "sma.h" #include "tsdb.h" +typedef STsdbCfg STSmaKeepCfg; + #undef _TEST_SMA_PRINT_DEBUG_LOG_ -#define SMA_STORAGE_TSDB_DAYS 30 -#define SMA_STORAGE_TSDB_TIMES 10 -#define SMA_STORAGE_SPLIT_HOURS 24 -#define SMA_KEY_LEN 16 // TSKEY+groupId 8+8 -#define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds +#define SMA_STORAGE_TSDB_MINUTES 86400 +#define SMA_STORAGE_TSDB_TIMES 10 +#define SMA_STORAGE_SPLIT_FACTOR 144 // least records in tsma file +#define SMA_KEY_LEN 16 // TSKEY+groupId 8+8 +#define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds #define SMA_STATE_ITEM_HASH_SLOT 32 - typedef struct { SSma *pSma; SDBFile dFile; const SArray *pDataBlocks; // sma data - int32_t interval; // interval with the precision of DB + int64_t interval; // interval with the precision of DB } STSmaWriteH; typedef struct { @@ -42,10 +43,10 @@ typedef struct { STsdb *pTsdb; SSma *pSma; SDBFile dFile; - int32_t interval; // interval with the precision of DB + int64_t interval; // interval with the precision of DB int32_t blockSize; // size of SMA block item + int32_t days; int8_t storageLevel; - int8_t days; SmaFsIter smaFsIter; } STSmaReadH; @@ -58,9 +59,9 @@ typedef enum { // static func static int64_t tdGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted); -static int32_t tdGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); +static int32_t tdGetSmaStorageLevel(STSmaKeepCfg *pCfg, int64_t interval); static int32_t tdInitTSmaWriteH(STSmaWriteH *pSmaH, SSma *pSma, const SArray *pDataBlocks, int64_t interval, - int8_t intervalUnit); + int8_t intervalUnit); static int32_t tdInitTSmaReadH(STSmaReadH *pSmaH, SSma *pSma, int64_t interval, int8_t intervalUnit); static void tdDestroyTSmaWriteH(STSmaWriteH *pSmaH); static int32_t tdGetTSmaDays(SSma *pSma, int64_t interval, int32_t storageLevel); @@ -92,9 +93,10 @@ static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid); * @return int32_t */ static int32_t tdInitTSmaReadH(STSmaReadH *pSmaH, SSma *pSma, int64_t interval, int8_t intervalUnit) { + STSmaKeepCfg *pCfg = SMA_TSDB_CFG(pSma); pSmaH->pSma = pSma; pSmaH->interval = tdGetIntervalByPrecision(interval, intervalUnit, SMA_TSDB_CFG(pSma)->precision, true); - pSmaH->storageLevel = tdGetSmaStorageLevel(interval, intervalUnit); + pSmaH->storageLevel = tdGetSmaStorageLevel(pCfg, interval); pSmaH->days = tdGetTSmaDays(pSma, pSmaH->interval, pSmaH->storageLevel); return TSDB_CODE_SUCCESS; } @@ -275,11 +277,13 @@ static int32_t tdSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t f */ static int32_t tdGetTSmaDays(SSma *pSma, int64_t interval, int32_t storageLevel) { STsdbCfg *pCfg = SMA_TSDB_CFG(pSma); - int32_t daysPerFile = pCfg->days; + int32_t daysPerFile = pCfg->days; // unit is minute if (storageLevel == SMA_STORAGE_LEVEL_TSDB) { - int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerMin[pCfg->precision]); - daysPerFile = days > SMA_STORAGE_TSDB_DAYS ? days : SMA_STORAGE_TSDB_DAYS; + int32_t minutes = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerMin[pCfg->precision]); + if (minutes > SMA_STORAGE_TSDB_MINUTES) { + daysPerFile = SMA_STORAGE_TSDB_MINUTES; + } } return daysPerFile; @@ -288,45 +292,14 @@ static int32_t tdGetTSmaDays(SSma *pSma, int64_t interval, int32_t storageLevel) /** * @brief Judge the tSma storage level * + * @param pCfg * @param interval - * @param intervalUnit * @return int32_t */ -static int32_t tdGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) { - // TODO: configurable for SMA_STORAGE_SPLIT_HOURS? - switch (intervalUnit) { - case TIME_UNIT_HOUR: - if (interval < SMA_STORAGE_SPLIT_HOURS) { - return SMA_STORAGE_LEVEL_DFILESET; - } - break; - case TIME_UNIT_MINUTE: - if (interval < 60 * SMA_STORAGE_SPLIT_HOURS) { - return SMA_STORAGE_LEVEL_DFILESET; - } - break; - case TIME_UNIT_SECOND: - if (interval < 3600 * SMA_STORAGE_SPLIT_HOURS) { - return SMA_STORAGE_LEVEL_DFILESET; - } - break; - case TIME_UNIT_MILLISECOND: - if (interval < 3600 * 1e3 * SMA_STORAGE_SPLIT_HOURS) { - return SMA_STORAGE_LEVEL_DFILESET; - } - break; - case TIME_UNIT_MICROSECOND: - if (interval < 3600 * 1e6 * SMA_STORAGE_SPLIT_HOURS) { - return SMA_STORAGE_LEVEL_DFILESET; - } - break; - case TIME_UNIT_NANOSECOND: - if (interval < 3600 * 1e9 * SMA_STORAGE_SPLIT_HOURS) { - return SMA_STORAGE_LEVEL_DFILESET; - } - break; - default: - break; +static int32_t tdGetSmaStorageLevel(STSmaKeepCfg *pCfg, int64_t interval) { + int64_t mInterval = convertTimeFromPrecisionToUnit(interval, pCfg->precision, TIME_UNIT_MINUTE); + if (pCfg->days / mInterval >= SMA_STORAGE_SPLIT_FACTOR) { + return SMA_STORAGE_LEVEL_DFILESET; } return SMA_STORAGE_LEVEL_TSDB; } @@ -346,6 +319,7 @@ static int32_t tdGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) { int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { STsdbCfg *pCfg = SMA_TSDB_CFG(pSma); const SArray *pDataBlocks = (const SArray *)msg; + int64_t testSkey = TSKEY_INITIAL_VAL; // TODO: destroy SSDataBlocks(msg) @@ -403,8 +377,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { } // Step 1: Judge the storage level and days - int32_t storageLevel = tdGetSmaStorageLevel(pTSma->interval, pTSma->intervalUnit); - int32_t daysPerFile = tdGetTSmaDays(pSma, tSmaH.interval, storageLevel); + int32_t storageLevel = tdGetSmaStorageLevel(pCfg, tSmaH.interval); + int32_t minutePerFile = tdGetTSmaDays(pSma, tSmaH.interval, storageLevel); char smaKey[SMA_KEY_LEN] = {0}; // key: skey + groupId char dataBuf[512] = {0}; // val: aggr data // TODO: handle 512 buffer? @@ -432,6 +406,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { if (!isStartKey) { isStartKey = true; skey = *(TSKEY *)var; + testSkey = skey; printf("= skey %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId); tdEncodeTSmaKey(groupId, skey, &pSmaKey); } else { @@ -503,9 +478,10 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { break; } } + printf("\n"); // if ((tlen > 0) && (skey != TSKEY_INITIAL_VAL)) { if (tlen > 0) { - int32_t fid = (int32_t)(TSDB_KEY_FID(skey, daysPerFile, pCfg->precision)); + int32_t fid = (int32_t)(TSDB_KEY_FID(skey, minutePerFile, pCfg->precision)); // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index // file @@ -517,6 +493,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { smaCloseDBF(&tSmaH.dFile); } tdSetTSmaDataFile(&tSmaH, indexUid, fid); + smaDebug("@@@ vgId:%d write to DBF %s, days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi32 " queryKey:%" PRIi64, + SMA_VID(pSma), tSmaH.dFile.path, minutePerFile, tSmaH.interval, storageLevel, testSkey); if (smaOpenDBF(pEnv->dbEnv, &tSmaH.dFile) != 0) { smaWarn("vgId:%d open DB file %s failed since %s", SMA_VID(pSma), tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno)); @@ -528,16 +506,17 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { } if (tdInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) { - smaWarn("vgId:%d insert tSma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64 - " since %s", - SMA_VID(pSma), indexUid, skey, groupId, tstrerror(terrno)); + smaWarn("vgId:%d insert tsma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64 + " since %s", + SMA_VID(pSma), indexUid, skey, groupId, tstrerror(terrno)); tdSmaEndCommit(pEnv); tdDestroyTSmaWriteH(&tSmaH); tdUnRefSmaStat(pSma, pStat); return TSDB_CODE_FAILED; } - smaDebug("vgId:%d insert tSma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64, - SMA_VID(pSma), indexUid, skey, groupId); + + smaDebug("vgId:%d insert tsma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64, + SMA_VID(pSma), indexUid, skey, groupId); // TODO:tsdbEndTSmaCommit(); // Step 3: reset the SSmaStat @@ -547,7 +526,6 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { SMA_VID(pSma), skey, tlen, indexUid); } - printf("\n"); } } tdSmaEndCommit(pEnv); // TODO: not commit for every insert @@ -579,14 +557,14 @@ static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyL TXN *txn) { SDBFile *pDBFile = &pSmaH->dFile; - // TODO: insert sma data blocks into B+Tree(TDB) + // TODO: insert tsma data blocks into B+Tree(TDB) if (smaSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) { - smaWarn("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail", - SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); + smaWarn("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail", + SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); return TSDB_CODE_FAILED; } - smaDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed", - SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); + smaDebug("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed", + SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); #ifdef _TEST_SMA_PRINT_DEBUG_LOG_ uint32_t valueSize = 0; @@ -776,6 +754,8 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query tdUnRefSmaStat(pSma, pStat); tdInitTSmaFile(&tReadH, indexUid, querySKey); + smaDebug("### vgId:%d read from DBF %s days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi8 " queryKey:%" PRIi64, + SMA_VID(pSma), tReadH.dFile.path, tReadH.days, tReadH.interval, tReadH.storageLevel, querySKey); if (smaOpenDBF(pEnv->dbEnv, &tReadH.dFile) != 0) { smaWarn("vgId:%d open DBF %s failed since %s", SMA_VID(pSma), tReadH.dFile.path, tstrerror(terrno)); return TSDB_CODE_FAILED; @@ -783,7 +763,7 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query char smaKey[SMA_KEY_LEN] = {0}; void *pSmaKey = &smaKey; - int64_t queryGroupId = 1; + int64_t queryGroupId = 0; tdEncodeTSmaKey(queryGroupId, querySKey, (void **)&pSmaKey); smaDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIx64 ", keyLen %d", SMA_VID(pSma), @@ -915,8 +895,8 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META; taosHashCleanup(pItem->expiredWindows); taosMemoryFree(pItem); - smaWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, - tstrerror(terrno)); + smaWarn("vgId:%d set expire window, get tsma meta failed for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), + indexUid, tstrerror(terrno)); return TSDB_CODE_FAILED; } pItem->pTSma = pTSma; @@ -1021,7 +1001,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version) pSW = tdFreeTSmaWrapper(pSW, false); break; } - if (!pSW || (pTSma->tableUid != msgIter.suid)) { + if (!pSW || (pTSma && (pTSma->tableUid != msgIter.suid))) { if (pSW) { pSW = tdFreeTSmaWrapper(pSW, false); } @@ -1043,6 +1023,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version) interval.slidingUnit = pTSma->slidingUnit; } + // TODO: process multiple tsma for one table uid TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision); if (lastWinSKey != winSKey) { diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index 8e689fc185..cf38a30a4c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -42,7 +42,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee int slen = 0; *ppTsdb = NULL; - slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(dir) + TSDB_DATA_DIR_LEN + 3; + slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + 3; // create handle pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(*pTsdb) + slen); @@ -73,7 +73,8 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee goto _err; } - tsdbDebug("vgId:%d tsdb is opened for %s", TD_VID(pVnode), pTsdb->path); + tsdbDebug("vgId:%d tsdb is opened for %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days, + pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2); *ppTsdb = pTsdb; return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 254d452cb3..90adba6f4d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -372,13 +372,13 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, } if (level == TSDB_RETENTION_L0) { - tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L0); + tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L0); return VND_RSMA0(pVnode); } else if (level == TSDB_RETENTION_L1) { - tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L1); + tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L1); return VND_RSMA1(pVnode); } else { - tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L2); + tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L2); return VND_RSMA2(pVnode); } } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 1589513110..dc782cc022 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -876,13 +876,13 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t ke TXN *txn) { SDBFile *pDBFile = &pSmaH->dFile; - // TODO: insert sma data blocks into B+Tree(TDB) + // TODO: insert tsma data blocks into B+Tree(TDB) if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) { - tsdbWarn("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail", + tsdbWarn("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail", REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); return TSDB_CODE_FAILED; } - tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed", + tsdbDebug("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed", REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); #ifdef _TEST_SMA_PRINT_DEBUG_LOG_ @@ -1245,7 +1245,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char } if (tsdbInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) { - tsdbWarn("vgId:%d insert tSma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64 + tsdbWarn("vgId:%d insert tsma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid, skey, groupId, tstrerror(terrno)); tsdbSmaEndCommit(pEnv); @@ -1253,7 +1253,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; } - tsdbDebug("vgId:%d insert tSma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64, + tsdbDebug("vgId:%d insert tsma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64, REPO_ID(pTsdb), indexUid, skey, groupId); // TODO:tsdbEndTSmaCommit(); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index d44e30988d..739f7f9fa3 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -103,7 +103,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open sma if (smaOpen(pVnode)) { - vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); + vError("vgId:%d failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } From 9ff2228df188be752764a8c815d96139ef9704b2 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 18 May 2022 06:00:16 +0800 Subject: [PATCH 12/16] fix: alloc enough buf for tsdb --- source/dnode/vnode/src/tsdb/tsdbOpen.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbOpen.c b/source/dnode/vnode/src/tsdb/tsdbOpen.c index cf38a30a4c..180eea3237 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOpen.c +++ b/source/dnode/vnode/src/tsdb/tsdbOpen.c @@ -42,7 +42,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee int slen = 0; *ppTsdb = NULL; - slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + 3; + slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(dir) + 3; // create handle pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(*pTsdb) + slen); From d588d47bdb054dc78ebcd392e6a7d2aed9e44e6d Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 17 May 2022 20:35:19 +0800 Subject: [PATCH 13/16] ci(stream): add test --- tests/script/tsim/tstream/basic1.sim | 88 ++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/tests/script/tsim/tstream/basic1.sim b/tests/script/tsim/tstream/basic1.sim index 37f9cb94c9..cb084ad537 100644 --- a/tests/script/tsim/tstream/basic1.sim +++ b/tests/script/tsim/tstream/basic1.sim @@ -372,4 +372,92 @@ if $data25 != 3 then return -1 endi +sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1); +sleep 100 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 0 +if $data01 != 4 then + print ======$data01 + return -1 +endi + +if $data02 != 4 then + print ======$data02 + return -1 +endi + +if $data03 != 14 then + print ======$data03 + return -1 +endi + +if $data04 != 4 then + print ======$data04 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + return -1 +endi + +sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1); +sleep 100 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 4 then + print ======$data11 + # return -1 +endi + +if $data12 != 4 then + print ======$data12 + # return -1 +endi + +if $data13 != 10 then + print ======$data13 + # return -1 +endi + +if $data14 != 3 then + print ======$data14 + # return -1 +endi + +if $data15 != 1 then + print ======$data15 + # return -1 +endi + +# row 2 +if $data21 != 4 then + print ======$data21 + # return -1 +endi + +if $data22 != 4 then + print ======$data22 + # return -1 +endi + +if $data23 != 15 then + print ======$data23 + # return -1 +endi + +if $data24 != 4 then + print ======$data24 + # return -1 +endi + +if $data25 != 3 then + print ======$data25 + # return -1 +endi + + + system sh/exec.sh -n dnode1 -s stop -x SIGINT From 67b0fd9261e13783fc9388a3820737531bbc9e5f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 18 May 2022 09:25:00 +0800 Subject: [PATCH 14/16] fix: wrong db status --- source/dnode/mnode/impl/src/mndTopic.c | 20 +++++++++---------- .../script/tsim/tmq/prepareBasicEnv-1vgrp.sim | 2 +- .../script/tsim/tmq/prepareBasicEnv-4vgrp.sim | 2 +- tests/script/tsim/tmq/topic.sim | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index e99046a32d..3f559cb6c0 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -502,16 +502,16 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name); - if (pTopic == NULL) { - if (dropReq.igNotExists) { - mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name); - return 0; - } else { - terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; - mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); - return -1; - } - } + // if (pTopic == NULL) { + // if (dropReq.igNotExists) { + // mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name); + // return 0; + // } else { + // terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; + // mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); + // return -1; + // } + // } if (pTopic->refConsumerCnt != 0) { mndReleaseTopic(pMnode, pTopic); diff --git a/tests/script/tsim/tmq/prepareBasicEnv-1vgrp.sim b/tests/script/tsim/tmq/prepareBasicEnv-1vgrp.sim index fbdea96f93..43b93b7372 100644 --- a/tests/script/tsim/tmq/prepareBasicEnv-1vgrp.sim +++ b/tests/script/tsim/tmq/prepareBasicEnv-1vgrp.sim @@ -39,7 +39,7 @@ sql show databases print ==> rows: $rows print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] -if $data(db)[19] != nostrict then +if $data(db)[19] != ready then sleep 100 $loop_cnt = $loop_cnt + 1 goto check_db_ready diff --git a/tests/script/tsim/tmq/prepareBasicEnv-4vgrp.sim b/tests/script/tsim/tmq/prepareBasicEnv-4vgrp.sim index 8b5573486d..5e81137ffa 100644 --- a/tests/script/tsim/tmq/prepareBasicEnv-4vgrp.sim +++ b/tests/script/tsim/tmq/prepareBasicEnv-4vgrp.sim @@ -39,7 +39,7 @@ sql show databases print ==> rows: $rows print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] -if $data(db)[19] != nostrict then +if $data(db)[19] != ready then sleep 100 $loop_cnt = $loop_cnt + 1 goto check_db_ready diff --git a/tests/script/tsim/tmq/topic.sim b/tests/script/tsim/tmq/topic.sim index f1dbf98bb0..9add349a18 100644 --- a/tests/script/tsim/tmq/topic.sim +++ b/tests/script/tsim/tmq/topic.sim @@ -31,7 +31,7 @@ sql show databases print ==> rows: $rows print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] -if $data(db)[19] != nostrict then +if $data(db)[19] != ready then sleep 100 $loop_cnt = $loop_cnt + 1 goto check_db_ready From 5c2c2b378b3040b7ba808a6ef2873dd2d62a9d4d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 18 May 2022 10:15:29 +0800 Subject: [PATCH 15/16] fix(query): fix the invalid column length value during spill out the group data into disk. --- source/common/src/tdatablock.c | 29 ++++++++++++++++++++---- source/libs/executor/src/groupoperator.c | 28 ++++++++++++++++------- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 43dcf2dfa9..4d77f4eb71 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -600,10 +600,11 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { } int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) { - pBlock->info.rows = *(int32_t*)buf; + pBlock->info.rows = *(int32_t*) buf; + pBlock->info.groupId = *(uint64_t*) (buf + sizeof(int32_t)); int32_t numOfCols = pBlock->info.numOfCols; - const char* pStart = buf + sizeof(uint32_t); + const char* pStart = buf + sizeof(uint32_t) + sizeof(uint64_t); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); @@ -669,7 +670,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) { return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t); } -double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { +double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { ASSERT(pBlock != NULL); double rowSize = 0; @@ -1224,7 +1225,27 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { } size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { - return (int32_t)((pageSize - blockDataGetSerialMetaSize(pBlock)) / blockDataGetSerialRowSize(pBlock)); + int32_t payloadSize = pageSize - blockDataGetSerialMetaSize(pBlock); + + int32_t rowSize = pBlock->info.rowSize; + + int32_t nRows = payloadSize / rowSize; + + // the true value must be less than the value of nRows + int32_t additional = 0; + for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + additional += nRows * sizeof(int32_t); + } else { + additional += BitmapLen(nRows); + } + } + + int32_t newRows = (payloadSize - additional) / rowSize; + ASSERT(newRows <= nRows && newRows > 1); + + return newRows; } void colDataDestroy(SColumnInfoData* pColData) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 76a773a095..b1e19213dd 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -396,8 +396,11 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { pGInfo->groupId = calcGroupId(pInfo->keyBuf, len); } + // number of rows int32_t* rows = (int32_t*) pPage; + // group id + size_t numOfCols = pOperator->numOfExprs; for(int32_t i = 0; i < numOfCols; ++i) { SExprInfo* pExpr = &pOperator->pExpr[i]; @@ -408,13 +411,13 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t bytes = pColInfoData->info.bytes; int32_t startOffset = pInfo->columnOffset[i]; - char* columnLen = NULL; - int32_t contentLen = 0; + int32_t* columnLen = NULL; + int32_t contentLen = 0; if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { int32_t* offset = (int32_t*)((char*)pPage + startOffset); - columnLen = (char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity; - char* data = (char*)(columnLen + sizeof(int32_t)); + columnLen = (int32_t*) ((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity); + char* data = (char*)((char*) columnLen + sizeof(int32_t)); if (colDataIsNull_s(pColInfoData, j)) { offset[(*rows)] = -1; @@ -423,11 +426,15 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { offset[*rows] = (*columnLen); char* src = colDataGetData(pColInfoData, j); memcpy(data + (*columnLen), src, varDataTLen(src)); + int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage); + ASSERT(v > 0); + printf("len:%d\n", v); + contentLen = varDataTLen(src); } } else { char* bitmap = (char*)pPage + startOffset; - columnLen = (char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity); + columnLen = (int32_t*) ((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity)); char* data = (char*) columnLen + sizeof(int32_t); bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); @@ -440,6 +447,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } (*columnLen) += contentLen; + ASSERT(*columnLen >= 0); } (*rows) += 1; @@ -476,7 +484,12 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); taosArrayPush(p->pPageList, &pageId); - *(int32_t*) pPage = 0; +// // number of rows +// *(int32_t*) pPage = 0; +// +// uint64_t* groupId = (pPage + sizeof(int32_t)); +// *groupId = 0; + memset(pPage, 0, getBufPageSize(pInfo->pBuf)); } } @@ -500,7 +513,7 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) { size_t numOfCols = pBlock->info.numOfCols; int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t)); - offset[0] = sizeof(int32_t); // the number of rows in current page, ref to SSDataBlock paged serialization format + offset[0] = sizeof(int32_t) + sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format for(int32_t i = 0; i < numOfCols - 1; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); @@ -571,7 +584,6 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { break; } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs); doHashPartition(pOperator, pBlock); } From d5c30f5ea5b2eb487a20aa89bfa3641f6efa3b7c Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 18 May 2022 10:29:32 +0800 Subject: [PATCH 16/16] fix: release udf func handle when udf agg failure --- source/libs/function/src/tudf.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 4322705c13..5c6209dc4d 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1386,7 +1386,7 @@ int32_t cleanUpUdfs() { if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) { taosArrayPush(udfStubs, stub); } else { - fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", + fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", stub->udfName, stub->refCount, stub->lastRefTime); } } @@ -1602,6 +1602,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult SUdfInterBuf buf = {0}; if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); + releaseUdfFuncHandle(pCtx->udfName); return false; } udfRes->interResNum = buf.numOfResult; @@ -1609,6 +1610,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult memcpy(udfRes->interResBuf, buf.buf, buf.bufLen); } else { fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize); + releaseUdfFuncHandle(pCtx->udfName); return false; } freeUdfInterBuf(&buf); @@ -1674,6 +1676,9 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { blockDataDestroy(inputBlock); taosArrayDestroy(tempBlock.pDataBlock); + if (udfCode != 0) { + releaseUdfFuncHandle(pCtx->udfName); + } freeUdfInterBuf(&newState); return udfCode; }