From 8412703d8560c65c9dd3214f7025ca08d8a4d4e1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 3 Aug 2021 19:19:03 +0800 Subject: [PATCH 01/22] [td-225] --- src/query/inc/qExecutor.h | 10 ++- src/query/src/qExecutor.c | 139 +++++++++++++++++++++++++++++++++++--- src/util/src/tcompare.c | 4 -- 3 files changed, 140 insertions(+), 13 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index ce70a9ba4a..c9675c3844 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -331,6 +331,7 @@ enum OPERATOR_TYPE_E { OP_Distinct = 20, OP_Join = 21, OP_StateWindow = 22, + OP_Order = 23, }; typedef struct SOperatorInfo { @@ -506,7 +507,7 @@ typedef struct SStateWindowOperatorInfo { int32_t start; char* prevData; // previous data bool reptScan; -} SStateWindowOperatorInfo ; +} SStateWindowOperatorInfo; typedef struct SDistinctOperatorInfo { SHashObj *pSet; @@ -539,6 +540,13 @@ typedef struct SMultiwayMergeInfo { SArray *udfInfo; } SMultiwayMergeInfo; +// todo support the disk-based sort +typedef struct SOrderOperatorInfo { + int32_t colIndex; + int32_t order; + SSDataBlock *pDataBlock; +} SOrderOperatorInfo; + void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3f6df2ec07..fb9f5d93d4 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -16,7 +16,6 @@ #include "qFill.h" #include "taosmsg.h" #include "tglobal.h" -#include "talgo.h" #include "exception.h" #include "hash.h" @@ -242,8 +241,7 @@ static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes); static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, - SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, - int32_t groupIndex); + SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId); SArray* getOrderCheckColumns(SQueryAttr* pQuery); @@ -886,8 +884,6 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t break; } } - - return; } static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, @@ -5218,6 +5214,35 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx return pOperator; } +SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) { + SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); + + { + SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock)); + pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); + for(int32_t i = 0; i < numOfOutput; ++i) { + SColumnInfoData col = {0}; + col.info.bytes = pExpr->base.resBytes; + col.info.colId = pExpr->base.resColId; + col.info.type = pExpr->base.resType; + taosArrayPush(pDataBlock->pDataBlock, &col); + } + + pDataBlock->info.numOfCols = numOfOutput; + } + + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + pOperator->name = "Order"; + pOperator->operatorType = OP_Order; + pOperator->blockingOptr = true; + pOperator->status = OP_IN_EXECUTING; + pOperator->info = pInfo; + pOperator->exec = doSort; + pOperator->cleanup = NULL; + + return pOperator; +} + static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } @@ -5642,7 +5667,6 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { return pIntervalInfo->pRes; } - static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STableQueryInfo* item = pRuntimeEnv->current; @@ -5775,6 +5799,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes; } + static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -5986,6 +6011,106 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { } } +static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) { + assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols); + + int32_t numOfCols = pSrc->info.numOfCols; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock->pData, i); + SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock->pData, i); + + int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes; + char* tmp = realloc(pCol2->pData, newSize); + if (tmp != NULL) { + pCol2->pData = tmp; + int32_t offset = pCol2->info.bytes * pDest->info.rows; + memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes); + } else { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + } + + return TSDB_CODE_SUCCESS; +} + +static SSDataBlock* doSort(void* param, bool* newgroup) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SOrderOperatorInfo* pInfo = pOperator->info; +// SSDataBlock* pRes = pInfo->pRes; + +// pRes->info.rows = 0; + SSDataBlock* pBlock = NULL; + while(1) { + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); + pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + + // start to flush data into disk and try do multiway merge sort + if (pBlock == NULL) { +// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); +// pOperator->status = OP_EXEC_DONE; + break; + } + + int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock); + if (code != TSDB_CODE_SUCCESS) { +// return code; + } + + /*SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); + + int16_t bytes = pColInfoData->info.bytes; + int16_t type = pColInfoData->info.type; + + // ensure the output buffer size + SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, 0); + if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) { + int32_t newSize = pRes->info.rows + pBlock->info.rows; + char* tmp = realloc(pResultColInfoData->pData, newSize * bytes); + if (tmp == NULL) { + return NULL; + } else { + pResultColInfoData->pData = tmp; + pInfo->outputCapacity = newSize; + } + } + + for(int32_t i = 0; i < pBlock->info.rows; ++i) { + char* val = ((char*)pColInfoData->pData) + bytes * i; + if (isNull(val, type)) { + continue; + } + + size_t keyLen = 0; + if (IS_VAR_DATA_TYPE(pOperator->pExpr->base.colType)) { + tstr* var = (tstr*)(val); + keyLen = varDataLen(var); + } else { + keyLen = bytes; + } + + int dummy; + void* res = taosHashGet(pInfo->pSet, val, keyLen); + if (res == NULL) { + taosHashPut(pInfo->pSet, val, keyLen, &dummy, sizeof(dummy)); + char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows; + memcpy(start, val, bytes); + pRes->info.rows += 1; + } + } + + if (pRes->info.rows >= pInfo->threshold) { + break; + }*/ + } + + return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; +} + // todo set the attribute of query scan count static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr) { for(int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { @@ -6607,11 +6732,9 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { return NULL; } - SDistinctOperatorInfo* pInfo = pOperator->info; SSDataBlock* pRes = pInfo->pRes; - pRes->info.rows = 0; SSDataBlock* pBlock = NULL; while(1) { diff --git a/src/util/src/tcompare.c b/src/util/src/tcompare.c index e953f4c464..55ba14f84f 100644 --- a/src/util/src/tcompare.c +++ b/src/util/src/tcompare.c @@ -286,10 +286,6 @@ int32_t taosArrayCompareString(const void* a, const void* b) { return compareLenPrefixedStr(x, y); } -//static int32_t compareFindStrInArray(const void* pLeft, const void* pRight) { -// const SArray* arr = (const SArray*) pRight; -// return taosArraySearchString(arr, pLeft, taosArrayCompareString, TD_EQ) == NULL ? 0 : 1; -//} static int32_t compareFindItemInSet(const void *pLeft, const void* pRight) { return NULL != taosHashGet((SHashObj *)pRight, varDataVal(pLeft), varDataLen(pLeft)) ? 1 : 0; } From 8ad04a513c67d1437f1038fcfbb954fed7a68ef7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 4 Aug 2021 19:21:34 +0800 Subject: [PATCH 02/22] [td-5796]: fix the invalid error message. --- src/client/src/tscServer.c | 6 ++++-- src/client/src/tscSubquery.c | 5 +++-- src/client/src/tscUtil.c | 28 +++++++++++++++------------- 3 files changed, 22 insertions(+), 17 deletions(-) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 57aefac852..7120d780f9 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2980,11 +2980,13 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid); } + // remove stored tableMeta info in hash table tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self); + tscResetSqlCmd(pCmd, true); - pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); - pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); +// pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); +// pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); SArray* pNameList = taosArrayInit(1, POINTER_BYTES); SArray* vgroupList = taosArrayInit(1, POINTER_BYTES); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 8c0d642ca6..3816e6d619 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2721,9 +2721,10 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO SSqlCmd* pParentCmd = &pParentSql->cmd; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0); tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self); + tscResetSqlCmd(pParentCmd, true); - pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap); - pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); +// pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap); +// pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pParentSql->res.code = TSDB_CODE_SUCCESS; pParentSql->retry++; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 694d2b2af6..849cbb3aa8 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1374,18 +1374,19 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) { pCmd->insertParam.tagData.dataLen = 0; tscFreeQueryInfo(pCmd, clearCachedMeta); + pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); - if (pCmd->pTableMetaMap != NULL) { - STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL); - while (p) { - taosArrayDestroy(p->vgroupIdList); - tfree(p->pTableMeta); - p = taosHashIterate(pCmd->pTableMetaMap, p); - } - - taosHashCleanup(pCmd->pTableMetaMap); - pCmd->pTableMetaMap = NULL; - } +// if (pCmd->pTableMetaMap != NULL) { +// STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL); +// while (p) { +// taosArrayDestroy(p->vgroupIdList); +// tfree(p->pTableMeta); +// p = taosHashIterate(pCmd->pTableMetaMap, p); +// } +// +// taosHashCleanup(pCmd->pTableMetaMap); +// pCmd->pTableMetaMap = NULL; +// } } void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap) { @@ -3845,9 +3846,10 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { SSqlCmd* pParentCmd = &pParentSql->cmd; STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0); tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self); + tscResetSqlCmd(pParentCmd, true); - pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap); - pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); +// pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap); +// pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pParentSql->res.code = TSDB_CODE_SUCCESS; pParentSql->retry++; From 7466d787ce07085199cea07a14923a16a0cfc4d8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 Aug 2021 11:05:22 +0800 Subject: [PATCH 03/22] [td-5573]: support order clause in outer query. --- src/query/inc/qExtbuffer.h | 2 + src/query/src/qExecutor.c | 171 ++++++++++++++-------------------- src/query/src/qExtbuffer.c | 57 +++++++++++- src/query/src/qPercentile.c | 2 +- src/query/tests/cSortTest.cpp | 124 ++++++++++++++++++++++++ src/query/tests/unitTest.cpp | 2 - src/util/inc/tcompare.h | 2 +- src/util/src/tcompare.c | 95 ++++++++++++++----- src/util/src/tskiplist.c | 2 +- 9 files changed, 326 insertions(+), 131 deletions(-) create mode 100644 src/query/tests/cSortTest.cpp diff --git a/src/query/inc/qExtbuffer.h b/src/query/inc/qExtbuffer.h index cf0e8ce31a..b5ea9932b9 100644 --- a/src/query/inc/qExtbuffer.h +++ b/src/query/inc/qExtbuffer.h @@ -227,6 +227,8 @@ typedef int (*__col_compar_fn_t)(tOrderDescriptor *, int32_t numOfRows, int32_t void tColDataQSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType); +void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn); + int32_t compare_sa(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data); int32_t compare_sd(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index fb9f5d93d4..f836d83730 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5214,7 +5214,76 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx return pOperator; } -SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) { +static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) { + assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols); + + int32_t numOfCols = pSrc->info.numOfCols; + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock->pData, i); + SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock->pData, i); + + int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes; + char* tmp = realloc(pCol2->pData, newSize); + if (tmp != NULL) { + pCol2->pData = tmp; + int32_t offset = pCol2->info.bytes * pDest->info.rows; + memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes); + } else { + return TSDB_CODE_VND_OUT_OF_MEMORY; + } + } + + return TSDB_CODE_SUCCESS; +} + +static SSDataBlock* doSort(void* param, bool* newgroup) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SOrderOperatorInfo* pInfo = pOperator->info; + + SSDataBlock* pBlock = NULL; + while(1) { + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); + pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + + // start to flush data into disk and try do multiway merge sort + if (pBlock == NULL) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + break; + } + + int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock); + if (code != TSDB_CODE_SUCCESS) { + // todo handle error + } + } + + int32_t numOfCols = pInfo->pDataBlock->info.numOfCols; + void** pCols = calloc(numOfCols, POINTER_BYTES); + SSchema* pSchema = calloc(numOfCols, sizeof(SSchema)); + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p1 = taosArrayGet(pInfo->pDataBlock->pDataBlock, i); + pCols[i] = p1->pData; + pSchema[i].colId = p1->info.colId; + pSchema[i].bytes = p1->info.bytes; + pSchema[i].type = p1->info.type; + } + + __compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order); + taoscQSort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp); + + tfree(pCols); + tfree(pSchema); + return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; +} + +SOperatorInfo *createOrderOperatorInfo(SExprInfo* pExpr, int32_t numOfOutput) { SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); { @@ -6011,106 +6080,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { } } -static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) { - assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols); - - int32_t numOfCols = pSrc->info.numOfCols; - for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock->pData, i); - SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock->pData, i); - - int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes; - char* tmp = realloc(pCol2->pData, newSize); - if (tmp != NULL) { - pCol2->pData = tmp; - int32_t offset = pCol2->info.bytes * pDest->info.rows; - memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes); - } else { - return TSDB_CODE_VND_OUT_OF_MEMORY; - } - } - - return TSDB_CODE_SUCCESS; -} - -static SSDataBlock* doSort(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SOrderOperatorInfo* pInfo = pOperator->info; -// SSDataBlock* pRes = pInfo->pRes; - -// pRes->info.rows = 0; - SSDataBlock* pBlock = NULL; - while(1) { - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - - // start to flush data into disk and try do multiway merge sort - if (pBlock == NULL) { -// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); -// pOperator->status = OP_EXEC_DONE; - break; - } - - int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock); - if (code != TSDB_CODE_SUCCESS) { -// return code; - } - - /*SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); - - int16_t bytes = pColInfoData->info.bytes; - int16_t type = pColInfoData->info.type; - - // ensure the output buffer size - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, 0); - if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) { - int32_t newSize = pRes->info.rows + pBlock->info.rows; - char* tmp = realloc(pResultColInfoData->pData, newSize * bytes); - if (tmp == NULL) { - return NULL; - } else { - pResultColInfoData->pData = tmp; - pInfo->outputCapacity = newSize; - } - } - - for(int32_t i = 0; i < pBlock->info.rows; ++i) { - char* val = ((char*)pColInfoData->pData) + bytes * i; - if (isNull(val, type)) { - continue; - } - - size_t keyLen = 0; - if (IS_VAR_DATA_TYPE(pOperator->pExpr->base.colType)) { - tstr* var = (tstr*)(val); - keyLen = varDataLen(var); - } else { - keyLen = bytes; - } - - int dummy; - void* res = taosHashGet(pInfo->pSet, val, keyLen); - if (res == NULL) { - taosHashPut(pInfo->pSet, val, keyLen, &dummy, sizeof(dummy)); - char* start = pResultColInfoData->pData + bytes * pInfo->pRes->info.rows; - memcpy(start, val, bytes); - pRes->info.rows += 1; - } - } - - if (pRes->info.rows >= pInfo->threshold) { - break; - }*/ - } - - return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; -} - // todo set the attribute of query scan count static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr) { for(int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { diff --git a/src/query/src/qExtbuffer.c b/src/query/src/qExtbuffer.c index cc47cc824b..c4f5d6efd5 100644 --- a/src/query/src/qExtbuffer.c +++ b/src/query/src/qExtbuffer.c @@ -12,7 +12,6 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "qExtbuffer.h" #include "os.h" #include "qAggMain.h" #include "queryLog.h" @@ -21,6 +20,8 @@ #include "taosmsg.h" #include "tulog.h" #include "qExecutor.h" +#include "qExtbuffer.h" +#include "tcompare.h" #define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \ (data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes) @@ -767,6 +768,60 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta free(buf); } +void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn) { + assert(numOfRows > 0 && numOfCols > 0 && index >= 0 && index < numOfCols); + + int32_t bytes = pSchema[index].bytes; + int32_t size = bytes + sizeof(int32_t); + + char* buf = calloc(1, size * numOfRows); + + for(int32_t i = 0; i < numOfRows; ++i) { + char* dest = buf + size * i; + memcpy(dest, pCols[index] + bytes * i, bytes); + *(int32_t*)(dest+bytes) = i; + } + + qsort(buf, numOfRows, size, compareFn); + + int32_t prevLength = 0; + char* p = NULL; + + for(int32_t i = 0; i < numOfCols; ++i) { + int32_t bytes1 = pSchema[i].bytes; + + if (i == index) { + for(int32_t j = 0; j < numOfRows; ++j){ + char* src = buf + (j * size); + char* dest = pCols[i] + (j * bytes1); + memcpy(dest, src, bytes1); + } + } else { + // make sure memory buffer is enough + if (prevLength < bytes1) { + char *tmp = realloc(p, bytes1 * numOfRows); + assert(tmp); + + p = tmp; + prevLength = bytes1; + } + + memcpy(p, pCols[i], bytes1 * numOfRows); + + for(int32_t j = 0; j < numOfRows; ++j){ + char* dest = pCols[i] + bytes1 * j; + + int32_t newPos = *(int32_t*)(buf + (j * size) + bytes); + char* src = p + (newPos * bytes1); + memcpy(dest, src, bytes1); + } + } + } + + tfree(buf); + tfree(p); +} + /* * deep copy of sschema */ diff --git a/src/query/src/qPercentile.c b/src/query/src/qPercentile.c index e3326cc26b..e9022db503 100644 --- a/src/query/src/qPercentile.c +++ b/src/query/src/qPercentile.c @@ -237,7 +237,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, } pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes; - pBucket->comparFn = getKeyComparFunc(pBucket->type); + pBucket->comparFn = getKeyComparFunc(pBucket->type, TSDB_ORDER_ASC); pBucket->hashFunc = getHashFunc(pBucket->type); if (pBucket->hashFunc == NULL) { diff --git a/src/query/tests/cSortTest.cpp b/src/query/tests/cSortTest.cpp new file mode 100644 index 0000000000..aa5aa89afc --- /dev/null +++ b/src/query/tests/cSortTest.cpp @@ -0,0 +1,124 @@ +#include +#include + +#include "taos.h" +#include "tsdb.h" +#include "qExtbuffer.h" + +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +namespace { + int32_t comp(const void* p1, const void* p2) { + int32_t* x1 = (int32_t*) p1; + int32_t* x2 = (int32_t*) p2; + + if (*x1 == *x2) { + return 0; + } else { + return (*x1 > *x2)? 1:-1; + } + } + + int32_t comp1(const void* p1, const void* p2) { + int32_t ret = strncmp((char*) p1, (char*) p2, 20); + + if (ret == 0) { + return 0; + } else { + return ret > 0 ? 1:-1; + } + } +} + +TEST(testCase, colunmnwise_sort_test) { + // void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn) + void* pCols[2] = {0}; + + SSchema s[2] = {{0}}; + s[0].type = TSDB_DATA_TYPE_INT; + s[0].bytes = 4; + s[0].colId = 0; + strcpy(s[0].name, "col1"); + + s[1].type = TSDB_DATA_TYPE_BINARY; + s[1].bytes = 20; + s[1].colId = 1; + strcpy(s[1].name, "col2"); + + int32_t* p = (int32_t*) calloc(5, sizeof(int32_t)); + p[0] = 12; + p[1] = 8; + p[2] = 99; + p[3] = 7; + p[4] = 1; + + char* t1 = (char*) calloc(5, 20); + strcpy(t1, "abc"); + strcpy(t1 + 20, "def"); + strcpy(t1 + 40, "xyz"); + strcpy(t1 + 60, "klm"); + strcpy(t1 + 80, "hij"); + + pCols[0] = (char*) p; + pCols[1] = (char*) t1; + taoscQSort(reinterpret_cast(pCols), s, 2, 5, 0, comp); + + int32_t* px = (int32_t*) pCols[0]; + ASSERT_EQ(px[0], 1); + ASSERT_EQ(px[1], 7); + ASSERT_EQ(px[2], 8); + ASSERT_EQ(px[3], 12); + ASSERT_EQ(px[4], 99); + + char* px1 = (char*) pCols[1]; + ASSERT_STRCASEEQ(px1 + 20 * 0, "hij"); + ASSERT_STRCASEEQ(px1 + 20 * 1, "klm"); + ASSERT_STRCASEEQ(px1 + 20 * 2, "def"); + ASSERT_STRCASEEQ(px1 + 20 * 3, "abc"); + ASSERT_STRCASEEQ(px1 + 20 * 4, "xyz"); + + taoscQSort(pCols, s, 2, 5, 1, comp1); + px = (int32_t*) pCols[0]; + ASSERT_EQ(px[0], 12); + ASSERT_EQ(px[1], 8); + ASSERT_EQ(px[2], 1); + ASSERT_EQ(px[3], 7); + ASSERT_EQ(px[4], 99); + + px1 = (char*) pCols[1]; + ASSERT_STRCASEEQ(px1 + 20 * 0, "abc"); + ASSERT_STRCASEEQ(px1 + 20 * 1, "def"); + ASSERT_STRCASEEQ(px1 + 20 * 2, "hij"); + ASSERT_STRCASEEQ(px1 + 20 * 3, "klm"); + ASSERT_STRCASEEQ(px1 + 20 * 4, "xyz"); +} + +TEST(testCase, columnsort_test) { + SSchema field[1] = { + {TSDB_DATA_TYPE_INT, "k", sizeof(int32_t)}, + }; + + const int32_t num = 2000; + + int32_t *d = (int32_t *)malloc(sizeof(int32_t) * num); + for (int32_t i = 0; i < num; ++i) { + d[i] = i % 4; + } + + const int32_t numOfOrderCols = 1; + int32_t orderColIdx = 0; + SColumnModel *pModel = createColumnModel(field, 1, 1000); + tOrderDescriptor *pDesc = tOrderDesCreate(&orderColIdx, numOfOrderCols, pModel, 1); + + tColDataQSort(pDesc, num, 0, num - 1, (char *)d, 1); + + for (int32_t i = 0; i < num; ++i) { + printf("%d\t", d[i]); + } + printf("\n"); + + destroyColumnModel(pModel); +} \ No newline at end of file diff --git a/src/query/tests/unitTest.cpp b/src/query/tests/unitTest.cpp index e5487a061d..2338adc2c5 100644 --- a/src/query/tests/unitTest.cpp +++ b/src/query/tests/unitTest.cpp @@ -1,6 +1,4 @@ -#include "os.h" #include -#include #include #include "taos.h" diff --git a/src/util/inc/tcompare.h b/src/util/inc/tcompare.h index 612ce7ede0..4861779acd 100644 --- a/src/util/inc/tcompare.h +++ b/src/util/inc/tcompare.h @@ -47,7 +47,7 @@ int WCSPatternMatch(const wchar_t *pattern, const wchar_t *str, size_t size, con int32_t doCompare(const char* a, const char* b, int32_t type, size_t size); -__compar_fn_t getKeyComparFunc(int32_t keyType); +__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order); __compar_fn_t getComparFunc(int32_t type, int32_t optr); diff --git a/src/util/src/tcompare.c b/src/util/src/tcompare.c index 55ba14f84f..57689b7269 100644 --- a/src/util/src/tcompare.c +++ b/src/util/src/tcompare.c @@ -16,21 +16,17 @@ #include "os.h" #include "ttype.h" #include "tcompare.h" -#include "tarray.h" #include "hash.h" -int32_t compareInt32Val(const void *pLeft, const void *pRight) { - int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight); +int32_t compareInt8Val(const void *pLeft, const void *pRight) { + int8_t left = GET_INT8_VAL(pLeft), right = GET_INT8_VAL(pRight); if (left > right) return 1; if (left < right) return -1; return 0; } -int32_t compareInt64Val(const void *pLeft, const void *pRight) { - int64_t left = GET_INT64_VAL(pLeft), right = GET_INT64_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; - return 0; +int32_t compareInt8ValDesc(const void *pLeft, const void *pRight) { + return compareInt8Val(pRight, pLeft); } int32_t compareInt16Val(const void *pLeft, const void *pRight) { @@ -40,13 +36,32 @@ int32_t compareInt16Val(const void *pLeft, const void *pRight) { return 0; } -int32_t compareInt8Val(const void *pLeft, const void *pRight) { - int8_t left = GET_INT8_VAL(pLeft), right = GET_INT8_VAL(pRight); +int32_t compareInt16ValDesc(const void* pLeft, const void* pRight) { + return compareInt16Val(pRight, pLeft); +} + +int32_t compareInt32Val(const void *pLeft, const void *pRight) { + int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight); if (left > right) return 1; if (left < right) return -1; return 0; } +int32_t compareInt32ValDesc(const void* pLeft, const void* pRight) { + return compareInt32Val(pRight, pLeft); +} + +int32_t compareInt64Val(const void *pLeft, const void *pRight) { + int64_t left = GET_INT64_VAL(pLeft), right = GET_INT64_VAL(pRight); + if (left > right) return 1; + if (left < right) return -1; + return 0; +} + +int32_t compareInt64ValDesc(const void* pLeft, const void* pRight) { + return compareInt64Val(pRight, pLeft); +} + int32_t compareUint32Val(const void *pLeft, const void *pRight) { int32_t left = GET_UINT32_VAL(pLeft), right = GET_UINT32_VAL(pRight); if (left > right) return 1; @@ -54,6 +69,10 @@ int32_t compareUint32Val(const void *pLeft, const void *pRight) { return 0; } +int32_t compareUint32ValDesc(const void* pLeft, const void* pRight) { + return compareUint32Val(pRight, pLeft); +} + int32_t compareUint64Val(const void *pLeft, const void *pRight) { int64_t left = GET_UINT64_VAL(pLeft), right = GET_UINT64_VAL(pRight); if (left > right) return 1; @@ -61,6 +80,10 @@ int32_t compareUint64Val(const void *pLeft, const void *pRight) { return 0; } +int32_t compareUint64ValDesc(const void* pLeft, const void* pRight) { + return compareUint64Val(pRight, pLeft); +} + int32_t compareUint16Val(const void *pLeft, const void *pRight) { int16_t left = GET_UINT16_VAL(pLeft), right = GET_UINT16_VAL(pRight); if (left > right) return 1; @@ -68,6 +91,10 @@ int32_t compareUint16Val(const void *pLeft, const void *pRight) { return 0; } +int32_t compareUint16ValDesc(const void* pLeft, const void* pRight) { + return compareUint16Val(pRight, pLeft); +} + int32_t compareUint8Val(const void* pLeft, const void* pRight) { uint8_t left = GET_UINT8_VAL(pLeft), right = GET_UINT8_VAL(pRight); if (left > right) return 1; @@ -75,6 +102,10 @@ int32_t compareUint8Val(const void* pLeft, const void* pRight) { return 0; } +int32_t compareUint8ValDesc(const void* pLeft, const void* pRight) { + return compareUint8Val(pRight, pLeft); +} + int32_t compareFloatVal(const void *pLeft, const void *pRight) { float p1 = GET_FLOAT_VAL(pLeft); float p2 = GET_FLOAT_VAL(pRight); @@ -96,6 +127,10 @@ int32_t compareFloatVal(const void *pLeft, const void *pRight) { return FLT_GREATER(p1, p2) ? 1: -1; } +int32_t compareFloatValDesc(const void* pLeft, const void* pRight) { + return compareFloatVal(pRight, pLeft); +} + int32_t compareDoubleVal(const void *pLeft, const void *pRight) { double p1 = GET_DOUBLE_VAL(pLeft); double p2 = GET_DOUBLE_VAL(pRight); @@ -117,6 +152,10 @@ int32_t compareDoubleVal(const void *pLeft, const void *pRight) { return FLT_GREATER(p1, p2) ? 1: -1; } +int32_t compareDoubleValDesc(const void* pLeft, const void* pRight) { + return compareDoubleVal(pRight, pLeft); +} + int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) { int32_t len1 = varDataLen(pLeft); int32_t len2 = varDataLen(pRight); @@ -133,6 +172,10 @@ int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) { } } +int32_t compareLenPrefixedStrDesc(const void* pLeft, const void* pRight) { + return compareLenPrefixedStr(pRight, pLeft); +} + int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) { int32_t len1 = varDataLen(pLeft); int32_t len2 = varDataLen(pRight); @@ -149,6 +192,10 @@ int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) { } } +int32_t compareLenPrefixedWStrDesc(const void* pLeft, const void* pRight) { + return compareLenPrefixedWStr(pRight, pLeft); +} + /* * Compare two strings * TSDB_MATCH: Match @@ -349,50 +396,50 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) { return comparFn; } -__compar_fn_t getKeyComparFunc(int32_t keyType) { +__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order) { __compar_fn_t comparFn = NULL; switch (keyType) { case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_BOOL: - comparFn = compareInt8Val; + comparFn = (order == TSDB_ORDER_ASC)? compareInt8Val:compareInt8ValDesc; break; case TSDB_DATA_TYPE_SMALLINT: - comparFn = compareInt16Val; + comparFn = (order == TSDB_ORDER_ASC)? compareInt16Val:compareInt16ValDesc; break; case TSDB_DATA_TYPE_INT: - comparFn = compareInt32Val; + comparFn = (order == TSDB_ORDER_ASC)? compareInt32Val:compareInt32ValDesc; break; case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_TIMESTAMP: - comparFn = compareInt64Val; + comparFn = (order == TSDB_ORDER_ASC)? compareInt64Val:compareInt64ValDesc; break; case TSDB_DATA_TYPE_FLOAT: - comparFn = compareFloatVal; + comparFn = (order == TSDB_ORDER_ASC)? compareFloatVal:compareFloatValDesc; break; case TSDB_DATA_TYPE_DOUBLE: - comparFn = compareDoubleVal; + comparFn = (order == TSDB_ORDER_ASC)? compareDoubleVal:compareDoubleValDesc; break; case TSDB_DATA_TYPE_UTINYINT: - comparFn = compareUint8Val; + comparFn = (order == TSDB_ORDER_ASC)? compareUint8Val:compareUint8ValDesc; break; case TSDB_DATA_TYPE_USMALLINT: - comparFn = compareUint16Val; + comparFn = (order == TSDB_ORDER_ASC)? compareUint16Val:compareUint16ValDesc; break; case TSDB_DATA_TYPE_UINT: - comparFn = compareUint32Val; + comparFn = (order == TSDB_ORDER_ASC)? compareUint32Val:compareUint32ValDesc; break; case TSDB_DATA_TYPE_UBIGINT: - comparFn = compareUint64Val; + comparFn = (order == TSDB_ORDER_ASC)? compareUint64Val:compareUint64ValDesc; break; case TSDB_DATA_TYPE_BINARY: - comparFn = compareLenPrefixedStr; + comparFn = (order == TSDB_ORDER_ASC)? compareLenPrefixedStr:compareLenPrefixedStrDesc; break; case TSDB_DATA_TYPE_NCHAR: - comparFn = compareLenPrefixedWStr; + comparFn = (order == TSDB_ORDER_ASC)? compareLenPrefixedWStr:compareLenPrefixedWStrDesc; break; default: - comparFn = compareInt32Val; + comparFn = (order == TSDB_ORDER_ASC)? compareInt32Val:compareInt32ValDesc; break; } diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c index b464519ba6..98fd9c094c 100644 --- a/src/util/src/tskiplist.c +++ b/src/util/src/tskiplist.c @@ -54,7 +54,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _ pSkipList->keyFn = fn; pSkipList->seed = rand(); if (comparFn == NULL) { - pSkipList->comparFn = getKeyComparFunc(keyType); + pSkipList->comparFn = getKeyComparFunc(keyType, TSDB_ORDER_ASC); } else { pSkipList->comparFn = comparFn; } From 6400fe192cfcf3c656ee8900392a1a0e729ac722 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 Aug 2021 13:13:17 +0800 Subject: [PATCH 04/22] [td-225]fix compiler error. --- src/query/src/qExecutor.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index f836d83730..5d8d2bb545 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5272,7 +5272,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { pCols[i] = p1->pData; pSchema[i].colId = p1->info.colId; pSchema[i].bytes = p1->info.bytes; - pSchema[i].type = p1->info.type; + pSchema[i].type = (uint8_t) p1->info.type; } __compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order); From fbe535bd8be422cb2ddade9528302c5eab4335db Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 Aug 2021 16:27:13 +0800 Subject: [PATCH 05/22] [td-225] fix compiler error and invalid in query. --- src/client/src/tscSQLParser.c | 7 ++++--- src/common/src/tname.c | 3 +-- src/query/src/qExtbuffer.c | 6 +++--- tests/pytest/query/long_where_query.py | 6 +----- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 5739333886..df2f39ee87 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -3736,7 +3736,8 @@ static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, if (pRight->tokenId != TK_SET || !serializeExprListToVariant(pRight->Expr.paramList, &pVal, colType, timePrecision)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg); } - pColumnFilter->pz = (int64_t)calloc(1, pVal->nLen + 1); + + pColumnFilter->pz = (int64_t)calloc(1, pVal->nLen); pColumnFilter->len = pVal->nLen; pColumnFilter->filterstr = 1; memcpy((char *)(pColumnFilter->pz), (char *)(pVal->pz), pVal->nLen); @@ -8772,13 +8773,13 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf tfree(p); } -#if 0 +//#if 0 SQueryNode* p = qCreateQueryPlan(pQueryInfo); char* s = queryPlanToString(p); printf("%s\n", s); tfree(s); qDestroyQueryPlan(p); -#endif +//#endif return TSDB_CODE_SUCCESS; // Does not build query message here } diff --git a/src/common/src/tname.c b/src/common/src/tname.c index 5da48b2e9a..aeade05df2 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -70,12 +70,11 @@ SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFil memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters); for (int32_t j = 0; j < numOfFilters; ++j) { - if (pFilter[j].filterstr) { size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE; pFilter[j].pz = (int64_t) calloc(1, len); - memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len); + memcpy((char*)pFilter[j].pz, (char*)src[j].pz, pFilter[j].len); } } diff --git a/src/query/src/qExtbuffer.c b/src/query/src/qExtbuffer.c index c4f5d6efd5..9f9347b327 100644 --- a/src/query/src/qExtbuffer.c +++ b/src/query/src/qExtbuffer.c @@ -778,7 +778,7 @@ void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOf for(int32_t i = 0; i < numOfRows; ++i) { char* dest = buf + size * i; - memcpy(dest, pCols[index] + bytes * i, bytes); + memcpy(dest, ((char*)pCols[index]) + bytes * i, bytes); *(int32_t*)(dest+bytes) = i; } @@ -793,7 +793,7 @@ void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOf if (i == index) { for(int32_t j = 0; j < numOfRows; ++j){ char* src = buf + (j * size); - char* dest = pCols[i] + (j * bytes1); + char* dest = (char*) pCols[i] + (j * bytes1); memcpy(dest, src, bytes1); } } else { @@ -809,7 +809,7 @@ void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOf memcpy(p, pCols[i], bytes1 * numOfRows); for(int32_t j = 0; j < numOfRows; ++j){ - char* dest = pCols[i] + bytes1 * j; + char* dest = (char*) pCols[i] + bytes1 * j; int32_t newPos = *(int32_t*)(buf + (j * size) + bytes); char* src = p + (newPos * bytes1); diff --git a/tests/pytest/query/long_where_query.py b/tests/pytest/query/long_where_query.py index 62e9533b62..9bb5f0b3d7 100644 --- a/tests/pytest/query/long_where_query.py +++ b/tests/pytest/query/long_where_query.py @@ -287,13 +287,9 @@ class TDTestCase: tdLog.info(len(sql)) tdSql.error(sql) - endTime = time.time() print("total time %ds" % (endTime - startTime)) - - - - os.system("rm -rf query/long_where_query.py.sql") + #os.system("rm -rf query/long_where_query.py.sql") def stop(self): From d582adfa128064b05a3ac493df91f1a511174fb6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 Aug 2021 16:33:55 +0800 Subject: [PATCH 06/22] [td-225]disable print logic plan. --- src/client/src/tscSQLParser.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index df2f39ee87..399f2f17d1 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -8773,13 +8773,13 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf tfree(p); } -//#if 0 +#if 0 SQueryNode* p = qCreateQueryPlan(pQueryInfo); char* s = queryPlanToString(p); printf("%s\n", s); tfree(s); qDestroyQueryPlan(p); -//#endif +#endif return TSDB_CODE_SUCCESS; // Does not build query message here } From c0d80b875d146e16411b8ace491ce297e3e0e818 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Thu, 5 Aug 2021 18:09:59 +0800 Subject: [PATCH 07/22] fix crash issue --- src/client/src/tscProfile.c | 6 +++++- src/client/src/tscUtil.c | 3 +-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index 92ad9b7924..e307690c24 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -264,9 +264,12 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { snprintf(p, remainLen, "N/A"); } else { int32_t len; + if (pSql->pSubs != NULL && pSql->subState.states != NULL) { for (int32_t i = 0; i < pQdesc->numOfSub; ++i) { + SSqlObj* psub = pSql->pSubs[i]; + int64_t self = (psub != NULL)? psub->self:0; len = snprintf(p, remainLen, "[%d]0x%" PRIx64 "(%c) ", i, - pSql->pSubs[i]->self, + self, pSql->subState.states[i] ? 'C' : 'I'); if (len > remainLen) { break; @@ -274,6 +277,7 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { remainLen -= len; p += len; } + } } pQdesc->numOfSub = htonl(pQdesc->numOfSub); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 71aa75dafb..ecd65a7ae4 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1482,8 +1482,6 @@ void tscFreeSqlObj(SSqlObj* pSql) { tscFreeMetaSqlObj(&pSql->metaRid); tscFreeMetaSqlObj(&pSql->svgroupRid); - tscFreeSubobj(pSql); - SSqlCmd* pCmd = &pSql->cmd; int32_t cmd = pCmd->command; if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_GLOBALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT || @@ -1491,6 +1489,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { tscRemoveFromSqlList(pSql); } + tscFreeSubobj(pSql); pSql->signature = NULL; pSql->fp = NULL; tfree(pSql->sqlstr); From 24db3339cf1557f1d80d3e0f600e7b2f4a4d1b18 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 Aug 2021 18:18:15 +0800 Subject: [PATCH 08/22] [td-225]fix concurrent issue during building the heartbeat message. --- src/client/src/tscProfile.c | 45 +++++++++++++++++++------------------ src/client/src/tscUtil.c | 1 + 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index e307690c24..b75eb5716f 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -227,16 +227,16 @@ void tscKillStream(STscObj *pObj, uint32_t killId) { int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { SHeartBeatMsg *pHeartbeat = pMsg; + int allocedQueriesNum = pHeartbeat->numOfQueries; int allocedStreamsNum = pHeartbeat->numOfStreams; pHeartbeat->numOfQueries = 0; SQueryDesc *pQdesc = (SQueryDesc *)pHeartbeat->pData; - // We extract the lock to tscBuildHeartBeatMsg function. - int64_t now = taosGetTimestampMs(); SSqlObj *pSql = pObj->sqlList; + while (pSql) { /* * avoid sqlobj may not be correctly removed from sql list @@ -248,45 +248,46 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { } tstrncpy(pQdesc->sql, pSql->sqlstr, sizeof(pQdesc->sql)); - pQdesc->stime = htobe64(pSql->stime); - pQdesc->queryId = htonl(pSql->queryId); - //pQdesc->useconds = htobe64(pSql->res.useconds); + pQdesc->stime = htobe64(pSql->stime); + pQdesc->queryId = htonl(pSql->queryId); pQdesc->useconds = htobe64(now - pSql->stime); - pQdesc->qId = htobe64(pSql->res.qId); + pQdesc->qId = htobe64(pSql->res.qId); pQdesc->sqlObjId = htobe64(pSql->self); - pQdesc->pid = pHeartbeat->pid; - pQdesc->stableQuery = pSql->cmd.pQueryInfo->stableQuery; + pQdesc->pid = pHeartbeat->pid; pQdesc->numOfSub = pSql->subState.numOfSub; + pQdesc->stableQuery = pSql->cmd.pQueryInfo->stableQuery; char *p = pQdesc->subSqlInfo; int32_t remainLen = sizeof(pQdesc->subSqlInfo); if (pQdesc->numOfSub == 0) { snprintf(p, remainLen, "N/A"); } else { - int32_t len; if (pSql->pSubs != NULL && pSql->subState.states != NULL) { - for (int32_t i = 0; i < pQdesc->numOfSub; ++i) { - SSqlObj* psub = pSql->pSubs[i]; - int64_t self = (psub != NULL)? psub->self:0; - len = snprintf(p, remainLen, "[%d]0x%" PRIx64 "(%c) ", i, - self, - pSql->subState.states[i] ? 'C' : 'I'); - if (len > remainLen) { - break; + for (int32_t i = 0; i < pQdesc->numOfSub; ++i) { + SSqlObj *psub = pSql->pSubs[i]; + int64_t self = (psub != NULL)? psub->self : 0; + + int32_t len = snprintf(p, remainLen, "[%d]0x%" PRIx64 "(%c) ", i, self, pSql->subState.states[i] ? 'C' : 'I'); + if (len > remainLen) { + break; + } + + remainLen -= len; + p += len; } - remainLen -= len; - p += len; - } } } - pQdesc->numOfSub = htonl(pQdesc->numOfSub); + pQdesc->numOfSub = htonl(pQdesc->numOfSub); taosGetFqdn(pQdesc->fqdn); pHeartbeat->numOfQueries++; pQdesc++; + pSql = pSql->next; - if (pHeartbeat->numOfQueries >= allocedQueriesNum) break; + if (pHeartbeat->numOfQueries >= allocedQueriesNum) { + break; + } } pHeartbeat->numOfStreams = 0; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index ecd65a7ae4..a0e3c83a2e 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1490,6 +1490,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { } tscFreeSubobj(pSql); + pSql->signature = NULL; pSql->fp = NULL; tfree(pSql->sqlstr); From cf73e1f8afff74da0be97b30082068e6d5a41dfa Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 Aug 2021 18:47:11 +0800 Subject: [PATCH 09/22] [td-225]add one retry case for subquery of super table. --- src/client/src/tscSubquery.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 3816e6d619..eb7907f556 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -3032,7 +3032,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) { if (taos_errno(pSql) != TSDB_CODE_SUCCESS) { assert(code == taos_errno(pSql)); - if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID)) { + if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID && code != TSDB_CODE_VND_INVALID_VGROUP_ID)) { tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry); int32_t sent = 0; From fbbc3f10f9e2aa6b90e31d5c10ce2571b9abdb1a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 5 Aug 2021 23:26:01 +0800 Subject: [PATCH 10/22] Update tname.c --- src/common/src/tname.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/src/tname.c b/src/common/src/tname.c index aeade05df2..40433d1f00 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -74,7 +74,7 @@ SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFil size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE; pFilter[j].pz = (int64_t) calloc(1, len); - memcpy((char*)pFilter[j].pz, (char*)src[j].pz, pFilter[j].len); + memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t) pFilter[j].len); } } From 39a96b3ebd6fb2179afc376461cb94bf39e0d235 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 6 Aug 2021 10:11:44 +0800 Subject: [PATCH 11/22] [td-225] fix compiler error. --- src/query/src/qExecutor.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e75b2ae12d..e9a1089268 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5290,7 +5290,7 @@ SOperatorInfo *createOrderOperatorInfo(SExprInfo* pExpr, int32_t numOfOutput) { SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock)); pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); for(int32_t i = 0; i < numOfOutput; ++i) { - SColumnInfoData col = {0}; + SColumnInfoData col = {{0}}; col.info.bytes = pExpr->base.resBytes; col.info.colId = pExpr->base.resColId; col.info.type = pExpr->base.resType; From aa60eb572205b002ff4367c823a5bd1152f32cbb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 6 Aug 2021 14:34:23 +0800 Subject: [PATCH 12/22] [td-225]fix bug found by the regression test. --- src/client/src/tscAsync.c | 3 +-- src/client/src/tscServer.c | 4 ++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index f39169c193..1c198fb8c6 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -346,7 +346,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (pSql->pStream == NULL) { SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); - if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) { + if (pQueryInfo != NULL && TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) { tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self); code = tsParseSql(pSql, false); @@ -376,7 +376,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { } else { if (pSql->retryReason != TSDB_CODE_SUCCESS) { tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again", pSql->self); - tscResetSqlCmd(pCmd, false); pSql->retryReason = TSDB_CODE_SUCCESS; } else { tscDebug("0x%" PRIx64 " cached table-meta, continue validate sql statement and send query", pSql->self); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b2ed942b5e..b278235269 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2241,6 +2241,10 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) { pMsg = buf; } + if (pParentCmd->pTableMetaMap == NULL) { + pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + } + for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) { STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMsg; int32_t code = tableMetaMsgConvert(pMetaMsg); From b251e7b35953d913471e21fc66b39dfbfacd2c69 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 6 Aug 2021 23:05:30 +0800 Subject: [PATCH 13/22] [td-225]fix bug found by regression test. --- src/client/src/tscProfile.c | 10 ++++++++-- src/inc/taosmsg.h | 2 +- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index b75eb5716f..f63e908dc0 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -18,11 +18,11 @@ #include "tsclient.h" #include "tsocket.h" #include "ttimer.h" -#include "tutil.h" #include "taosmsg.h" #include "tcq.h" #include "taos.h" +#include "tscUtil.h" void tscSaveSlowQueryFp(void *handle, void *tmrId); TAOS *tscSlowQueryConn = NULL; @@ -255,13 +255,19 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { pQdesc->sqlObjId = htobe64(pSql->self); pQdesc->pid = pHeartbeat->pid; pQdesc->numOfSub = pSql->subState.numOfSub; - pQdesc->stableQuery = pSql->cmd.pQueryInfo->stableQuery; char *p = pQdesc->subSqlInfo; int32_t remainLen = sizeof(pQdesc->subSqlInfo); if (pQdesc->numOfSub == 0) { snprintf(p, remainLen, "N/A"); } else { + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); + if (pQueryInfo != NULL) { + pQdesc->stableQuery = (pQueryInfo->stableQuery)?1:0; + } else { + pQdesc->stableQuery = 0; + } + if (pSql->pSubs != NULL && pSql->subState.states != NULL) { for (int32_t i = 0; i < pQdesc->numOfSub; ++i) { SSqlObj *psub = pSql->pSubs[i]; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index c2918cfdf7..2af4e4857a 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -877,7 +877,7 @@ typedef struct { uint64_t sqlObjId; int32_t pid; char fqdn[TSDB_FQDN_LEN]; - bool stableQuery; + uint8_t stableQuery; int32_t numOfSub; char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; //include subqueries' index, Obj IDs and states(C-complete/I-imcomplete) } SQueryDesc; From b46edae0b1237beda20138d5d8a873881bf6aea9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 7 Aug 2021 14:03:30 +0800 Subject: [PATCH 14/22] [td-225]fix a race condition. --- src/client/src/tscProfile.c | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c index f63e908dc0..b00138b4c4 100644 --- a/src/client/src/tscProfile.c +++ b/src/client/src/tscProfile.c @@ -256,17 +256,20 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) { pQdesc->pid = pHeartbeat->pid; pQdesc->numOfSub = pSql->subState.numOfSub; + // todo race condition + pQdesc->stableQuery = 0; + char *p = pQdesc->subSqlInfo; int32_t remainLen = sizeof(pQdesc->subSqlInfo); if (pQdesc->numOfSub == 0) { snprintf(p, remainLen, "N/A"); } else { - SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); - if (pQueryInfo != NULL) { - pQdesc->stableQuery = (pQueryInfo->stableQuery)?1:0; - } else { - pQdesc->stableQuery = 0; - } +// SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); +// if (pQueryInfo != NULL) { +// pQdesc->stableQuery = (pQueryInfo->stableQuery)?1:0; +// } else { +// pQdesc->stableQuery = 0; +// } if (pSql->pSubs != NULL && pSql->subState.states != NULL) { for (int32_t i = 0; i < pQdesc->numOfSub; ++i) { From 50d6fef714559e17b7bfc10a912a215fee27ff71 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 Aug 2021 14:17:47 +0800 Subject: [PATCH 15/22] [td-225] fix the bug that failed to record the total elapsed time in the log file. --- src/query/src/qExecutor.c | 5 ++++- src/query/src/queryMain.c | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e9a1089268..3d5e19ccd3 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2874,7 +2874,7 @@ void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFi int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { *status = BLK_DATA_NO_NEEDED; - pBlock->pDataBlock = NULL; + pBlock->pDataBlock = NULL; pBlock->pBlockStatis = NULL; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -2884,6 +2884,9 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa SQInfo* pQInfo = pRuntimeEnv->qinfo; SQueryCostInfo* pCost = &pQInfo->summary; + pCost->totalBlocks += 1; + pCost->totalRows += pBlock->info.rows; + if (pRuntimeEnv->pTsBuf != NULL) { (*status) = BLK_DATA_ALL_NEEDED; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 7d30f7c668..878698752f 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -199,7 +199,6 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi return code; } - bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { SQInfo *pQInfo = (SQInfo *)qinfo; assert(pQInfo && pQInfo->signature == pQInfo); @@ -240,7 +239,11 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { bool newgroup = false; publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC); + + int64_t st = taosGetTimestampUs(); pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup); + pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st); + publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC); pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv); From 2855bd73902edddf908d832d7e531e772c32d89e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 Aug 2021 14:29:15 +0800 Subject: [PATCH 16/22] [td-225]update the performance log info. --- src/query/src/qExecutor.c | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 3d5e19ccd3..9c8219c559 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -3310,12 +3310,11 @@ void copyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBl } } - // enough results in data buffer, return - if (pBlock->info.rows >= threshold) { - break; - } + // enough results in data buffer, return + if (pBlock->info.rows >= threshold) { + break; } - + } } static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo) { @@ -5694,11 +5693,15 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { + int64_t st = taosGetTimestampUs(); copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } + SQInfo* pQInfo = pRuntimeEnv->qinfo; + pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st); + return pIntervalInfo->pRes; } @@ -5731,11 +5734,15 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { doCloseAllTimeWindow(pRuntimeEnv); setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); + int64_t st = taosGetTimestampUs(); copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset); if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } + SQInfo* pQInfo = pRuntimeEnv->qinfo; + pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st); + return pIntervalInfo->pRes; } From 94b26bfea9b195bf6a8d973b7f393298ce1467cd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 Aug 2021 19:25:51 +0800 Subject: [PATCH 17/22] [td-225] --- src/client/inc/tscUtil.h | 6 ++-- src/client/inc/tsclient.h | 2 +- src/client/src/tscParseInsert.c | 4 +-- src/client/src/tscPrepare.c | 2 +- src/client/src/tscSQLParser.c | 2 +- src/client/src/tscServer.c | 10 ++---- src/client/src/tscStream.c | 62 ++++++++++++++++----------------- src/client/src/tscSubquery.c | 15 +++----- src/client/src/tscUtil.c | 51 ++++++++------------------- 9 files changed, 61 insertions(+), 93 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index b3674a7bf5..401da65908 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -220,7 +220,7 @@ void tscExprDestroy(SArray* pExprInfo); int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SExprInfo*** pExpr, int32_t* num); -void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta); +void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta, uint64_t id); SColumn* tscColumnClone(const SColumn* src); void tscColumnCopy(SColumn* pDest, const SColumn* pSrc); @@ -318,7 +318,7 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex); bool hasMoreVnodesToTry(SSqlObj *pSql); bool hasMoreClauseToTry(SSqlObj* pSql); -void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta); +void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeCachedMeta, uint64_t id); void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); @@ -356,7 +356,7 @@ char* strdup_throw(const char* str); bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src); SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg); -void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id); +void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id); #ifdef __cplusplus } diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index ac5adcbbb4..91619f2d39 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -368,7 +368,7 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent); void destroyTableNameList(SInsertStatementParam* pInsertParam); -void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta); +void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta, uint64_t id); /** * free query result of the sql object diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 73e4f898c8..6b82a1ef17 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1922,7 +1922,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) { if (pSql->parseRetry < 1 && (ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_OPERATION)) { tscDebug("0x%"PRIx64 " parse insert sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret)); - tscResetSqlCmd(pCmd, true); + tscResetSqlCmd(pCmd, true, pSql->self); pSql->parseRetry++; if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) { @@ -1939,7 +1939,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) { if (ret == TSDB_CODE_TSC_INVALID_OPERATION && pSql->parseRetry < 1 && sqlInfo.type == TSDB_SQL_SELECT) { tscDebug("0x%"PRIx64 " parse query sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret)); - tscResetSqlCmd(pCmd, true); + tscResetSqlCmd(pCmd, true, pSql->self); pSql->parseRetry++; ret = tscValidateSqlInfo(pSql, &sqlInfo); diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index 2c2a299549..9743c2837d 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -1694,7 +1694,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) { SHashObj* hashList = pCmd->insertParam.pTableBlockHashList; pCmd->insertParam.pTableBlockHashList = NULL; - tscResetSqlCmd(pCmd, false); + tscResetSqlCmd(pCmd, false, pSql->self); pCmd->insertParam.pTableBlockHashList = hashList; } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 399f2f17d1..8f8ed6ae5e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -8483,7 +8483,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf } if (pSqlNode->from->type == SQL_NODE_FROM_SUBQUERY) { - clearAllTableMetaInfo(pQueryInfo, false); + clearAllTableMetaInfo(pQueryInfo, false, pSql->self); pQueryInfo->numOfTables = 0; // parse the subquery in the first place diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b278235269..69c3357230 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -2581,7 +2581,7 @@ int tscProcessDropDbRsp(SSqlObj *pSql) { int tscProcessDropTableRsp(SSqlObj *pSql) { STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self); + tscRemoveCachedTableMeta(pTableMetaInfo, pSql->self); tfree(pTableMetaInfo->pTableMeta); return 0; } @@ -2967,13 +2967,9 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { // remove stored tableMeta info in hash table - tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self); - tscResetSqlCmd(pCmd, true); + tscResetSqlCmd(pCmd, true, pSql->self); -// pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); -// pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - - SArray* pNameList = taosArrayInit(1, POINTER_BYTES); + SArray* pNameList = taosArrayInit(1, POINTER_BYTES); SArray* vgroupList = taosArrayInit(1, POINTER_BYTES); char* n = strdup(name); diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 2c4bc5f764..9f2b79e891 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -113,7 +113,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) { pQueryInfo->command = TSDB_SQL_SELECT; - pSql->fp = tscProcessStreamQueryCallback; + pSql->fp = tscProcessStreamQueryCallback; pSql->fetchFp = tscProcessStreamQueryCallback; executeQuery(pSql, pQueryInfo); tscIncStreamExecutionCount(pStream); @@ -142,6 +142,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { if(pSql == NULL) { return ; } + SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd); tscDebug("0x%"PRIx64" add into timer", pSql->self); @@ -186,14 +187,16 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { } // launch stream computing in a new thread - SSchedMsg schedMsg = { 0 }; - schedMsg.fp = tscProcessStreamLaunchQuery; + SSchedMsg schedMsg = {0}; + schedMsg.fp = tscProcessStreamLaunchQuery; schedMsg.ahandle = pStream; schedMsg.thandle = (void *)1; - schedMsg.msg = NULL; + schedMsg.msg = NULL; taosScheduleTask(tscQhandle, &schedMsg); } +static void cbParseSql(void* param, TAOS_RES* res, int code); + static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) { SSqlStream *pStream = (SSqlStream *)param; if (tres == NULL || numOfRows < 0) { @@ -201,24 +204,26 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self, pStream, numOfRows, retryDelay); - STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0); + SSqlObj* pSql = pStream->pSql; - char name[TSDB_TABLE_FNAME_LEN] = {0}; - tNameExtractFullName(&pTableMetaInfo->name, name); + tscFreeSqlResult(pSql); + tscFreeSubobj(pSql); + tfree(pSql->pSubs); + pSql->subState.numOfSub = 0; - taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + int32_t code = tsParseSql(pSql, true); + if (code == TSDB_CODE_SUCCESS) { + cbParseSql(pStream, pSql, code); + } else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + tscDebug("0x%"PRIx64" CQ taso_open_stream IN Process", pSql->self); + } else { + tscError("0x%"PRIx64" open stream failed, code:%s", pSql->self, tstrerror(code)); + taosReleaseRef(tscObjRef, pSql->self); + free(pStream); + } - tfree(pTableMetaInfo->pTableMeta); - - tscFreeSqlResult(pStream->pSql); - tscFreeSubobj(pStream->pSql); - tfree(pStream->pSql->pSubs); - pStream->pSql->subState.numOfSub = 0; - - pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); - - tscSetRetryTimer(pStream, pStream->pSql, retryDelay); - return; +// tscSetRetryTimer(pStream, pStream->pSql, retryDelay); +// return; } taos_fetch_rows_a(tres, tscProcessStreamRetrieveResult, param); @@ -555,7 +560,6 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { if (code != TSDB_CODE_SUCCESS) { pSql->res.code = code; tscError("0x%"PRIx64" open stream failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code)); - pStream->fp(pStream->param, NULL, NULL); return; } @@ -582,9 +586,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { // set stime with ltime if ltime > stime const char* dstTable = pStream->dstTable? pStream->dstTable: ""; - tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime); + tscDebug("0x%"PRIx64" CQ table %s ltime is %"PRId64, pSql->self, dstTable, pStream->ltime); + if(pStream->ltime != INT64_MIN && pStream->ltime > pStream->stime) { - tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime>0 ", dstTable, pStream->stime, pStream->ltime); + tscWarn("0x%"PRIx64" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime > 0", pSql->self, dstTable, pStream->stime, pStream->ltime); pStream->stime = pStream->ltime; } @@ -592,7 +597,6 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { pCmd->command = TSDB_SQL_SELECT; tscAddIntoStreamList(pStream); - taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer); tscDebug("0x%"PRIx64" stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql->self, @@ -659,10 +663,9 @@ void cbParseSql(void* param, TAOS_RES* res, int code) { char sql[128] = ""; sprintf(sql, "select last_row(*) from %s;", pStream->dstTable); taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param); - return ; } -TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), +TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *, TAOS_RES *, TAOS_ROW), int64_t stime, void *param, void (*callback)(void *), void* cqhandle) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) return NULL; @@ -697,14 +700,12 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c pStream->param = param; pStream->pSql = pSql; pStream->cqhandle = cqhandle; - pSql->pStream = pStream; - pSql->param = pStream; - pSql->maxRetry = TSDB_MAX_REPLICA; tscSetStreamDestTable(pStream, dstTable); pSql->pStream = pStream; pSql->param = pStream; pSql->maxRetry = TSDB_MAX_REPLICA; + pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); if (pSql->sqlstr == NULL) { tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self); @@ -725,14 +726,13 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c pSql->fp = cbParseSql; pSql->fetchFp = cbParseSql; - registerSqlObj(pSql); int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_SUCCESS) { cbParseSql(pStream, pSql, code); } else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - tscDebug(" CQ taso_open_stream IN Process. sql=%s", sqlstr); + tscDebug("0x%"PRIx64" CQ taso_open_stream IN Process", pSql->self); } else { tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code)); taosReleaseRef(tscObjRef, pSql->self); @@ -743,7 +743,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c return pStream; } -TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), +TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, TAOS_ROW), int64_t stime, void *param, void (*callback)(void *)) { return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback, NULL); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index eb7907f556..e1cb79b357 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2718,17 +2718,10 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO int32_t code = pParentSql->res.code; if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) { // remove the cached tableMeta and vgroup id list, and then parse the sql again - SSqlCmd* pParentCmd = &pParentSql->cmd; - STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0); - tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self); - tscResetSqlCmd(pParentCmd, true); + tscResetSqlCmd( &pParentSql->cmd, true, pParentSql->self); -// pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap); -// pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - - pParentSql->res.code = TSDB_CODE_SUCCESS; pParentSql->retry++; - + pParentSql->res.code = TSDB_CODE_SUCCESS; tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self, tstrerror(code), pParentSql->retry); @@ -3143,7 +3136,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) numOfFailed += 1; // clean up tableMeta in cache - tscFreeQueryInfo(&pSql->cmd, false); + tscFreeQueryInfo(&pSql->cmd, false, pSql->self); SQueryInfo* pQueryInfo = tscGetQueryInfoS(&pSql->cmd); STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, 0); tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL); @@ -3165,7 +3158,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) } pParentObj->res.code = TSDB_CODE_SUCCESS; - tscResetSqlCmd(&pParentObj->cmd, false); + tscResetSqlCmd(&pParentObj->cmd, false, pParentObj->self); // in case of insert, redo parsing the sql string and build new submit data block for two reasons: // 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly. diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index a0e3c83a2e..a6712778ba 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1300,12 +1300,13 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) { pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free } -void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { +void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeCachedMeta, uint64_t id) { if (pCmd == NULL) { return; } SQueryInfo* pQueryInfo = pCmd->pQueryInfo; + while(pQueryInfo != NULL) { SQueryInfo* p = pQueryInfo->sibling; @@ -1314,7 +1315,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { SQueryInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pUpstream, i); freeQueryInfoImpl(pUpQueryInfo); - clearAllTableMetaInfo(pUpQueryInfo, removeMeta); + clearAllTableMetaInfo(pUpQueryInfo, removeCachedMeta, id); if (pUpQueryInfo->pQInfo != NULL) { qDestroyQueryInfo(pUpQueryInfo->pQInfo); pUpQueryInfo->pQInfo = NULL; @@ -1330,7 +1331,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { } freeQueryInfoImpl(pQueryInfo); - clearAllTableMetaInfo(pQueryInfo, removeMeta); + clearAllTableMetaInfo(pQueryInfo, removeCachedMeta, id); if (pQueryInfo->pQInfo != NULL) { qDestroyQueryInfo(pQueryInfo->pQInfo); @@ -1359,7 +1360,7 @@ void destroyTableNameList(SInsertStatementParam* pInsertParam) { tfree(pInsertParam->pTableNameList); } -void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) { +void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) { pCmd->command = 0; pCmd->numOfCols = 0; pCmd->count = 0; @@ -1373,20 +1374,8 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) { tfree(pCmd->insertParam.tagData.data); pCmd->insertParam.tagData.dataLen = 0; - tscFreeQueryInfo(pCmd, clearCachedMeta); + tscFreeQueryInfo(pCmd, clearCachedMeta, id); pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); - -// if (pCmd->pTableMetaMap != NULL) { -// STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL); -// while (p) { -// taosArrayDestroy(p->vgroupIdList); -// tfree(p->pTableMeta); -// p = taosHashIterate(pCmd->pTableMetaMap, p); -// } -// -// taosHashCleanup(pCmd->pTableMetaMap); -// pCmd->pTableMetaMap = NULL; -// } } void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap) { @@ -1501,7 +1490,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { pSql->self = 0; tscFreeSqlResult(pSql); - tscResetSqlCmd(pCmd, false); + tscResetSqlCmd(pCmd, false, pSql->self); memset(pCmd->payload, 0, (size_t)pCmd->allocSize); tfree(pCmd->payload); @@ -3369,20 +3358,15 @@ SArray* tscVgroupTableInfoDup(SArray* pVgroupTables) { return pa; } -void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) { +void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta, uint64_t id) { for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); - if (removeMeta) { - char name[TSDB_TABLE_FNAME_LEN] = {0}; - tNameExtractFullName(&pTableMetaInfo->name, name); - taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN)); + tscRemoveCachedTableMeta(pTableMetaInfo, id); } tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables); tscClearTableMetaInfo(pTableMetaInfo); - - free(pTableMetaInfo); } tfree(pQueryInfo->pTableMetaInfo); @@ -3449,10 +3433,12 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo) { } tfree(pTableMetaInfo->pTableMeta); - pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList); + tscColumnListDestroy(pTableMetaInfo->tagColList); pTableMetaInfo->tagColList = NULL; + + free(pTableMetaInfo); } void tscResetForNextRetrieve(SSqlRes* pRes) { @@ -3845,14 +3831,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { // todo refactor tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self); - - SSqlCmd* pParentCmd = &pParentSql->cmd; - STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0); - tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self); - tscResetSqlCmd(pParentCmd, true); - -// pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap); -// pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + tscResetSqlCmd(&pParentSql->cmd, true, pParentSql->self); pParentSql->res.code = TSDB_CODE_SUCCESS; pParentSql->retry++; @@ -3871,7 +3850,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { return; } - SQueryInfo *pQueryInfo = tscGetQueryInfo(pParentCmd); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd); executeQuery(pParentSql, pQueryInfo); return; } @@ -4995,7 +4974,7 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) { return info; } -void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id) { +void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id) { char fname[TSDB_TABLE_FNAME_LEN] = {0}; tNameExtractFullName(&pTableMetaInfo->name, fname); From 842098942b22cbabbccd540fb4e6693bd244ac85 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 11 Aug 2021 17:48:56 +0800 Subject: [PATCH 18/22] [td-225]update the log --- src/client/src/tscServer.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 11c458e9bc..87289f7728 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -525,6 +525,7 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) { } void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { + int64_t st = taosGetTimestampUs(); SSchedMsg schedMsg = {0}; schedMsg.fp = doProcessMsgFromServer; @@ -543,6 +544,11 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { schedMsg.msg = NULL; taosScheduleTask(tscQhandle, &schedMsg); + + int64_t et = taosGetTimestampUs(); + if (et - st > 100) { + tscDebug("add message to task queue, elapsed time:%"PRId64, et - st); + } } int doBuildAndSendMsg(SSqlObj *pSql) { From d82e04be8c273c7af37d32683c20d0545e69ee3e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 12 Aug 2021 13:50:29 +0800 Subject: [PATCH 19/22] [td-5881]: Sort the result according to any single column in the outer query result is allowed. --- src/client/inc/tscUtil.h | 11 ++--- src/client/src/tscSQLParser.c | 85 ++++++++++++++++++++--------------- src/client/src/tscServer.c | 6 +-- src/client/src/tscUtil.c | 2 - src/query/inc/qExecutor.h | 2 +- src/query/src/qExecutor.c | 64 +++++++++++++++----------- src/query/src/qPlan.c | 15 ++++--- 7 files changed, 106 insertions(+), 79 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index b74fe29a61..17e05b1111 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -29,15 +29,16 @@ extern "C" { #include "tsched.h" #include "tsclient.h" -#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \ +#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE)) + #define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE)) - -#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\ - (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo))) -#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ +#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \ + (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo))) + +#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) #pragma pack(push,1) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index d6df43af68..4e51d4a294 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5747,14 +5747,19 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) { pQueryInfo->order.order = TSDB_ORDER_ASC; if (isTopBottomQuery(pQueryInfo)) { pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; - } else { // in case of select tbname from super_table, the defualt order column can not be the primary ts column - pQueryInfo->order.orderColId = INT32_MIN; + } else { // in case of select tbname from super_table, the default order column can not be the primary ts column + pQueryInfo->order.orderColId = INT32_MIN; // todo define a macro } /* for super table query, set default ascending order for group output */ if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC; } + + if (pQueryInfo->distinct) { + pQueryInfo->order.order = TSDB_ORDER_ASC; + pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; + } } int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema) { @@ -5766,17 +5771,12 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq setDefaultOrderInfo(pQueryInfo); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - - if (pQueryInfo->distinct == true) { - pQueryInfo->order.order = TSDB_ORDER_ASC; - pQueryInfo->order.orderColId = 0; - return TSDB_CODE_SUCCESS; - } - if (pSqlNode->pSortOrder == NULL) { + if (pQueryInfo->distinct || pSqlNode->pSortOrder == NULL) { return TSDB_CODE_SUCCESS; } - SArray* pSortorder = pSqlNode->pSortOrder; + char* pMsgBuf = tscGetErrorMsgPayload(pCmd); + SArray* pSortOrder = pSqlNode->pSortOrder; /* * for table query, there is only one or none order option is allowed, which is the @@ -5784,19 +5784,19 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq * * for super table query, the order option must be less than 3. */ - size_t size = taosArrayGetSize(pSortorder); - if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) { + size_t size = taosArrayGetSize(pSortOrder); + if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo)) { if (size > 1) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg0); + return invalidOperationMsg(pMsgBuf, msg0); } } else { if (size > 2) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); + return invalidOperationMsg(pMsgBuf, msg3); } } // handle the first part of order by - tVariant* pVar = taosArrayGet(pSortorder, 0); + tVariant* pVar = taosArrayGet(pSortOrder, 0); // e.g., order by 1 asc, return directly with out further check. if (pVar->nType >= TSDB_DATA_TYPE_TINYINT && pVar->nType <= TSDB_DATA_TYPE_BIGINT) { @@ -5808,7 +5808,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // super table query if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); + return invalidOperationMsg(pMsgBuf, msg1); } bool orderByTags = false; @@ -5820,7 +5820,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq // it is a tag column if (pQueryInfo->groupbyExpr.columnInfo == NULL) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + return invalidOperationMsg(pMsgBuf, msg2); } SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0); if (relTagIndex == pColIndex->colIndex) { @@ -5841,13 +5841,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq orderByGroupbyCol = true; } } + if (!(orderByTags || orderByTS || orderByGroupbyCol) && !isTopBottomQuery(pQueryInfo)) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); + return invalidOperationMsg(pMsgBuf, msg3); } else { // order by top/bottom result value column is not supported in case of interval query. assert(!(orderByTags && orderByTS && orderByGroupbyCol)); } - size_t s = taosArrayGetSize(pSortorder); + size_t s = taosArrayGetSize(pSortOrder); if (s == 1) { if (orderByTags) { pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); @@ -5866,7 +5867,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq pExpr = tscExprGet(pQueryInfo, 1); if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + return invalidOperationMsg(pMsgBuf, msg2); } tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); @@ -5884,9 +5885,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq addPrimaryTsColIntoResult(pQueryInfo, pCmd); } } - } - - if (s == 2) { + } else { tVariantListItem *pItem = taosArrayGet(pSqlNode->pSortOrder, 0); if (orderByTags) { pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta); @@ -5903,22 +5902,23 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq tVariant* pVar2 = &pItem->pVar; SStrToken cname = {pVar2->nLen, pVar2->nType, pVar2->pz}; if (getColumnIndexByName(&cname, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); + return invalidOperationMsg(pMsgBuf, msg1); } if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + return invalidOperationMsg(pMsgBuf, msg2); } else { - tVariantListItem* p1 = taosArrayGet(pSortorder, 1); + tVariantListItem* p1 = taosArrayGet(pSortOrder, 1); pQueryInfo->order.order = p1->sortOrder; pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX; } } - } else { // meter query - if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); + } else if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) { // check order by clause for normal table & temp table + if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) { + return invalidOperationMsg(pMsgBuf, msg1); } + if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX && !isTopBottomQuery(pQueryInfo)) { bool validOrder = false; SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo; @@ -5926,13 +5926,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq SColIndex* pColIndex = taosArrayGet(columnInfo, 0); validOrder = (pColIndex->colIndex == index.columnIndex); } + if (!validOrder) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + return invalidOperationMsg(pMsgBuf, msg2); } + tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId; pQueryInfo->groupbyExpr.orderType = p1->sortOrder; - } if (isTopBottomQuery(pQueryInfo)) { @@ -5948,13 +5949,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq pExpr = tscExprGet(pQueryInfo, 1); if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + return invalidOperationMsg(pMsgBuf, msg2); } + validOrder = true; } if (!validOrder) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); + return invalidOperationMsg(pMsgBuf, msg2); } tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0); @@ -5964,6 +5966,18 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq return TSDB_CODE_SUCCESS; } + tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0); + pQueryInfo->order.order = pItem->sortOrder; + pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; + } else { + // handle the temp table order by clause. You can order by any single column in case of the temp table, created by + // inner subquery. + assert(UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo) && taosArrayGetSize(pSqlNode->pSortOrder) == 1); + + if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) { + return invalidOperationMsg(pMsgBuf, msg1); + } + tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0); pQueryInfo->order.order = pItem->sortOrder; pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId; @@ -8736,8 +8750,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf const char* msg8 = "condition missing for join query"; const char* msg9 = "not support 3 level select"; - int32_t code = TSDB_CODE_SUCCESS; - + int32_t code = TSDB_CODE_SUCCESS; SSqlCmd* pCmd = &pSql->cmd; STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); @@ -9028,8 +9041,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf pQueryInfo->simpleAgg = isSimpleAggregateRv(pQueryInfo); pQueryInfo->onlyTagQuery = onlyTagPrjFunction(pQueryInfo); pQueryInfo->groupbyColumn = tscGroupbyColumn(pQueryInfo); - //pQueryInfo->globalMerge = tscIsTwoStageSTableQuery(pQueryInfo, 0); - pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo); pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 87289f7728..dd8e95d664 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -903,16 +903,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { } SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); + STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; SQueryAttr query = {{0}}; tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql); + query.vgId = pTableMeta->vgId; SArray* tableScanOperator = createTableScanPlan(&query); SArray* queryOperator = createExecOperatorPlan(&query); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; - SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload; tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version)); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 4857265d17..b134b6c0c9 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1228,11 +1228,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue if (pCond && pCond->cond) { createQueryFilter(pCond->cond, pCond->len, &pFilters); } - //createInputDataFlterInfo(px, numOfCol1, &numOfFilterCols, &pFilterInfo); } SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilters); - pOutput->precision = pSqlObjList[0]->res.precision; SSchema* schema = NULL; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index a65ef524ae..a2947678d7 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -421,7 +421,6 @@ typedef struct STableScanInfo { int32_t *rowCellInfoOffset; SExprInfo *pExpr; SSDataBlock block; - bool loadExternalRows; // load external rows (prev & next rows) int32_t numOfOutput; int64_t elapsedTime; @@ -579,6 +578,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); +SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 4d697b06a5..e8c4d57bfa 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -223,6 +223,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput); static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput); static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput); +static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); @@ -2036,6 +2037,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } break; } + case OP_StateWindow: { pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType; @@ -2052,24 +2054,20 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_Filter: { // todo refactor int32_t numOfFilterCols = 0; -// if (pQueryAttr->numOfFilterCols > 0) { -// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, -// pQueryAttr->numOfOutput, pQueryAttr->tableCols, pQueryAttr->numOfFilterCols); -// } else { - if (pQueryAttr->stableQuery) { - SColumnInfo* pColInfo = - extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols); - pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols); - freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3); - } else { - SColumnInfo* pColInfo = - extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols); - pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, - pQueryAttr->numOfOutput, pColInfo, numOfFilterCols); - freeColumnInfo(pColInfo, pQueryAttr->numOfOutput); - } -// } + if (pQueryAttr->stableQuery) { + SColumnInfo* pColInfo = + extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols); + pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, + pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols); + freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3); + } else { + SColumnInfo* pColInfo = + extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols); + pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, + pQueryAttr->numOfOutput, pColInfo, numOfFilterCols); + freeColumnInfo(pColInfo, pQueryAttr->numOfOutput); + } + break; } @@ -2081,11 +2079,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf case OP_MultiwayMergeSort: { bool groupMix = true; - if(pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) { + if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) { groupMix = false; } + pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, - 4096, merger, groupMix); // TODO hack it + 4096, merger, groupMix); // TODO hack it break; } @@ -2106,6 +2105,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf break; } + case OP_Order: { + pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + break; + } + default: { assert(0); } @@ -5213,8 +5217,8 @@ static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) { int32_t numOfCols = pSrc->info.numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock->pData, i); - SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock->pData, i); + SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); + SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes; char* tmp = realloc(pCol2->pData, newSize); @@ -5227,6 +5231,7 @@ static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) { } } + pDest->info.rows += pSrc->info.rows; return TSDB_CODE_SUCCESS; } @@ -5277,7 +5282,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; } -SOperatorInfo *createOrderOperatorInfo(SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); { @@ -5292,17 +5297,20 @@ SOperatorInfo *createOrderOperatorInfo(SExprInfo* pExpr, int32_t numOfOutput) { } pDataBlock->info.numOfCols = numOfOutput; + pInfo->pDataBlock = pDataBlock; } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); - pOperator->name = "Order"; + pOperator->name = "InMemoryOrder"; pOperator->operatorType = OP_Order; pOperator->blockingOptr = true; pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->exec = doSort; - pOperator->cleanup = NULL; + pOperator->cleanup = destroyOrderOperatorInfo; + pOperator->pRuntimeEnv = pRuntimeEnv; + appendUpstream(pOperator, upstream); return pOperator; } @@ -6197,6 +6205,11 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { pInfo->pRes = destroyOutputBuf(pInfo->pRes); } +static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { + SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; + pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock); +} + static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { SFilterOperatorInfo* pInfo = (SFilterOperatorInfo*) param; doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols); @@ -6494,7 +6507,6 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn pOperator->numOfOutput = numOfOutput; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->exec = doFill; pOperator->cleanup = destroySFillOperatorInfo; diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index e01f41276f..1a4fe5b5c0 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -557,10 +557,9 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { int32_t op = 0; if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query - if (onlyQueryTags(pQueryAttr)) { - op = OP_TagScan; - taosArrayPush(plan, &op); - } + op = OP_TagScan; + taosArrayPush(plan, &op); + if (pQueryAttr->distinct) { op = OP_Distinct; taosArrayPush(plan, &op); @@ -643,8 +642,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { taosArrayPush(plan, &op); } } + + // outer query order by support + int32_t orderColId = pQueryAttr->order.orderColId; + if (pQueryAttr->vgId == 0 && orderColId != PRIMARYKEY_TIMESTAMP_COL_INDEX && orderColId != INT32_MIN) { + op = OP_Order; + taosArrayPush(plan, &op); + } } - if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) { op = OP_Limit; From 255e3b0d646e21e8590d3d6f4920c6c6a9fec5c9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 12 Aug 2021 16:03:49 +0800 Subject: [PATCH 20/22] [td-5881]: Sort the result according to any single column in the outer query result is allowed. --- src/query/inc/qExecutor.h | 2 +- src/query/src/qExecutor.c | 15 ++++++++++----- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index a2947678d7..eaa4da1ca8 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -578,7 +578,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); -SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index e8c4d57bfa..477dfaa982 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2106,7 +2106,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } case OP_Order: { - pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); + pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); break; } @@ -5232,6 +5232,7 @@ static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) { } pDest->info.rows += pSrc->info.rows; + return TSDB_CODE_SUCCESS; } @@ -5282,7 +5283,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; } -SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { +SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) { SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); { @@ -5290,10 +5291,14 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); for(int32_t i = 0; i < numOfOutput; ++i) { SColumnInfoData col = {{0}}; - col.info.bytes = pExpr->base.resBytes; - col.info.colId = pExpr->base.resColId; - col.info.type = pExpr->base.resType; + col.info.colId = pExpr[i].base.colInfo.colId; + col.info.bytes = pExpr[i].base.colBytes; + col.info.type = pExpr[i].base.colType; taosArrayPush(pDataBlock->pDataBlock, &col); + + if (col.info.colId == pOrderVal->orderColId) { + pInfo->colIndex = i; + } } pDataBlock->info.numOfCols = numOfOutput; From 789918b993c6c06b1ec95fb7701b034013d1b237 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 12 Aug 2021 16:21:48 +0800 Subject: [PATCH 21/22] [td-5881]fix bug in TD-5881 --- src/query/src/qExecutor.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 477dfaa982..abdd296c09 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -5302,6 +5302,7 @@ SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI } pDataBlock->info.numOfCols = numOfOutput; + pInfo->order = pOrderVal->order; pInfo->pDataBlock = pDataBlock; } From 90e0005c57f88d2f750d3594eae319eec3aa0b96 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 13 Aug 2021 11:16:57 +0800 Subject: [PATCH 22/22] [td-255] sleep a little bit longer before checking the status of dnodes in the clusters. --- tests/script/unique/dnode/alternativeRole.sim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/unique/dnode/alternativeRole.sim b/tests/script/unique/dnode/alternativeRole.sim index 955b757f06..14a6e92f06 100644 --- a/tests/script/unique/dnode/alternativeRole.sim +++ b/tests/script/unique/dnode/alternativeRole.sim @@ -30,7 +30,7 @@ sql create dnode $hostname2 system sh/exec.sh -n dnode2 -s start sql create dnode $hostname3 system sh/exec.sh -n dnode3 -s start -sleep 3000 +sleep 5000 sql show dnodes print dnode1 $data5_1