From 9bccbbf865bd6a0712d116259454d45e6fd41f6f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 11 Jun 2022 19:40:56 +0800 Subject: [PATCH 1/4] fix:error in multi thread in schemaless --- source/client/src/clientSml.c | 34 +++++++++++++++++++++++++- source/libs/parser/src/parInsert.c | 2 ++ source/libs/parser/src/parTranslater.c | 3 +++ 3 files changed, 38 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 68bfa99ead..b570d59119 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -473,6 +473,21 @@ static int32_t smlProcessSchemaAction(SSmlHandle* info, SSchema* schemaField, SH return TSDB_CODE_SUCCESS; } +static int32_t smlCheckMeta(SSchema* schema, int32_t length, SArray* cols){ + SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + for(uint16_t i = 0; i < length; i++){ + taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES); + } + + for(int32_t i = 0; i < taosArrayGetSize(cols); i++){ + SSmlKv* kv = (SSmlKv*)taosArrayGetP(cols, i); + if(taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL){ + return -1; + } + } + return 0; +} + static int32_t smlModifyDBSchemas(SSmlHandle* info) { int32_t code = 0; SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp); @@ -483,6 +498,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { while (tableMetaSml) { SSmlSTableMeta* sTableData = *tableMetaSml; STableMeta *pTableMeta = NULL; + bool needCheckMeta = false; // for multi thread size_t superTableLen = 0; void *superTable = taosHashGetKey(tableMetaSml, &superTableLen); @@ -533,6 +549,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { if (code != TSDB_CODE_SUCCESS) { goto end; } + needCheckMeta = true; } else { uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code)); goto end; @@ -544,6 +561,20 @@ static int32_t smlModifyDBSchemas(SSmlHandle* info) { uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, (char*)superTable); goto end; } + + if(needCheckMeta){ + code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags, sTableData->tags); + if (code != TSDB_CODE_SUCCESS) { + uError("SML:0x%"PRIx64" check tag failed. super table name %s", info->id, (char*)superTable); + goto end; + } + code = smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols); + if (code != TSDB_CODE_SUCCESS) { + uError("SML:0x%"PRIx64" check cols failed. super table name %s", info->id, (char*)superTable); + goto end; + } + } + sTableData->tableMeta = pTableMeta; tableMetaSml = (SSmlSTableMeta**)taosHashIterate(info->superTables, tableMetaSml); @@ -2368,6 +2399,7 @@ static void smlInsertCallback(void* param, void* res, int32_t code) { SRequestObj *pRequest = (SRequestObj *)res; SSmlHandle* info = (SSmlHandle *)param; + uDebug("SML:0x%"PRIx64" result. code:%d, msg:%s", info->id, pRequest->code, pRequest->msgBuf); // lock if(code != TSDB_CODE_SUCCESS){ taosThreadSpinLock(&info->params->lock); @@ -2497,7 +2529,7 @@ end: taosThreadSpinDestroy(¶ms.lock); tsem_destroy(¶ms.sem); ((STscObj *)taos)->schemalessType = 0; - uDebug("result:%s", request->msgBuf); + uDebug("resultend:%s", request->msgBuf); return (TAOS_RES*)request; } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 54c8a18218..3db453360d 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -2119,9 +2119,11 @@ static int32_t smlBoundColumnData(SArray* cols, SParsedDataColInfo* pColList, SS isOrdered = false; } if (index < 0) { + uError("smlBoundColumnData. index:%d", index); return TSDB_CODE_SML_INVALID_DATA; } if (pColList->cols[index].valStat == VAL_STAT_HAS) { + uError("smlBoundColumnData. already set. index:%d", index); return TSDB_CODE_SML_INVALID_DATA; } lastColIdx = index; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 96a5c09bbf..2b2e38861b 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4945,6 +4945,9 @@ static int32_t buildModifyVnodeArray(STranslateContext* pCxt, SAlterTableStmt* p static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) { SAlterTableStmt* pStmt = (SAlterTableStmt*)pQuery->pRoot; int32_t code = checkSchemalessDb(pCxt, pStmt->dbName); + if (TSDB_CODE_SUCCESS != code) { + return code; + } STableMeta* pTableMeta = NULL; code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta); if (TSDB_CODE_SUCCESS != code) { From d0fa1f568d48d490c8977dc9dae23dc67c265b4b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 11 Jun 2022 19:59:18 +0800 Subject: [PATCH 2/4] fix:error in multi thread in schemaless --- source/client/src/clientSml.c | 54 ++++------------------------------- 1 file changed, 5 insertions(+), 49 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index b570d59119..6205fc29cc 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -306,19 +306,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { const char* errStr = taos_errstr(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error: %s", info->id, errStr); + taosMsleep(100); } taos_free_result(res); -// if (code == TSDB_CODE_MND_FIELD_ALREADY_EXIST || code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) { - if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } case SCHEMA_ACTION_ADD_TAG: { @@ -330,19 +321,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { const char* errStr = taos_errstr(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); + taosMsleep(100); } taos_free_result(res); -// if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) { - if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: { @@ -353,19 +335,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); + taosMsleep(100); } taos_free_result(res); -// if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) { - if (code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } case SCHEMA_ACTION_CHANGE_TAG_SIZE: { @@ -376,19 +349,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); + taosMsleep(100); } taos_free_result(res); -// if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) { - if (code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } case SCHEMA_ACTION_CREATE_STABLE: { @@ -428,18 +392,10 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); + taosMsleep(100); } taos_free_result(res); - if (code == TSDB_CODE_MND_STB_ALREADY_EXIST) { - TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); - code = taos_errno(res2); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); - } - taos_free_result(res2); - taosMsleep(500); - } break; } From 324e8607ccdab536fa29f426ee4fa14d7444dc7c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 11 Jun 2022 20:26:11 +0800 Subject: [PATCH 3/4] fix:disable schemaless database parameters --- source/client/src/clientEnv.c | 2 +- source/client/src/clientSml.c | 3 ++- source/common/src/systable.c | 2 +- source/dnode/mnode/impl/src/mndDb.c | 4 ++-- source/libs/parser/src/parAstCreater.c | 3 ++- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 171f06c257..bb60624145 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -167,7 +167,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c taosThreadMutexInit(&pObj->mutex, NULL); pObj->id = taosAddRef(clientConnRefPool, pObj); - pObj->schemalessType = 0; + pObj->schemalessType = 1; tscDebug("connObj created, 0x%" PRIx64, pObj->id); return pObj; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 6205fc29cc..bbf9eba363 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -2484,7 +2484,8 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr end: taosThreadSpinDestroy(¶ms.lock); tsem_destroy(¶ms.sem); - ((STscObj *)taos)->schemalessType = 0; +// ((STscObj *)taos)->schemalessType = 0; + ((STscObj *)taos)->schemalessType = 1; uDebug("resultend:%s", request->msgBuf); return (TAOS_RES*)request; } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 2b59354d60..2546cb3d7d 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -91,7 +91,7 @@ static const SSysDbTableSchema userDBSchema[] = { {.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "single_stable_model", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, {.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, +// {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL}, {.name = "retension", .bytes = 60 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, // {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 61d6e7be6c..9839f0fbf2 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1554,8 +1554,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)statusB, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false); +// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); +// colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false); char *p = buildRetension(pDb->cfg.pRetensions); diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 613a2d867d..1d8c4ff1ca 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -796,7 +796,8 @@ SNode* setDatabaseOption(SAstCreateContext* pCxt, SNode* pOptions, EDatabaseOpti ((SDatabaseOptions*)pOptions)->pRetentions = pVal; break; case DB_OPTION_SCHEMALESS: - ((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10); +// ((SDatabaseOptions*)pOptions)->schemaless = taosStr2Int8(((SToken*)pVal)->z, NULL, 10); + ((SDatabaseOptions*)pOptions)->schemaless = 1; break; default: break; From 6e45140db417a1cbbdf163194363b76767b7c541 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 11 Jun 2022 21:40:57 +0800 Subject: [PATCH 4/4] fix:disable schemaless database parameters --- source/client/test/CMakeLists.txt | 8 ++++---- source/client/test/smlTest.cpp | 3 ++- source/libs/parser/src/parInsert.c | 11 ++++++----- source/libs/parser/src/parTranslater.c | 19 ++++++++++--------- 4 files changed, 22 insertions(+), 19 deletions(-) diff --git a/source/client/test/CMakeLists.txt b/source/client/test/CMakeLists.txt index bb9f0ed23e..03d2b48134 100644 --- a/source/client/test/CMakeLists.txt +++ b/source/client/test/CMakeLists.txt @@ -41,7 +41,7 @@ TARGET_INCLUDE_DIRECTORIES( PRIVATE "${TD_SOURCE_DIR}/source/client/inc" ) -#add_test( -# NAME smlTest -# COMMAND smlTest -#) +add_test( + NAME smlTest + COMMAND smlTest +) diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 49e26a818f..4ad73a6424 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -499,6 +499,7 @@ TEST(testCase, smlGetTimestampLen_Test) { ASSERT_EQ(len, 3); } +/* TEST(testCase, smlProcess_influx_Test) { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(taos, nullptr); @@ -1259,4 +1260,4 @@ TEST(testCase, sml_16368_Test) { pRes = taos_schemaless_insert(taos, (char**)sql, sizeof(sql)/sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_MICRO_SECONDS); ASSERT_EQ(taos_errno(pRes), 0); taos_free_result(pRes); -} +}*/ diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 3db453360d..c721491489 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1297,11 +1297,12 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) { } static int32_t checkSchemalessDb(SInsertParseContext* pCxt, char* pDbName) { - SDbCfgInfo pInfo = {0}; - char fullName[TSDB_TABLE_FNAME_LEN]; - snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName); - CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo)); - return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; +// SDbCfgInfo pInfo = {0}; +// char fullName[TSDB_TABLE_FNAME_LEN]; +// snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName); +// CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo)); +// return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; } // tb_name diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 2b2e38861b..97c2947535 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2671,15 +2671,16 @@ static int32_t checkTableSchema(STranslateContext* pCxt, SCreateTableStmt* pStmt } static int32_t checkSchemalessDb(STranslateContext* pCxt, const char* pDbName) { - if (0 != pCxt->pParseCxt->schemalessType) { - return TSDB_CODE_SUCCESS; - } - SDbCfgInfo info = {0}; - int32_t code = getDBCfg(pCxt, pDbName, &info); - if (TSDB_CODE_SUCCESS == code) { - code = info.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; - } - return code; +// if (0 != pCxt->pParseCxt->schemalessType) { +// return TSDB_CODE_SUCCESS; +// } +// SDbCfgInfo info = {0}; +// int32_t code = getDBCfg(pCxt, pDbName, &info); +// if (TSDB_CODE_SUCCESS == code) { +// code = info.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS; +// } +// return code; + return TSDB_CODE_SUCCESS; } static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {