From 6b236018d5373dbf43845040866babc33909589c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 27 Nov 2024 14:40:58 +0800 Subject: [PATCH] more code and case change --- include/libs/executor/storageapi.h | 11 +- source/libs/executor/src/sysscanoperator.c | 197 +++++++++++++++++- .../develop-test/2-query/table_count_scan.py | 2 +- tests/script/tsim/query/sys_tbname.sim | 2 +- tests/script/tsim/query/tableCount.sim | 6 +- 5 files changed, 200 insertions(+), 18 deletions(-) diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index feb7bcc25e..1959f79584 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -191,6 +191,12 @@ typedef struct TsdReader { void (*tsdSetFilesetDelimited)(void* pReader); void (*tsdSetSetNotifyCb)(void* pReader, TsdReaderNotifyCbFn notifyFn, void* param); + + // for fileset query + void *(*openFilesetReadCursor)(void *pVnode); + void *(*nextFilesetReadCursor)(void *cursor); + void (*closeFilesetReadCursor)(void *pReader); + } TsdReader; typedef struct SStoreCacheReader { @@ -400,7 +406,8 @@ typedef struct SStateStore { int32_t (*streamStateCountWinAddIfNotExist)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** ppVal, int32_t* pVLen, int32_t* pWinCode); - int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen); + int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, + int32_t* pVLen); int32_t (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo); @@ -429,7 +436,7 @@ typedef struct SStateStore { int32_t (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char* id, int64_t ckId, int8_t type, struct SStreamFileState** ppFileState); - + int32_t (*streamStateGroupPut)(SStreamState* pState, int64_t groupId, void* value, int32_t vLen); SStreamStateCur* (*streamStateGroupGetCur)(SStreamState* pState); void (*streamStateGroupCurNext)(SStreamStateCur* pCur); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 5917910d56..cf54e67214 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1066,8 +1066,8 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4); QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno); int32_t tagStrBufflen = 32; - char tagTypeStr[VARSTR_HEADER_SIZE + 32]; - int tagTypeLen = tsnprintf(varDataVal(tagTypeStr), tagStrBufflen, "%s", tDataTypes[tagType].name); + char tagTypeStr[VARSTR_HEADER_SIZE + 32]; + int tagTypeLen = tsnprintf(varDataVal(tagTypeStr), tagStrBufflen, "%s", tDataTypes[tagType].name); tagStrBufflen -= tagTypeLen; if (tagStrBufflen <= 0) { code = TSDB_CODE_INVALID_PARA; @@ -1079,8 +1079,8 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, varDataVal(tagTypeStr) + tagTypeLen, tagStrBufflen, "(%d)", (int32_t)(((*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); } else if (IS_VAR_DATA_TYPE(tagType)) { - tagTypeLen += tsnprintf(varDataVal(tagTypeStr) + tagTypeLen, tagStrBufflen, "(%d)", - (int32_t)((*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE)); + tagTypeLen += tsnprintf(varDataVal(tagTypeStr) + tagTypeLen, tagStrBufflen, "(%d)", + (int32_t)((*smrSuperTable).me.stbEntry.schemaTag.pSchema[i].bytes - VARSTR_HEADER_SIZE)); } varDataSetLen(tagTypeStr, tagTypeLen); code = colDataSetVal(pColInfoData, numOfRows, (char*)tagTypeStr, false); @@ -1205,8 +1205,8 @@ static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, pColInfoData = taosArrayGet(dataBlock->pDataBlock, 4); QUERY_CHECK_NULL(pColInfoData, code, lino, _end, terrno); int32_t colStrBufflen = 32; - char colTypeStr[VARSTR_HEADER_SIZE + 32]; - int colTypeLen = tsnprintf(varDataVal(colTypeStr), colStrBufflen, "%s", tDataTypes[colType].name); + char colTypeStr[VARSTR_HEADER_SIZE + 32]; + int colTypeLen = tsnprintf(varDataVal(colTypeStr), colStrBufflen, "%s", tDataTypes[colType].name); colStrBufflen -= colTypeLen; if (colStrBufflen <= 0) { code = TSDB_CODE_INVALID_PARA; @@ -1214,10 +1214,10 @@ static int32_t sysTableUserColsFillOneTableCols(const SSysTableScanInfo* pInfo, } if (colType == TSDB_DATA_TYPE_VARCHAR) { colTypeLen += tsnprintf(varDataVal(colTypeStr) + colTypeLen, colStrBufflen, "(%d)", - (int32_t)(schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE)); + (int32_t)(schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE)); } else if (colType == TSDB_DATA_TYPE_NCHAR) { colTypeLen += tsnprintf(varDataVal(colTypeStr) + colTypeLen, colStrBufflen, "(%d)", - (int32_t)((schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); + (int32_t)((schemaRow->pSchema[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); } varDataSetLen(colTypeStr, colTypeLen); code = colDataSetVal(pColInfoData, numOfRows, (char*)colTypeStr, false); @@ -1989,9 +1989,183 @@ static SSDataBlock* sysTableScanUserSTables(SOperatorInfo* pOperator) { return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; } -static SSDataBlock* sysTableScanUserFileSets(SOperatorInfo* pOperator) { +static int32_t doSetQueryFileSetRow() { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + // TODO - ASSERT(0); + +_exit: + return code; +} + +static SSDataBlock* sysTableBuildUserFileSets(SOperatorInfo* pOperator) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SSysTableScanInfo* pInfo = pOperator->info; + SSDataBlock* p = NULL; + + // open cursor if not opened + // TODO: call corresponding api to open the cursor + if (pInfo->pCur == NULL) { + // pInfo->pCur = pAPI->tsdReader.openFileSetCursor(pInfo->readHandle.vnode); + // QUERY_CHECK_NULL(pInfo->pCur, code, lino, _end, terrno); + } + + blockDataCleanup(pInfo->pRes); + int32_t numOfRows = 0; + + const char* db = NULL; + int32_t vgId = 0; + pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, &db, &vgId, NULL, NULL); + + SName sn = {0}; + char dbname[TSDB_DB_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + code = tNameFromString(&sn, db, T_NAME_ACCT | T_NAME_DB); + QUERY_CHECK_CODE(code, lino, _end); + + code = tNameGetDbName(&sn, varDataVal(dbname)); + QUERY_CHECK_CODE(code, lino, _end); + + varDataSetLen(dbname, strlen(varDataVal(dbname))); + + p = buildInfoSchemaTableMetaBlock(TSDB_INS_TABLE_FILESETS); + QUERY_CHECK_NULL(p, code, lino, _end, terrno); + + code = blockDataEnsureCapacity(p, pOperator->resultInfo.capacity); + QUERY_CHECK_CODE(code, lino, _end); + + char n[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + int32_t ret = 0; + + // loop to query each entry + for (;;) { + void* entry = pAPI->tsdReader.nextFilesetReadCursor(pInfo->pCur); + if (entry == NULL) { + break; + } + + code = doSetQueryFileSetRow(); + QUERY_CHECK_CODE(code, lino, _end); + + if (++numOfRows >= pOperator->resultInfo.capacity) { + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + code = relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); + QUERY_CHECK_CODE(code, lino, _end); + + code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + QUERY_CHECK_CODE(code, lino, _end); + + blockDataCleanup(p); + numOfRows = 0; + + if (pInfo->pRes->info.rows > 0) { + pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur); + break; + } + } + } + +#if 0 + if (numOfRows > 0) { + pAPI->metaFn.pauseTableMetaCursor(pInfo->pCur); + p->info.rows = numOfRows; + pInfo->pRes->info.rows = numOfRows; + + code = relocateColumnData(pInfo->pRes, pInfo->matchInfo.pList, p->pDataBlock, false); + QUERY_CHECK_CODE(code, lino, _end); + + code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); + QUERY_CHECK_CODE(code, lino, _end); + + blockDataCleanup(p); + numOfRows = 0; + } + + blockDataDestroy(p); + p = NULL; + + // todo temporarily free the cursor here, the true reason why the free is not valid needs to be found + if (ret != 0) { + pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); + pInfo->pCur = NULL; + setOperatorCompleted(pOperator); + } + + pInfo->loadInfo.totalRows += pInfo->pRes->info.rows; +#endif + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + blockDataDestroy(p); + pTaskInfo->code = code; + pAPI->metaFn.closeTableMetaCursor(pInfo->pCur); + pInfo->pCur = NULL; + T_LONG_JMP(pTaskInfo->env, code); + } + return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes; +} + +static SSDataBlock* sysTableScanUserFileSets(SOperatorInfo* pOperator) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SSysTableScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SNode* pCondition = pInfo->pCondition; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + if (pInfo->readHandle.mnd != NULL) { + // do nothing on mnode + qTrace("This operator do nothing on mnode, task id:%s", GET_TASKID(pTaskInfo)); + return NULL; + } else { +#if 0 + if (pInfo->showRewrite == false) { + if (pCondition != NULL && pInfo->pIdx == NULL) { + SSTabFltArg arg = { + .pMeta = pInfo->readHandle.vnode, .pVnode = pInfo->readHandle.vnode, .pAPI = &pTaskInfo->storageAPI}; + + SSysTableIndex* idx = taosMemoryMalloc(sizeof(SSysTableIndex)); + QUERY_CHECK_NULL(idx, code, lino, _end, terrno); + idx->init = 0; + idx->uids = taosArrayInit(128, sizeof(int64_t)); + QUERY_CHECK_NULL(idx->uids, code, lino, _end, terrno); + idx->lastIdx = 0; + + pInfo->pIdx = idx; // set idx arg + + int flt = optSysTabFilte(&arg, pCondition, idx->uids); + if (flt == 0) { + pInfo->pIdx->init = 1; + SSDataBlock* blk = sysTableBuildUserTablesByUids(pOperator); + return blk; + } else if ((flt == -1) || (flt == -2)) { + qDebug("%s failed to get sys table info by idx, scan sys table one by one", GET_TASKID(pTaskInfo)); + } + } else if (pCondition != NULL && (pInfo->pIdx != NULL && pInfo->pIdx->init == 1)) { + SSDataBlock* blk = sysTableBuildUserTablesByUids(pOperator); + return blk; + } + } +#endif + + return sysTableBuildUserFileSets(pOperator); + } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pTaskInfo->code = code; + T_LONG_JMP(pTaskInfo->env, code); + } return NULL; } @@ -2303,7 +2477,8 @@ int32_t createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNo const char* name = tNameGetTableName(&pInfo->name); if (strncasecmp(name, TSDB_INS_TABLE_TABLES, TSDB_TABLE_FNAME_LEN) == 0 || - strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0) { + strncasecmp(name, TSDB_INS_TABLE_TAGS, TSDB_TABLE_FNAME_LEN) == 0 || + strncasecmp(name, TSDB_INS_TABLE_FILESETS, TSDB_TABLE_FNAME_LEN) == 0) { pInfo->readHandle = *(SReadHandle*)readHandle; } else { if (tsem_init(&pInfo->ready, 0, 0) != TSDB_CODE_SUCCESS) { diff --git a/tests/develop-test/2-query/table_count_scan.py b/tests/develop-test/2-query/table_count_scan.py index b2b48c1f0b..4e64f455fd 100644 --- a/tests/develop-test/2-query/table_count_scan.py +++ b/tests/develop-test/2-query/table_count_scan.py @@ -65,7 +65,7 @@ class TDTestCase: tdSql.query('select count(*),db_name, stable_name from information_schema.ins_tables group by db_name, stable_name;') tdSql.checkRows(3) - tdSql.checkData(0, 0, 34) + tdSql.checkData(0, 0, 35) tdSql.checkData(0, 1, 'information_schema') tdSql.checkData(0, 2, None) tdSql.checkData(1, 0, 3) diff --git a/tests/script/tsim/query/sys_tbname.sim b/tests/script/tsim/query/sys_tbname.sim index 9736893428..a9b27f1389 100644 --- a/tests/script/tsim/query/sys_tbname.sim +++ b/tests/script/tsim/query/sys_tbname.sim @@ -58,7 +58,7 @@ endi sql select tbname from information_schema.ins_tables; print $rows $data00 -if $rows != 43 then +if $rows != 44 then return -1 endi if $data00 != @ins_tables@ then diff --git a/tests/script/tsim/query/tableCount.sim b/tests/script/tsim/query/tableCount.sim index 87f72eb3b6..a22bdc8b24 100644 --- a/tests/script/tsim/query/tableCount.sim +++ b/tests/script/tsim/query/tableCount.sim @@ -53,7 +53,7 @@ sql select stable_name,count(table_name) from information_schema.ins_tables grou if $rows != 3 then return -1 endi -if $data01 != 40 then +if $data01 != 41 then return -1 endi if $data11 != 10 then @@ -72,7 +72,7 @@ endi if $data11 != 5 then return -1 endi -if $data21 != 34 then +if $data21 != 35 then return -1 endi if $data31 != 5 then @@ -97,7 +97,7 @@ endi if $data42 != 3 then return -1 endi -if $data52 != 34 then +if $data52 != 35 then return -1 endi if $data62 != 5 then