From 31a04218787ba7bfeadbc02c469bcc8bcd1cb288 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 27 Oct 2022 15:41:29 +0800 Subject: [PATCH 01/24] enh: check completeness of WAL log entries after repairing --- include/util/taoserror.h | 1 + source/libs/wal/src/walMeta.c | 32 ++++++++++++++++++++++++++++++++ source/util/src/terror.c | 1 + 3 files changed, 34 insertions(+) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index bbddb539c6..563820f60b 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -451,6 +451,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) #define TSDB_CODE_WAL_LOG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x1005) #define TSDB_CODE_WAL_CHKSUM_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x1006) +#define TSDB_CODE_WAL_LOG_INCOMPLETE TAOS_DEF_ERROR_CODE(0, 0x1007) // tfs #define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index bf3aef340c..671a59aef1 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -282,6 +282,33 @@ void walAlignVersions(SWal* pWal) { pWal->vers.appliedVer = TMIN(pWal->vers.commitVer, pWal->vers.appliedVer); } +bool walLogEntriesComplete(const SWal* pWal) { + int32_t sz = taosArrayGetSize(pWal->fileInfoSet); + bool complete = true; + int32_t fileIdx = -1; + int64_t index = pWal->vers.firstVer; + + while (++fileIdx < sz) { + SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); + if (pFileInfo->firstVer != index) { + break; + } + index = pFileInfo->lastVer + ((fileIdx + 1 < sz) ? 1 : 0); + } + // empty is regarded as complete + if (sz != 0) { + complete = (index == pWal->vers.lastVer); + } + + if (!complete) { + wError("vgId:%d, WAL log entries incomplete in range [%" PRId64 ", %" PRId64 "], aligned with snaphotVer:%" PRId64, + pWal->cfg.vgId, pWal->vers.firstVer, pWal->vers.lastVer, pWal->vers.snapshotVer); + terrno = TSDB_CODE_WAL_LOG_INCOMPLETE; + } + + return complete; +} + int walCheckAndRepairMeta(SWal* pWal) { // load log files, get first/snapshot/last version info const char* logPattern = "^[0-9]+.log$"; @@ -395,6 +422,11 @@ int walCheckAndRepairMeta(SWal* pWal) { if (updateMeta) { (void)walSaveMeta(pWal); } + + if (!walLogEntriesComplete(pWal)) { + return -1; + } + return 0; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 37f9734eab..bbdf43c85e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -452,6 +452,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid versi TAOS_DEFINE_ERROR(TSDB_CODE_WAL_OUT_OF_MEMORY, "WAL out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_LOG_NOT_EXIST, "WAL log not exist") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_CHKSUM_MISMATCH, "WAL checksum mismatch") +TAOS_DEFINE_ERROR(TSDB_CODE_WAL_LOG_INCOMPLETE, "WAL log incomplete") // tfs TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config") From 52c3777823376836714ba0570a956897c1e471a6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 30 Oct 2022 03:01:40 +0800 Subject: [PATCH 02/24] refactor: do some internal refactor. --- include/common/tcommon.h | 8 - source/dnode/vnode/src/inc/vnodeInt.h | 3 - source/libs/executor/inc/executil.h | 29 ++++ source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/cachescanoperator.c | 12 +- source/libs/executor/src/executil.c | 165 ++++++++++++++++--- source/libs/executor/src/executor.c | 8 +- source/libs/executor/src/executorimpl.c | 73 ++++++-- source/libs/executor/src/scanoperator.c | 44 +++-- 9 files changed, 259 insertions(+), 85 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2b45a5d206..3bfbb85958 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -164,14 +164,6 @@ typedef enum EStreamType { STREAM_FILL_OVER, } EStreamType; -typedef struct { - SArray* pGroupList; - SArray* pTableList; - SHashObj* map; // speedup acquire the tableQueryInfo by table uid - bool needSortTableByGroupId; - uint64_t suid; -} STableListInfo; - #pragma pack(push, 1) typedef struct SColumnDataAgg { int16_t colId; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index a5dc4431ab..94178727be 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -162,10 +162,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSub int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); -STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, - void* pMemRef); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); -int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list); // tq int tqInit(); diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 0cfef7dc24..9f77178966 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -97,6 +97,35 @@ typedef struct SColMatchInfo { struct SqlFunctionCtx; +// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly +// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups +//typedef struct STableListInfo { +// bool oneTableForEachGroup; +// int32_t numOfOuputGroups; // the data block will be generated one by one +// int32_t* groupOffset; // keep the offset value for each group in the tableList +// SArray* pTableList; +// SHashObj* map; // speedup acquire the tableQueryInfo by table uid +// uint64_t suid; +//} STableListInfo; +typedef struct { + bool oneTableForEachGroup; + int32_t numOfOuputGroups; // the data block will be generated one by one + int32_t* groupOffset; // keep the offset value for each group in the tableList + SArray* pGroupList; + SArray* pTableList; + SHashObj* map; // speedup acquire the tableQueryInfo by table uid + bool needSortTableByGroupId; + uint64_t suid; +} STableListInfo; + +void destroyTableList(STableListInfo* pTableList); +int32_t getNumOfOutputGroups(const STableListInfo* pTableList); +bool oneTableForEachGroup(const STableListInfo* pTableList); +int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid); +int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); +uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); +uint64_t getTotalTables(const STableListInfo* pTableList); + size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); void closeResultRow(SResultRow* pResultRow); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 8769e8ac2f..31bf410f0a 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -1077,7 +1077,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); bool groupbyTbname(SNodeList* pGroupList); -int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey); +int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey, bool groupSort); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index f645e71c6e..6241eae3ac 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -167,17 +167,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } } - if (pTableList->map != NULL) { - int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t)); - if (groupId != NULL) { - pInfo->pRes->info.groupId = *groupId; - } - } else { - ASSERT(taosArrayGetSize(pTableList->pTableList) == 1); - STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0); - pInfo->pRes->info.groupId = pKeyInfo->groupId; - } - + pInfo->pRes->info.groupId = getTableGroupId(pTableList, pInfo->pRes->info.uid); pInfo->indexOfBufferedRes += 1; return pInfo->pRes; } else { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 971b28eb09..7448f7a8fa 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -733,7 +733,6 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis int32_t len = (int32_t)(pStart - (char*)keyBuf); info->groupId = calcGroupId(keyBuf, len); - taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); } // int64_t st2 = taosGetTimestampUs(); @@ -751,14 +750,70 @@ end: return code; } +static int32_t nameComparFn(const void* p1, const void* p2) { + const char* pName1 = *(const char**)p1; + const char* pName2 = *(const char**)p2; + + int32_t ret = strcmp(pName1, pName2); + if (ret == 0) { + return 0; + } else { + return (ret > 0) ? 1 : -1; + } +} + +static SArray* getTableNameList(const SNodeListNode* pList) { + int32_t len = LIST_LENGTH(pList->pNodeList); + SListCell* cell = pList->pNodeList->pHead; + + SArray* pTbList = taosArrayInit(len, POINTER_BYTES); + for (int i = 0; i < pList->pNodeList->length; i++) { + SValueNode* valueNode = (SValueNode*)cell->pNode; + if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) { + terrno = TSDB_CODE_INVALID_PARA; + taosArrayDestroy(pTbList); + return NULL; + } + + char* name = varDataVal(valueNode->datum.p); + taosArrayPush(pTbList, &name); + cell = cell->pNext; + } + + size_t numOfTables = taosArrayGetSize(pTbList); + + // order the name + taosArraySort(pTbList, nameComparFn); + + // remove the duplicates + SArray* pNewList = taosArrayInit(taosArrayGetSize(pTbList), sizeof(void*)); + taosArrayPush(pNewList, taosArrayGet(pTbList, 0)); + + for (int32_t i = 1; i < numOfTables; ++i) { + char** name = taosArrayGetLast(pNewList); + char** nameInOldList = taosArrayGet(pTbList, i); + if (strcmp(*name, *nameInOldList) == 0) { + continue; + } + + taosArrayPush(pNewList, nameInOldList); + } + + taosArrayDestroy(pTbList); + return pNewList; +} + static int tableUidCompare(const void* a, const void* b) { - int64_t u1 = *(uint64_t*)a; - int64_t u2 = *(uint64_t*)b; + uint64_t u1 = *(uint64_t*)a; + uint64_t u2 = *(uint64_t*)b; + if (u1 == u2) { return 0; } + return u1 < u2 ? -1 : 1; } + static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* cond, SHashObj* tags) { int32_t ret = -1; if (nodeType(cond) == QUERY_NODE_OPERATOR) { @@ -778,7 +833,9 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list SNodeList* pList = (SNodeList*)pNode->pParameterList; int32_t len = LIST_LENGTH(pList); - if (len <= 0) return ret; + if (len <= 0) { + return ret; + } SListCell* cell = pList->pHead; for (int i = 0; i < len; i++) { @@ -789,6 +846,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list } cell = cell->pNext; } + taosArraySort(list, tableUidCompare); taosArrayRemoveDuplicate(list, tableUidCompare, NULL); @@ -796,6 +854,7 @@ static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list ret = metaGetTableTagsByUids(metaHandle, suid, list, tags); removeInvalidTable(list, tags); } + return ret; } @@ -831,23 +890,13 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* SNodeListNode* pList = (SNodeListNode*)pNode->pRight; int32_t len = LIST_LENGTH(pList->pNodeList); - if (len <= 0) return -1; - - SListCell* cell = pList->pNodeList->pHead; - - SArray* pTbList = taosArrayInit(len, sizeof(void*)); - for (int i = 0; i < pList->pNodeList->length; i++) { - SValueNode* valueNode = (SValueNode*)cell->pNode; - if (!IS_VAR_DATA_TYPE(valueNode->node.resType.type)) { - taosArrayDestroy(pTbList); - return -1; - } - char* name = varDataVal(valueNode->datum.p); - taosArrayPush(pTbList, &name); - cell = cell->pNext; + if (len <= 0) { + return -1; } - for (int i = 0; i < taosArrayGetSize(pTbList); i++) { + SArray* pTbList = getTableNameList(pList); + size_t num = taosArrayGetSize(pTbList); + for (int i = 0; i < num; i++) { char* name = taosArrayGetP(pTbList, i); uint64_t uid = 0; if (metaGetTableUidByName(metaHandle, name, &uid) == 0) { @@ -866,8 +915,10 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* taosArrayDestroy(pTbList); return 0; } + return -1; } + int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -1604,3 +1655,79 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit pLimitInfo->remainOffset = limit.offset; pLimitInfo->remainGroupOffset = slimit.offset; } + +uint64_t getTotalTables(const STableListInfo* pTableList) { + ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map)); + return taosArrayGetSize(pTableList->pTableList); +} + +uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { + int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); + ASSERT(pTableList->map != NULL && slot != NULL); + + STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot); + ASSERT(pKeyInfo->uid == tableUid); + + return pKeyInfo->groupId; +} + +int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) { + if (pTableList->map == NULL) { + ASSERT(taosArrayGetSize(pTableList->pTableList) == 0); + pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + } + + STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; + taosArrayPush(pTableList->pTableList, &keyInfo); + + int32_t slot = (int32_t)taosArrayGetSize(pTableList->pTableList) - 1; + taosHashPut(pTableList->map, &uid, sizeof(uid), &slot, sizeof(slot)); + + qDebug("uid:%" PRIu64 ", groupId:%" PRIu64 " added into table list, slot:%d, total:%d", uid, gid, slot, slot + 1); + return TSDB_CODE_SUCCESS; +} + +int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, + int32_t* size) { + int32_t total = getNumOfOutputGroups(pTableList); + if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) { + return TSDB_CODE_INVALID_PARA; + } + + // here handle two special cases: + // 1. only one group exists, and 2. one table exists for each group. + if (total == 1) { + *size = getTotalTables(pTableList); + *pKeyInfo = taosArrayGet(pTableList->pTableList, 0); + return TSDB_CODE_SUCCESS; + } else if (total == getTotalTables(pTableList)) { + *size = 1; + *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex); + return TSDB_CODE_SUCCESS; + } + + int32_t offset = pTableList->groupOffset[ordinalGroupIndex]; + if (ordinalGroupIndex < total - 1) { + *size = pTableList->groupOffset[offset + 1] - pTableList->groupOffset[offset]; + } else { + *size = total - pTableList->groupOffset[offset] - 1; + } + + *pKeyInfo = taosArrayGet(pTableList->pTableList, offset); + return TSDB_CODE_SUCCESS; +} + +int32_t getNumOfOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; } + +// todo remove it +bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; } + +void destroyTableList(STableListInfo* pTableqinfoList) { + pTableqinfoList->pTableList = taosArrayDestroy(pTableqinfoList->pTableList); + taosMemoryFreeClear(pTableqinfoList->groupOffset); + + taosHashCleanup(pTableqinfoList->map); + + pTableqinfoList->pTableList = NULL; + pTableqinfoList->map = NULL; +} diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9546c3895e..12f3874e70 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -293,10 +293,6 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str); } - if (pListInfo->map == NULL) { - pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); - } - // traverse to the stream scanner node to add this table id SOperatorInfo* pInfo = pTaskInfo->pRoot; while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { @@ -358,8 +354,8 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo if (!exists) { #endif - taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo); - taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(*uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); + + addTableIntoTableList(&pTaskInfo->tableqinfoList, keyInfo.uid, keyInfo.groupId); } if (keyBuf != NULL) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 75db42fccc..0091b4a31d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3366,7 +3366,47 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); } + +static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) { + STableKeyInfo* pInfo1 = (STableKeyInfo*) p1; + STableKeyInfo* pInfo2 = (STableKeyInfo*) p2; + + if (pInfo1->groupId == pInfo2->groupId) { + return 0; + } else { + return pInfo1->groupId < pInfo2->groupId? -1:1; + } +} + static int32_t sortTableGroup(STableListInfo* pTableListInfo) { + int32_t code = TSDB_CODE_SUCCESS; + + taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); + int32_t size = taosArrayGetSize(pTableListInfo->pTableList); + + SArray* pList = taosArrayInit(4, sizeof(int32_t)); + + STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0); + uint64_t gid = pInfo->groupId; + + int32_t start = 0; + taosArrayPush(pList, &start); + + for(int32_t i = 1; i < size; ++i) { + pInfo = taosArrayGet(pTableListInfo->pTableList, i); + if (pInfo->groupId != gid) { + taosArrayPush(pList, &i); + gid = pInfo->groupId; + } + } + + pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList); + pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups); + memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups); + taosArrayDestroy(pList); + return TSDB_CODE_SUCCESS; + +#if 0 taosArrayClear(pTableListInfo->pGroupList); SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t)); if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY; @@ -3422,6 +3462,7 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo) { } taosArrayDestroy(sortSupport); return TDB_CODE_SUCCESS; +#endif } bool groupbyTbname(SNodeList* pGroupList) { @@ -3437,35 +3478,41 @@ bool groupbyTbname(SNodeList* pGroupList) { return bytbname; } -int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group) { - if (group == NULL) { - return TDB_CODE_SUCCESS; - } - +int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (pTableListInfo->map == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - bool assignUid = groupbyTbname(group); - + bool groupByTbname = groupbyTbname(group); size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - if (assignUid) { + if (groupByTbname || group == NULL) { for (int32_t i = 0; i < numOfTables; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - info->groupId = info->uid; - taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); + info->groupId = groupByTbname? info->uid:0; + } + + pTableListInfo->oneTableForEachGroup = groupByTbname; + + if (groupSort && groupByTbname) { + taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); + pTableListInfo->numOfOuputGroups = numOfTables; } } else { int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; } + + if (groupSort) { + return sortTableGroup(pTableListInfo); + } } - if (pTableListInfo->needSortTableByGroupId) { - return sortTableGroup(pTableListInfo); + for(int32_t i = 0; i < numOfTables; ++i) { + STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); + taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &i, sizeof(int32_t)); } return TDB_CODE_SUCCESS; @@ -3551,7 +3598,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; if (pHandle->vnode) { int32_t code = - createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, + createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/false, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); if (code) { pTaskInfo->code = code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e2f3b1c6c4..a4a9f233ab 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -626,10 +626,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pBlock->info = binfo; ASSERT(binfo.uid != 0); - uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupId) { - pBlock->info.groupId = *groupId; - } + pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); uint32_t status = 0; int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); @@ -683,10 +680,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTableScanInfo->scanFlag = REPEAT_SCAN; - qDebug( - "%s start to repeat ascending order scan data SELECT last_row(*),hostname from cpu group by hostname;blocks " - "due to query func required", - GET_TASKID(pTaskInfo)); + qDebug( "%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo)); // do prepare for the next round table scan operation tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -755,8 +749,12 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { if (pInfo->currentGroupId == -1) { pInfo->currentGroupId++; - if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { - setTaskStatus(pTaskInfo, TASK_COMPLETED); + qDebug("number:------------------------%d, %d", (int)taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList), + getNumOfOutputGroups(&pTaskInfo->tableqinfoList)); + if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)/*taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)*/) { +// if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { +// setTaskStatus(pTaskInfo, TASK_COMPLETED); + doSetOperatorCompleted(pOperator); return NULL; } @@ -790,7 +788,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return result; } - setTaskStatus(pTaskInfo, TASK_COMPLETED); + doSetOperatorCompleted(pOperator); return NULL; } @@ -1122,12 +1120,7 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, } static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) { - SHashObj* map = pInfo->pTableScanOp->pTaskInfo->tableqinfoList.map; - uint64_t* groupId = taosHashGet(map, &uid, sizeof(int64_t)); - if (groupId) { - return *groupId; - } - return 0; + return getTableGroupId(&pInfo->pTableScanOp->pTaskInfo->tableqinfoList, uid); } static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { @@ -1549,12 +1542,13 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.version = pBlock->info.version; - uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupIdPre) { - pInfo->pRes->info.groupId = *groupIdPre; - } else { - pInfo->pRes->info.groupId = 0; - } + pInfo->pRes->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); +// uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); +// if (groupIdPre) { +// pInfo->pRes->info.groupId = *groupIdPre; +// } else { +// pInfo->pRes->info.groupId = 0; +// } // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { @@ -4202,6 +4196,8 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags return code; } + pTableListInfo->numOfOuputGroups = 1; + int64_t st1 = taosGetTimestampUs(); qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr); @@ -4211,7 +4207,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags } pTableListInfo->needSortTableByGroupId = groupSort; - code = generateGroupIdMap(pTableListInfo, pHandle, pGroupTags); + code = setGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort); if (code != TSDB_CODE_SUCCESS) { return code; } From 41c99c02efec1f0ce648952dd2ba2e6c55cace7b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 30 Oct 2022 11:17:07 +0800 Subject: [PATCH 03/24] refactor: do some internal refactor. --- source/libs/executor/src/cachescanoperator.c | 20 ++-- source/libs/executor/src/executil.c | 10 +- source/libs/executor/src/executorimpl.c | 16 +-- source/libs/executor/src/scanoperator.c | 114 +++++++++++++------ 4 files changed, 99 insertions(+), 61 deletions(-) diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 6241eae3ac..92e8d90bb8 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -62,7 +62,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); // partition by tbname - if (taosArrayGetSize(pTableList->pGroupList) == taosArrayGetSize(pTableList->pTableList)) { + if (oneTableForEachGroup(pTableList) || (getTotalTables(pTableList) == 1)) { pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW); code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, @@ -175,12 +175,18 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { return NULL; } } else { - size_t totalGroups = taosArrayGetSize(pTableList->pGroupList); + size_t num = getNumOfOutputGroups(pTableList); + while (pInfo->currentGroupIndex < num) { - while (pInfo->currentGroupIndex < totalGroups) { - SArray* pGroupTableList = taosArrayGetP(pTableList->pGroupList, pInfo->currentGroupIndex); + STableKeyInfo* p = NULL; + int32_t s = 0; + getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &p, &s); + SArray* x = taosArrayInit(4, sizeof(STableKeyInfo)); + for(int32_t i = 0; i < s; ++i) { + taosArrayPush(x, &p[i]); + } - tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pGroupTableList, + tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, x, taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); taosArrayClear(pInfo->pUidList); @@ -195,9 +201,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (pInfo->pRes->info.rows > 0) { if (pInfo->pseudoExprSup.numOfExprs > 0) { SExprSupp* pSup = &pInfo->pseudoExprSup; - - STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0); - pInfo->pRes->info.groupId = pKeyInfo->groupId; + pInfo->pRes->info.groupId = p->groupId; if (taosArrayGetSize(pInfo->pUidList) > 0) { ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 7448f7a8fa..c8fb0e23f8 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -997,14 +997,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, } taosArrayDestroy(res); - - pListInfo->pGroupList = taosArrayInit(4, POINTER_BYTES); - if (pListInfo->pGroupList == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - // put into list as default group, remove it if grouping sorting is required later - taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList); return code; } @@ -1698,7 +1690,7 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI // 1. only one group exists, and 2. one table exists for each group. if (total == 1) { *size = getTotalTables(pTableList); - *pKeyInfo = taosArrayGet(pTableList->pTableList, 0); + *pKeyInfo = (*size == 0)? NULL:taosArrayGet(pTableList->pTableList, 0); return TSDB_CODE_SUCCESS; } else if (total == getTotalTables(pTableList)) { *size = 1; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0091b4a31d..b0480bef35 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4026,15 +4026,15 @@ void doDestroyTableList(STableListInfo* pTableqinfoList) { taosArrayDestroy(pTableqinfoList->pTableList); taosHashCleanup(pTableqinfoList->map); if (pTableqinfoList->needSortTableByGroupId) { - for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) { - SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i); - if (tmp == pTableqinfoList->pTableList) { - continue; - } - taosArrayDestroy(tmp); - } +// for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) { +// SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i); +// if (tmp == pTableqinfoList->pTableList) { +// continue; +// } +// taosArrayDestroy(tmp); +// } } - taosArrayDestroy(pTableqinfoList->pGroupList); +// taosArrayDestroy(pTableqinfoList->pGroupList); pTableqinfoList->pTableList = NULL; pTableqinfoList->map = NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a4a9f233ab..b1adbd14c0 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -749,20 +749,28 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { if (pInfo->currentGroupId == -1) { pInfo->currentGroupId++; - qDebug("number:------------------------%d, %d", (int)taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList), - getNumOfOutputGroups(&pTaskInfo->tableqinfoList)); +// qDebug("number:------------------------%d, %d", (int)taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList), +// getNumOfOutputGroups(&pTaskInfo->tableqinfoList)); if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)/*taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)*/) { // if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { -// setTaskStatus(pTaskInfo, TASK_COMPLETED); doSetOperatorCompleted(pOperator); return NULL; } - SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); + SArray* p = taosArrayInit(4, sizeof(STableKeyInfo)); tsdbReaderClose(pInfo->dataReader); - int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader, + STableKeyInfo* x = NULL; + int32_t num = 0; + getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &x, &num); + for(int32_t i = 0; i < num; ++i) { + taosArrayPush(p, &x[i]); + } + + int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, p, (STsdbReader**)&pInfo->dataReader, GET_TASKID(pTaskInfo)); + taosArrayDestroy(p); + if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); return NULL; @@ -775,11 +783,16 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { } pInfo->currentGroupId++; - if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { - setTaskStatus(pTaskInfo, TASK_COMPLETED); + if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) { + doSetOperatorCompleted(pOperator); return NULL; } + // reset value for the next group data output + pOperator->status = OP_OPENED; + pInfo->limitInfo.numOfOutputRows = 0; + pInfo->limitInfo.remainOffset = pInfo->limitInfo.limit.offset; + tsdbReaderReset(pInfo->dataReader, &pInfo->cond); pInfo->scanTimes = 0; @@ -1075,39 +1088,59 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { pTableScanInfo->cond.twindows = *pWin; pTableScanInfo->scanTimes = 0; pTableScanInfo->currentGroupId = -1; -} - -static void freeArray(void* array) { taosArrayDestroy(array); } - -static void resetTableScanOperator(SOperatorInfo* pTableScanOp) { - STableScanInfo* pTableScanInfo = pTableScanOp->info; - pTableScanInfo->cond.startVersion = -1; - pTableScanInfo->cond.endVersion = -1; - SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList; - SArray* allTbls = pTableScanOp->pTaskInfo->tableqinfoList.pTableList; - taosArrayClearP(gpTbls, freeArray); - taosArrayPush(gpTbls, &allTbls); - STimeWindow win = {.skey = INT64_MIN, .ekey = INT64_MAX}; - resetTableScanInfo(pTableScanOp->info, &win); + tsdbReaderClose(pTableScanInfo->dataReader); + pTableScanInfo->dataReader = NULL; } static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbUid, TSKEY startTs, TSKEY endTs, int64_t maxVersion) { - SArray* gpTbls = pTableScanOp->pTaskInfo->tableqinfoList.pGroupList; - taosArrayClear(gpTbls); STableKeyInfo tblInfo = {.uid = tbUid, .groupId = 0}; - SArray* tbls = taosArrayInit(1, sizeof(STableKeyInfo)); - taosArrayPush(tbls, &tblInfo); - taosArrayPush(gpTbls, &tbls); - STimeWindow win = {.skey = startTs, .ekey = endTs}; - STableScanInfo* pTableScanInfo = pTableScanOp->info; - pTableScanInfo->cond.startVersion = -1; - pTableScanInfo->cond.endVersion = maxVersion; - resetTableScanInfo(pTableScanOp->info, &win); - SSDataBlock* pRes = doTableScan(pTableScanOp); - resetTableScanOperator(pTableScanOp); - return pRes; + STableScanInfo* pTableScanInfo = pTableScanOp->info; + SQueryTableDataCond cond = pTableScanInfo->cond; + + cond.startVersion = -1; + cond.endVersion = maxVersion; + cond.twindows = (STimeWindow){.skey = startTs, .ekey = endTs}; + + SExecTaskInfo* pTaskInfo = pTableScanOp->pTaskInfo; + + SSDataBlock* pBlock = pTableScanInfo->pResBlock; + blockDataCleanup(pBlock); + + SArray* p = taosArrayInit(1, sizeof(STableKeyInfo)); + taosArrayPush(p, &tblInfo); + + STsdbReader* pReader = NULL; + int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, p, (STsdbReader**)&pReader, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + + bool hasBlock = tsdbNextDataBlock(pReader); + if (hasBlock) { + SDataBlockInfo binfo = {0}; + tsdbRetrieveDataBlockInfo(pReader, &binfo); + + SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL); + blockDataEnsureCapacity(pBlock, binfo.rows); + + pBlock->info.window = binfo.window; + pBlock->info.uid = binfo.uid; + pBlock->info.rows = binfo.rows; + + relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); + doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); + + pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, binfo.uid); + } + + tsdbReaderClose(pReader); + qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64 + ", suid:%" PRIu64, pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid); + + return pBlock->info.rows > 0 ? pBlock : NULL; } static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { @@ -2329,11 +2362,20 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pTSInfo->cond.endVersion = pHandle->version; } - SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); +// SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); + STableKeyInfo* pList = NULL; + int32_t num = 0; + getTablesOfGroup(&pTaskInfo->tableqinfoList, 0, &pList, &num); + + SArray* p = taosArrayInit(4, sizeof(STableKeyInfo)); + for(int32_t i = 0; i < num; ++i) { + taosArrayPush(p, &pList[i]); + } + if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->dataReader = NULL; - if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, tableList, &pTSInfo->dataReader, NULL) < 0) { + if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, p, &pTSInfo->dataReader, NULL) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _error; } From adf085ab46a33706612ec461553f3292f032c596 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Oct 2022 14:17:57 +0800 Subject: [PATCH 04/24] refactor: do some internal refactor. --- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 20 ++++---- source/libs/executor/src/scanoperator.c | 65 +++++++++++-------------- 3 files changed, 40 insertions(+), 47 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index bed6e93e5a..3d4a3e1969 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -159,7 +159,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTabl void tsdbReaderClose(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader); bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid); -void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo); +void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a0fbf545f3..895b7adc5b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3787,24 +3787,24 @@ bool tsdbTableNextDataBlock(STsdbReader* pReader, int64_t uid) { return true; } -static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { - ASSERT(pDataBlockInfo != NULL && pReader != NULL); - pDataBlockInfo->rows = pReader->pResBlock->info.rows; - pDataBlockInfo->uid = pReader->pResBlock->info.uid; - pDataBlockInfo->window = pReader->pResBlock->info.window; +static void setBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) { + ASSERT(pReader != NULL); + *rows = pReader->pResBlock->info.rows; + *uid = pReader->pResBlock->info.uid; + *pWindow = pReader->pResBlock->info.window; } -void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { +void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow) { if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pReader->step == EXTERNAL_ROWS_MAIN) { - setBlockInfo(pReader, pDataBlockInfo); + setBlockInfo(pReader, rows, uid, pWindow); } else if (pReader->step == EXTERNAL_ROWS_PREV) { - setBlockInfo(pReader->innerReader[0], pDataBlockInfo); + setBlockInfo(pReader->innerReader[0], rows, uid, pWindow); } else { - setBlockInfo(pReader->innerReader[1], pDataBlockInfo); + setBlockInfo(pReader->innerReader[1], rows, uid, pWindow); } } else { - setBlockInfo(pReader, pDataBlockInfo); + setBlockInfo(pReader, rows, uid, pWindow); } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b1adbd14c0..3102e63737 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -618,13 +618,14 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { blockDataCleanup(pBlock); - SDataBlockInfo binfo = pBlock->info; - tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &binfo); + SDataBlockInfo* pBInfo = &pBlock->info; - binfo.capacity = binfo.rows; - blockDataEnsureCapacity(pBlock, binfo.rows); - pBlock->info = binfo; - ASSERT(binfo.uid != 0); + int32_t rows = 0; + tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &rows, &pBInfo->uid, &pBInfo->window); + blockDataEnsureCapacity(pBlock, rows); + pBlock->info.rows = rows; + + ASSERT(pBInfo->uid != 0); pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); @@ -1120,20 +1121,19 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU bool hasBlock = tsdbNextDataBlock(pReader); if (hasBlock) { - SDataBlockInfo binfo = {0}; - tsdbRetrieveDataBlockInfo(pReader, &binfo); + SDataBlockInfo* pBInfo = &pBlock->info; + + int32_t rows = 0; + tsdbRetrieveDataBlockInfo(pReader, &rows, &pBInfo->uid, &pBInfo->window); SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL); - blockDataEnsureCapacity(pBlock, binfo.rows); - - pBlock->info.window = binfo.window; - pBlock->info.uid = binfo.uid; - pBlock->info.rows = binfo.rows; + blockDataEnsureCapacity(pBlock, pBInfo->rows); + pBInfo->rows = rows; relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); - pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, binfo.uid); + pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBInfo->uid); } tsdbReaderClose(pReader); @@ -2121,7 +2121,9 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } - tsdbRetrieveDataBlockInfo(pInfo->dataReader, &pBlock->info); + int32_t rows = 0; + tsdbRetrieveDataBlockInfo(pInfo->dataReader, &rows, &pBlock->info.uid, &pBlock->info.window); + pBlock->info.rows = rows; SArray* pCols = tsdbRetrieveDataBlock(pInfo->dataReader, NULL); pBlock->pDataBlock = pCols; @@ -4561,14 +4563,11 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { } blockDataCleanup(pBlock); - SDataBlockInfo binfo = pBlock->info; - tsdbRetrieveDataBlockInfo(reader, &binfo); - blockDataEnsureCapacity(pBlock, binfo.rows); - pBlock->info.type = binfo.type; - pBlock->info.uid = binfo.uid; - pBlock->info.window = binfo.window; - pBlock->info.rows = binfo.rows; + int32_t rows = 0; + tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window); + blockDataEnsureCapacity(pBlock, rows); + pBlock->info.rows = rows; if (tsdbIsAscendingOrder(pInfo->pReader)) { pQueryCond->twindows.skey = pBlock->info.window.ekey + 1; @@ -4627,14 +4626,11 @@ static SSDataBlock* getTableDataBlock2(void* param) { } blockDataCleanup(pBlock); - SDataBlockInfo binfo = pBlock->info; - tsdbRetrieveDataBlockInfo(reader, &binfo); - blockDataEnsureCapacity(pBlock, binfo.rows); - pBlock->info.type = binfo.type; - pBlock->info.uid = binfo.uid; - pBlock->info.window = binfo.window; - pBlock->info.rows = binfo.rows; + int32_t rows = 0; + tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window); + blockDataEnsureCapacity(pBlock, rows); + pBlock->info.rows = rows; uint32_t status = 0; int32_t code = loadDataBlockFromOneTable2(pOperator, pTableScanInfo, pBlock, &status); @@ -4684,14 +4680,11 @@ static SSDataBlock* getTableDataBlock(void* param) { } blockDataCleanup(pBlock); - SDataBlockInfo binfo = pBlock->info; - tsdbRetrieveDataBlockInfo(reader, &binfo); - blockDataEnsureCapacity(pBlock, binfo.rows); - pBlock->info.type = binfo.type; - pBlock->info.uid = binfo.uid; - pBlock->info.window = binfo.window; - pBlock->info.rows = binfo.rows; + int32_t rows = 0; + tsdbRetrieveDataBlockInfo(reader, &rows, &pBlock->info.uid, &pBlock->info.window); + blockDataEnsureCapacity(pBlock, rows); + pBlock->info.rows = rows; uint32_t status = 0; int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status); From a1db402500cf88e40928df80fe495d8316c23c14 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Oct 2022 14:38:01 +0800 Subject: [PATCH 05/24] refactor: do some internal refactor. --- source/libs/executor/src/scanoperator.c | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3102e63737..2a1734b8c6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -474,6 +474,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca return terrno; } + int32_t rows = pBlock->info.rows; + pBlock->info.rows = 0; + + blockDataEnsureCapacity(pBlock, rows); + pBlock->info.rows = rows; + relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); @@ -619,14 +625,11 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { blockDataCleanup(pBlock); SDataBlockInfo* pBInfo = &pBlock->info; - - int32_t rows = 0; - tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &rows, &pBInfo->uid, &pBInfo->window); - blockDataEnsureCapacity(pBlock, rows); - pBlock->info.rows = rows; + tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBInfo->rows, &pBInfo->uid, &pBInfo->window); +// blockDataEnsureCapacity(pBlock, rows); +// pBlock->info.rows = rows; ASSERT(pBInfo->uid != 0); - pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); uint32_t status = 0; From 7135f35b21903a3209552b8f97332740c9d70184 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Oct 2022 15:06:51 +0800 Subject: [PATCH 06/24] refactor: do some internal refactor. --- source/libs/executor/src/executil.c | 2 +- source/libs/executor/src/executorimpl.c | 2 +- source/libs/executor/src/scanoperator.c | 2 -- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c8fb0e23f8..6d6e765574 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1666,7 +1666,7 @@ uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) { if (pTableList->map == NULL) { ASSERT(taosArrayGetSize(pTableList->pTableList) == 0); - pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); } STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b0480bef35..7a3454e092 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3479,7 +3479,7 @@ bool groupbyTbname(SNodeList* pGroupList) { } int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { - pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); if (pTableListInfo->map == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2a1734b8c6..2d3521b419 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -626,8 +626,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { SDataBlockInfo* pBInfo = &pBlock->info; tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBInfo->rows, &pBInfo->uid, &pBInfo->window); -// blockDataEnsureCapacity(pBlock, rows); -// pBlock->info.rows = rows; ASSERT(pBInfo->uid != 0); pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); From ba37a581929300835f012d57555b2e1d87686f7f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Oct 2022 15:39:16 +0800 Subject: [PATCH 07/24] enh(query): reduce the number of malloc. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 35 +++++++++++++------------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 23615fb8af..d6a91d049a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1212,25 +1212,24 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p (pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer); } -static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, - int32_t* nextIndex, int32_t order) { +static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, + int32_t* nextIndex, int32_t order, SDataBlk* pBlock) { bool asc = ASCENDING_TRAVERSE(order); if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) { - return NULL; + return false; } if (!asc && pBlockInfo->tbBlockIdx == 0) { - return NULL; + return false; } int32_t step = asc ? 1 : -1; *nextIndex = pBlockInfo->tbBlockIdx + step; - SDataBlk* pBlock = taosMemoryCalloc(1, sizeof(SDataBlk)); - int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); + int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetDataBlk); - return pBlock; + return true; } static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) { @@ -1364,13 +1363,14 @@ typedef struct { static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock, STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader, STsdbReader* pReader) { - int32_t neighborIndex = 0; - SDataBlk* pNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->order); + int32_t neighborIndex = 0; + SDataBlk block = {0}; + + bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->order, &block); // overlap with neighbor - if (pNeighbor) { - pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, pNeighbor, pReader->order); - taosMemoryFree(pNeighbor); + if (hasNeighbor) { + pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &block, pReader->order); } // has duplicated ts of different version in this block @@ -3067,15 +3067,14 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn *state = CHECK_FILEBLOCK_QUIT; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; - int32_t nextIndex = -1; - SDataBlk* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order); - if (pNeighborBlock == NULL) { // do nothing + int32_t nextIndex = -1; + SDataBlk block = {0}; + bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &block); + if (!hasNeighbor) { // do nothing return 0; } - bool overlap = overlapWithNeighborBlock(pBlock, pNeighborBlock, pReader->order); - taosMemoryFree(pNeighborBlock); - + bool overlap = overlapWithNeighborBlock(pBlock, &block, pReader->order); if (overlap) { // load next block SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; From 9703ab510ba66357bae66bd52cc9364bd943dc3c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Oct 2022 17:19:15 +0800 Subject: [PATCH 08/24] enh(query): avoid parse the mapdata to improve the performance by keep the offset of each block. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 41 ++++++++++++-------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index d6a91d049a..7c2c4be5b2 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -35,11 +35,16 @@ typedef struct { int32_t numOfLastFiles; } SBlockNumber; +typedef struct SBlockIndex { + int32_t ordinalIndex; + int64_t inFileoffset; +} SBlockIndex; + typedef struct STableBlockScanInfo { uint64_t uid; TSKEY lastKey; SMapData mapData; // block info (compressed) - SArray* pBlockList; // block data index list + SArray* pBlockList; // block data index list, SArray SIterInfo iter; // mem buffer skip list iterator SIterInfo iiter; // imem buffer skip list iterator SArray* delSkyline; // delete info for this table @@ -568,7 +573,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, STableBlockScanInfo* pScanInfo = p; if (pScanInfo->pBlockList == NULL) { - pScanInfo->pBlockList = taosArrayInit(4, sizeof(int32_t)); + pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex)); } taosArrayPush(pIndexList, pBlockIdx); @@ -630,7 +635,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN continue; } - void* p = taosArrayPush(pScanInfo->pBlockList, &j); + void* p = taosArrayPush(pScanInfo->pBlockList, + &(SBlockIndex){.ordinalIndex = j, .inFileoffset = block.aSubBlock->offset}); if (p == NULL) { tMapDataClear(&pScanInfo->mapData); return TSDB_CODE_OUT_OF_MEMORY; @@ -1059,8 +1065,8 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) return TSDB_CODE_INVALID_PARA; } - int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); - tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetDataBlk); + SBlockIndex* pIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); + tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk); } #if 0 @@ -1073,6 +1079,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) { bool asc = ASCENDING_TRAVERSE(pReader->order); + SBlockOrderSupporter sup = {0}; pBlockIter->numOfBlocks = numOfBlocks; taosArrayClear(pBlockIter->blockList); pBlockIter->pTableMap = pReader->status.pTableMap; @@ -1081,9 +1088,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap); int64_t st = taosGetTimestampUs(); - - SBlockOrderSupporter sup = {0}; - int32_t code = initBlockOrderSupporter(&sup, numOfTables); + int32_t code = initBlockOrderSupporter(&sup, numOfTables); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1111,17 +1116,11 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte } sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf; - SDataBlk block = {0}; + for (int32_t k = 0; k < num; ++k) { - SBlockOrderWrapper wrapper = {0}; - - int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k); - tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetDataBlk); - - wrapper.uid = pTableScanInfo->uid; - wrapper.offset = block.aSubBlock[0].offset; - - sup.pDataBlockInfo[sup.numOfTables][k] = wrapper; + SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k); + sup.pDataBlockInfo[sup.numOfTables][k] = + (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileoffset}; cnt++; } @@ -1226,9 +1225,8 @@ static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBl int32_t step = asc ? 1 : -1; *nextIndex = pBlockInfo->tbBlockIdx + step; - int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); - - tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetDataBlk); + SBlockIndex* pIndex = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); + tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk); return true; } @@ -3495,7 +3493,6 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); resetDataBlockIterator(&pReader->status.blockIter, pReader->order); -// resetDataBlockScanInfo(pReader->status.pTableMap, pReader->window.skey); // no data in files, let's try buffer in memory if (pReader->status.fileIter.numOfFiles == 0) { From fac516a196ba89c7081258903b7a9b11dc59f8ed Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Oct 2022 17:20:27 +0800 Subject: [PATCH 09/24] refactor: do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 7c2c4be5b2..8d877dea7b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -635,8 +635,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN continue; } - void* p = taosArrayPush(pScanInfo->pBlockList, - &(SBlockIndex){.ordinalIndex = j, .inFileoffset = block.aSubBlock->offset}); + SBlockIndex bIndex = {.ordinalIndex = j, .inFileoffset = block.aSubBlock->offset}; + void* p = taosArrayPush(pScanInfo->pBlockList, &bIndex); if (p == NULL) { tMapDataClear(&pScanInfo->mapData); return TSDB_CODE_OUT_OF_MEMORY; From 1fa5eafa8ace99eb956097670eccbdab056e8fde Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Oct 2022 17:49:50 +0800 Subject: [PATCH 10/24] ehn(query): keep the time window for each block to avoid repeatly parsing the compressed data block meta info. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 41 ++++++++++++++------------ 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 8d877dea7b..b454b25ea7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -36,8 +36,9 @@ typedef struct { } SBlockNumber; typedef struct SBlockIndex { - int32_t ordinalIndex; - int64_t inFileoffset; + int32_t ordinalIndex; + int64_t inFileOffset; + STimeWindow window; } SBlockIndex; typedef struct STableBlockScanInfo { @@ -635,7 +636,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN continue; } - SBlockIndex bIndex = {.ordinalIndex = j, .inFileoffset = block.aSubBlock->offset}; + SBlockIndex bIndex = {.ordinalIndex = j, .inFileOffset = block.aSubBlock->offset}; + bIndex.window = (STimeWindow) {.skey = block.minKey.ts, .ekey = block.maxKey.ts}; + void* p = taosArrayPush(pScanInfo->pBlockList, &bIndex); if (p == NULL) { tMapDataClear(&pScanInfo->mapData); @@ -1120,7 +1123,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte for (int32_t k = 0; k < num; ++k) { SBlockIndex* pIndex = taosArrayGet(pTableScanInfo->pBlockList, k); sup.pDataBlockInfo[sup.numOfTables][k] = - (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileoffset}; + (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pIndex->inFileOffset}; cnt++; } @@ -1212,7 +1215,7 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p } static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo, - int32_t* nextIndex, int32_t order, SDataBlk* pBlock) { + int32_t* nextIndex, int32_t order, SBlockIndex* pBlockIndex) { bool asc = ASCENDING_TRAVERSE(order); if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) { return false; @@ -1224,9 +1227,8 @@ static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBl int32_t step = asc ? 1 : -1; *nextIndex = pBlockInfo->tbBlockIdx + step; - - SBlockIndex* pIndex = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); - tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk); + *pBlockIndex = *(SBlockIndex*) taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); +// tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk); return true; } @@ -1269,12 +1271,12 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t return TSDB_CODE_SUCCESS; } -static bool overlapWithNeighborBlock(SDataBlk* pBlock, SDataBlk* pNeighbor, int32_t order) { +static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) { // it is the last block in current file, no chance to overlap with neighbor blocks. if (ASCENDING_TRAVERSE(order)) { - return pBlock->maxKey.ts == pNeighbor->minKey.ts; + return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey; } else { - return pBlock->minKey.ts == pNeighbor->maxKey.ts; + return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey; } } @@ -1361,14 +1363,14 @@ typedef struct { static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo* pBlockInfo, SDataBlk* pBlock, STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader, STsdbReader* pReader) { - int32_t neighborIndex = 0; - SDataBlk block = {0}; + int32_t neighborIndex = 0; + SBlockIndex bIndex = {0}; - bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->order, &block); + bool hasNeighbor = getNeighborBlockOfSameTable(pBlockInfo, pScanInfo, &neighborIndex, pReader->order, &bIndex); // overlap with neighbor if (hasNeighbor) { - pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &block, pReader->order); + pInfo->overlapWithNeighborBlock = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order); } // has duplicated ts of different version in this block @@ -3065,14 +3067,15 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn *state = CHECK_FILEBLOCK_QUIT; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; - int32_t nextIndex = -1; - SDataBlk block = {0}; - bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &block); + int32_t nextIndex = -1; + SBlockIndex bIndex = {0}; + + bool hasNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order, &bIndex); if (!hasNeighbor) { // do nothing return 0; } - bool overlap = overlapWithNeighborBlock(pBlock, &block, pReader->order); + bool overlap = overlapWithNeighborBlock(pBlock, &bIndex, pReader->order); if (overlap) { // load next block SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; From f29127a15ec24b558b2677d84a00e67d471583b7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Oct 2022 18:04:58 +0800 Subject: [PATCH 11/24] ehn(query): do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 4 +- source/libs/executor/src/executorimpl.c | 51 ------------------------- source/libs/executor/src/scanoperator.c | 3 +- 3 files changed, 5 insertions(+), 53 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index b454b25ea7..2b3eff0297 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3868,7 +3868,9 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS size_t numOfCols = blockDataGetNumOfCols(pReader->pResBlock); int32_t i = 0, j = 0; - while (j < numOfCols && i < taosArrayGetSize(pSup->pColAgg)) { + size_t size = taosArrayGetSize(pSup->pColAgg); + + while (j < numOfCols && i < size) { SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); if (pAgg->colId == pSup->colIds[j]) { if (IS_BSMA_ON(&(pReader->pSchema->columns[i]))) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 11dbfaff9f..5ceda8bd78 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -844,57 +844,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int return win; } -#if 0 -static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) { - STimeWindow w = {0}; - - TSKEY sk = TMIN(pQueryAttr->window.skey, pQueryAttr->window.ekey); - TSKEY ek = TMAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); - - if (true) { - // getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.skey, sk, ek, &w); - assert(w.ekey >= pBlockInfo->window.skey); - - if (w.ekey < pBlockInfo->window.ekey) { - return true; - } - - while (1) { - // getNextTimeWindow(pQueryAttr, &w); - if (w.skey > pBlockInfo->window.ekey) { - break; - } - - assert(w.ekey > pBlockInfo->window.ekey); - if (w.skey <= pBlockInfo->window.ekey && w.skey > pBlockInfo->window.skey) { - return true; - } - } - } else { - // getAlignQueryTimeWindow(pQueryAttr, pBlockInfo->window.ekey, sk, ek, &w); - assert(w.skey <= pBlockInfo->window.ekey); - - if (w.skey > pBlockInfo->window.skey) { - return true; - } - - while (1) { - // getNextTimeWindow(pQueryAttr, &w); - if (w.ekey < pBlockInfo->window.skey) { - break; - } - - assert(w.skey < pBlockInfo->window.skey); - if (w.ekey < pBlockInfo->window.ekey && w.ekey >= pBlockInfo->window.skey) { - return true; - } - } - } - - return false; -} -#endif - int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { *status = BLK_DATA_NOT_LOAD; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 855be4752d..d499091ccb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -331,7 +331,8 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, } } - for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->matchInfo.pList); ++i) { + size_t num = taosArrayGetSize(pTableScanInfo->matchInfo.pList); + for (int32_t i = 0; i < num; ++i) { SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i); if (!pColMatchInfo->needOutput) { continue; From 1fd3ad502a1551163f83eb473514cead97d64f4b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Oct 2022 19:01:28 +0800 Subject: [PATCH 12/24] fix(query): ensure the capacity before loading tags value. --- source/libs/executor/src/scanoperator.c | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d499091ccb..750b9f258b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -382,6 +382,17 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo } } +static void ensureBlockCapacity(SSDataBlock* pBlock, int32_t capacity) { + // keep the value of rows temporarily + int32_t rows = pBlock->info.rows; + + pBlock->info.rows = 0; + blockDataEnsureCapacity(pBlock, capacity); + + // restore the rows number + pBlock->info.rows = rows; +} + static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, uint32_t* status) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -413,6 +424,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + // NOTE: here the tag value only load for one row + ensureBlockCapacity(pBlock, 1); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); pCost->skipBlocks += 1; return TSDB_CODE_SUCCESS; @@ -423,6 +436,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + ensureBlockCapacity(pBlock, 1); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); return TSDB_CODE_SUCCESS; } else { @@ -472,12 +486,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca return terrno; } - int32_t rows = pBlock->info.rows; - pBlock->info.rows = 0; - - blockDataEnsureCapacity(pBlock, rows); - pBlock->info.rows = rows; - + ensureBlockCapacity(pBlock, pBlock->info.rows); relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); From 8a5f94d35c90a5c730fba40456a80c67f128435b Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 31 Oct 2022 20:03:21 +0800 Subject: [PATCH 13/24] fix: set search key according to order --- source/dnode/vnode/src/tsdb/tsdbRead.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 5f90c567be..3d6b16d5c7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -821,7 +821,8 @@ int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData, SData } else if (!asc && pReader->window.skey <= pBlock->minKey.ts) { endPos = 0; } else { - endPos = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.ekey, pReader->order); + int64_t key = asc ? pReader->window.ekey : pReader->window.skey; + endPos = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, pReader->order); } return endPos; @@ -852,8 +853,9 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn // pDumpInfo->rowIndex = pBlock->nRow - 1; } else { int32_t pos = asc ? pBlock->nRow - 1 : 0; - int32_t order = (pReader->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; - pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, pReader->window.skey, order); + int32_t order = asc ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; + int64_t key = asc ? pReader->window.skey : pReader->window.ekey; + pDumpInfo->rowIndex = doBinarySearchKey(pBlockData->aTSKEY, pBlock->nRow, pos, key, order); } } From 1d0eae197dae26e0568f2307b2f0c70c6cf350ec Mon Sep 17 00:00:00 2001 From: "benguang.zhao" Date: Mon, 31 Oct 2022 21:06:08 +0800 Subject: [PATCH 14/24] fix: not allow to rollback WAL across snapshot point --- source/libs/wal/src/walWrite.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 527ffa0056..e016e72471 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -101,7 +101,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { taosThreadMutexLock(&pWal->mutex); int64_t code; char fnameStr[WAL_FILE_LEN]; - if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) { + if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) { terrno = TSDB_CODE_WAL_INVALID_VER; taosThreadMutexUnlock(&pWal->mutex); return -1; From 96520291216455d3e5630b5de82e7b0e2e5d8167 Mon Sep 17 00:00:00 2001 From: "benguang.zhao" Date: Mon, 31 Oct 2022 21:08:35 +0800 Subject: [PATCH 15/24] fix: update testcases of walRollback in walMetaTest.cpp --- source/libs/wal/test/walMetaTest.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/source/libs/wal/test/walMetaTest.cpp b/source/libs/wal/test/walMetaTest.cpp index d98826a384..891e7dcdae 100644 --- a/source/libs/wal/test/walMetaTest.cpp +++ b/source/libs/wal/test/walMetaTest.cpp @@ -274,17 +274,17 @@ TEST_F(WalCleanEnv, rollbackMultiFile) { code = walRollback(pWal, 9); ASSERT_EQ(code, 0); ASSERT_EQ(pWal->vers.lastVer, 8); + code = walRollback(pWal, 6); + ASSERT_EQ(code, 0); + ASSERT_EQ(pWal->vers.lastVer, 5); code = walRollback(pWal, 5); - ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->vers.lastVer, 4); - code = walRollback(pWal, 3); - ASSERT_EQ(code, 0); + ASSERT_EQ(code, -1); - ASSERT_EQ(pWal->vers.lastVer, 2); + ASSERT_EQ(pWal->vers.lastVer, 5); - code = walWrite(pWal, 3, 3, (void*)ranStr, ranStrLen); + code = walWrite(pWal, 6, 6, (void*)ranStr, ranStrLen); ASSERT_EQ(code, 0); - ASSERT_EQ(pWal->vers.lastVer, 3); + ASSERT_EQ(pWal->vers.lastVer, 6); code = walSaveMeta(pWal); ASSERT_EQ(code, 0); From c17a921a03f94da375ce54b5eb144e7dd4b1683b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Oct 2022 23:19:40 +0800 Subject: [PATCH 16/24] fix(query): set the correct buffer to accommodate the tag values. --- source/common/src/tdatablock.c | 1 + source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/cachescanoperator.c | 17 ++++----- source/libs/executor/src/scanoperator.c | 37 +++++++++++--------- source/libs/function/src/builtinsimpl.c | 2 +- 5 files changed, 33 insertions(+), 26 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0c38d43543..3afa91f2f9 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1186,6 +1186,7 @@ static int32_t doEnsureCapacity(SColumnInfoData* pColumn, const SDataBlockInfo* } void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) { + pColumn->hasNull = false; if (IS_VAR_DATA_TYPE(pColumn->info.type)) { pColumn->varmeta.length = 0; if (pColumn->varmeta.offset != NULL) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1191b6a485..16f094c85d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -921,7 +921,7 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SColMatchInfo* pColMatchInfo, SFilterInfo* pFilterInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, - SSDataBlock* pBlock, const char* idStr); + SSDataBlock* pBlock, int32_t rows, const char* idStr); void cleanupAggSup(SAggSupporter* pAggSup); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 92e98d3eab..bcd59e83f0 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -152,25 +152,27 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pInfo->indexOfBufferedRes = 0; } + SSDataBlock* pRes = pInfo->pRes; + if (pInfo->indexOfBufferedRes < pInfo->pBufferredRes->info.rows) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { SColMatchItem* pMatchInfo = taosArrayGet(pInfo->matchInfo.pList, i); int32_t slotId = pMatchInfo->dstSlotId; SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); - SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, slotId); + SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId); char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes); bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes); colDataAppend(pDst, 0, p, isNull); } - pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); - pInfo->pRes->info.rows = 1; + pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); + pRes->info.rows = 1; if (pInfo->pseudoExprSup.numOfExprs > 0) { SExprSupp* pSup = &pInfo->pseudoExprSup; - int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes, pRes->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; @@ -178,10 +180,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } } - pInfo->pRes->info.groupId = getTableGroupId(pTableList, pInfo->pRes->info.uid); - + pRes->info.groupId = getTableGroupId(pTableList, pRes->info.uid); pInfo->indexOfBufferedRes += 1; - return pInfo->pRes; + return pRes; } else { doSetOperatorCompleted(pOperator); return NULL; @@ -221,7 +222,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0); - code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, + code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 750b9f258b..b3226f292a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -344,11 +344,11 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, return true; } -static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { +static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, int32_t rows) { if (pTableScanInfo->pseudoSup.numOfExprs > 0) { SExprSupp* pSup = &pTableScanInfo->pseudoSup; - int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, + int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); @@ -426,7 +426,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca // NOTE: here the tag value only load for one row ensureBlockCapacity(pBlock, 1); - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); + doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); pCost->skipBlocks += 1; return TSDB_CODE_SUCCESS; } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { @@ -437,7 +437,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); ensureBlockCapacity(pBlock, 1); - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); + doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); return TSDB_CODE_SUCCESS; } else { qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo)); @@ -488,7 +488,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca ensureBlockCapacity(pBlock, pBlock->info.rows); relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); + doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, pBlock->info.rows); // restore the previous value pCost->totalRows -= pBlock->info.rows; @@ -528,27 +528,30 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction } int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, - SSDataBlock* pBlock, const char* idStr) { + SSDataBlock* pBlock, int32_t rows, const char* idStr) { // currently only the tbname pseudo column if (numOfPseudoExpr == 0) { return TSDB_CODE_SUCCESS; } + // backup the rows + int32_t backupRows = pBlock->info.rows; + pBlock->info.rows = rows; + SMetaReader mr = {0}; metaReaderInit(&mr, pHandle->meta, 0); int32_t code = metaGetTableEntryByUid(&mr, pBlock->info.uid); + metaReaderReleaseLock(&mr); + if (code != TSDB_CODE_SUCCESS) { qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); metaReaderClear(&mr); return terrno; } - metaReaderReleaseLock(&mr); - for (int32_t j = 0; j < numOfPseudoExpr; ++j) { SExprInfo* pExpr = &pPseudoExpr[j]; - - int32_t dstSlotId = pExpr->base.resSchema.slotId; + int32_t dstSlotId = pExpr->base.resSchema.slotId; SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotId); colInfoDataCleanup(pColInfoData, pBlock->info.rows); @@ -587,6 +590,9 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int } metaReaderClear(&mr); + + // restore the rows + pBlock->info.rows = backupRows; return TSDB_CODE_SUCCESS; } @@ -631,7 +637,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { blockDataCleanup(pBlock); - SDataBlockInfo* pBInfo = &pBlock->info; tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBInfo->rows, &pBInfo->uid, &pBInfo->window); @@ -1167,7 +1172,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU pBlock->info.rows = rows; relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); - doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); + doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, rows); pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->uid); } @@ -1638,7 +1643,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, - GET_TASKID(pTaskInfo)); + pInfo->pRes->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { blockDataFreeRes((SSDataBlock*)pBlock); T_LONG_JMP(pTaskInfo->env, code); @@ -1648,11 +1653,11 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock if (filter) { doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); } + blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataFreeRes((SSDataBlock*)pBlock); calBlockTbName(&pInfo->tbnameCalSup, pInfo->pRes); - return 0; } @@ -4368,7 +4373,7 @@ static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeS // currently only the tbname pseudo column if (pTableScanInfo->pseudoSup.numOfExprs > 0) { int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, - pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo)); + pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -4485,7 +4490,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc // currently only the tbname pseudo column if (pTableScanInfo->pseudoSup.numOfExprs > 0) { int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, - pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo)); + pTableScanInfo->pseudoSup.numOfExprs, pBlock, pBlock->info.rows, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 5dcef7cd17..b9acd36088 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -497,7 +497,7 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -static FORCE_INLINE int32_t getNumOfElems(SqlFunctionCtx* pCtx) { +static int32_t getNumOfElems(SqlFunctionCtx* pCtx) { int32_t numOfElem = 0; /* From be467a10c71ca7fe09cc4aed514930d2d8428413 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 1 Nov 2022 09:36:33 +0800 Subject: [PATCH 17/24] fix: drop index error --- source/libs/parser/src/parTranslater.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index d76ee55ef4..6aaa8cc98f 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5160,7 +5160,7 @@ static int32_t translateCreateIndex(STranslateContext* pCxt, SCreateIndexStmt* p static int32_t translateDropIndex(STranslateContext* pCxt, SDropIndexStmt* pStmt) { SMDropSmaReq dropSmaReq = {0}; SName name; - tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->indexName, &name), dropSmaReq.name); + tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->indexDbName, pStmt->indexName, &name), dropSmaReq.name); dropSmaReq.igNotExists = pStmt->ignoreNotExists; return buildCmdMsg(pCxt, TDMT_MND_DROP_SMA, (FSerializeFunc)tSerializeSMDropSmaReq, &dropSmaReq); } From 6547dbd2a7f41d4ffbcb5543ce281b3249ff5151 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 1 Nov 2022 11:30:44 +0800 Subject: [PATCH 18/24] fix: interp _irowts error --- source/libs/planner/src/planLogicCreater.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 74941d95f1..3a67390ee9 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -587,6 +587,8 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt return code; } +static bool isInterpFunc(int32_t funcId) { return fmIsInterpFunc(funcId) || fmIsInterpPseudoColumnFunc(funcId); } + static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { if (!pSelect->hasInterpFunc) { return TSDB_CODE_SUCCESS; @@ -602,7 +604,7 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p pInterpFunc->node.resultDataOrder = pInterpFunc->node.requireDataOrder; // interp functions and _group_key functions - int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, fmIsVectorFunc, &pInterpFunc->pFuncs); + int32_t code = nodesCollectFuncs(pSelect, SQL_CLAUSE_SELECT, isInterpFunc, &pInterpFunc->pFuncs); if (TSDB_CODE_SUCCESS == code) { code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT); } From 973a942d0fbfda4206be8adc331f534d63f7eeca Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 1 Nov 2022 11:43:40 +0800 Subject: [PATCH 19/24] enh: support get meta only from cache --- include/libs/catalog/catalog.h | 3 +++ source/libs/catalog/src/catalog.c | 30 ++++++++++++++++++++--- source/libs/catalog/src/ctgUtil.c | 5 ++++ source/libs/catalog/test/catalogTests.cpp | 20 +++++++++++++++ source/libs/qcom/src/queryUtil.c | 1 + source/libs/scalar/src/filter.c | 20 ++++++++------- source/libs/scalar/src/scalar.c | 4 +++ 7 files changed, 71 insertions(+), 12 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index dfeb68ce43..b957be4267 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -309,6 +309,9 @@ int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* f int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, bool* pass); +int32_t catalogChkAuthFromCache(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, + bool* pass, bool* exists); + int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth); int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet* epSet); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index b6a22b5fa7..e66cdb14ce 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -320,7 +320,7 @@ _return: } int32_t ctgChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, - bool* pass) { + bool* pass, bool* exists) { bool inCache = false; int32_t code = 0; @@ -329,6 +329,13 @@ int32_t ctgChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, co CTG_ERR_RET(ctgChkAuthFromCache(pCtg, (char*)user, (char*)dbFName, type, &inCache, pass)); if (inCache) { + if (exists) { + *exists = true; + } + + return TSDB_CODE_SUCCESS; + } else if (exists) { + *exists = false; return TSDB_CODE_SUCCESS; } @@ -1032,7 +1039,7 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* switch (tbType) { case TSDB_CHILD_TABLE: { SName stb = name; - strcpy(stb.tname, stbName); + tstrncpy(stb.tname, stbName, sizeof(stb.tname)); ctgRemoveTbMeta(pCtg, &stb); break; } @@ -1373,13 +1380,30 @@ int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user } int32_t code = 0; - CTG_ERR_JRET(ctgChkAuth(pCtg, pConn, user, dbFName, type, pass)); + CTG_ERR_JRET(ctgChkAuth(pCtg, pConn, user, dbFName, type, pass, NULL)); _return: CTG_API_LEAVE(code); } +int32_t catalogChkAuthFromCache(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, + bool* pass, bool* exists) { + CTG_API_ENTER(); + + if (NULL == pCtg || NULL == pConn || NULL == user || NULL == dbFName || NULL == pass || NULL == exists) { + CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); + } + + int32_t code = 0; + CTG_ERR_JRET(ctgChkAuth(pCtg, pConn, user, dbFName, type, pass, exists)); + +_return: + + CTG_API_LEAVE(code); +} + + int32_t catalogGetServerVersion(SCatalog* pCtg, SRequestConnInfo* pConn, char** pVersion) { CTG_API_ENTER(); diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 4a64a8666e..3d21bbbcd9 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -924,6 +924,11 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* if (1 == vgNum) { void* pIter = taosHashIterate(dbInfo->vgHash, NULL); + if (NULL == pIter) { + ctgError("empty vgHash, db:%s, vgroup number:%d", dbFName, vgNum); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + for (int32_t i = 0; i < tbNum; ++i) { vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo)); if (NULL == vgInfo) { diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 0bdd9841ab..489d174e17 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -2771,10 +2771,30 @@ TEST(apiTest, catalogChkAuth_test) { ASSERT_EQ(code, 0); bool pass = false; + bool exists = false; + code = catalogChkAuthFromCache(pCtg, mockPointer, ctgTestUsername, ctgTestDbname, AUTH_TYPE_READ, &pass, &exists); + ASSERT_EQ(code, 0); + ASSERT_EQ(exists, false); + code = catalogChkAuth(pCtg, mockPointer, ctgTestUsername, ctgTestDbname, AUTH_TYPE_READ, &pass); ASSERT_EQ(code, 0); ASSERT_EQ(pass, true); + while (true) { + uint64_t n = 0; + ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n); + if (n != 1) { + taosMsleep(50); + } else { + break; + } + } + + code = catalogChkAuthFromCache(pCtg, mockPointer, ctgTestUsername, ctgTestDbname, AUTH_TYPE_READ, &pass, &exists); + ASSERT_EQ(code, 0); + ASSERT_EQ(pass, true); + ASSERT_EQ(exists, true); + catalogDestroy(); } diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 1a7965397e..cf064881c2 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -456,6 +456,7 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) { (*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (NULL == (*pDst)->vgHash) { + taosMemoryFreeClear(*pDst); return TSDB_CODE_TSC_OUT_OF_MEMORY; } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 426274a20f..4738e1bbf9 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -1087,7 +1087,7 @@ int32_t filterAddUnitImpl(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, if (tmp == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - info->units = tmp; + info->units = (SFilterUnit*)tmp; memset(info->units + psize, 0, sizeof(*info->units) * FILTER_DEFAULT_UNIT_SIZE); } @@ -1633,12 +1633,12 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options) SValueNode *var = (SValueNode *)field->desc; SDataType *dType = &var->node.resType; - if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) { - qDebug("VAL%d => [type:TS][val:[%" PRIi64 "] - [%" PRId64 "]]", i, *(int64_t *)field->data, - *(((int64_t *)field->data) + 1)); - } else { + //if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) { + // qDebug("VAL%d => [type:TS][val:[%" PRIi64 "] - [%" PRId64 "]]", i, *(int64_t *)field->data, + // *(((int64_t *)field->data) + 1)); + //} else { qDebug("VAL%d => [type:%d][val:%" PRIx64 "]", i, dType->type, var->datum.i); // TODO - } + //} } else if (field->data) { qDebug("VAL%d => [type:NIL][val:NIL]", i); // TODO } @@ -4059,11 +4059,13 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, SColumnInfoData **p, SC SArray *pList = taosArrayInit(1, POINTER_BYTES); taosArrayPush(pList, &pSrc); - FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output)); - *p = output.columnData; - + int32_t code = scalarCalculate(info->sclCtx.node, pList, &output); taosArrayDestroy(pList); + FLT_ERR_RET(code); + + *p = output.columnData; + if (output.numOfQualified == output.numOfRows) { *pResultStatus = FILTER_RESULT_ALL_QUALIFIED; } else if (output.numOfQualified == 0) { diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 70d1bb2685..ea1ce175e3 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -896,6 +896,10 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pWhen, ctx, &pWhen)); SCL_ERR_JRET(sclGetNodeRes(pWhenThen->pThen, ctx, &pThen)); + if (NULL == pWhen || NULL == pThen) { + sclError("invalid when/then in whenThen list"); + SCL_ERR_JRET(TSDB_CODE_INVALID_PARA); + } if (pCase) { vectorCompare(pCase, pWhen, &comp, TSDB_ORDER_ASC, OP_TYPE_EQUAL); From 1b179f65b2fccb50b126315979e45680eb5d9873 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 1 Nov 2022 13:34:10 +0800 Subject: [PATCH 20/24] fix: interp _irowts error --- include/libs/function/functionMgt.h | 1 + source/libs/function/src/functionMgt.c | 7 +++++++ source/libs/planner/src/planLogicCreater.c | 4 +++- 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 6968f1712c..a50af18b44 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -216,6 +216,7 @@ bool fmIsMultiRowsFunc(int32_t funcId); bool fmIsKeepOrderFunc(int32_t funcId); bool fmIsCumulativeFunc(int32_t funcId); bool fmIsInterpPseudoColumnFunc(int32_t funcId); +bool fmIsGroupKeyFunc(int32_t funcId); void getLastCacheDataType(SDataType* pType); diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index bae005f5c4..0de914271c 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -251,6 +251,13 @@ bool fmIsSelectValueFunc(int32_t funcId) { return FUNCTION_TYPE_SELECT_VALUE == funcMgtBuiltins[funcId].type; } +bool fmIsGroupKeyFunc(int32_t funcId) { + if (funcId < 0 || funcId >= funcMgtBuiltinsNum) { + return false; + } + return FUNCTION_TYPE_GROUP_KEY == funcMgtBuiltins[funcId].type; +} + void fmFuncMgtDestroy() { void* m = gFunMgtService.pFuncNameHashTable; if (m != NULL && atomic_val_compare_exchange_ptr((void**)&gFunMgtService.pFuncNameHashTable, m, 0) == m) { diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 3a67390ee9..275983f869 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -587,7 +587,9 @@ static int32_t createIndefRowsFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt return code; } -static bool isInterpFunc(int32_t funcId) { return fmIsInterpFunc(funcId) || fmIsInterpPseudoColumnFunc(funcId); } +static bool isInterpFunc(int32_t funcId) { + return fmIsInterpFunc(funcId) || fmIsInterpPseudoColumnFunc(funcId) || fmIsGroupKeyFunc(funcId); +} static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { if (!pSelect->hasInterpFunc) { From 7a8c2efbc3330212caffd35a3159dafc3ee3bcb4 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 1 Nov 2022 13:44:04 +0800 Subject: [PATCH 21/24] fix: add jemalloc as build dependency of taosd, udf and shell (#17785) * fix: jemalloc compile error * fix: jemalloc compile error * fix: add jemalloc as dependency of taosd and udf Co-authored-by: afwerar <1296468573@qq.com> --- source/dnode/mgmt/CMakeLists.txt | 16 +++++++++- source/libs/function/CMakeLists.txt | 48 +++++++++++++++++++++++------ tools/shell/CMakeLists.txt | 10 +++++- 3 files changed, 63 insertions(+), 11 deletions(-) diff --git a/source/dnode/mgmt/CMakeLists.txt b/source/dnode/mgmt/CMakeLists.txt index 423028b167..762b8fd529 100644 --- a/source/dnode/mgmt/CMakeLists.txt +++ b/source/dnode/mgmt/CMakeLists.txt @@ -13,4 +13,18 @@ target_include_directories( taosd PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/node_mgmt/inc" ) -target_link_libraries(taosd dnode) + +IF (TD_LINUX_64 AND JEMALLOC_ENABLED) + ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc) + SET(LINK_JEMALLOC "-L${CMAKE_BINARY_DIR}/build/lib -ljemalloc") +ELSE () + SET(LINK_JEMALLOC "") +ENDIF () + +IF (TD_LINUX_64 AND JEMALLOC_ENABLED) + ADD_DEPENDENCIES(taosd jemalloc) + target_link_libraries(taosd dnode ${LINK_JEMALLOC}) +ELSE () + target_link_libraries(taosd dnode) +ENDIF () + diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt index 913dd24a49..fa241dc6ef 100644 --- a/source/libs/function/CMakeLists.txt +++ b/source/libs/function/CMakeLists.txt @@ -12,6 +12,17 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) +IF (TD_LINUX_64 AND JEMALLOC_ENABLED) + ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc) + SET(LINK_JEMALLOC "-L${CMAKE_BINARY_DIR}/build/lib -ljemalloc") +ELSE () + SET(LINK_JEMALLOC "") +ENDIF () + +IF (TD_LINUX_64 AND JEMALLOC_ENABLED) + ADD_DEPENDENCIES(function jemalloc) +ENDIF () + target_link_libraries( function PRIVATE os @@ -21,7 +32,7 @@ target_link_libraries( PRIVATE qcom PRIVATE scalar PRIVATE transport - PRIVATE stream + PRIVATE stream ${LINK_JEMALLOC} PUBLIC uv_a ) @@ -37,10 +48,15 @@ target_include_directories( "${TD_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) + +IF (TD_LINUX_64 AND JEMALLOC_ENABLED) + ADD_DEPENDENCIES(runUdf jemalloc) +ENDIF () + target_link_libraries( runUdf PUBLIC uv_a - PRIVATE os util common nodes function + PRIVATE os util common nodes function ${LINK_JEMALLOC} ) add_library(udf1 STATIC MODULE test/udf1.c) @@ -54,8 +70,13 @@ target_include_directories( "${TD_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) + +IF (TD_LINUX_64 AND JEMALLOC_ENABLED) + ADD_DEPENDENCIES(udf1 jemalloc) +ENDIF () + target_link_libraries( - udf1 PUBLIC os) + udf1 PUBLIC os ${LINK_JEMALLOC}) add_library(udf2 STATIC MODULE test/udf2.c) target_include_directories( @@ -68,8 +89,13 @@ target_include_directories( "${TD_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) + +IF (TD_LINUX_64 AND JEMALLOC_ENABLED) + ADD_DEPENDENCIES(udf2 jemalloc) +ENDIF () + target_link_libraries( - udf2 PUBLIC os + udf2 PUBLIC os ${LINK_JEMALLOC} ) #SET(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR}/build/bin) @@ -86,9 +112,13 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) -target_link_libraries( - udfd - PUBLIC uv_a - PRIVATE os util common nodes function -) +IF (TD_LINUX_64 AND JEMALLOC_ENABLED) + ADD_DEPENDENCIES(udfd jemalloc) +ENDIF () + +target_link_libraries( + udfd + PUBLIC uv_a + PRIVATE os util common nodes function ${LINK_JEMALLOC} + ) diff --git a/tools/shell/CMakeLists.txt b/tools/shell/CMakeLists.txt index 31dcde036d..1e7d0ed140 100644 --- a/tools/shell/CMakeLists.txt +++ b/tools/shell/CMakeLists.txt @@ -2,6 +2,14 @@ aux_source_directory(src SHELL_SRC) add_executable(shell ${SHELL_SRC}) +IF (TD_LINUX_64 AND JEMALLOC_ENABLED) + ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc) + SET(LINK_JEMALLOC "-L${CMAKE_BINARY_DIR}/build/lib -ljemalloc") + ADD_DEPENDENCIES(shell jemalloc) +ELSE () + SET(LINK_JEMALLOC "") +ENDIF () + IF (TD_LINUX AND TD_WEBSOCKET) ADD_DEFINITIONS(-DWEBSOCKET -I${CMAKE_BINARY_DIR}/build/include -ltaosws) SET(LINK_WEBSOCKET "-L${CMAKE_BINARY_DIR}/build/lib -ltaosws") @@ -21,7 +29,7 @@ ENDIF () if(TD_WINDOWS) target_link_libraries(shell PUBLIC taos_static ${LINK_WEBSOCKET}) else() - target_link_libraries(shell PUBLIC taos ${LINK_WEBSOCKET}) + target_link_libraries(shell PUBLIC taos ${LINK_WEBSOCKET} ${LINK_JEMALLOC}) endif () target_link_libraries( shell From 83495ed5d3a22e2fe6209cf3456f1e17370bb934 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 1 Nov 2022 13:49:02 +0800 Subject: [PATCH 22/24] fix(query): set the correct buffer to accommodate the tag values. --- source/common/src/tdatablock.c | 4 ++++ source/dnode/vnode/src/tsdb/tsdbRead.c | 8 ++------ source/libs/executor/src/executorimpl.c | 4 ++++ source/libs/executor/src/scanoperator.c | 10 +++++++--- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 3afa91f2f9..86b851877f 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1132,6 +1132,10 @@ void blockDataCleanup(SSDataBlock* pDataBlock) { pDataBlock->info.window.ekey = 0; pDataBlock->info.window.skey = 0; + if (pDataBlock->info.capacity == 0) { + return; + } + size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock); for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 2b3eff0297..2967a96453 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3837,7 +3837,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); - int64_t stime = taosGetTimestampUs(); +// int64_t stime = taosGetTimestampUs(); SBlockLoadSuppInfo* pSup = &pReader->suppInfo; @@ -3887,14 +3887,10 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS } } - double elapsed = (taosGetTimestampUs() - stime) / 1000.0; - pReader->cost.smaLoadTime += elapsed; pReader->cost.smaDataLoad += 1; - *pBlockStatis = pSup->plist; - tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", elapsed time:%.2f ms, %s", 0, pFBlock->uid, - elapsed, pReader->idStr); + tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr); return code; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5b26005a19..286885d018 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -574,6 +574,9 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc // if the source equals to the destination, it is to create a new column as the result of scalar // function or some operators. bool createNewColModel = (pResult == pSrcBlock); + if (createNewColModel) { + blockDataEnsureCapacity(pResult, pResult->info.rows); + } int32_t numOfRows = 0; @@ -623,6 +626,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; ASSERT(pResult->info.capacity > 0); + colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows); colDataDestroy(&idata); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b3226f292a..c731823392 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -424,8 +424,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - // NOTE: here the tag value only load for one row - ensureBlockCapacity(pBlock, 1); + if (pTableScanInfo->pseudoSup.numOfExprs > 0) { + ensureBlockCapacity(pBlock, pBlock->info.rows); + } doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); pCost->skipBlocks += 1; return TSDB_CODE_SUCCESS; @@ -436,7 +437,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); - ensureBlockCapacity(pBlock, 1); + if (pTableScanInfo->pseudoSup.numOfExprs > 0) { + ensureBlockCapacity(pBlock, pBlock->info.rows); + } + doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1); return TSDB_CODE_SUCCESS; } else { From f4e7f3da5e8bb8a7f8a509f7ab23864776f8fbb3 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 1 Nov 2022 14:05:54 +0800 Subject: [PATCH 23/24] test: comment out unstable case --- tests/script/jenkins/basic.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 6328124b0d..f1340a6968 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -321,7 +321,7 @@ # --- vnode ---- ./test.sh -f tsim/vnode/replica3_basic.sim -./test.sh -f tsim/vnode/replica3_repeat.sim +# TD-20089 ./test.sh -f tsim/vnode/replica3_repeat.sim ./test.sh -f tsim/vnode/replica3_vgroup.sim ./test.sh -f tsim/vnode/replica3_many.sim ./test.sh -f tsim/vnode/replica3_import.sim From 20539ae04b50bf344ecccc1b439b0506a9a7662c Mon Sep 17 00:00:00 2001 From: Hui Li <52318143+plum-lihui@users.noreply.github.com> Date: Tue, 1 Nov 2022 15:45:55 +0800 Subject: [PATCH 24/24] Update tmqDnodeRestart1.py --- tests/system-test/7-tmq/tmqDnodeRestart1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/7-tmq/tmqDnodeRestart1.py b/tests/system-test/7-tmq/tmqDnodeRestart1.py index 054afe4fef..714f10b362 100644 --- a/tests/system-test/7-tmq/tmqDnodeRestart1.py +++ b/tests/system-test/7-tmq/tmqDnodeRestart1.py @@ -21,7 +21,7 @@ class TDTestCase: self.ctbNum = 100 self.rowsPerTbl = 1000 - def init(self, conn, logSql): + def init(self, conn, logSql, replicaVar=1): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor(), False)