diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 733270dbed..eace9d7dd6 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -340,12 +340,8 @@ typedef struct tDataTypeDescriptor { } tDataTypeDescriptor; extern tDataTypeDescriptor tDataTypes[TSDB_DATA_TYPE_MAX]; - bool isValidDataType(int32_t type); -void setVardataNull(void *val, int32_t type); -//void setNull(void *val, int32_t type, int32_t bytes); -//void setNullN(void *val, int32_t type, int32_t bytes, int32_t numOfElems); void assignVal(char *val, const char *src, int32_t len, int32_t type); void operateVal(void *dst, void *s1, void *s2, int32_t optr, int32_t type); void *getDataMin(int32_t type); diff --git a/include/util/tdef.h b/include/util/tdef.h index bb7f908ad4..936fbdf0d5 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -237,7 +237,6 @@ typedef enum ELogicConditionType { #define TSDB_MAX_BYTES_PER_ROW 49151 #define TSDB_MAX_TAGS_LEN 16384 #define TSDB_MAX_TAGS 128 -#define TSDB_MAX_TAG_CONDITIONS 1024 #define TSDB_MAX_COL_TAG_NUM (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS) #define TSDB_MAX_JSON_TAG_LEN 16384 @@ -274,8 +273,6 @@ typedef enum ELogicConditionType { #define TSDB_PAYLOAD_SIZE TSDB_DEFAULT_PKT_SIZE #define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value -#define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth -#define TSDB_CQ_SQL_SIZE 1024 #define TSDB_MIN_VNODES 16 #define TSDB_MAX_VNODES 512 @@ -285,10 +282,7 @@ typedef enum ELogicConditionType { #define TSDB_MAX_REPLICA 5 -#define TSDB_TBNAME_COLUMN_INDEX (-1) -#define TSDB_UD_COLUMN_INDEX (-1000) -#define TSDB_RES_COL_ID (-5000) - +#define TSDB_TBNAME_COLUMN_INDEX (-1) #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta #define TSDB_MIN_VNODES_PER_DB 1 @@ -398,9 +392,6 @@ typedef enum ELogicConditionType { #define TSDB_MAX_EXPLAIN_RATIO 1 #define TSDB_DEFAULT_EXPLAIN_RATIO 0.001 -#define TSDB_MAX_JOIN_TABLE_NUM 10 -#define TSDB_MAX_UNION_CLAUSE 5 - #define TSDB_DEFAULT_EXPLAIN_VERBOSE false #define TSDB_EXPLAIN_RESULT_ROW_SIZE (16 * 1024) @@ -419,7 +410,6 @@ typedef enum ELogicConditionType { #endif #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type -#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode #define TSDB_META_COMPACT_RATIO 0 // disable tsdb meta compact by default diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index bb82863e73..e0cc5bd311 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -244,8 +244,7 @@ int32_t parseTimezone(char* str, int64_t* tzOffset) { * 2013-04-12T15:52:01.123+0800 */ int32_t parseTimeWithTz(const char* timestr, int64_t* time, int32_t timePrec, char delim) { - int64_t factor = - (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); + int64_t factor = TSDB_TICK_PER_SECOND(timePrec); int64_t tzOffset = 0; struct tm tm = {0}; @@ -339,8 +338,8 @@ static FORCE_INLINE bool validateTm(struct tm* pTm) { return true; } -int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) { - *time = 0; +int32_t parseLocaltime(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim) { + *utime = 0; struct tm tm = {0}; char* str; @@ -378,15 +377,12 @@ int32_t parseLocaltime(char* timestr, int32_t len, int64_t* time, int32_t timePr } } - int64_t factor = - (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); - *time = factor * seconds + fraction; - + *utime = TSDB_TICK_PER_SECOND(timePrec) * seconds + fraction; return 0; } -int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t timePrec, char delim) { - *time = 0; +int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* utime, int32_t timePrec, char delim) { + *utime = 0; struct tm tm = {0}; tm.tm_isdst = -1; @@ -411,7 +407,6 @@ int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t tim int64_t seconds = taosMktime(&tm); int64_t fraction = 0; - if (*str == '.') { /* parse the second fraction part */ if ((fraction = parseFraction(str + 1, &str, timePrec)) < 0) { @@ -419,9 +414,7 @@ int32_t parseLocaltimeDst(char* timestr, int32_t len, int64_t* time, int32_t tim } } - int64_t factor = - (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); - *time = factor * seconds + fraction; + *utime = TSDB_TICK_PER_SECOND(timePrec) * seconds + fraction; return 0; } @@ -437,58 +430,61 @@ char getPrecisionUnit(int32_t precision) { } } -int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision) { - assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO || +int64_t convertTimePrecision(int64_t utime, int32_t fromPrecision, int32_t toPrecision) { + ASSERT(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO || fromPrecision == TSDB_TIME_PRECISION_NANO); - assert(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO || + ASSERT(toPrecision == TSDB_TIME_PRECISION_MILLI || toPrecision == TSDB_TIME_PRECISION_MICRO || toPrecision == TSDB_TIME_PRECISION_NANO); - double tempResult = (double)time; + + double tempResult = (double)utime; + switch (fromPrecision) { case TSDB_TIME_PRECISION_MILLI: { switch (toPrecision) { case TSDB_TIME_PRECISION_MILLI: - return time; + return utime; case TSDB_TIME_PRECISION_MICRO: tempResult *= 1000; - time *= 1000; + utime *= 1000; goto end_; case TSDB_TIME_PRECISION_NANO: tempResult *= 1000000; - time *= 1000000; + utime *= 1000000; goto end_; } } // end from milli case TSDB_TIME_PRECISION_MICRO: { switch (toPrecision) { case TSDB_TIME_PRECISION_MILLI: - return time / 1000; + return utime / 1000; case TSDB_TIME_PRECISION_MICRO: - return time; + return utime; case TSDB_TIME_PRECISION_NANO: tempResult *= 1000; - time *= 1000; + utime *= 1000; goto end_; } } // end from micro case TSDB_TIME_PRECISION_NANO: { switch (toPrecision) { case TSDB_TIME_PRECISION_MILLI: - return time / 1000000; + return utime / 1000000; case TSDB_TIME_PRECISION_MICRO: - return time / 1000; + return utime / 1000; case TSDB_TIME_PRECISION_NANO: - return time; + return utime; } } // end from nano default: { assert(0); - return time; // only to pass windows compilation + return utime; // only to pass windows compilation } } // end switch fromPrecision + end_: if (tempResult >= (double)INT64_MAX) return INT64_MAX; if (tempResult <= (double)INT64_MIN) return INT64_MIN; // INT64_MIN means NULL - return time; + return utime; } // !!!!notice:there are precision problems, double lose precison if time is too large, for example: diff --git a/source/common/src/ttypes.c b/source/common/src/ttypes.c index 3dbd8a05a2..a4e7a12ce4 100644 --- a/source/common/src/ttypes.c +++ b/source/common/src/ttypes.c @@ -16,7 +16,6 @@ #define _DEFAULT_SOURCE #include "ttypes.h" #include "tcompression.h" -#include "trow.h" const int32_t TYPE_BYTES[16] = { -1, // TSDB_DATA_TYPE_NULL @@ -86,18 +85,6 @@ FORCE_INLINE void *getDataMax(int32_t type) { bool isValidDataType(int32_t type) { return type >= TSDB_DATA_TYPE_NULL && type < TSDB_DATA_TYPE_MAX; } -void setVardataNull(void *val, int32_t type) { - if (type == TSDB_DATA_TYPE_BINARY) { - varDataSetLen(val, sizeof(int8_t)); - *(uint8_t *)varDataVal(val) = TSDB_DATA_BINARY_NULL; - } else if (type == TSDB_DATA_TYPE_NCHAR) { - varDataSetLen(val, sizeof(int32_t)); - *(uint32_t *)varDataVal(val) = TSDB_DATA_NCHAR_NULL; - } else { - assert(0); - } -} - #define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b))) void assignVal(char *val, const char *src, int32_t len, int32_t type) { diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 20352bb502..ed491ca182 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -194,7 +194,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { - STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); + STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(pr->pTableList, i); code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 2f4fdfc5f8..8808f3c50c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -120,44 +120,46 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) { return &pInfo->blockData[1]; } - pInfo->currentLoadBlockIndex ^= 1; - if (pIter->pSttBlk != NULL) { // current block not loaded yet - int64_t st = taosGetTimestampUs(); - - SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex]; - - TABLEID id = {0}; - if (pIter->pSttBlk->suid != 0) { - id.suid = pIter->pSttBlk->suid; - } else { - id.uid = pIter->uid; - } - - code = tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols); - if (code != TSDB_CODE_SUCCESS) { - goto _exit; - } - - code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock); - if (code != TSDB_CODE_SUCCESS) { - goto _exit; - } - - double el = (taosGetTimestampUs() - st) / 1000.0; - pInfo->elapsedTime += el; - pInfo->loadBlocks += 1; - - tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64 - ", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s", - pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el, - idStr); - - pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk; - tsdbDebug("last block index list:%d, %d, %s", pInfo->blockIndex[0], pInfo->blockIndex[1], idStr); - - pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1; + if (pIter->pSttBlk == NULL) { + return NULL; } + // current block not loaded yet + pInfo->currentLoadBlockIndex ^= 1; + int64_t st = taosGetTimestampUs(); + + SBlockData *pBlock = &pInfo->blockData[pInfo->currentLoadBlockIndex]; + + TABLEID id = {0}; + if (pIter->pSttBlk->suid != 0) { + id.suid = pIter->pSttBlk->suid; + } else { + id.uid = pIter->uid; + } + + code = tBlockDataInit(pBlock, &id, pInfo->pSchema, pInfo->colIds, pInfo->numOfCols); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + + code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlock); + if (code != TSDB_CODE_SUCCESS) { + goto _exit; + } + + double el = (taosGetTimestampUs() - st) / 1000.0; + pInfo->elapsedTime += el; + pInfo->loadBlocks += 1; + + tsdbDebug("read last block, total load:%d, trigger by uid:%" PRIu64 + ", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s", + pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el, + idStr); + + pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk; + tsdbDebug("last block index list:%d, %d, %s", pInfo->blockIndex[0], pInfo->blockIndex[1], idStr); + + pIter->iRow = (pIter->backward) ? pInfo->blockData[pInfo->currentLoadBlockIndex].nRow : -1; return &pInfo->blockData[pInfo->currentLoadBlockIndex]; _exit: @@ -259,7 +261,8 @@ static int32_t binarySearchForStartRowIndex(uint64_t *uidList, int32_t num, uint int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo, const char *idStr) { - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; + *pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); if (*pIter == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -336,7 +339,10 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t (*pIter)->iRow = ((*pIter)->backward) ? (*pIter)->pSttBlk->nRow : -1; } + return code; + _exit: + taosMemoryFree(*pIter); return code; } @@ -473,7 +479,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) { int32_t iBlockL = pIter->iSttBlk; SBlockData *pBlockData = loadLastBlock(pIter, idStr); - if (pBlockData == NULL && terrno != TSDB_CODE_SUCCESS) { + if (pBlockData == NULL || terrno != TSDB_CODE_SUCCESS) { goto _exit; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ea4e86fb78..11cae00358 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1047,11 +1047,16 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v return pLeftBlock->offset > pRightBlock->offset ? 1 : -1; } -static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) { +static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); if (pBlockInfo != NULL) { STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); - int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); + if (pScanInfo == NULL) { + tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, %s", pBlockInfo->uid, idStr); + return TSDB_CODE_INVALID_PARA; + } + + int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetDataBlk); } @@ -1135,7 +1140,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte pBlockIter->index = asc ? 0 : (numOfBlocks - 1); cleanupBlockOrderSupporter(&sup); - doSetCurrentBlock(pBlockIter); + doSetCurrentBlock(pBlockIter, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -1175,12 +1180,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte taosMemoryFree(pTree); pBlockIter->index = asc ? 0 : (numOfBlocks - 1); - doSetCurrentBlock(pBlockIter); + doSetCurrentBlock(pBlockIter, pReader->idStr); return TSDB_CODE_SUCCESS; } -static bool blockIteratorNext(SDataBlockIter* pBlockIter) { +static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) { bool asc = ASCENDING_TRAVERSE(pBlockIter->order); int32_t step = asc ? 1 : -1; @@ -1189,7 +1194,7 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) { } pBlockIter->index += step; - doSetCurrentBlock(pBlockIter); + doSetCurrentBlock(pBlockIter, idStr); return true; } @@ -1260,7 +1265,7 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx); } - doSetCurrentBlock(pBlockIter); + doSetCurrentBlock(pBlockIter, ""); return TSDB_CODE_SUCCESS; } @@ -2190,6 +2195,8 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI } static int32_t buildComposedDataBlock(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pResBlock = pReader->pResBlock; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); @@ -2200,6 +2207,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { STableBlockScanInfo* pBlockScanInfo = NULL; if (pBlockInfo != NULL) { pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + if (pBlockScanInfo == NULL) { + code = TSDB_CODE_INVALID_PARA; + tsdbError("failed to locate the uid:%"PRIu64" in query table uid list, total tables:%d, %s", + pBlockInfo->uid, taosHashGetSize(pReader->status.pTableMap), pReader->idStr); + goto _end; + } + SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); @@ -2276,7 +2290,7 @@ _end: pResBlock->info.rows, el, pReader->idStr); } - return TSDB_CODE_SUCCESS; + return code; } void setComposedBlockFlag(STsdbReader* pReader, bool composed) { pReader->status.composedDataBlock = composed; } @@ -2732,7 +2746,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { // current block are exhausted, try the next file block if (pDumpInfo->allDumped) { // try next data block in current file - bool hasNext = blockIteratorNext(&pReader->status.blockIter); + bool hasNext = blockIteratorNext(&pReader->status.blockIter, pReader->idStr); if (hasNext) { // check for the next block in the block accessed order list initBlockDumpInfo(pReader, pBlockIter); } else { @@ -3658,10 +3672,6 @@ void tsdbReaderClose(STsdbReader* pReader) { taosMemoryFree(pLReader); } - if (pReader->innerReader[0] != 0) { - tsdbUntakeReadSnap(pReader->innerReader[0]->pTsdb, pReader->innerReader[0]->pReadSnap, pReader->idStr); - } - tsdbDebug( "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 " SMA-time:%.2f ms, fileBlocks:%" PRId64 @@ -3849,8 +3859,14 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { return pReader->pResBlock->pDataBlock; } - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter); - STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); + STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + if (pBlockScanInfo == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, + taosHashGetSize(pReader->status.pTableMap), pReader->idStr); + return NULL; + } int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid); if (code != TSDB_CODE_SUCCESS) { @@ -3979,7 +3995,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa int32_t bucketIndex = getBucketIndex(pTableBlockInfo->defMinRows, bucketRange, numOfRows); pTableBlockInfo->blockRowsHisto[bucketIndex]++; - hasNext = blockIteratorNext(&pStatus->blockIter); + hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr); } else { code = initForFirstBlockInFile(pReader, pBlockIter); if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1aa3d55efc..01497cc059 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -954,9 +954,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo, - int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); @@ -980,9 +978,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SExecTaskInfo* pTaskInfo, int32_t numOfChild); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, - SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode *pAggNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index ebf23ba6bc..1074678efd 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -162,7 +162,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (pTableList->map != NULL) { int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t)); - pInfo->pRes->info.groupId = *groupId; + if (groupId != NULL) { + pInfo->pRes->info.groupId = *groupId; + } } else { ASSERT(taosArrayGetSize(pTableList->pTableList) == 1); STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0); diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index c02b5d2c50..0714d0f3ac 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -234,8 +234,11 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { while (!taosQueueEmpty(pDeleter->pDataBlocks)) { SDataDeleterBuf* pBuf = NULL; taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); - taosMemoryFreeClear(pBuf->pData); - taosFreeQitem(pBuf); + + if (pBuf != NULL) { + taosMemoryFreeClear(pBuf->pData); + taosFreeQitem(pBuf); + } } taosCloseQueue(pDeleter->pDataBlocks); taosThreadMutexDestroy(&pDeleter->mutex); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c60c8ff301..cad3e3c44c 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -358,7 +358,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SDataSinkMgtCfg cfg = {.maxDataBlockNum = 10000, .maxDataBlockNumPerQuery = 5000}; code = dsDataSinkMgtInit(&cfg); if (code != TSDB_CODE_SUCCESS) { - qError("failed to dsDataSinkMgtInit, code: %s", tstrerror(code)); + qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str); goto _error; } @@ -366,7 +366,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, void* pSinkParam = NULL; code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle); if (code != TSDB_CODE_SUCCESS) { - qError("failed to createDataSinkParam, code: %s", tstrerror(code)); + qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str); goto _error; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index feced8dc60..cb7a644815 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3048,32 +3048,40 @@ void cleanupExprSupp(SExprSupp* pSupp) { taosMemoryFree(pSupp->rowEntryInfoOffset); } -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo, - int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode,SExecTaskInfo* pTaskInfo) { SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc); + initBasicInfo(&pInfo->binfo, pResBlock); + int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = NULL; + if (pAggNode->pExprs != NULL) { + pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); + } + + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initBasicInfo(&pInfo->binfo, pResultBlock); code = initExprSupp(&pInfo->scalarExprSup, pScalarExprInfo, numOfScalarExpr); if (code != TSDB_CODE_SUCCESS) { goto _error; } - pInfo->binfo.mergeResultBlock = mergeResult; + pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; pInfo->groupId = UINT64_MAX; - pInfo->pCondition = pCondition; + pInfo->pCondition = pAggNode->node.pConditions; pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_AGG; pOperator->blocking = true; @@ -3332,8 +3340,6 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT return pTaskInfo; } -static SArray* extractColumnInfo(SNodeList* pNodeList); - SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, SExecTaskInfo* pTaskInfo) { @@ -3710,22 +3716,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createProjectOperatorInfo(ops[0], (SProjectPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_AGG == type) { SAggPhysiNode* pAggNode = (SAggPhysiNode*)pPhyNode; - SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - - int32_t numOfScalarExpr = 0; - SExprInfo* pScalarExprInfo = NULL; - if (pAggNode->pExprs != NULL) { - pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); - } - if (pAggNode->pGroupKeys != NULL) { - SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); - pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, - pScalarExprInfo, numOfScalarExpr, pTaskInfo); + pOptr = createGroupOperatorInfo(ops[0], pAggNode, pTaskInfo); } else { - pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions, - pScalarExprInfo, numOfScalarExpr, pAggNode->mergeDataBlock, pTaskInfo); + pOptr = createAggregateOperatorInfo(ops[0], pAggNode, pTaskInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -3815,39 +3809,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOptr; } -SArray* extractColumnInfo(SNodeList* pNodeList) { - size_t numOfCols = LIST_LENGTH(pNodeList); - SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); - if (pList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - - for (int32_t i = 0; i < numOfCols; ++i) { - STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); - - if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) { - SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; - - SColumn c = extractColumnFromColumnNode(pColNode); - taosArrayPush(pList, &c); - } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) { - SValueNode* pValNode = (SValueNode*)pNode->pExpr; - SColumn c = {0}; - c.slotId = pNode->slotId; - c.colId = pNode->slotId; - c.type = pValNode->node.type; - c.bytes = pValNode->node.resType.bytes; - c.scale = pValNode->node.resType.scale; - c.precision = pValNode->node.resType.precision; - - taosArrayPush(pList, &c); - } - } - - return pList; -} - static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -4071,6 +4032,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead (*pTaskInfo)->sql = sql; sql = NULL; + (*pTaskInfo)->pSubplan = pPlan; (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 4ca4ef7b3f..60b11b7326 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -30,6 +30,7 @@ static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDa static int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity); static int32_t setGroupResultOutputBuf(SOperatorInfo* pOperator, SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t bytes, uint64_t groupId, SDiskbasedBuf* pBuf, SAggSupporter* pAggSup); +static SArray* extractColumnInfo(SNodeList* pNodeList); static void freeGroupKey(void* param) { SGroupKeys* pKey = (SGroupKeys*)param; @@ -61,7 +62,7 @@ static int32_t initGroupOptrInfo(SArray** pGroupColVals, int32_t* keyLen, char** int32_t numOfGroupCols = taosArrayGetSize(pGroupColList); for (int32_t i = 0; i < numOfGroupCols; ++i) { - SColumn* pCol = taosArrayGet(pGroupColList, i); + SColumn* pCol = (SColumn*) taosArrayGet(pGroupColList, i); (*keyLen) += pCol->bytes; // actual data + null_flag SGroupKeys key = {0}; @@ -396,41 +397,48 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { return buildGroupResultDataBlock(pOperator); } -SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, - SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode *pAggNode, SExecTaskInfo* pTaskInfo) { SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pGroupCols = pGroupColList; - pInfo->pCondition = pCondition; + SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc); + initBasicInfo(&pInfo->binfo, pResBlock); + + int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = NULL; + if (pAggNode->pExprs != NULL) { + pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); + } + + pInfo->pGroupCols = extractColumnInfo(pAggNode->pGroupKeys); + pInfo->pCondition = pAggNode->node.pConditions; int32_t code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr); if (code != TSDB_CODE_SUCCESS) { goto _error; } - code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); + code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { goto _error; } initResultSizeInfo(&pOperator->resultInfo, 4096); - code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str); + code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initBasicInfo(&pInfo->binfo, pResultBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); pOperator->name = "GroupbyAggOperator"; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; - // pOperator->operatorType = OP_Groupby; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; @@ -451,8 +459,6 @@ _error: } static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { - // SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SPartitionOperatorInfo* pInfo = pOperator->info; for (int32_t j = 0; j < pBlock->info.rows; ++j) { @@ -760,7 +766,6 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols); - pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys); if (pPartNode->pExprs != NULL) { @@ -781,14 +786,13 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition uint32_t defaultPgsz = 0; uint32_t defaultBufsz = 0; - SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc); - getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz); + pInfo->binfo.pRes = createResDataBlock(pPartNode->node.pOutputDataBlockDesc); + getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz); if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_AVAIL_DISK; pTaskInfo->code = terrno; qError("Create partition operator info failed since %s", terrstr(terrno)); - blockDataDestroy(pResBlock); goto _error; } @@ -797,8 +801,8 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition goto _error; } - pInfo->rowCapacity = blockDataGetCapacityInRow(pResBlock, getBufPageSize(pInfo->pBuf)); - pInfo->columnOffset = setupColumnOffset(pResBlock, pInfo->rowCapacity); + pInfo->rowCapacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf)); + pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity); code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -808,7 +812,6 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; - pInfo->binfo.pRes = pResBlock; pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->info = pInfo; @@ -1102,3 +1105,37 @@ _error: taosMemoryFreeClear(pOperator); return NULL; } + + +SArray* extractColumnInfo(SNodeList* pNodeList) { + size_t numOfCols = LIST_LENGTH(pNodeList); + SArray* pList = taosArrayInit(numOfCols, sizeof(SColumn)); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for (int32_t i = 0; i < numOfCols; ++i) { + STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); + + if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) { + SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; + + SColumn c = extractColumnFromColumnNode(pColNode); + taosArrayPush(pList, &c); + } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) { + SValueNode* pValNode = (SValueNode*)pNode->pExpr; + SColumn c = {0}; + c.slotId = pNode->slotId; + c.colId = pNode->slotId; + c.type = pValNode->node.type; + c.bytes = pValNode->node.resType.bytes; + c.scale = pValNode->node.resType.scale; + c.precision = pValNode->node.resType.precision; + + taosArrayPush(pList, &c); + } + } + + return pList; +} diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 53cfa6c27a..eab0307e01 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -59,15 +59,16 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { SJoinOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SJoinOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + + int32_t code = TSDB_CODE_SUCCESS; if (pOperator == NULL || pInfo == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } + int32_t numOfCols = 0; SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc); - - int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); - + SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pRes = pResBlock; @@ -84,8 +85,18 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) { pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); + if (pInfo->pCondAfterMerge == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(pInfo->pCondAfterMerge); pLogicCond->pParameterList = nodesMakeList(); + if (pLogicCond->pParameterList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _error; + } + nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->pOnConditions)); nodesListMakeAppend(&pLogicCond->pParameterList, nodesCloneNode(pJoinNode->node.pConditions)); pLogicCond->condType = LOGIC_COND_TYPE_AND; @@ -106,7 +117,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL); - int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); + code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -114,9 +125,12 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t return pOperator; _error: - taosMemoryFree(pInfo); + if (pInfo != NULL) { + destroyMergeJoinOperator(pInfo); + } + taosMemoryFree(pOperator); - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + pTaskInfo->code = code; return NULL; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 058b9b1b44..cdc59deee1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -494,7 +494,13 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p SScalarParam srcParam = {.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData}; SScalarParam param = {.columnData = pColInfoData}; - fpSet.process(&srcParam, 1, ¶m); + + if (fpSet.process != NULL) { + fpSet.process(&srcParam, 1, ¶m); + } else { + qError("failed to get the corresponding callback function, functionId:%d", functionId); + } + colDataDestroy(&infoData); } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 7ca3de5214..c00b5c4802 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -719,12 +719,16 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SDataBlockDescNode* pDescNode = pPhyNode->pOutputDataBlockDesc; - pInfo->binfo.pRes = createResDataBlock(pDescNode); - int32_t rowSize = pInfo->binfo.pRes->info.rowSize; - if (pInfo == NULL || pOperator == NULL || rowSize > 100 * 1024 * 1024) { + int32_t code = TSDB_CODE_SUCCESS; + if (pInfo == NULL || pOperator == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } + pInfo->binfo.pRes = createResDataBlock(pDescNode); + int32_t rowSize = pInfo->binfo.pRes->info.rowSize; + ASSERT(rowSize < 100 * 1024 * 1024); + SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); int32_t numOfOutputCols = 0; SArray* pColMatchColInfo = @@ -737,6 +741,9 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size pInfo->pSortInfo = pSortInfo; pInfo->pColMatchInfo = pColMatchColInfo; pInfo->pInputBlock = pInputBlock; + pInfo->bufPageSize = getProperSortPageSize(rowSize); + pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result. + pOperator->name = "MultiwayMerge"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE; pOperator->blocking = false; @@ -744,15 +751,10 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pInfo->bufPageSize = getProperSortPageSize(rowSize); - - // one additional is reserved for merged result. - pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); - pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL, destroyMultiwayMergeOperatorInfo, NULL, NULL, getMultiwayMergeExplainExecInfo); - int32_t code = appendDownstream(pOperator, downStreams, numStreams); + code = appendDownstream(pOperator, downStreams, numStreams); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c index b133041fdc..4204692514 100644 --- a/source/libs/executor/src/tlinearhash.c +++ b/source/libs/executor/src/tlinearhash.c @@ -251,6 +251,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_ if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_AVAIL_DISK; printf("tHash Init failed since %s", terrstr(terrno)); + taosMemoryFree(pHashObj); return NULL; } diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 03248f5069..7ad5c1365c 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -140,6 +140,7 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource int32_t* sourceId, SArray* pPageIdList) { SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource)); if (pSource == NULL) { + taosArrayDestroy(pPageIdList); return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -155,6 +156,7 @@ static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSource int32_t numOfRows = (getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize; ASSERT(numOfRows > 0); + return blockDataEnsureCapacity(pSource->src.pBlock, numOfRows); } @@ -224,6 +226,22 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int int32_t code = 0; + // multi-pass internal merge sort is required + if (pHandle->pBuf == NULL) { + if (!osTempSpaceAvailable()) { + code = TSDB_CODE_NO_AVAIL_DISK; + qError("Sort compare init failed since %s", terrstr(code)); + return code; + } + + code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, + "sortComparInit", tsTempDir); + dBufSetPrintInfo(pHandle->pBuf); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } + if (pHandle->type == SORT_SINGLESOURCE_SORT) { for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { SSortSource* pSource = cmpParam->pSources[i]; @@ -245,22 +263,6 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int releaseBufPage(pHandle->pBuf, pPage); } } else { - // multi-pass internal merge sort is required - if (pHandle->pBuf == NULL) { - if (!osTempSpaceAvailable()) { - terrno = TSDB_CODE_NO_AVAIL_DISK; - code = terrno; - qError("Sort compare init failed since %s", terrstr(terrno)); - return code; - } - code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, - "sortComparInit", tsTempDir); - dBufSetPrintInfo(pHandle->pBuf); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { SSortSource* pSource = cmpParam->pSources[i]; pSource->src.pBlock = pHandle->fetchfp(pSource->param); @@ -507,12 +509,14 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { int32_t code = sortComparInit(&pHandle->cmpParam, pHandle->pOrderedSource, i * numOfInputSources, end, pHandle); if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pResList); return code; } code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(pResList); return code; } @@ -520,12 +524,16 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { while (1) { SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); if (pDataBlock == NULL) { + taosArrayDestroy(pResList); + taosArrayDestroy(pPageIdList); break; } int32_t pageId = -1; void* pPage = getNewBufPage(pHandle->pBuf, &pageId); if (pPage == NULL) { + taosArrayDestroy(pResList); + taosArrayDestroy(pPageIdList); return terrno; } diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 339065633b..1d8744d726 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1178,9 +1178,7 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara GET_TYPED_DATA(timeUnit, int64_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData); GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData); - int64_t factor = - (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); - + int64_t factor = TSDB_TICK_PER_SECOND(timePrec); int64_t unit = timeUnit * 1000 / factor; for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { @@ -1372,9 +1370,7 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData); } - int64_t factor = - (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); - + int64_t factor = TSDB_TICK_PER_SECOND(timePrec); int32_t numOfRows = 0; for (int32_t i = 0; i < inputNum; ++i) { if (pInput[i].numOfRows > numOfRows) { diff --git a/source/libs/scalar/src/sclvector.c b/source/libs/scalar/src/sclvector.c index f089bad04e..6480c53dbf 100644 --- a/source/libs/scalar/src/sclvector.c +++ b/source/libs/scalar/src/sclvector.c @@ -1426,57 +1426,6 @@ void vectorAssign(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, pOut->numOfQualified = pRight->numOfQualified * pOut->numOfRows; } -void vectorConcat(SScalarParam *pLeft, SScalarParam *pRight, void *out, int32_t _ord) { -#if 0 - int32_t len = pLeft->bytes + pRight->bytes; - - int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : TMAX(pLeft->numOfRows, pRight->numOfRows) - 1; - int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; - - char *output = (char *)out; - if (pLeft->numOfRows == pRight->numOfRows) { - for (; i < pRight->numOfRows && i >= 0; i += step, output += len) { - char* left = POINTER_SHIFT(pLeft->data, pLeft->bytes * i); - char* right = POINTER_SHIFT(pRight->data, pRight->bytes * i); - - if (isNull(left, pLeftCol->info.type) || isNull(right, pRight->info.type)) { - setVardataNull(output, TSDB_DATA_TYPE_BINARY); - continue; - } - - // todo define a macro - memcpy(varDataVal(output), varDataVal(left), varDataLen(left)); - memcpy(varDataVal(output) + varDataLen(left), varDataVal(right), varDataLen(right)); - varDataSetLen(output, varDataLen(left) + varDataLen(right)); - } - } else if (pLeft->numOfRows == 1) { - for (; i >= 0 && i < pRight->numOfRows; i += step, output += len) { - char *right = POINTER_SHIFT(pRight->data, pRight->bytes * i); - if (isNull(pLeft->data, pLeftCol->info.type) || isNull(right, pRight->info.type)) { - setVardataNull(output, TSDB_DATA_TYPE_BINARY); - continue; - } - - memcpy(varDataVal(output), varDataVal(pLeft->data), varDataLen(pLeft->data)); - memcpy(varDataVal(output) + varDataLen(pLeft->data), varDataVal(right), varDataLen(right)); - varDataSetLen(output, varDataLen(pLeft->data) + varDataLen(right)); - } - } else if (pRight->numOfRows == 1) { - for (; i >= 0 && i < pLeft->numOfRows; i += step, output += len) { - char* left = POINTER_SHIFT(pLeft->data, pLeft->bytes * i); - if (isNull(left, pLeftCol->info.type) || isNull(pRight->data, pRight->info.type)) { - SET_DOUBLE_NULL(output); - continue; - } - - memcpy(varDataVal(output), varDataVal(left), varDataLen(pRight->data)); - memcpy(varDataVal(output) + varDataLen(left), varDataVal(pRight->data), varDataLen(pRight->data)); - varDataSetLen(output, varDataLen(left) + varDataLen(pRight->data)); - } - } -#endif -} - static void vectorBitAndHelper(SColumnInfoData *pLeftCol, SColumnInfoData *pRightCol, SColumnInfoData *pOutputCol, int32_t numOfRows, int32_t step, int32_t i) { _getBigintValue_fn_t getVectorBigintValueFnLeft = getVectorBigintValueFn(pLeftCol->info.type);