Merge pull request #23843 from taosdata/fix3.0/TS-4272
fix crash when insert big csv
This commit is contained in:
commit
0d04dc296e
|
@ -449,6 +449,7 @@ typedef struct SVnodeModifyOpStmt {
|
|||
SHashObj* pSubTableHashObj; // SHashObj<table_name, STableMeta*>
|
||||
SHashObj* pTableNameHashObj; // set of table names for refreshing meta, sync mode
|
||||
SHashObj* pDbFNameHashObj; // set of db names for refreshing meta, sync mode
|
||||
SHashObj* pTableCxtHashObj; // temp SHashObj<tuid, STableDataCxt*> for single request
|
||||
SArray* pVgDataBlocks; // SArray<SVgroupDataCxt*>
|
||||
SVCreateTbReq* pCreateTblReq;
|
||||
TdFilePtr fp;
|
||||
|
|
|
@ -6906,6 +6906,7 @@ void destoryVnodeModifyOpStmt(SNode* pNode) {
|
|||
taosHashCleanup(pStmt->pSubTableHashObj);
|
||||
taosHashCleanup(pStmt->pTableNameHashObj);
|
||||
taosHashCleanup(pStmt->pDbFNameHashObj);
|
||||
taosHashCleanup(pStmt->pTableCxtHashObj);
|
||||
if (pStmt->freeHashFunc) {
|
||||
pStmt->freeHashFunc(pStmt->pTableBlockHashObj);
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey);
|
|||
int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
|
||||
SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals);
|
||||
int32_t initTableColSubmitData(STableDataCxt *pTableCxt);
|
||||
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks);
|
||||
int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks, bool isRebuild);
|
||||
int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
|
||||
void insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash);
|
||||
void insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt);
|
||||
|
|
|
@ -425,7 +425,7 @@ SQuery* smlInitHandle() {
|
|||
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) {
|
||||
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(handle)->pRoot;
|
||||
// merge according to vgId
|
||||
int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks);
|
||||
int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks, true);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("insMergeTableDataCxt failed");
|
||||
return code;
|
||||
|
|
|
@ -55,6 +55,7 @@ typedef struct SInsertParseContext {
|
|||
bool usingDuplicateTable;
|
||||
bool forceUpdate;
|
||||
bool needTableTagVal;
|
||||
bool needRequest; // whether or not request server
|
||||
} SInsertParseContext;
|
||||
|
||||
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
|
||||
|
@ -652,6 +653,10 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStm
|
|||
}
|
||||
|
||||
static int32_t buildCreateTbReq(SVnodeModifyOpStmt* pStmt, STag* pTag, SArray* pTagName) {
|
||||
if (pStmt->pCreateTblReq) {
|
||||
tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
|
||||
taosMemoryFreeClear(pStmt->pCreateTblReq);
|
||||
}
|
||||
pStmt->pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
|
||||
if (NULL == pStmt->pCreateTblReq) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -1797,9 +1802,10 @@ static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
|
|||
taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq);
|
||||
}
|
||||
|
||||
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
|
||||
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) {
|
||||
bool bFirstTable = false;
|
||||
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
|
||||
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken,
|
||||
STableDataCxt** ppTableDataCxt) {
|
||||
bool bFirstTable = false;
|
||||
int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken, &bFirstTable);
|
||||
if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
|
||||
return code;
|
||||
|
@ -1809,15 +1815,14 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pSt
|
|||
code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
|
||||
}
|
||||
|
||||
STableDataCxt* pTableDataCxt = NULL;
|
||||
code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
|
||||
pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, &pTableDataCxt, false, true);
|
||||
initTableColSubmitData(pTableDataCxt);
|
||||
pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true);
|
||||
initTableColSubmitData(*ppTableDataCxt);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
SRow** pRow = taosArrayReserve(pTableDataCxt->pData->aRowP, 1);
|
||||
code = tRowBuild(pStbRowsCxt->aColVals, pTableDataCxt->pSchema, pRow);
|
||||
SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1);
|
||||
code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
insCheckTableDataOrder(pTableDataCxt, TD_ROW_KEY(*pRow));
|
||||
insCheckTableDataOrder(*ppTableDataCxt, TD_ROW_KEY(*pRow));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1915,7 +1920,8 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
|
|||
if (!pStmt->stbSyntax) {
|
||||
code = parseOneRow(pCxt, &pStmt->pSql, rowsDataCxt.pTableDataCxt, &gotRow, pToken);
|
||||
} else {
|
||||
code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, rowsDataCxt.pStbRowsCxt, &gotRow, pToken);
|
||||
STableDataCxt* pTableDataCxt = NULL;
|
||||
code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, rowsDataCxt.pStbRowsCxt, &gotRow, pToken, &pTableDataCxt);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1979,7 +1985,14 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
|
|||
if (!pStmt->stbSyntax) {
|
||||
code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token);
|
||||
} else {
|
||||
code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token);
|
||||
STableDataCxt* pTableDataCxt = NULL;
|
||||
code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token, &pTableDataCxt);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
SStbRowsDataContext* pStbRowsCxt = rowsDataCxt.pStbRowsCxt;
|
||||
void* pData = pTableDataCxt;
|
||||
taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), &pData,
|
||||
POINTER_BYTES);
|
||||
}
|
||||
}
|
||||
if (code && firstLine) {
|
||||
firstLine = false;
|
||||
|
@ -1992,7 +2005,7 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
|
|||
(*pNumOfRows)++;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && (*pNumOfRows) > tsMaxInsertBatchRows) {
|
||||
if (TSDB_CODE_SUCCESS == code && (*pNumOfRows) >= tsMaxInsertBatchRows) {
|
||||
pStmt->fileProcessing = true;
|
||||
break;
|
||||
}
|
||||
|
@ -2003,7 +2016,7 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
|
|||
|
||||
parserDebug("0x%" PRIx64 " %d rows have been parsed", pCxt->pComCxt->requestId, *pNumOfRows);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) &&
|
||||
if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) && 0 == pStmt->totalRowsNum &&
|
||||
(!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) && !pStmt->fileProcessing) {
|
||||
code = buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
|
||||
}
|
||||
|
@ -2011,6 +2024,12 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt
|
|||
}
|
||||
|
||||
static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) {
|
||||
// init only for file
|
||||
if (NULL == pStmt->pTableCxtHashObj) {
|
||||
pStmt->pTableCxtHashObj =
|
||||
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
}
|
||||
|
||||
int32_t numOfRows = 0;
|
||||
int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -2022,7 +2041,18 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt
|
|||
} else {
|
||||
parserDebug("0x%" PRIx64 " insert from csv. File is too large, do it in batches.", pCxt->pComCxt->requestId);
|
||||
}
|
||||
if (pStmt->insertType != TSDB_QUERY_TYPE_FILE_INSERT) {
|
||||
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is exclusive", NULL);
|
||||
}
|
||||
}
|
||||
|
||||
// just record pTableCxt whose data come from file
|
||||
if (!pStmt->stbSyntax && numOfRows > 0) {
|
||||
void* pData = rowsDataCxt.pTableDataCxt;
|
||||
taosHashPut(pStmt->pTableCxtHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid), &pData,
|
||||
POINTER_BYTES);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -2061,6 +2091,9 @@ static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
|
|||
NEXT_TOKEN(pStmt->pSql, token);
|
||||
switch (token.type) {
|
||||
case TK_VALUES:
|
||||
if (TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
|
||||
return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is exclusive", token.z);
|
||||
}
|
||||
return parseValuesClause(pCxt, pStmt, rowsDataCxt, &token);
|
||||
case TK_FILE:
|
||||
return parseFileClause(pCxt, pStmt, rowsDataCxt, &token);
|
||||
|
@ -2275,8 +2308,25 @@ static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifyOpSt
|
|||
return setStmtInfo(pCxt, pStmt);
|
||||
}
|
||||
|
||||
// release old array alloced by merge
|
||||
pStmt->freeArrayFunc(pStmt->pVgDataBlocks);
|
||||
pStmt->pVgDataBlocks = NULL;
|
||||
|
||||
bool fileOnly = (pStmt->insertType == TSDB_QUERY_TYPE_FILE_INSERT);
|
||||
if (fileOnly) {
|
||||
// none data, skip merge & buildvgdata
|
||||
if (0 == taosHashGetSize(pStmt->pTableCxtHashObj)) {
|
||||
pCxt->needRequest = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
// merge according to vgId
|
||||
int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks);
|
||||
int32_t code = insMergeTableDataCxt(fileOnly ? pStmt->pTableCxtHashObj : pStmt->pTableBlockHashObj,
|
||||
&pStmt->pVgDataBlocks, pStmt->fileProcessing);
|
||||
// clear tmp hashobj only
|
||||
taosHashClear(pStmt->pTableCxtHashObj);
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = insBuildVgDataBlocks(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks);
|
||||
}
|
||||
|
@ -2718,6 +2768,7 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
|
|||
.msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
|
||||
.missCache = false,
|
||||
.usingDuplicateTable = false,
|
||||
.needRequest = true,
|
||||
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)};
|
||||
|
||||
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
|
||||
|
@ -2732,5 +2783,10 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
|
|||
code = setRefreshMeta(*pQuery);
|
||||
}
|
||||
insDestroyBoundColInfo(&context.tags);
|
||||
|
||||
// if no data to insert, set emptyMode to avoid request server
|
||||
if (!context.needRequest) {
|
||||
(*pQuery)->execMode = QUERY_EXEC_MODE_EMPTY_RESULT;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -58,7 +58,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
|
|||
|
||||
// merge according to vgId
|
||||
if (taosHashGetSize(pBlockHash) > 0) {
|
||||
code = insMergeTableDataCxt(pBlockHash, &pVgDataBlocks);
|
||||
code = insMergeTableDataCxt(pBlockHash, &pVgDataBlocks, true);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks);
|
||||
|
|
|
@ -289,6 +289,14 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
|
|||
pTmp->uid = pSrc->uid;
|
||||
pTmp->sver = pSrc->sver;
|
||||
pTmp->pCreateTbReq = NULL;
|
||||
if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
|
||||
if (pSrc->pCreateTbReq) {
|
||||
cloneSVreateTbReq(pSrc->pCreateTbReq, &pTmp->pCreateTbReq);
|
||||
} else {
|
||||
pTmp->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
|
||||
}
|
||||
}
|
||||
|
||||
if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
|
||||
pTmp->aCol = taosArrayInit(128, sizeof(SColData));
|
||||
if (NULL == pTmp->aCol) {
|
||||
|
@ -416,15 +424,21 @@ void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
|
|||
taosHashCleanup(pTableCxtHash);
|
||||
}
|
||||
|
||||
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt) {
|
||||
static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild) {
|
||||
if (NULL == pVgCxt->pData->aSubmitTbData) {
|
||||
pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
|
||||
if (NULL == pVgCxt->pData->aSubmitTbData) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
// push data to submit, rebuild empty data for next submit
|
||||
taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData);
|
||||
rebuildTableData(pTableCxt->pData, &pTableCxt->pData);
|
||||
if (isRebuild) {
|
||||
rebuildTableData(pTableCxt->pData, &pTableCxt->pData);
|
||||
} else {
|
||||
taosMemoryFreeClear(pTableCxt->pData);
|
||||
}
|
||||
|
||||
qDebug("add tableDataCxt uid:%" PRId64 " to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);
|
||||
|
||||
|
@ -467,7 +481,7 @@ int insColDataComp(const void* lp, const void* rp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
|
||||
int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
|
||||
SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
|
||||
SArray* pVgroupList = taosArrayInit(8, POINTER_BYTES);
|
||||
if (NULL == pVgroupHash || NULL == pVgroupList) {
|
||||
|
@ -502,6 +516,13 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
|
|||
|
||||
tColDataSortMerge(pTableCxt->pData->aCol);
|
||||
} else {
|
||||
// skip the table has no data to insert
|
||||
// eg: import a csv without valid data
|
||||
// if (0 == taosArrayGetSize(pTableCxt->pData->aRowP)) {
|
||||
// qWarn("no row in tableDataCxt uid:%" PRId64 " ", pTableCxt->pMeta->uid);
|
||||
// p = taosHashIterate(pTableHash, p);
|
||||
// continue;
|
||||
// }
|
||||
if (!pTableCxt->ordered) {
|
||||
code = tRowSort(pTableCxt->pData->aRowP);
|
||||
}
|
||||
|
@ -520,7 +541,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
|
|||
pVgCxt = *(SVgroupDataCxt**)pp;
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = fillVgroupDataCxt(pTableCxt, pVgCxt);
|
||||
code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild);
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -300,6 +300,7 @@ e
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/precisionUS.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/precisionNS.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4219.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/ts-4272.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4295.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td27388.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py
|
||||
|
|
|
@ -0,0 +1,205 @@
|
|||
|
||||
import csv
|
||||
from datetime import datetime
|
||||
|
||||
import taos
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
self.testcasePath = os.path.split(__file__)[0]
|
||||
self.testcasefilename = os.path.split(__file__)[-1]
|
||||
self.ts = 1700638570000 # 2023-11-22T07:36:10.000Z
|
||||
self.db = 'db1'
|
||||
self.tb1 = 'd001'
|
||||
self.tb2 = 'd002'
|
||||
self.stable0 = "meters"
|
||||
self.stable1 = "stb_1"
|
||||
self.stable2 = "stb_null"
|
||||
self.tag1 = f'using {self.stable0}(groupId) tags(1)'
|
||||
self.tag2 = f'using {self.stable0}(groupId) tags(2)'
|
||||
self.file1 = f"{self.testcasePath}/b.csv"
|
||||
self.file2 = f"{self.testcasePath}/c.csv"
|
||||
|
||||
#os.system("rm -rf %s/b.csv" %self.testcasePath)
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
def check_count(self, rows, records):
|
||||
tdSql.execute(f"use {self.db};")
|
||||
tdSql.query(f"select tbname,count(*) from {self.stable0} group by tbname order by tbname;")
|
||||
tdSql.checkRows(rows)
|
||||
for i in range(rows):
|
||||
tdSql.checkData(i, 1, records[i])
|
||||
|
||||
def reset_tb(self):
|
||||
# create database and tables
|
||||
# os.system("taos -s 'drop database if exists d1;'")
|
||||
# os.system("taos -s 'create database d1;use d1;create stable meters (ts timestamp, current float, voltage int, phase float) tags (location binary(64), groupId int);'")
|
||||
# os.system(f"taos -s 'use d1;create table d2001 using meters(groupId) tags(5);'")
|
||||
# res = os.system(f"taos -s 'use d1;create table d2002 using meters(groupId) tags(6);'")
|
||||
# if (0 != res):
|
||||
# tdLog.exit(f"create tb error")
|
||||
|
||||
tdSql.execute(f"drop database if exists {self.db};")
|
||||
tdSql.execute(f"create database {self.db};")
|
||||
tdSql.execute(f"use {self.db};")
|
||||
tdSql.execute(f"create stable {self.stable0} (ts timestamp, current float, voltage int, phase float) tags (location binary(64), groupId int);")
|
||||
tdSql.execute(f"create table {self.tb1} {self.tag1};")
|
||||
tdSql.execute(f"create table {self.tb2} {self.tag2};")
|
||||
tdSql.execute(f"create stable {self.stable1} (ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) tags(loc nchar(100) , t_int int , t_bigint bigint , t_smallint smallint , t_tinyint tinyint, t_bool bool , t_binary binary(100) , t_nchar nchar(100) ,t_float float , t_double double , t_ts timestamp);")
|
||||
tdSql.execute(f"create stable {self.stable2} (ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) tags(loc nchar(100) , t_int int , t_bigint bigint , t_smallint smallint , t_tinyint tinyint, t_bool bool , t_binary binary(100) , t_nchar nchar(100) ,t_float float , t_double double , t_ts timestamp);")
|
||||
|
||||
def test(self, sql):
|
||||
sql = f"use {self.db};" + sql
|
||||
res = os.system(f'taos -s "{sql}"')
|
||||
# if (0 != res):
|
||||
# tdLog.exit(f"taos sql error")
|
||||
|
||||
|
||||
def check(self):
|
||||
# same table, auto create + create
|
||||
sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}' {self.tb1} {self.tag1} file '{self.file2}';"
|
||||
self.test(sql)
|
||||
|
||||
# same table, create + insert
|
||||
sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}' {self.tb1} file '{self.file2}';"
|
||||
self.test(sql)
|
||||
|
||||
# same table, insert + create
|
||||
sql = f"insert into {self.tb1} file '{self.file1}' {self.tb1} {self.tag1} file '{self.file2}';"
|
||||
self.test(sql)
|
||||
|
||||
# same table, insert + insert
|
||||
sql = f"insert into {self.tb1} file '{self.file1}' {self.tb1} file '{self.file2}';"
|
||||
self.test(sql)
|
||||
|
||||
# diff table auto create + create
|
||||
sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}' {self.tb2} {self.tag2} file '{self.file2}';"
|
||||
self.test(sql)
|
||||
|
||||
# diff table, create + insert
|
||||
sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}' {self.tb2} file '{self.file2}';"
|
||||
self.test(sql)
|
||||
|
||||
# diff table, insert + create
|
||||
sql = f"insert into {self.tb1} file '{self.file1}' {self.tb2} {self.tag2} file '{self.file2}';"
|
||||
self.test(sql)
|
||||
|
||||
# diff table, insert + insert
|
||||
sql = f"insert into {self.tb1} file '{self.file1}' {self.tb2} file '{self.file2}';"
|
||||
self.test(sql)
|
||||
|
||||
# bigNum = 1010000
|
||||
# self.check_count(5, [2100, 2100, bigNum, bigNum, bigNum])
|
||||
|
||||
result = os.popen("taos -s 'select count(*) from %s.%s'" %(self.db, self.tb1))
|
||||
res = result.read()
|
||||
if (f"OK" in res):
|
||||
tdLog.info(f"check count success")
|
||||
|
||||
def make_csv(self, filepath, once, qtime, startts):
|
||||
f = open(filepath, 'w')
|
||||
with f:
|
||||
writer = csv.writer(f)
|
||||
for j in range(qtime):
|
||||
ts = startts + j*once
|
||||
rows = []
|
||||
for i in range(once):
|
||||
rows.append([ts + i, 0.3 + (i%10)/100.0, 210 + i%10, 10.0 + (i%20)/20.0])
|
||||
writer.writerows(rows)
|
||||
f.close()
|
||||
print(datetime.now(), filepath, " ready!")
|
||||
|
||||
def test_mix(self):
|
||||
#forbid use both value and file in one insert
|
||||
result = os.popen(f"insert into {self.tb1} file '{self.file2}' {self.tb2} values('2021-07-13 14:06:34.630', 10.2, 219, 0.32);")
|
||||
res = result.read()
|
||||
if (f"error" in res):
|
||||
tdLog.info(f"forbid success")
|
||||
|
||||
def test_bigcsv(self):
|
||||
# prepare csv
|
||||
print("start csv data prepare")
|
||||
once = 10000
|
||||
qtime1 = 101
|
||||
qtime2 = 100
|
||||
rowNum1 = qtime1 * once
|
||||
rowNum2 = qtime2 * once
|
||||
self.make_csv(self.file1, once, qtime1, self.ts - 86400000)
|
||||
self.make_csv(self.file2, once, qtime2, self.ts)
|
||||
print("end csv data prepare")
|
||||
|
||||
# auto create + insert
|
||||
sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}';"
|
||||
self.test(sql)
|
||||
|
||||
# only insert
|
||||
sql = f"insert into {self.tb2} file '{self.file2}';"
|
||||
self.test(sql)
|
||||
print("end insert to table")
|
||||
|
||||
#tdSql.execute(f"use d1;")
|
||||
tdSql.query(f"select tbname,count(*) from {self.stable0} group by tbname order by tbname;")
|
||||
tdSql.checkRows(2)
|
||||
tdSql.checkData(0, 1, rowNum1)
|
||||
tdSql.checkData(1, 1, rowNum2)
|
||||
print("check insert file to table success")
|
||||
|
||||
def make_stable_csv(self, filepath, once, qtime, startts, table_name):
|
||||
f = open(filepath, 'w')
|
||||
with f:
|
||||
writer = csv.writer(f)
|
||||
for j in range(qtime):
|
||||
offset = j*once
|
||||
ts = startts + offset
|
||||
rows = []
|
||||
for i in range(once):
|
||||
rows.append([table_name, ts + i, offset + i, 'NULL'])
|
||||
writer.writerows(rows)
|
||||
f.close()
|
||||
print(datetime.now(), filepath, " ready!")
|
||||
|
||||
def test_stable_csv(self):
|
||||
# prepare csv
|
||||
print("start stable_csv data prepare")
|
||||
once = 10000
|
||||
qtime1 = 101
|
||||
qtime2 = 100
|
||||
# rowNum1 = qtime1 * once
|
||||
# rowNum2 = qtime2 * once
|
||||
child_1 = f"{self.stable1}_1"
|
||||
child_2 = f"{self.stable2}_1"
|
||||
self.make_stable_csv(self.file1, once, qtime1, self.ts - 86400000, child_1)
|
||||
self.make_stable_csv(self.file2, once, qtime2, self.ts, child_2)
|
||||
print("end stable_csv data prepare")
|
||||
|
||||
# insert create child table of stable
|
||||
sql = f"insert into {self.db}.{self.stable1}(tbname,ts,q_int,q_binary) file '{self.file1}' {self.db}.{self.stable2}(tbname,ts,q_int,q_binary) file '{self.file2}';"
|
||||
self.test(sql)
|
||||
print("end insert to stable")
|
||||
|
||||
#tdSql.execute(f"insert into {self.db}.{child_1}(ts, q_int) values(now, 1);")
|
||||
tdSql.query(f"select tbname,count(*) from {self.stable1} group by tbname order by tbname;")
|
||||
tdSql.checkRows(0)
|
||||
print("check stable success")
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
self.reset_tb()
|
||||
self.test_stable_csv()
|
||||
self.test_bigcsv()
|
||||
self.test_mix()
|
||||
self.check()
|
||||
tdSql.close()
|
||||
|
||||
def stop(self):
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue