From 162cf572eb51efb014931275e917ddba7556fb98 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 3 May 2022 17:51:21 +0800 Subject: [PATCH 01/13] modify udf comments --- include/libs/function/tudf.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index e49f5cac45..5207a73487 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -54,7 +54,7 @@ int32_t setupUdf(char udfName[], UdfcFuncHandle *handle); typedef struct SUdfColumnMeta { int16_t type; - int32_t bytes; // <0 var length, others fixed length bytes + int32_t bytes; uint8_t precision; uint8_t scale; } SUdfColumnMeta; @@ -133,8 +133,7 @@ typedef int32_t (*TUdfTeardownFunc)(); //TODO: add API to check function arguments type, number etc. //TODO: another way to manage memory is provide api for UDF to add data to SUdfColumnData and UDF framework will allocate memory. // then UDF framework will free the memory -//typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data); -//typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data); +// int32_t udfColDataAppend(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); From eb18d518ed64e477e6a173c29a04833e3eeedc7a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 May 2022 15:44:46 +0800 Subject: [PATCH 02/13] fix(driver): eliminate the memory leak. --- source/libs/parser/src/parInsert.c | 1 + source/libs/parser/src/parTranslater.c | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index f452838921..152aa6a01e 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1043,6 +1043,7 @@ static void destroyDataBlock(STableDataBlocks* pDataBlock) { static void destroyInsertParseContext(SInsertParseContext* pCxt) { destroyInsertParseContextForTable(pCxt); taosHashCleanup(pCxt->pVgroupsHashObj); + taosHashCleanup(pCxt->pSubTableHashObj); destroyBlockHashmap(pCxt->pTableBlockHashObj); destroyBlockArrayList(pCxt->pVgDataBlocks); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 30e55420d4..fbb1f34217 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4062,7 +4062,8 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { int32_t translate(SParseContext* pParseCxt, SQuery* pQuery) { STranslateContext cxt = {0}; - int32_t code = initTranslateContext(pParseCxt, &cxt); + + int32_t code = initTranslateContext(pParseCxt, &cxt); if (TSDB_CODE_SUCCESS == code) { code = fmFuncMgtInit(); } From 455737f2cbd11be1301df5343b0c0059c8156c04 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 5 May 2022 17:15:09 +0800 Subject: [PATCH 03/13] fix(query): add hex encoding/decoding for transmitting nchar constant from client to server in JSON --- include/os/osString.h | 2 ++ source/libs/nodes/src/nodesCodeFuncs.c | 36 +++++++++++++++++++++++--- source/os/src/osString.c | 30 +++++++++++++++++++++ 3 files changed, 65 insertions(+), 3 deletions(-) diff --git a/include/os/osString.h b/include/os/osString.h index a4100652c3..026cb33ad9 100644 --- a/include/os/osString.h +++ b/include/os/osString.h @@ -59,6 +59,8 @@ bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes); TdUcs4* tasoUcs4Copy(TdUcs4 *target_ucs4, TdUcs4 *source_ucs4, int32_t len_ucs4); bool taosValidateEncodec(const char *encodec); +int32_t taosHexEncode(const char *src, char *dst, int32_t len); +int32_t taosHexDecode(const char *src, char *dst, int32_t len); int32_t taosWcharWidth(TdWchar wchar); int32_t taosWcharsWidth(TdWchar *pWchar, int32_t size); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0e6ec4f945..8320aa5a1b 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -1699,7 +1699,18 @@ static int32_t datumToJson(const void* pObj, SJson* pJson) { case TSDB_DATA_TYPE_DOUBLE: code = tjsonAddDoubleToObject(pJson, jkValueDatum, pNode->datum.d); break; - case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_NCHAR: { + //cJSON only support utf-8 encoding. Convert memory content to hex string. + char *buf = taosMemoryCalloc(varDataLen(pNode->datum.p) * 2 + 1, sizeof(char)); + code = taosHexEncode(varDataVal(pNode->datum.p), buf, varDataLen(pNode->datum.p)); + if(code != TSDB_CODE_SUCCESS) { + taosMemoryFree(buf); + return TSDB_CODE_TSC_INVALID_VALUE; + } + code = tjsonAddStringToObject(pJson, jkValueDatum, buf); + taosMemoryFree(buf); + break; + } case TSDB_DATA_TYPE_VARCHAR: case TSDB_DATA_TYPE_VARBINARY: code = tjsonAddStringToObject(pJson, jkValueDatum, varDataVal(pNode->datum.p)); @@ -1773,8 +1784,27 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) { code = TSDB_CODE_OUT_OF_MEMORY; break; } - varDataSetLen(pNode->datum.p, pNode->node.resType.bytes); - code = tjsonGetStringValue(pJson, jkValueDatum, varDataVal(pNode->datum.p)); + varDataSetLen(pNode->datum.p, pNode->node.resType.bytes - VARSTR_HEADER_SIZE); + if (TSDB_DATA_TYPE_NCHAR == pNode->node.resType.type) { + char *buf = taosMemoryCalloc(1, pNode->node.resType.bytes * 2 + VARSTR_HEADER_SIZE + 1); + if (NULL == buf) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + code = tjsonGetStringValue(pJson, jkValueDatum, buf); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(buf); + break; + } + code = taosHexDecode(buf, varDataVal(pNode->datum.p), pNode->node.resType.bytes - VARSTR_HEADER_SIZE); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(buf); + break; + } + taosMemoryFree(buf); + } else { + code = tjsonGetStringValue(pJson, jkValueDatum, varDataVal(pNode->datum.p)); + } break; } case TSDB_DATA_TYPE_JSON: diff --git a/source/os/src/osString.c b/source/os/src/osString.c index ed596a051d..375c5001f4 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -195,6 +195,36 @@ int32_t taosUcs4len(TdUcs4 *ucs4) { return n; } +//dst buffer size should be at least 2*len + 1 +int32_t taosHexEncode(const char *src, char *dst, int32_t len) { + if (!dst) { + return -1; + } + + for (int32_t i = 0; i < len; ++i) { + sprintf(dst + i * 2, "%02x", src[i] & 0xff); + } + + return 0; +} + +int32_t taosHexDecode(const char *src, char *dst, int32_t len) { + if (!dst) { + return -1; + } + + uint16_t hn, ln, out; + for (int i = 0, j = 0; i < len * 2; i += 2, ++j ) { + hn = src[i] > '9' ? src[i] - 'A' + 10 : src[i] - '0'; + ln = src[i + 1] > '9' ? src[i + 1] - 'A' + 10 : src[i + 1] - '0'; + + out = (hn << 4) | ln; + memcpy(dst + j, &out, 1); + } + + return 0; +} + int32_t taosWcharWidth(TdWchar wchar) { return wcwidth(wchar); } int32_t taosWcharsWidth(TdWchar *pWchar, int32_t size) { return wcswidth(pWchar, size); } From f904e5912ee4decd0642302839f3e858821814f2 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 5 May 2022 17:43:22 +0800 Subject: [PATCH 04/13] fix(query): fix cast constant to nchar type value output incorrect TD-15138 --- source/libs/scalar/src/sclfunc.c | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index d68dce6d12..d6cffdb736 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -789,29 +789,30 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } case TSDB_DATA_TYPE_NCHAR: { int32_t outputCharLen = (outputLen - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; + int32_t len; if (inputType == TSDB_DATA_TYPE_BOOL) { char tmp[8] = {0}; - int32_t len = sprintf(tmp, "%.*s", outputCharLen, *(int8_t *)input ? "true" : "false" ); + len = sprintf(tmp, "%.*s", outputCharLen, *(int8_t *)input ? "true" : "false" ); bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len); if (!ret) { return TSDB_CODE_FAILED; } varDataSetLen(output, len); } else if (inputType == TSDB_DATA_TYPE_BINARY) { - int32_t len = outputCharLen > varDataLen(input) ? varDataLen(input) : outputCharLen; + len = outputCharLen > varDataLen(input) ? varDataLen(input) : outputCharLen; bool ret = taosMbsToUcs4(input + VARSTR_HEADER_SIZE, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len); if (!ret) { return TSDB_CODE_FAILED; } varDataSetLen(output, len); } else if (inputType == TSDB_DATA_TYPE_NCHAR) { - int32_t len = TMIN(outputLen, varDataLen(input) + VARSTR_HEADER_SIZE); - memcpy(output, input, len); - varDataSetLen(output, len - VARSTR_HEADER_SIZE); + len = TMIN(outputLen - VARSTR_HEADER_SIZE, varDataLen(input)); + memcpy(output, input, len + VARSTR_HEADER_SIZE); + varDataSetLen(output, len); } else { char tmp[400] = {0}; NUM_TO_STRING(inputType, input, sizeof(tmp), tmp); - int32_t len = (int32_t)strlen(tmp); + len = (int32_t)strlen(tmp); len = outputCharLen > len ? len : outputCharLen; bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len); if (!ret) { @@ -819,6 +820,10 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } varDataSetLen(output, len); } + //for constant conversion, need to set proper length of pOutput description + if (len < outputLen - VARSTR_HEADER_SIZE) { + pOutput->columnData->info.bytes = len + VARSTR_HEADER_SIZE; + } break; } default: { From c1659805b623ea742d4dcf3b2fd0ee11ae725642 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 May 2022 18:54:19 +0800 Subject: [PATCH 05/13] fix(query): do perform arithmetic operator before apply the sort procedure. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- source/libs/executor/inc/executorimpl.h | 20 +- source/libs/executor/inc/tsort.h | 2 +- source/libs/executor/src/executorimpl.c | 198 ++++-------------- source/libs/executor/src/groupoperator.c | 28 +-- source/libs/executor/src/scanoperator.c | 12 +- source/libs/executor/src/timewindowoperator.c | 46 ++-- source/libs/executor/src/tsort.c | 20 +- 8 files changed, 110 insertions(+), 218 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index dc27087b5c..d945ae6a33 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3279,7 +3279,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa pDataBlockInfo->rows = cur->rows; pDataBlockInfo->window = cur->win; - pDataBlockInfo->numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); +// ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle)); } /* diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0404b4d447..bc6139c304 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -277,7 +277,7 @@ typedef struct SOperatorInfo { uint8_t operatorType; bool blocking; // block operator or not uint8_t status; // denote if current operator is completed - int32_t numOfOutput; // number of columns of the current operator results + int32_t numOfExprs; // number of columns of the current operator results char* name; // name, used to show the query execution plan void* info; // extension attribution SExprInfo* pExpr; @@ -415,8 +415,6 @@ typedef struct SOptrBasicInfo { // TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset typedef struct SAggSupporter { SHashObj* pResultRowHashTable; // quick locate the window object for each result -// SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not -// SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row @@ -577,13 +575,13 @@ typedef struct SSortedMergeOperatorInfo { } SSortedMergeOperatorInfo; typedef struct SSortOperatorInfo { + SOptrBasicInfo binfo; uint32_t sortBufSize; // max buffer size for in-memory sort - SSDataBlock* pDataBlock; SArray* pSortInfo; SSortHandle* pSortHandle; SArray* inputSlotMap; // for index map from table scan output int32_t bufPageSize; - int32_t numOfRowsInRes; +// int32_t numOfRowsInRes; // TODO extact struct int64_t startTs; // sort start time @@ -645,10 +643,13 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols); void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow); void cleanupAggSup(SAggSupporter* pAggSup); -void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); +void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); +void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); -void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, - int32_t* rowCellInfoOffset); +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity); +SSDataBlock* loadNextDataBlock(void* param); + +void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, int16_t bytes, bool masterscan, uint64_t groupId, @@ -663,7 +664,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo); -SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pIndexMap, SExecTaskInfo* pTaskInfo); +SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, + SArray* pIndexMap, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index c584df05dd..2072707b30 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -89,7 +89,7 @@ int32_t tsortClose(SSortHandle* pHandle); * * @return */ -int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp); +int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param); /** * diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a91d204efa..a237eb0e7d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -202,9 +202,9 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData idata = {{0}}; SSlotDescNode* pDescNode = nodesListGetNode(pNode->pSlots, i); - if (!pDescNode->output) { - continue; - } +// if (!pDescNode->output) { // todo disable it temporarily +// continue; +// } idata.info.type = pDescNode->dataType.type; idata.info.bytes = pDescNode->dataType.bytes; @@ -651,7 +651,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order) { - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock); @@ -713,7 +713,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt bool createDummyCol) { int32_t code = TSDB_CODE_SUCCESS; - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { pCtx[i].order = order; pCtx[i].size = pBlock->info.rows; pCtx[i].pSrcBlock = pBlock; @@ -798,7 +798,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt } static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) { - for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { + for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { if (functionNeedToExecute(&pCtx[k])) { pCtx[k].startTs = startTs; // this can be set during create the struct pCtx[k].fpSet.process(&pCtx[k]); @@ -2815,7 +2815,6 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf // NOTE: sources columns are more than the destination SSDatablock columns. void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols) { size_t numOfSrcCols = taosArrayGetSize(pCols); - ASSERT(numOfSrcCols >= pBlock->info.numOfCols); int32_t i = 0, j = 0; while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) { @@ -3287,7 +3286,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = pBlock->info.numOfCols; + pOperator->numOfExprs = pBlock->info.numOfCols; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, @@ -3345,49 +3344,6 @@ static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { cleanupAggSup(&pInfo->aggSup); } -// TODO merge aggregate super table -static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - - bool isNull = tsortIsNullVal(pTupleHandle, i); - if (isNull) { - colDataAppend(pColInfo, pBlock->info.rows, NULL, true); - } else { - char* pData = tsortGetValue(pTupleHandle, i); - colDataAppend(pColInfo, pBlock->info.rows, pData, false); - } - } - - pBlock->info.rows += 1; -} - -SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) { - blockDataCleanup(pDataBlock); - blockDataEnsureCapacity(pDataBlock, capacity); - - blockDataEnsureCapacity(pDataBlock, capacity); - - while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); - if (pTupleHandle == NULL) { - break; - } - - appendOneRowToDataBlock(pDataBlock, pTupleHandle); - if (pDataBlock->info.rows >= capacity) { - return pDataBlock; - } - } - - return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; -} - -SSDataBlock* loadNextDataBlock(void* param) { - SOperatorInfo* pOperator = (SOperatorInfo*)param; - return pOperator->fpSet.getNextFn(pOperator); -} - static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) { size_t size = taosArrayGetSize(groupInfo); if (size == 0) { @@ -3490,8 +3446,8 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock doMergeResultImpl(pInfo, pCtx, numOfExpr, i); } else { doFinalizeResultImpl(pCtx, numOfExpr); - int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput, NULL); - // setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); + int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL); + // setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows); // TODO check for available buffer; @@ -3541,13 +3497,13 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->binfo.pCtx, pDataBlock, TSDB_ORDER_ASC, true); // updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, // pOperator->pRuntimeEnv, true); - doMergeImpl(pOperator, pOperator->numOfOutput, pDataBlock); + doMergeImpl(pOperator, pOperator->numOfExprs, pDataBlock); // flush to tuple store, and after all data have been handled, return to upstream node or sink node } - doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfOutput); - int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfOutput, NULL); - // setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); + doFinalizeResultImpl(pInfo->binfo.pCtx, pOperator->numOfExprs); + int32_t numOfRows = getNumOfResult(pInfo->binfo.pCtx, pOperator->numOfExprs, NULL); + // setTagValueForMultipleRows(pCtx, pOperator->numOfExprs, numOfRows); // TODO check for available buffer; @@ -3571,7 +3527,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) { pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, NULL, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)"); - tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); + tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); @@ -3678,7 +3634,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = num; + pOperator->numOfExprs = num; pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; @@ -3703,79 +3659,6 @@ _error: return NULL; } -static SSDataBlock* doSort(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSortOperatorInfo* pInfo = pOperator->info; - - if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); - } - - int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT, - pInfo->bufPageSize, numOfBufPage, pInfo->pDataBlock, pTaskInfo->id.str); - - tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); - - SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); - ps->param = pOperator->pDownstream[0]; - tsortAddSource(pInfo->pSortHandle, ps); - - int32_t code = tsortOpen(pInfo->pSortHandle); - taosMemoryFreeClear(ps); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, terrno); - } - - pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); -} - -SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, - SArray* pIndexMap, SExecTaskInfo* pTaskInfo) { - SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - int32_t rowSize = pResBlock->info.rowSize; - - if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header - - pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer - pInfo->numOfRowsInRes = 1024; - pInfo->pDataBlock = pResBlock; - pInfo->pSortInfo = pSortInfo; - pInfo->inputSlotMap = pIndexMap; - - pOperator->name = "SortOperator"; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; - pOperator->blocking = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - - pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL); - - int32_t code = appendDownstream(pOperator, &downstream, 1); - return pOperator; - -_error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); - return NULL; -} - int32_t getTableScanOrder(SOperatorInfo* pOperator) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { @@ -3813,7 +3696,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { break; } // if (pAggInfo->current != NULL) { - // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // } // there is an scalar expression that needs to be calculated before apply the group aggregation. @@ -3827,7 +3710,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - setExecutionContext(pOperator->numOfOutput, pBlock->info.groupId, pTaskInfo, pAggInfo); + setExecutionContext(pOperator->numOfExprs, pBlock->info.groupId, pTaskInfo, pAggInfo); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, true); doAggregateImpl(pOperator, 0, pInfo->pCtx); @@ -3848,7 +3731,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } closeAllResultRows(&pAggInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfOutput, pAggInfo->aggSup.pResultBuf, + finalizeMultiTupleQueryResult(pAggInfo->binfo.pCtx, pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf, &pAggInfo->binfo.resultRowInfo, pAggInfo->binfo.rowCellInfoOffset); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, false); @@ -4092,17 +3975,17 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // todo dynamic set tags // if (pTableQueryInfo != NULL) { - // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs); // } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); - projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput); + projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs); if (pRes->info.rows >= pProjectInfo->binfo.capacity * 0.8) { - copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); - resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); + copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs); + resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfExprs); return pRes; } } @@ -4127,14 +4010,14 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pProjectInfo->existDataBlock = pBlock; break; } else { // init output buffer for a new group data - initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfOutput); + initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfExprs); } } // todo dynamic set tags // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; // if (pTableQueryInfo != NULL) { - // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pTableQueryInfo->pTable, pInfo->pCtx, pOperator->numOfExprs); // } // the pDataBlock are always the same one, no need to call this again @@ -4143,7 +4026,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order, false); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); - projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, + projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfExprs, pProjectInfo->pPseudoColInfo); int32_t status = handleLimitOffset(pOperator, pBlock); @@ -4156,7 +4039,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pProjectInfo->curOutput += pInfo->pRes->info.rows; - // copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); + // copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs); return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; } @@ -4289,7 +4172,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { } if (pOperator->fpSet.closeFn != NULL) { - pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfOutput); + pOperator->fpSet.closeFn(pOperator->info, pOperator->numOfExprs); } if (pOperator->pDownstream != NULL) { @@ -4425,7 +4308,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo, @@ -4477,14 +4360,6 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } -static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { - SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; - pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); - - taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->inputSlotMap); -} - void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { SExchangeInfo* pExInfo = (SExchangeInfo*)param; taosArrayDestroy(pExInfo->pSources); @@ -4538,7 +4413,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = num; + pOperator->numOfExprs = num; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo, NULL, NULL, NULL); @@ -4621,7 +4496,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = @@ -4979,7 +4854,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SArray* info = createSortInfo(pSortPhyNode->pSortKeys, pSortPhyNode->pTargets); SArray* slotMap = createIndexMap(pSortPhyNode->pTargets); - pOptr = createSortOperatorInfo(ops[0], pResBlock, info, slotMap, pTaskInfo); + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = NULL; + if (pSortPhyNode->pExprs != NULL) { + pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &numOfCols); + } + + pOptr = createSortOperatorInfo(ops[0], pResBlock, info, pExprInfo, numOfCols, slotMap, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == type) { SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; @@ -5566,7 +5448,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { // only the timestamp match support for ordinary table ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); if (*(int64_t*)pLeftVal == *(int64_t*)pRightVal) { - for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { + for (int32_t i = 0; i < pOperator->numOfExprs; ++i) { SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, i); SExprInfo* pExprInfo = &pOperator->pExpr[i]; @@ -5633,7 +5515,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index f5dd67217e..58efd75a0b 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -227,16 +227,16 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); - int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); + int32_t ret = setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } int32_t rowIndex = j - num; - doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); + doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); // assign the group keys or user input constant values if required - doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); + doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex); recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j, numOfGroupCols); num = 1; } @@ -244,15 +244,15 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { if (num > 0) { len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = - setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, + setGroupResultOutputBuf(&(pInfo->binfo), pOperator->numOfExprs, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } int32_t rowIndex = pBlock->info.rows - num; - doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfOutput, TSDB_ORDER_ASC); - doAssignGroupKeys(pCtx, pOperator->numOfOutput, pBlock->info.rows, rowIndex); + doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); + doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex); } } @@ -291,19 +291,19 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL); } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs); doHashGroupbyAgg(pOperator, pBlock); } pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, + finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); // if (!stableQuery) { // finalize include the update of result rows - // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput); + // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs); // } else { - // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, + // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo, // pInfo->binfo.rowCellInfoOffset); // } @@ -357,7 +357,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->status = OP_NOT_OPENED; // pOperator->operatorType = OP_Groupby; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -392,7 +392,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { int32_t* rows = (int32_t*) pPage; - size_t numOfCols = pOperator->numOfOutput; + size_t numOfCols = pOperator->numOfExprs; for(int32_t i = 0; i < numOfCols; ++i) { SExprInfo* pExpr = &pOperator->pExpr[i]; int32_t slotId = pExpr->base.pParam[0].pCol->slotId; @@ -565,7 +565,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { break; } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs); doHashPartition(pOperator, pBlock); } @@ -616,7 +616,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pInfo->binfo.pRes = pResultBlock; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->pExpr = pExprInfo; pOperator->info = pInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7e721455c3..8c9fdfe4e6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -386,7 +386,7 @@ SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, SQueryTableDataCon pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = numOfOutput; + pOperator->numOfExprs = numOfOutput; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, NULL, NULL, NULL, NULL); @@ -646,7 +646,7 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->fpSet._openFn = operatorDummyOpenFn; pOperator->fpSet.getNextFn = doStreamBlockScan; pOperator->fpSet.closeFn = operatorDummyCloseFn; @@ -1020,7 +1020,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp; setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data, - pTableRsp->compLen, pOperator->numOfOutput, startTs, NULL, pInfo->scanCols); + pTableRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols); // todo log the filter info doFilterResult(pInfo); @@ -1150,7 +1150,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->numOfOutput = pResBlock->info.numOfCols; + pOperator->numOfExprs = pResBlock->info.numOfCols; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; @@ -1247,7 +1247,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { char *data = NULL, *dst = NULL; int16_t type = 0, bytes = 0; - for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { + for(int32_t j = 0; j < pOperator->numOfExprs; ++j) { // not assign value in case of user defined constant output column if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.pColumns->flag)) { continue; @@ -1308,7 +1308,7 @@ SOperatorInfo* createTagScanOperatorInfo(void* readHandle, SExprInfo* pExpr, int pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; + pOperator->numOfExprs = numOfOutput; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 580614dc02..224f3db912 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -325,7 +325,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SqlFunctionCtx* pCtx = pInfo->pCtx; - for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { + for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { int32_t functionId = pCtx[k].functionId; if (functionId != FUNCTION_TWA && functionId != FUNCTION_INTERP) { pCtx[k].start.key = INT64_MIN; @@ -406,12 +406,12 @@ static bool setTimeWindowInterpolationStartTs(SOperatorInfo* pOperatorInfo, SqlF // start exactly from this point, no need to do interpolation TSKEY key = ascQuery ? win->skey : win->ekey; if (key == curTs) { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); return true; } if (lastTs == INT64_MIN && ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery))) { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); return true; } @@ -427,7 +427,7 @@ static bool setTimeWindowInterpolationEndTs(SOperatorInfo* pOperatorInfo, SqlFun SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) { int32_t order = TSDB_ORDER_ASC; - int32_t numOfOutput = pOperatorInfo->numOfOutput; + int32_t numOfOutput = pOperatorInfo->numOfExprs; TSKEY actualEndKey = tsCols[endRowIndex]; TSKEY key = order ? win->ekey : win->skey; @@ -572,7 +572,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); } } else { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); } // point interpolation does not require the end key time window interpolation. @@ -592,7 +592,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); } } else { - setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_END_INTERP); + setNotInterpoWindowKey(pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_END_INTERP); } } @@ -612,7 +612,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - int32_t numOfOutput = pOperatorInfo->numOfOutput; + int32_t numOfOutput = pOperatorInfo->numOfExprs; SArray* pUpdated = NULL; if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { @@ -683,7 +683,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfOutput, RESULT_ROW_START_INTERP); + setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } @@ -773,7 +773,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { break; } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true); STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; @@ -798,7 +798,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } closeAllResultRows(&pInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, + finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); @@ -813,7 +813,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI int64_t gid = pBlock->info.groupId; bool masterScan = true; - int32_t numOfOutput = pOperator->numOfOutput; + int32_t numOfOutput = pOperator->numOfExprs; int16_t bytes = pStateColInfoData->info.bytes; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); @@ -916,7 +916,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, + finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); @@ -1013,13 +1013,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { // The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the // caller. Note that all the time window are not close till now. - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, true); pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); } - finalizeUpdatedResult(pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); + finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -1082,7 +1082,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, @@ -1141,7 +1141,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL, @@ -1169,7 +1169,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId); bool masterScan = true; - int32_t numOfOutput = pOperator->numOfOutput; + int32_t numOfOutput = pOperator->numOfExprs; int64_t gid = pBlock->info.groupId; int64_t gap = pInfo->gap; @@ -1270,7 +1270,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, + finalizeMultiTupleQueryResult(pBInfo->pCtx, pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); @@ -1309,7 +1309,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { break; } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfExprs); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order, true); // hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0); @@ -1319,7 +1319,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pSliceInfo->binfo.resultRowInfo); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - // finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput); + // finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfExprs); // initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); @@ -1346,7 +1346,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -1388,7 +1388,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; pOperator->info = pInfo; @@ -1440,7 +1440,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; + pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 46fc8da00c..50aa4cfc01 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -42,11 +42,7 @@ struct SSortHandle { _sort_fetch_block_fn_t fetchfp; _sort_merge_compar_fn_t comparFn; - - void *pParam; SMultiwayMergeTreeInfo *pMergeTree; - int32_t numOfCols; - int64_t startTs; uint64_t sortElapsed; uint64_t totalElapsed; @@ -61,6 +57,9 @@ struct SSortHandle { bool inMemSort; bool needAdjust; STupleHandle tupleHandle; + + void *param; + void (*beforeFp)(SSDataBlock* pBlock, void* param); }; static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param); @@ -533,6 +532,13 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { pHandle->pDataBlock = createOneDataBlock(pBlock, false); } + // perform the scalar function calculation before apply the sort + if (pHandle->beforeFp != NULL) { + pHandle->beforeFp(pBlock, pHandle->param); + } + + // todo relocate the columns + int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock, pHandle->pIndexMap); if (code != 0) { return code; @@ -623,8 +629,10 @@ int32_t tsortClose(SSortHandle* pHandle) { return TSDB_CODE_SUCCESS; } -int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fp) { - pHandle->fetchfp = fp; +int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param) { + pHandle->fetchfp = fetchFp; + pHandle->beforeFp = fp; + pHandle->param = param; return TSDB_CODE_SUCCESS; } From a5706ea7e5c35ca4fc9a528730a344ded32b8577 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 5 May 2022 18:57:09 +0800 Subject: [PATCH 06/13] feat(stream): insert data --- example/src/tstream.c | 7 +- include/common/tdatablock.h | 25 ++--- include/common/tdataformat.h | 15 ++- include/common/tmsg.h | 4 +- include/common/trow.h | 38 ++++---- include/libs/stream/tstream.h | 4 +- source/common/src/tdatablock.c | 102 +++++++++++++++++---- source/common/src/tdataformat.c | 47 +++++++++- source/common/src/tmsg.c | 19 ++++ source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/inc/mndOffset.h | 2 + source/dnode/mnode/impl/inc/mndSubscribe.h | 2 + source/dnode/mnode/impl/src/mndDb.c | 8 +- source/dnode/mnode/impl/src/mndOffset.c | 33 +++++++ source/dnode/mnode/impl/src/mndScheduler.c | 4 + source/dnode/vnode/src/tq/tq.c | 6 ++ source/libs/stream/src/tstream.c | 14 ++- 17 files changed, 261 insertions(+), 70 deletions(-) diff --git a/example/src/tstream.c b/example/src/tstream.c index 77a002499e..5bd833213d 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -81,9 +81,10 @@ int32_t create_stream() { /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ - pRes = taos_query( - pConn, - "create stream stream1 trigger window_close as select min(k), max(k), sum(k) as sum_of_k from tu1 interval(10m)"); + pRes = + taos_query(pConn, + "create stream stream1 trigger window_close as select _wstartts, min(k), max(k), sum(k) as sum_of_k " + "from tu1 interval(10m)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 71e2e4fba3..a597123cd7 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -54,8 +54,8 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \ } while (0) -#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) -#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) +#define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) +#define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) #define BitmapLen(_n) (((_n) + ((1 << NBIT) - 1)) >> NBIT) @@ -63,16 +63,15 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); #define colDataGetNumData(p1_, r_) ((p1_)->pData + ((r_) * (p1_)->info.bytes)) // SColumnInfoData, rowNumber -#define colDataGetData(p1_, r_) \ - ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) \ - : colDataGetNumData(p1_, r_)) +#define colDataGetData(p1_, r_) \ + ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? colDataGetVarData(p1_, r_) : colDataGetNumData(p1_, r_)) static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, uint32_t row) { - if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON){ - if(colDataIsNull_var(pColumnInfoData, row)){ + if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) { + if (colDataIsNull_var(pColumnInfoData, row)) { return true; } - char *data = colDataGetVarData(pColumnInfoData, row); + char* data = colDataGetVarData(pColumnInfoData, row); return (*data == TSDB_DATA_TYPE_NULL); } @@ -80,7 +79,7 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData, return false; } - if (pColumnInfoData->info.type== TSDB_DATA_TYPE_VARCHAR || pColumnInfoData->info.type == TSDB_DATA_TYPE_NCHAR) { + if (pColumnInfoData->info.type == TSDB_DATA_TYPE_VARCHAR || pColumnInfoData->info.type == TSDB_DATA_TYPE_NCHAR) { return colDataIsNull_var(pColumnInfoData, row); } else { if (pColumnInfoData->nullbitmap == NULL) { @@ -132,7 +131,7 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { for (int32_t i = start; i < start + nRows; ++i) { - colDataSetNull_var(pColumnInfoData,i); // it is a null value of VAR type. + colDataSetNull_var(pColumnInfoData, i); // it is a null value of VAR type. } } else { for (int32_t i = start; i < start + nRows; ++i) { @@ -225,6 +224,8 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); void blockDebugShowData(const SArray* dataBlocks); +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema); + static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock); } @@ -238,10 +239,10 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32 static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) { - int32_t* actualLen = (int32_t*) data; + int32_t* actualLen = (int32_t*)data; data += sizeof(int32_t); - uint64_t* groupId = (uint64_t*) data; + uint64_t* groupId = (uint64_t*)data; data += sizeof(uint64_t); int32_t* colSizes = (int32_t*)data; diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index a0f4351bbf..a4c693c2e2 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -61,11 +61,11 @@ extern "C" { // ----------------- TSDB COLUMN DEFINITION #pragma pack(push, 1) typedef struct { - col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1)) - int32_t type : 8; // column type - int32_t bytes : 24; // column bytes (0~16M) - int32_t sma : 8; // block SMA: 0, no SMA, 1, sum/min/max, 2, ... - int32_t offset : 24; // point offset in STpRow after the header part. + col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1)) + int8_t type; // column type + int8_t sma; // block SMA: 0, no SMA, 1, sum/min/max, 2, ... + int32_t bytes; // column bytes (0~16M) + int32_t offset; // point offset in STpRow after the header part. } STColumn; #pragma pack(pop) @@ -387,8 +387,6 @@ typedef struct SDataCol { TSKEY ts; // only used in last NULL column } SDataCol; - - #define isAllRowsNull(pCol) ((pCol)->len == 0) #define isAllRowsNone(pCol) ((pCol)->len == 0) static FORCE_INLINE void dataColReset(SDataCol *pDataCol) { pDataCol->len = 0; } @@ -482,7 +480,8 @@ void tdResetDataCols(SDataCols *pCols); int32_t tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); SDataCols *tdFreeDataCols(SDataCols *pCols); -int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool forceSetNull, TDRowVerT maxVer); +int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool forceSetNull, + TDRowVerT maxVer); // ----------------- K-V data row structure /* |<-------------------------------------- len -------------------------------------------->| diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ea605090f9..b85b8ceb7a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -245,6 +245,8 @@ int32_t tInitSubmitMsgIter(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); +// for debug +int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema); // TODO: KEEP one suite of iterator API finally. // 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts @@ -2137,7 +2139,7 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SCoder* pDecoder, SSchemaWrapp if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; - pSW->pSchema = (SSchema*)tCoderMalloc(pDecoder, sizeof(SSchema) * pSW->nCols); + pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) return -1; for (int32_t i = 0; i < pSW->nCols; i++) { if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1; diff --git a/include/common/trow.h b/include/common/trow.h index 02f84b372e..8b1d612433 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -622,7 +622,6 @@ static FORCE_INLINE int32_t tdSRowSetTpInfo(SRowBuilder *pBuilder, int32_t nCols return TSDB_CODE_SUCCESS; } - /** * @brief To judge row type: STpRow/SKvRow * @@ -758,7 +757,6 @@ static int32_t tdSRowGetBuf(SRowBuilder *pBuilder, void *pBuf) { return TSDB_CODE_SUCCESS; } - /** * @brief 由调用方管理存储空间的分配及释放,一次输入多个参数 * @@ -1250,16 +1248,16 @@ static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, in } /** - * @brief - * - * @param pRow - * @param colId - * @param colType - * @param flen - * @param offset + * @brief + * + * @param pRow + * @param colId + * @param colType + * @param flen + * @param offset * @param colIdx start from 0 - * @param pVal - * @return FORCE_INLINE + * @param pVal + * @return FORCE_INLINE */ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t colType, int32_t flen, uint32_t offset, col_id_t colIdx, SCellVal *pVal) { @@ -1273,14 +1271,14 @@ static FORCE_INLINE bool tdSTpRowGetVal(STSRow *pRow, col_id_t colId, col_type_t } /** - * @brief - * - * @param pRow - * @param colId - * @param offset + * @brief + * + * @param pRow + * @param colId + * @param offset * @param colIdx start from 0 - * @param pVal - * @return FORCE_INLINE + * @param pVal + * @return FORCE_INLINE */ static FORCE_INLINE bool tdSKvRowGetVal(STSRow *pRow, col_id_t colId, uint32_t offset, col_id_t colIdx, SCellVal *pVal) { @@ -1397,14 +1395,14 @@ static void tdSCellValPrint(SCellVal *pVal, int8_t colType) { } } -static void tdSRowPrint(STSRow *row, STSchema *pSchema, const char* tag) { +static void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag) { STSRowIter iter = {0}; tdSTSRowIterInit(&iter, pSchema); tdSTSRowIterReset(&iter, row); printf("%s >>>", tag); for (int i = 0; i < pSchema->numOfCols; ++i) { STColumn *stCol = pSchema->columns + i; - SCellVal sVal = { 255, NULL}; + SCellVal sVal = {255, NULL}; if (!tdSTSRowIterNext(&iter, stCol->colId, stCol->type, &sVal)) { break; } diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 2e5574511b..42180833df 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -70,8 +70,10 @@ typedef struct { } STaskDispatcherShuffle; typedef struct { - int8_t reserved; + int8_t reserved; + SSchemaWrapper* pSchemaWrapper; // not applicable to encoder and decoder + STSchema* pTSchema; SHashObj* pHash; // groupId to tbuid } STaskSinkTb; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 4a6fef4626..84db728ce4 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -194,17 +194,17 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c int32_t i = 0; uint8_t* start = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)]; - int32_t overCount = BitmapLen(total) - BitmapLen(numOfRow1); - while (i < len) { // size limit of pSource->nullbitmap + int32_t overCount = BitmapLen(total) - BitmapLen(numOfRow1); + while (i < len) { // size limit of pSource->nullbitmap if (i >= 1) { - start[i - 1] |= (p[i] >> remindBits); //copy remind bits + start[i - 1] |= (p[i] >> remindBits); // copy remind bits } - if (i >= overCount) { // size limit of pColumnInfoData->nullbitmap + if (i >= overCount) { // size limit of pColumnInfoData->nullbitmap return; } - start[i] |= (p[i] << shiftBits); //copy shift bits + start[i] |= (p[i] << shiftBits); // copy shift bits i += 1; } } @@ -354,7 +354,7 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc, SArray* pInd int32_t numOfCols = pDest->info.numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { int32_t mapIndex = i; - if(pIndexMap) { + if (pIndexMap) { mapIndex = *(int32_t*)taosArrayGet(pIndexMap, i); } SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); @@ -451,7 +451,6 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd // all fit in *stopIndex = numOfRows - 1; return TSDB_CODE_SUCCESS; - } SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) { @@ -556,7 +555,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { if (IS_VAR_DATA_TYPE(pCol->info.type)) { size_t metaSize = pBlock->info.rows * sizeof(int32_t); - char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize); // preview calloc is too small + char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize); // preview calloc is too small if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -938,8 +937,9 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { copyBackToBlock(pDataBlock, pCols); int64_t p4 = taosGetTimestampUs(); - uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1, - p3 - p2, p4 - p3, rows); + uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 + ", rows:%d\n", + p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows); destroyTupleIndex(index); return TSDB_CODE_SUCCESS; @@ -1176,7 +1176,7 @@ void* blockDataDestroy(SSDataBlock* pBlock) { } SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { - if(pDataBlock == NULL){ + if (pDataBlock == NULL) { return NULL; } @@ -1187,7 +1187,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pBlock->info.numOfCols = numOfCols; pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; - pBlock->info.rowSize = pDataBlock->info.rows; + pBlock->info.rowSize = pDataBlock->info.rows; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0}; @@ -1217,7 +1217,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { } size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { - return (int32_t) ((pageSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock)); + return (int32_t)((pageSize - blockDataGetSerialMetaSize(pBlock)) / blockDataGetSerialRowSize(pBlock)); } void colDataDestroy(SColumnInfoData* pColData) { @@ -1234,14 +1234,14 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) { int32_t len = BitmapLen(total); int32_t newLen = BitmapLen(total - n); - if (n%8 == 0) { - memmove(nullBitmap, nullBitmap + n/8, newLen); + if (n % 8 == 0) { + memmove(nullBitmap, nullBitmap + n / 8, newLen); } else { int32_t tail = n % 8; int32_t i = 0; - uint8_t* p = (uint8_t*) nullBitmap; - while(i < len) { + uint8_t* p = (uint8_t*)nullBitmap; + while (i < len) { uint8_t v = p[i]; p[i] = 0; @@ -1268,7 +1268,7 @@ static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_ } } -int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n) { +int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n) { if (n == 0) { return TSDB_CODE_SUCCESS; } @@ -1276,7 +1276,7 @@ int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n) { if (pBlock->info.rows <= n) { blockDataCleanup(pBlock); } else { - for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); colDataTrimFirstNRows(pColInfoData, n, pBlock->info.rows); } @@ -1462,3 +1462,67 @@ void blockDebugShowData(const SArray* dataBlocks) { } } +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema) { + SSubmitReq* ret = NULL; + + // cal size + int32_t cap = sizeof(SSubmitReq); + int32_t sz = taosArrayGetSize(pBlocks); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + int32_t rows = pDataBlock->info.rows; + // TODO min + int32_t rowSize = pDataBlock->info.rowSize; + int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); + cap += sizeof(SSubmitBlk) + rows * maxLen; + } + + // assign data + ret = taosMemoryCalloc(1, cap); + ret->version = htonl(1); + ret->length = htonl(cap - sizeof(SSubmitReq)); + ret->numOfBlocks = htonl(sz); + + void* submitBlk = POINTER_SHIFT(ret, sizeof(SSubmitReq)); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + + SSubmitBlk* blkHead = submitBlk; + blkHead->numOfRows = htons(pDataBlock->info.rows); + blkHead->schemaLen = 0; + blkHead->sversion = htonl(pTSchema->version); + // TODO + blkHead->suid = 0; + blkHead->uid = htobe64(pDataBlock->info.uid); + + int32_t rows = pDataBlock->info.rows; + int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); + /*blkHead->dataLen = htonl(rows * maxLen);*/ + blkHead->dataLen = 0; + + void* blockData = POINTER_SHIFT(submitBlk, sizeof(SSubmitBlk)); + STSRow* rowData = blockData; + + for (int32_t j = 0; j < pDataBlock->info.rows; j++) { + SRowBuilder rb = {0}; + tdSRowInit(&rb, pTSchema->version); + tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen); + tdSRowResetBuf(&rb, rowData); + + for (int32_t k = 0; k < pTSchema->numOfCols; k++) { + const STColumn* pColumn = &pTSchema->columns[k]; + SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); + void* data = colDataGetData(pColData, j); + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k); + } + int32_t rowLen = TD_ROW_LEN(rowData); + rowData = POINTER_SHIFT(rowData, rowLen); + blkHead->dataLen += rowLen; + } + int32_t len = blkHead->dataLen; + blkHead->dataLen = htonl(len); + blkHead = POINTER_SHIFT(blkHead, len); + } + + return ret; +} diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 2ed08ac81f..f5fa1c6491 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "tdataformat.h" #include "tcoding.h" +#include "tdatablock.h" #include "tlog.h" static void dataColSetNEleNull(SDataCol *pCol, int nEle); @@ -128,6 +129,50 @@ void *tdDecodeSchema(void *buf, STSchema **pRSchema) { return buf; } +#if 0 +int32_t tEncodeSTColumn(SCoder *pEncoder, const STColumn *pCol) { + if (tEncodeI16(pEncoder, pCol->colId) < 0) return -1; + if (tEncodeI8(pEncoder, pCol->type) < 0) return -1; + if (tEncodeI8(pEncoder, pCol->sma) < 0) return -1; + if (tEncodeI32(pEncoder, pCol->bytes) < 0) return -1; + if (tEncodeI32(pEncoder, pCol->offset) < 0) return -1; + return pEncoder->pos; +} + +int32_t tDecodeSTColumn(SCoder *pDecoder, STColumn *pCol) { + if (tDecodeI16(pDecoder, &pCol->colId) < 0) return -1; + if (tDecodeI8(pDecoder, &pCol->type) < 0) return -1; + if (tDecodeI8(pDecoder, &pCol->sma) < 0) return -1; + if (tDecodeI32(pDecoder, &pCol->bytes) < 0) return -1; + if (tDecodeI32(pDecoder, &pCol->offset) < 0) return -1; + return 0; +} + +int32_t tEncodeSchema(SCoder *pEncoder, const STSchema *pSchema) { + if (tEncodeI32(pEncoder, pSchema->numOfCols) < 0) return -1; + if (tEncodeI16(pEncoder, pSchema->version) < 0) return -1; + if (tEncodeU16(pEncoder, pSchema->flen) < 0) return -1; + if (tEncodeI32(pEncoder, pSchema->vlen) < 0) return -1; + if (tEncodeI32(pEncoder, pSchema->tlen) < 0) return -1; + + for (int32_t i = 0; i < schemaNCols(pSchema); i++) { + const STColumn *pCol = schemaColAt(pSchema, i); + if (tEncodeSTColumn(pEncoder, pCol) < 0) return -1; + } + return 0; +} + +int32_t tDecodeSchema(SCoder *pDecoder, STSchema *pSchema) { + if (tDecodeI32(pDecoder, &pSchema->numOfCols) < 0) return -1; + if (tDecodeI16(pDecoder, &pSchema->version) < 0) return -1; + if (tDecodeU16(pDecoder, &pSchema->flen) < 0) return -1; + if (tDecodeI32(pDecoder, &pSchema->vlen) < 0) return -1; + if (tDecodeI32(pDecoder, &pSchema->tlen) < 0) return -1; + + return 0; +} +#endif + int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, schema_ver_t version) { if (pBuilder == NULL) return -1; @@ -908,4 +953,4 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch taosArrayDestroy(stashRow); return buffer; } -#endif \ No newline at end of file +#endif diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 874e09b403..fc993511ab 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -174,6 +174,25 @@ STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) { } } +int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq *pReq, STSchema *pTschema) { + SSubmitMsgIter msgIter = {0}; + if (tInitSubmitMsgIterEx(pReq, &msgIter) < 0) return -1; + while (true) { + SSubmitBlk *pBlock = NULL; + if (tGetSubmitMsgNextEx(&msgIter, &pBlock) < 0) return -1; + if (pBlock == NULL) break; + SSubmitBlkIter blkIter = {0}; + tInitSubmitBlkIterEx(&msgIter, pBlock, &blkIter); + STSRowIter rowIter = {0}; + tdSTSRowIterInit(&rowIter, pTschema); + STSRow *row; + while ((row = tGetSubmitBlkNextEx(&blkIter)) != NULL) { + tdSRowPrint(row, pTschema, "stream"); + } + } + return 0; +} + int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) { if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 263fd0bad5..456a370db6 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -415,6 +415,7 @@ typedef struct { typedef struct { char key[TSDB_PARTITION_KEY_LEN]; + int64_t dbUid; int64_t offset; } SMqOffsetObj; diff --git a/source/dnode/mnode/impl/inc/mndOffset.h b/source/dnode/mnode/impl/inc/mndOffset.h index 556bfe6a53..b468496165 100644 --- a/source/dnode/mnode/impl/inc/mndOffset.h +++ b/source/dnode/mnode/impl/inc/mndOffset.h @@ -37,6 +37,8 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c return snprintf(key, TSDB_PARTITION_KEY_LEN, "%d:%s:%s", vgId, cgroup, topicName); } +int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h index 7915006f27..5ca672e8dd 100644 --- a/source/dnode/mnode/impl/inc/mndSubscribe.h +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -31,6 +31,8 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName); +int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 442bfb5c8a..347e959d3c 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -17,9 +17,12 @@ #include "mndDb.h" #include "mndAuth.h" #include "mndDnode.h" +#include "mndOffset.h" #include "mndShow.h" #include "mndSma.h" #include "mndStb.h" +#include "mndSubscribe.h" +#include "mndTopic.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" @@ -1027,6 +1030,9 @@ static int32_t mndDropDb(SMnode *pMnode, SNodeMsg *pReq, SDbObj *pDb) { if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER; + /*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ + /*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ + /*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER; int32_t rspLen = 0; @@ -1387,7 +1393,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in bool sysDb) { int32_t cols = 0; - int32_t bytes = pShow->pMeta->pSchemas[cols].bytes; + int32_t bytes = pShow->pMeta->pSchemas[cols].bytes; char *buf = taosMemoryMalloc(bytes); const char *name = mndGetDbStr(pDb->name); if (name != NULL) { diff --git a/source/dnode/mnode/impl/src/mndOffset.c b/source/dnode/mnode/impl/src/mndOffset.c index f075348704..e0ba4076f5 100644 --- a/source/dnode/mnode/impl/src/mndOffset.c +++ b/source/dnode/mnode/impl/src/mndOffset.c @@ -231,3 +231,36 @@ static void mndCancelGetNextOffset(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +static int32_t mndSetDropOffsetCommitLogs(SMnode *pMnode, STrans *pTrans, SMqOffsetObj *pOffset) { + SSdbRaw *pCommitRaw = mndOffsetActionEncode(pOffset); + if (pCommitRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; + if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1; + return 0; +} + +int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + int32_t code = -1; + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + SMqOffsetObj *pOffset = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pOffset); + if (pIter == NULL) break; + + if (pOffset->dbUid != pDb->uid) { + sdbRelease(pSdb, pOffset); + continue; + } + + if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) { + goto END; + } + } + + code = 0; +END: + return code; +} diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index b760dd6aa8..9274d35b8b 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -204,6 +204,8 @@ int32_t mndAddShuffledSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* p pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); + ASSERT(pTask->tbSink.pSchemaWrapper); } // dispatch @@ -242,6 +244,7 @@ int32_t mndAddFixedSinkToStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStr pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); } // // dispatch @@ -316,6 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pTask->smaSink.smaId = pStream->smaId; } else { pTask->sinkType = TASK_SINK__TABLE; + pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); } #endif } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 411d5c451c..261b4aa8ae 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -926,6 +926,12 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { pTask->ahandle = pTq->pVnode; if (pTask->sinkType == TASK_SINK__SMA) { pTask->smaSink.smaHandle = smaHandleRes; + } else if (pTask->sinkType == TASK_SINK__TABLE) { + ASSERT(pTask->tbSink.pSchemaWrapper); + ASSERT(pTask->tbSink.pSchemaWrapper->pSchema); + pTask->tbSink.pTSchema = + tdGetSTSChemaFromSSChema(&pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols); + ASSERT(pTask->tbSink.pTSchema); } taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index cf3cd162aa..b8198364c2 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -152,8 +152,10 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in // sink if (pTask->sinkType == TASK_SINK__TABLE) { - // - blockDebugShowData(pRes); + /*blockDebugShowData(pRes);*/ + ASSERT(pTask->tbSink.pTSchema); + SSubmitReq* pReq = tdBlockToSubmit(pRes, pTask->tbSink.pTSchema); + tPrintFixedSchemaSubmitReq(pReq, pTask->tbSink.pTSchema); } else if (pTask->sinkType == TASK_SINK__SMA) { pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes); // @@ -274,7 +276,8 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { } if (pTask->sinkType == TASK_SINK__TABLE) { - if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1; + /*if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;*/ + if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SMA) { if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__FETCH) { @@ -318,7 +321,10 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { } if (pTask->sinkType == TASK_SINK__TABLE) { - if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1; + /*if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;*/ + pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pTask->tbSink.pSchemaWrapper == NULL) return -1; + if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SMA) { if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__FETCH) { From 8b9e94a038de347dcf6a6532d69c5e1acaa45442 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 5 May 2022 19:03:05 +0800 Subject: [PATCH 07/13] scalar udf memory by itself --- include/common/tdatablock.h | 5 ++ include/libs/function/tudf.h | 117 ++++++++++++++++++++++++++-- source/libs/function/CMakeLists.txt | 3 +- source/libs/function/inc/tudfInt.h | 5 +- source/libs/function/src/tudf.c | 11 ++- source/libs/function/src/udfd.c | 9 +-- source/libs/function/test/udf1.c | 48 ++---------- 7 files changed, 134 insertions(+), 64 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 3a98102661..4b36b67e43 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -54,6 +54,11 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \ } while (0) +#define colDataSetNotNull_f(bm_, r_) \ + do { \ + BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \ + } while (0) + #define colDataIsNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] == -1) #define colDataSetNull_var(pColumnInfoData, row) (pColumnInfoData->varmeta.offset[row] = -1) diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index 5207a73487..22314e4bd6 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -22,6 +22,7 @@ #include "tmsg.h" #include "tcommon.h" #include "function.h" +#include "tdatablock.h" #ifdef __cplusplus extern "C" { @@ -61,7 +62,7 @@ typedef struct SUdfColumnMeta { typedef struct SUdfColumnData { int32_t numOfRows; - bool varLengthColumn; + int32_t rowsAlloc; union { struct { int32_t nullBitmapLen; @@ -72,9 +73,10 @@ typedef struct SUdfColumnData { struct { int32_t varOffsetsLen; - char *varOffsets; + int32_t *varOffsets; int32_t payloadLen; char *payload; + int32_t payloadAllocLen; } varLenCol; }; } SUdfColumnData; @@ -131,9 +133,114 @@ typedef int32_t (*TUdfSetupFunc)(); typedef int32_t (*TUdfTeardownFunc)(); //TODO: add API to check function arguments type, number etc. -//TODO: another way to manage memory is provide api for UDF to add data to SUdfColumnData and UDF framework will allocate memory. -// then UDF framework will free the memory -// int32_t udfColDataAppend(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) + +#define UDF_MEMORY_EXP_GROWTH 1.5 + +static FORCE_INLINE int32_t udfColEnsureCapacity(SUdfColumn* pColumn, int32_t newCapacity) { + SUdfColumnMeta *meta = &pColumn->colMeta; + SUdfColumnData *data = &pColumn->colData; + + if (newCapacity== 0 || newCapacity <= data->rowsAlloc) { + return TSDB_CODE_SUCCESS; + } + + int allocCapacity = MAX(data->rowsAlloc, 8); + while (allocCapacity < newCapacity) { + allocCapacity *= UDF_MEMORY_EXP_GROWTH; + } + + if (IS_VAR_DATA_TYPE(meta->type)) { + char* tmp = taosMemoryRealloc(data->varLenCol.varOffsets, sizeof(int32_t) * allocCapacity); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + data->varLenCol.varOffsets = (int32_t*)tmp; + data->varLenCol.varOffsetsLen = sizeof(int32_t) * allocCapacity; + // for payload, add data in udfColDataAppend + } else { + char* tmp = taosMemoryRealloc(data->fixLenCol.nullBitmap, BitmapLen(allocCapacity)); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + data->fixLenCol.nullBitmap = tmp; + data->fixLenCol.nullBitmapLen = BitmapLen(allocCapacity); + if (meta->type == TSDB_DATA_TYPE_NULL) { + return TSDB_CODE_SUCCESS; + } + + tmp = taosMemoryRealloc(data->fixLenCol.data, allocCapacity* meta->bytes); + if (tmp == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + data->fixLenCol.data = tmp; + data->fixLenCol.dataLen = allocCapacity* meta->bytes; + } + + data->rowsAlloc = allocCapacity; + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t udfColSetRow(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) { + SUdfColumnMeta *meta = &pColumn->colMeta; + SUdfColumnData *data = &pColumn->colData; + udfColEnsureCapacity(pColumn, currentRow+1); + bool isVarCol = IS_VAR_DATA_TYPE(meta->type); + if (isNull) { + if (isVarCol) { + data->varLenCol.varOffsets[currentRow] = -1; + } else { + colDataSetNull_f(data->fixLenCol.nullBitmap, currentRow); + } + } else { + if (!isVarCol) { + colDataSetNotNull_f(data->fixLenCol.nullBitmap, currentRow); + memcpy(data->fixLenCol.data + meta->bytes * currentRow, pData, meta->bytes); + } else { + int32_t dataLen = varDataTLen(pData); + if (meta->type == TSDB_DATA_TYPE_JSON) { + if (*pData == TSDB_DATA_TYPE_NULL) { + dataLen = 0; + } else if (*pData == TSDB_DATA_TYPE_NCHAR) { + dataLen = varDataTLen(pData + CHAR_BYTES); + } else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) { + dataLen = LONG_BYTES; + } else if (*pData == TSDB_DATA_TYPE_BOOL) { + dataLen = CHAR_BYTES; + } + dataLen += CHAR_BYTES; + } + + if (data->varLenCol.payloadAllocLen < data->varLenCol.payloadLen + dataLen) { + uint32_t newSize = data->varLenCol.payloadAllocLen; + if (newSize <= 1) { + newSize = 8; + } + + while (newSize < data->varLenCol.payloadLen + dataLen) { + newSize = newSize * UDF_MEMORY_EXP_GROWTH; + } + + char *buf = taosMemoryRealloc(data->varLenCol.payload, newSize); + if (buf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + data->varLenCol.payload = buf; + data->varLenCol.payloadAllocLen = newSize; + } + + uint32_t len = data->varLenCol.payloadLen; + data->varLenCol.varOffsets[currentRow] = len; + + memcpy(data->varLenCol.payload + len, pData, dataLen); + data->varLenCol.payloadLen += dataLen; + } + } + data->numOfRows = MAX(currentRow + 1, data->numOfRows); + return 0; +} typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column); typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock* block, SUdfColumn *resultCol); diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt index 15813b3cb0..c31cabda19 100644 --- a/source/libs/function/CMakeLists.txt +++ b/source/libs/function/CMakeLists.txt @@ -48,8 +48,7 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) target_link_libraries( - udf1 PUBLIC os -) + udf1 PUBLIC os) add_library(udf2 MODULE test/udf2.c) target_include_directories( diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index 8aedd59c33..6f82542aee 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -19,9 +19,6 @@ extern "C" { #endif -//TODO replaces them with fnDebug -//#define debugPrint(...) taosPrintLog("Function", DEBUG_INFO, 135, __VA_ARGS__) -#define debugPrint(...) {fprintf(stderr, __VA_ARGS__);fprintf(stderr, "\n");} enum { UDF_TASK_SETUP = 0, UDF_TASK_CALL = 1, @@ -107,7 +104,7 @@ void* decodeUdfRequest(const void *buf, SUdfRequest* request); int32_t encodeUdfResponse(void **buf, const SUdfResponse *response); void* decodeUdfResponse(const void* buf, SUdfResponse *response); -void freeUdfColumnData(SUdfColumnData *data); +void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta); void freeUdfColumn(SUdfColumn* col); void freeUdfDataDataBlock(SUdfDataBlock *block); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 9aac2bfe0c..5b1573c88f 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -481,8 +481,8 @@ void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) { return (void*)buf; } -void freeUdfColumnData(SUdfColumnData *data) { - if (data->varLengthColumn) { +void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) { + if (IS_VAR_DATA_TYPE(meta->type)) { taosMemoryFree(data->varLenCol.varOffsets); data->varLenCol.varOffsets = NULL; taosMemoryFree(data->varLenCol.payload); @@ -496,7 +496,7 @@ void freeUdfColumnData(SUdfColumnData *data) { } void freeUdfColumn(SUdfColumn* col) { - freeUdfColumnData(&col->colData); + freeUdfColumnData(&col->colData, &col->colMeta); } void freeUdfDataDataBlock(SUdfDataBlock *block) { @@ -528,8 +528,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfCol->colMeta.scale = col->info.scale; udfCol->colMeta.precision = col->info.precision; udfCol->colData.numOfRows = udfBlock->numOfRows; - udfCol->colData.varLengthColumn = IS_VAR_DATA_TYPE(udfCol->colMeta.type); - if (udfCol->colData.varLengthColumn) { + if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) { udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows; udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen); memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen); @@ -555,7 +554,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { block->info.numOfCols = 1; block->info.rows = udfCol->colData.numOfRows; - block->info.hasVarCol = udfCol->colData.varLengthColumn; + block->info.hasVarCol = IS_VAR_DATA_TYPE(udfCol->colMeta.type); block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); taosArraySetSize(block->pDataBlock, 1); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 06fa49e1c2..ba9fca2969 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -75,8 +75,8 @@ typedef struct SUdf { char path[PATH_MAX]; uv_lib_t lib; + TUdfScalarProcFunc scalarProcFunc; - TUdfFreeUdfColumnFunc freeUdfColumn; TUdfAggStartFunc aggStartFunc; TUdfAggProcessFunc aggProcFunc; @@ -106,11 +106,6 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; strcpy(processFuncName, udfName); uv_dlsym(&udf->lib, processFuncName, (void **)(&udf->scalarProcFunc)); - char freeFuncName[TSDB_FUNC_NAME_LEN + 5] = {0}; - char *freeSuffix = "_free"; - strncpy(freeFuncName, processFuncName, strlen(processFuncName)); - strncat(freeFuncName, freeSuffix, strlen(freeSuffix)); - uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn)); } else if (udf->funcType == TSDB_FUNC_TYPE_AGGREGATE) { char processFuncName[TSDB_FUNC_NAME_LEN] = {0}; strcpy(processFuncName, udfName); @@ -215,7 +210,7 @@ void udfdProcessRequest(uv_work_t *req) { udf->scalarProcFunc(&input, &output); convertUdfColumnToDataBlock(&output, &response.callRsp.resultData); - udf->freeUdfColumn(&output); + freeUdfColumn(&output); break; } case TSDB_UDF_CALL_AGG_INIT: { diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index b2367313ae..4384d326cb 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -18,52 +18,20 @@ int32_t udf1_destroy() { } int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) { - SUdfColumnData *resultData = &resultCol->colData; - resultData->numOfRows = block->numOfRows; - SUdfColumnData *srcData = &block->udfCols[0]->colData; - resultData->varLengthColumn = srcData->varLengthColumn; - - if (resultData->varLengthColumn) { - resultData->varLenCol.varOffsetsLen = srcData->varLenCol.varOffsetsLen; - resultData->varLenCol.varOffsets = malloc(resultData->varLenCol.varOffsetsLen); - memcpy(resultData->varLenCol.varOffsets, srcData->varLenCol.varOffsets, srcData->varLenCol.varOffsetsLen); - - resultData->varLenCol.payloadLen = srcData->varLenCol.payloadLen; - resultData->varLenCol.payload = malloc(resultData->varLenCol.payloadLen); - memcpy(resultData->varLenCol.payload, srcData->varLenCol.payload, srcData->varLenCol.payloadLen); - } else { - resultData->fixLenCol.nullBitmapLen = srcData->fixLenCol.nullBitmapLen; - resultData->fixLenCol.nullBitmap = malloc(resultData->fixLenCol.nullBitmapLen); - memcpy(resultData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmapLen); - - resultData->fixLenCol.dataLen = srcData->fixLenCol.dataLen; - resultData->fixLenCol.data = malloc(resultData->fixLenCol.dataLen); - memcpy(resultData->fixLenCol.data, srcData->fixLenCol.data, srcData->fixLenCol.dataLen); - for (int32_t i = 0; i < resultData->numOfRows; ++i) { - *(resultData->fixLenCol.data + i * sizeof(int32_t)) = 88; - } - } - SUdfColumnMeta *meta = &resultCol->colMeta; meta->bytes = 4; meta->type = TSDB_DATA_TYPE_INT; meta->scale = 0; meta->precision = 0; - return 0; -} -int32_t udf1_free(SUdfColumn *col) { - SUdfColumnData *data = &col->colData; - if (data->varLengthColumn) { - free(data->varLenCol.varOffsets); - data->varLenCol.varOffsets = NULL; - free(data->varLenCol.payload); - data->varLenCol.payload = NULL; - } else { - free(data->fixLenCol.nullBitmap); - data->fixLenCol.nullBitmap = NULL; - free(data->fixLenCol.data); - data->fixLenCol.data = NULL; + SUdfColumnData *resultData = &resultCol->colData; + resultData->numOfRows = block->numOfRows; + SUdfColumnData *srcData = &block->udfCols[0]->colData; + + for (int32_t i = 0; i < resultData->numOfRows; ++i) { + int32_t luckyNum = 88; + udfColSetRow(resultCol, i, (char*)&luckyNum, false); } + return 0; } \ No newline at end of file From 8954035a2cf85cf64dcad5d0ee05c36999f3dcb5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 May 2022 19:05:55 +0800 Subject: [PATCH 08/13] fix(query): update the unit test cases. --- source/libs/executor/src/sortoperator.c | 139 ++++++++++++++++++++++++ source/libs/executor/test/sortTests.cpp | 6 +- 2 files changed, 142 insertions(+), 3 deletions(-) create mode 100644 source/libs/executor/src/sortoperator.c diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c new file mode 100644 index 0000000000..0f973b0cf0 --- /dev/null +++ b/source/libs/executor/src/sortoperator.c @@ -0,0 +1,139 @@ +#include "tdatablock.h" +#include "executorimpl.h" + +static SSDataBlock* doSort(SOperatorInfo* pOperator); +static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); + +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols, + SArray* pIndexMap, SExecTaskInfo* pTaskInfo) { + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + int32_t rowSize = pResBlock->info.rowSize; + + if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) { + goto _error; + } + + pOperator->pExpr = pExprInfo; + pOperator->numOfExprs = numOfCols; + pInfo->binfo.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); + pInfo->binfo.pRes = pResBlock; + + initResultSizeInfo(pOperator, 1024); + pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; // there are headers, so pageSize = rowSize + header + + pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer + pInfo->pSortInfo = pSortInfo; + pInfo->inputSlotMap = pIndexMap; + pOperator->name = "SortOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; + pOperator->blocking = true; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + + pOperator->pTaskInfo = pTaskInfo; + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL); + + int32_t code = appendDownstream(pOperator, &downstream, 1); + return pOperator; + + _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; +} + +// TODO merge aggregate super table +void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + + bool isNull = tsortIsNullVal(pTupleHandle, i); + if (isNull) { + colDataAppend(pColInfo, pBlock->info.rows, NULL, true); + } else { + char* pData = tsortGetValue(pTupleHandle, i); + colDataAppend(pColInfo, pBlock->info.rows, pData, false); + } + } + + pBlock->info.rows += 1; +} + +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) { + blockDataCleanup(pDataBlock); + blockDataEnsureCapacity(pDataBlock, capacity); + + blockDataEnsureCapacity(pDataBlock, capacity); + + while (1) { + STupleHandle* pTupleHandle = tsortNextTuple(pHandle); + if (pTupleHandle == NULL) { + break; + } + + appendOneRowToDataBlock(pDataBlock, pTupleHandle); + if (pDataBlock->info.rows >= capacity) { + return pDataBlock; + } + } + + return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; +} + +SSDataBlock* loadNextDataBlock(void* param) { + SOperatorInfo* pOperator = (SOperatorInfo*)param; + return pOperator->fpSet.getNextFn(pOperator); +} + +// todo refactor: merged with fetch fp +void applyScalarFunction(SSDataBlock* pBlock, void* param) { + SOperatorInfo* pOperator = param; + SSortOperatorInfo* pSort = pOperator->info; + if (pOperator->pExpr != NULL) { + projectApplyFunctions(pOperator->pExpr, pBlock, pBlock, pSort->binfo.pCtx, pOperator->numOfExprs, NULL); + } +} + +SSDataBlock* doSort(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSortOperatorInfo* pInfo = pOperator->info; + + if (pOperator->status == OP_RES_TO_RETURN) { + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity); + } + + int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->inputSlotMap, SORT_SINGLESOURCE_SORT, + pInfo->bufPageSize, numOfBufPage, pInfo->binfo.pRes, pTaskInfo->id.str); + + tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); + + + SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); + ps->param = pOperator->pDownstream[0]; + tsortAddSource(pInfo->pSortHandle, ps); + + int32_t code = tsortOpen(pInfo->pSortHandle); + taosMemoryFreeClear(ps); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, terrno); + } + + pOperator->status = OP_RES_TO_RETURN; + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity); +} + +void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { + SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; + pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); + + taosArrayDestroy(pInfo->pSortInfo); + taosArrayDestroy(pInfo->inputSlotMap); +} diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index e646387856..c037fae75f 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -210,7 +210,7 @@ TEST(testCase, inMem_sort_Test) { taosArrayPush(orderInfo, &oi); SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 1024, 5, NULL, "test_abc"); - tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); + tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL); _info* pInfo = (_info*) taosMemoryCalloc(1, sizeof(_info)); pInfo->startVal = 0; @@ -299,7 +299,7 @@ TEST(testCase, external_mem_sort_Test) { taosArrayPush(orderInfo, &oi); SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_SINGLESOURCE_SORT, 128, 3, NULL, "test_abc"); - tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); + tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL); SSortSource* ps = static_cast(taosMemoryCalloc(1, sizeof(SSortSource))); ps->param = &pInfo[i]; @@ -366,7 +366,7 @@ TEST(testCase, ordered_merge_sort_Test) { } SSortHandle* phandle = tsortCreateSortHandle(orderInfo, NULL, SORT_MULTISOURCE_MERGE, 1024, 5, pBlock,"test_abc"); - tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock); + tsortSetFetchRawDataFp(phandle, getSingleColDummyBlock, NULL, NULL); tsortSetComparFp(phandle, docomp); SSortSource* p[10] = {0}; From a0c92acd56e5eb53d8ffd0e5312ee256b524edf1 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 5 May 2022 19:50:49 +0800 Subject: [PATCH 09/13] deal with not all columns has agg --- source/common/src/tdatablock.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index dcea167e81..cc7bcabee0 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -491,7 +491,12 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { - bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]); + bool isNull = false; + if (pBlock->pBlockAgg == NULL) { + isNull = colDataIsNull(pColData, pBlock->info.rows, j, NULL); + } else { + isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg[i]); + } char* p = colDataGetData(pColData, j); colDataAppend(pDstCol, j - startIndex, p, isNull); From 26285015cb1725189f3844f1960299b81b186ff4 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 5 May 2022 20:11:34 +0800 Subject: [PATCH 10/13] fix(query): fix cast function constant convert to nchar result length --- source/libs/nodes/src/nodesCodeFuncs.c | 2 +- source/libs/parser/src/parAstCreater.c | 6 ++---- source/libs/scalar/src/sclfunc.c | 6 +++++- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index c498c9a1cc..5dcacc4354 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2084,7 +2084,7 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) { code = TSDB_CODE_OUT_OF_MEMORY; break; } - varDataSetLen(pNode->datum.p, pNode->node.resType.bytes - VARSTR_HEADER_SIZE); + varDataSetLen(pNode->datum.p, pNode->node.resType.bytes); if (TSDB_DATA_TYPE_NCHAR == pNode->node.resType.type) { char *buf = taosMemoryCalloc(1, pNode->node.resType.bytes * 2 + VARSTR_HEADER_SIZE + 1); if (NULL == buf) { diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index c021f65090..82c5d17f10 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -363,10 +363,8 @@ SNode* createCastFunctionNode(SAstCreateContext* pCxt, SNode* pExpr, SDataType d CHECK_OUT_OF_MEM(func); strcpy(func->functionName, "cast"); func->node.resType = dt; - if (TSDB_DATA_TYPE_BINARY == dt.type) { - func->node.resType.bytes += 2; - } else if (TSDB_DATA_TYPE_NCHAR == dt.type) { - func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE + 2; + if (TSDB_DATA_TYPE_NCHAR == dt.type) { + func->node.resType.bytes = func->node.resType.bytes * TSDB_NCHAR_SIZE; } nodesListMakeAppend(&func->pParameterList, pExpr); return (SNode*)func; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 2ed4688e4b..51ac48adaf 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -709,6 +709,10 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int16_t outputType = GET_PARAM_TYPE(&pOutput[0]); int64_t outputLen = GET_PARAM_BYTES(&pOutput[0]); + if (IS_VAR_DATA_TYPE(outputType)) { + outputLen += VARSTR_HEADER_SIZE; + } + char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1); char *output = outputBuf; @@ -823,7 +827,7 @@ int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp } //for constant conversion, need to set proper length of pOutput description if (len < outputLen - VARSTR_HEADER_SIZE) { - pOutput->columnData->info.bytes = len + VARSTR_HEADER_SIZE; + pOutput->columnData->info.bytes = len; } break; } From ae33656e15a19d8145ad37a2f51f3b127ef77165 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 5 May 2022 20:17:53 +0800 Subject: [PATCH 11/13] fix(rpc): fix timeout error --- source/libs/transport/src/transComm.c | 2 +- tests/script/tsim/db/alter_option.sim | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 1bd7d0857e..98e9e67ede 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -435,7 +435,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_ if (minNode) { SDelayTask* minTask = container_of(minNode, SDelayTask, node); if (minTask->execTime < task->execTime) { - timeoutMs = minTask->execTime <= now ? 0 : now - minTask->execTime; + timeoutMs = minTask->execTime <= now ? 0 : minTask->execTime - now; } } diff --git a/tests/script/tsim/db/alter_option.sim b/tests/script/tsim/db/alter_option.sim index 417e53daff..40882306c8 100644 --- a/tests/script/tsim/db/alter_option.sim +++ b/tests/script/tsim/db/alter_option.sim @@ -66,7 +66,7 @@ print ============= create database # | REPLICA value [1 | 3] # | WAL value [1 | 2] -sql create database db CACHELAST 3 COMP 0 DAYS 345600 FSYNC 1000 MAXROWS 8000 MINROWS 10 KEEP 1440000 PRECISION 'ns' REPLICA 1 WAL 2 VGROUPS 6 SINGLE_STABLE 1 +sql create database db CACHELAST 3 COMP 0 DAYS 345600 FSYNC 1000 MAXROWS 8000 MINROWS 10 KEEP 1440000 PRECISION 'ns' REPLICA 3 WAL 2 VGROUPS 6 SINGLE_STABLE 1 sql show databases print rows: $rows print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 @@ -86,7 +86,7 @@ endi if $data3_db != 0 then # ntables return -1 endi -if $data4_db != 1 then # replica +if $data4_db != 3 then # replica return -1 endi if $data5_db != nostrict then # strict From bfe01ed8ea3aafa2723abfabfaf2217afb54a831 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 5 May 2022 21:00:14 +0800 Subject: [PATCH 12/13] add test case --- include/common/tdatablock.h | 1 - tests/script/sh/copy_udf.sh | 33 ++++++++++++++++++++++++ tests/script/tsim/query/udf.sim | 45 +++++++++++++++++++++++++++++++++ 3 files changed, 78 insertions(+), 1 deletion(-) create mode 100755 tests/script/sh/copy_udf.sh create mode 100644 tests/script/tsim/query/udf.sim diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 5d1b408b14..19b108dcbf 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -54,7 +54,6 @@ SEpSet getEpSet_s(SCorEpSet* pEpSet); BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \ } while (0) -<<<<<<< HEAD #define colDataSetNotNull_f(bm_, r_) \ do { \ BMCharPos(bm_, r_) &= ~(1u << (7u - BitPos(r_))); \ diff --git a/tests/script/sh/copy_udf.sh b/tests/script/sh/copy_udf.sh new file mode 100755 index 0000000000..e4bc9adf82 --- /dev/null +++ b/tests/script/sh/copy_udf.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +set +e +#set -x + +echo "Executing gen_udf.sh" + +SCRIPT_DIR=`dirname $0` +cd $SCRIPT_DIR/../ + +IN_TDINTERNAL="community" +if [[ "$SCRIPT_DIR" == *"$IN_TDINTERNAL"* ]]; then + cd ../../.. +else + cd ../../ +fi + +TAOS_DIR=`pwd` +UDF1_DIR=`find $TAOS_DIR -name "libudf1.so"|grep lib|head -n1` +UDF2_DIR=`find $TAOS_DIR -name "libudf2.so"|grep lib|head -n1` + +echo $UDF1_DIR +echo $UDF2_DIR + +UDF_TMP=/tmp/udf +mkdir $UDF_TMP +rm $UDF_TMP/libudf1.so +rm $UDF_TMP/libudf2.so + +echo "Copy udf shared library files to $UDF_TMP" + +cp $UDF1_DIR $UDF_TMP +cp $UDF2_DIR $UDF_TMP diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim new file mode 100644 index 0000000000..a2684153df --- /dev/null +++ b/tests/script/tsim/query/udf.sim @@ -0,0 +1,45 @@ +system sh/stop_dnodes.sh + +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 -c wallevel -v 2 +system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1 + +print ========= start dnode1 as LEADER +system sh/exec.sh -n dnode1 -s start +sleep 2000 +sql connect + +print ======== step1 udf +system sh/copy_udf.sh +sql create database udf vgroups 3; +sql use udf; +sql show databases; + +sql create table t (ts timestamp, f int); +sql insert into t values(now, 1)(now+1s, 2); + +sql create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8; +sql create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8; +sql show functions; +sql select udf1(f) from t; +if $rows != 2 then + return -1 +endi +if $data00 != 88 then + return -1 +endi +if $data10 != 88 then + return -1 +endi + +sql select udf2(f) from t; +if $rows != 1 then + return -1 +endi +if $data00 != 2.236067977 then + return -1 +endi + +#sql drop function udf1; +#sql drop function udf2; +system sh/exec.sh -n dnode1 -s stop -x SIGKILL From 3a162be310b3548843a249a08f68c85c95c24696 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 5 May 2022 21:04:24 +0800 Subject: [PATCH 13/13] add test to jenkins --- tests/script/jenkins/basic.txt | 1 + tests/script/sh/copy_udf.sh | 2 +- tests/script/tsim/query/udf.sim | 3 +++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index d1c5e83975..b95e822df3 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -39,6 +39,7 @@ ./test.sh -f tsim/query/explain.sim ./test.sh -f tsim/query/session.sim ./test.sh -f tsim/query/scalarNull.sim +./test.sh -f tsim/query/udf.sim # ---- qnode ./test.sh -f tsim/qnode/basic1.sim diff --git a/tests/script/sh/copy_udf.sh b/tests/script/sh/copy_udf.sh index e4bc9adf82..e1d9ff53d2 100755 --- a/tests/script/sh/copy_udf.sh +++ b/tests/script/sh/copy_udf.sh @@ -3,7 +3,7 @@ set +e #set -x -echo "Executing gen_udf.sh" +echo "Executing copy_udf.sh" SCRIPT_DIR=`dirname $0` cd $SCRIPT_DIR/../ diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim index a2684153df..8acd07cfe4 100644 --- a/tests/script/tsim/query/udf.sim +++ b/tests/script/tsim/query/udf.sim @@ -21,6 +21,9 @@ sql insert into t values(now, 1)(now+1s, 2); sql create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8; sql create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8; sql show functions; +if $rows != 2 then + return -1 +endi sql select udf1(f) from t; if $rows != 2 then return -1