diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index a1a6cd4519..8f94313382 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -270,7 +270,7 @@ if(${JEMALLOC_ENABLED}) PREFIX "jemalloc" SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc BUILD_IN_SOURCE 1 - CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/ --disable-initial-exec-tls + CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/ --disable-initial-exec-tls BUILD_COMMAND ${MAKE} ) INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/build/include) diff --git a/docs/examples/c/insert_example.c b/docs/examples/c/insert_example.c index ce8fdc5b93..a921f794fd 100644 --- a/docs/examples/c/insert_example.c +++ b/docs/examples/c/insert_example.c @@ -36,10 +36,10 @@ int main() { executeSQL(taos, "CREATE DATABASE power"); executeSQL(taos, "USE power"); executeSQL(taos, "CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"); - executeSQL(taos, "INSERT INTO d1001 USING meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)" - "d1002 USING meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)" - "d1003 USING meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)" - "d1004 USING meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)"); + executeSQL(taos, "INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)" + "d1002 USING meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)" + "d1003 USING meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)" + "d1004 USING meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)"); taos_close(taos); taos_cleanup(); } @@ -48,4 +48,4 @@ int main() { // affected rows 0 // affected rows 0 // affected rows 0 -// affected rows 8 \ No newline at end of file +// affected rows 8 diff --git a/include/util/tdef.h b/include/util/tdef.h index e298509ad6..6e1fa87854 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -360,7 +360,7 @@ typedef enum ELogicConditionType { #define TSDB_DEFAULT_DB_SCHEMALESS TSDB_DB_SCHEMALESS_OFF #define TSDB_MIN_STT_TRIGGER 1 #define TSDB_MAX_STT_TRIGGER 16 -#define TSDB_DEFAULT_SST_TRIGGER 8 +#define TSDB_DEFAULT_SST_TRIGGER 1 #define TSDB_MIN_HASH_PREFIX 0 #define TSDB_MAX_HASH_PREFIX 128 #define TSDB_DEFAULT_HASH_PREFIX 0 diff --git a/source/common/src/trow.c b/source/common/src/trow.c index b007075efe..e4818aaa87 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -73,7 +73,13 @@ void tdSCellValPrint(SCellVal *pVal, int8_t colType) { } else if (tdValTypeIsNone(pVal->valType)) { printf("NONE "); return; + } + if(!pVal->val) { + ASSERT(0); + printf("BadVal "); + return; } + switch (colType) { case TSDB_DATA_TYPE_BOOL: printf("%s ", (*(int8_t *)pVal->val) == 0 ? "false" : "true"); @@ -678,6 +684,10 @@ int32_t tdAppendColValToRow(SRowBuilder *pBuilder, col_id_t colId, int8_t colTyp } // TS KEY is stored in STSRow.ts and not included in STSRow.data field. if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + if (!val) { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } TD_ROW_KEY(pRow) = *(TSKEY *)val; // The primary TS key is Norm all the time, thus its valType is not stored in bitmap. return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 3d4a3e1969..ba3b7284ce 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -152,26 +152,25 @@ typedef struct STsdbReader STsdbReader; #define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST 0x8 -int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid); -int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader, - const char *idstr); +int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num); +int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, + STsdbReader **ppReader, const char *idstr); void tsdbReaderClose(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader); -bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid); +bool tsdbTableNextDataBlock(STsdbReader *pReader, uint64_t uid); void tsdbRetrieveDataBlockInfo(const STsdbReader* pReader, int32_t* rows, uint64_t* uid, STimeWindow* pWindow); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); -bool tsdbIsAscendingOrder(STsdbReader *pReader); void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); -int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, uint64_t suid, - void **pReader); +int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, + uint64_t suid, void **pReader); int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); void *tsdbCacherowsReaderClose(void *pReader); int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index e5b8a1f327..c4e7ff92b7 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -716,7 +716,9 @@ typedef struct SCacheRowsReader { int32_t numOfCols; int32_t type; int32_t tableIndex; // currently returned result tables - SArray *pTableList; // table id list + + STableKeyInfo *pTableList; // table id list + int32_t numOfTables; SSttBlockLoadInfo *pLoadInfo; STsdbReadSnap *pReadSnap; SDataFReader *pDataFReader; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 19a0fbd629..03532eb6d4 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -701,25 +701,28 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma #endif for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { SSDataBlock *output = taosArrayGetP(pResList, i); + smaDebug("result block, uid:%"PRIu64", groupid:%"PRIu64", rows:%d", output->info.uid, output->info.groupId, + output->info.rows); + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); SSubmitReq *pReq = NULL; // TODO: the schema update should be handled later(TD-17965) if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, SMA_VID(pSma), suid) < 0) { - smaError("vgId:%d, build submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), - suid, pItem->level, terrstr()); + smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%"PRIu64", level %" PRIi8 " failed since %s", SMA_VID(pSma), + suid, output->info.groupId, pItem->level, terrstr()); goto _err; } if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { taosMemoryFreeClear(pReq); - smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", - SMA_VID(pSma), suid, pItem->level, terrstr()); + smaError("vgId:%d, process submit req for rsma suid:%"PRIu64", uid:%" PRIu64 " level %" PRIi8 " failed since %s", + SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr()); goto _err; } - smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " ver %" PRIi64 " len %" PRIu32, - SMA_VID(pSma), suid, pItem->level, output->info.version, htonl(pReq->header.contLen)); + smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%"PRIu64", level %" PRIi8 " ver %" PRIi64 " len %" PRIu32, + SMA_VID(pSma), suid, output->info.groupId, pItem->level, output->info.version, htonl(pReq->header.contLen)); taosMemoryFreeClear(pReq); } diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index b8f49f38e4..dbc02363ea 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -97,10 +97,9 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea } } -int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, uint64_t suid, - void** pReader) { +int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols, + uint64_t suid, void** pReader) { *pReader = NULL; - SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader)); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -111,14 +110,15 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList p->numOfCols = numOfCols; p->suid = suid; - if (taosArrayGetSize(pTableIdList) == 0) { + if (numOfTables == 0) { *pReader = p; return TSDB_CODE_SUCCESS; } - STableKeyInfo* pKeyInfo = taosArrayGet(pTableIdList, 0); + STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableIdList)[0]; p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1, 1); p->pTableList = pTableIdList; + p->numOfTables = numOfTables; p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES); if (p->transferBuf == NULL) { @@ -205,7 +205,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 SLRUCache* lruCache = pr->pVnode->pTsdb->lruCache; LRUHandle* h = NULL; SArray* pRow = NULL; - size_t numOfTables = taosArrayGetSize(pr->pTableList); bool hasRes = false; SArray* pLastCols = NULL; @@ -243,8 +242,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 // retrieve the only one last row of all tables in the uid list. if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) { - for (int32_t i = 0; i < numOfTables; ++i) { - STableKeyInfo* pKeyInfo = taosArrayGet(pr->pTableList, i); + for (int32_t i = 0; i < pr->numOfTables; ++i) { + STableKeyInfo* pKeyInfo = &pr->pTableList[i]; code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); if (code != TSDB_CODE_SUCCESS) { @@ -308,8 +307,8 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 } } else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) { - for (int32_t i = pr->tableIndex; i < numOfTables; ++i) { - STableKeyInfo* pKeyInfo = (STableKeyInfo*)taosArrayGet(pr->pTableList, i); + for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) { + STableKeyInfo* pKeyInfo = &pr->pTableList[i]; code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h); if (code != TSDB_CODE_SUCCESS) { return code; @@ -334,7 +333,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 code = TSDB_CODE_INVALID_PARA; } -_end: + _end: tsdbDataFReaderClose(&pr->pDataFReaderLast); tsdbDataFReaderClose(&pr->pDataFReader); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 895b7adc5b..23615fb8af 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -270,24 +270,27 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) { } } +static void clearBlockScanInfo(STableBlockScanInfo* p) { + p->iterInit = false; + p->iiter.hasVal = false; + + if (p->iter.iter != NULL) { + p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); + } + + if (p->iiter.iter != NULL) { + p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter); + } + + p->delSkyline = taosArrayDestroy(p->delSkyline); + p->pBlockList = taosArrayDestroy(p->pBlockList); + tMapDataClear(&p->mapData); +} + static void destroyBlockScanInfo(SHashObj* pTableMap) { STableBlockScanInfo* p = NULL; - while ((p = taosHashIterate(pTableMap, p)) != NULL) { - p->iterInit = false; - p->iiter.hasVal = false; - - if (p->iter.iter != NULL) { - p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); - } - - if (p->iiter.iter != NULL) { - p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter); - } - - p->delSkyline = taosArrayDestroy(p->delSkyline); - p->pBlockList = taosArrayDestroy(p->pBlockList); - tMapDataClear(&p->mapData); + clearBlockScanInfo(p); } taosHashCleanup(pTableMap); @@ -951,7 +954,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn int32_t unDumpedRows = asc ? pBlock->nRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1; tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", + ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, remain, unDumpedRows, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); @@ -978,7 +981,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, code:%s %s", + ", rows:%d, code:%s %s", pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, tstrerror(code), pReader->idStr); return code; @@ -987,7 +990,7 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", + ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); @@ -1438,7 +1441,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64 - " - %" PRId64 " %s", + " - %" PRId64 " %s", pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pReader->idStr); @@ -1864,7 +1867,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (minKey == key) { init = true; TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema); + code = tRowMergerInit(&merge, &fRow, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1878,7 +1881,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* tRowMerge(&merge, &fRow1); } else { init = true; - int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); + code = tRowMergerInit(&merge, &fRow1, pReader->pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2304,7 +2307,7 @@ _end: if (pResBlock->info.rows > 0) { tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 - " rows:%d, elapsed time:%.2f ms %s", + " rows:%d, elapsed time:%.2f ms %s", pReader, pResBlock->info.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } @@ -2811,8 +2814,8 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret int8_t precision = pVnode->config.tsdbCfg.precision; int64_t now = taosGetTimestamp(precision); int64_t offset = tsQueryRsmaTolerance * ((precision == TSDB_TIME_PRECISION_MILLI) ? 1L - : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L - : 1000000L); + : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L + : 1000000L); for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) { SRetention* pRetention = retentions + level; @@ -3452,13 +3455,23 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e return TSDB_CODE_SUCCESS; } -// todo refactor, use arraylist instead -int32_t tsdbSetTableId(STsdbReader* pReader, int64_t uid) { +// TODO refactor: with createDataBlockScanInfo +int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) { ASSERT(pReader != NULL); + + STableBlockScanInfo* p = NULL; + while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) { + clearBlockScanInfo(p); + } + taosHashClear(pReader->status.pTableMap); - STableBlockScanInfo info = {.lastKey = 0, .uid = uid}; - taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); + STableKeyInfo* pList = (STableKeyInfo*) pTableList; + for(int32_t i = 0; i < num; ++i) { + STableBlockScanInfo info = {.lastKey = 0, .uid = pList[i].uid}; + taosHashPut(pReader->status.pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); + } + return TDB_CODE_SUCCESS; } @@ -3495,8 +3508,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } // ====================================== EXPOSED APIs ====================================== -int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTableList, STsdbReader** ppReader, - const char* idstr) { +int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, + STsdbReader** ppReader, const char* idstr) { STimeWindow window = pCond->twindows; if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) { pCond->twindows.skey += 1; @@ -3555,8 +3568,8 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl if (pReader->pSchema == NULL) { tsdbError("failed to get table schema, suid:%" PRIu64 ", ver:-1, %s", pReader->suid, pReader->idStr); } - } else if (taosArrayGetSize(pTableList) > 0) { - STableKeyInfo* pKey = taosArrayGet(pTableList, 0); + } else if (numOfTables > 0) { + STableKeyInfo* pKey = pTableList; pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, pKey->uid, -1, 1); if (pReader->pSchema == NULL) { tsdbError("failed to get table schema, uid:%" PRIu64 ", ver:-1, %s", pKey->uid, pReader->idStr); @@ -3565,8 +3578,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl STsdbReader* p = pReader->innerReader[0] != NULL ? pReader->innerReader[0] : pReader; - int32_t numOfTables = taosArrayGetSize(pTableList); - pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList->pData, numOfTables); + pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables); if (pReader->status.pTableMap == NULL) { tsdbReaderClose(pReader); *ppReader = NULL; @@ -3613,7 +3625,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr); return code; -_err: + _err: tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr); return code; } @@ -3778,7 +3790,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { return false; } -bool tsdbTableNextDataBlock(STsdbReader* pReader, int64_t uid) { +bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) { STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); if (pBlockScanInfo == NULL) { // no data block for the table of given uid return false; @@ -3967,7 +3979,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { } tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64 - " in query %s", + " in query %s", pReader, pReader->suid, numOfTables, pCond->twindows.skey, pReader->window.skey, pReader->window.ekey, pReader->idStr); @@ -4158,7 +4170,7 @@ int32_t tsdbTakeReadSnap(STsdb* pTsdb, STsdbReadSnap** ppSnap, const char* idStr } tsdbTrace("vgId:%d, take read snapshot, %s", TD_VID(pTsdb->pVnode), idStr); -_exit: + _exit: return code; } @@ -4177,4 +4189,3 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) { } tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr); } -bool tsdbIsAscendingOrder(STsdbReader* pReader) { return ASCENDING_TRAVERSE(pReader->order); } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 9f77178966..76a3359b5b 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -23,12 +23,13 @@ #include "tpagedbuf.h" #include "tsimplehash.h" #include "vnode.h" +#include "executor.h" #define T_LONG_JMP(_obj, _c) \ do { \ ASSERT((_c) != -1); \ longjmp((_obj), (_c)); \ - } while (0); + } while (0) #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ do { \ @@ -95,36 +96,23 @@ typedef struct SColMatchInfo { int32_t matchType; // determinate the source according to col id or slot id } SColMatchInfo; +typedef struct STableListInfo STableListInfo; struct SqlFunctionCtx; -// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly -// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups -//typedef struct STableListInfo { -// bool oneTableForEachGroup; -// int32_t numOfOuputGroups; // the data block will be generated one by one -// int32_t* groupOffset; // keep the offset value for each group in the tableList -// SArray* pTableList; -// SHashObj* map; // speedup acquire the tableQueryInfo by table uid -// uint64_t suid; -//} STableListInfo; -typedef struct { - bool oneTableForEachGroup; - int32_t numOfOuputGroups; // the data block will be generated one by one - int32_t* groupOffset; // keep the offset value for each group in the tableList - SArray* pGroupList; - SArray* pTableList; - SHashObj* map; // speedup acquire the tableQueryInfo by table uid - bool needSortTableByGroupId; - uint64_t suid; -} STableListInfo; +int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, + STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* id); -void destroyTableList(STableListInfo* pTableList); -int32_t getNumOfOutputGroups(const STableListInfo* pTableList); -bool oneTableForEachGroup(const STableListInfo* pTableList); -int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid); -int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); -uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); -uint64_t getTotalTables(const STableListInfo* pTableList); +STableListInfo* tableListCreate(); +void* tableListDestroy(STableListInfo* pTableListInfo); +void tableListClear(STableListInfo* pTableListInfo); +int32_t tableListGetOutputGroups(const STableListInfo* pTableList); +bool oneTableForEachGroup(const STableListInfo* pTableList); +uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); +int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid); +int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); +uint64_t tableListGetSize(const STableListInfo* pTableList); +uint64_t tableListGetSuid(const STableListInfo* pTableList); +STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); @@ -138,6 +126,7 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo if (forUpdate) { setBufPageDirty(bufPage, true); } + SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset); return pRow; } @@ -153,10 +142,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); -int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, - STableListInfo* pListInfo); int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId); -int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableListInfo* pTableListInfo); size_t getTableTagsBufLen(const SNodeList* pGroups); SArray* createSortInfo(SNodeList* pNodeList); @@ -179,9 +165,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi void cleanupQueryTableDataCond(SQueryTableDataCond* pCond); int32_t convertFillType(int32_t mode); - int32_t resultrowComparAsc(const void* p1, const void* p2); - int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified); #endif // TDENGINE_QUERYUTIL_H diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 31bf410f0a..1191b6a485 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -184,7 +184,7 @@ typedef struct SExecTaskInfo { int64_t version; // used for stream to record wal version SStreamTaskInfo streamInfo; SSchemaInfo schemaInfo; - STableListInfo tableqinfoList; // this is a table list + STableListInfo* pTableInfoList; // this is a table list const char* sql; // query sql string jmp_buf env; // jump to this position when error happens. EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] @@ -555,7 +555,7 @@ typedef struct SSysTableScanInfo { typedef struct SBlockDistInfo { SSDataBlock* pResBlock; - void* pHandle; + STsdbReader* pHandle; SReadHandle readHandle; uint64_t uid; // table uid } SBlockDistInfo; @@ -970,8 +970,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, - SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo); @@ -1065,10 +1065,6 @@ uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, S int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); -int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, - const char* idstr); - SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo, @@ -1077,7 +1073,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); bool groupbyTbname(SNodeList* pGroupList); -int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* groupKey, bool groupSort); void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 92e8d90bb8..92e98d3eab 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -48,6 +48,10 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe int32_t numOfCols = 0; code = extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + removeRedundantTsCol(pScanNode, &pInfo->matchInfo); code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds); @@ -55,18 +59,23 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe goto _error; } - STableListInfo* pTableList = &pTaskInfo->tableqinfoList; + STableListInfo* pTableList = pTaskInfo->pTableInfoList; initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); pInfo->pUidList = taosArrayInit(4, sizeof(int64_t)); - // partition by tbname - if (oneTableForEachGroup(pTableList) || (getTotalTables(pTableList) == 1)) { + // partition by tbname, todo opt perf + if (oneTableForEachGroup(pTableList) || (tableListGetSize(pTableList) == 1)) { pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | (pScanNode->ignoreNull ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW); - code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pTableList->pTableList, - taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); + + STableKeyInfo* pList = tableListGetInfo(pTableList, 0); + + size_t num = tableListGetSize(pTableList); + uint64_t suid = tableListGetSuid(pTableList); + code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, + taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -98,7 +107,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pOperator->cost.openCost = 0; return pOperator; -_error: + _error: pTaskInfo->code = code; destroyLastrowScanOperator(pInfo); taosMemoryFree(pOperator); @@ -112,8 +121,10 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SLastrowScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - STableListInfo* pTableList = &pTaskInfo->tableqinfoList; - int32_t size = taosArrayGetSize(pTableList->pTableList); + STableListInfo* pTableList = pTaskInfo->pTableInfoList; + + uint64_t suid = tableListGetSuid(pTableList); + int32_t size = tableListGetSize(pTableList); if (size == 0) { doSetOperatorCompleted(pOperator); return NULL; @@ -168,6 +179,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } pInfo->pRes->info.groupId = getTableGroupId(pTableList, pInfo->pRes->info.uid); + pInfo->indexOfBufferedRes += 1; return pInfo->pRes; } else { @@ -175,24 +187,24 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { return NULL; } } else { - size_t num = getNumOfOutputGroups(pTableList); - while (pInfo->currentGroupIndex < num) { + size_t totalGroups = tableListGetOutputGroups(pTableList); - STableKeyInfo* p = NULL; - int32_t s = 0; - getTablesOfGroup(pTableList, pInfo->currentGroupIndex, &p, &s); - SArray* x = taosArrayInit(4, sizeof(STableKeyInfo)); - for(int32_t i = 0; i < s; ++i) { - taosArrayPush(x, &p[i]); + while (pInfo->currentGroupIndex < totalGroups) { + STableKeyInfo* pList = NULL; + int32_t num = 0; + + int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); } - tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, x, - taosArrayGetSize(pInfo->matchInfo.pList), pTableList->suid, &pInfo->pLastrowReader); + tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, + taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader); taosArrayClear(pInfo->pUidList); - int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); + code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } pInfo->currentGroupIndex += 1; @@ -201,7 +213,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { if (pInfo->pRes->info.rows > 0) { if (pInfo->pseudoExprSup.numOfExprs > 0) { SExprSupp* pSup = &pInfo->pseudoExprSup; - pInfo->pRes->info.groupId = p->groupId; + + STableKeyInfo* pKeyInfo = &((STableKeyInfo*)pTableList)[0]; + pInfo->pRes->info.groupId = pKeyInfo->groupId; if (taosArrayGetSize(pInfo->pUidList) > 0) { ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 6d6e765574..67405aad92 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -26,9 +26,31 @@ #include "executorimpl.h" #include "tcompression.h" -static int32_t removeInvalidTable(SArray* list, SHashObj* tags); +// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly +// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups +struct STableListInfo { + bool oneTableForEachGroup; + int32_t numOfOuputGroups; // the data block will be generated one by one + int32_t* groupOffset; // keep the offset value for each group in the tableList + SArray* pTableList; + SHashObj* map; // speedup acquire the tableQueryInfo by table uid + uint64_t suid; +}; + +typedef struct tagFilterAssist { + SHashObj* colHash; + int32_t index; + SArray* cInfoList; +} tagFilterAssist; + +static int32_t removeInvalidTable(SArray* uids, SHashObj* tags); static int32_t optimizeTbnameInCond(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond, SHashObj* tags); static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond); +static int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, + SNode* pTagIndexCond, STableListInfo* pListInfo); + +static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; } +static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; } void initResultRowInfo(SResultRowInfo* pResultRowInfo) { pResultRowInfo->size = 0; @@ -301,12 +323,6 @@ int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, return TSDB_CODE_SUCCESS; } -typedef struct tagFilterAssist { - SHashObj* colHash; - int32_t index; - SArray* cInfoList; -} tagFilterAssist; - static EDealRes getColumn(SNode** pNode, void* pContext) { SColumnNode* pSColumnNode = NULL; if (QUERY_NODE_COLUMN == nodeType((*pNode))) { @@ -482,6 +498,7 @@ static SColumnInfoData* getColInfoResult(void* metaHandle, int64_t suid, SArray* } } } + pResBlock->info.rows = rows; // int64_t st1 = taosGetTimestampUs(); @@ -544,6 +561,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis code = TSDB_CODE_OUT_OF_MEMORY; goto end; } + ctx.index = 0; ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo)); if (ctx.cInfoList == NULL) { @@ -606,6 +624,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis } else { void* tag = taosHashGet(tags, uid, sizeof(int64_t)); ASSERT(tag); + STagVal tagVal = {0}; tagVal.cid = pColInfo->info.colId; const char* p = metaGetTableTagVal(tag, pColInfo->info.type, &tagVal); @@ -636,6 +655,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis } } } + pResBlock->info.rows = rows; // int64_t st1 = taosGetTimestampUs(); @@ -661,10 +681,12 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis } break; } + default: code = TSDB_CODE_OPS_NOT_SUPPORT; goto end; } + if (nodeType(pNode) == QUERY_NODE_COLUMN) { SColumnNode* pSColumnNode = (SColumnNode*)pNode; SColumnInfoData* pColInfo = (SColumnInfoData*)taosArrayGet(pResBlock->pDataBlock, pSColumnNode->slotId); @@ -674,10 +696,12 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis } else { code = scalarCalculate(pNode, pBlockList, &output); } + if (code != TSDB_CODE_SUCCESS) { releaseColInfoData(output.columnData); goto end; } + taosArrayPush(groupData, &output.columnData); } @@ -696,6 +720,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis code = TSDB_CODE_OUT_OF_MEMORY; goto end; } + for (int i = 0; i < rows; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); @@ -872,18 +897,22 @@ static int32_t removeInvalidTable(SArray* uids, SHashObj* tags) { taosArrayPush(validUid, uid); } } + taosArraySwap(uids, validUid); taosArrayDestroy(validUid); return 0; } + static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* list, SNode* pTagCond) { if (nodeType(pTagCond) != QUERY_NODE_OPERATOR) { return -1; } + SOperatorNode* pNode = (SOperatorNode*)pTagCond; if (pNode->opType != OP_TYPE_IN) { return -1; } + if ((pNode->pLeft != NULL && nodeType(pNode->pLeft) == QUERY_NODE_COLUMN && ((SColumnNode*)pNode->pLeft)->colType == COLUMN_TYPE_TBNAME) && (pNode->pRight != NULL && nodeType(pNode->pRight) == QUERY_NODE_NODE_LIST)) { @@ -895,9 +924,11 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* } SArray* pTbList = getTableNameList(pList); - size_t num = taosArrayGetSize(pTbList); - for (int i = 0; i < num; i++) { - char* name = taosArrayGetP(pTbList, i); + int32_t numOfTables = taosArrayGetSize(pTbList); + + for (int i = 0; i < numOfTables; i++) { + char* name = taosArrayGetP(pTbList, i); + uint64_t uid = 0; if (metaGetTableUidByName(metaHandle, name, &uid) == 0) { ETableType tbType = TSDB_TABLE_MAX; @@ -912,6 +943,7 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, int64_t suid, SArray* terrno = 0; } } + taosArrayDestroy(pTbList); return 0; } @@ -923,11 +955,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) { int32_t code = TSDB_CODE_SUCCESS; - pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo)); - if (pListInfo->pTableList == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - uint64_t tableUid = pScanNode->uid; pListInfo->suid = pScanNode->suid; SArray* res = taosArrayInit(8, sizeof(uint64_t)); @@ -987,13 +1014,14 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, size_t numOfTables = taosArrayGetSize(res); for (int i = 0; i < numOfTables; i++) { STableKeyInfo info = {.uid = *(uint64_t*)taosArrayGet(res, i), .groupId = 0}; - void* p = taosArrayPush(pListInfo->pTableList, &info); + + void* p = taosArrayPush(pListInfo->pTableList, &info); if (p == NULL) { taosArrayDestroy(res); return TSDB_CODE_OUT_OF_MEMORY; } - qDebug("tagfilter get uid:%" PRId64 "", info.uid); + qDebug("tagfilter get uid:%" PRIu64 "", info.uid); } taosArrayDestroy(res); @@ -1111,7 +1139,7 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList) { int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols, int32_t type, SColMatchInfo* pMatchInfo) { - size_t numOfCols = LIST_LENGTH(pNodeList); + size_t numOfCols = LIST_LENGTH(pNodeList); int32_t code = 0; pMatchInfo->matchType = type; @@ -1454,10 +1482,10 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) { SColumnInfoData* p = taosArrayGet(pCols, i); SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, j); -/* if (!outputEveryColumn && pmInfo->reserved) { - j++; - continue; - }*/ + /* if (!outputEveryColumn && pmInfo->reserved) { + j++; + continue; + }*/ if (p->info.colId == pmInfo->colId) { SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, pmInfo->dstSlotId); @@ -1635,9 +1663,6 @@ bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo) { pLimitInfo->slimit.offset != -1); } -static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; } -static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; } - void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo) { SLimit limit = {.limit = getLimit(pLimit), .offset = getOffset(pLimit)}; SLimit slimit = {.limit = getLimit(pSLimit), .offset = getOffset(pSLimit)}; @@ -1648,11 +1673,23 @@ void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimit pLimitInfo->remainGroupOffset = slimit.offset; } -uint64_t getTotalTables(const STableListInfo* pTableList) { +uint64_t tableListGetSize(const STableListInfo* pTableList) { ASSERT(taosArrayGetSize(pTableList->pTableList) == taosHashGetSize(pTableList->map)); return taosArrayGetSize(pTableList->pTableList); } +uint64_t tableListGetSuid(const STableListInfo* pTableList) { + return pTableList->suid; +} + +STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) { + if (taosArrayGetSize(pTableList->pTableList) == 0) { + return NULL; + } + + return taosArrayGet(pTableList->pTableList, index); +} + uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); ASSERT(pTableList->map != NULL && slot != NULL); @@ -1663,10 +1700,11 @@ uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { return pKeyInfo->groupId; } -int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t gid) { +// TODO handle the group offset info, fix it, the rule of group output will be broken by this function +int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid) { if (pTableList->map == NULL) { ASSERT(taosArrayGetSize(pTableList->pTableList) == 0); - pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); } STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; @@ -1679,9 +1717,9 @@ int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t return TSDB_CODE_SUCCESS; } -int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, +int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalGroupIndex, STableKeyInfo** pKeyInfo, int32_t* size) { - int32_t total = getNumOfOutputGroups(pTableList); + int32_t total = tableListGetOutputGroups(pTableList); if (ordinalGroupIndex < 0 || ordinalGroupIndex >= total) { return TSDB_CODE_INVALID_PARA; } @@ -1689,10 +1727,10 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI // here handle two special cases: // 1. only one group exists, and 2. one table exists for each group. if (total == 1) { - *size = getTotalTables(pTableList); + *size = tableListGetSize(pTableList); *pKeyInfo = (*size == 0)? NULL:taosArrayGet(pTableList->pTableList, 0); return TSDB_CODE_SUCCESS; - } else if (total == getTotalTables(pTableList)) { + } else if (total == tableListGetSize(pTableList)) { *size = 1; *pKeyInfo = taosArrayGet(pTableList->pTableList, ordinalGroupIndex); return TSDB_CODE_SUCCESS; @@ -1709,17 +1747,178 @@ int32_t getTablesOfGroup(const STableListInfo* pTableList, int32_t ordinalGroupI return TSDB_CODE_SUCCESS; } -int32_t getNumOfOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; } +int32_t tableListGetOutputGroups(const STableListInfo* pTableList) { return pTableList->numOfOuputGroups; } -// todo remove it bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList->oneTableForEachGroup; } -void destroyTableList(STableListInfo* pTableqinfoList) { - pTableqinfoList->pTableList = taosArrayDestroy(pTableqinfoList->pTableList); - taosMemoryFreeClear(pTableqinfoList->groupOffset); +STableListInfo* tableListCreate() { + STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo)); + if (pListInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } - taosHashCleanup(pTableqinfoList->map); + pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); + if (pListInfo->pTableList == NULL) { + goto _error; + } - pTableqinfoList->pTableList = NULL; - pTableqinfoList->map = NULL; + pListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (pListInfo->map == NULL) { + goto _error; + } + + pListInfo->numOfOuputGroups = 1; + return pListInfo; + +_error: + tableListDestroy(pListInfo); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; +} + +void* tableListDestroy(STableListInfo* pTableListInfo) { + if (pTableListInfo == NULL) { + return NULL; + } + + pTableListInfo->pTableList = taosArrayDestroy(pTableListInfo->pTableList); + taosMemoryFreeClear(pTableListInfo->groupOffset); + + taosHashCleanup(pTableListInfo->map); + + pTableListInfo->pTableList = NULL; + pTableListInfo->map = NULL; + taosMemoryFree(pTableListInfo); + return NULL; +} + +void tableListClear(STableListInfo* pTableListInfo) { + if (pTableListInfo == NULL) { + return; + } + + taosArrayClear(pTableListInfo->pTableList); + taosHashClear(pTableListInfo->map); + taosMemoryFree(pTableListInfo->groupOffset); + pTableListInfo->numOfOuputGroups = 1; + pTableListInfo->oneTableForEachGroup = false; +} + +static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) { + STableKeyInfo* pInfo1 = (STableKeyInfo*) p1; + STableKeyInfo* pInfo2 = (STableKeyInfo*) p2; + + if (pInfo1->groupId == pInfo2->groupId) { + return 0; + } else { + return pInfo1->groupId < pInfo2->groupId? -1:1; + } +} + +static int32_t sortTableGroup(STableListInfo* pTableListInfo) { + int32_t code = TSDB_CODE_SUCCESS; + + taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); + int32_t size = taosArrayGetSize(pTableListInfo->pTableList); + + SArray* pList = taosArrayInit(4, sizeof(int32_t)); + + STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0); + uint64_t gid = pInfo->groupId; + + int32_t start = 0; + taosArrayPush(pList, &start); + + for(int32_t i = 1; i < size; ++i) { + pInfo = taosArrayGet(pTableListInfo->pTableList, i); + if (pInfo->groupId != gid) { + taosArrayPush(pList, &i); + gid = pInfo->groupId; + } + } + + pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList); + pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups); + memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups); + taosArrayDestroy(pList); + return TDB_CODE_SUCCESS; +} + +int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { + int32_t code = TSDB_CODE_SUCCESS; + ASSERT(pTableListInfo->map != NULL); + + bool groupByTbname = groupbyTbname(group); + size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); + if (group == NULL || groupByTbname) { + for (int32_t i = 0; i < numOfTables; i++) { + STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); + info->groupId = groupByTbname? info->uid:0; + } + + pTableListInfo->oneTableForEachGroup = groupByTbname; + + if (groupSort && groupByTbname) { + taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); + pTableListInfo->numOfOuputGroups = numOfTables; + } else { + pTableListInfo->numOfOuputGroups = 1; + } + } else { + code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + if (groupSort) { + code = sortTableGroup(pTableListInfo); + } + } + + // add all table entry in the hash map + size_t size = taosArrayGetSize(pTableListInfo->pTableList); + for(int32_t i = 0; i < size; ++i) { + STableKeyInfo* p = taosArrayGet(pTableListInfo->pTableList, i); + taosHashPut(pTableListInfo->map, &p->uid, sizeof(uint64_t), &i, sizeof(int32_t)); + } + + return code; +} + +int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, + STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, + const char* idStr) { + int64_t st = taosGetTimestampUs(); + + if (pHandle == NULL) { + qError("invalid handle, in creating operator tree, %s", idStr); + return TSDB_CODE_INVALID_PARA; + } + + int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to getTableList, code: %s", tstrerror(code)); + return code; + } + + ASSERT(pTableListInfo->numOfOuputGroups == 1); + + int64_t st1 = taosGetTimestampUs(); + qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr); + + if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { + qDebug("no table qualified for query, %s" PRIx64, idStr); + return TSDB_CODE_SUCCESS; + } + + code = buildGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + int64_t st2 = taosGetTimestampUs(); + qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr); + + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 12f3874e70..bbd62a9912 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -287,7 +287,6 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - STableListInfo* pListInfo = &pTaskInfo->tableqinfoList; if (isAdd) { qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str); @@ -303,8 +302,10 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo SStreamScanInfo* pScanInfo = pInfo->info; if (isAdd) { // add new table id SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo)); + int32_t numOfQualifiedTables = taosArrayGetSize(qa); + + qDebug(" %d qualified child tables added into stream scanner", numOfQualifiedTables); - qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa)); code = tqReaderAddTbUidList(pScanInfo->tqReader, qa); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(qa); @@ -324,7 +325,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } } - for (int32_t i = 0; i < taosArrayGetSize(qa); ++i) { + STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + + for (int32_t i = 0; i < numOfQualifiedTables; ++i) { uint64_t* uid = taosArrayGet(qa, i); STableKeyInfo keyInfo = {.uid = *uid, .groupId = 0}; @@ -355,7 +358,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo if (!exists) { #endif - addTableIntoTableList(&pTaskInfo->tableqinfoList, keyInfo.uid, keyInfo.groupId); + tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId); } if (keyBuf != NULL) { @@ -435,7 +438,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId); -_error: + _error: // if failed to add ref for all tables in this query, abort current query return code; } @@ -919,8 +922,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT int64_t ts = pOffset->ts; if (uid == 0) { - if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0); + if (tableListGetSize(pTaskInfo->pTableInfoList) != 0) { + STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, 0); uid = pTableInfo->uid; ts = INT64_MIN; } else { @@ -931,7 +934,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); + int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); #ifndef NDEBUG qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid, @@ -940,8 +943,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT #endif bool found = false; - for (int32_t i = 0; i < tableSz; i++) { - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i); + for (int32_t i = 0; i < numOfTables; i++) { + STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i); if (pTableInfo->uid == uid) { found = true; pTableScanInfo->currentTable = i; @@ -953,14 +956,17 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT ASSERT(found); if (pTableScanInfo->dataReader == NULL) { - if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, - pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 || - pTableScanInfo->dataReader == NULL) { + STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); + int32_t num = tableListGetSize(pTaskInfo->pTableInfoList); + + if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond, pList, num, + &pTableScanInfo->dataReader, NULL) < 0 || pTableScanInfo->dataReader == NULL) { ASSERT(0); } } - tsdbSetTableId(pTableScanInfo->dataReader, uid); + STableKeyInfo tki = {.uid = uid}; + tsdbSetTableList(pTableScanInfo->dataReader, &tki, 1); int64_t oldSkey = pTableScanInfo->cond.twindows.skey; pTableScanInfo->cond.twindows.skey = ts + 1; tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -968,7 +974,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pTableScanInfo->scanTimes = 0; qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, - ts, pTableScanInfo->currentTable, tableSz); + ts, pTableScanInfo->currentTable, numOfTables); /*}*/ } else { ASSERT(0); @@ -984,16 +990,28 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT SMetaTableInfo mtInfo = getUidfromSnapShot(sContext); tsdbReaderClose(pInfo->dataReader); pInfo->dataReader = NULL; + cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); - taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList); - if (mtInfo.uid == 0) return 0; // no data + tableListClear(pTaskInfo->pTableInfoList); + + if (mtInfo.uid == 0) { + return 0; // no data + } initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; - pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo)); - taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0}); - tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, - &pInfo->dataReader, NULL); + + if (pTaskInfo->pTableInfoList == NULL) { + pTaskInfo->pTableInfoList = tableListCreate(); + } + + tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0); + + STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); + int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); + ASSERT(size == 1); + + tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, &pInfo->dataReader, NULL); cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7a3454e092..11dbfaff9f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -494,7 +494,7 @@ static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int // todo: refactor this if (fmIsImplicitTsFunc(pCtx[i].functionId) && (j == pOneExpr->base.numOfParams - 1)) { pInput->pPTS = pInput->pData[j]; // in case of merge function, this is not always the ts column data. - // ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP); + // ASSERT(pInput->pPTS->info.type == TSDB_DATA_TYPE_TIMESTAMP); } ASSERT(pInput->pData[j] != NULL); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { @@ -815,7 +815,7 @@ bool isTaskKilled(SExecTaskInfo* pTaskInfo) { // abort current query execution. if (pTaskInfo->owner != 0 && ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec()) - /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) { + /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) { assert(pTaskInfo->cost.start != 0); // qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64 // ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec()); @@ -1739,8 +1739,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t return TSDB_CODE_SUCCESS; } -static void doDestroyTableList(STableListInfo* pTableqinfoList); - typedef struct SFetchRspHandleWrapper { uint32_t exchangeId; int32_t sourceIndex; @@ -1965,7 +1963,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, + ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; @@ -1992,10 +1990,10 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d" - " index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 - ", total:%.2f Kb," - " completed:%d try next %d/%" PRIzu, + " execId:%d" + " index:%d completed, blocks:%d, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 + ", total:%.2f Kb," + " completed:%d try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, completed + 1, i + 1, totalSources); @@ -2003,7 +2001,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", + " execId:%d blocks:%d, numOfRows:%d, totalRows:%" PRIu64 ", total:%.2f Kb", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); } @@ -2030,7 +2028,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn sched_yield(); } -_error: + _error: pTaskInfo->code = code; } @@ -2091,7 +2089,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 " try next", + ", totalRows:%" PRIu64 " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows); @@ -2108,7 +2106,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, + ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources); @@ -2117,7 +2115,7 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64 - ", totalBytes:%" PRIu64, + ", totalBytes:%" PRIu64, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize); } @@ -2305,7 +2303,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, NULL); return pOperator; -_error: + _error: if (pInfo != NULL) { doDestroyExchangeOperatorInfo(pInfo); } @@ -3042,7 +3040,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN return pOperator; -_error: + _error: if (pInfo != NULL) { destroyAggOperatorInfo(pInfo); } @@ -3212,8 +3210,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType - ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval - : &((SIntervalAggOperatorInfo*)downstream->info)->interval; + ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval + : &((SIntervalAggOperatorInfo*)downstream->info)->interval; int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t type = convertFillType(pPhyFillNode->mode); @@ -3258,7 +3256,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* code = appendDownstream(pOperator, &downstream, 1); return pOperator; -_error: + _error: if (pInfo != NULL) { destroyFillOperatorInfo(pInfo); } @@ -3275,6 +3273,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT pTaskInfo->cost.created = taosGetTimestampMs(); pTaskInfo->id.queryId = queryId; pTaskInfo->execModel = model; + pTaskInfo->pTableInfoList = tableListCreate(); char* p = taosMemoryCalloc(1, 128); snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId); @@ -3366,105 +3365,6 @@ static void cleanupTableSchemaInfo(SSchemaInfo* pSchemaInfo) { static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) { tDeleteSSchemaWrapper(pStreamInfo->schema); } - -static int32_t orderbyGroupIdComparFn(const void* p1, const void* p2) { - STableKeyInfo* pInfo1 = (STableKeyInfo*) p1; - STableKeyInfo* pInfo2 = (STableKeyInfo*) p2; - - if (pInfo1->groupId == pInfo2->groupId) { - return 0; - } else { - return pInfo1->groupId < pInfo2->groupId? -1:1; - } -} - -static int32_t sortTableGroup(STableListInfo* pTableListInfo) { - int32_t code = TSDB_CODE_SUCCESS; - - taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); - int32_t size = taosArrayGetSize(pTableListInfo->pTableList); - - SArray* pList = taosArrayInit(4, sizeof(int32_t)); - - STableKeyInfo* pInfo = taosArrayGet(pTableListInfo->pTableList, 0); - uint64_t gid = pInfo->groupId; - - int32_t start = 0; - taosArrayPush(pList, &start); - - for(int32_t i = 1; i < size; ++i) { - pInfo = taosArrayGet(pTableListInfo->pTableList, i); - if (pInfo->groupId != gid) { - taosArrayPush(pList, &i); - gid = pInfo->groupId; - } - } - - pTableListInfo->numOfOuputGroups = taosArrayGetSize(pList); - pTableListInfo->groupOffset = taosMemoryMalloc(sizeof(int32_t) * pTableListInfo->numOfOuputGroups); - memcpy(pTableListInfo->groupOffset, taosArrayGet(pList, 0), sizeof(int32_t) * pTableListInfo->numOfOuputGroups); - taosArrayDestroy(pList); - return TSDB_CODE_SUCCESS; - -#if 0 - taosArrayClear(pTableListInfo->pGroupList); - SArray* sortSupport = taosArrayInit(16, sizeof(uint64_t)); - if (sortSupport == NULL) return TSDB_CODE_OUT_OF_MEMORY; - for (int32_t i = 0; i < taosArrayGetSize(pTableListInfo->pTableList); i++) { - STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - uint64_t* groupId = taosHashGet(pTableListInfo->map, &info->uid, sizeof(uint64_t)); - - int32_t index = taosArraySearchIdx(sortSupport, groupId, compareUint64Val, TD_EQ); - if (index == -1) { - void* p = taosArraySearch(sortSupport, groupId, compareUint64Val, TD_GT); - SArray* tGroup = taosArrayInit(8, sizeof(STableKeyInfo)); - if (tGroup == NULL) { - taosArrayDestroy(sortSupport); - return TSDB_CODE_OUT_OF_MEMORY; - } - if (taosArrayPush(tGroup, info) == NULL) { - qError("taos push info array error"); - taosArrayDestroy(sortSupport); - return TSDB_CODE_QRY_APP_ERROR; - } - if (p == NULL) { - if (taosArrayPush(sortSupport, groupId) == NULL) { - qError("taos push support array error"); - taosArrayDestroy(sortSupport); - return TSDB_CODE_QRY_APP_ERROR; - } - if (taosArrayPush(pTableListInfo->pGroupList, &tGroup) == NULL) { - qError("taos push group array error"); - taosArrayDestroy(sortSupport); - return TSDB_CODE_QRY_APP_ERROR; - } - } else { - int32_t pos = TARRAY_ELEM_IDX(sortSupport, p); - if (taosArrayInsert(sortSupport, pos, groupId) == NULL) { - qError("taos insert support array error"); - taosArrayDestroy(sortSupport); - return TSDB_CODE_QRY_APP_ERROR; - } - if (taosArrayInsert(pTableListInfo->pGroupList, pos, &tGroup) == NULL) { - qError("taos insert group array error"); - taosArrayDestroy(sortSupport); - return TSDB_CODE_QRY_APP_ERROR; - } - } - } else { - SArray* tGroup = (SArray*)taosArrayGetP(pTableListInfo->pGroupList, index); - if (taosArrayPush(tGroup, info) == NULL) { - qError("taos push uid array error"); - taosArrayDestroy(sortSupport); - return TSDB_CODE_QRY_APP_ERROR; - } - } - } - taosArrayDestroy(sortSupport); - return TDB_CODE_SUCCESS; -#endif -} - bool groupbyTbname(SNodeList* pGroupList) { bool bytbname = false; if (LIST_LENGTH(pGroupList) == 1) { @@ -3478,86 +3378,29 @@ bool groupbyTbname(SNodeList* pGroupList) { return bytbname; } -int32_t setGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle* pHandle, SNodeList* group, bool groupSort) { - pTableListInfo->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); - if (pTableListInfo->map == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - bool groupByTbname = groupbyTbname(group); - size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - - if (groupByTbname || group == NULL) { - for (int32_t i = 0; i < numOfTables; i++) { - STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - info->groupId = groupByTbname? info->uid:0; - } - - pTableListInfo->oneTableForEachGroup = groupByTbname; - - if (groupSort && groupByTbname) { - taosArraySort(pTableListInfo->pTableList, orderbyGroupIdComparFn); - pTableListInfo->numOfOuputGroups = numOfTables; - } - } else { - int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - if (groupSort) { - return sortTableGroup(pTableListInfo); - } - } - - for(int32_t i = 0; i < numOfTables; ++i) { - STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); - taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &i, sizeof(int32_t)); - } - - return TDB_CODE_SUCCESS; -} - -static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { - memset(pCond, 0, sizeof(SQueryTableDataCond)); - - pCond->order = TSDB_ORDER_ASC; - pCond->numOfCols = 1; - pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); - if (pCond->colList == NULL) { - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return terrno; - } - - pCond->colList->colId = 1; - pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP; - pCond->colList->bytes = sizeof(TSKEY); - - pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; - pCond->suid = uid; - pCond->type = TIMEWINDOW_RANGE_CONTAINED; - pCond->startVersion = -1; - pCond->endVersion = -1; - - return TSDB_CODE_SUCCESS; -} - -SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, - const char* pUser) { +SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, + SNode* pTagIndexCond, const char* pUser) { int32_t type = nodeType(pPhyNode); + STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + const char* idstr = GET_TASKID(pTaskInfo); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { SOperatorInfo* pOperator = NULL; if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; + // NOTE: this is an patch to fix the physical plan + // TODO remove it later + if (pTableScanNode->scan.node.pLimit != NULL) { + pTableScanNode->groupSort = true; + } + int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, - pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + pTableListInfo, pTagCond, pTagIndexCond, idstr); if (code) { pTaskInfo->code = code; - qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo)); + qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), idstr); return NULL; } @@ -3573,8 +3416,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; int32_t code = - createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle, - pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/true, pHandle, + pTableListInfo, pTagCond, pTagIndexCond, idstr); if (code) { pTaskInfo->code = code; qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); @@ -3598,8 +3441,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; if (pHandle->vnode) { int32_t code = - createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, /*pTableScanNode->groupSort*/false, - pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, + pHandle, pTableListInfo, pTagCond, pTagIndexCond, idstr); if (code) { pTaskInfo->code = code; qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); @@ -3607,10 +3450,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } #ifndef NDEBUG - int32_t sz = taosArrayGetSize(pTableListInfo->pTableList); + int32_t sz = tableListGetSize(pTableListInfo); + qDebug("create stream task, total:%d", sz); + for (int32_t i = 0; i < sz; i++) { - STableKeyInfo* pKeyInfo = taosArrayGet(pTableListInfo->pTableList, i); - qDebug("creating stream task: add table %" PRId64, pKeyInfo->uid); + STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); + qDebug("add table uid:%" PRIu64", gid:%"PRIu64, pKeyInfo->uid, pKeyInfo->groupId); } #endif } @@ -3622,7 +3467,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; - int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo); + + int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, + pTagIndexCond, idstr); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; qError("failed to getTableList, code: %s", tstrerror(code)); @@ -3632,35 +3479,30 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; - pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); if (pBlockNode->tableType == TSDB_SUPER_TABLE) { - int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pTableListInfo->pTableList); + SArray* pList = taosArrayInit(4, sizeof(STableKeyInfo)); + int32_t code = vnodeGetAllTableList(pHandle->vnode, pBlockNode->uid, pList); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = terrno; return NULL; } - } else { // Create one table group. - STableKeyInfo info = {.uid = pBlockNode->uid, .groupId = 0}; - taosArrayPush(pTableListInfo->pTableList, &info); + + for(int32_t i = 0; i < tableListGetSize(pTableListInfo); ++i) { + STableKeyInfo* p = taosArrayGet(pList, i); + tableListAddTableInfo(pTableListInfo, p->uid, 0); + } + taosArrayDestroy(pList); + } else { // Create group with only one table + tableListAddTableInfo(pTableListInfo, pBlockNode->uid, 0); } - SQueryTableDataCond cond = {0}; - int32_t code = initTableblockDistQueryCond(pBlockNode->suid, &cond); - if (code != TSDB_CODE_SUCCESS) { - return NULL; - } - - STsdbReader* pReader = NULL; - tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, ""); - cleanupQueryTableDataCond(&cond); - - pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); + pOperator = createDataBlockInfoScanOperator(pHandle, pBlockNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, - pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); + pTagCond, pTagIndexCond, idstr); if (code != TSDB_CODE_SUCCESS) { pTaskInfo->code = code; return NULL; @@ -3686,17 +3528,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOperator; } - size_t size = LIST_LENGTH(pPhyNode->pChildren); + size_t size = LIST_LENGTH(pPhyNode->pChildren); SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); + if (ops == NULL) { + return NULL; + } + for (int32_t i = 0; i < size; ++i) { SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); - ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser); + ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTagCond, pTagIndexCond, pUser); if (ops[i] == NULL) { taosMemoryFree(ops); return NULL; } - - ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId; } SOperatorInfo* pOptr = NULL; @@ -3895,7 +3739,7 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32 *length = *(int32_t*)(*result); } -_downstream: + _downstream: for (int32_t i = 0; i < ops->numOfDownstream; ++i) { code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal); if (code != TDB_CODE_SUCCESS) { @@ -3961,15 +3805,18 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT if (NULL == pDeleterParam) { return TSDB_CODE_OUT_OF_MEMORY; } - int32_t tbNum = taosArrayGetSize(pTask->tableqinfoList.pTableList); - pDeleterParam->suid = pTask->tableqinfoList.suid; + int32_t tbNum = tableListGetSize(pTask->pTableInfoList); + pDeleterParam->suid = tableListGetSuid(pTask->pTableInfoList); + + // TODO extract uid list pDeleterParam->pUidList = taosArrayInit(tbNum, sizeof(uint64_t)); if (NULL == pDeleterParam->pUidList) { taosMemoryFree(pDeleterParam); return TSDB_CODE_OUT_OF_MEMORY; } + for (int32_t i = 0; i < tbNum; ++i) { - STableKeyInfo* pTable = taosArrayGet(pTask->tableqinfoList.pTableList, i); + STableKeyInfo* pTable = tableListGetInfo(pTask->pTableInfoList, i); taosArrayPush(pDeleterParam->pUidList, &pTable->uid); } @@ -4005,8 +3852,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead sql = NULL; (*pTaskInfo)->pSubplan = pPlan; - (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, - pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); + (*pTaskInfo)->pRoot = + createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user); if (NULL == (*pTaskInfo)->pRoot) { code = (*pTaskInfo)->code; @@ -4015,35 +3862,17 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead return code; -_complete: + _complete: taosMemoryFree(sql); doDestroyTask(*pTaskInfo); terrno = code; return code; } -void doDestroyTableList(STableListInfo* pTableqinfoList) { - taosArrayDestroy(pTableqinfoList->pTableList); - taosHashCleanup(pTableqinfoList->map); - if (pTableqinfoList->needSortTableByGroupId) { -// for (int32_t i = 0; i < taosArrayGetSize(pTableqinfoList->pGroupList); i++) { -// SArray* tmp = taosArrayGetP(pTableqinfoList->pGroupList, i); -// if (tmp == pTableqinfoList->pTableList) { -// continue; -// } -// taosArrayDestroy(tmp); -// } - } -// taosArrayDestroy(pTableqinfoList->pGroupList); - - pTableqinfoList->pTableList = NULL; - pTableqinfoList->map = NULL; -} - void doDestroyTask(SExecTaskInfo* pTaskInfo) { qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); - doDestroyTableList(&pTaskInfo->tableqinfoList); + pTaskInfo->pTableInfoList = tableListDestroy(pTaskInfo->pTableInfoList); destroyOperatorInfo(pTaskInfo->pRoot); cleanupTableSchemaInfo(&pTaskInfo->schemaInfo); cleanupStreamInfo(&pTaskInfo->streamInfo); @@ -4172,8 +4001,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat int32_t size = 0; void* pVal = NULL; SWinKey key = { - .ts = *(TSKEY*)pPos->key, - .groupId = pPos->groupId, + .ts = *(TSKEY*)pPos->key, + .groupId = pPos->groupId, }; int32_t code = streamStateGet(pState, &key, &pVal, &size); ASSERT(code == 0); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2d3521b419..855be4752d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -363,8 +363,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo if (pLimitInfo->remainOffset >= pBlock->info.rows) { pLimitInfo->remainOffset -= pBlock->info.rows; pBlock->info.rows = 0; - qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, - GET_TASKID(pTaskInfo)); + qDebug("current block ignore due to offset, current:%"PRId64", %s", pLimitInfo->remainOffset, GET_TASKID(pTaskInfo)); } else { blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); pLimitInfo->remainOffset = 0; @@ -377,9 +376,7 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo int32_t keep = pBlock->info.rows - overflowRows; blockDataKeepFirstNRows(pBlock, keep); - qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); - - // setTaskStatus(pTaskInfo, TASK_COMPLETED); + qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); pOperator->status = OP_EXEC_DONE; } } @@ -624,11 +621,12 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { blockDataCleanup(pBlock); + SDataBlockInfo* pBInfo = &pBlock->info; tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBInfo->rows, &pBInfo->uid, &pBInfo->window); ASSERT(pBInfo->uid != 0); - pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); + pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); uint32_t status = 0; int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); @@ -682,7 +680,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTableScanInfo->scanFlag = REPEAT_SCAN; - qDebug( "%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo)); + qDebug("start to repeat ascending order scan data blocks due to query func required, %s", GET_TASKID(pTaskInfo)); // do prepare for the next round table scan operation tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); @@ -709,8 +707,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTableScanInfo->scanFlag = REPEAT_SCAN; - qDebug("%s start to repeat descending order scan data blocks due to query func required", - GET_TASKID(pTaskInfo)); + qDebug("%s start to repeat descending order scan data blocks", GET_TASKID(pTaskInfo)); tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond); } } @@ -723,9 +720,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STableScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - // if scan table by table + // scan table one by one sequentially if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { - int32_t numOfTables = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); + int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); while (1) { SSDataBlock* result = doTableScanGroup(pOperator); @@ -739,72 +736,65 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { return NULL; } - STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); - tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); + STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); + tsdbSetTableList(pInfo->dataReader, pTableInfo, 1); qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, pInfo->currentTable, pTaskInfo->id.str); tsdbReaderReset(pInfo->dataReader, &pInfo->cond); pInfo->scanTimes = 0; } - } + } else { // scan table group by group sequentially + if (pInfo->currentGroupId == -1) { + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { + doSetOperatorCompleted(pOperator); + return NULL; + } - if (pInfo->currentGroupId == -1) { - pInfo->currentGroupId++; -// qDebug("number:------------------------%d, %d", (int)taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList), -// getNumOfOutputGroups(&pTaskInfo->tableqinfoList)); - if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)/*taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)*/) { -// if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) { + int32_t num = 0; + STableKeyInfo* pList = NULL; + tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); + ASSERT(pInfo->dataReader == NULL); + + int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, pList, num, (STsdbReader**)&pInfo->dataReader, + GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } + } + + SSDataBlock* result = doTableScanGroup(pOperator); + if (result != NULL) { + ASSERT(result->info.uid != 0); + return result; + } + + if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pTaskInfo->pTableInfoList)) { doSetOperatorCompleted(pOperator); return NULL; } - SArray* p = taosArrayInit(4, sizeof(STableKeyInfo)); - tsdbReaderClose(pInfo->dataReader); + // reset value for the next group data output + pOperator->status = OP_OPENED; + pInfo->limitInfo.numOfOutputRows = 0; + pInfo->limitInfo.remainOffset = pInfo->limitInfo.limit.offset; - STableKeyInfo* x = NULL; int32_t num = 0; - getTablesOfGroup(&pTaskInfo->tableqinfoList, pInfo->currentGroupId, &x, &num); - for(int32_t i = 0; i < num; ++i) { - taosArrayPush(p, &x[i]); + STableKeyInfo* pList = NULL; + tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num); + + tsdbSetTableList(pInfo->dataReader, pList, num); + tsdbReaderReset(pInfo->dataReader, &pInfo->cond); + pInfo->scanTimes = 0; + + result = doTableScanGroup(pOperator); + if (result != NULL) { + return result; } - int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, p, (STsdbReader**)&pInfo->dataReader, - GET_TASKID(pTaskInfo)); - taosArrayDestroy(p); - - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - return NULL; - } - } - - SSDataBlock* result = doTableScanGroup(pOperator); - if (result) { - return result; - } - - pInfo->currentGroupId++; - if (pInfo->currentGroupId >= getNumOfOutputGroups(&pTaskInfo->tableqinfoList)) { doSetOperatorCompleted(pOperator); return NULL; } - - // reset value for the next group data output - pOperator->status = OP_OPENED; - pInfo->limitInfo.numOfOutputRows = 0; - pInfo->limitInfo.remainOffset = pInfo->limitInfo.limit.offset; - - tsdbReaderReset(pInfo->dataReader, &pInfo->cond); - pInfo->scanTimes = 0; - - result = doTableScanGroup(pOperator); - if (result) { - return result; - } - - doSetOperatorCompleted(pOperator); - return NULL; } static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { @@ -832,8 +822,8 @@ static void destroyTableScanOperatorInfo(void* param) { taosMemoryFreeClear(param); } -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, + SExecTaskInfo* pTaskInfo) { STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -850,7 +840,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, } initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); - code = initQueryTableDataCond(&pInfo->cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -898,7 +887,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->cost.openCost = 0; return pOperator; -_error: + _error: if (pInfo != NULL) { destroyTableScanOperatorInfo(pInfo); } @@ -1014,8 +1003,31 @@ static void destroyBlockDistScanOperatorInfo(void* param) { taosMemoryFreeClear(param); } -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, - SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) { +static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pCond) { + memset(pCond, 0, sizeof(SQueryTableDataCond)); + + pCond->order = TSDB_ORDER_ASC; + pCond->numOfCols = 1; + pCond->colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); + if (pCond->colList == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return terrno; + } + + pCond->colList->colId = 1; + pCond->colList->type = TSDB_DATA_TYPE_TIMESTAMP; + pCond->colList->bytes = sizeof(TSKEY); + + pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + pCond->suid = uid; + pCond->type = TIMEWINDOW_RANGE_CONTAINED; + pCond->startVersion = -1; + pCond->endVersion = -1; + + return TSDB_CODE_SUCCESS; +} + +SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) { SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -1023,9 +1035,24 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re goto _error; } - pInfo->pHandle = dataReader; + { + SQueryTableDataCond cond = {0}; + + int32_t code = initTableblockDistQueryCond(pBlockScanNode->suid, &cond); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + + STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + size_t num = tableListGetSize(pTableListInfo); + void* pList = tableListGetInfo(pTableListInfo, 0); + + tsdbReaderOpen(readHandle->vnode, &cond, pList, num, &pInfo->pHandle, pTaskInfo->id.str); + cleanupQueryTableDataCond(&cond); + } + pInfo->readHandle = *readHandle; - pInfo->uid = uid; + pInfo->uid = pBlockScanNode->suid; pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); int32_t numOfCols = 0; @@ -1046,7 +1073,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, destroyBlockDistScanOperatorInfo, NULL); return pOperator; -_error: + _error: taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); return NULL; @@ -1110,11 +1137,9 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU SSDataBlock* pBlock = pTableScanInfo->pResBlock; blockDataCleanup(pBlock); - SArray* p = taosArrayInit(1, sizeof(STableKeyInfo)); - taosArrayPush(p, &tblInfo); - STsdbReader* pReader = NULL; - int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, p, (STsdbReader**)&pReader, GET_TASKID(pTaskInfo)); + int32_t code = tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &cond, &tblInfo, 1, (STsdbReader**)&pReader, + GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; @@ -1128,13 +1153,13 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU tsdbRetrieveDataBlockInfo(pReader, &rows, &pBInfo->uid, &pBInfo->window); SArray* pCols = tsdbRetrieveDataBlock(pReader, NULL); - blockDataEnsureCapacity(pBlock, pBInfo->rows); - pBInfo->rows = rows; + blockDataEnsureCapacity(pBlock, rows); + pBlock->info.rows = rows; relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo); - pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBInfo->uid); + pBlock->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBInfo->uid); } tsdbReaderClose(pReader); @@ -1154,7 +1179,7 @@ static uint64_t getGroupIdByCol(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, } static uint64_t getGroupIdByUid(SStreamScanInfo* pInfo, uint64_t uid) { - return getTableGroupId(&pInfo->pTableScanOp->pTaskInfo->tableqinfoList, uid); + return getTableGroupId(pInfo->pTableScanOp->pTaskInfo->pTableInfoList, uid); } static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts, int64_t maxVersion) { @@ -1327,9 +1352,6 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr SSessionKey startWin = {0}; getCurSessionWindow(pInfo->windowSup.pStreamAggSup, startData[i], endData[i], groupId, &startWin); if (IS_INVALID_SESSION_WIN_KEY(startWin)) { - // char* tmp = streamStateSessionDump(pInfo->windowSup.pStreamAggSup->pState); - // qInfo("%s", tmp); - // taosMemoryFree(tmp); // window has been closed. continue; } @@ -1576,13 +1598,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.version = pBlock->info.version; - pInfo->pRes->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); -// uint64_t* groupIdPre = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); -// if (groupIdPre) { -// pInfo->pRes->info.groupId = *groupIdPre; -// } else { -// pInfo->pRes->info.groupId = 0; -// } + pInfo->pRes->info.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.uid); // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { @@ -1727,7 +1743,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } } #if 0 - } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { + } else if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { qDebug("stream scan tsdb return %d rows", pResult->info.rows); @@ -1875,7 +1891,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { size_t total = taosArrayGetSize(pInfo->pBlockLists); // TODO: refactor -FETCH_NEXT_BLOCK: + FETCH_NEXT_BLOCK: if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { if (pInfo->validBlockIndex >= total) { doClearBufferedBlocks(pInfo); @@ -2002,7 +2018,7 @@ FETCH_NEXT_BLOCK: int32_t totBlockNum = taosArrayGetSize(pInfo->pBlockLists); - NEXT_SUBMIT_BLK: + NEXT_SUBMIT_BLK: while (1) { if (pInfo->tqReader->pMsg == NULL) { if (pInfo->validBlockIndex >= totBlockNum) { @@ -2094,12 +2110,13 @@ FETCH_NEXT_BLOCK: } } -static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { +static SArray* extractTableIdList(const STableListInfo* pTableListInfo) { SArray* tableIdList = taosArrayInit(4, sizeof(uint64_t)); // Transfer the Array of STableKeyInfo into uid list. - for (int32_t i = 0; i < taosArrayGetSize(pTableGroupInfo->pTableList); ++i) { - STableKeyInfo* pkeyInfo = taosArrayGet(pTableGroupInfo->pTableList, i); + size_t size = tableListGetSize(pTableListInfo); + for (int32_t i = 0; i < size; ++i) { + STableKeyInfo* pkeyInfo = tableListGetInfo(pTableListInfo, i); taosArrayPush(tableIdList, &pkeyInfo->uid); } @@ -2259,7 +2276,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL); return pOperator; -_end: + _end: taosMemoryFree(pInfo); taosMemoryFree(pOperator); pTaskInfo->code = code; @@ -2313,6 +2330,9 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys int32_t numOfCols = 0; int32_t code = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } int32_t numOfOutput = taosArrayGetSize(pInfo->matchInfo.pList); SArray* pColIds = taosArrayInit(numOfOutput, sizeof(int16_t)); @@ -2365,21 +2385,16 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pTSInfo->cond.endVersion = pHandle->version; } -// SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); STableKeyInfo* pList = NULL; int32_t num = 0; - getTablesOfGroup(&pTaskInfo->tableqinfoList, 0, &pList, &num); - - SArray* p = taosArrayInit(4, sizeof(STableKeyInfo)); - for(int32_t i = 0; i < num; ++i) { - taosArrayPush(p, &pList[i]); - } + tableListGetGroupList(pTaskInfo->pTableInfoList, 0, &pList, &num); if (pHandle->initTableReader) { pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pTSInfo->dataReader = NULL; - if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, p, &pTSInfo->dataReader, NULL) < 0) { + if (tsdbReaderOpen(pHandle->vnode, &pTSInfo->cond, pList, num, &pTSInfo->dataReader, NULL) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; + destroyTableScanOperatorInfo(pTableScanOp); goto _error; } } @@ -2405,8 +2420,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys // set the extract column id to streamHandle tqReaderSetColIdList(pInfo->tqReader, pColIds); - SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList); - int32_t code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList); + SArray* tableIdList = extractTableIdList(pTaskInfo->pTableInfoList); + code = tqReaderSetTbUidList(pInfo->tqReader, tableIdList); if (code != 0) { taosArrayDestroy(tableIdList); goto _error; @@ -2450,7 +2465,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys return pOperator; -_error: + _error: if (pColIds != NULL) { taosArrayDestroy(pColIds); } @@ -3355,6 +3370,7 @@ static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt) { optSysIntersection(mRslt, rslt); return 0; } + static int32_t optSysSpecialColumn(SNode* cond) { SOperatorNode* pOper = (SOperatorNode*)cond; SColumnNode* pCol = (SColumnNode*)pOper->pLeft; @@ -3365,6 +3381,7 @@ static int32_t optSysSpecialColumn(SNode* cond) { } return 0; } + static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) { int ret = -1; if (nodeType(cond) == QUERY_NODE_OPERATOR) { @@ -3398,6 +3415,7 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) { if (cell == NULL) break; SArray* aRslt = taosArrayInit(16, sizeof(int64_t)); + ret = optSysTabFilteImpl(arg, cell->pNode, aRslt); if (ret == 0) { // has index @@ -3478,7 +3496,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) { SMetaReader mr = {0}; metaReaderInit(&mr, pInfo->readHandle.meta, 0); - int32_t ret = metaGetTableEntryByUid(&mr, *uid); + ret = metaGetTableEntryByUid(&mr, *uid); if (ret < 0) { metaReaderClear(&mr); continue; @@ -4082,7 +4100,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan return pOperator; -_error: + _error: taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; @@ -4101,7 +4119,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->pRes; blockDataCleanup(pRes); - int32_t size = taosArrayGetSize(pInfo->pTableList->pTableList); + int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); if (size == 0) { setTaskStatus(pTaskInfo, TASK_COMPLETED); return NULL; @@ -4113,7 +4131,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { metaReaderInit(&mr, pInfo->readHandle.meta, 0); while (pInfo->curPos < size && count < pOperator->resultInfo.capacity) { - STableKeyInfo* item = taosArrayGet(pInfo->pTableList->pTableList, pInfo->curPos); + STableKeyInfo* item = tableListGetInfo(pInfo->pTableList, pInfo->curPos); int32_t code = metaGetTableEntryByUid(&mr, item->uid); tDecoderClear(&mr.coder); if (code != TSDB_CODE_SUCCESS) { @@ -4192,6 +4210,9 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); int32_t code = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID, &pInfo->matchInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } code = initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); if (code != TSDB_CODE_SUCCESS) { @@ -4218,62 +4239,20 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi return pOperator; -_error: + _error: taosMemoryFree(pInfo); taosMemoryFree(pOperator); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } -int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, - STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, - const char* idStr) { - int64_t st = taosGetTimestampUs(); - - if (pHandle == NULL) { - qError("invalid handle, in creating operator tree, %s", idStr); - return TSDB_CODE_INVALID_PARA; - } - - int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo); - if (code != TSDB_CODE_SUCCESS) { - qError("failed to getTableList, code: %s", tstrerror(code)); - return code; - } - - pTableListInfo->numOfOuputGroups = 1; - - int64_t st1 = taosGetTimestampUs(); - qDebug("generate queried table list completed, elapsed time:%.2f ms %s", (st1 - st) / 1000.0, idStr); - - if (taosArrayGetSize(pTableListInfo->pTableList) == 0) { - qDebug("no table qualified for query, %s" PRIx64, idStr); - return TSDB_CODE_SUCCESS; - } - - pTableListInfo->needSortTableByGroupId = groupSort; - code = setGroupIdMapForAllTables(pTableListInfo, pHandle, pGroupTags, groupSort); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - int64_t st2 = taosGetTimestampUs(); - qDebug("generate group id map completed, elapsed time:%.2f ms %s", (st2 - st1) / 1000.0, idStr); - - return TSDB_CODE_SUCCESS; -} - int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, const char* idstr) { for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) { - SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo)); - taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i)); - - STsdbReader* pReader = NULL; - tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, &pReader, idstr); + STableKeyInfo* pList = tableListGetInfo(pTableListInfo, i); + STsdbReader* pReader = NULL; + tsdbReaderOpen(pHandle->vnode, pQueryCond, pList, 1, &pReader, idstr); taosArrayPush(arrayReader, &pReader); - - taosArrayDestroy(subTableList); } return TSDB_CODE_SUCCESS; @@ -4283,15 +4262,14 @@ int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle* STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx, STsdbReader** ppReader, const char* idstr) { STsdbReader* pReader = NULL; - SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo)); - for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) { - taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i)); - } - int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, &pReader, idstr); + void* pStart = tableListGetInfo(pTableListInfo, tableStartIdx); + int32_t num = tableEndIdx - tableStartIdx + 1; + + int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, num, &pReader, idstr); if (code != 0) { - taosArrayDestroy(subTableList); return code; } + *ppReader = pReader; return TSDB_CODE_SUCCESS; } @@ -4446,8 +4424,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc bool allColumnsHaveAgg = true; SColumnDataAgg** pColAgg = NULL; - STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); - tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg); +// STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); if (allColumnsHaveAgg == true) { int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); @@ -4487,7 +4464,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc pCost->totalCheckedRows += pBlock->info.rows; pCost->loadBlocks += 1; - STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); + STsdbReader* reader = pTableScanInfo->pReader; SArray* pCols = tsdbRetrieveDataBlock(reader, NULL); if (pCols == NULL) { return terrno; @@ -4526,7 +4503,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc typedef struct STableMergeScanSortSourceParam { SOperatorInfo* pOperator; int32_t readerIdx; - int64_t uid; + uint64_t uid; SSDataBlock* inputBlock; } STableMergeScanSortSourceParam; @@ -4545,11 +4522,9 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { int64_t st = taosGetTimestampUs(); - SArray* subTable = taosArrayInit(1, sizeof(STableKeyInfo)); - taosArrayPush(subTable, taosArrayGet(pInfo->tableListInfo->pTableList, readIdx + pInfo->tableStartIndex)); + void* p = tableListGetInfo(pInfo->tableListInfo, readIdx + pInfo->tableStartIndex); SReadHandle* pHandle = &pInfo->readHandle; - tsdbReaderOpen(pHandle->vnode, pQueryCond, subTable, &pInfo->pReader, GET_TASKID(pTaskInfo)); - taosArrayDestroy(subTable); + tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, &pInfo->pReader, GET_TASKID(pTaskInfo)); STsdbReader* reader = pInfo->pReader; while (tsdbNextDataBlock(reader)) { @@ -4570,7 +4545,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { blockDataEnsureCapacity(pBlock, rows); pBlock->info.rows = rows; - if (tsdbIsAscendingOrder(pInfo->pReader)) { + if (pQueryCond->order == TSDB_ORDER_ASC) { pQueryCond->twindows.skey = pBlock->info.window.ekey + 1; } else { pQueryCond->twindows.ekey = pBlock->info.window.skey - 1; @@ -4587,10 +4562,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { continue; } - uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupId) { - pBlock->info.groupId = *groupId; - } + pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid); pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -4644,11 +4616,7 @@ static SSDataBlock* getTableDataBlock2(void* param) { continue; } - uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupId) { - pBlock->info.groupId = *groupId; - } - + pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid); pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -4689,6 +4657,7 @@ static SSDataBlock* getTableDataBlock(void* param) { uint32_t status = 0; int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status); + // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pOperator->pTaskInfo->env, code); } @@ -4698,11 +4667,7 @@ static SSDataBlock* getTableDataBlock(void* param) { continue; } - uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupId) { - pBlock->info.groupId = *groupId; - } - + pBlock->info.groupId = getTableGroupId(pOperator->pTaskInfo->pTableInfoList, pBlock->info.uid); pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; @@ -4739,15 +4704,16 @@ int32_t dumpSQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* } return 0; } + int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; { - size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); + size_t numOfTables = tableListGetSize(pInfo->tableListInfo); int32_t i = pInfo->tableStartIndex + 1; - for (; i < tableListSize; ++i) { - STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i); + for (; i < numOfTables; ++i) { + STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->tableListInfo, i); if (tableKeyInfo->groupId != pInfo->groupId) { break; } @@ -4758,10 +4724,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t tableStartIdx = pInfo->tableStartIndex; int32_t tableEndIdx = pInfo->tableEndIndex; - STableListInfo* tableListInfo = pInfo->tableListInfo; - - // pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES); pInfo->pReader = NULL; + // todo the total available buffer should be determined by total capacity of buffer of this task. // the additional one is reserved for merge result pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1); @@ -4776,8 +4740,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond)); for (int32_t i = 0; i < numOfTable; ++i) { - STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i + tableStartIdx); - STableMergeScanSortSourceParam param = {0}; param.readerIdx = i; param.pOperator = pOperator; @@ -4876,7 +4838,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, code); } - size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); + size_t tableListSize = tableListGetSize(pInfo->tableListInfo); if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; @@ -4885,7 +4847,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { return NULL; } pInfo->tableStartIndex = 0; - pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId; + pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex))->groupId; startGroupTableMergeScan(pOperator); } @@ -4904,8 +4866,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { break; } pInfo->tableStartIndex = pInfo->tableEndIndex + 1; - pInfo->groupId = - ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId; + pInfo->groupId = tableListGetInfo(pInfo->tableListInfo, pInfo->tableStartIndex)->groupId; startGroupTableMergeScan(pOperator); } } @@ -4920,9 +4881,10 @@ void destroyTableMergeScanOperatorInfo(void* param) { int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds); for (int32_t i = 0; i < numOfTable; i++) { - STableMergeScanSortSourceParam* param = taosArrayGet(pTableScanInfo->sortSourceParams, i); - blockDataDestroy(param->inputBlock); + STableMergeScanSortSourceParam* p = taosArrayGet(pTableScanInfo->sortSourceParams, i); + blockDataDestroy(p->inputBlock); } + taosArrayDestroy(pTableScanInfo->sortSourceParams); tsdbReaderClose(pTableScanInfo->pReader); @@ -4987,10 +4949,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN goto _error; } - if (pTableScanNode->pGroupTags) { - taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid); - } - SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc; int32_t numOfCols = 0; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8e1b15f315..49a7112eba 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -5384,6 +5384,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { maxTs = TMAX(maxTs, pBlock->info.window.ekey); minTs = TMIN(minTs, pBlock->info.window.skey); + doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 4fdf6d9e57..e002ff9c32 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -617,8 +617,6 @@ int tfileWriterPut(TFileWriter* tw, void* data, bool order) { indexError("failed to write data: %s, offset: %d len: %d", v->colVal, v->offset, (int)taosArrayGetSize(v->tableId)); } else { - indexInfo("success to write data: %s, offset: %d len: %d", v->colVal, v->offset, - (int)taosArrayGetSize(v->tableId)); } } fstBuilderDestroy(tw->fb); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c1b8527856..02e5c643a4 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3376,8 +3376,12 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); } else { code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry); - ASSERT(code == 0); - ASSERT(pEntry != NULL); + // ASSERT(code == 0); + // ASSERT(pEntry != NULL); + if (code != 0 || pEntry == NULL) { + syncNodeErrorLog(ths, "get log entry error"); + continue; + } } SRpcMsg rpcMsg; @@ -3738,4 +3742,4 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p snprintf(logBuf, sizeof(logBuf), "recv sync-heartbeat-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 "}, %s", host, port, pMsg->term, pMsg->privateTerm, s); syncNodeEventLog(pSyncNode, logBuf); -} \ No newline at end of file +} diff --git a/source/libs/tfs/test/tfsTest.cpp b/source/libs/tfs/test/tfsTest.cpp index f4b0adf1f7..df37630fd7 100644 --- a/source/libs/tfs/test/tfsTest.cpp +++ b/source/libs/tfs/test/tfsTest.cpp @@ -42,7 +42,7 @@ TEST_F(TfsTest, 01_Open_Close) { STfs *pTfs = tfsOpen(&dCfg, 1); ASSERT_EQ(pTfs, nullptr); - taosMkDir(root); + taosMulMkDir(root); pTfs = tfsOpen(&dCfg, 1); ASSERT_NE(pTfs, nullptr); diff --git a/tests/system-test/1-insert/create_retentions.py b/tests/system-test/1-insert/create_retentions.py index 54abcec391..47d53d5b34 100644 --- a/tests/system-test/1-insert/create_retentions.py +++ b/tests/system-test/1-insert/create_retentions.py @@ -46,7 +46,7 @@ class TDTestCase: def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor(), False) + tdSql.init(conn.cursor(), True) @property def create_databases_sql_err(self): diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index e01796617d..79be05e9fc 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -86,9 +86,34 @@ ELSE () ) MESSAGE("CURRENT SOURCE DIR ${CMAKE_CURRENT_SOURCE_DIR}") - include(ExternalProject) + IF (TD_WINDOWS) + INCLUDE(ExternalProject) + ExternalProject_Add(taosadapter + PREFIX "taosadapter" + SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosadapter + BUILD_ALWAYS off + DEPENDS taos + BUILD_IN_SOURCE 1 + CONFIGURE_COMMAND cmake -E echo "taosadapter no need cmake to config" + PATCH_COMMAND + COMMAND git clean -f -d + BUILD_COMMAND + COMMAND set CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client + COMMAND set CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib + COMMAND go build -a -o taosadapter.exe -ldflags "-s -w -X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}" + COMMAND go build -a -o taosadapter-debug.exe -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}" - ExternalProject_Add(taosadapter + INSTALL_COMMAND + COMMAND cmake -E time upx taosadapter ||: + COMMAND cmake -E copy taosadapter.exe ${CMAKE_BINARY_DIR}/build/bin + COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/ + COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/ + COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/ + COMMAND cmake -E copy taosadapter-debug.exe ${CMAKE_BINARY_DIR}/build/bin + ) + ELSE (TD_WINDOWS) + INCLUDE(ExternalProject) + ExternalProject_Add(taosadapter PREFIX "taosadapter" SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/taosadapter BUILD_ALWAYS off @@ -108,4 +133,5 @@ ELSE () COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/ COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin ) + ENDIF (TD_WINDOWS) ENDIF ()