Merge pull request #30000 from taosdata/enh/3.0/TD33165
enh(insert): use cache to improve auto create table performance.
This commit is contained in:
commit
f9681cce1d
|
@ -490,6 +490,7 @@ typedef enum ENodeType {
|
|||
typedef struct {
|
||||
int32_t vgId;
|
||||
uint8_t option; // 0x0 REQ_OPT_TBNAME, 0x01 REQ_OPT_TBUID
|
||||
uint8_t autoCreateCtb; // 0x0 not auto create, 0x01 auto create
|
||||
const char* dbFName;
|
||||
const char* tbName;
|
||||
} SBuildTableInput;
|
||||
|
@ -2173,6 +2174,7 @@ typedef struct {
|
|||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
uint8_t option;
|
||||
uint8_t autoCreateCtb;
|
||||
} STableInfoReq;
|
||||
|
||||
int32_t tSerializeSTableInfoReq(void* buf, int32_t bufLen, STableInfoReq* pReq);
|
||||
|
|
|
@ -79,6 +79,7 @@ typedef struct SDbInfo {
|
|||
typedef struct STablesReq {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
SArray* pTables;
|
||||
uint8_t autoCreate; // 0x0 not auto create, 0x01 auto create
|
||||
} STablesReq;
|
||||
|
||||
typedef struct SCatalogReq {
|
||||
|
|
|
@ -571,6 +571,7 @@ typedef struct SVnodeModifyOpStmt {
|
|||
SHashObj* pVgroupsHashObj; // SHashObj<vgId, SVgInfo>
|
||||
SHashObj* pTableBlockHashObj; // SHashObj<tuid, STableDataCxt*>
|
||||
SHashObj* pSubTableHashObj; // SHashObj<table_name, STableMeta*>
|
||||
SHashObj* pSuperTableHashObj; // SHashObj<table_name, STableMeta*>
|
||||
SHashObj* pTableNameHashObj; // set of table names for refreshing meta, sync mode
|
||||
SHashObj* pDbFNameHashObj; // set of db names for refreshing meta, sync mode
|
||||
SHashObj* pTableCxtHashObj; // temp SHashObj<tuid, STableDataCxt*> for single request
|
||||
|
|
|
@ -1927,8 +1927,10 @@ TEST(stmt2Case, async_order) {
|
|||
while (!stop_task) {
|
||||
auto elapsed_time = std::chrono::steady_clock::now() - start_time;
|
||||
if (std::chrono::duration_cast<std::chrono::seconds>(elapsed_time).count() > 100) {
|
||||
FAIL() << "Test[stmt2_async_test] timed out";
|
||||
if (t.joinable()) {
|
||||
t.detach();
|
||||
}
|
||||
FAIL() << "Test[stmt2_async_test] timed out";
|
||||
break;
|
||||
}
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1)); // 每 1s 检查一次
|
||||
|
|
|
@ -6293,6 +6293,7 @@ int32_t tSerializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq)
|
|||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->dbFName));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->tbName));
|
||||
TAOS_CHECK_EXIT(tEncodeU8(&encoder, pReq->option));
|
||||
TAOS_CHECK_EXIT(tEncodeU8(&encoder, pReq->autoCreateCtb));
|
||||
tEndEncode(&encoder);
|
||||
|
||||
_exit:
|
||||
|
@ -6332,6 +6333,11 @@ int32_t tDeserializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq
|
|||
} else {
|
||||
pReq->option = 0;
|
||||
}
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeU8(&decoder, &pReq->autoCreateCtb));
|
||||
} else {
|
||||
pReq->autoCreateCtb = 0;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
_exit:
|
||||
|
@ -11947,7 +11953,11 @@ int32_t tEncodeSubmitReq(SEncoder *pCoder, const SSubmitReq2 *pReq) {
|
|||
}
|
||||
} else{
|
||||
for (uint64_t i = 0; i < taosArrayGetSize(pReq->aSubmitTbData); i++) {
|
||||
TAOS_CHECK_EXIT(tEncodeSSubmitTbData(pCoder, taosArrayGet(pReq->aSubmitTbData, i)));
|
||||
SSubmitTbData *pSubmitTbData = taosArrayGet(pReq->aSubmitTbData, i);
|
||||
if ((pSubmitTbData->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) && pSubmitTbData->pCreateTbReq == NULL) {
|
||||
pSubmitTbData->flags = 0;
|
||||
}
|
||||
TAOS_CHECK_EXIT(tEncodeSSubmitTbData(pCoder, pSubmitTbData));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -90,12 +90,14 @@ int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
|||
void *pRsp = NULL;
|
||||
SSchemaWrapper schema = {0};
|
||||
SSchemaWrapper schemaTag = {0};
|
||||
uint8_t autoCreateCtb = 0;
|
||||
|
||||
// decode req
|
||||
if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
|
||||
code = terrno;
|
||||
goto _exit4;
|
||||
}
|
||||
autoCreateCtb = infoReq.autoCreateCtb;
|
||||
|
||||
if (infoReq.option == REQ_OPT_TBUID) reqTbUid = true;
|
||||
metaRsp.dbId = pVnode->config.dbId;
|
||||
|
@ -223,6 +225,10 @@ _exit4:
|
|||
rpcMsg.code = code;
|
||||
rpcMsg.msgType = pMsg->msgType;
|
||||
|
||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST && autoCreateCtb == 1) {
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (code) {
|
||||
qError("get table %s meta with %" PRIu8 " failed cause of %s", infoReq.tbName, infoReq.option, tstrerror(code));
|
||||
}
|
||||
|
|
|
@ -481,6 +481,7 @@ struct SCtgTask {
|
|||
typedef struct SCtgTaskReq {
|
||||
SCtgTask* pTask;
|
||||
int32_t msgIdx;
|
||||
uint8_t autoCreateCtb;
|
||||
} SCtgTaskReq;
|
||||
|
||||
typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*);
|
||||
|
|
|
@ -3093,6 +3093,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
|
|||
SCtgTbMetasCtx* pCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
SName* pName = NULL;
|
||||
bool autoCreate = false;
|
||||
|
||||
int32_t dbNum = taosArrayGetSize(pCtx->pNames);
|
||||
int32_t fetchIdx = 0;
|
||||
|
@ -3103,6 +3104,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
|
|||
ctgError("fail to get the %dth STablesReq, num:%d", i, dbNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
autoCreate = pReq->autoCreate;
|
||||
|
||||
ctgDebug("start to check tb metas in db:%s, tbNum:%d", pReq->dbFName, (int32_t)taosArrayGetSize(pReq->pTables));
|
||||
CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables));
|
||||
|
@ -3143,6 +3145,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
|
|||
}
|
||||
|
||||
SCtgTaskReq tReq;
|
||||
tReq.autoCreateCtb = (autoCreate && i == pCtx->fetchNum - 1) ? 1 : 0;
|
||||
tReq.pTask = pTask;
|
||||
tReq.msgIdx = pFetch->fetchIdx;
|
||||
CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pFetch->flag, pName, &pFetch->vgId));
|
||||
|
|
|
@ -1367,6 +1367,7 @@ int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
|
|||
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo,
|
||||
STableMetaOutput* out, SCtgTaskReq* tReq) {
|
||||
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
|
||||
uint8_t autoCreateCtb = tReq ? tReq->autoCreateCtb : 0;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
(void)tNameGetFullDbName(pTableName, dbFName);
|
||||
int32_t reqType = (pTask && pTask->type == CTG_TASK_GET_TB_NAME ? TDMT_VND_TABLE_NAME : TDMT_VND_TABLE_META);
|
||||
|
@ -1380,6 +1381,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
|
|||
|
||||
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId,
|
||||
.option = reqType == TDMT_VND_TABLE_NAME ? REQ_OPT_TBUID : REQ_OPT_TBNAME,
|
||||
.autoCreateCtb = autoCreateCtb,
|
||||
.dbFName = dbFName,
|
||||
.tbName = (char*)tNameGetTableName(pTableName)};
|
||||
char* msg = NULL;
|
||||
|
|
|
@ -1326,6 +1326,7 @@ void nodesDestroyNode(SNode* pNode) {
|
|||
taosArrayDestroy(pStmt->pTableTag);
|
||||
taosHashCleanup(pStmt->pVgroupsHashObj);
|
||||
taosHashCleanup(pStmt->pSubTableHashObj);
|
||||
taosHashCleanup(pStmt->pSuperTableHashObj);
|
||||
taosHashCleanup(pStmt->pTableNameHashObj);
|
||||
taosHashCleanup(pStmt->pDbFNameHashObj);
|
||||
taosHashCleanup(pStmt->pTableCxtHashObj);
|
||||
|
|
|
@ -35,6 +35,8 @@ typedef struct SInsertParseContext {
|
|||
} SInsertParseContext;
|
||||
|
||||
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
|
||||
static int32_t parseBoundTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt);
|
||||
static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate);
|
||||
|
||||
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
|
||||
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
|
||||
|
@ -102,7 +104,8 @@ static int32_t skipTableOptions(SInsertParseContext* pCxt, const char** pSql) {
|
|||
}
|
||||
|
||||
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
|
||||
static int32_t ignoreUsingClause(SInsertParseContext* pCxt, const char** pSql) {
|
||||
static int32_t ignoreUsingClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
||||
const char** pSql = &pStmt->pSql;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SToken token;
|
||||
NEXT_TOKEN(*pSql, token);
|
||||
|
@ -138,6 +141,26 @@ static int32_t ignoreUsingClause(SInsertParseContext* pCxt, const char** pSql) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t ignoreUsingClauseAndCheckTagValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
||||
const char** pSql = &pStmt->pSql;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
code = parseBoundTagsClause(pCxt, pStmt);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
// pSql -> TAGS (tag1_value, ...)
|
||||
code = parseTagsClause(pCxt, pStmt, true);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = skipTableOptions(pCxt, pSql);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* pDuplicate) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
*pDuplicate = false;
|
||||
|
@ -150,10 +173,12 @@ static int32_t parseDuplicateUsingClause(SInsertParseContext* pCxt, SVnodeModify
|
|||
STableMeta** pMeta = taosHashGet(pStmt->pSubTableHashObj, tbFName, strlen(tbFName));
|
||||
if (NULL != pMeta) {
|
||||
*pDuplicate = true;
|
||||
code = ignoreUsingClause(pCxt, &pStmt->pSql);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
return cloneTableMeta(*pMeta, &pStmt->pTableMeta);
|
||||
pCxt->missCache = false;
|
||||
code = cloneTableMeta(*pMeta, &pStmt->pTableMeta);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
return ignoreUsingClause(pCxt, pStmt);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -937,7 +962,7 @@ static int32_t checkSubtablePrivilege(SArray* pTagVals, SArray* pTagName, SNode*
|
|||
}
|
||||
|
||||
// pSql -> tag1_value, ...)
|
||||
static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
||||
static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSchema* pSchema = getTableTagSchema(pStmt->pTableMeta);
|
||||
SArray* pTagVals = NULL;
|
||||
|
@ -991,7 +1016,7 @@ static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt
|
|||
code = tTagNew(pTagVals, 1, false, &pTag);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && !isParseBindParam) {
|
||||
if (TSDB_CODE_SUCCESS == code && !isParseBindParam && !autoCreate) {
|
||||
code = buildCreateTbReq(pStmt, pTag, pTagName);
|
||||
pTag = NULL;
|
||||
}
|
||||
|
@ -1011,7 +1036,7 @@ _exit:
|
|||
|
||||
// input pStmt->pSql: TAGS (tag1_value, ...) [table_options] ...
|
||||
// output pStmt->pSql: [table_options] ...
|
||||
static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
||||
static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool autoCreate) {
|
||||
SToken token;
|
||||
NEXT_TOKEN(pStmt->pSql, token);
|
||||
if (TK_TAGS != token.type) {
|
||||
|
@ -1023,7 +1048,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
|
|||
return buildSyntaxErrMsg(&pCxt->msg, "( is expected", token.z);
|
||||
}
|
||||
|
||||
int32_t code = parseTagsClauseImpl(pCxt, pStmt);
|
||||
int32_t code = parseTagsClauseImpl(pCxt, pStmt, autoCreate);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
NEXT_VALID_TOKEN(pStmt->pSql, token);
|
||||
if (TK_NK_COMMA == token.type) {
|
||||
|
@ -1108,7 +1133,7 @@ static int32_t parseUsingClauseBottom(SInsertParseContext* pCxt, SVnodeModifyOpS
|
|||
|
||||
int32_t code = parseBoundTagsClause(pCxt, pStmt);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = parseTagsClause(pCxt, pStmt);
|
||||
code = parseTagsClause(pCxt, pStmt, false);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = parseTableOptions(pCxt, pStmt);
|
||||
|
@ -1288,17 +1313,54 @@ static int32_t preParseUsingTableName(SInsertParseContext* pCxt, SVnodeModifyOpS
|
|||
return insCreateSName(&pStmt->usingTableName, pTbName, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
|
||||
}
|
||||
|
||||
static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
||||
static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* ctbCacheHit) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
STableMeta* pStableMeta = NULL;
|
||||
STableMeta* pCtableMeta = NULL;
|
||||
if (pCxt->forceUpdate) {
|
||||
pCxt->missCache = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->usingTableName, &pCxt->missCache, &pStmt->pTagCond);
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
bool bUsingTable = true;
|
||||
code = getTableMeta(pCxt, &pStmt->usingTableName, &pStmt->pTableMeta, &pCxt->missCache, bUsingTable);
|
||||
if (!pCxt->missCache) {
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
code = tNameExtractFullName(&pStmt->usingTableName, tbFName);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
STableMeta** ppStableMeta = taosHashGet(pStmt->pSuperTableHashObj, tbFName, strlen(tbFName));
|
||||
if (NULL != ppStableMeta) {
|
||||
pStableMeta = *ppStableMeta;
|
||||
}
|
||||
if (NULL == pStableMeta) {
|
||||
bool bUsingTable = true;
|
||||
code = getTableMeta(pCxt, &pStmt->usingTableName, &pStableMeta, &pCxt->missCache, bUsingTable);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = taosHashPut(pStmt->pSuperTableHashObj, tbFName, strlen(tbFName), &pStableMeta, POINTER_BYTES);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pCxt->isStmtBind) {
|
||||
goto _no_ctb_cache;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
bool bUsingTable = false;
|
||||
code = getTableMeta(pCxt, &pStmt->targetTableName, &pCtableMeta, &pCxt->missCache, bUsingTable);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = (pStableMeta->suid == pCtableMeta->suid) ? TSDB_CODE_SUCCESS : TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
|
||||
*ctbCacheHit = true;
|
||||
}
|
||||
_no_ctb_cache:
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (*ctbCacheHit) {
|
||||
code = cloneTableMeta(pCtableMeta, &pStmt->pTableMeta);
|
||||
} else {
|
||||
code = cloneTableMeta(pStableMeta, &pStmt->pTableMeta);
|
||||
}
|
||||
}
|
||||
taosMemoryFree(pCtableMeta);
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTargetTableVgroup(pCxt->pComCxt, pStmt, true, &pCxt->missCache);
|
||||
}
|
||||
|
@ -1314,9 +1376,14 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt
|
|||
static int32_t parseUsingTableNameImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
||||
SToken token;
|
||||
NEXT_TOKEN(pStmt->pSql, token);
|
||||
bool ctbCacheHit = false;
|
||||
int32_t code = preParseUsingTableName(pCxt, pStmt, &token);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = getUsingTableSchema(pCxt, pStmt);
|
||||
code = getUsingTableSchema(pCxt, pStmt, &ctbCacheHit);
|
||||
if (TSDB_CODE_SUCCESS == code && ctbCacheHit) {
|
||||
pStmt->usingTableProcessing = false;
|
||||
return ignoreUsingClauseAndCheckTagValues(pCxt, pStmt);
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = storeChildTableMeta(pCxt, pStmt);
|
||||
|
@ -1337,7 +1404,6 @@ static int32_t parseUsingTableName(SInsertParseContext* pCxt, SVnodeModifyOpStmt
|
|||
if (TK_USING != token.type) {
|
||||
return getTargetTableSchema(pCxt, pStmt);
|
||||
}
|
||||
|
||||
pStmt->usingTableProcessing = true;
|
||||
// pStmt->pSql -> stb_name [(tag1_name, ...)
|
||||
pStmt->pSql += index;
|
||||
|
@ -2791,6 +2857,7 @@ static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, S
|
|||
}
|
||||
}
|
||||
pStmt->pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||
pStmt->pSuperTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||
pStmt->pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||
pStmt->pDbFNameHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
|
||||
if ((!reentry && (NULL == pStmt->pVgroupsHashObj || NULL == pStmt->pTableBlockHashObj)) ||
|
||||
|
@ -2800,6 +2867,7 @@ static int32_t createVnodeModifOpStmt(SInsertParseContext* pCxt, bool reentry, S
|
|||
}
|
||||
|
||||
taosHashSetFreeFp(pStmt->pSubTableHashObj, destroySubTableHashElem);
|
||||
taosHashSetFreeFp(pStmt->pSuperTableHashObj, destroySubTableHashElem);
|
||||
|
||||
*pOutput = (SNode*)pStmt;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -2842,7 +2910,7 @@ static int32_t checkAuthFromMetaData(const SArray* pUsers, SNode** pTagCond) {
|
|||
}
|
||||
|
||||
static int32_t getTableMetaFromMetaData(const SArray* pTables, STableMeta** pMeta) {
|
||||
if (1 != taosArrayGetSize(pTables)) {
|
||||
if (1 != taosArrayGetSize(pTables) && 2 != taosArrayGetSize(pTables)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
@ -3119,6 +3187,29 @@ static int32_t parseInsertSqlImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt*
|
|||
return parseInsertSqlFromTable(pCxt, pStmt);
|
||||
}
|
||||
|
||||
static int32_t buildUsingInsertTableReq(SName* pSName, SName* pCName, SArray** pTables) {
|
||||
if (NULL == *pTables) {
|
||||
*pTables = taosArrayInit(2, sizeof(SName));
|
||||
if (NULL == *pTables) {
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
if (NULL == taosArrayPush(*pTables, pSName)) {
|
||||
goto _err;
|
||||
}
|
||||
if (NULL == taosArrayPush(*pTables, pCName)) {
|
||||
goto _err;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_err:
|
||||
if (NULL != *pTables) {
|
||||
taosArrayDestroy(*pTables);
|
||||
*pTables = NULL;
|
||||
}
|
||||
return terrno;
|
||||
}
|
||||
|
||||
static int32_t buildInsertTableReq(SName* pName, SArray** pTables) {
|
||||
*pTables = taosArrayInit(1, sizeof(SName));
|
||||
if (NULL == *pTables) {
|
||||
|
@ -3133,6 +3224,26 @@ static int32_t buildInsertTableReq(SName* pName, SArray** pTables) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t buildInsertUsingDbReq(SName* pSName, SName* pCName, SArray** pDbs) {
|
||||
if (NULL == *pDbs) {
|
||||
*pDbs = taosArrayInit(1, sizeof(STablesReq));
|
||||
if (NULL == *pDbs) {
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
STablesReq req = {0};
|
||||
req.autoCreate = 1;
|
||||
(void)tNameGetFullDbName(pSName, req.dbFName);
|
||||
(void)tNameGetFullDbName(pCName, req.dbFName);
|
||||
|
||||
int32_t code = buildUsingInsertTableReq(pSName, pCName, &req.pTables);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL == taosArrayPush(*pDbs, &req)) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildInsertDbReq(SName* pName, SArray** pDbs) {
|
||||
if (NULL == *pDbs) {
|
||||
*pDbs = taosArrayInit(1, sizeof(STablesReq));
|
||||
|
@ -3182,7 +3293,7 @@ static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifyOpSt
|
|||
if (0 == pStmt->usingTableName.type) {
|
||||
code = buildInsertDbReq(&pStmt->targetTableName, &pCatalogReq->pTableMeta);
|
||||
} else {
|
||||
code = buildInsertDbReq(&pStmt->usingTableName, &pCatalogReq->pTableMeta);
|
||||
code = buildInsertUsingDbReq(&pStmt->usingTableName, &pStmt->targetTableName, &pCatalogReq->pTableMeta);
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -0,0 +1,114 @@
|
|||
import time
|
||||
import taos
|
||||
|
||||
conn = taos.connect()
|
||||
|
||||
total_batches = 100
|
||||
tables_per_batch = 100
|
||||
|
||||
def prepare_database():
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("DROP DATABASE IF EXISTS test")
|
||||
cursor.execute("CREATE DATABASE IF NOT EXISTS test")
|
||||
cursor.execute("USE test")
|
||||
cursor.execute("CREATE STABLE IF NOT EXISTS stb (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT)")
|
||||
cursor.close()
|
||||
|
||||
def test_auto_create_tables():
|
||||
"""测试场景1:自动建表插入"""
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("USE test")
|
||||
print("开始测试自动建表插入...")
|
||||
|
||||
start_time = time.time()
|
||||
for _ in range(100):
|
||||
for batch in range(total_batches):
|
||||
# 生成当前批次的子表ID范围
|
||||
start_id = batch * tables_per_batch
|
||||
end_id = start_id + tables_per_batch
|
||||
|
||||
# 构建批量插入SQL
|
||||
sql_parts = []
|
||||
for i in range(start_id, end_id):
|
||||
sql_part = f"t_{i} USING stb TAGS ({i}) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test')"
|
||||
sql_parts.append(sql_part)
|
||||
|
||||
# 执行批量插入
|
||||
full_sql = "INSERT INTO " + " ".join(sql_parts)
|
||||
cursor.execute(full_sql)
|
||||
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
print(f"自动建表插入耗时: {elapsed:.2f} 秒")
|
||||
|
||||
cursor.close()
|
||||
return elapsed
|
||||
|
||||
def precreate_tables():
|
||||
"""预处理:创建所有子表结构"""
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("USE test")
|
||||
|
||||
print("\n开始预创建子表...")
|
||||
start_time = time.time()
|
||||
|
||||
for batch in range(total_batches):
|
||||
start_id = batch * tables_per_batch
|
||||
end_id = start_id + tables_per_batch
|
||||
|
||||
for i in range(start_id, end_id):
|
||||
sql_part = f"CREATE TABLE t_{i} USING stb TAGS ({i})"
|
||||
cursor.execute(sql_part)
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
print(f"子表预创建耗时: {elapsed:.2f} 秒")
|
||||
|
||||
cursor.close()
|
||||
|
||||
def test_direct_insert():
|
||||
"""测试场景2:直接插入已存在的子表"""
|
||||
cursor = conn.cursor()
|
||||
cursor.execute("USE test")
|
||||
|
||||
print("\n开始测试直接插入...")
|
||||
start_time = time.time()
|
||||
for _ in range(100):
|
||||
for batch in range(total_batches):
|
||||
start_id = batch * tables_per_batch
|
||||
end_id = start_id + tables_per_batch
|
||||
|
||||
# 构建批量插入SQL
|
||||
sql_parts = []
|
||||
for i in range(start_id, end_id):
|
||||
sql_part = f"t_{i} VALUES ('2024-01-01 00:00:01', 1, 2.0, 'test')"
|
||||
sql_parts.append(sql_part)
|
||||
|
||||
# 执行批量插入
|
||||
full_sql = "INSERT INTO " + " ".join(sql_parts)
|
||||
cursor.execute(full_sql)
|
||||
|
||||
elapsed = time.time() - start_time
|
||||
print(f"直接插入耗时: {elapsed:.2f} 秒")
|
||||
|
||||
cursor.close()
|
||||
return elapsed
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 初始化数据库环境
|
||||
prepare_database()
|
||||
# 预创建所有子表
|
||||
precreate_tables()
|
||||
# 测试场景1:自动建表插入
|
||||
auto_create_time = test_auto_create_tables()
|
||||
# # 清理环境并重新初始化
|
||||
# prepare_database()
|
||||
# # 预创建所有子表
|
||||
# precreate_tables()
|
||||
# # 测试场景2:直接插入
|
||||
# direct_insert_time = test_direct_insert()
|
||||
|
||||
# 打印最终结果
|
||||
print("\n测试结果对比:")
|
||||
print(f"自动建表插入耗时: {auto_create_time:.2f} 秒")
|
||||
# print(f"直接插入耗时: {direct_insert_time:.2f} 秒")
|
||||
# print(f"性能差异: {auto_create_time/direct_insert_time:.1f} 倍")
|
|
@ -0,0 +1,195 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
import time
|
||||
import random
|
||||
|
||||
import taos
|
||||
import frame
|
||||
import frame.etool
|
||||
|
||||
|
||||
from frame.log import *
|
||||
from frame.cases import *
|
||||
from frame.sql import *
|
||||
from frame.caseBase import *
|
||||
from frame import *
|
||||
|
||||
|
||||
class TDTestCase(TBase):
|
||||
|
||||
def prepare_database(self):
|
||||
tdLog.info(f"prepare database")
|
||||
tdSql.execute("DROP DATABASE IF EXISTS test")
|
||||
tdSql.execute("CREATE DATABASE IF NOT EXISTS test")
|
||||
tdSql.execute("USE test")
|
||||
tdSql.execute("CREATE STABLE IF NOT EXISTS stb (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT)")
|
||||
|
||||
|
||||
def insert_table_auto_create(self):
|
||||
tdLog.info(f"insert table auto create")
|
||||
tdSql.execute("USE test")
|
||||
tdLog.info("start to test auto create insert...")
|
||||
tdSql.execute("INSERT INTO t_0 USING stb TAGS (0) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test')")
|
||||
tdSql.execute("INSERT INTO t_0 USING stb TAGS (0) VALUES ('2024-01-01 00:00:01', 1, 2.0, 'test')")
|
||||
tdSql.query("select * from t_0")
|
||||
tdSql.checkRows(2)
|
||||
|
||||
def insert_table_pre_create(self):
|
||||
tdLog.info(f"insert table pre create")
|
||||
tdSql.execute("USE test")
|
||||
tdLog.info("start to pre create table...")
|
||||
tdSql.execute("CREATE TABLE t_1 USING stb TAGS (1)")
|
||||
tdLog.info("start to test pre create insert...")
|
||||
tdSql.execute("INSERT INTO t_1 USING stb TAGS (1) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test')")
|
||||
tdSql.execute("INSERT INTO t_1 VALUES ('2024-01-01 00:00:01', 1, 2.0, 'test')")
|
||||
tdSql.query("select * from t_1")
|
||||
tdSql.checkRows(2)
|
||||
|
||||
def insert_table_auto_insert_with_cache(self):
|
||||
tdLog.info(f"insert table auto insert with cache")
|
||||
tdSql.execute("USE test")
|
||||
tdLog.info("start to test auto insert with cache...")
|
||||
tdSql.execute("CREATE TABLE t_2 USING stb TAGS (2)")
|
||||
tdLog.info("start to insert to init cache...")
|
||||
tdSql.execute("INSERT INTO t_2 VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test')")
|
||||
tdSql.execute("INSERT INTO t_2 USING stb TAGS (2) VALUES ('2024-01-01 00:00:01', 1, 2.0, 'test')")
|
||||
tdSql.query("select * from t_2")
|
||||
tdSql.checkRows(2)
|
||||
|
||||
def insert_table_auto_insert_with_multi_rows(self):
|
||||
tdLog.info(f"insert table auto insert with multi rows")
|
||||
tdSql.execute("USE test")
|
||||
tdLog.info("start to test auto insert with multi rows...")
|
||||
tdSql.execute("CREATE TABLE t_3 USING stb TAGS (3)")
|
||||
tdLog.info("start to insert multi rows...")
|
||||
tdSql.execute("INSERT INTO t_3 VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test'), ('2024-01-01 00:00:01', 1, 2.0, 'test')")
|
||||
tdSql.query("select * from t_3")
|
||||
tdSql.checkRows(2)
|
||||
|
||||
tdLog.info("start to insert multi rows with direct insert and auto create...")
|
||||
tdSql.execute("INSERT INTO t_4 USING stb TAGS (4) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test'), t_3 VALUES ('2024-01-01 00:00:02', 1, 2.0, 'test')")
|
||||
tdSql.query("select * from t_4")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.query("select * from t_3")
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdLog.info("start to insert multi rows with auto create and direct insert...")
|
||||
tdSql.execute("INSERT INTO t_3 VALUES ('2024-01-01 00:00:03', 1, 2.0, 'test'),t_4 USING stb TAGS (4) VALUES ('2024-01-01 00:00:01', 1, 2.0, 'test'),")
|
||||
tdSql.query("select * from t_4")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.query("select * from t_3")
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdLog.info("start to insert multi rows with auto create into same table...")
|
||||
tdSql.execute("INSERT INTO t_10 USING stb TAGS (10) VALUES ('2024-01-01 00:00:04', 1, 2.0, 'test'),t_10 USING stb TAGS (10) VALUES ('2024-01-01 00:00:05', 1, 2.0, 'test'),")
|
||||
tdSql.query("select * from t_10")
|
||||
tdSql.checkRows(2)
|
||||
|
||||
def check_some_err_case(self):
|
||||
tdLog.info(f"check some err case")
|
||||
tdSql.execute("USE test")
|
||||
|
||||
tdLog.info("start to test err stb name...")
|
||||
tdSql.error("INSERT INTO t_5 USING errrrxx TAGS (5) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test')", expectErrInfo="Table does not exist")
|
||||
|
||||
tdLog.info("start to test err syntax name...")
|
||||
tdSql.error("INSERT INTO t_5 USING stb TAG (5) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test')", expectErrInfo="syntax error")
|
||||
|
||||
tdLog.info("start to test err syntax values...")
|
||||
tdSql.error("INSERT INTO t_5 USING stb TAG (5) VALUS ('2024-01-01 00:00:00', 1, 2.0, 'test')", expectErrInfo="syntax error")
|
||||
|
||||
tdLog.info("start to test err tag counts...")
|
||||
tdSql.error("INSERT INTO t_5 USING stb TAG (5,1) VALUS ('2024-01-01 00:00:00', 1, 2.0, 'test')", expectErrInfo="syntax error")
|
||||
|
||||
tdLog.info("start to test err tag counts...")
|
||||
tdSql.error("INSERT INTO t_5 USING stb TAG ('dasds') VALUS ('2024-01-01 00:00:00', 1, 2.0, 'test')", expectErrInfo="syntax error")
|
||||
|
||||
tdLog.info("start to test err values counts...")
|
||||
tdSql.error("INSERT INTO t_5 USING stb TAGS (5) VALUES ('2024-01-01 00:00:00', 1, 1 ,2.0, 'test')", expectErrInfo="Illegal number of columns")
|
||||
|
||||
tdLog.info("start to test err values...")
|
||||
tdSql.error("INSERT INTO t_5 USING stb TAGS (5) VALUES ('2024-01-01 00:00:00', 'dasdsa', 1 ,2.0, 'test')", expectErrInfo="syntax error")
|
||||
|
||||
def check_same_table_same_ts(self):
|
||||
tdLog.info(f"check same table same ts")
|
||||
tdSql.execute("USE test")
|
||||
tdSql.execute("INSERT INTO t_6 USING stb TAGS (6) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test') t_6 USING stb TAGS (6) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test')")
|
||||
tdSql.query("select * from t_6")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
def check_tag_parse_error_with_cache(self):
|
||||
tdLog.info(f"check tag parse error with cache")
|
||||
tdSql.execute("USE test")
|
||||
tdSql.execute("INSERT INTO t_7 USING stb TAGS (7) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test')")
|
||||
tdSql.error("INSERT INTO t_7 USING stb TAGS ('ddd') VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test')", expectErrInfo="syntax error")
|
||||
tdSql.query("select * from t_7")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
def check_duplicate_table_with_err_tag(self):
|
||||
tdLog.info(f"check tag parse error with cache")
|
||||
tdSql.execute("USE test")
|
||||
tdSql.execute("INSERT INTO t_8 USING stb TAGS (8) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test') t_8 USING stb TAGS (ddd) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test')")
|
||||
tdSql.query("select * from t_8")
|
||||
tdSql.checkRows(1)
|
||||
|
||||
def check_table_with_another_stb_name(self):
|
||||
tdLog.info(f"check table with another stb name")
|
||||
tdSql.execute("USE test")
|
||||
tdSql.execute("CREATE STABLE IF NOT EXISTS stb2 (ts TIMESTAMP, a INT, b FLOAT, c BINARY(10)) TAGS (e_id INT)")
|
||||
tdSql.execute("INSERT INTO t_20 USING stb2 TAGS (20) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test')")
|
||||
tdSql.query("select * from t_20")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.error("INSERT INTO t_20 USING stb TAGS (20) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test')", expectErrInfo="Table already exists in other stables")
|
||||
tdSql.error("INSERT INTO t_20 USING stb TAGS (20) VALUES ('2024-01-01 00:00:00', 1, 2.0, 'test')", expectErrInfo="Table already exists in other stables")
|
||||
|
||||
|
||||
# run
|
||||
def run(self):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
|
||||
# prepare database
|
||||
self.prepare_database()
|
||||
|
||||
# insert table auto create
|
||||
self.insert_table_auto_create()
|
||||
|
||||
# insert table pre create
|
||||
self.insert_table_pre_create()
|
||||
|
||||
# insert table auto insert with cache
|
||||
self.insert_table_auto_insert_with_cache()
|
||||
|
||||
# insert table auto insert with multi rows
|
||||
self.insert_table_auto_insert_with_multi_rows()
|
||||
|
||||
# check some err case
|
||||
self.check_some_err_case()
|
||||
|
||||
# check same table same ts
|
||||
self.check_same_table_same_ts()
|
||||
|
||||
# check tag parse error with cache
|
||||
self.check_tag_parse_error_with_cache()
|
||||
|
||||
# check duplicate table with err tag
|
||||
self.check_duplicate_table_with_err_tag()
|
||||
|
||||
# check table with another stb name
|
||||
self.check_table_with_another_stb_name()
|
||||
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -51,6 +51,7 @@
|
|||
,,y,army,./pytest.sh python3 ./test.py -f query/accuracy/test_ts5400.py
|
||||
,,y,army,./pytest.sh python3 ./test.py -f query/accuracy/test_having.py
|
||||
,,y,army,./pytest.sh python3 ./test.py -f insert/insert_basic.py -N 3
|
||||
,,y,army,./pytest.sh python3 ./test.py -f insert/auto_create_insert.py
|
||||
,,y,army,./pytest.sh python3 ./test.py -f cluster/splitVgroupByLearner.py -N 3
|
||||
,,y,army,./pytest.sh python3 ./test.py -f authorith/authBasic.py -N 3
|
||||
,,n,army,python3 ./test.py -f cmdline/fullopt.py
|
||||
|
|
Loading…
Reference in New Issue