From fb36f5e24e4857b3d97d14b1a888e78ae4b9ab00 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Apr 2022 19:10:53 +0800 Subject: [PATCH 1/4] [td-14428] return correct length for both nchar and varchar type. --- source/client/inc/clientInt.h | 1 + source/client/src/clientEnv.c | 1 + source/client/src/clientImpl.c | 16 ++++++++++++++-- source/client/src/clientMain.c | 2 +- 4 files changed, 17 insertions(+), 3 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 49cb12cccd..e930dc2675 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -153,6 +153,7 @@ typedef struct SReqResultInfo { const char* pRspMsg; const char* pData; TAOS_FIELD* fields; + TAOS_FIELD* userFields; // the fields info that return to user uint32_t numOfCols; int32_t* length; char** convertBuf; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 53cc77af1a..769870ada8 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -169,6 +169,7 @@ static void doFreeReqResultInfo(SReqResultInfo *pResInfo) { taosMemoryFreeClear(pResInfo->row); taosMemoryFreeClear(pResInfo->pCol); taosMemoryFreeClear(pResInfo->fields); + taosMemoryFreeClear(pResInfo->userFields); if (pResInfo->convertBuf != NULL) { for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 8cd53f18b0..4b41785d08 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -228,12 +228,24 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t assert(pSchema != NULL && numOfCols > 0); pResInfo->numOfCols = numOfCols; - pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(pSchema[0])); + pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD)); + pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD)); for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { pResInfo->fields[i].bytes = pSchema[i].bytes; - pResInfo->fields[i].type = pSchema[i].type; + pResInfo->fields[i].type = pSchema[i].type; + + pResInfo->userFields[i].bytes = pSchema[i].bytes; + pResInfo->userFields[i].type = pSchema[i].type; + + if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) { + pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE; + } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) { + pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; + } + tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name)); + tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name)); } } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 42d4b5b2e5..2dd77b7f22 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -146,7 +146,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) { } SReqResultInfo *pResInfo = &(((SRequestObj *)res)->body.resInfo); - return pResInfo->fields; + return pResInfo->userFields; } TAOS_RES *taos_query(TAOS *taos, const char *sql) { From 9d16e862770f61b3230e30845958746d84239c8b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Apr 2022 19:11:14 +0800 Subject: [PATCH 2/4] [td-13039] refactor. --- source/libs/executor/inc/executorimpl.h | 21 -- source/libs/executor/src/executorimpl.c | 334 ------------------------ 2 files changed, 355 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 031f06adfd..ee07ea8eb1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -501,12 +501,6 @@ typedef struct SProjectOperatorInfo { int64_t curOutput; } SProjectOperatorInfo; -typedef struct SLimitOperatorInfo { - SLimit limit; - int64_t currentOffset; - int64_t currentRows; -} SLimitOperatorInfo; - typedef struct SSLimitOperatorInfo { int64_t groupTotal; int64_t currentGroupOffset; @@ -525,11 +519,6 @@ typedef struct SSLimitOperatorInfo { int64_t threshold; } SSLimitOperatorInfo; -typedef struct SFilterOperatorInfo { - SSingleColumnFilterInfo* pFilterInfo; - int32_t numOfFilterCols; -} SFilterOperatorInfo; - typedef struct SFillOperatorInfo { struct SFillInfo* pFillInfo; SSDataBlock* pRes; @@ -682,22 +671,12 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); -void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols); - void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput); - -void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); -int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters); - -int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId); -void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); - STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win); -STableQueryInfo* createTmpTableQueryInfo(STimeWindow win); bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 845e2cd878..21ba328b43 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -194,9 +194,6 @@ static void getNextTimeWindow(SInterval* pInterval, int32_t precision, int32_t o } static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); -static void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx, - int32_t numOfCols, int32_t* rowCellInfoOffset); - void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); static bool functionNeedToExecute(SqlFunctionCtx* pCtx); @@ -214,8 +211,6 @@ static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); // static STsdbQueryCond createTsdbQueryCond(STaskAttr* pQueryAttr, STimeWindow* win); static STableIdInfo createTableIdInfo(STableQueryInfo* pTableQueryInfo); -static void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInfo* pDownstream); - static int32_t getNumOfScanTimes(STaskAttr* pQueryAttr); static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); @@ -264,10 +259,6 @@ static void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); -static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, - SExprInfo* pExpr); -static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, - SExprInfo* pExpr, char* val, int16_t bytes); static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo); @@ -3344,11 +3335,6 @@ void setTagValue(SOperatorInfo* pOperatorInfo, void* pTable, SqlFunctionCtx* pCt offset += pLocalExprInfo->base.resSchema.bytes; } - - // todo : use index to avoid iterator all possible output columns - if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { - setParamForStableStddev(pRuntimeEnv, pCtx, numOfOutput, pExprInfo); - } } // set the tsBuf start position before check each data block @@ -3544,19 +3530,6 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput) } } -void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity) { - SSDataBlock* pDataBlock = pBInfo->pRes; - - for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); - - int32_t functionId = pBInfo->pCtx[i].functionId; - if (functionId < 0) { - memset(pBInfo->pCtx[i].pOutput, 0, pColInfo->info.bytes * (*bufCapacity)); - } - } -} - void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) { for (int32_t j = 0; j < size; ++j) { struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]); @@ -3714,23 +3687,6 @@ STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow return pTableQueryInfo; } -STableQueryInfo* createTmpTableQueryInfo(STimeWindow win) { - STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(1, sizeof(STableQueryInfo)); - - // pTableQueryInfo->win = win; - pTableQueryInfo->lastKey = win.skey; - - // set more initial size of interval/groupby query - int32_t initialSize = 16; - int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize); - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pTableQueryInfo); - return NULL; - } - - return pTableQueryInfo; -} - void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) { if (pTableQueryInfo == NULL) { return; @@ -3834,30 +3790,6 @@ void setExecutionContext(int32_t numOfOutput, int32_t tableGroupId, TSKEY nextKe pAggInfo->groupId = tableGroupId; } -void setResultOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfCols, - int32_t* rowCellInfoOffset) { - // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group - SFilePage* page = getBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); - - int16_t offset = 0; - for (int32_t i = 0; i < numOfCols; ++i) { - pCtx[i].pOutput = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResult->offset, offset); - offset += pCtx[i].resDataInfo.bytes; - - int32_t functionId = pCtx[i].functionId; - if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF || - functionId == FUNCTION_DERIVATIVE) { - if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput; - } - - /* - * set the output buffer information and intermediate buffer, - * not all queries require the interResultBuf, such as COUNT - */ - pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); - } -} - void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable) { STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -4626,73 +4558,6 @@ static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_ return terrno; } -int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr, int32_t tbScanner, SArray* pOperator, - void* param) { - STaskRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv; - - STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; - pQueryAttr->tsdb = tsdb; - - if (tsdb != NULL) { - int32_t code = setupQueryHandle(tsdb, pRuntimeEnv, pQInfo->qId, pQueryAttr->stableQuery); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - - pQueryAttr->interBufSize = getOutputInterResultBufSize(pQueryAttr); - - pRuntimeEnv->groupResInfo.totalGroup = (int32_t)(pQueryAttr->stableQuery ? GET_NUM_OF_TABLEGROUP(pRuntimeEnv) : 0); - pRuntimeEnv->enableGroupData = false; - - pRuntimeEnv->pQueryAttr = pQueryAttr; - pRuntimeEnv->pTsBuf = pTsBuf; - pRuntimeEnv->cur.vgroupIndex = -1; - setResultBufSize(pQueryAttr, &pRuntimeEnv->resultInfo); - - if (sourceOptr != NULL) { - assert(pRuntimeEnv->proot == NULL); - pRuntimeEnv->proot = sourceOptr; - } - - if (pTsBuf != NULL) { - int16_t order = (pQueryAttr->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; - tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order); - } - - int32_t ps = 4096; - getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize); - - int32_t TENMB = 1024 * 1024 * 10; - int32_t code = createDiskbasedBuf(&pRuntimeEnv->pResultBuf, ps, TENMB, "", "/tmp"); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // create runtime environment - int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables; - pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo)); - pQInfo->summary.queryProfEvents = taosArrayInit(512, sizeof(SQueryProfEvent)); - if (pQInfo->summary.queryProfEvents == NULL) { - // qDebug("QInfo:0x%"PRIx64" failed to allocate query prof events array", pQInfo->qId); - } - - pQInfo->summary.operatorProfResults = - taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TINYINT), true, HASH_NO_LOCK); - - if (pQInfo->summary.operatorProfResults == NULL) { - // qDebug("QInfo:0x%"PRIx64" failed to allocate operator prof results hash", pQInfo->qId); - } - - code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t)pQueryAttr->tableGroupInfo.numOfTables, pOperator, param); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // setTaskStatus(pOperator->pTaskInfo, QUERY_NOT_COMPLETED); - return TSDB_CODE_SUCCESS; -} - static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) { #if 0 if (order == TSDB_ORDER_ASC) { @@ -6890,58 +6755,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; } -static SSDataBlock* doLimit(SOperatorInfo* pOperator, bool* newgroup) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SLimitOperatorInfo* pInfo = pOperator->info; - - SSDataBlock* pBlock = NULL; - SOperatorInfo* pDownstream = pOperator->pDownstream[0]; - - while (1) { - publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pDownstream->getNextFn(pDownstream, newgroup); - publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - - if (pBlock == NULL) { - doSetOperatorCompleted(pOperator); - return NULL; - } - - if (pInfo->currentOffset == 0) { - break; - } else if (pInfo->currentOffset >= pBlock->info.rows) { - pInfo->currentOffset -= pBlock->info.rows; - } else { // TODO handle the data movement - int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); - pBlock->info.rows = remain; - - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - - int16_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); - } - - pInfo->currentOffset = 0; - break; - } - } - - if (pInfo->currentRows + pBlock->info.rows >= pInfo->limit.limit) { - pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->currentRows); - pInfo->currentRows = pInfo->limit.limit; - - doSetOperatorCompleted(pOperator); - } else { - pInfo->currentRows += pBlock->info.rows; - } - - return pBlock; -} - static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { if (OPTR_IS_OPENED(pOperator)) { return TSDB_CODE_SUCCESS; @@ -7791,11 +7604,6 @@ static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pSortInfo); } -static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { - SFilterOperatorInfo* pInfo = (SFilterOperatorInfo*)param; - doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols); -} - static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param; taosHashCleanup(pInfo->pSet); @@ -9175,41 +8983,6 @@ _complete: return code; } -int32_t cloneExprFilterInfo(SColumnFilterInfo** dst, SColumnFilterInfo* src, int32_t filterNum) { - if (filterNum <= 0) { - return TSDB_CODE_SUCCESS; - } - - *dst = taosMemoryCalloc(filterNum, sizeof(*src)); - if (*dst == NULL) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } - - memcpy(*dst, src, sizeof(*src) * filterNum); - - for (int32_t i = 0; i < filterNum; i++) { - if ((*dst)[i].filterstr && dst[i]->len > 0) { - void* pz = taosMemoryCalloc(1, (size_t)(*dst)[i].len + 1); - - if (pz == NULL) { - if (i == 0) { - taosMemoryFree(*dst); - } else { - freeColumnFilterInfo(*dst, i); - } - - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } - - memcpy(pz, (void*)src->pz, (size_t)src->len + 1); - - (*dst)[i].pz = (int64_t)pz; - } - } - - return TSDB_CODE_SUCCESS; -} - static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SColumnInfo* pTagCols, SExprInfo* pExprs, int32_t numOfOutput, int32_t tagLen, bool superTable) { for (int32_t i = 0; i < numOfOutput; ++i) { @@ -9232,113 +9005,6 @@ static int32_t updateOutputBufForTopBotQuery(SQueriedTableInfo* pTableInfo, SCol return TSDB_CODE_SUCCESS; } -// TODO tag length should be passed from client, refactor -int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters) { - tExprNode* expr = NULL; - - TRY(TSDB_MAX_TAG_CONDITIONS) { expr = exprTreeFromBinary(data, len); } - CATCH(code) { - CLEANUP_EXECUTE(); - return code; - } - END_TRY - - if (expr == NULL) { - // qError("failed to create expr tree"); - return TSDB_CODE_QRY_APP_ERROR; - } - - // int32_t ret = filterInitFromTree(expr, pFilters, 0); - // tExprTreeDestroy(expr, NULL); - - // return ret; -} - -// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** -// pFilterInfo, uint64_t qId) { -// *pFilterInfo = taosMemoryCalloc(1, sizeof(SSingleColumnFilterInfo) * numOfFilterCols); -// if (*pFilterInfo == NULL) { -// return TSDB_CODE_QRY_OUT_OF_MEMORY; -// } -// -// for (int32_t i = 0, j = 0; i < numOfCols; ++i) { -// if (pCols[i].flist.numOfFilters > 0) { -// SSingleColumnFilterInfo* pFilter = &((*pFilterInfo)[j]); -// -// memcpy(&pFilter->info, &pCols[i], sizeof(SColumnInfo)); -// pFilter->info = pCols[i]; -// -// pFilter->numOfFilters = pCols[i].flist.numOfFilters; -// pFilter->pFilters = taosMemoryCalloc(pFilter->numOfFilters, sizeof(SColumnFilterElem)); -// if (pFilter->pFilters == NULL) { -// return TSDB_CODE_QRY_OUT_OF_MEMORY; -// } -// -// for (int32_t f = 0; f < pFilter->numOfFilters; ++f) { -// SColumnFilterElem* pSingleColFilter = &pFilter->pFilters[f]; -// pSingleColFilter->filterInfo = pCols[i].flist.filterInfo[f]; -// -// int32_t lower = pSingleColFilter->filterInfo.lowerRelOptr; -// int32_t upper = pSingleColFilter->filterInfo.upperRelOptr; -// if (lower == TSDB_RELATION_INVALID && upper == TSDB_RELATION_INVALID) { -// //qError("QInfo:0x%"PRIx64" invalid filter info", qId); -// return TSDB_CODE_QRY_INVALID_MSG; -// } -// -// pSingleColFilter->fp = getFilterOperator(lower, upper); -// if (pSingleColFilter->fp == NULL) { -// //qError("QInfo:0x%"PRIx64" invalid filter info", qId); -// return TSDB_CODE_QRY_INVALID_MSG; -// } -// -// pSingleColFilter->bytes = pCols[i].bytes; -// -// if (lower == TSDB_RELATION_IN) { -//// buildFilterSetFromBinary(&pSingleColFilter->q, (char *)(pSingleColFilter->filterInfo.pz), -///(int32_t)(pSingleColFilter->filterInfo.len)); -// } -// } -// -// j++; -// } -// } -// -// return TSDB_CODE_SUCCESS; -//} - -void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) { - // for (int32_t i = 0; i < numOfFilterCols; ++i) { - // if (pFilterInfo[i].numOfFilters > 0) { - // if (pFilterInfo[i].pFilters->filterInfo.lowerRelOptr == TSDB_RELATION_IN) { - // taosHashCleanup((SHashObj *)(pFilterInfo[i].pFilters->q)); - // } - // taosMemoryFreeClear(pFilterInfo[i].pFilters); - // } - // } - // - // taosMemoryFreeClear(pFilterInfo); - return NULL; -} - -int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId) { - for (int32_t i = 0; i < pQueryAttr->numOfCols; ++i) { - // if (pQueryAttr->tableCols[i].flist.numOfFilters > 0 && pQueryAttr->tableCols[i].flist.filterInfo != NULL) { - // pQueryAttr->numOfFilterCols++; - // } - } - - if (pQueryAttr->numOfFilterCols == 0) { - return TSDB_CODE_SUCCESS; - } - - // doCreateFilterInfo(pQueryAttr->tableCols, pQueryAttr->numOfCols, pQueryAttr->numOfFilterCols, - // &pQueryAttr->pFilterInfo, qId); - - pQueryAttr->createFilterOperator = true; - - return TSDB_CODE_SUCCESS; -} - static void doUpdateExprColumnIndex(STaskAttr* pQueryAttr) { assert(pQueryAttr->pExpr1 != NULL && pQueryAttr != NULL); From 803e1ab162a916f74c6d889f4637d040461ae565 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Apr 2022 19:31:24 +0800 Subject: [PATCH 3/4] [td-14393] fix bug. --- include/client/taos.h | 2 +- source/client/inc/clientInt.h | 5 +++-- source/client/src/clientImpl.c | 13 ++++++++++++- source/client/src/clientMain.c | 10 ++++++---- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 111cd8ad3b..792037aaa7 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -188,7 +188,7 @@ DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); DLL_EXPORT bool taos_is_update_query(TAOS_RES *res); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); -DLL_EXPORT void taos_reset_current_db(TAOS *taos); +DLL_EXPORT void taos_reset_current_db(TAOS *taos); DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res); DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res); diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index e930dc2675..0749ce9e7d 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -136,7 +136,7 @@ typedef struct STscObj { uint32_t connId; int32_t connType; uint64_t id; // ref ID returned by taosAddRef - TdThreadMutex mutex; // used to protect the operation on db + TdThreadMutex mutex; // used to protect the operation on db int32_t numOfReqs; // number of sqlObj bound to this connection SAppInstInfo* pAppInfo; } STscObj; @@ -152,7 +152,7 @@ typedef struct SResultColumn { typedef struct SReqResultInfo { const char* pRspMsg; const char* pData; - TAOS_FIELD* fields; + TAOS_FIELD* fields; // todo, column names are not needed. TAOS_FIELD* userFields; // the fields info that return to user uint32_t numOfCols; int32_t* length; @@ -222,6 +222,7 @@ void destroyRequest(SRequestObj* pRequest); char* getDbOfConnection(STscObj* pObj); void setConnectionDB(STscObj* pTscObj, const char* db); +void resetConnectDB(STscObj* pTscObj); void taos_init_imp(void); int taos_options_imp(TSDB_OPTION option, const char* str); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 4b41785d08..4e68b5331f 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -56,7 +56,7 @@ TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, } char localDb[TSDB_DB_NAME_LEN] = {0}; - if (db != NULL) { + if (db != NULL && strlen(db) > 0) { if (!validateDbName(db)) { terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH; return NULL; @@ -164,6 +164,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) { if ((*pQuery)->haveResultSet) { setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols); } + TSWAP(pRequest->dbList, (*pQuery)->pDbList, SArray*); TSWAP(pRequest->tableList, (*pQuery)->pTableList, SArray*); } @@ -803,6 +804,16 @@ void setConnectionDB(STscObj* pTscObj, const char* db) { taosThreadMutexUnlock(&pTscObj->mutex); } +void resetConnectDB(STscObj* pTscObj) { + if (pTscObj == NULL) { + return; + } + + taosThreadMutexLock(&pTscObj->mutex); + pTscObj->db[0] = 0; + taosThreadMutexUnlock(&pTscObj->mutex); +} + int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { assert(pResultInfo != NULL && pRsp != NULL); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 2dd77b7f22..0ca6169883 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -365,8 +365,7 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { } bool taos_is_update_query(TAOS_RES *res) { - // TODO - return true; + return taos_num_fields(res) == 0; } int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { @@ -393,8 +392,11 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { int taos_validate_sql(TAOS *taos, const char *sql) { return true; } void taos_reset_current_db(TAOS *taos) { - // TODO - return; + if (taos == NULL) { + return; + } + + resetConnectDB(taos); } const char *taos_get_server_info(TAOS *taos) { From 751cad9c2a6f3f2202dd4e52a2907e4527b5b9d0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 1 Apr 2022 20:19:40 +0800 Subject: [PATCH 4/4] [td-14393] fix bug. --- source/libs/executor/inc/executorimpl.h | 3 +- source/libs/executor/src/executorimpl.c | 51 ++++++++++++------- source/libs/scalar/src/filter.c | 1 - .../libs/scalar/test/filter/filterTests.cpp | 20 -------- 4 files changed, 36 insertions(+), 39 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ee07ea8eb1..64e2b2c1d3 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -393,6 +393,7 @@ typedef struct STableScanInfo { int32_t times; // repeat counts int32_t current; int32_t reverseTimes; // 0 by default + SNode* pFilterNode; // filter operator info SqlFunctionCtx* pCtx; // next operator query context SResultRowInfo* pResultRowInfo; int32_t* rowCellInfoOffset; @@ -628,7 +629,7 @@ typedef struct SDistinctOperatorInfo { SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, - int32_t reverseTime, SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo); + int32_t reverseTime, SArray* pColMatchInfo, SNode* pCondition, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 21ba328b43..6cb95b9a3e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3016,6 +3016,23 @@ int32_t loadDataBlock(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, taosArraySet(pBlock->pDataBlock, pColMatchInfo->targetSlotId, p); } + if (pTableScanInfo->pFilterNode != NULL) { + SFilterInfo* filter = NULL; + int32_t code = filterInitFromNode((SNode*)pTableScanInfo->pFilterNode, &filter, 0); + + SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock}; + code = filterSetDataFromSlotId(filter, ¶m1); + + int8_t* rowRes = NULL; + bool keep = filterExecute(filter, pBlock, &rowRes, NULL, param1.numOfCols); + +// filterSetColFieldData(pQueryAttr->pFilters, pBlock->info.numOfCols, pBlock->pDataBlock); + +// if (pQueryAttr->pFilters != NULL) { +// filterColRowsInDataBlock(pRuntimeEnv, pBlock, ascQuery); +// } + } + return TSDB_CODE_SUCCESS; } @@ -4655,7 +4672,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { // break; // } // - // pRuntimeEnv->current = *pTableQueryInfo; // doTableQueryInfoTimeWindowCheck(pTaskInfo, *pTableQueryInfo, pTableScanInfo->order); // } @@ -5413,7 +5429,7 @@ SSDataBlock* createResultDataBlock(const SArray* pExprInfo) { SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, - SExecTaskInfo* pTaskInfo) { + SNode* pCondition, SExecTaskInfo* pTaskInfo) { assert(repeatTime > 0); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); @@ -5432,21 +5448,22 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, taosArrayPush(pInfo->block.pDataBlock, &idata); } - pInfo->pTsdbReadHandle = pTsdbReadHandle; - pInfo->times = repeatTime; - pInfo->reverseTimes = reverseTime; - pInfo->order = order; - pInfo->current = 0; - pInfo->scanFlag = MAIN_SCAN; - pInfo->pColMatchInfo = pColMatchInfo; - pOperator->name = "TableScanOperator"; + pInfo->pFilterNode = pCondition; + pInfo->pTsdbReadHandle = pTsdbReadHandle; + pInfo->times = repeatTime; + pInfo->reverseTimes = reverseTime; + pInfo->order = order; + pInfo->current = 0; + pInfo->scanFlag = MAIN_SCAN; + pInfo->pColMatchInfo = pColMatchInfo; + pOperator->name = "TableScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->blockingOptr = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->numOfOutput = numOfOutput; - pOperator->getNextFn = doTableScan; - pOperator->pTaskInfo = pTaskInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->numOfOutput = numOfOutput; + pOperator->getNextFn = doTableScan; + pOperator->pTaskInfo = pTaskInfo; return pOperator; } @@ -5703,7 +5720,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { SColumnInfoData* pColInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i); int64_t tmp = 0; char t[10] = {0}; - STR_TO_VARSTR(t, "_"); + STR_TO_VARSTR(t, "_"); //TODO if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { colDataAppend(pColInfoData, numOfRows, t, false); } else { @@ -8618,7 +8635,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, - pScanPhyNode->reverse, pColList, pTaskInfo); + pScanPhyNode->reverse, pColList, pScanPhyNode->node.pConditions, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index b0632bbc34..434316add6 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3668,7 +3668,6 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnData taosArrayPush(pList, &pSrc); FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output)); - taosArrayDestroy(pList); // TODO Fix it // *p = output.orig.data; diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp index e2ac0d7421..1a82c8f955 100644 --- a/source/libs/scalar/test/filter/filterTests.cpp +++ b/source/libs/scalar/test/filter/filterTests.cpp @@ -275,7 +275,6 @@ TEST(timerangeTest, greater_and_lower) { nodesDestroyNode(logicNode); } - TEST(columnTest, smallint_column_greater_double_value) { SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; int16_t leftv[5]= {1, 2, 3, 4, 5}; @@ -386,7 +385,6 @@ TEST(columnTest, int_column_greater_smallint_value) { blockDataDestroy(src); } - TEST(columnTest, int_column_in_double_list) { SNode *pLeft = NULL, *pRight = NULL, *listNode = NULL, *opNode = NULL; int32_t leftv[5] = {1, 2, 3, 4, 5}; @@ -432,8 +430,6 @@ TEST(columnTest, int_column_in_double_list) { blockDataDestroy(src); } - - TEST(columnTest, binary_column_in_binary_list) { SNode *pLeft = NULL, *pRight = NULL, *listNode = NULL, *opNode = NULL; bool eRes[5] = {true, true, false, false, false}; @@ -497,7 +493,6 @@ TEST(columnTest, binary_column_in_binary_list) { blockDataDestroy(src); } - TEST(columnTest, binary_column_like_binary) { SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; char rightv[64] = {0}; @@ -546,7 +541,6 @@ TEST(columnTest, binary_column_like_binary) { blockDataDestroy(src); } - TEST(columnTest, binary_column_is_null) { SNode *pLeft = NULL, *opNode = NULL; char leftv[5][5]= {0}; @@ -641,8 +635,6 @@ TEST(columnTest, binary_column_is_not_null) { blockDataDestroy(src); } - - TEST(opTest, smallint_column_greater_int_column) { SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; int16_t leftv[5] = {1, -6, -2, 11, 101}; @@ -680,7 +672,6 @@ TEST(opTest, smallint_column_greater_int_column) { blockDataDestroy(src); } - TEST(opTest, smallint_value_add_int_column) { SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; int32_t leftv = 1; @@ -719,8 +710,6 @@ TEST(opTest, smallint_value_add_int_column) { blockDataDestroy(src); } - - TEST(opTest, bigint_column_multi_binary_column) { SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; int64_t leftv[5]= {1, 2, 3, 4, 5}; @@ -845,8 +834,6 @@ TEST(opTest, smallint_column_or_float_column) { blockDataDestroy(src); } - - TEST(opTest, smallint_column_or_double_value) { SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; int16_t leftv[5]= {0, 2, 3, 0, -1}; @@ -885,7 +872,6 @@ TEST(opTest, smallint_column_or_double_value) { blockDataDestroy(src); } - TEST(opTest, binary_column_is_true) { SNode *pLeft = NULL, *opNode = NULL; char leftv[5][5]= {0}; @@ -930,7 +916,6 @@ TEST(opTest, binary_column_is_true) { blockDataDestroy(src); } - TEST(filterModelogicTest, diff_columns_and_or_and) { flttInitLogFile(); @@ -1071,7 +1056,6 @@ TEST(filterModelogicTest, same_column_and_or_and) { blockDataDestroy(src); } - TEST(filterModelogicTest, diff_columns_or_and_or) { SNode *pLeft1 = NULL, *pRight1 = NULL, *pLeft2 = NULL, *pRight2 = NULL, *opNode1 = NULL, *opNode2 = NULL; SNode *logicNode1 = NULL, *logicNode2 = NULL; @@ -1210,8 +1194,6 @@ TEST(filterModelogicTest, same_column_or_and_or) { blockDataDestroy(src); } - - TEST(scalarModelogicTest, diff_columns_or_and_or) { flttInitLogFile(); @@ -1283,8 +1265,6 @@ TEST(scalarModelogicTest, diff_columns_or_and_or) { blockDataDestroy(src); } - - int main(int argc, char** argv) { taosSeedRand(taosGetTimestampSec()); testing::InitGoogleTest(&argc, argv);