From aefa9bd89186464dc02a7a810c4e2f6afed86dcb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 17 May 2022 16:53:55 +0800 Subject: [PATCH 01/17] refactor: do some internal refactor. --- include/libs/function/function.h | 38 +- source/libs/executor/src/executorimpl.c | 16 +- source/libs/function/src/builtinsimpl.c | 23 +- source/libs/function/src/taggfunction.c | 652 +----------------------- 4 files changed, 36 insertions(+), 693 deletions(-) diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 616aec8c02..7d3e969c41 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -170,32 +170,18 @@ typedef struct SInputColumnInfoData { // sql function runtime context typedef struct SqlFunctionCtx { - SInputColumnInfoData input; - SResultDataInfo resDataInfo; - uint32_t order; // data block scanner order: asc|desc - uint8_t scanFlag; // record current running step, default: 0 - //////////////////////////////////////////////////////////////// - int32_t startRow; // start row index - int32_t size; // handled processed row number - SColumnInfoData* pInput; - SColumnDataAgg agg; - int16_t inputType; // TODO remove it - int16_t inputBytes; // TODO remove it - bool hasNull; // null value exist in current block, TODO remove it - bool requireNull; // require null in some function, TODO remove it - int32_t columnIndex; // TODO remove it - bool isAggSet; - int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it - bool stableQuery; - ///////////////////////////////////////////////////////////////// - int16_t functionId; // function id - char * pOutput; // final result output buffer, point to sdata->data - int32_t numOfParams; - SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param - int64_t *ptsList; // corresponding timestamp array list - SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ - int32_t offset; - SVariant tag; + SInputColumnInfoData input; + SResultDataInfo resDataInfo; + uint32_t order; // data block scanner order: asc|desc + uint8_t scanFlag; // record current running step, default: 0 + int16_t functionId; // function id + char *pOutput; // final result output buffer, point to sdata->data + int32_t numOfParams; + SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param + int64_t *ptsList; // corresponding timestamp array list + SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ + int32_t offset; + SVariant tag; struct SResultRowEntryInfo *resultInfo; SSubsidiaryResInfo subsidiaries; SPoint1 start; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c261271728..1ba494a041 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -602,8 +602,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) { for (int32_t k = 0; k < numOfOutput; ++k) { - pCtx[k].startTs = pWin->skey; - // keep it temporarily bool hasAgg = pCtx[k].input.colDataAggIsSet; int32_t numOfRows = pCtx[k].input.numOfRows; @@ -619,8 +617,8 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow // not a whole block involved in query processing, statistics data can not be used // NOTE: the original value of isSet have been changed here - if (pCtx[k].isAggSet && forwardStep < numOfTotal) { - pCtx[k].isAggSet = false; + if (pCtx[k].input.colDataAggIsSet && forwardStep < numOfTotal) { + pCtx[k].input.colDataAggIsSet = false; } if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) { @@ -680,7 +678,7 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC int32_t order) { for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { pCtx[i].order = order; - pCtx[i].size = pBlock->info.rows; + pCtx[i].input.numOfRows = pBlock->info.rows; setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock); } } @@ -742,7 +740,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { pCtx[i].order = order; - pCtx[i].size = pBlock->info.rows; + pCtx[i].input.numOfRows = pBlock->info.rows; + pCtx[i].pSrcBlock = pBlock; pCtx[i].scanFlag = scanFlag; @@ -827,7 +826,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { - pCtx[k].startTs = startTs; // todo add a dummy funtion to avoid process check if (pCtx[k].fpSet.process != NULL) { int32_t code = pCtx[k].fpSet.process(&pCtx[k]); @@ -3330,7 +3328,7 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3 static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr, int32_t rowIndex) { for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index - pCtx[j].startRow = rowIndex; +// pCtx[j].startRow = rowIndex; } for (int32_t j = 0; j < numOfExpr; ++j) { @@ -3381,7 +3379,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock SqlFunctionCtx* pCtx = pInfo->binfo.pCtx; for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - pCtx[i].size = 1; +// pCtx[i].size = 1; } for (int32_t i = 0; i < pBlock->info.rows; ++i) { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index af86eb4e90..4571631bc0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -287,6 +287,7 @@ static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) { } return numOfElem; } + /* * count function does need the finalize, if data is missing, the default value, which is 0, is used * count function does not use the pCtx->interResBuf to keep the intermediate buffer @@ -781,16 +782,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { int64_t val = GET_INT64_VAL(tval); if ((prev < val) ^ isMinFunc) { pBuf->v = val; - // for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { - // SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; - // if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor - // __ctx->tag.i = key; - // __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; - // } - // - // __ctx->fpSet.process(__ctx); - // } - if (pCtx->subsidiaries.num > 0) { saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } @@ -803,15 +794,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { uint64_t val = GET_UINT64_VAL(tval); if ((prev < val) ^ isMinFunc) { pBuf->v = val; - // for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) { - // SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i]; - // if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor - // __ctx->tag.i = key; - // __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; - // } - // - // __ctx->fpSet.process(__ctx); - // } if (pCtx->subsidiaries.num > 0) { saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } @@ -823,7 +805,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { double val = GET_DOUBLE_VAL(tval); if ((prev < val) ^ isMinFunc) { pBuf->v = val; - if (pCtx->subsidiaries.num > 0) { saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); } @@ -1703,7 +1684,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pCol, i); double v = 0; - GET_TYPED_DATA(v, double, pCtx->inputType, data); + GET_TYPED_DATA(v, double, type, data); if (v < GET_DOUBLE_VAL(&pInfo->minval)) { SET_DOUBLE_VAL(&pInfo->minval, v); } diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index b6d5d38c9e..950655e480 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -480,7 +480,7 @@ static bool function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf initResultRowEntry(pResultInfo, pCtx->resDataInfo.interBufSize); return true; } - +#if 0 /** * in handling the stable query, function_finalizer is called after the secondary * merge being completed, during the first merge procedure, which is executed at the @@ -497,53 +497,6 @@ static void function_finalizer(SqlFunctionCtx *pCtx) { doFinalizer(pCtx); } -/* - * count function does need the finalize, if data is missing, the default value, which is 0, is used - * count function does not use the pCtx->interResBuf to keep the intermediate buffer - */ -static void count_function(SqlFunctionCtx *pCtx) { - int32_t numOfElem = 0; - - /* - * 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->isAggSet == true; - * 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->isAggSet == true; - * 3. for primary key column, pCtx->hasNull always be false, pCtx->isAggSet == false; - */ - if (pCtx->isAggSet) { - numOfElem = pCtx->size - pCtx->agg.numOfNull; - } else { - if (pCtx->hasNull) { - for (int32_t i = 0; i < pCtx->size; ++i) { - char *val = GET_INPUT_DATA(pCtx, i); - if (isNull(val, pCtx->inputType)) { - continue; - } - - numOfElem += 1; - } - } else { - //when counting on the primary time stamp column and no statistics data is presented, use the size value directly. - numOfElem = pCtx->size; - } - } - - if (numOfElem > 0) { -// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; - } - - *((int64_t *)pCtx->pOutput) += numOfElem; - SET_VAL(pCtx, numOfElem, 1); -} - -static void count_func_merge(SqlFunctionCtx *pCtx) { - int64_t *pData = (int64_t *)GET_INPUT_DATA_LIST(pCtx); - for (int32_t i = 0; i < pCtx->size; ++i) { - *((int64_t *)pCtx->pOutput) += pData[i]; - } - - SET_VAL(pCtx, pCtx->size, 1); -} - /** * 1. If the column value for filter exists, we need to load the SFields, which serves * as the pre-filter to decide if the actual data block is required or not. @@ -633,113 +586,6 @@ int32_t noDataRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { LOOPCHECK_N(*_data, _list, ctx, tsdbType, sign, notNullElems); \ } while (0) -static void do_sum(SqlFunctionCtx *pCtx) { - int32_t notNullElems = 0; - - // Only the pre-computing information loaded and actual data does not loaded - if (pCtx->isAggSet) { - notNullElems = pCtx->size - pCtx->agg.numOfNull; - assert(pCtx->size >= pCtx->agg.numOfNull); - - if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { - int64_t *retVal = (int64_t *)pCtx->pOutput; - *retVal += pCtx->agg.sum; - } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { - uint64_t *retVal = (uint64_t *)pCtx->pOutput; - *retVal += (uint64_t)pCtx->agg.sum; - } else if (IS_FLOAT_TYPE(pCtx->inputType)) { - double *retVal = (double*) pCtx->pOutput; - SET_DOUBLE_VAL(retVal, *retVal + GET_DOUBLE_VAL((const char*)&(pCtx->agg.sum))); - } - } else { // computing based on the true data block - void *pData = GET_INPUT_DATA_LIST(pCtx); - notNullElems = 0; - - if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { - int64_t *retVal = (int64_t *)pCtx->pOutput; - - if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { - LIST_ADD_N(*retVal, pCtx, pData, int8_t, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { - LIST_ADD_N(*retVal, pCtx, pData, int16_t, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_INT) { - LIST_ADD_N(*retVal, pCtx, pData, int32_t, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) { - LIST_ADD_N(*retVal, pCtx, pData, int64_t, notNullElems, pCtx->inputType); - } - } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { - uint64_t *retVal = (uint64_t *)pCtx->pOutput; - - if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) { - LIST_ADD_N(*retVal, pCtx, pData, uint8_t, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) { - LIST_ADD_N(*retVal, pCtx, pData, uint16_t, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) { - LIST_ADD_N(*retVal, pCtx, pData, uint32_t, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) { - LIST_ADD_N(*retVal, pCtx, pData, uint64_t, notNullElems, pCtx->inputType); - } - } else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { - double *retVal = (double *)pCtx->pOutput; - LIST_ADD_N_DOUBLE(*retVal, pCtx, pData, double, notNullElems, pCtx->inputType); - } else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) { - double *retVal = (double *)pCtx->pOutput; - LIST_ADD_N_DOUBLE_FLOAT(*retVal, pCtx, pData, float, notNullElems, pCtx->inputType); - } - } - - // data in the check operation are all null, not output - SET_VAL(pCtx, notNullElems, 1); - - if (notNullElems > 0) { -// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; - } -} - -static void sum_function(SqlFunctionCtx *pCtx) { - do_sum(pCtx); - - // keep the result data in output buffer, not in the intermediate buffer - SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); -// if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) { - // set the flag for super table query - SSumInfo *pSum = (SSumInfo *)pCtx->pOutput; - pSum->hasResult = DATA_SET_FLAG; -// } -} - -static void sum_func_merge(SqlFunctionCtx *pCtx) { - int32_t notNullElems = 0; - - GET_TRUE_DATA_TYPE(); - assert(pCtx->stableQuery); - - for (int32_t i = 0; i < pCtx->size; ++i) { - char * input = GET_INPUT_DATA(pCtx, i); - SSumInfo *pInput = (SSumInfo *)input; - if (pInput->hasResult != DATA_SET_FLAG) { - continue; - } - - notNullElems++; - - if (IS_SIGNED_NUMERIC_TYPE(type)) { - *(int64_t *)pCtx->pOutput += pInput->isum; - } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { - *(uint64_t *) pCtx->pOutput += pInput->usum; - } else { - SET_DOUBLE_VAL((double *)pCtx->pOutput, *(double *)pCtx->pOutput + pInput->dsum); - } - } - - SET_VAL(pCtx, notNullElems, 1); - SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - - if (notNullElems > 0) { - //pResInfo->hasResult = DATA_SET_FLAG; - } -} - static int32_t statisRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { return BLK_DATA_SMA_LOAD; } @@ -822,17 +668,17 @@ static int32_t lastDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_ */ static void avg_function(SqlFunctionCtx *pCtx) { int32_t notNullElems = 0; - + // NOTE: keep the intermediate result into the interResultBuf SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - + SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo); double *pVal = &pAvgInfo->sum; - + if (pCtx->isAggSet) { // Pre-aggregation notNullElems = pCtx->size - pCtx->agg.numOfNull; assert(notNullElems >= 0); - + if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) { *pVal += pCtx->agg.sum; } else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) { @@ -842,7 +688,7 @@ static void avg_function(SqlFunctionCtx *pCtx) { } } else { void *pData = GET_INPUT_DATA_LIST(pCtx); - + if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) { LIST_ADD_N(*pVal, pCtx, pData, int8_t, notNullElems, pCtx->inputType); } else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) { @@ -865,18 +711,18 @@ static void avg_function(SqlFunctionCtx *pCtx) { LIST_ADD_N(*pVal, pCtx, pData, uint64_t, notNullElems, pCtx->inputType); } } - + if (!pCtx->hasNull) { assert(notNullElems == pCtx->size); } - + SET_VAL(pCtx, notNullElems, 1); pAvgInfo->num += notNullElems; - + if (notNullElems > 0) { //pResInfo->hasResult = DATA_SET_FLAG; } - + // keep the data into the final output buffer for super table query since this execution may be the last one if (pCtx->stableQuery) { memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SAvgInfo)); @@ -885,18 +731,18 @@ static void avg_function(SqlFunctionCtx *pCtx) { static void avg_func_merge(SqlFunctionCtx *pCtx) { SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); - + double *sum = (double*) pCtx->pOutput; char *input = GET_INPUT_DATA_LIST(pCtx); - + for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { SAvgInfo *pInput = (SAvgInfo *)input; if (pInput->num == 0) { // current input is null continue; } - + SET_DOUBLE_VAL(sum, *sum + pInput->sum); - + // keep the number of data into the temp buffer *(int64_t *)GET_ROWCELL_INTERBUF(pResInfo) += pInput->num; } @@ -2735,18 +2581,6 @@ static void deriv_function(SqlFunctionCtx *pCtx) { GET_RES_INFO(pCtx)->numOfRes += notNullElems; } -#define DIFF_IMPL(ctx, d, type) \ - do { \ - if ((ctx)->param[1].param.nType == INITIAL_VALUE_NOT_ASSIGNED) { \ - (ctx)->param[1].param.nType = (ctx)->inputType; \ - *(type *)&(ctx)->param[1].param.i = *(type *)(d); \ - } else { \ - *(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].param.i)); \ - *(type *)(&(ctx)->param[1].param.i) = *(type *)(d); \ - *(int64_t *)(ctx)->pTsOutput = GET_TS_DATA(ctx, index); \ - } \ - } while (0); - // TODO difference in date column static void diff_function(SqlFunctionCtx *pCtx) { void *data = GET_INPUT_DATA_LIST(pCtx); @@ -4072,460 +3906,4 @@ int32_t functionCompatList[] = { // tid_tag, derivative, blk_info 6, 8, 7, }; - -SAggFunctionInfo aggFunc[35] = {{ - // 0, count function does not invoke the finalize function - "count", - FUNCTION_TYPE_AGG, - FUNCTION_COUNT, - FUNCTION_COUNT, - BASIC_FUNC_SO, - function_setup, - count_function, - doFinalizer, - count_func_merge, - countRequired, - }, - { - // 1 - "sum", - FUNCTION_TYPE_AGG, - FUNCTION_SUM, - FUNCTION_SUM, - BASIC_FUNC_SO, - function_setup, - sum_function, - function_finalizer, - sum_func_merge, - statisRequired, - }, - { - // 2 - "avg", - FUNCTION_TYPE_AGG, - FUNCTION_AVG, - FUNCTION_AVG, - BASIC_FUNC_SO, - function_setup, - avg_function, - avg_finalizer, - avg_func_merge, - statisRequired, - }, - { - // 3 - "min", - FUNCTION_TYPE_AGG, - FUNCTION_MIN, - FUNCTION_MIN, - BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, - min_func_setup, - NULL, - function_finalizer, - min_func_merge, - statisRequired, - }, - { - // 4 - "max", - FUNCTION_TYPE_AGG, - FUNCTION_MAX, - FUNCTION_MAX, - BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, - max_func_setup, - NULL, - function_finalizer, - max_func_merge, - statisRequired, - }, - { - // 5 - "stddev", - FUNCTION_TYPE_AGG, - FUNCTION_STDDEV, - FUNCTION_STDDEV_DST, - FUNCSTATE_SO | FUNCSTATE_STREAM, - function_setup, - stddev_function, - stddev_finalizer, - noop1, - dataBlockRequired, - }, - { - // 6 - "percentile", - FUNCTION_TYPE_AGG, - FUNCTION_PERCT, - FUNCTION_INVALID_ID, - FUNCSTATE_SO | FUNCSTATE_STREAM, - percentile_function_setup, - percentile_function, - percentile_finalizer, - noop1, - dataBlockRequired, - }, - { - // 7 - "apercentile", - FUNCTION_TYPE_AGG, - FUNCTION_APERCT, - FUNCTION_APERCT, - FUNCSTATE_SO | FUNCSTATE_STREAM | FUNCSTATE_STABLE, - apercentile_function_setup, - apercentile_function, - apercentile_finalizer, - apercentile_func_merge, - dataBlockRequired, - }, - { - // 8 - "first", - FUNCTION_TYPE_AGG, - FUNCTION_FIRST, - FUNCTION_FIRST_DST, - BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, - function_setup, - first_function, - function_finalizer, - noop1, - firstFuncRequired, - }, - { - // 9 - "last", - FUNCTION_TYPE_AGG, - FUNCTION_LAST, - FUNCTION_LAST_DST, - BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY, - function_setup, - last_function, - function_finalizer, - noop1, - lastFuncRequired, - }, - { - // 10 - "last_row", - FUNCTION_TYPE_AGG, - FUNCTION_LAST_ROW, - FUNCTION_LAST_ROW, - FUNCSTATE_SO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - first_last_function_setup, - last_row_function, - last_row_finalizer, - last_dist_func_merge, - dataBlockRequired, - }, - { - // 11 - "top", - FUNCTION_TYPE_AGG, - FUNCTION_TOP, - FUNCTION_TOP, - FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - top_bottom_function_setup, - top_function, - top_bottom_func_finalizer, - top_func_merge, - dataBlockRequired, - }, - { - // 12 - "bottom", - FUNCTION_TYPE_AGG, - FUNCTION_BOTTOM, - FUNCTION_BOTTOM, - FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - top_bottom_function_setup, - bottom_function, - top_bottom_func_finalizer, - bottom_func_merge, - dataBlockRequired, - }, - { - // 13 - "spread", - FUNCTION_TYPE_AGG, - FUNCTION_SPREAD, - FUNCTION_SPREAD, - BASIC_FUNC_SO, - spread_function_setup, - spread_function, - spread_function_finalizer, - spread_func_merge, - countRequired, - }, - { - // 14 - "twa", - FUNCTION_TYPE_AGG, - FUNCTION_TWA, - FUNCTION_TWA, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS, - twa_function_setup, - twa_function, - twa_function_finalizer, - twa_function_copy, - dataBlockRequired, - }, - { - // 15 - "leastsquares", - FUNCTION_TYPE_AGG, - FUNCTION_LEASTSQR, - FUNCTION_INVALID_ID, - FUNCSTATE_SO | FUNCSTATE_STREAM, - leastsquares_function_setup, - leastsquares_function, - leastsquares_finalizer, - noop1, - dataBlockRequired, - }, - { - // 16 - "dummy", - FUNCTION_TYPE_AGG, - FUNCTION_TS, - FUNCTION_TS, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS, - function_setup, - date_col_output_function, - doFinalizer, - copy_function, - noDataRequired, - }, - { - // 17 - "ts", - FUNCTION_TYPE_AGG, - FUNCTION_TS_DUMMY, - FUNCTION_TS_DUMMY, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS, - function_setup, - noop1, - doFinalizer, - copy_function, - dataBlockRequired, - }, - { - // 18 - "tag_dummy", - FUNCTION_TYPE_AGG, - FUNCTION_TAG_DUMMY, - FUNCTION_TAG_DUMMY, - BASIC_FUNC_SO, - function_setup, - tag_function, - doFinalizer, - copy_function, - noDataRequired, - }, - { - // 19 - "ts", - FUNCTION_TYPE_AGG, - FUNCTION_TS_COMP, - FUNCTION_TS_COMP, - FUNCSTATE_MO | FUNCSTATE_NEED_TS, - ts_comp_function_setup, - ts_comp_function, - ts_comp_finalize, - copy_function, - dataBlockRequired, - }, - { - // 20 - "tag", - FUNCTION_TYPE_AGG, - FUNCTION_TAG, - FUNCTION_TAG, - BASIC_FUNC_SO, - function_setup, - tag_function, - doFinalizer, - copy_function, - noDataRequired, - }, - {//TODO this is a scala function - // 21, column project sql function - "colprj", - FUNCTION_TYPE_AGG, - FUNCTION_PRJ, - FUNCTION_PRJ, - BASIC_FUNC_MO | FUNCSTATE_NEED_TS, - function_setup, - col_project_function, - doFinalizer, - copy_function, - dataBlockRequired, - }, - { - // 22, multi-output, tag function has only one result - "tagprj", - FUNCTION_TYPE_AGG, - FUNCTION_TAGPRJ, - FUNCTION_TAGPRJ, - BASIC_FUNC_MO, - function_setup, - tag_project_function, - doFinalizer, - copy_function, - noDataRequired, - }, - { - // 23 - "arithmetic", - FUNCTION_TYPE_AGG, - FUNCTION_ARITHM, - FUNCTION_ARITHM, - FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS, - function_setup, - arithmetic_function, - doFinalizer, - copy_function, - dataBlockRequired, - }, - { - // 24 - "diff", - FUNCTION_TYPE_AGG, - FUNCTION_DIFF, - FUNCTION_INVALID_ID, - FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - diff_function_setup, - diff_function, - doFinalizer, - noop1, - dataBlockRequired, - }, - // distributed version used in two-stage aggregation processes - { - // 25 - "first_dist", - FUNCTION_TYPE_AGG, - FUNCTION_FIRST_DST, - FUNCTION_FIRST_DST, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - first_last_function_setup, - first_dist_function, - function_finalizer, - first_dist_func_merge, - firstDistFuncRequired, - }, - { - // 26 - "last_dist", - FUNCTION_TYPE_AGG, - FUNCTION_LAST_DST, - FUNCTION_LAST_DST, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - first_last_function_setup, - last_dist_function, - function_finalizer, - last_dist_func_merge, - lastDistFuncRequired, - }, - { - // 27 - "stddev", // return table id and the corresponding tags for join match and subscribe - FUNCTION_TYPE_AGG, - FUNCTION_STDDEV_DST, - FUNCTION_AVG, - FUNCSTATE_SO | FUNCSTATE_STABLE, - function_setup, - NULL, - NULL, - NULL, - dataBlockRequired, - }, - { - // 28 - "interp", - FUNCTION_TYPE_AGG, - FUNCTION_INTERP, - FUNCTION_INTERP, - FUNCSTATE_SO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS , - function_setup, - interp_function, - doFinalizer, - copy_function, - dataBlockRequired, - }, - { - // 29 - "rate", - FUNCTION_TYPE_AGG, - FUNCTION_RATE, - FUNCTION_RATE, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS, - rate_function_setup, - rate_function, - rate_finalizer, - rate_func_copy, - dataBlockRequired, - }, - { - // 30 - "irate", - FUNCTION_TYPE_AGG, - FUNCTION_IRATE, - FUNCTION_IRATE, - BASIC_FUNC_SO | FUNCSTATE_NEED_TS, - rate_function_setup, - irate_function, - rate_finalizer, - rate_func_copy, - dataBlockRequired, - }, - { - // 31 - "tbid", // return table id and the corresponding tags for join match and subscribe - FUNCTION_TYPE_AGG, - FUNCTION_TID_TAG, - FUNCTION_TID_TAG, - FUNCSTATE_MO | FUNCSTATE_STABLE, - function_setup, - noop1, - noop1, - noop1, - dataBlockRequired, - }, - { //32 - "derivative", // return table id and the corresponding tags for join match and subscribe - FUNCTION_TYPE_AGG, - FUNCTION_DERIVATIVE, - FUNCTION_INVALID_ID, - FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY, - deriv_function_setup, - deriv_function, - doFinalizer, - noop1, - dataBlockRequired, - }, - { - // 33 - "block_dist", // return table id and the corresponding tags for join match and subscribe - FUNCTION_TYPE_AGG, - FUNCTION_BLKINFO, - FUNCTION_BLKINFO, - FUNCSTATE_SO | FUNCSTATE_STABLE, - function_setup, - blockInfo_func, - blockinfo_func_finalizer, - block_func_merge, - dataBlockRequired, - }, - { - // 34 - "cov", // return table id and the corresponding tags for join match and subscribe - FUNCTION_TYPE_AGG, - FUNCTION_COV, - FUNCTION_COV, - FUNCSTATE_SO | FUNCSTATE_STABLE, - function_setup, - sum_function, - function_finalizer, - sum_func_merge, - statisRequired, - } - }; +#endif From 82a8f63decd088b53affb45c7aa137eb987b3158 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 17 May 2022 21:20:43 +0800 Subject: [PATCH 02/17] fix: bad code merge --- source/dnode/mnode/impl/src/mndTopic.c | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 8847bce890..e99046a32d 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -502,6 +502,17 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name); + if (pTopic == NULL) { + if (dropReq.igNotExists) { + mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name); + return 0; + } else { + terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; + mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); + return -1; + } + } + if (pTopic->refConsumerCnt != 0) { mndReleaseTopic(pMnode, pTopic); terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED; From 7563605418dd035aa07b9875a23949e24851f69c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 17 May 2022 21:24:32 +0800 Subject: [PATCH 03/17] fix: wrong db status --- source/dnode/mnode/impl/src/mndDb.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index bb7e593fea..c666c50864 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1447,8 +1447,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in } char *status = "ready"; - char b[24] = {0}; - STR_WITH_SIZE_TO_VARSTR(b, status, strlen(status)); + char statusB[24] = {0}; + STR_WITH_SIZE_TO_VARSTR(statusB, status, strlen(status)); if (sysDb) { for (int32_t i = 0; i < pShow->numOfColumns; ++i) { @@ -1458,7 +1458,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in } else if (i == 3) { colDataAppend(pColInfo, rows, (const char *)&numOfTables, false); } else if (i == 20) { - colDataAppend(pColInfo, rows, b, false); + colDataAppend(pColInfo, rows, statusB, false); } else { colDataAppendNULL(pColInfo, rows); } @@ -1481,9 +1481,10 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.replications, false); const char *src = pDb->cfg.strict ? "strict" : "nostrict"; - STR_WITH_SIZE_TO_VARSTR(b, src, strlen(src)); + char strict[24] = {0}; + STR_WITH_SIZE_TO_VARSTR(strict, src, strlen(src)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, rows, (const char *)b, false); + colDataAppend(pColInfo, rows, (const char *)strict, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.daysPerFile, false); @@ -1554,7 +1555,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.numOfStables, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols); - colDataAppend(pColInfo, rows, (const char *)b, false); + colDataAppend(pColInfo, rows, (const char *)statusB, false); } } From b858688fab96a667c2d8fd486f0ec470d9afa0f0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 17 May 2022 21:31:59 +0800 Subject: [PATCH 04/17] fix: unreasonable error message when creating db with insufficient dnodes --- source/dnode/mnode/impl/src/mndDb.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index c666c50864..c3e5195ad2 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -398,11 +398,14 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) { if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) return -1; if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1; if (pCfg->replications < TSDB_MIN_DB_REPLICA || pCfg->replications > TSDB_MAX_DB_REPLICA) return -1; - if (pCfg->replications > mndGetDnodeSize(pMnode)) return -1; if (pCfg->replications != 1 && pCfg->replications != 3) return -1; if (pCfg->strict < TSDB_DB_STRICT_OFF || pCfg->strict > TSDB_DB_STRICT_ON) return -1; if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1; if (pCfg->hashMethod != 1) return -1; + if (pCfg->replications > mndGetDnodeSize(pMnode)) { + terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES; + return -1; + } terrno = 0; return TSDB_CODE_SUCCESS; From a3b0136e3ce6d61951c946045ce1e6b4c9f60cf5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 17 May 2022 22:03:50 +0800 Subject: [PATCH 05/17] stmt query --- source/client/src/clientStmt.c | 8 ++++++ tests/script/api/batchprepare.c | 47 ++++++++++++++++++--------------- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index fa8df28523..f2c5fdec0f 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -218,6 +218,13 @@ int32_t stmtParseSql(STscStmt* pStmt) { pStmt->bInfo.needParse = false; + if (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type) { + pStmt->sql.type = STMT_TYPE_INSERT; + } else if (pStmt->sql.pQuery->pPrepareRoot) { + pStmt->sql.type = STMT_TYPE_QUERY; + } + +/* switch (nodeType(pStmt->sql.pQuery->pRoot)) { case QUERY_NODE_VNODE_MODIF_STMT: if (0 == pStmt->sql.type) { @@ -231,6 +238,7 @@ int32_t stmtParseSql(STscStmt* pStmt) { tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot)); STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR); } +*/ return TSDB_CODE_SUCCESS; } diff --git a/tests/script/api/batchprepare.c b/tests/script/api/batchprepare.c index cb914f8d6d..546075a1d9 100644 --- a/tests/script/api/batchprepare.c +++ b/tests/script/api/batchprepare.c @@ -170,11 +170,11 @@ CaseCfg gCase[] = { // 22 {"insert:AUTO1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, true, true, insertAUTOTest1, 10, 10, 2, 0, 0, 0, 1, -1}, -// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2}, -// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2}, + {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2}, + {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2}, - {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2}, - {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2}, +// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2}, +// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2}, }; @@ -209,7 +209,7 @@ typedef struct { int32_t caseRunNum; // total run case num } CaseCtrl; -#if 0 +#if 1 CaseCtrl gCaseCtrl = { // default .bindNullNum = 0, .printCreateTblSql = false, @@ -267,7 +267,7 @@ CaseCtrl gCaseCtrl = { }; #endif -#if 1 +#if 0 CaseCtrl gCaseCtrl = { // query case with specified col&oper .bindNullNum = 1, .printCreateTblSql = false, @@ -292,7 +292,7 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper #if 0 CaseCtrl gCaseCtrl = { // query case with specified col&oper - .bindNullNum = 0, + .bindNullNum = 1, .printCreateTblSql = true, .printQuerySql = true, .printStmtSql = true, @@ -309,10 +309,10 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper .printRes = true, .runTimes = 0, .caseRunIdx = -1, - .optrIdxListNum = tListLen(optrIdxList), - .optrIdxList = optrIdxList, - .bindColTypeNum = tListLen(bindColTypeList), - .bindColTypeList = bindColTypeList, + //.optrIdxListNum = tListLen(optrIdxList), + //.optrIdxList = optrIdxList, + //.bindColTypeNum = tListLen(bindColTypeList), + //.bindColTypeList = bindColTypeList, .caseIdx = 24, .caseNum = 1, .caseRunNum = 1, @@ -665,15 +665,16 @@ void bpGenerateConstInFuncSQL(BindData *data, int32_t tblIdx) { void generateQueryMiscSQL(BindData *data, int32_t tblIdx) { - switch(tblIdx) { - case 0: - //TODO FILL TEST - default: - bpGenerateConstInOpSQL(data, tblIdx); - break; - case FUNCTION_TEST_IDX: - bpGenerateConstInFuncSQL(data, tblIdx); - break; + if (tblIdx == FUNCTION_TEST_IDX && gCurCase->bindNullNum <= 0) { + bpGenerateConstInFuncSQL(data, tblIdx); + } else { + switch(tblIdx) { + case 0: + //TODO FILL TEST + default: + bpGenerateConstInOpSQL(data, tblIdx); + break; + } } if (gCaseCtrl.printStmtSql) { @@ -1064,6 +1065,8 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) { if (tblIdx == FUNCTION_TEST_IDX) { gCaseCtrl.numericParam = true; + } else { + gCaseCtrl.numericParam = false; } for (int b = 0; b < bindNum; b++) { @@ -1072,8 +1075,10 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) { } } + gCaseCtrl.numericParam = false; + generateQueryMiscSQL(data, tblIdx); - + return 0; } From 1cc0708d856afa2d670b046dcb27fc79c2139112 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 17 May 2022 22:42:11 +0800 Subject: [PATCH 06/17] fix(query): expand the capacity for aggregate operator. --- source/libs/executor/src/executorimpl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1ba494a041..ae9f2822d4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4246,7 +4246,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - int32_t numOfRows = 10; + int32_t numOfRows = 1024; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, numOfRows); From 52f6a66ba550a9abd98758de3d4f00f2e4a2310e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 17 May 2022 22:58:17 +0800 Subject: [PATCH 07/17] fix(query): fix some syntax error. --- source/libs/function/src/builtinsimpl.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index cf67bcc7ec..de96526a25 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2533,7 +2533,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) { } } } else { // computing based on the true data block - if (0 == pCtx->size) { + if (0 == pInput->numOfRows) { if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->end.key != INT64_MIN) { pInfo->min = pCtx->end.key; @@ -2552,7 +2552,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) { TSKEY* ptsList = (int64_t*)colDataGetData(pCol, start); if (pCtx->order == TSDB_ORDER_DESC) { if (pCtx->start.key == INT64_MIN) { - pInfo->max = (pInfo->max < ptsList[pCtx->size - 1]) ? ptsList[pCtx->size - 1] : pInfo->max; + pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max; } else { pInfo->max = pCtx->start.key + 1; } @@ -2572,7 +2572,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) { if (pCtx->end.key != INT64_MIN) { pInfo->max = pCtx->end.key + 1; } else { - pInfo->max = ptsList[pCtx->size - 1]; + pInfo->max = ptsList[start + pInput->numOfRows - 1]; } } } From d588d47bdb054dc78ebcd392e6a7d2aed9e44e6d Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 17 May 2022 20:35:19 +0800 Subject: [PATCH 08/17] ci(stream): add test --- tests/script/tsim/tstream/basic1.sim | 88 ++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/tests/script/tsim/tstream/basic1.sim b/tests/script/tsim/tstream/basic1.sim index 37f9cb94c9..cb084ad537 100644 --- a/tests/script/tsim/tstream/basic1.sim +++ b/tests/script/tsim/tstream/basic1.sim @@ -372,4 +372,92 @@ if $data25 != 3 then return -1 endi +sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1); +sleep 100 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 0 +if $data01 != 4 then + print ======$data01 + return -1 +endi + +if $data02 != 4 then + print ======$data02 + return -1 +endi + +if $data03 != 14 then + print ======$data03 + return -1 +endi + +if $data04 != 4 then + print ======$data04 + return -1 +endi + +if $data05 != 3 then + print ======$data05 + return -1 +endi + +sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1); +sleep 100 +sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt; + +# row 1 +if $data11 != 4 then + print ======$data11 + # return -1 +endi + +if $data12 != 4 then + print ======$data12 + # return -1 +endi + +if $data13 != 10 then + print ======$data13 + # return -1 +endi + +if $data14 != 3 then + print ======$data14 + # return -1 +endi + +if $data15 != 1 then + print ======$data15 + # return -1 +endi + +# row 2 +if $data21 != 4 then + print ======$data21 + # return -1 +endi + +if $data22 != 4 then + print ======$data22 + # return -1 +endi + +if $data23 != 15 then + print ======$data23 + # return -1 +endi + +if $data24 != 4 then + print ======$data24 + # return -1 +endi + +if $data25 != 3 then + print ======$data25 + # return -1 +endi + + + system sh/exec.sh -n dnode1 -s stop -x SIGINT From 67b0fd9261e13783fc9388a3820737531bbc9e5f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 18 May 2022 09:25:00 +0800 Subject: [PATCH 09/17] fix: wrong db status --- source/dnode/mnode/impl/src/mndTopic.c | 20 +++++++++---------- .../script/tsim/tmq/prepareBasicEnv-1vgrp.sim | 2 +- .../script/tsim/tmq/prepareBasicEnv-4vgrp.sim | 2 +- tests/script/tsim/tmq/topic.sim | 2 +- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index e99046a32d..3f559cb6c0 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -502,16 +502,16 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) { } SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name); - if (pTopic == NULL) { - if (dropReq.igNotExists) { - mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name); - return 0; - } else { - terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; - mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); - return -1; - } - } + // if (pTopic == NULL) { + // if (dropReq.igNotExists) { + // mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name); + // return 0; + // } else { + // terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; + // mError("topic:%s, failed to drop since %s", dropReq.name, terrstr()); + // return -1; + // } + // } if (pTopic->refConsumerCnt != 0) { mndReleaseTopic(pMnode, pTopic); diff --git a/tests/script/tsim/tmq/prepareBasicEnv-1vgrp.sim b/tests/script/tsim/tmq/prepareBasicEnv-1vgrp.sim index fbdea96f93..43b93b7372 100644 --- a/tests/script/tsim/tmq/prepareBasicEnv-1vgrp.sim +++ b/tests/script/tsim/tmq/prepareBasicEnv-1vgrp.sim @@ -39,7 +39,7 @@ sql show databases print ==> rows: $rows print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] -if $data(db)[19] != nostrict then +if $data(db)[19] != ready then sleep 100 $loop_cnt = $loop_cnt + 1 goto check_db_ready diff --git a/tests/script/tsim/tmq/prepareBasicEnv-4vgrp.sim b/tests/script/tsim/tmq/prepareBasicEnv-4vgrp.sim index 8b5573486d..5e81137ffa 100644 --- a/tests/script/tsim/tmq/prepareBasicEnv-4vgrp.sim +++ b/tests/script/tsim/tmq/prepareBasicEnv-4vgrp.sim @@ -39,7 +39,7 @@ sql show databases print ==> rows: $rows print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] -if $data(db)[19] != nostrict then +if $data(db)[19] != ready then sleep 100 $loop_cnt = $loop_cnt + 1 goto check_db_ready diff --git a/tests/script/tsim/tmq/topic.sim b/tests/script/tsim/tmq/topic.sim index f1dbf98bb0..9add349a18 100644 --- a/tests/script/tsim/tmq/topic.sim +++ b/tests/script/tsim/tmq/topic.sim @@ -31,7 +31,7 @@ sql show databases print ==> rows: $rows print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12] print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20] -if $data(db)[19] != nostrict then +if $data(db)[19] != ready then sleep 100 $loop_cnt = $loop_cnt + 1 goto check_db_ready From 5c2c2b378b3040b7ba808a6ef2873dd2d62a9d4d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 18 May 2022 10:15:29 +0800 Subject: [PATCH 10/17] fix(query): fix the invalid column length value during spill out the group data into disk. --- source/common/src/tdatablock.c | 29 ++++++++++++++++++++---- source/libs/executor/src/groupoperator.c | 28 ++++++++++++++++------- 2 files changed, 45 insertions(+), 12 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 43dcf2dfa9..4d77f4eb71 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -600,10 +600,11 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { } int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) { - pBlock->info.rows = *(int32_t*)buf; + pBlock->info.rows = *(int32_t*) buf; + pBlock->info.groupId = *(uint64_t*) (buf + sizeof(int32_t)); int32_t numOfCols = pBlock->info.numOfCols; - const char* pStart = buf + sizeof(uint32_t); + const char* pStart = buf + sizeof(uint32_t) + sizeof(uint64_t); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); @@ -669,7 +670,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) { return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t); } -double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { +double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { ASSERT(pBlock != NULL); double rowSize = 0; @@ -1224,7 +1225,27 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { } size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { - return (int32_t)((pageSize - blockDataGetSerialMetaSize(pBlock)) / blockDataGetSerialRowSize(pBlock)); + int32_t payloadSize = pageSize - blockDataGetSerialMetaSize(pBlock); + + int32_t rowSize = pBlock->info.rowSize; + + int32_t nRows = payloadSize / rowSize; + + // the true value must be less than the value of nRows + int32_t additional = 0; + for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + additional += nRows * sizeof(int32_t); + } else { + additional += BitmapLen(nRows); + } + } + + int32_t newRows = (payloadSize - additional) / rowSize; + ASSERT(newRows <= nRows && newRows > 1); + + return newRows; } void colDataDestroy(SColumnInfoData* pColData) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 76a773a095..b1e19213dd 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -396,8 +396,11 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { pGInfo->groupId = calcGroupId(pInfo->keyBuf, len); } + // number of rows int32_t* rows = (int32_t*) pPage; + // group id + size_t numOfCols = pOperator->numOfExprs; for(int32_t i = 0; i < numOfCols; ++i) { SExprInfo* pExpr = &pOperator->pExpr[i]; @@ -408,13 +411,13 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t bytes = pColInfoData->info.bytes; int32_t startOffset = pInfo->columnOffset[i]; - char* columnLen = NULL; - int32_t contentLen = 0; + int32_t* columnLen = NULL; + int32_t contentLen = 0; if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { int32_t* offset = (int32_t*)((char*)pPage + startOffset); - columnLen = (char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity; - char* data = (char*)(columnLen + sizeof(int32_t)); + columnLen = (int32_t*) ((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity); + char* data = (char*)((char*) columnLen + sizeof(int32_t)); if (colDataIsNull_s(pColInfoData, j)) { offset[(*rows)] = -1; @@ -423,11 +426,15 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { offset[*rows] = (*columnLen); char* src = colDataGetData(pColInfoData, j); memcpy(data + (*columnLen), src, varDataTLen(src)); + int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage); + ASSERT(v > 0); + printf("len:%d\n", v); + contentLen = varDataTLen(src); } } else { char* bitmap = (char*)pPage + startOffset; - columnLen = (char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity); + columnLen = (int32_t*) ((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity)); char* data = (char*) columnLen + sizeof(int32_t); bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j); @@ -440,6 +447,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } (*columnLen) += contentLen; + ASSERT(*columnLen >= 0); } (*rows) += 1; @@ -476,7 +484,12 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); taosArrayPush(p->pPageList, &pageId); - *(int32_t*) pPage = 0; +// // number of rows +// *(int32_t*) pPage = 0; +// +// uint64_t* groupId = (pPage + sizeof(int32_t)); +// *groupId = 0; + memset(pPage, 0, getBufPageSize(pInfo->pBuf)); } } @@ -500,7 +513,7 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) { size_t numOfCols = pBlock->info.numOfCols; int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t)); - offset[0] = sizeof(int32_t); // the number of rows in current page, ref to SSDataBlock paged serialization format + offset[0] = sizeof(int32_t) + sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format for(int32_t i = 0; i < numOfCols - 1; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); @@ -571,7 +584,6 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { break; } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs); doHashPartition(pOperator, pBlock); } From d5c30f5ea5b2eb487a20aa89bfa3641f6efa3b7c Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 18 May 2022 10:29:32 +0800 Subject: [PATCH 11/17] fix: release udf func handle when udf agg failure --- source/libs/function/src/tudf.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 4322705c13..5c6209dc4d 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1386,7 +1386,7 @@ int32_t cleanUpUdfs() { if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) { taosArrayPush(udfStubs, stub); } else { - fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", + fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", stub->udfName, stub->refCount, stub->lastRefTime); } } @@ -1602,6 +1602,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult SUdfInterBuf buf = {0}; if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode); + releaseUdfFuncHandle(pCtx->udfName); return false; } udfRes->interResNum = buf.numOfResult; @@ -1609,6 +1610,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult memcpy(udfRes->interResBuf, buf.buf, buf.bufLen); } else { fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize); + releaseUdfFuncHandle(pCtx->udfName); return false; } freeUdfInterBuf(&buf); @@ -1674,6 +1676,9 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { blockDataDestroy(inputBlock); taosArrayDestroy(tempBlock.pDataBlock); + if (udfCode != 0) { + releaseUdfFuncHandle(pCtx->udfName); + } freeUdfInterBuf(&newState); return udfCode; } From e20590814266d8833cfc48ea360482f227d2f1e3 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Wed, 18 May 2022 10:57:54 +0800 Subject: [PATCH 12/17] [test: add case of tmq] --- tests/system-test/7-tmq/subscribeDb1.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/tests/system-test/7-tmq/subscribeDb1.py b/tests/system-test/7-tmq/subscribeDb1.py index eacf231c9a..7319fadc80 100644 --- a/tests/system-test/7-tmq/subscribeDb1.py +++ b/tests/system-test/7-tmq/subscribeDb1.py @@ -468,14 +468,10 @@ class TDTestCase: cfgPath = buildPath + "/../sim/psim/cfg" tdLog.info("cfgPath: %s" % cfgPath) - #self.tmqCase8(cfgPath, buildPath) - #self.tmqCase9(cfgPath, buildPath) - #self.tmqCase10(cfgPath, buildPath) + self.tmqCase8(cfgPath, buildPath) + self.tmqCase9(cfgPath, buildPath) + self.tmqCase10(cfgPath, buildPath) self.tmqCase11(cfgPath, buildPath) - # self.tmqCase12(cfgPath, buildPath) - # self.tmqCase13(cfgPath, buildPath) - # self.tmqCase14(cfgPath, buildPath) - def stop(self): tdSql.close() From 325959bd27bfda3a8b1e7679bcfc8f14b2ca0404 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 18 May 2022 11:01:39 +0800 Subject: [PATCH 13/17] fix(query): add parameters check for top/bottom functoin. --- source/libs/function/src/builtins.c | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 24d6c1ade9..bcb4c5c585 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -14,6 +14,7 @@ */ #include "builtins.h" +#include "querynodes.h" #include "builtinsimpl.h" #include "scalar.h" #include "taoserror.h" @@ -201,15 +202,32 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_ } static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + int32_t paraNum = LIST_LENGTH(pFunc->pParameterList); + if (2 != paraNum) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + SNode* pParamNode = nodesListGetNode(pFunc->pParameterList, 1); + if (nodeType(pParamNode) != QUERY_NODE_VALUE) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + SValueNode* pValue = (SValueNode*) pParamNode; + if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + if (pValue->datum.i < 1 || pValue->datum.i > 100) { + return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); + } + SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType; pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type}; return TSDB_CODE_SUCCESS; } static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType; - pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type}; - return TSDB_CODE_SUCCESS; + return translateTop(pFunc, pErrBuf, len); } static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { From 00c11b01fe6e8092e44bb4ea57eaa9e01b37f11a Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 18 May 2022 11:43:03 +0800 Subject: [PATCH 14/17] fix: udfc checks output type and output bytes when executing scalar function --- source/libs/function/src/tudf.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 5c6209dc4d..068998e469 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -1528,7 +1528,15 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, if (code != 0) { return code; } + SUdfcUvSession *session = handle; code = doCallUdfScalarFunc(handle, input, numOfCols, output); + if (session->outputType != output->columnData->info.type + || session->outputLen != output->columnData->info.bytes) { + fnError("udfc scalar function calculate error, session type: %d(%d), output type: %d(%d)", + session->outputType, session->outputLen, + output->columnData->info.type, output->columnData->info.bytes); + code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; + } releaseUdfFuncHandle(udfName); return code; } From 950835e1fcc6a2eaa79dbb0d3e9b1a5d9436e0be Mon Sep 17 00:00:00 2001 From: slzhou Date: Wed, 18 May 2022 12:16:28 +0800 Subject: [PATCH 15/17] fix: set correct udf output type when retrieve from mnode --- source/libs/function/src/udfd.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index cbb2e7f362..006914bf65 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -139,7 +139,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { SUdf* udf = msgInfo->param; udf->funcType = pFuncInfo->funcType; udf->scriptType = pFuncInfo->scriptType; - udf->outputType = pFuncInfo->funcType; + udf->outputType = pFuncInfo->outputType; udf->outputLen = pFuncInfo->outputLen; udf->bufSize = pFuncInfo->bufSize; From def7200d7caa437f6228e1e97c0e3cd4446329b4 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 18 May 2022 05:22:48 +0000 Subject: [PATCH 16/17] feat: return sver in submit rsp --- include/common/tmsg.h | 1 + source/common/src/tmsg.c | 2 ++ source/dnode/vnode/src/tsdb/tsdbMemTable.c | 8 ++++++++ 3 files changed, 11 insertions(+) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6af4325371..3ff566afe2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -258,6 +258,7 @@ typedef struct { char* tblFName; int32_t numOfRows; int32_t affectedRows; + int64_t sver; } SSubmitBlkRsp; typedef struct { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d180c2ff6e..4e97ebbe47 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4093,6 +4093,7 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl } if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1; if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1; + if (tEncodeI64v(pEncoder, pBlock->sver) < 0) return -1; tEndEncode(pEncoder); return 0; @@ -4111,6 +4112,7 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) { } if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1; if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1; + if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1; tEndDecode(pDecoder); return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 037b099345..2fdbfdd206 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -309,6 +309,7 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo TSKEY keyMin; TSKEY keyMax; SSubmitBlk *pBlkCopy; + int64_t sverNew; // check if table exists SMetaReader mr = {0}; @@ -319,6 +320,12 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; return -1; } + if (mr.me.type == TSDB_NORMAL_TABLE) { + sverNew = mr.me.ntbEntry.schema.sver; + } else { + metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid); + sverNew = mr.me.stbEntry.schema.sver; + } metaReaderClear(&mr); // create container is nedd @@ -367,6 +374,7 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo pRsp->numOfRows = pMsgIter->numOfRows; pRsp->affectedRows = pMsgIter->numOfRows; + pRsp->sver = sverNew; return 0; } From f8b2bdecf1d6906c1ab4580e2176e4fef439284f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 18 May 2022 05:46:16 +0000 Subject: [PATCH 17/17] refact: TDB --- source/dnode/vnode/src/meta/metaOpen.c | 48 +++++++++++------------ source/dnode/vnode/src/meta/metaQuery.c | 14 +++---- source/dnode/vnode/src/meta/metaSma.c | 8 ++-- source/dnode/vnode/src/meta/metaTable.c | 30 +++++++------- source/dnode/vnode/src/sma/smaTDBImpl.c | 8 ++-- source/dnode/vnode/src/tsdb/tsdbTDBImpl.c | 8 ++-- source/libs/tdb/inc/tdb.h | 16 ++++---- source/libs/tdb/src/db/tdbDb.c | 16 ++++---- source/libs/tdb/test/tdbTest.cpp | 42 ++++++++++---------- 9 files changed, 95 insertions(+), 95 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 3ce146904c..127eb43f2b 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -57,56 +57,56 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { } // open pTbDb - ret = tdbDbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb); + ret = tdbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb); if (ret < 0) { metaError("vgId:%d failed to open meta table db since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pSkmDb - ret = tdbDbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb); + ret = tdbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb); if (ret < 0) { metaError("vgId:%d failed to open meta schema db since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pUidIdx - ret = tdbDbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx); + ret = tdbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx); if (ret < 0) { metaError("vgId:%d failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pNameIdx - ret = tdbDbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx); + ret = tdbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx); if (ret < 0) { metaError("vgId:%d failed to open meta name index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pCtbIdx - ret = tdbDbOpen("ctb.idx", sizeof(SCtbIdxKey), 0, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx); + ret = tdbOpen("ctb.idx", sizeof(SCtbIdxKey), 0, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx); if (ret < 0) { metaError("vgId:%d failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pTagIdx - ret = tdbDbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx); + ret = tdbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx); if (ret < 0) { metaError("vgId:%d failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pTtlIdx - ret = tdbDbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx); + ret = tdbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx); if (ret < 0) { metaError("vgId:%d failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; } // open pSmaIdx - ret = tdbDbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx); + ret = tdbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx); if (ret < 0) { metaError("vgId:%d failed to open meta sma index since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; @@ -125,14 +125,14 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { _err: if (pMeta->pIdx) metaCloseIdx(pMeta); - if (pMeta->pSmaIdx) tdbDbClose(pMeta->pSmaIdx); - if (pMeta->pTtlIdx) tdbDbClose(pMeta->pTtlIdx); - if (pMeta->pTagIdx) tdbDbClose(pMeta->pTagIdx); - if (pMeta->pCtbIdx) tdbDbClose(pMeta->pCtbIdx); - if (pMeta->pNameIdx) tdbDbClose(pMeta->pNameIdx); - if (pMeta->pUidIdx) tdbDbClose(pMeta->pUidIdx); - if (pMeta->pSkmDb) tdbDbClose(pMeta->pSkmDb); - if (pMeta->pTbDb) tdbDbClose(pMeta->pTbDb); + if (pMeta->pSmaIdx) tdbClose(pMeta->pSmaIdx); + if (pMeta->pTtlIdx) tdbClose(pMeta->pTtlIdx); + if (pMeta->pTagIdx) tdbClose(pMeta->pTagIdx); + if (pMeta->pCtbIdx) tdbClose(pMeta->pCtbIdx); + if (pMeta->pNameIdx) tdbClose(pMeta->pNameIdx); + if (pMeta->pUidIdx) tdbClose(pMeta->pUidIdx); + if (pMeta->pSkmDb) tdbClose(pMeta->pSkmDb); + if (pMeta->pTbDb) tdbClose(pMeta->pTbDb); if (pMeta->pEnv) tdbEnvClose(pMeta->pEnv); metaDestroyLock(pMeta); taosMemoryFree(pMeta); @@ -142,14 +142,14 @@ _err: int metaClose(SMeta *pMeta) { if (pMeta) { if (pMeta->pIdx) metaCloseIdx(pMeta); - if (pMeta->pSmaIdx) tdbDbClose(pMeta->pSmaIdx); - if (pMeta->pTtlIdx) tdbDbClose(pMeta->pTtlIdx); - if (pMeta->pTagIdx) tdbDbClose(pMeta->pTagIdx); - if (pMeta->pCtbIdx) tdbDbClose(pMeta->pCtbIdx); - if (pMeta->pNameIdx) tdbDbClose(pMeta->pNameIdx); - if (pMeta->pUidIdx) tdbDbClose(pMeta->pUidIdx); - if (pMeta->pSkmDb) tdbDbClose(pMeta->pSkmDb); - if (pMeta->pTbDb) tdbDbClose(pMeta->pTbDb); + if (pMeta->pSmaIdx) tdbClose(pMeta->pSmaIdx); + if (pMeta->pTtlIdx) tdbClose(pMeta->pTtlIdx); + if (pMeta->pTagIdx) tdbClose(pMeta->pTagIdx); + if (pMeta->pCtbIdx) tdbClose(pMeta->pCtbIdx); + if (pMeta->pNameIdx) tdbClose(pMeta->pNameIdx); + if (pMeta->pUidIdx) tdbClose(pMeta->pUidIdx); + if (pMeta->pSkmDb) tdbClose(pMeta->pSkmDb); + if (pMeta->pTbDb) tdbClose(pMeta->pTbDb); if (pMeta->pEnv) tdbEnvClose(pMeta->pEnv); metaDestroyLock(pMeta); taosMemoryFree(pMeta); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index b76258035e..f02b6402c4 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -35,7 +35,7 @@ int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t u STbDbKey tbDbKey = {.version = version, .uid = uid}; // query table.db - if (tdbDbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pReader->pBuf, &pReader->szBuf) < 0) { + if (tdbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pReader->pBuf, &pReader->szBuf) < 0) { terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; goto _err; } @@ -58,7 +58,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) { int64_t version; // query uid.idx - if (tdbDbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) { + if (tdbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) { terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; return -1; } @@ -72,7 +72,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) { tb_uid_t uid; // query name.idx - if (tdbDbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) { + if (tdbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) { terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; return -1; } @@ -159,7 +159,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo pKey = &skmDbKey; kLen = sizeof(skmDbKey); metaRLock(pMeta); - ret = tdbDbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen); + ret = tdbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen); metaULock(pMeta); if (ret < 0) { return NULL; @@ -413,7 +413,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - memcpy((void*)pTSma->expr, mr.me.smaEntry.tsma->expr, pTSma->exprLen); + memcpy((void *)pTSma->expr, mr.me.smaEntry.tsma->expr, pTSma->exprLen); } if (pTSma->tagsFilterLen > 0) { if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) { @@ -421,14 +421,14 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) { goto _err; } } - memcpy((void*)pTSma->tagsFilter, mr.me.smaEntry.tsma->tagsFilter, pTSma->tagsFilterLen); + memcpy((void *)pTSma->tagsFilter, mr.me.smaEntry.tsma->tagsFilter, pTSma->tagsFilterLen); } else { pTSma->exprLen = 0; pTSma->expr = NULL; pTSma->tagsFilterLen = 0; pTSma->tagsFilter = NULL; } - + ++smaIdx; } diff --git a/source/dnode/vnode/src/meta/metaSma.c b/source/dnode/vnode/src/meta/metaSma.c index d0a3541152..1fd81bc2cb 100644 --- a/source/dnode/vnode/src/meta/metaSma.c +++ b/source/dnode/vnode/src/meta/metaSma.c @@ -117,7 +117,7 @@ static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) { tEncoderClear(&coder); // write to table.db - if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) { + if (tdbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) { goto _err; } @@ -130,17 +130,17 @@ _err: } static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) { - return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn); + return tdbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn); } static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) { - return tdbDbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn); + return tdbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn); } static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) { SSmaIdxKey smaIdxKey = {.uid = pME->smaEntry.tsma->tableUid, .smaUid = pME->smaEntry.tsma->indexUid}; - return tdbDbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, &pMeta->txn); + return tdbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, &pMeta->txn); } static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) { diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 9064085b2b..273f7885e6 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -389,7 +389,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl int c; // search name index - ret = tdbDbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal); + ret = tdbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal); if (ret < 0) { terrno = TSDB_CODE_VND_TABLE_NOT_EXIST; return -1; @@ -536,7 +536,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA int nData = 0; // search name index - ret = tdbDbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal); + ret = tdbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal); if (ret < 0) { terrno = TSDB_CODE_VND_TABLE_NOT_EXIST; return -1; @@ -573,9 +573,9 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA tDecoderClear(&dc); /* get stbEntry*/ - tdbDbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal); - tdbDbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey), - (void **)&stbEntry.pBuf, &nVal); + tdbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal); + tdbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey), + (void **)&stbEntry.pBuf, &nVal); tdbFree(pVal); tDecoderInit(&dc, stbEntry.pBuf, nVal); metaDecodeEntry(&dc, &stbEntry); @@ -632,7 +632,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA metaSaveToTbDb(pMeta, &ctbEntry); // save to uid.idx - tdbDbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn); + tdbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn); if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf); if (stbEntry.pBuf) tdbFree(stbEntry.pBuf); @@ -708,7 +708,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) { tEncoderClear(&coder); // write to table.db - if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) { + if (tdbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) { goto _err; } @@ -721,11 +721,11 @@ _err: } static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) { - return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn); + return tdbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn); } static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) { - return tdbDbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn); + return tdbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn); } static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) { @@ -748,12 +748,12 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) { ttlKey.dtime = ctime + ttlDays * 24 * 60 * 60; ttlKey.uid = pME->uid; - return tdbDbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn); + return tdbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn); } static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) { SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid}; - return tdbDbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn); + return tdbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn); } static int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int8_t type, tb_uid_t uid, @@ -801,10 +801,10 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { SDecoder dc = {0}; // get super table - tdbDbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData); + tdbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData); tbDbKey.uid = pCtbEntry->ctbEntry.suid; tbDbKey.version = *(int64_t *)pData; - tdbDbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData); + tdbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData); tDecoderInit(&dc, pData, nData); metaDecodeEntry(&dc, &stbEntry); @@ -817,7 +817,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) { &pTagIdxKey, &nTagIdxKey) < 0) { return -1; } - tdbDbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn); + tdbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn); metaDestroyTagIdxKey(pTagIdxKey); tDecoderClear(&dc); @@ -859,7 +859,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) { tEncoderInit(&coder, pVal, vLen); tEncodeSSchemaWrapper(&coder, pSW); - if (tdbDbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) { + if (tdbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) { rcode = -1; goto _exit; } diff --git a/source/dnode/vnode/src/sma/smaTDBImpl.c b/source/dnode/vnode/src/sma/smaTDBImpl.c index cb58d9c083..6e57656802 100644 --- a/source/dnode/vnode/src/sma/smaTDBImpl.c +++ b/source/dnode/vnode/src/sma/smaTDBImpl.c @@ -59,14 +59,14 @@ static int32_t smaOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) { // Create a database compFunc = tdSmaKeyCmpr; - if(tdbDbOpen(pFName, -1, -1, compFunc, pEnv, ppDB) < 0) { + if (tdbOpen(pFName, -1, -1, compFunc, pEnv, ppDB) < 0) { return -1; } return 0; } -static int32_t smaCloseDBDb(TDB *pDB) { return tdbDbClose(pDB); } +static int32_t smaCloseDBDb(TDB *pDB) { return tdbClose(pDB); } int32_t smaOpenDBF(TENV *pEnv, SDBFile *pDBF) { // TEnv is shared by a group of SDBFile @@ -99,7 +99,7 @@ int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, in int32_t ret; printf("save tsma data into %s, keyLen:%d valLen:%d txn:%p\n", pDBF->path, keyLen, valLen, txn); - ret = tdbDbUpsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn); + ret = tdbUpsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn); if (ret < 0) { smaError("failed to upsert tsma data into db, ret = %d", ret); return -1; @@ -112,7 +112,7 @@ void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_ void *pVal = NULL; int ret; - ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen); + ret = tdbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen); if (ret < 0) { smaError("failed to get tsma data from db, ret = %d", ret); diff --git a/source/dnode/vnode/src/tsdb/tsdbTDBImpl.c b/source/dnode/vnode/src/tsdb/tsdbTDBImpl.c index 8a553e94fb..bed61a186f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbTDBImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbTDBImpl.c @@ -60,12 +60,12 @@ static int32_t tsdbOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) { // Create a database compFunc = tsdbSmaKeyCmpr; - ret = tdbDbOpen(pFName, -1, -1, compFunc, pEnv, ppDB); + ret = tdbOpen(pFName, -1, -1, compFunc, pEnv, ppDB); return 0; } -static int32_t tsdbCloseDBDb(TDB *pDB) { return tdbDbClose(pDB); } +static int32_t tsdbCloseDBDb(TDB *pDB) { return tdbClose(pDB); } int32_t tsdbOpenDBF(TENV *pEnv, SDBFile *pDBF) { // TEnv is shared by a group of SDBFile @@ -97,7 +97,7 @@ int32_t tsdbCloseDBF(SDBFile *pDBF) { int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) { int32_t ret; - ret = tdbDbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn); + ret = tdbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn); if (ret < 0) { tsdbError("Failed to create insert sma data into db, ret = %d", ret); return -1; @@ -110,7 +110,7 @@ void *tsdbGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32 void *pVal = NULL; int ret; - ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen); + ret = tdbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen); if (ret < 0) { tsdbError("Failed to get sma data from db, ret = %d", ret); diff --git a/source/libs/tdb/inc/tdb.h b/source/libs/tdb/inc/tdb.h index 90b07fb6ae..8ba66f1d26 100644 --- a/source/libs/tdb/inc/tdb.h +++ b/source/libs/tdb/inc/tdb.h @@ -37,14 +37,14 @@ int tdbBegin(TENV *pEnv, TXN *pTxn); int tdbCommit(TENV *pEnv, TXN *pTxn); // TDB -int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb); -int tdbDbClose(TDB *pDb); -int tdbDbDrop(TDB *pDb); -int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn); -int tdbDbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn); -int tdbDbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn); -int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen); -int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); +int tdbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb); +int tdbClose(TDB *pDb); +int tdbDrop(TDB *pDb); +int tdbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn); +int tdbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn); +int tdbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn); +int tdbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen); +int tdbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); // TDBC int tdbDbcOpen(TDB *pDb, TDBC **ppDbc, TXN *pTxn); diff --git a/source/libs/tdb/src/db/tdbDb.c b/source/libs/tdb/src/db/tdbDb.c index 383807cc35..fdad797333 100644 --- a/source/libs/tdb/src/db/tdbDb.c +++ b/source/libs/tdb/src/db/tdbDb.c @@ -24,7 +24,7 @@ struct STDBC { SBTC btc; }; -int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb) { +int tdbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb) { TDB *pDb; SPager *pPager; int ret; @@ -65,7 +65,7 @@ int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn return 0; } -int tdbDbClose(TDB *pDb) { +int tdbClose(TDB *pDb) { if (pDb) { tdbBtreeClose(pDb->pBt); tdbOsFree(pDb); @@ -73,26 +73,26 @@ int tdbDbClose(TDB *pDb) { return 0; } -int tdbDbDrop(TDB *pDb) { +int tdbDrop(TDB *pDb) { // TODO return 0; } -int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) { +int tdbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) { return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen, pTxn); } -int tdbDbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn) { return tdbBtreeDelete(pDb->pBt, pKey, kLen, pTxn); } +int tdbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn) { return tdbBtreeDelete(pDb->pBt, pKey, kLen, pTxn); } -int tdbDbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) { +int tdbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) { return tdbBtreeUpsert(pDb->pBt, pKey, kLen, pVal, vLen, pTxn); } -int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) { +int tdbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) { return tdbBtreeGet(pDb->pBt, pKey, kLen, ppVal, vLen); } -int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen) { +int tdbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen) { return tdbBtreePGet(pDb->pBt, pKey, kLen, ppKey, pkLen, ppVal, vLen); } diff --git a/source/libs/tdb/test/tdbTest.cpp b/source/libs/tdb/test/tdbTest.cpp index e575ac156f..3c8bcee3f7 100644 --- a/source/libs/tdb/test/tdbTest.cpp +++ b/source/libs/tdb/test/tdbTest.cpp @@ -131,7 +131,7 @@ TEST(tdb_test, simple_insert1) { // Create a database compFunc = tKeyCmpr; - ret = tdbDbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); + ret = tdbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); GTEST_ASSERT_EQ(ret, 0); { @@ -152,7 +152,7 @@ TEST(tdb_test, simple_insert1) { for (int iData = 1; iData <= nData; iData++) { sprintf(key, "key%d", iData); sprintf(val, "value%d", iData); - ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val), &txn); + ret = tdbInsert(pDb, key, strlen(key), val, strlen(val), &txn); GTEST_ASSERT_EQ(ret, 0); // if pool is full, commit the transaction and start a new one @@ -181,7 +181,7 @@ TEST(tdb_test, simple_insert1) { sprintf(key, "key%d", i); sprintf(val, "value%d", i); - ret = tdbDbGet(pDb, key, strlen(key), &pVal, &vLen); + ret = tdbGet(pDb, key, strlen(key), &pVal, &vLen); ASSERT(ret == 0); GTEST_ASSERT_EQ(ret, 0); @@ -224,11 +224,11 @@ TEST(tdb_test, simple_insert1) { } } - ret = tdbDbDrop(pDb); + ret = tdbDrop(pDb); GTEST_ASSERT_EQ(ret, 0); // Close a database - tdbDbClose(pDb); + tdbClose(pDb); // Close Env ret = tdbEnvClose(pEnv); @@ -251,7 +251,7 @@ TEST(tdb_test, simple_insert2) { // Create a database compFunc = tDefaultKeyCmpr; - ret = tdbDbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); + ret = tdbOpen("db.db", -1, -1, compFunc, pEnv, &pDb); GTEST_ASSERT_EQ(ret, 0); { @@ -271,7 +271,7 @@ TEST(tdb_test, simple_insert2) { for (int iData = 1; iData <= nData; iData++) { sprintf(key, "key%d", iData); sprintf(val, "value%d", iData); - ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val), &txn); + ret = tdbInsert(pDb, key, strlen(key), val, strlen(val), &txn); GTEST_ASSERT_EQ(ret, 0); } @@ -311,11 +311,11 @@ TEST(tdb_test, simple_insert2) { tdbCommit(pEnv, &txn); tdbTxnClose(&txn); - ret = tdbDbDrop(pDb); + ret = tdbDrop(pDb); GTEST_ASSERT_EQ(ret, 0); // Close a database - tdbDbClose(pDb); + tdbClose(pDb); // Close Env ret = tdbEnvClose(pEnv); @@ -346,7 +346,7 @@ TEST(tdb_test, simple_delete1) { GTEST_ASSERT_EQ(ret, 0); // open database - ret = tdbDbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb); + ret = tdbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb); GTEST_ASSERT_EQ(ret, 0); tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); @@ -356,7 +356,7 @@ TEST(tdb_test, simple_delete1) { for (int iData = 0; iData < nKV; iData++) { sprintf(key, "key%d", iData); sprintf(data, "data%d", iData); - ret = tdbDbInsert(pDb, key, strlen(key), data, strlen(data), &txn); + ret = tdbInsert(pDb, key, strlen(key), data, strlen(data), &txn); GTEST_ASSERT_EQ(ret, 0); } @@ -365,7 +365,7 @@ TEST(tdb_test, simple_delete1) { sprintf(key, "key%d", iData); sprintf(data, "data%d", iData); - ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData); + ret = tdbGet(pDb, key, strlen(key), &pData, &nData); GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(memcmp(data, pData, nData), 0); } @@ -374,7 +374,7 @@ TEST(tdb_test, simple_delete1) { for (int iData = nKV - 1; iData > 30; iData--) { sprintf(key, "key%d", iData); - ret = tdbDbDelete(pDb, key, strlen(key), &txn); + ret = tdbDelete(pDb, key, strlen(key), &txn); GTEST_ASSERT_EQ(ret, 0); } @@ -382,7 +382,7 @@ TEST(tdb_test, simple_delete1) { for (int iData = 0; iData < nKV; iData++) { sprintf(key, "key%d", iData); - ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData); + ret = tdbGet(pDb, key, strlen(key), &pData, &nData); if (iData <= 30) { GTEST_ASSERT_EQ(ret, 0); } else { @@ -413,7 +413,7 @@ TEST(tdb_test, simple_delete1) { closePool(pPool); - tdbDbClose(pDb); + tdbClose(pDb); tdbEnvClose(pEnv); } @@ -435,7 +435,7 @@ TEST(tdb_test, simple_upsert1) { GTEST_ASSERT_EQ(ret, 0); // open database - ret = tdbDbOpen("db.db", -1, -1, NULL, pEnv, &pDb); + ret = tdbOpen("db.db", -1, -1, NULL, pEnv, &pDb); GTEST_ASSERT_EQ(ret, 0); pPool = openPool(); @@ -446,7 +446,7 @@ TEST(tdb_test, simple_upsert1) { for (int iData = 0; iData < nData; iData++) { sprintf(key, "key%d", iData); sprintf(data, "data%d", iData); - ret = tdbDbInsert(pDb, key, strlen(key), data, strlen(data), &txn); + ret = tdbInsert(pDb, key, strlen(key), data, strlen(data), &txn); GTEST_ASSERT_EQ(ret, 0); } @@ -454,7 +454,7 @@ TEST(tdb_test, simple_upsert1) { for (int iData = 0; iData < nData; iData++) { sprintf(key, "key%d", iData); sprintf(data, "data%d", iData); - ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData); + ret = tdbGet(pDb, key, strlen(key), &pData, &nData); GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(memcmp(pData, data, nData), 0); } @@ -463,7 +463,7 @@ TEST(tdb_test, simple_upsert1) { for (int iData = 0; iData < nData; iData++) { sprintf(key, "key%d", iData); sprintf(data, "data%d-u", iData); - ret = tdbDbUpsert(pDb, key, strlen(key), data, strlen(data), &txn); + ret = tdbUpsert(pDb, key, strlen(key), data, strlen(data), &txn); GTEST_ASSERT_EQ(ret, 0); } @@ -473,11 +473,11 @@ TEST(tdb_test, simple_upsert1) { for (int iData = 0; iData < nData; iData++) { sprintf(key, "key%d", iData); sprintf(data, "data%d-u", iData); - ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData); + ret = tdbGet(pDb, key, strlen(key), &pData, &nData); GTEST_ASSERT_EQ(ret, 0); GTEST_ASSERT_EQ(memcmp(pData, data, nData), 0); } - tdbDbClose(pDb); + tdbClose(pDb); tdbEnvClose(pEnv); } \ No newline at end of file