From 0c111cd2bf7a1a600f5037b45a622e8c6cb1d55d Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 29 Jul 2022 18:15:44 +0800 Subject: [PATCH 01/15] fix: update schema to newest version to parsing rows --- source/dnode/vnode/src/tsdb/tsdbRead.c | 122 +++++++++++++------------ 1 file changed, 63 insertions(+), 59 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 03985654f8..0eec715bd1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -80,15 +80,15 @@ typedef struct SFilesetIter { typedef struct SFileDataBlockInfo { // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it uint64_t uid; - int32_t tbBlockIdx; + int32_t tbBlockIdx; } SFileDataBlockInfo; typedef struct SDataBlockIter { - int32_t numOfBlocks; - int32_t index; - SArray* blockList; // SArray - int32_t order; - SBlock block; // current SBlock data + int32_t numOfBlocks; + int32_t index; + SArray* blockList; // SArray + int32_t order; + SBlock block; // current SBlock data SHashObj* pTableMap; } SDataBlockIter; @@ -124,8 +124,8 @@ struct STsdbReader { SSDataBlock* pResBlock; int32_t capacity; SReaderStatus status; - char* idStr; // query info handle, for debug purpose - int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows + char* idStr; // query info handle, for debug purpose + int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows SBlockLoadSuppInfo suppInfo; STsdbReadSnap* pReadSnap; SIOCostSummary cost; @@ -133,8 +133,8 @@ struct STsdbReader { SDataFReader* pFileReader; SVersionRange verRange; - int32_t step; - STsdbReader* innerReader[2]; + int32_t step; + STsdbReader* innerReader[2]; }; static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); @@ -211,8 +211,8 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK pTsdbReader->idStr); } - tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables, (sizeof(STableBlockScanInfo)*numOfTables)/1024.0, - pTsdbReader->idStr); + tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, %s", pTsdbReader, numOfTables, + (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->idStr); return pTableMap; } @@ -396,7 +396,8 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) return pResBlock; } -static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity, const char* idstr) { +static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity, + const char* idstr) { int32_t code = 0; int8_t level = 0; STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader)); @@ -576,7 +577,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, int64_t et2 = taosGetTimestampUs(); tsdbDebug("load block index for %d tables completed, elapsed time:%.2f ms, set blockIdx:%.2f ms, size:%.2f Kb %s", - (int32_t)num, (et1 - st)/1000.0, (et2-et1)/1000.0, num * sizeof(SBlockIdx)/1024.0, pReader->idStr); + (int32_t)num, (et1 - st) / 1000.0, (et2 - et1) / 1000.0, num * sizeof(SBlockIdx) / 1024.0, pReader->idStr); pReader->cost.headFileLoadTime += (et1 - st) / 1000.0; @@ -591,7 +592,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ *numOfValidTables = 0; int64_t st = taosGetTimestampUs(); - size_t size = 0; + size_t size = 0; STableBlockScanInfo* px = NULL; while (1) { @@ -641,9 +642,9 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_ } } - double el = (taosGetTimestampUs() - st)/1000.0; + double el = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug("load block of %d tables completed, blocks:%d in %d tables, size:%.2f Kb, elapsed time:%.2f ms %s", - numOfTables, *numOfBlocks, *numOfValidTables, size/1000.0, el, pReader->idStr); + numOfTables, *numOfBlocks, *numOfValidTables, size / 1000.0, el, pReader->idStr); pReader->cost.numOfBlocks += (*numOfBlocks); pReader->cost.headFileLoadTime += el; @@ -679,9 +680,7 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { return pFBlockInfo; } -static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) { - return &pBlockIter->block; -} +static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; } static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { SReaderStatus* pStatus = &pReader->status; @@ -689,7 +688,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn SBlockData* pBlockData = &pStatus->fileBlockData; SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); - SBlock* pBlock = getCurrentBlock(pBlockIter); + SBlock* pBlock = getCurrentBlock(pBlockIter); SSDataBlock* pResBlock = pReader->pResBlock; int32_t numOfCols = blockDataGetNumOfCols(pResBlock); @@ -772,17 +771,17 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI int64_t st = taosGetTimestampUs(); SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); - SBlock* pBlock = getCurrentBlock(pBlockIter); + SBlock* pBlock = getCurrentBlock(pBlockIter); - SSDataBlock* pResBlock = pReader->pResBlock; - int32_t numOfCols = blockDataGetNumOfCols(pResBlock); + SSDataBlock* pResBlock = pReader->pResBlock; + int32_t numOfCols = blockDataGetNumOfCols(pResBlock); SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - SBlockIdx blockIdx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid}; - int32_t code = tsdbReadColData(pReader->pFileReader, &blockIdx, pBlock, pSupInfo->colIds, numOfCols, - pBlockData, NULL, NULL); + SBlockIdx blockIdx = {.suid = pReader->suid, .uid = pBlockScanInfo->uid}; + int32_t code = + tsdbReadColData(pReader->pFileReader, &blockIdx, pBlock, pSupInfo->colIds, numOfCols, pBlockData, NULL, NULL); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -857,7 +856,7 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v } static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) { - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); @@ -882,7 +881,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte int64_t st = taosGetTimestampUs(); SBlockOrderSupporter sup = {0}; - int32_t code = initBlockOrderSupporter(&sup, numOfTables); + int32_t code = initBlockOrderSupporter(&sup, numOfTables); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -937,8 +936,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte } int64_t et = taosGetTimestampUs(); - tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", pReader, cnt, - (et - st)/1000.0, pReader->idStr); + tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", + pReader, cnt, (et - st) / 1000.0, pReader->idStr); pBlockIter->index = asc ? 0 : (numOfBlocks - 1); cleanupBlockOrderSupporter(&sup); @@ -976,7 +975,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte } int64_t et = taosGetTimestampUs(); - tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, cnt, (et-st)/1000.0, pReader->idStr); + tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, cnt, (et - st) / 1000.0, + pReader->idStr); cleanupBlockOrderSupporter(&sup); taosMemoryFree(pTree); @@ -1024,7 +1024,7 @@ static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STab int32_t step = asc ? 1 : -1; *nextIndex = pFBlockInfo->tbBlockIdx + step; - SBlock *pBlock = taosMemoryCalloc(1, sizeof(SBlock)); + SBlock* pBlock = taosMemoryCalloc(1, sizeof(SBlock)); int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock); @@ -1196,10 +1196,10 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* setComposedBlockFlag(pReader, true); double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; - tsdbDebug( - "%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange: %" PRId64 - " - %" PRId64 " %s", - pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pReader->idStr); + tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange: %" PRId64 + " - %" PRId64 " %s", + pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, + pReader->idStr); pReader->cost.buildmemBlock += elapsedTime; return code; @@ -1499,9 +1499,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader, STableBlockScanInfo* setComposedBlockFlag(pReader, true); int64_t et = taosGetTimestampUs(); - tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%d, elapsed time:%.2f ms %s", pReader, - pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, - (et - st)/1000.0, pReader->idStr); + tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 + " rows:%d, elapsed time:%.2f ms %s", + pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, + pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -1679,7 +1680,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) { SReaderStatus* pStatus = &pReader->status; - size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); + size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBlockIdx)); while (1) { @@ -2212,7 +2213,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc CHECK_FILEBLOCK_STATE st; SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); - SBlock* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter); + SBlock* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter); checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st); if (st == CHECK_FILEBLOCK_QUIT) { break; @@ -2228,7 +2229,7 @@ void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) { if (pReader->pSchema == NULL) { metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pSchema); - } else if (pReader->pSchema->version != sversion) { + } else if (pReader->pSchema->version < sversion) { taosMemoryFreeClear(pReader->pSchema); metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pSchema); } @@ -2466,7 +2467,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { // update the SQueryTableDataCond to create inner reader STimeWindow w = pCond->twindows; - int32_t order = pCond->order; + int32_t order = pCond->order; if (order == TSDB_ORDER_ASC) { pCond->twindows.ekey = pCond->twindows.skey; pCond->twindows.skey = INT64_MIN; @@ -2485,7 +2486,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl if (order == TSDB_ORDER_ASC) { pCond->twindows.skey = w.ekey; pCond->twindows.ekey = INT64_MAX; - } else { + } else { pCond->twindows.skey = INT64_MIN; pCond->twindows.ekey = w.ekey; } @@ -2533,10 +2534,11 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl } } } else { - STsdbReader* pPrevReader = pReader->innerReader[0]; + STsdbReader* pPrevReader = pReader->innerReader[0]; SDataBlockIter* pBlockIter = &pPrevReader->status.blockIter; - initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order, pPrevReader->idStr); + initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order, + pPrevReader->idStr); resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap); // no data in files, let's try buffer in memory @@ -2591,12 +2593,14 @@ void tsdbReaderClose(STsdbReader* pReader) { SIOCostSummary* pCost = &pReader->cost; - tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%"PRId64" SMA-time:%.2f ms, " - "fileBlocks:%"PRId64", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, STableBlockScanInfo " - "size:%.2f Kb %s", + tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 + " SMA-time:%.2f ms, " + "fileBlocks:%" PRId64 + ", fileBlocks-time:%.2f ms, build in-memory-block-time:%.2f ms, STableBlockScanInfo " + "size:%.2f Kb %s", pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime, pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, - numOfTables * sizeof(STableBlockScanInfo) /1000.0, pReader->idStr); + numOfTables * sizeof(STableBlockScanInfo) / 1000.0, pReader->idStr); taosMemoryFree(pReader->idStr); taosMemoryFree(pReader->pSchema); @@ -2675,9 +2679,9 @@ static void setBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) { if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { - if (pReader->step == EXTERNAL_ROWS_MAIN) { + if (pReader->step == EXTERNAL_ROWS_MAIN) { setBlockInfo(pReader, pDataBlockInfo); - } else if (pReader->step == EXTERNAL_ROWS_PREV) { + } else if (pReader->step == EXTERNAL_ROWS_PREV) { setBlockInfo(pReader->innerReader[0], pDataBlockInfo); } else { setBlockInfo(pReader->innerReader[1], pDataBlockInfo); @@ -2691,7 +2695,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS int32_t code = 0; *allHave = false; - if(pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { + if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { *pBlockStatis = NULL; return TSDB_CODE_SUCCESS; } @@ -2702,7 +2706,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS return TSDB_CODE_SUCCESS; } - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); + SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); int64_t stime = taosGetTimestampUs(); @@ -2807,7 +2811,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { } pReader->order = pCond->order; - pReader->type = TIMEWINDOW_RANGE_CONTAINED; + pReader->type = TIMEWINDOW_RANGE_CONTAINED; pReader->status.loadFromFile = true; pReader->status.pTableIter = NULL; pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); @@ -2826,7 +2830,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap); resetDataBlockScanInfo(pReader->status.pTableMap); - int32_t code = 0; + int32_t code = 0; SDataBlockIter* pBlockIter = &pReader->status.blockIter; // no data in files, let's try buffer in memory @@ -2835,8 +2839,8 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { } else { code = initForFirstBlockInFile(pReader, pBlockIter); if (code != TSDB_CODE_SUCCESS) { - tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", - pReader, numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); + tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader, + numOfTables, pReader->window.skey, pReader->window.ekey, pReader->idStr); return code; } } From 4de8d28f639b502b038247d01d2fca8648c7acbc Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 29 Jul 2022 18:35:23 +0800 Subject: [PATCH 02/15] fix: fix ucs4 convertion issue --- include/os/osString.h | 2 + source/client/src/clientEnv.c | 2 + source/client/src/clientHb.c | 1 + source/client/src/clientMain.c | 2 + source/libs/scheduler/src/schJob.c | 2 +- source/os/src/osString.c | 68 ++++++++++++++++++++++++++++-- 6 files changed, 72 insertions(+), 5 deletions(-) diff --git a/include/os/osString.h b/include/os/osString.h index 3a4ff18694..74d5e52a1b 100644 --- a/include/os/osString.h +++ b/include/os/osString.h @@ -62,6 +62,8 @@ typedef int32_t TdUcs4; int32_t taosUcs4len(TdUcs4 *ucs4); int64_t taosStr2int64(const char *str); +int32_t taosConvInit(int32_t maxNum); +void taosConvDestroy(); int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs); bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len); int32_t tasoUcs4Compare(TdUcs4 *f1_ucs4, TdUcs4 *f2_ucs4, int32_t bytes); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 893aa39ce3..c012533056 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -361,6 +361,8 @@ void taos_init_imp(void) { initQueryModuleMsgHandle(); + taosConvInit(256); + rpcInit(); SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 3d6cf1c626..06bd3f3887 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -353,6 +353,7 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { desc.subDesc = NULL; desc.subPlanNum = 0; } + desc.subPlanNum = taosArrayGetSize(desc.subDesc); ASSERT(desc.subPlanNum == taosArrayGetSize(desc.subDesc)); } else { desc.subDesc = NULL; diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index b04b4ea2aa..a4eaf057d1 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -75,6 +75,8 @@ void taos_cleanup(void) { cleanupTaskQueue(); + taosConvDestroy(); + tscInfo("all local resources released"); taosCleanupCfg(); taosCloseLog(); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 46140ccdff..82146c3741 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -278,7 +278,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { } SHashObj *planToTask = taosHashInit( - SCHEDULE_DEFAULT_MAX_TASK_NUM, + pDag->numOfSubplans, taosGetDefaultHashFunction(POINTER_BYTES == sizeof(int64_t) ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (NULL == planToTask) { diff --git a/source/os/src/osString.c b/source/os/src/osString.c index 32f0fdf7b3..d6b0bafe8f 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -134,21 +134,81 @@ int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs) { #endif } +typedef struct { + iconv_t conv; + int8_t inUse; +} SConv; + +SConv *gConv = NULL; +int32_t convUsed = 0; +int32_t gConvMaxNum = 0; + +int32_t taosConvInit(int32_t maxNum) { + gConvMaxNum = maxNum * 2; + gConv = taosMemoryCalloc(gConvMaxNum, sizeof(SConv)); + for (int32_t i = 0; i < gConvMaxNum; ++i) { + gConv[i].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); + } + + return 0; +} + +void taosConvDestroy() { + for (int32_t i = 0; i < gConvMaxNum; ++i) { + iconv_close(gConv[i].conv); + } + taosMemoryFreeClear(gConv); +} + +void taosAcquireConv(int32_t *idx) { + while (true) { + int32_t used = atomic_add_fetch_32(&convUsed, 1); + if (used > gConvMaxNum) { + used = atomic_sub_fetch_32(&convUsed, 1); + sched_yield(); + continue; + } + + break; + } + + int32_t startId = taosGetSelfPthreadId() % gConvMaxNum; + while (true) { + if (gConv[startId].inUse) { + startId = (startId + 1) % gConvMaxNum; + continue; + } + + int8_t old = atomic_val_compare_exchange_8(&gConv[startId].inUse, 0, 1); + if (0 == old) { + break; + } + } + + *idx = startId; +} + +void taosReleaseConv(int32_t idx) { + atomic_store_8(&gConv[idx].inUse, 0); +} + bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len) { #ifdef DISALLOW_NCHAR_WITHOUT_ICONV printf("Nchar cannot be read and written without iconv, please install iconv library and recompile TDengine.\n"); return -1; #else memset(ucs4, 0, ucs4_max_len); - iconv_t cd = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); + + int32_t idx = 0; + taosAcquireConv(&idx); size_t ucs4_input_len = mbsLength; size_t outLeft = ucs4_max_len; - if (iconv(cd, (char **)&mbs, &ucs4_input_len, (char **)&ucs4, &outLeft) == -1) { - iconv_close(cd); + if (iconv(gConv[idx].conv, (char **)&mbs, &ucs4_input_len, (char **)&ucs4, &outLeft) == -1) { + taosReleaseConv(idx); return false; } - iconv_close(cd); + taosReleaseConv(idx); if (len != NULL) { *len = (int32_t)(ucs4_max_len - outLeft); if (*len < 0) { From c2e4110ae1f43fad70fded02031408d7533c88b6 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 29 Jul 2022 19:49:45 +0800 Subject: [PATCH 03/15] fix: fix mbs2ucs4 issue --- source/os/src/osString.c | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/source/os/src/osString.c b/source/os/src/osString.c index d6b0bafe8f..0642bd768b 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -161,6 +161,13 @@ void taosConvDestroy() { } void taosAcquireConv(int32_t *idx) { + if (0 == gConvMaxNum) { + gConv = taosMemoryCalloc(1, sizeof(SConv)); + gConv[0].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); + *idx = 0; + return; + } + while (true) { int32_t used = atomic_add_fetch_32(&convUsed, 1); if (used > gConvMaxNum) { @@ -189,6 +196,12 @@ void taosAcquireConv(int32_t *idx) { } void taosReleaseConv(int32_t idx) { + if (0 == gConvMaxNum) { + iconv_close(gConv[0].conv); + taosMemoryFreeClear(gConv); + return; + } + atomic_store_8(&gConv[idx].inUse, 0); } From e3b73ba5849f9e7ecf4366a7ae0687044224f695 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 30 Jul 2022 08:34:16 +0800 Subject: [PATCH 04/15] fix: fix release conv handle issue --- source/os/src/osString.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/os/src/osString.c b/source/os/src/osString.c index 0642bd768b..329b039084 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -203,6 +203,7 @@ void taosReleaseConv(int32_t idx) { } atomic_store_8(&gConv[idx].inUse, 0); + atomic_sub_fetch_32(&convUsed, 1); } bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len) { From 9fbefbb16748355b58842980dc2bb83666d17f3c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 30 Jul 2022 09:00:21 +0800 Subject: [PATCH 05/15] fix: fix task queue thread issue --- source/common/src/tglobal.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 0cc4e31aed..ba7f9f93c1 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -49,7 +49,7 @@ int32_t tsNumOfShmThreads = 1; // queue & threads int32_t tsNumOfRpcThreads = 1; int32_t tsNumOfCommitThreads = 2; -int32_t tsNumOfTaskQueueThreads = 1; +int32_t tsNumOfTaskQueueThreads = 4; int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; @@ -317,9 +317,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; - tsNumOfTaskQueueThreads = tsNumOfCores / 4; - tsNumOfTaskQueueThreads = TRANGE(tsNumOfTaskQueueThreads, 1, 2); - if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 1, 1024, 0) != 0) return -1; + tsNumOfTaskQueueThreads = tsNumOfCores / 2; + tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); + if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, 0) != 0) return -1; return 0; } From 2f1c9b3c1673dc6db848775cfa3936e9025a44dc Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 30 Jul 2022 09:50:19 +0800 Subject: [PATCH 06/15] enh: add local forbidden cfg processing --- include/common/tglobal.h | 1 + source/common/src/tglobal.c | 14 ++++++++++++++ source/libs/command/src/command.c | 7 +++++++ 3 files changed, 22 insertions(+) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 57d1199e17..9111728e1a 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -146,6 +146,7 @@ struct SConfig *taosGetCfg(); void taosSetAllDebugFlag(int32_t flag); void taosSetDebugFlag(int32_t *pFlagPtr, const char *flagName, int32_t flagVal); int32_t taosSetCfg(SConfig *pCfg, char *name); +void taosLocalCfgForbiddenToChange(char* name, bool* forbidden); #ifdef __cplusplus } diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ba7f9f93c1..6f4a3060ed 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -594,6 +594,20 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { return 0; } +void taosLocalCfgForbiddenToChange(char* name, bool* forbidden) { + int32_t len = strlen(name); + char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0}; + strntolower(lowcaseName, name, TMIN(CFG_NAME_MAX_LEN, len)); + + if (strcasecmp("charset", name) == 0) { + *forbidden = true; + return; + } + + *forbidden = false; +} + + int32_t taosSetCfg(SConfig *pCfg, char *name) { int32_t len = strlen(name); char lowcaseName[CFG_NAME_MAX_LEN + 1] = {0}; diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 7c95e71823..a76b457422 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -517,6 +517,13 @@ static int32_t execAlterLocal(SAlterLocalStmt* pStmt) { goto _return; } + bool forbidden = false; + taosLocalCfgForbiddenToChange(pStmt->config, &forbidden); + if (forbidden) { + terrno = TSDB_CODE_OPS_NOT_SUPPORT; + return terrno; + } + if (cfgSetItem(tsCfg, pStmt->config, pStmt->value, CFG_STYPE_ALTER_CMD)) { return terrno; } From 80a32d623535afdf99111c8cf0debafcbe127998 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 30 Jul 2022 11:31:05 +0800 Subject: [PATCH 07/15] fix: fix mbs2ucs4 crash issue in taosd --- include/os/osString.h | 1 - source/client/src/clientEnv.c | 2 -- source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 1 + source/os/src/osString.c | 14 +++++++------- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/include/os/osString.h b/include/os/osString.h index 74d5e52a1b..2a22c6d8ff 100644 --- a/include/os/osString.h +++ b/include/os/osString.h @@ -62,7 +62,6 @@ typedef int32_t TdUcs4; int32_t taosUcs4len(TdUcs4 *ucs4); int64_t taosStr2int64(const char *str); -int32_t taosConvInit(int32_t maxNum); void taosConvDestroy(); int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs); bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index c012533056..893aa39ce3 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -361,8 +361,6 @@ void taos_init_imp(void) { initQueryModuleMsgHandle(); - taosConvInit(256); - rpcInit(); SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index c2e8a55271..582b16ce99 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -215,6 +215,7 @@ void dmCleanupDnode(SDnode *pDnode) { dmClearVars(pDnode); rpcCleanup(); indexCleanup(); + taosConvDestroy(); dDebug("dnode is closed, ptr:%p", pDnode); } diff --git a/source/os/src/osString.c b/source/os/src/osString.c index 329b039084..503b8e0c6b 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -143,14 +143,17 @@ SConv *gConv = NULL; int32_t convUsed = 0; int32_t gConvMaxNum = 0; -int32_t taosConvInit(int32_t maxNum) { - gConvMaxNum = maxNum * 2; +void taosConvInitImpl(void) { + gConvMaxNum = 512; gConv = taosMemoryCalloc(gConvMaxNum, sizeof(SConv)); for (int32_t i = 0; i < gConvMaxNum; ++i) { gConv[i].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); } +} - return 0; +static TdThreadOnce convInit = PTHREAD_ONCE_INIT; +void taosConvInit() { + taosThreadOnce(&convInit, taosConvInitImpl); } void taosConvDestroy() { @@ -162,10 +165,7 @@ void taosConvDestroy() { void taosAcquireConv(int32_t *idx) { if (0 == gConvMaxNum) { - gConv = taosMemoryCalloc(1, sizeof(SConv)); - gConv[0].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); - *idx = 0; - return; + taosConvInit(); } while (true) { From 56d26973080fd6da11b594862138e81baaf9378e Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Sat, 30 Jul 2022 12:46:40 +0800 Subject: [PATCH 08/15] fix: new tRowMergerInit2 for ts row merging --- source/dnode/vnode/src/inc/tsdb.h | 3 + source/dnode/vnode/src/tsdb/tsdbRead.c | 50 ++++++++---- source/dnode/vnode/src/tsdb/tsdbUtil.c | 108 ++++++++++++++++++++++--- 3 files changed, 135 insertions(+), 26 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index d8c84e952b..bbf100cf3a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -90,6 +90,9 @@ int32_t tsdbRowCmprFn(const void *p1, const void *p2); void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); SColVal *tRowIterNext(SRowIter *pIter); // SRowMerger +int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema); +int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); + int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema); void tRowMergerClear(SRowMerger *pMerger); int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0eec715bd1..9545ded927 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -143,7 +143,7 @@ static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); -static int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, +static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader); static int32_t doAppendOneRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow); static void setComposedBlockFlag(STsdbReader* pReader, bool composed); @@ -1230,7 +1230,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMerge(&merge, pRow); - doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); tRowMergerGetRow(&merge, &pTSRow); } @@ -1246,7 +1246,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* updateSchema(pRow, pBlockScanInfo->uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doMergeRowsInBuf(pIter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); tRowMerge(&merge, &fRow); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); @@ -1291,12 +1291,12 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (ik.ts == key) { tRowMerge(&merge, piRow); - doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader); } if (k.ts == key) { tRowMerge(&merge, pRow); - doMergeRowsInBuf(&pBlockScanInfo->iter, key, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader); } tRowMergerGetRow(&merge, &pTSRow); @@ -1336,11 +1336,11 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* updateSchema(pRow, uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doMergeRowsInBuf(&pBlockScanInfo->iter, key, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader); if (ik.ts == k.ts) { tRowMerge(&merge, piRow); - doMergeRowsInBuf(&pBlockScanInfo->iiter, key, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iiter, uid, key, pBlockScanInfo->delSkyline, &merge, pReader); } if (k.ts == key) { @@ -2097,7 +2097,8 @@ TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pRea } } -int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader) { +int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, + STsdbReader* pReader) { while (1) { pIter->hasVal = tsdbTbDataIterNext(pIter->iter); if (!pIter->hasVal) { @@ -2116,7 +2117,15 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, int64_t ts, SArray* pDelList, SRowMer break; } - tRowMerge(pMerger, pRow); + int32_t sversion = TSDBROW_SVERSION(pRow); + STSchema* pTSchema = NULL; + if (sversion != pReader->pSchema->version) { + metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema); + } else { + pTSchema = pReader->pSchema; + } + + tRowMergerAdd(pMerger, pRow, pTSchema); } return TSDB_CODE_SUCCESS; @@ -2229,7 +2238,7 @@ void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) { if (pReader->pSchema == NULL) { metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pSchema); - } else if (pReader->pSchema->version < sversion) { + } else if (pReader->pSchema->version != sversion) { taosMemoryFreeClear(pReader->pSchema); metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pSchema); } @@ -2240,10 +2249,17 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe SRowMerger merge = {0}; TSDBKEY k = TSDBROW_KEY(pRow); - updateSchema(pRow, uid, pReader); + // updateSchema(pRow, uid, pReader); + int32_t sversion = TSDBROW_SVERSION(pRow); + STSchema* pTSchema = NULL; + if (sversion != pReader->pSchema->version) { + metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pTSchema); + } else { + pTSchema = pReader->pSchema; + } - tRowMergerInit(&merge, pRow, pReader->pSchema); - doMergeRowsInBuf(pIter, k.ts, pDelList, &merge, pReader); + tRowMergerInit2(&merge, pReader->pSchema, pRow, pTSchema); + doMergeRowsInBuf(pIter, uid, k.ts, pDelList, &merge, pReader); tRowMergerGetRow(&merge, pTSRow); tRowMergerClear(&merge); } @@ -2259,18 +2275,18 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo updateSchema(piRow, pBlockScanInfo->uid, pReader); tRowMergerInit(&merge, piRow, pReader->pSchema); - doMergeRowsInBuf(&pBlockScanInfo->iiter, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader); tRowMerge(&merge, pRow); - doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); } else { updateSchema(pRow, pBlockScanInfo->uid, pReader); tRowMergerInit(&merge, pRow, pReader->pSchema); - doMergeRowsInBuf(&pBlockScanInfo->iter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); tRowMerge(&merge, piRow); - doMergeRowsInBuf(&pBlockScanInfo->iiter, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); + doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); } tRowMergerGetRow(&merge, pTSRow); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 3e05b75dd0..211a330599 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "tdataformat.h" #include "tsdb.h" // SMapData ======================================================================= @@ -568,6 +569,95 @@ SColVal *tRowIterNext(SRowIter *pIter) { } // SRowMerger ====================================================== + +int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRow, STSchema *pTSchema) { + int32_t code = 0; + TSDBKEY key = TSDBROW_KEY(pRow); + SColVal *pColVal = &(SColVal){0}; + STColumn *pTColumn; + int32_t iCol = 0; + + pMerger->pTSchema = pResTSchema; + pMerger->version = key.version; + + pMerger->pArray = taosArrayInit(pResTSchema->numOfCols, sizeof(SColVal)); + if (pMerger->pArray == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // ts + pTColumn = &pTSchema->columns[iCol++]; + + ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP); + + *pColVal = COL_VAL_VALUE(pTColumn->colId, pTColumn->type, (SValue){.ts = key.ts}); + if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + // other + for (; iCol < pTSchema->numOfCols && iCol < pResTSchema->numOfCols; ++iCol) { + pTColumn = &pResTSchema->columns[iCol]; + if (pTSchema->columns[iCol].colId != pTColumn->colId) { + taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)); + continue; + } + + tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); + if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + + for (; iCol < pResTSchema->numOfCols; ++iCol) { + pTColumn = &pResTSchema->columns[iCol]; + taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)); + } + +_exit: + return code; +} + +int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { + int32_t code = 0; + TSDBKEY key = TSDBROW_KEY(pRow); + SColVal *pColVal = &(SColVal){0}; + STColumn *pTColumn; + int32_t iCol; + + ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts); + + for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && iCol < pTSchema->numOfCols; ++iCol) { + pTColumn = &pMerger->pTSchema->columns[iCol]; + if (pTSchema->columns[iCol].colId != pTColumn->colId) { + continue; + } + + tsdbRowGetColVal(pRow, pMerger->pTSchema, iCol, pColVal); + + if (key.version > pMerger->version) { + if (!pColVal->isNone) { + taosArraySet(pMerger->pArray, iCol, pColVal); + } + } else if (key.version < pMerger->version) { + SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); + if (tColVal->isNone && !pColVal->isNone) { + taosArraySet(pMerger->pArray, iCol, pColVal); + } + } else { + ASSERT(0); + } + } + + pMerger->version = key.version; + +_exit: + return code; +} + int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { int32_t code = 0; TSDBKEY key = TSDBROW_KEY(pRow); @@ -1401,7 +1491,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { break; case TSDB_DATA_TYPE_BOOL: break; - case TSDB_DATA_TYPE_TINYINT:{ + case TSDB_DATA_TYPE_TINYINT: { pColAgg->sum += colVal.value.i8; if (pColAgg->min > colVal.value.i8) { pColAgg->min = colVal.value.i8; @@ -1411,7 +1501,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_SMALLINT:{ + case TSDB_DATA_TYPE_SMALLINT: { pColAgg->sum += colVal.value.i16; if (pColAgg->min > colVal.value.i16) { pColAgg->min = colVal.value.i16; @@ -1441,7 +1531,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_FLOAT:{ + case TSDB_DATA_TYPE_FLOAT: { pColAgg->sum += colVal.value.f; if (pColAgg->min > colVal.value.f) { pColAgg->min = colVal.value.f; @@ -1451,7 +1541,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_DOUBLE:{ + case TSDB_DATA_TYPE_DOUBLE: { pColAgg->sum += colVal.value.d; if (pColAgg->min > colVal.value.d) { pColAgg->min = colVal.value.d; @@ -1463,7 +1553,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } case TSDB_DATA_TYPE_VARCHAR: break; - case TSDB_DATA_TYPE_TIMESTAMP:{ + case TSDB_DATA_TYPE_TIMESTAMP: { if (pColAgg->min > colVal.value.i64) { pColAgg->min = colVal.value.i64; } @@ -1474,7 +1564,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } case TSDB_DATA_TYPE_NCHAR: break; - case TSDB_DATA_TYPE_UTINYINT:{ + case TSDB_DATA_TYPE_UTINYINT: { pColAgg->sum += colVal.value.u8; if (pColAgg->min > colVal.value.u8) { pColAgg->min = colVal.value.u8; @@ -1484,7 +1574,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_USMALLINT:{ + case TSDB_DATA_TYPE_USMALLINT: { pColAgg->sum += colVal.value.u16; if (pColAgg->min > colVal.value.u16) { pColAgg->min = colVal.value.u16; @@ -1494,7 +1584,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_UINT:{ + case TSDB_DATA_TYPE_UINT: { pColAgg->sum += colVal.value.u32; if (pColAgg->min > colVal.value.u32) { pColAgg->min = colVal.value.u32; @@ -1504,7 +1594,7 @@ void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { } break; } - case TSDB_DATA_TYPE_UBIGINT:{ + case TSDB_DATA_TYPE_UBIGINT: { pColAgg->sum += colVal.value.u64; if (pColAgg->min > colVal.value.u64) { pColAgg->min = colVal.value.u64; From b08a28d559ba4b88db56c638d8697de22706a9b3 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 30 Jul 2022 13:14:44 +0800 Subject: [PATCH 09/15] fix: some problems of planner --- include/libs/nodes/querynodes.h | 2 - include/libs/scalar/filter.h | 4 + include/libs/scalar/scalar.h | 6 +- source/libs/nodes/src/nodesUtilFuncs.c | 184 ------------------ source/libs/parser/src/parAstCreater.c | 37 ++-- source/libs/parser/src/parTranslater.c | 43 ++++- source/libs/parser/test/parSelectTest.cpp | 2 + source/libs/planner/src/planOptimizer.c | 2 +- source/libs/planner/src/planSpliter.c | 2 + source/libs/planner/test/planOrderByTest.cpp | 2 + source/libs/scalar/src/filter.c | 192 +++++++++++++++++++ 11 files changed, 270 insertions(+), 206 deletions(-) diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 5dc1e7512f..cd2a7d3f9f 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -418,8 +418,6 @@ void nodesValueNodeToVariant(const SValueNode* pNode, SVariant* pVal); char* nodesGetFillModeString(EFillMode mode); int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc); -int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagIndexCond, SNode** pTagCond, - SNode** pOtherCond); #ifdef __cplusplus } diff --git a/include/libs/scalar/filter.h b/include/libs/scalar/filter.h index f1cd99f04a..44064f0a9f 100644 --- a/include/libs/scalar/filter.h +++ b/include/libs/scalar/filter.h @@ -46,6 +46,10 @@ extern int32_t filterFreeNcharColumns(SFilterInfo *pFilterInfo); extern void filterFreeInfo(SFilterInfo *info); extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows); +/* condition split interface */ +int32_t filterPartitionCond(SNode **pCondition, SNode **pPrimaryKeyCond, SNode **pTagIndexCond, SNode **pTagCond, + SNode **pOtherCond); + #ifdef __cplusplus } #endif diff --git a/include/libs/scalar/scalar.h b/include/libs/scalar/scalar.h index 770b0442ea..6322c6a1e9 100644 --- a/include/libs/scalar/scalar.h +++ b/include/libs/scalar/scalar.h @@ -25,7 +25,7 @@ extern "C" { typedef struct SFilterInfo SFilterInfo; -int32_t scalarGetOperatorResultType(SOperatorNode* pOp); +int32_t scalarGetOperatorResultType(SOperatorNode *pOp); /* pNode will be freed in API; @@ -43,7 +43,7 @@ int32_t scalarGetOperatorParamNum(EOperatorType type); int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type); int32_t vectorGetConvertType(int32_t type1, int32_t type2); -int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut, int32_t* overflow); +int32_t vectorConvertImpl(const SScalarParam *pIn, SScalarParam *pOut, int32_t *overflow); /* Math functions */ int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); @@ -86,7 +86,7 @@ int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutpu int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); -bool getTimePseudoFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool getTimePseudoFuncEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 3c6fbe409c..b7bcd13aa0 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1816,187 +1816,3 @@ int32_t nodesMergeConds(SNode** pDst, SNodeList** pSrc) { return TSDB_CODE_SUCCESS; } - -typedef struct SClassifyConditionCxt { - bool hasPrimaryKey; - bool hasTagIndexCol; - bool hasTagCol; - bool hasOtherCol; -} SClassifyConditionCxt; - -static EDealRes classifyConditionImpl(SNode* pNode, void* pContext) { - SClassifyConditionCxt* pCxt = (SClassifyConditionCxt*)pContext; - if (QUERY_NODE_COLUMN == nodeType(pNode)) { - SColumnNode* pCol = (SColumnNode*)pNode; - if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && TSDB_SYSTEM_TABLE != pCol->tableType) { - pCxt->hasPrimaryKey = true; - } else if (pCol->hasIndex) { - pCxt->hasTagIndexCol = true; - pCxt->hasTagCol = true; - } else if (COLUMN_TYPE_TAG == pCol->colType || COLUMN_TYPE_TBNAME == pCol->colType) { - pCxt->hasTagCol = true; - } else { - pCxt->hasOtherCol = true; - } - } - return DEAL_RES_CONTINUE; -} - -typedef enum EConditionType { - COND_TYPE_PRIMARY_KEY = 1, - COND_TYPE_TAG_INDEX, - COND_TYPE_TAG, - COND_TYPE_NORMAL -} EConditionType; - -static EConditionType classifyCondition(SNode* pNode) { - SClassifyConditionCxt cxt = {.hasPrimaryKey = false, .hasTagIndexCol = false, .hasOtherCol = false}; - nodesWalkExpr(pNode, classifyConditionImpl, &cxt); - return cxt.hasOtherCol ? COND_TYPE_NORMAL - : (cxt.hasPrimaryKey && cxt.hasTagCol - ? COND_TYPE_NORMAL - : (cxt.hasPrimaryKey ? COND_TYPE_PRIMARY_KEY - : (cxt.hasTagIndexCol ? COND_TYPE_TAG_INDEX : COND_TYPE_TAG))); -} - -static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagIndexCond, SNode** pTagCond, - SNode** pOtherCond) { - SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(*pCondition); - - int32_t code = TSDB_CODE_SUCCESS; - - SNodeList* pPrimaryKeyConds = NULL; - SNodeList* pTagIndexConds = NULL; - SNodeList* pTagConds = NULL; - SNodeList* pOtherConds = NULL; - SNode* pCond = NULL; - FOREACH(pCond, pLogicCond->pParameterList) { - switch (classifyCondition(pCond)) { - case COND_TYPE_PRIMARY_KEY: - if (NULL != pPrimaryKeyCond) { - code = nodesListMakeAppend(&pPrimaryKeyConds, nodesCloneNode(pCond)); - } - break; - case COND_TYPE_TAG_INDEX: - if (NULL != pTagIndexCond) { - code = nodesListMakeAppend(&pTagIndexConds, nodesCloneNode(pCond)); - } - if (NULL != pTagCond) { - code = nodesListMakeAppend(&pTagConds, nodesCloneNode(pCond)); - } - break; - case COND_TYPE_TAG: - if (NULL != pTagCond) { - code = nodesListMakeAppend(&pTagConds, nodesCloneNode(pCond)); - } - break; - case COND_TYPE_NORMAL: - default: - if (NULL != pOtherCond) { - code = nodesListMakeAppend(&pOtherConds, nodesCloneNode(pCond)); - } - break; - } - if (TSDB_CODE_SUCCESS != code) { - break; - } - } - - SNode* pTempPrimaryKeyCond = NULL; - SNode* pTempTagIndexCond = NULL; - SNode* pTempTagCond = NULL; - SNode* pTempOtherCond = NULL; - if (TSDB_CODE_SUCCESS == code) { - code = nodesMergeConds(&pTempPrimaryKeyCond, &pPrimaryKeyConds); - } - if (TSDB_CODE_SUCCESS == code) { - code = nodesMergeConds(&pTempTagIndexCond, &pTagIndexConds); - } - if (TSDB_CODE_SUCCESS == code) { - code = nodesMergeConds(&pTempTagCond, &pTagConds); - } - if (TSDB_CODE_SUCCESS == code) { - code = nodesMergeConds(&pTempOtherCond, &pOtherConds); - } - - if (TSDB_CODE_SUCCESS == code) { - if (NULL != pPrimaryKeyCond) { - *pPrimaryKeyCond = pTempPrimaryKeyCond; - } - if (NULL != pTagIndexCond) { - *pTagIndexCond = pTempTagIndexCond; - } - if (NULL != pTagCond) { - *pTagCond = pTempTagCond; - } - if (NULL != pOtherCond) { - *pOtherCond = pTempOtherCond; - } - nodesDestroyNode(*pCondition); - *pCondition = NULL; - } else { - nodesDestroyList(pPrimaryKeyConds); - nodesDestroyList(pTagIndexConds); - nodesDestroyList(pTagConds); - nodesDestroyList(pOtherConds); - nodesDestroyNode(pTempPrimaryKeyCond); - nodesDestroyNode(pTempTagIndexCond); - nodesDestroyNode(pTempTagCond); - nodesDestroyNode(pTempOtherCond); - } - - return code; -} - -int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagIndexCond, SNode** pTagCond, - SNode** pOtherCond) { - if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pCondition) && - LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)*pCondition)->condType) { - return partitionLogicCond(pCondition, pPrimaryKeyCond, pTagIndexCond, pTagCond, pOtherCond); - } - - bool needOutput = false; - switch (classifyCondition(*pCondition)) { - case COND_TYPE_PRIMARY_KEY: - if (NULL != pPrimaryKeyCond) { - *pPrimaryKeyCond = *pCondition; - needOutput = true; - } - break; - case COND_TYPE_TAG_INDEX: - if (NULL != pTagIndexCond) { - *pTagIndexCond = *pCondition; - needOutput = true; - } - if (NULL != pTagCond) { - SNode* pTempCond = *pCondition; - if (NULL != pTagIndexCond) { - pTempCond = nodesCloneNode(*pCondition); - if (NULL == pTempCond) { - return TSDB_CODE_OUT_OF_MEMORY; - } - } - *pTagCond = pTempCond; - needOutput = true; - } - break; - case COND_TYPE_TAG: - if (NULL != pTagCond) { - *pTagCond = *pCondition; - needOutput = true; - } - break; - case COND_TYPE_NORMAL: - default: - if (NULL != pOtherCond) { - *pOtherCond = *pCondition; - needOutput = true; - } - break; - } - if (needOutput) { - *pCondition = NULL; - } - - return TSDB_CODE_SUCCESS; -} diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index ff2157fe67..d555e86306 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -233,18 +233,22 @@ SNode* createRawExprNodeExt(SAstCreateContext* pCxt, const SToken* pStart, const SNode* releaseRawExprNode(SAstCreateContext* pCxt, SNode* pNode) { CHECK_PARSER_STATUS(pCxt); SRawExprNode* pRawExpr = (SRawExprNode*)pNode; - SNode* pExpr = pRawExpr->pNode; - if (nodesIsExprNode(pExpr)) { + SNode* pRealizedExpr = pRawExpr->pNode; + if (nodesIsExprNode(pRealizedExpr)) { + SExprNode* pExpr = (SExprNode*)pRealizedExpr; if (QUERY_NODE_COLUMN == nodeType(pExpr)) { - strcpy(((SExprNode*)pExpr)->aliasName, ((SColumnNode*)pExpr)->colName); + strcpy(pExpr->aliasName, ((SColumnNode*)pExpr)->colName); + strcpy(pExpr->userAlias, ((SColumnNode*)pExpr)->colName); } else { - int32_t len = TMIN(sizeof(((SExprNode*)pExpr)->aliasName) - 1, pRawExpr->n); - strncpy(((SExprNode*)pExpr)->aliasName, pRawExpr->p, len); - ((SExprNode*)pExpr)->aliasName[len] = '\0'; + int32_t len = TMIN(sizeof(pExpr->aliasName) - 1, pRawExpr->n); + strncpy(pExpr->aliasName, pRawExpr->p, len); + pExpr->aliasName[len] = '\0'; + strncpy(pExpr->userAlias, pRawExpr->p, len); + pExpr->userAlias[len] = '\0'; } } taosMemoryFreeClear(pNode); - return pExpr; + return pRealizedExpr; } SToken getTokenFromRawExprNode(SAstCreateContext* pCxt, SNode* pNode) { @@ -641,11 +645,12 @@ SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, SToken* pAlias) { CHECK_PARSER_STATUS(pCxt); trimEscape(pAlias); - int32_t len = TMIN(sizeof(((SExprNode*)pNode)->aliasName) - 1, pAlias->n); - strncpy(((SExprNode*)pNode)->aliasName, pAlias->z, len); - ((SExprNode*)pNode)->aliasName[len] = '\0'; - strncpy(((SExprNode*)pNode)->userAlias, pAlias->z, len); - ((SExprNode*)pNode)->userAlias[len] = '\0'; + SExprNode* pExpr = (SExprNode*)pNode; + int32_t len = TMIN(sizeof(pExpr->aliasName) - 1, pAlias->n); + strncpy(pExpr->aliasName, pAlias->z, len); + pExpr->aliasName[len] = '\0'; + strncpy(pExpr->userAlias, pAlias->z, len); + pExpr->userAlias[len] = '\0'; return pNode; } @@ -766,13 +771,21 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr return (SNode*)select; } +static void setSubquery(SNode* pStmt) { + if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { + ((SSelectStmt*)pStmt)->isSubquery = true; + } +} + SNode* createSetOperator(SAstCreateContext* pCxt, ESetOperatorType type, SNode* pLeft, SNode* pRight) { CHECK_PARSER_STATUS(pCxt); SSetOperator* setOp = (SSetOperator*)nodesMakeNode(QUERY_NODE_SET_OPERATOR); CHECK_OUT_OF_MEM(setOp); setOp->opType = type; setOp->pLeft = pLeft; + setSubquery(setOp->pLeft); setOp->pRight = pRight; + setSubquery(setOp->pRight); sprintf(setOp->stmtName, "%p", setOp); return (SNode*)setOp; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 9e8b28f362..8495aa15cd 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -691,7 +691,7 @@ static EDealRes translateColumnUseAlias(STranslateContext* pCxt, SColumnNode** p SNode* pNode; FOREACH(pNode, pProjectionList) { SExprNode* pExpr = (SExprNode*)pNode; - if (0 == strcmp((*pCol)->colName, pExpr->aliasName)) { + if (0 == strcmp((*pCol)->colName, pExpr->userAlias)) { SColumnRefNode* pColRef = (SColumnRefNode*)nodesMakeNode(QUERY_NODE_COLUMN_REF); if (NULL == pColRef) { pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; @@ -1535,6 +1535,7 @@ static EDealRes rewriteColToSelectValFunc(STranslateContext* pCxt, SNode** pNode } strcpy(pFunc->functionName, "_select_value"); strcpy(pFunc->node.aliasName, ((SExprNode*)*pNode)->aliasName); + strcpy(pFunc->node.userAlias, ((SExprNode*)*pNode)->userAlias); pCxt->errCode = nodesListMakeAppend(&pFunc->pParameterList, *pNode); if (TSDB_CODE_SUCCESS == pCxt->errCode) { pCxt->errCode = getFuncInfo(pCxt, pFunc); @@ -2171,12 +2172,45 @@ static int32_t translateFillValues(STranslateContext* pCxt, SSelectStmt* pSelect return TSDB_CODE_SUCCESS; } +static int32_t rewriteProjectAlias(SNodeList* pProjectionList) { + int32_t no = 1; + SNode* pProject = NULL; + FOREACH(pProject, pProjectionList) { sprintf(((SExprNode*)pProject)->aliasName, "#expr_%d", no++); } + return TSDB_CODE_SUCCESS; +} + +static int32_t checkProjectAlias(STranslateContext* pCxt, SNodeList* pProjectionList) { + SHashObj* pUserAliasSet = taosHashInit(LIST_LENGTH(pProjectionList), + taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + SNode* pProject = NULL; + FOREACH(pProject, pProjectionList) { + SExprNode* pExpr = (SExprNode*)pProject; + if (NULL != taosHashGet(pUserAliasSet, pExpr->userAlias, strlen(pExpr->userAlias))) { + taosHashCleanup(pUserAliasSet); + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_AMBIGUOUS_COLUMN, pExpr->userAlias); + } + taosHashPut(pUserAliasSet, pExpr->userAlias, strlen(pExpr->userAlias), &pExpr, POINTER_BYTES); + } + taosHashCleanup(pUserAliasSet); + return TSDB_CODE_SUCCESS; +} + +static int32_t translateProjectionList(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (pSelect->isSubquery) { + return checkProjectAlias(pCxt, pSelect->pProjectionList); + } + return rewriteProjectAlias(pSelect->pProjectionList); +} + static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect) { pCxt->currClause = SQL_CLAUSE_SELECT; int32_t code = translateExprList(pCxt, pSelect->pProjectionList); if (TSDB_CODE_SUCCESS == code) { code = translateStar(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = translateProjectionList(pCxt, pSelect); + } if (TSDB_CODE_SUCCESS == code) { code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList); } @@ -2232,7 +2266,7 @@ static int32_t getQueryTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWi } SNode* pPrimaryKeyCond = NULL; - nodesPartitionCond(&pCond, &pPrimaryKeyCond, NULL, NULL, NULL); + filterPartitionCond(&pCond, &pPrimaryKeyCond, NULL, NULL, NULL); int32_t code = TSDB_CODE_SUCCESS; if (NULL != pPrimaryKeyCond) { @@ -2699,6 +2733,7 @@ static SNode* createSetOperProject(const char* pTableAlias, SNode* pNode) { strcpy(pCol->tableAlias, pTableAlias); strcpy(pCol->colName, ((SExprNode*)pNode)->aliasName); strcpy(pCol->node.aliasName, pCol->colName); + strcpy(pCol->node.userAlias, ((SExprNode*)pNode)->userAlias); return (SNode*)pCol; } @@ -2810,7 +2845,7 @@ static int32_t partitionDeleteWhere(STranslateContext* pCxt, SDeleteStmt* pDelet SNode* pPrimaryKeyCond = NULL; SNode* pOtherCond = NULL; - int32_t code = nodesPartitionCond(&pDelete->pWhere, &pPrimaryKeyCond, NULL, &pDelete->pTagCond, &pOtherCond); + int32_t code = filterPartitionCond(&pDelete->pWhere, &pPrimaryKeyCond, NULL, &pDelete->pTagCond, &pOtherCond); if (TSDB_CODE_SUCCESS == code && NULL != pOtherCond) { code = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_DELETE_WHERE); } @@ -4983,7 +5018,7 @@ static int32_t translateSubquery(STranslateContext* pCxt, SNode* pNode) { SNode* pCurrStmt = pCxt->pCurrStmt; int32_t currLevel = pCxt->currLevel; pCxt->currLevel = ++(pCxt->levelNo); - int32_t code = translateQuery(pCxt, pNode); + int32_t code = translateQuery(pCxt, pNode); pCxt->currClause = currClause; pCxt->pCurrStmt = pCurrStmt; pCxt->currLevel = currLevel; diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp index c91e09c825..ebf8012cf7 100644 --- a/source/libs/parser/test/parSelectTest.cpp +++ b/source/libs/parser/test/parSelectTest.cpp @@ -70,6 +70,8 @@ TEST_F(ParserSelectTest, condition) { run("SELECT c1 FROM t1 WHERE NOT ts in (true, false)"); run("SELECT * FROM t1 WHERE c1 > 10 and c1 is not null"); + + run("SELECT * FROM t1 WHERE TBNAME like 'fda%' or TS > '2021-05-05 18:19:01.000'"); } TEST_F(ParserSelectTest, pseudoColumn) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 577266a697..cb62c0390b 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -436,7 +436,7 @@ static int32_t pushDownCondOptDealScan(SOptimizeContext* pCxt, SScanLogicNode* p SNode* pPrimaryKeyCond = NULL; SNode* pOtherCond = NULL; - int32_t code = nodesPartitionCond(&pScan->node.pConditions, &pPrimaryKeyCond, &pScan->pTagIndexCond, &pScan->pTagCond, + int32_t code = filterPartitionCond(&pScan->node.pConditions, &pPrimaryKeyCond, &pScan->pTagIndexCond, &pScan->pTagCond, &pOtherCond); if (TSDB_CODE_SUCCESS == code && NULL != pScan->pTagCond) { code = pushDownCondOptRebuildTbanme(&pScan->pTagCond); diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index bc5e50218b..b077e6ef55 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -763,6 +763,8 @@ static SNode* stbSplCreateColumnNode(SExprNode* pExpr) { return NULL; } if (QUERY_NODE_COLUMN == nodeType(pExpr)) { + strcpy(pCol->dbName, ((SColumnNode*)pExpr)->dbName); + strcpy(pCol->tableName, ((SColumnNode*)pExpr)->tableName); strcpy(pCol->tableAlias, ((SColumnNode*)pExpr)->tableAlias); } strcpy(pCol->colName, pExpr->aliasName); diff --git a/source/libs/planner/test/planOrderByTest.cpp b/source/libs/planner/test/planOrderByTest.cpp index 13dfbad78c..6fef080bc1 100644 --- a/source/libs/planner/test/planOrderByTest.cpp +++ b/source/libs/planner/test/planOrderByTest.cpp @@ -39,6 +39,8 @@ TEST_F(PlanOrderByTest, expr) { useDb("root", "test"); run("SELECT * FROM t1 ORDER BY c1 + 10, c2"); + + run("SELECT c1 FROM st1 ORDER BY ts, _C0"); } TEST_F(PlanOrderByTest, nullsOrder) { diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 29773b95c3..8e4e31d6cc 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -22,6 +22,7 @@ #include "tcompare.h" #include "tdatablock.h" #include "ttime.h" +#include "functionMgt.h" OptrStr gOptrStr[] = { {0, "invalid"}, @@ -3877,4 +3878,195 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnData } +typedef struct SClassifyConditionCxt { + bool hasPrimaryKey; + bool hasTagIndexCol; + bool hasTagCol; + bool hasOtherCol; +} SClassifyConditionCxt; +static EDealRes classifyConditionImpl(SNode* pNode, void* pContext) { + SClassifyConditionCxt* pCxt = (SClassifyConditionCxt*)pContext; + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + SColumnNode* pCol = (SColumnNode*)pNode; + if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId && TSDB_SYSTEM_TABLE != pCol->tableType) { + pCxt->hasPrimaryKey = true; + } else if (pCol->hasIndex) { + pCxt->hasTagIndexCol = true; + pCxt->hasTagCol = true; + } else if (COLUMN_TYPE_TAG == pCol->colType || COLUMN_TYPE_TBNAME == pCol->colType) { + pCxt->hasTagCol = true; + } else { + pCxt->hasOtherCol = true; + } + } else if (QUERY_NODE_FUNCTION == nodeType(pNode)) { + SFunctionNode* pFunc = (SFunctionNode*)pNode; + if (fmIsPseudoColumnFunc(pFunc->funcId)) { + if (FUNCTION_TYPE_TBNAME==pFunc->funcType) { + pCxt->hasTagCol = true; + } else { + pCxt->hasOtherCol = true; + } + } + } + return DEAL_RES_CONTINUE; +} + +typedef enum EConditionType { + COND_TYPE_PRIMARY_KEY = 1, + COND_TYPE_TAG_INDEX, + COND_TYPE_TAG, + COND_TYPE_NORMAL +} EConditionType; + +static EConditionType classifyCondition(SNode* pNode) { + SClassifyConditionCxt cxt = {.hasPrimaryKey = false, .hasTagIndexCol = false, .hasOtherCol = false}; + nodesWalkExpr(pNode, classifyConditionImpl, &cxt); + return cxt.hasOtherCol ? COND_TYPE_NORMAL + : (cxt.hasPrimaryKey && cxt.hasTagCol + ? COND_TYPE_NORMAL + : (cxt.hasPrimaryKey ? COND_TYPE_PRIMARY_KEY + : (cxt.hasTagIndexCol ? COND_TYPE_TAG_INDEX : COND_TYPE_TAG))); +} + +static int32_t partitionLogicCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagIndexCond, SNode** pTagCond, + SNode** pOtherCond) { + SLogicConditionNode* pLogicCond = (SLogicConditionNode*)(*pCondition); + + int32_t code = TSDB_CODE_SUCCESS; + + SNodeList* pPrimaryKeyConds = NULL; + SNodeList* pTagIndexConds = NULL; + SNodeList* pTagConds = NULL; + SNodeList* pOtherConds = NULL; + SNode* pCond = NULL; + FOREACH(pCond, pLogicCond->pParameterList) { + switch (classifyCondition(pCond)) { + case COND_TYPE_PRIMARY_KEY: + if (NULL != pPrimaryKeyCond) { + code = nodesListMakeAppend(&pPrimaryKeyConds, nodesCloneNode(pCond)); + } + break; + case COND_TYPE_TAG_INDEX: + if (NULL != pTagIndexCond) { + code = nodesListMakeAppend(&pTagIndexConds, nodesCloneNode(pCond)); + } + if (NULL != pTagCond) { + code = nodesListMakeAppend(&pTagConds, nodesCloneNode(pCond)); + } + break; + case COND_TYPE_TAG: + if (NULL != pTagCond) { + code = nodesListMakeAppend(&pTagConds, nodesCloneNode(pCond)); + } + break; + case COND_TYPE_NORMAL: + default: + if (NULL != pOtherCond) { + code = nodesListMakeAppend(&pOtherConds, nodesCloneNode(pCond)); + } + break; + } + if (TSDB_CODE_SUCCESS != code) { + break; + } + } + + SNode* pTempPrimaryKeyCond = NULL; + SNode* pTempTagIndexCond = NULL; + SNode* pTempTagCond = NULL; + SNode* pTempOtherCond = NULL; + if (TSDB_CODE_SUCCESS == code) { + code = nodesMergeConds(&pTempPrimaryKeyCond, &pPrimaryKeyConds); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesMergeConds(&pTempTagIndexCond, &pTagIndexConds); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesMergeConds(&pTempTagCond, &pTagConds); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesMergeConds(&pTempOtherCond, &pOtherConds); + } + + if (TSDB_CODE_SUCCESS == code) { + if (NULL != pPrimaryKeyCond) { + *pPrimaryKeyCond = pTempPrimaryKeyCond; + } + if (NULL != pTagIndexCond) { + *pTagIndexCond = pTempTagIndexCond; + } + if (NULL != pTagCond) { + *pTagCond = pTempTagCond; + } + if (NULL != pOtherCond) { + *pOtherCond = pTempOtherCond; + } + nodesDestroyNode(*pCondition); + *pCondition = NULL; + } else { + nodesDestroyList(pPrimaryKeyConds); + nodesDestroyList(pTagIndexConds); + nodesDestroyList(pTagConds); + nodesDestroyList(pOtherConds); + nodesDestroyNode(pTempPrimaryKeyCond); + nodesDestroyNode(pTempTagIndexCond); + nodesDestroyNode(pTempTagCond); + nodesDestroyNode(pTempOtherCond); + } + + return code; +} + +int32_t filterPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode** pTagIndexCond, SNode** pTagCond, + SNode** pOtherCond) { + if (QUERY_NODE_LOGIC_CONDITION == nodeType(*pCondition) && + LOGIC_COND_TYPE_AND == ((SLogicConditionNode*)*pCondition)->condType) { + return partitionLogicCond(pCondition, pPrimaryKeyCond, pTagIndexCond, pTagCond, pOtherCond); + } + + bool needOutput = false; + switch (classifyCondition(*pCondition)) { + case COND_TYPE_PRIMARY_KEY: + if (NULL != pPrimaryKeyCond) { + *pPrimaryKeyCond = *pCondition; + needOutput = true; + } + break; + case COND_TYPE_TAG_INDEX: + if (NULL != pTagIndexCond) { + *pTagIndexCond = *pCondition; + needOutput = true; + } + if (NULL != pTagCond) { + SNode* pTempCond = *pCondition; + if (NULL != pTagIndexCond) { + pTempCond = nodesCloneNode(*pCondition); + if (NULL == pTempCond) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + *pTagCond = pTempCond; + needOutput = true; + } + break; + case COND_TYPE_TAG: + if (NULL != pTagCond) { + *pTagCond = *pCondition; + needOutput = true; + } + break; + case COND_TYPE_NORMAL: + default: + if (NULL != pOtherCond) { + *pOtherCond = *pCondition; + needOutput = true; + } + break; + } + if (needOutput) { + *pCondition = NULL; + } + + return TSDB_CODE_SUCCESS; +} From 77837f6e6a9be731a3efd4b16c9131aaf8f77735 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 30 Jul 2022 15:10:24 +0800 Subject: [PATCH 10/15] fix: fix crash issue --- include/os/osString.h | 1 + source/client/src/clientEnv.c | 2 ++ source/dnode/mgmt/exe/dmMain.c | 2 ++ source/os/src/osString.c | 36 +++++++++++++++++----------------- 4 files changed, 23 insertions(+), 18 deletions(-) diff --git a/include/os/osString.h b/include/os/osString.h index 2a22c6d8ff..8eb341faa7 100644 --- a/include/os/osString.h +++ b/include/os/osString.h @@ -62,6 +62,7 @@ typedef int32_t TdUcs4; int32_t taosUcs4len(TdUcs4 *ucs4); int64_t taosStr2int64(const char *str); +void taosConvInit(void); void taosConvDestroy(); int32_t taosUcs4ToMbs(TdUcs4 *ucs4, int32_t ucs4_max_len, char *mbs); bool taosMbsToUcs4(const char *mbs, size_t mbs_len, TdUcs4 *ucs4, int32_t ucs4_max_len, int32_t *len); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 893aa39ce3..3d539ea251 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -361,6 +361,8 @@ void taos_init_imp(void) { initQueryModuleMsgHandle(); + taosConvInit(); + rpcInit(); SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100}; diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index 34c3b40556..4030eaa6fe 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -218,6 +218,8 @@ int mainWindows(int argc,char** argv) { taosCleanupArgs(); return -1; } + + taosConvInit(); if (global.dumpConfig) { dmDumpCfg(); diff --git a/source/os/src/osString.c b/source/os/src/osString.c index 503b8e0c6b..26aafd743f 100644 --- a/source/os/src/osString.c +++ b/source/os/src/osString.c @@ -143,29 +143,29 @@ SConv *gConv = NULL; int32_t convUsed = 0; int32_t gConvMaxNum = 0; -void taosConvInitImpl(void) { +void taosConvInit(void) { gConvMaxNum = 512; gConv = taosMemoryCalloc(gConvMaxNum, sizeof(SConv)); for (int32_t i = 0; i < gConvMaxNum; ++i) { gConv[i].conv = iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); + if ((iconv_t)-1 == gConv[i].conv || (iconv_t)0 == gConv[i].conv) { + ASSERT(0); + } } } -static TdThreadOnce convInit = PTHREAD_ONCE_INIT; -void taosConvInit() { - taosThreadOnce(&convInit, taosConvInitImpl); -} - void taosConvDestroy() { for (int32_t i = 0; i < gConvMaxNum; ++i) { iconv_close(gConv[i].conv); } taosMemoryFreeClear(gConv); + gConvMaxNum = -1; } -void taosAcquireConv(int32_t *idx) { - if (0 == gConvMaxNum) { - taosConvInit(); +iconv_t taosAcquireConv(int32_t *idx) { + if (gConvMaxNum <= 0) { + *idx = -1; + return iconv_open(DEFAULT_UNICODE_ENCODEC, tsCharset); } while (true) { @@ -193,12 +193,12 @@ void taosAcquireConv(int32_t *idx) { } *idx = startId; + return gConv[startId].conv; } -void taosReleaseConv(int32_t idx) { - if (0 == gConvMaxNum) { - iconv_close(gConv[0].conv); - taosMemoryFreeClear(gConv); +void taosReleaseConv(int32_t idx, iconv_t conv) { + if (idx < 0) { + iconv_close(conv); return; } @@ -213,16 +213,16 @@ bool taosMbsToUcs4(const char *mbs, size_t mbsLength, TdUcs4 *ucs4, int32_t ucs4 #else memset(ucs4, 0, ucs4_max_len); - int32_t idx = 0; - taosAcquireConv(&idx); + int32_t idx = -1; + iconv_t conv = taosAcquireConv(&idx); size_t ucs4_input_len = mbsLength; size_t outLeft = ucs4_max_len; - if (iconv(gConv[idx].conv, (char **)&mbs, &ucs4_input_len, (char **)&ucs4, &outLeft) == -1) { - taosReleaseConv(idx); + if (iconv(conv, (char **)&mbs, &ucs4_input_len, (char **)&ucs4, &outLeft) == -1) { + taosReleaseConv(idx, conv); return false; } - taosReleaseConv(idx); + taosReleaseConv(idx, conv); if (len != NULL) { *len = (int32_t)(ucs4_max_len - outLeft); if (*len < 0) { From ac0a32d9e4217ba8312710cc290ef08a736be2d5 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 30 Jul 2022 07:20:38 +0000 Subject: [PATCH 11/15] fix: multi-level storage coredump --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 24f066f703..24b300cb1e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -311,6 +311,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did); + tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); + wSet.diskId = did; wSet.fid = pCommitter->commitFid; fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0}; From e57f12d5817e5fc8b598d171f246b95d448028fa Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Sat, 30 Jul 2022 15:23:28 +0800 Subject: [PATCH 12/15] fix: use colId to merge rows --- source/dnode/vnode/src/tsdb/tsdbRead.c | 8 ++++++++ source/dnode/vnode/src/tsdb/tsdbUtil.c | 26 +++++++++++++++++--------- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 9545ded927..47511cce98 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2126,6 +2126,10 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe } tRowMergerAdd(pMerger, pRow, pTSchema); + + if (sversion != pReader->pSchema->version) { + taosMemoryFree(pTSchema); + } } return TSDB_CODE_SUCCESS; @@ -2262,6 +2266,10 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe doMergeRowsInBuf(pIter, uid, k.ts, pDelList, &merge, pReader); tRowMergerGetRow(&merge, pTSRow); tRowMergerClear(&merge); + + if (sversion != pReader->pSchema->version) { + taosMemoryFree(pTSchema); + } } void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 211a330599..ae9f3ae7c3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -575,7 +575,7 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo TSDBKEY key = TSDBROW_KEY(pRow); SColVal *pColVal = &(SColVal){0}; STColumn *pTColumn; - int32_t iCol = 0; + int32_t iCol, jCol = 0; pMerger->pTSchema = pResTSchema; pMerger->version = key.version; @@ -587,7 +587,7 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo } // ts - pTColumn = &pTSchema->columns[iCol++]; + pTColumn = &pTSchema->columns[jCol++]; ASSERT(pTColumn->type == TSDB_DATA_TYPE_TIMESTAMP); @@ -598,14 +598,18 @@ int32_t tRowMergerInit2(SRowMerger *pMerger, STSchema *pResTSchema, TSDBROW *pRo } // other - for (; iCol < pTSchema->numOfCols && iCol < pResTSchema->numOfCols; ++iCol) { + for (iCol = 1; jCol < pTSchema->numOfCols && iCol < pResTSchema->numOfCols; ++iCol) { pTColumn = &pResTSchema->columns[iCol]; - if (pTSchema->columns[iCol].colId != pTColumn->colId) { + if (pTSchema->columns[jCol].colId < pTColumn->colId) { + ++jCol; + --iCol; + continue; + } else if (pTSchema->columns[jCol].colId > pTColumn->colId) { taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)); continue; } - tsdbRowGetColVal(pRow, pTSchema, iCol, pColVal); + tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal); if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; @@ -626,17 +630,21 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { TSDBKEY key = TSDBROW_KEY(pRow); SColVal *pColVal = &(SColVal){0}; STColumn *pTColumn; - int32_t iCol; + int32_t iCol, jCol = 1; ASSERT(((SColVal *)pMerger->pArray->pData)->value.ts == key.ts); - for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && iCol < pTSchema->numOfCols; ++iCol) { + for (iCol = 1; iCol < pMerger->pTSchema->numOfCols && jCol < pTSchema->numOfCols; ++iCol) { pTColumn = &pMerger->pTSchema->columns[iCol]; - if (pTSchema->columns[iCol].colId != pTColumn->colId) { + if (pTSchema->columns[jCol].colId < pTColumn->colId) { + ++jCol; + --iCol; + continue; + } else if (pTSchema->columns[jCol].colId > pTColumn->colId) { continue; } - tsdbRowGetColVal(pRow, pMerger->pTSchema, iCol, pColVal); + tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal); if (key.version > pMerger->version) { if (!pColVal->isNone) { From 15a08b9999f3a20ee7162f125ad7927a224b7f9f Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 30 Jul 2022 15:29:57 +0800 Subject: [PATCH 13/15] fix: some problems of planner --- source/libs/nodes/src/nodesCodeFuncs.c | 7 +++++++ source/libs/nodes/src/nodesUtilFuncs.c | 2 +- source/libs/planner/test/planSubqueryTest.cpp | 6 ++++++ tests/script/tsim/parser/condition_query.sim | 6 +++--- 4 files changed, 17 insertions(+), 4 deletions(-) diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index b0c16f26ed..9d15b01acf 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2684,6 +2684,7 @@ static int32_t jsonToDataType(const SJson* pJson, void* pObj) { static const char* jkExprDataType = "DataType"; static const char* jkExprAliasName = "AliasName"; +static const char* jkExprUserAlias = "UserAlias"; static int32_t exprNodeToJson(const void* pObj, SJson* pJson) { const SExprNode* pNode = (const SExprNode*)pObj; @@ -2692,6 +2693,9 @@ static int32_t exprNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddStringToObject(pJson, jkExprAliasName, pNode->aliasName); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddStringToObject(pJson, jkExprUserAlias, pNode->userAlias); + } return code; } @@ -2703,6 +2707,9 @@ static int32_t jsonToExprNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetStringValue(pJson, jkExprAliasName, pNode->aliasName); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetStringValue(pJson, jkExprUserAlias, pNode->userAlias); + } return code; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index b7bcd13aa0..37d80917b3 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1502,7 +1502,7 @@ static EDealRes collectColumns(SNode* pNode, void* pContext) { SCollectColumnsCxt* pCxt = (SCollectColumnsCxt*)pContext; if (QUERY_NODE_COLUMN == nodeType(pNode)) { SColumnNode* pCol = (SColumnNode*)pNode; - if (isCollectType(pCxt->collectType, pCol->colType) && + if (isCollectType(pCxt->collectType, pCol->colType) && 0 != strcmp(pCol->colName, "*") && (NULL == pCxt->pTableAlias || 0 == strcmp(pCxt->pTableAlias, pCol->tableAlias))) { return doCollect(pCxt, pCol, pNode); } diff --git a/source/libs/planner/test/planSubqueryTest.cpp b/source/libs/planner/test/planSubqueryTest.cpp index 16dd91846c..aed77c0811 100644 --- a/source/libs/planner/test/planSubqueryTest.cpp +++ b/source/libs/planner/test/planSubqueryTest.cpp @@ -73,3 +73,9 @@ TEST_F(PlanSubqeuryTest, outerInterval) { run("SELECT COUNT(*) FROM (SELECT ts, TOP(c1, 10) FROM st1s1) INTERVAL(5s)"); } + +TEST_F(PlanSubqeuryTest, outerPartition) { + useDb("root", "test"); + + run("SELECT c1, COUNT(*) FROM (SELECT ts, c1 FROM st1) PARTITION BY c1"); +} diff --git a/tests/script/tsim/parser/condition_query.sim b/tests/script/tsim/parser/condition_query.sim index 87ea9f3cbe..a8a9be5cfd 100644 --- a/tests/script/tsim/parser/condition_query.sim +++ b/tests/script/tsim/parser/condition_query.sim @@ -2631,7 +2631,7 @@ sql_error select tb1.ts,tb1.c1,tb2_1.u1 from tb1, tb2_1 where tb1.ts=tb2_1.ts or print "ts&tbname test" -sql_error select count(*) from stb1 where ts > 0 or tbname like 'tb%'; +sql select count(*) from stb1 where ts > 0 or tbname like 'tb%'; print "ts&tag test" sql select count(*) from stb1 where ts > 0 or t1 > 0; @@ -2717,9 +2717,9 @@ print "tbname&tag&join test" print "column&ts&tbname&tag test" -sql_error select * from stb1 where (tbname like 'tb%' or ts > '2021-05-05 18:19:01.000') and (t1 > 5 or t1 < 4) and c1 > 0; +sql select * from stb1 where (tbname like 'tb%' or ts > '2021-05-05 18:19:01.000') and (t1 > 5 or t1 < 4) and c1 > 0; sql select * from stb1 where (ts > '2021-05-05 18:19:01.000') and (ts > '2021-05-05 18:19:02.000' or t1 > 3) and (t1 > 5 or t1 < 4) and c1 > 0; -sql_error select ts,c1,c7 from stb1 where ts > '2021-05-05 18:19:03.000' or ts > '2021-05-05 18:19:20.000' and col > 0 and t1 > 0; +sql select ts,c1,c7 from stb1 where ts > '2021-05-05 18:19:03.000' or ts > '2021-05-05 18:19:20.000' and c1 > 0 and t1 > 0; print "column&ts&tbname&join test" From dc6c0c1d1ad79dd386dd12db81c876062c4b450c Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Sat, 30 Jul 2022 15:46:43 +0800 Subject: [PATCH 14/15] fix: some problems of planner --- source/libs/parser/src/parTranslater.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8495aa15cd..02a7ca0fbb 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -523,6 +523,9 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p if ('\0' == pCol->node.aliasName[0]) { strcpy(pCol->node.aliasName, pColSchema->name); } + if ('\0' == pCol->node.userAlias[0]) { + strcpy(pCol->node.userAlias, pColSchema->name); + } pCol->tableId = pTable->pMeta->uid; pCol->tableType = pTable->pMeta->tableType; pCol->colId = pColSchema->colId; @@ -549,6 +552,9 @@ static void setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColum if ('\0' == pCol->node.aliasName[0]) { strcpy(pCol->node.aliasName, pCol->colName); } + if ('\0' == pCol->node.userAlias[0]) { + strcpy(pCol->node.userAlias, pCol->colName); + } pCol->node.resType = pExpr->resType; } @@ -2175,7 +2181,13 @@ static int32_t translateFillValues(STranslateContext* pCxt, SSelectStmt* pSelect static int32_t rewriteProjectAlias(SNodeList* pProjectionList) { int32_t no = 1; SNode* pProject = NULL; - FOREACH(pProject, pProjectionList) { sprintf(((SExprNode*)pProject)->aliasName, "#expr_%d", no++); } + FOREACH(pProject, pProjectionList) { + SExprNode* pExpr = (SExprNode*)pProject; + if ('\0' == pExpr->userAlias[0]) { + strcpy(pExpr->userAlias, pExpr->aliasName); + } + sprintf(pExpr->aliasName, "#expr_%d", no++); + } return TSDB_CODE_SUCCESS; } From 9d62c839d9182c3995bc8daaf6ef7e8098c92a05 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sat, 30 Jul 2022 15:52:43 +0800 Subject: [PATCH 15/15] feat: update taostools for3.0 (#15587) * feat: update taos-tools for 3.0 [TD-14141] * feat: update taos-tools for 3.0 * feat: update taos-tools for 3.0 * feat: update taos-tools for 3.0 * feat: update taos-tools for 3.0 * feat: update taos-tools for 3.0 * feat: update taos-tools for 3.0 * feat: update taos-tools for 3.0 * feat: update taos-tools for 3.0 * feat: update taos-tools for 3.0 * feat: update taos-tools 8e3b3ee * fix: remove submodules * feat: update taos-tools c529299 * feat: update taos-tools 9dc2fec for 3.0 * fix: optim upx --- .gitignore | 4 ++++ tools/CMakeLists.txt | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 80fd850cd4..c25f60075c 100644 --- a/.gitignore +++ b/.gitignore @@ -50,6 +50,10 @@ pysim/ tests/script/api/batchprepare taosadapter taosadapter-debug +tools/taos-tools/* +tools/taosws-rs/* +tools/taosadapter/* +tools/upx* # Doxygen Generated files html/ diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 58095940f3..1f066daabe 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -112,7 +112,7 @@ ELSE () COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}" COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/version.Version=${taos_version} -X github.com/taosdata/taosadapter/version.CommitID=${taosadapter_commit_sha1}" INSTALL_COMMAND - COMMAND wget -c https://github.com/upx/upx/releases/download/v3.96/upx-3.96-${PLATFORM_ARCH_STR}_linux.tar.xz -O ${CMAKE_CURRENT_SOURCE_DIR}/upx.tar.xz && tar -xvJf ${CMAKE_CURRENT_SOURCE_DIR}/upx.tar.xz -C ${CMAKE_CURRENT_SOURCE_DIR} --strip-components 1 > /dev/null && ${CMAKE_CURRENT_SOURCE_DIR}/upx taosadapter || : + COMMAND wget -c https://github.com/upx/upx/releases/download/v3.96/upx-3.96-${PLATFORM_ARCH_STR}_linux.tar.xz -O $ENV{HOME}/upx.tar.xz && tar -xvJf $ENV{HOME}/upx.tar.xz -C $ENV{HOME}/ --strip-components 1 > /dev/null && $ENV{HOME}/upx taosadapter || : COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/ COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/