diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index ce70a9ba4a..c9675c3844 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -331,6 +331,7 @@ enum OPERATOR_TYPE_E { OP_Distinct = 20, OP_Join = 21, OP_StateWindow = 22, + OP_Order = 23, }; typedef struct SOperatorInfo { @@ -506,7 +507,7 @@ typedef struct SStateWindowOperatorInfo { int32_t start; char* prevData; // previous data bool reptScan; -} SStateWindowOperatorInfo ; +} SStateWindowOperatorInfo; typedef struct SDistinctOperatorInfo { SHashObj *pSet; @@ -539,6 +540,13 @@ typedef struct SMultiwayMergeInfo { SArray *udfInfo; } SMultiwayMergeInfo; +// todo support the disk-based sort +typedef struct SOrderOperatorInfo { + int32_t colIndex; + int32_t order; + SSDataBlock *pDataBlock; +} SOrderOperatorInfo; + void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3f6df2ec07..fb9f5d93d4 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -16,7 +16,6 @@ #include "qFill.h" #include "taosmsg.h" #include "tglobal.h" -#include "talgo.h" #include "exception.h" #include "hash.h" @@ -242,8 +241,7 @@ static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes); static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, - SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, - int32_t groupIndex); + SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId); SArray* getOrderCheckColumns(SQueryAttr* pQuery); @@ -886,8 +884,6 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t break; } } - - return; } static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, @@ -5218,6 +5214,35 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx return pOperator; } +SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) { + SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); + + { + SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock)); + pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); + for(int32_t i = 0; i < numOfOutput; ++i) { + SColumnInfoData col = {0}; + col.info.bytes = pExpr->base.resBytes; + col.info.colId = pExpr->base.resColId; + col.info.type = pExpr->base.resType; + taosArrayPush(pDataBlock->pDataBlock, &col); + } + + pDataBlock->info.numOfCols = numOfOutput; + } + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pOperator->name = "Order"; + pOperator->operatorType = OP_Order; + pOperator->blockingOptr = true; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->exec = doSort; + pOperator->cleanup = NULL; + + return pOperator; +} + static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } @@ -5642,7 +5667,6 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { return pIntervalInfo->pRes; } - static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STableQueryInfo* item = pRuntimeEnv->current; @@ -5775,6 +5799,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes; } + static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -5986,6 +6011,106 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { } } +static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) { + assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols); + + int32_t numOfCols = pSrc->info.numOfCols; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock->pData, i); + SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock->pData, i); + + int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes; + char* tmp = realloc(pCol2->pData, newSize); + if (tmp != NULL) { + pCol2->pData = tmp; + int32_t offset = pCol2->info.bytes * pDest->info.rows; + memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes); + } else { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + } + + return TSDB_CODE_SUCCESS; +} + +static SSDataBlock* doSort(void* param, bool* newgroup) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SOrderOperatorInfo* pInfo = pOperator->info; +// SSDataBlock* pRes = pInfo->pRes; + +// pRes->info.rows = 0; + SSDataBlock* pBlock = NULL; + while(1) { + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); + pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + + // start to flush data into disk and try do multiway merge sort + if (pBlock == NULL) { +// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); +// pOperator->status = OP_EXEC_DONE; + break; + } + + int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock); + if (code != TSDB_CODE_SUCCESS) { +// return code; + } + + /*SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); + + int16_t bytes = pColInfoData->info.bytes; + int16_t type = pColInfoData->info.type; + + // ensure the output buffer size + SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, 0); + if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) { + int32_t newSize = pRes->info.rows + pBlock->info.rows; + char* tmp = realloc(pResultColInfoData->pData, newSize * bytes); + if (tmp == NULL) { + return NULL; + } else { + pResultColInfoData->pData = tmp; + pInfo->outputCapacity = newSize; + } + } + + for(int32_t i = 0; i < pBlock->info.rows; ++i) { + char* val = ((char*)pColInfoData->pData) + bytes * i; + if (isNull(val, type)) { + continue; + } + + size_t keyLen = 0; + if (IS_VAR_DATA_TYPE(pOperator->pExpr->base.colType)) { + tstr* var = (tstr*)(val); + keyLen = varDataLen(var); + } else { + keyLen = bytes; + } + + int dummy; + void* res = taosHashGet(pInfo->pSet, val, keyLen); + if (res == NULL) { + taosHashPut(pInfo->pSet, val, keyLen, &dummy, sizeof(dummy)); + char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows; + memcpy(start, val, bytes); + pRes->info.rows += 1; + } + } + + if (pRes->info.rows >= pInfo->threshold) { + break; + }*/ + } + + return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; +} + // todo set the attribute of query scan count static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr) { for(int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { @@ -6607,11 +6732,9 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { return NULL; } - SDistinctOperatorInfo* pInfo = pOperator->info; SSDataBlock* pRes = pInfo->pRes; - pRes->info.rows = 0; SSDataBlock* pBlock = NULL; while(1) { diff --git a/src/util/src/tcompare.c b/src/util/src/tcompare.c index e953f4c464..55ba14f84f 100644 --- a/src/util/src/tcompare.c +++ b/src/util/src/tcompare.c @@ -286,10 +286,6 @@ int32_t taosArrayCompareString(const void* a, const void* b) { return compareLenPrefixedStr(x, y); } -//static int32_t compareFindStrInArray(const void* pLeft, const void* pRight) { -// const SArray* arr = (const SArray*) pRight; -// return taosArraySearchString(arr, pLeft, taosArrayCompareString, TD_EQ) == NULL ? 0 : 1; -//} static int32_t compareFindItemInSet(const void *pLeft, const void* pRight) { return NULL != taosHashGet((SHashObj *)pRight, varDataVal(pLeft), varDataLen(pLeft)) ? 1 : 0; }