From b9471b8115b61224d4f5c832a0216ac76cb84e25 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 28 Mar 2022 11:13:59 +0800 Subject: [PATCH 01/11] [TD-14222] add test for encode/decode ResultRow --- source/libs/executor/src/executorimpl.c | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2417bd352a..b739ffb2b9 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -6363,6 +6363,16 @@ static int32_t doOpenAggregateOptr(SOperatorInfo *pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); doAggregateImpl(pOperator, 0, pInfo->pCtx); + + char *result = NULL; + int32_t length = 0; + pOperator->encodeResultRow(pOperator, &result, &length); + SAggSupporter *pSup = &pAggInfo->aggSup; + taosHashClear(pSup->pResultRowHashTable); + pOperator->decodeResultRow(pOperator, result, length); + if(result){ + free(result); + } } finalizeQueryResult(pInfo->pCtx, pOperator->numOfOutput); @@ -6486,7 +6496,7 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char *result, int32_t l initResultRow(resultRow); - pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset}; + //pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset}; } if (offset != length){ From c941a2c7109ff651877f362b0ae833236565459f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 29 Mar 2022 18:16:40 +0800 Subject: [PATCH 02/11] add test for encode/decode ResultRow in group by and interval --- include/libs/nodes/nodes.h | 3 +- source/libs/executor/src/executorimpl.c | 76 ++++++++++++++++++++++--- 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 411d34063c..9698f0e13e 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -141,7 +141,8 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN_INSERT, QUERY_NODE_PHYSICAL_SUBPLAN, - QUERY_NODE_PHYSICAL_PLAN + QUERY_NODE_PHYSICAL_PLAN, + QUERY_NODE_PHYSICAL_PLAN_GROUPBY } ENodeType; /** diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5d6495b52a..5ec5d8cd70 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -6346,7 +6346,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo *pOperator) { taosHashClear(pSup->pResultRowHashTable); pOperator->decodeResultRow(pOperator, result, length); if(result){ - free(result); + taosMemoryFree(result); } } @@ -6377,8 +6377,27 @@ static SSDataBlock* getAggregateResult(SOperatorInfo *pOperator, bool* newgroup) } static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t *length) { - SAggOperatorInfo *pAggInfo = pOperator->info; - SAggSupporter *pSup = &pAggInfo->aggSup; + SAggSupporter *pSup = NULL; + switch(pOperator->operatorType){ + case QUERY_NODE_PHYSICAL_PLAN_AGG:{ + SAggOperatorInfo *pAggInfo = pOperator->info; + pSup = &pAggInfo->aggSup; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_GROUPBY:{ + SGroupbyOperatorInfo *pAggInfo = pOperator->info; + pSup = &pAggInfo->aggSup; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ + STableIntervalOperatorInfo *pAggInfo = pOperator->info; + pSup = &pAggInfo->aggSup; + break; + } + default:{ + qDebug("invalid operatorType: %d", pOperator->operatorType); + } + } int32_t size = taosHashGetSize(pSup->pResultRowHashTable); size_t keyLen = POINTER_BYTES; // estimate the key length @@ -6434,9 +6453,28 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char *result, int32_t l return false; } - SAggOperatorInfo *pAggInfo = pOperator->info; - SAggSupporter *pSup = &pAggInfo->aggSup; - SOptrBasicInfo *pInfo = &pAggInfo->binfo; + SAggSupporter *pSup = NULL; + switch(pOperator->operatorType){ + case QUERY_NODE_PHYSICAL_PLAN_AGG:{ + SAggOperatorInfo *pAggInfo = pOperator->info; + //SOptrBasicInfo *pInfo = &pAggInfo->binfo; + pSup = &pAggInfo->aggSup; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_GROUPBY:{ + SGroupbyOperatorInfo *pAggInfo = pOperator->info; + pSup = &pAggInfo->aggSup; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ + STableIntervalOperatorInfo *pAggInfo = pOperator->info; + pSup = &pAggInfo->aggSup; + break; + } + default:{ + qDebug("invalid operatorType: %d", pOperator->operatorType); + } + } // int32_t size = taosHashGetSize(pSup->pResultRowHashTable); int32_t count = *(int32_t*)(result); @@ -6733,6 +6771,16 @@ static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); + + char *result = NULL; + int32_t length = 0; + pOperator->encodeResultRow(pOperator, &result, &length); + SAggSupporter *pSup = &pInfo->aggSup; + taosHashClear(pSup->pResultRowHashTable); + pOperator->decodeResultRow(pOperator, result, length); + if(result){ + taosMemoryFree(result); + } } closeAllResultRows(&pInfo->binfo.resultRowInfo); @@ -7149,6 +7197,16 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); doHashGroupbyAgg(pOperator, pBlock); + + char *result = NULL; + int32_t length = 0; + pOperator->encodeResultRow(pOperator, &result, &length); + SAggSupporter *pSup = &pInfo->aggSup; + taosHashClear(pSup->pResultRowHashTable); + pOperator->decodeResultRow(pOperator, result, length); + if(result){ + taosMemoryFree(result); + } } pOperator->status = OP_RES_TO_RETURN; @@ -7693,6 +7751,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->_openFn = doOpenIntervalAgg; pOperator->getNextFn = doBuildIntervalResult; pOperator->closeFn = destroyIntervalOperatorInfo; + pOperator->encodeResultRow = aggEncodeResultRow; + pOperator->decodeResultRow = aggDecodeResultRow; code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -7903,13 +7963,15 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; -// pOperator->operatorType = OP_Groupby; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUPBY; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = hashGroupbyAggregate; pOperator->closeFn = destroyGroupbyOperatorInfo; + pOperator->encodeResultRow = aggEncodeResultRow; + pOperator->decodeResultRow = aggDecodeResultRow; code = appendDownstream(pOperator, &downstream, 1); return pOperator; From bceb557809114fd62a896fa7f318a50739b03745 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 30 Mar 2022 10:05:20 +0800 Subject: [PATCH 03/11] rm useless code --- source/libs/executor/src/dataDispatcher.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 34894c235b..e527e3a315 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -125,7 +125,6 @@ static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputDat pBuf->useSize = sizeof(SRetrieveTableRsp); copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen); - pEntry->dataLen = pEntry->dataLen; pBuf->useSize += pEntry->dataLen; } From 9a543fb331eb5babe931c458ca2ef45fe7f47c1b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 11 Apr 2022 14:09:47 +0800 Subject: [PATCH 04/11] feat:fix error in result row --- source/libs/executor/src/executorimpl.c | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c57079cf4b..947e705c39 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -302,10 +302,6 @@ SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { } taosArrayPush(pBlock->pDataBlock, &idata); - - if (IS_VAR_DATA_TYPE(idata.info.type)) { - pBlock->info.hasVarCol = true; - } } return pBlock; @@ -365,7 +361,7 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env) pResultRowInfo->pPosition = taosMemoryRealloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition)); int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity; - memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition)); + memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition) * inc); pResultRowInfo->capacity = (int32_t)newCapacity; } @@ -500,7 +496,7 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, pData = getBufPage(pResultBuf, getPageId(pi)); pageId = getPageId(pi); - if (pData->num + interBufSize + sizeof(SResultRow) > getBufPageSize(pResultBuf)) { + if (pData->num + interBufSize > getBufPageSize(pResultBuf)) { // release current page first, and prepare the next one releaseBufPageInfo(pResultBuf, pi); @@ -520,7 +516,7 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, pResultRow->pageId = pageId; pResultRow->offset = (int32_t)pData->num; - pData->num += interBufSize + sizeof(SResultRow); + pData->num += interBufSize; return pResultRow; } @@ -588,7 +584,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR // add a new result set for a new group SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; - taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, POINTER_BYTES); + taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition)); SResultRowCell cell = {.groupId = tableGroupId, .pos = pos}; taosArrayPush(pSup->pResultRowArrayList, &cell); } else { From 8240efda4c98304918ad42993f3122fa09dd25e5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 11 Apr 2022 17:03:41 +0800 Subject: [PATCH 05/11] fix[query]: enable filter on nchar type. --- source/libs/scalar/src/filter.c | 29 +++++++++--------- source/libs/scalar/src/scalar.c | 1 - source/libs/scalar/src/sclvector.c | 48 ++++++++++++++++++++++-------- 3 files changed, 50 insertions(+), 28 deletions(-) diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 2e47e19023..191143de12 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -1748,7 +1748,6 @@ int32_t fltInitValFieldData(SFilterInfo *info) { SFilterField* fi = right; SValueNode* var = (SValueNode *)fi->desc; - if (var == NULL) { assert(fi->data != NULL); continue; @@ -1767,13 +1766,18 @@ int32_t fltInitValFieldData(SFilterInfo *info) { } SDataType *dType = &var->node.resType; + size_t bytes = 0; if (type == TSDB_DATA_TYPE_BINARY) { size_t len = (dType->type == TSDB_DATA_TYPE_BINARY || dType->type == TSDB_DATA_TYPE_NCHAR) ? dType->bytes : MAX_NUM_STR_SIZE; - fi->data = taosMemoryCalloc(1, len + 1 + VARSTR_HEADER_SIZE); + bytes = len + 1 + VARSTR_HEADER_SIZE; + + fi->data = taosMemoryCalloc(1, bytes); } else if (type == TSDB_DATA_TYPE_NCHAR) { - size_t len = (dType->type == TSDB_DATA_TYPE_BINARY || dType->type == TSDB_DATA_TYPE_NCHAR) ? dType->bytes : MAX_NUM_STR_SIZE; - fi->data = taosMemoryCalloc(1, (len + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE); + size_t len = (dType->type == TSDB_DATA_TYPE_BINARY || dType->type == TSDB_DATA_TYPE_NCHAR) ? dType->bytes : MAX_NUM_STR_SIZE; + bytes = (len + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; + + fi->data = taosMemoryCalloc(1, bytes); } else if (type != TSDB_DATA_TYPE_JSON){ if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) { //TIME RANGE /* @@ -1797,8 +1801,11 @@ int32_t fltInitValFieldData(SFilterInfo *info) { } else { SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))}; out.columnData->info.type = type; - out.columnData->info.bytes = tDataTypes[type].bytes; - ASSERT(!IS_VAR_DATA_TYPE(type)); + if (IS_VAR_DATA_TYPE(type)) { + out.columnData->info.bytes = bytes; + } else { + out.columnData->info.bytes = tDataTypes[type].bytes; + } // todo refactor the convert int32_t code = doConvertDataType(var, &out); @@ -2985,13 +2992,13 @@ bool filterExecuteImplMisc(void *pinfo, int32_t numOfRows, int8_t** p, SColumnDa for (int32_t i = 0; i < numOfRows; ++i) { uint32_t uidx = info->groups[0].unitIdxs[0]; void *colData = colDataGetData((SColumnInfoData *)info->cunits[uidx].colData, i); - if (colData == NULL || colDataIsNull((SColumnInfoData *)info->cunits[uidx].colData, 0, i, NULL)) { + if (colData == NULL || colDataIsNull_s((SColumnInfoData *)info->cunits[uidx].colData, i)) { (*p)[i] = 0; all = false; continue; } - // match/nmatch for nchar type need convert from ucs4 to mbs + // match/nmatch for nchar type need convert from ucs4 to mbs if(info->cunits[uidx].dataType == TSDB_DATA_TYPE_NCHAR && (info->cunits[uidx].optr == OP_TYPE_MATCH || info->cunits[uidx].optr == OP_TYPE_NMATCH)){ char *newColData = taosMemoryCalloc(info->cunits[uidx].dataSize * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE, 1); int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(colData), varDataLen(colData), varDataVal(newColData)); @@ -3222,19 +3229,13 @@ int32_t fltInitFromNode(SNode* tree, SFilterInfo *info, uint32_t options) { info->unitFlags = taosMemoryMalloc(info->unitNum * sizeof(*info->unitFlags)); filterDumpInfoToString(info, "Final", 0); - return code; _return: - qInfo("init from node failed, code:%d", code); - return code; } - - - bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows) { if (FILTER_EMPTY_RES(info)) { return false; diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index f97e5a2d2a..116dfdd5d5 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -46,7 +46,6 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) { colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false); colInfoDataEnsureCapacity(out->columnData, 1); - int32_t code = vectorConvertImpl(&in, out); sclFreeParam(&in); diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index 4bce8f33f2..ff677d2f7b 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -177,10 +177,25 @@ static FORCE_INLINE void varToBool(char *buf, SScalarParam* pOut, int32_t rowInd colDataAppendInt8(pOut->columnData, rowIndex, (int8_t*) &v); } +static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIndex) { + int32_t len = 0; + int32_t inputLen = varDataLen(buf); + + char* t = taosMemoryCalloc(1,(inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE); + /*int32_t resLen = */taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4*) varDataVal(t), pOut->columnData->info.bytes, &len); + varDataSetLen(t, len); + + colDataAppend(pOut->columnData, rowIndex, t, false); + taosMemoryFree(t); +} + +//TODO opt performance int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) { int32_t bufSize = pIn->columnData->info.bytes; char *tmp = taosMemoryMalloc(bufSize); + bool vton = false; + _bufConverteFunc func = NULL; if (TSDB_DATA_TYPE_BOOL == outType) { func = varToBool; @@ -190,6 +205,9 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in func = varToUnsigned; } else if (IS_FLOAT_TYPE(outType)) { func = varToFloat; + } else if (outType == TSDB_DATA_TYPE_NCHAR) { + func = varToNchar; + vton = true; } else { sclError("invalid convert outType:%d", outType); return TSDB_CODE_QRY_APP_ERROR; @@ -197,26 +215,30 @@ int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, in pOut->numOfRows = pIn->numOfRows; for (int32_t i = 0; i < pIn->numOfRows; ++i) { - if (colDataIsNull(pIn->columnData, pIn->numOfRows, i, NULL)) { + if (colDataIsNull_s(pIn->columnData, i)) { colDataAppendNULL(pOut->columnData, i); continue; } char* data = colDataGetData(pIn->columnData, i); - if (TSDB_DATA_TYPE_BINARY == inType) { - memcpy(tmp, varDataVal(data), varDataLen(data)); - tmp[varDataLen(data)] = 0; + if (vton) { + memcpy(tmp, data, varDataTLen(data)); } else { - ASSERT (varDataLen(data) <= bufSize); - - int len = taosUcs4ToMbs((TdUcs4*)varDataVal(data), varDataLen(data), tmp); - if (len < 0){ - sclError("castConvert taosUcs4ToMbs error 1"); - taosMemoryFreeClear(tmp); - return TSDB_CODE_QRY_APP_ERROR; + if (TSDB_DATA_TYPE_VARCHAR == inType) { + memcpy(tmp, varDataVal(data), varDataLen(data)); + tmp[varDataLen(data)] = 0; + } else { + ASSERT(varDataLen(data) <= bufSize); + + int len = taosUcs4ToMbs((TdUcs4 *)varDataVal(data), varDataLen(data), tmp); + if (len < 0) { + sclError("castConvert taosUcs4ToMbs error 1"); + taosMemoryFreeClear(tmp); + return TSDB_CODE_QRY_APP_ERROR; + } + + tmp[len] = 0; } - - tmp[len] = 0; } (*func)(tmp, pOut, i); From 4961b6ab8ee7cb58d51e204f73422c1ccc630b39 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 11 Apr 2022 17:08:13 +0800 Subject: [PATCH 06/11] feat: optimize encode/decode resultRow --- source/libs/executor/inc/executorimpl.h | 9 +- source/libs/executor/src/executorimpl.c | 134 ++++++++++------------- source/libs/executor/src/groupoperator.c | 6 +- 3 files changed, 68 insertions(+), 81 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9a75830ec4..526f43cb64 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -232,9 +232,11 @@ typedef struct STaskAttr { } STaskAttr; struct SOperatorInfo; +struct SAggSupporter; +struct SOptrBasicInfo; -typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length); -typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length); +typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char **result, int32_t *length); +typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter *pSup, struct SOptrBasicInfo *pInfo, char *result, int32_t length); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup); @@ -753,6 +755,9 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model); int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum); +bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length); +void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d344b93485..8addd53479 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4974,15 +4974,19 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); doAggregateImpl(pOperator, 0, pInfo->pCtx); - char *result = NULL; - int32_t length = 0; - pOperator->encodeResultRow(pOperator, &result, &length); - SAggSupporter *pSup = &pAggInfo->aggSup; - taosHashClear(pSup->pResultRowHashTable); - pOperator->decodeResultRow(pOperator, result, length); - if(result){ - taosMemoryFree(result); + if(pOperator->encodeResultRow){ + char *result = NULL; + int32_t length = 0; + SAggSupporter *pSup = &pAggInfo->aggSup; + pOperator->encodeResultRow(pOperator, pSup, pInfo, &result, &length); + taosHashClear(pSup->pResultRowHashTable); + pInfo->resultRowInfo.size = 0; + pOperator->decodeResultRow(pOperator, pSup, pInfo, result, length); + if(result){ + taosMemoryFree(result); + } } + } finalizeQueryResult(pInfo->pCtx, pOperator->numOfOutput); @@ -5011,43 +5015,33 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup) return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL; } -static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t *length) { - SAggSupporter *pSup = NULL; - switch(pOperator->operatorType){ - case QUERY_NODE_PHYSICAL_PLAN_AGG:{ - SAggOperatorInfo *pAggInfo = pOperator->info; - pSup = &pAggInfo->aggSup; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_GROUPBY:{ - SGroupbyOperatorInfo *pAggInfo = pOperator->info; - pSup = &pAggInfo->aggSup; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ - STableIntervalOperatorInfo *pAggInfo = pOperator->info; - pSup = &pAggInfo->aggSup; - break; - } - default:{ - qDebug("invalid operatorType: %d", pOperator->operatorType); - } - } - +void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char **result, int32_t *length) { int32_t size = taosHashGetSize(pSup->pResultRowHashTable); - size_t keyLen = POINTER_BYTES; // estimate the key length + size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length int32_t totalSize = sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize); *result = taosMemoryCalloc(1, totalSize); if (*result == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return; + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } *(int32_t*)(*result) = size; int32_t offset = sizeof(int32_t); + + // prepare memory + SResultRowPosition* pos = &pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.curPos]; + void* pPage = getBufPage(pSup->pResultBuf, pos->pageId); + SResultRow* pRow = (SResultRow*)((char*)pPage + pos->offset); + setBufPageDirty(pPage, true); + releaseBufPage(pSup->pResultBuf, pPage); + void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL); while (pIter) { void* key = taosHashGetKey(pIter, &keyLen); - SResultRow** p1 = (SResultRow**)pIter; + SResultRowPosition** p1 = (SResultRowPosition**)pIter; + + pPage = (SFilePage*) getBufPage(pSup->pResultBuf, (*p1)->pageId); + pRow = (SResultRow*)((char*)pPage + (*p1)->offset); + setBufPageDirty(pPage, true); + releaseBufPage(pSup->pResultBuf, pPage); // recalculate the result size int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize; @@ -5057,7 +5051,7 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t terrno = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFree(*result); *result = NULL; - return; + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } else { *result = tmp; } @@ -5071,7 +5065,7 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t // save value *(int32_t*)(*result + offset) = pSup->resultRowSize; offset += sizeof(int32_t); - memcpy(*result + offset, *p1, pSup->resultRowSize); + memcpy(*result + offset, pRow, pSup->resultRowSize); offset += pSup->resultRowSize; pIter = taosHashIterate(pSup->pResultRowHashTable, pIter); @@ -5083,34 +5077,11 @@ static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t return; } -static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length) { +bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasicInfo *pInfo, char* result, int32_t length) { if (!result || length <= 0) { return false; } - SAggSupporter *pSup = NULL; - switch(pOperator->operatorType){ - case QUERY_NODE_PHYSICAL_PLAN_AGG:{ - SAggOperatorInfo *pAggInfo = pOperator->info; - //SOptrBasicInfo *pInfo = &pAggInfo->binfo; - pSup = &pAggInfo->aggSup; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_GROUPBY:{ - SGroupbyOperatorInfo *pAggInfo = pOperator->info; - pSup = &pAggInfo->aggSup; - break; - } - case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ - STableIntervalOperatorInfo *pAggInfo = pOperator->info; - pSup = &pAggInfo->aggSup; - break; - } - default:{ - qDebug("invalid operatorType: %d", pOperator->operatorType); - } - } - // int32_t size = taosHashGetSize(pSup->pResultRowHashTable); int32_t count = *(int32_t*)(result); @@ -5122,17 +5093,16 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t l uint64_t tableGroupId = *(uint64_t*)(result + offset); SResultRow* resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize); if (!resultRow) { - terrno = TSDB_CODE_TSC_INVALID_INPUT; - return false; + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT); } // add a new result set for a new group - taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &resultRow, POINTER_BYTES); + SResultRowPosition pos = {.pageId = resultRow->pageId, .offset = resultRow->offset}; + taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &pos, sizeof(SResultRowPosition)); offset += keyLen; int32_t valueLen = *(int32_t*)(result + offset); if (valueLen != pSup->resultRowSize) { - terrno = TSDB_CODE_TSC_INVALID_INPUT; - return false; + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT); } offset += sizeof(int32_t); int32_t pageId = resultRow->pageId; @@ -5143,12 +5113,13 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t l offset += valueLen; initResultRow(resultRow); - //pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset}; + prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env); + pInfo->resultRowInfo.curPos = pInfo->resultRowInfo.size; + pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset}; } if (offset != length) { - terrno = TSDB_CODE_TSC_INVALID_INPUT; - return false; + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT); } return true; } @@ -5375,16 +5346,21 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); - char *result = NULL; - int32_t length = 0; - pOperator->encodeResultRow(pOperator, &result, &length); - SAggSupporter *pSup = &pInfo->aggSup; - taosHashClear(pSup->pResultRowHashTable); - pOperator->decodeResultRow(pOperator, result, length); - if(result){ - taosMemoryFree(result); +#if 0 + if(pOperator->encodeResultRow){ + char *result = NULL; + int32_t length = 0; + SAggSupporter *pSup = &pInfo->aggSup; + pOperator->encodeResultRow(pOperator, pSup, &pInfo->binfo, &result, &length); + taosHashClear(pSup->pResultRowHashTable); + pInfo->binfo.resultRowInfo.size = 0; + pOperator->decodeResultRow(pOperator, pSup, &pInfo->binfo, result, length); + if(result){ + taosMemoryFree(result); + } } } +#endif closeAllResultRows(&pInfo->binfo.resultRowInfo); finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, @@ -6335,6 +6311,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pOperator->info = pInfo; pOperator->getNextFn = doStateWindowAgg; pOperator->closeFn = destroyStateWindowOperatorInfo; + pOperator->encodeResultRow = aggEncodeResultRow; + pOperator->decodeResultRow = aggDecodeResultRow; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -6374,6 +6352,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo pOperator->info = pInfo; pOperator->getNextFn = doSessionWindowAgg; pOperator->closeFn = destroySWindowOperatorInfo; + pOperator->encodeResultRow = aggEncodeResultRow; + pOperator->decodeResultRow = aggDecodeResultRow; pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 3eb8ff1b72..1de33466e3 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -165,7 +165,7 @@ static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { // assign the group keys or user input constant values if required static void doAssignGroupKeys(SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t totalRows, int32_t rowIndex) { for (int32_t i = 0; i < numOfOutput; ++i) { - if (pCtx[i].functionId == -1) { + if (pCtx[i].functionId == -1) { // select count(*),key from t group by key. SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[i]); SColumnInfoData* pColInfoData = pCtx[i].input.pData[0]; @@ -337,7 +337,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; - // pOperator->operatorType = OP_Groupby; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUPBY; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; @@ -345,6 +345,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->_openFn = operatorDummyOpenFn; pOperator->getNextFn = hashGroupbyAggregate; pOperator->closeFn = destroyGroupOperatorInfo; + pOperator->encodeResultRow = aggEncodeResultRow; + pOperator->decodeResultRow = aggDecodeResultRow; code = appendDownstream(pOperator, &downstream, 1); return pOperator; From 78fd2d3a3eabeb6d608c217bb96d04bb18776021 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 11 Apr 2022 17:30:27 +0800 Subject: [PATCH 07/11] feat: optimize encode/decode resultRow --- source/libs/executor/src/executorimpl.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8addd53479..eff5555dd2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4974,6 +4974,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); doAggregateImpl(pOperator, 0, pInfo->pCtx); +#if 0 // test for encode/decode result info if(pOperator->encodeResultRow){ char *result = NULL; int32_t length = 0; @@ -4986,7 +4987,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { taosMemoryFree(result); } } - +#endif } finalizeQueryResult(pInfo->pCtx, pOperator->numOfOutput); @@ -5036,10 +5037,10 @@ void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter *pSup, SOptrBasi void* pIter = taosHashIterate(pSup->pResultRowHashTable, NULL); while (pIter) { void* key = taosHashGetKey(pIter, &keyLen); - SResultRowPosition** p1 = (SResultRowPosition**)pIter; + SResultRowPosition* p1 = (SResultRowPosition*)pIter; - pPage = (SFilePage*) getBufPage(pSup->pResultBuf, (*p1)->pageId); - pRow = (SResultRow*)((char*)pPage + (*p1)->offset); + pPage = (SFilePage*) getBufPage(pSup->pResultBuf, p1->pageId); + pRow = (SResultRow*)((char*)pPage + p1->offset); setBufPageDirty(pPage, true); releaseBufPage(pSup->pResultBuf, pPage); @@ -5346,7 +5347,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); -#if 0 +#if 0 // test for encode/decode result info if(pOperator->encodeResultRow){ char *result = NULL; int32_t length = 0; From 45b766cd358f0eaeb7df335e6adbcebf507bbee8 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 11 Apr 2022 17:35:17 +0800 Subject: [PATCH 08/11] feat: optimize encode/decode resultRow --- include/libs/nodes/nodes.h | 3 +-- source/libs/executor/src/groupoperator.c | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 68056eff47..c91779ba8b 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -184,8 +184,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_DISPATCH, QUERY_NODE_PHYSICAL_PLAN_INSERT, QUERY_NODE_PHYSICAL_SUBPLAN, - QUERY_NODE_PHYSICAL_PLAN, - QUERY_NODE_PHYSICAL_PLAN_GROUPBY + QUERY_NODE_PHYSICAL_PLAN } ENodeType; /** diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 1de33466e3..8d780fc70c 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -337,7 +337,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->name = "GroupbyAggOperator"; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUPBY; + // pOperator->operatorType = OP_Groupby; pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; From 2624fa2711ea5a73c20c07975516497576dbdc3b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 11 Apr 2022 17:36:10 +0800 Subject: [PATCH 09/11] fix[query]: invalid write. --- source/libs/scalar/src/sclvector.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index ff677d2f7b..05456790a5 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -189,10 +189,10 @@ static FORCE_INLINE void varToNchar(char* buf, SScalarParam* pOut, int32_t rowIn taosMemoryFree(t); } -//TODO opt performance +//TODO opt performance, tmp is not needed. int32_t vectorConvertFromVarData(const SScalarParam* pIn, SScalarParam* pOut, int32_t inType, int32_t outType) { int32_t bufSize = pIn->columnData->info.bytes; - char *tmp = taosMemoryMalloc(bufSize); + char *tmp = taosMemoryMalloc(bufSize + VARSTR_HEADER_SIZE); bool vton = false; From bc4e73a3ddc3b1f6eff98b3dbe225bdd8a063a0a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 11 Apr 2022 17:55:53 +0800 Subject: [PATCH 10/11] feat: optimize encode/decode resultRow --- source/libs/executor/src/executorimpl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index eff5555dd2..63cd6b77b4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5360,8 +5360,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { taosMemoryFree(result); } } - } #endif + } closeAllResultRows(&pInfo->binfo.resultRowInfo); finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, From 8c27b944bac52f3356a80f991c43e5e8e40b9527 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 11 Apr 2022 18:54:31 +0800 Subject: [PATCH 11/11] fix[query]: disable the auto convert of ucs4 for taos_fetch_raw_block api. --- source/client/inc/clientInt.h | 4 +- source/client/src/clientImpl.c | 81 +++++++++++++++------------- source/client/src/clientMain.c | 6 +-- source/client/src/clientMsgHandler.c | 4 +- 4 files changed, 52 insertions(+), 43 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index ae1b34a3bd..562a1ce696 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -238,9 +238,9 @@ void initMsgHandleFp(); TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, uint16_t port); -void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr); +void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); -int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4); int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 9938a2e1b9..2949d6eda6 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -13,7 +13,7 @@ static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet); static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest); static void destroySendMsgInfo(SMsgSendInfo* pMsgBody); -static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); +static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4); static bool stringLengthCheck(const char* str, size_t maxsize) { if (str == NULL) { @@ -176,7 +176,7 @@ int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { SRetrieveTableRsp* pRsp = NULL; int32_t code = qExecCommand(pQuery->pRoot, &pRsp); if (TSDB_CODE_SUCCESS == code && NULL != pRsp) { - code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp); + code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false); } return code; } @@ -616,7 +616,7 @@ static void doSetOneRowPtr(SReqResultInfo* pResultInfo) { } } -void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr) { +void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { assert(pRequest != NULL); SReqResultInfo* pResultInfo = &pRequest->body.resInfo; @@ -637,7 +637,7 @@ void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr) { return NULL; } - pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData); + pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4); if (pRequest->code != TSDB_CODE_SUCCESS) { pResultInfo->numOfRows = 0; return NULL; @@ -735,7 +735,42 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) { return TSDB_CODE_SUCCESS; } -int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) { +static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) { + for (int32_t i = 0; i < numOfCols; ++i) { + int32_t type = pResultInfo->fields[i].type; + int32_t bytes = pResultInfo->fields[i].bytes; + + if (type == TSDB_DATA_TYPE_NCHAR) { + char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pResultInfo->convertBuf[i] = p; + + SResultColumn* pCol = &pResultInfo->pCol[i]; + for (int32_t j = 0; j < numOfRows; ++j) { + if (pCol->offset[j] != -1) { + char* pStart = pCol->offset[j] + pCol->pData; + + int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p)); + ASSERT(len <= bytes); + + varDataSetLen(p, len); + pCol->offset[j] = (p - pResultInfo->convertBuf[i]); + p += (len + VARSTR_HEADER_SIZE); + } + } + + pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i]; + pResultInfo->row[i] = pResultInfo->pCol[i].pData; + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4) { assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL); if (numOfRows == 0) { return TSDB_CODE_SUCCESS; @@ -767,37 +802,11 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 } // convert UCS4-LE encoded character to native multi-bytes character in current data block. - for (int32_t i = 0; i < numOfCols; ++i) { - int32_t type = pResultInfo->fields[i].type; - int32_t bytes = pResultInfo->fields[i].bytes; - - if (type == TSDB_DATA_TYPE_NCHAR) { - char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]); - if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - pResultInfo->convertBuf[i] = p; - - SResultColumn* pCol = &pResultInfo->pCol[i]; - for (int32_t j = 0; j < numOfRows; ++j) { - if (pCol->offset[j] != -1) { - pStart = pCol->offset[j] + pCol->pData; - - int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p)); - ASSERT(len <= bytes); - - varDataSetLen(p, len); - pCol->offset[j] = (p - pResultInfo->convertBuf[i]); - p += (len + VARSTR_HEADER_SIZE); - } - } - - pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i]; - pResultInfo->row[i] = pResultInfo->pCol[i].pData; - } + if (convertUcs4) { + code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength); } - return TSDB_CODE_SUCCESS; + return code; } char* getDbOfConnection(STscObj* pObj) { @@ -829,7 +838,7 @@ void resetConnectDB(STscObj* pTscObj) { taosThreadMutexUnlock(&pTscObj->mutex); } -int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) { +int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) { assert(pResultInfo != NULL && pRsp != NULL); pResultInfo->pRspMsg = (const char*)pRsp; @@ -842,5 +851,5 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR // TODO handle the compressed case pResultInfo->totalRows += pResultInfo->numOfRows; - return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows); + return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4); } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index e10cf5179e..51629510d0 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -168,7 +168,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return NULL; } - return doFetchRow(pRequest, true); + return doFetchRow(pRequest, true, true); } int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) { @@ -404,7 +404,7 @@ int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows) { return 0; } - doFetchRow(pRequest, false); + doFetchRow(pRequest, false, true); // TODO refactor SReqResultInfo *pResultInfo = &pRequest->body.resInfo; @@ -426,7 +426,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData) { return 0; } - doFetchRow(pRequest, false); + doFetchRow(pRequest, false, false); SReqResultInfo *pResultInfo = &pRequest->body.resInfo; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 4314391743..99c6e81551 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -191,7 +191,7 @@ int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) pResInfo->completed = pRetrieve->completed; pResInfo->current = 0; - setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); +// setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows, pRetrieve->completed, pRequest->body.showInfo.execId); @@ -225,7 +225,7 @@ int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) { pResInfo->pData = pFetchRsp->data; pResInfo->current = 0; - setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); +// setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows); tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows, pFetchRsp->completed, pRequest->body.showInfo.execId);