From 5c5efa34eb11b4e798919e46f90e9917250a7450 Mon Sep 17 00:00:00 2001
From: Bob Liu
Date: Wed, 29 Nov 2023 01:53:13 +0800
Subject: [PATCH 1/5] fix crash when inserting a big csv
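
Importing a sufficiently large csv built the whole submit request in one
pass and could crash the client. With this change the file is parsed in
batches of tsMaxInsertBatchRows rows (pStmt->fileProcessing), the table
contexts fed from the file are tracked in a temporary hash map
(pTableCxtHashObj) that is merged and cleared per batch, mixing VALUES
and FILE in one statement is rejected, and a statement whose file
contributes no rows skips the server round trip via
QUERY_EXEC_MODE_EMPTY_RESULT.

A statement of the shape exercised by the new test ts-4272.py (table and
file names here are illustrative):

    INSERT INTO d2001 USING meters(groupId) TAGS(5) FILE '/path/b.csv';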
---
 include/libs/nodes/querynodes.h        |   1 +
 source/libs/nodes/src/nodesUtilFuncs.c |   1 +
 source/libs/parser/inc/parInsertUtil.h |   2 +-
 source/libs/parser/src/parInsertSml.c  |   2 +-
 source/libs/parser/src/parInsertSql.c  |  52 +++++++-
 source/libs/parser/src/parInsertStmt.c |   2 +-
 source/libs/parser/src/parInsertUtil.c |  29 ++++-
 tests/parallel_test/cases.task         |   1 +
 tests/system-test/1-insert/ts-4272.py  | 158 +++++++++++++++++++++++++
 9 files changed, 238 insertions(+), 10 deletions(-)
 create mode 100644 tests/system-test/1-insert/ts-4272.py

diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h
index 19dc8c9e4d..5c5172b9cd 100644
--- a/include/libs/nodes/querynodes.h
+++ b/include/libs/nodes/querynodes.h
@@ -449,6 +449,7 @@ typedef struct SVnodeModifyOpStmt {
   SHashObj*      pSubTableHashObj;   // SHashObj
   SHashObj*      pTableNameHashObj;  // set of table names for refreshing meta, sync mode
   SHashObj*      pDbFNameHashObj;    // set of db names for refreshing meta, sync mode
+  SHashObj*      pTableCxtHashObj;   // temp SHashObj for single request
   SArray*        pVgDataBlocks;      // SArray
   SVCreateTbReq* pCreateTblReq;
   TdFilePtr      fp;
diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c
index d167d81c82..e730ccf21b 100644
--- a/source/libs/nodes/src/nodesUtilFuncs.c
+++ b/source/libs/nodes/src/nodesUtilFuncs.c
@@ -887,6 +887,7 @@ void nodesDestroyNode(SNode* pNode) {
       taosHashCleanup(pStmt->pSubTableHashObj);
       taosHashCleanup(pStmt->pTableNameHashObj);
       taosHashCleanup(pStmt->pDbFNameHashObj);
+      taosHashCleanup(pStmt->pTableCxtHashObj);
       if (pStmt->freeHashFunc) {
         pStmt->freeHashFunc(pStmt->pTableBlockHashObj);
       }
diff --git a/source/libs/parser/inc/parInsertUtil.h b/source/libs/parser/inc/parInsertUtil.h
index ce8c2d8a3d..b20587dd43 100644
--- a/source/libs/parser/inc/parInsertUtil.h
+++ b/source/libs/parser/inc/parInsertUtil.h
@@ -50,7 +50,7 @@ void insCheckTableDataOrder(STableDataCxt *pTableCxt, TSKEY tsKey);
 int32_t insGetTableDataCxt(SHashObj *pHash, void *id, int32_t idLen, STableMeta *pTableMeta,
                            SVCreateTbReq **pCreateTbReq, STableDataCxt **pTableCxt, bool colMode, bool ignoreColVals);
 int32_t initTableColSubmitData(STableDataCxt *pTableCxt);
-int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks);
+int32_t insMergeTableDataCxt(SHashObj *pTableHash, SArray **pVgDataBlocks, bool isRebuild);
 int32_t insBuildVgDataBlocks(SHashObj *pVgroupsHashObj, SArray *pVgDataBlocks, SArray **pDataBlocks);
 void    insDestroyTableDataCxtHashMap(SHashObj *pTableCxtHash);
 void    insDestroyVgroupDataCxt(SVgroupDataCxt *pVgCxt);
diff --git a/source/libs/parser/src/parInsertSml.c b/source/libs/parser/src/parInsertSml.c
index f2194402da..2dbba38212 100644
--- a/source/libs/parser/src/parInsertSml.c
+++ b/source/libs/parser/src/parInsertSml.c
@@ -425,7 +425,7 @@ SQuery* smlInitHandle() {
 int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) {
   SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)(handle)->pRoot;
   // merge according to vgId
-  int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks);
+  int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks, true);
   if (code != TSDB_CODE_SUCCESS) {
     uError("insMergeTableDataCxt failed");
     return code;
diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c
index 2b8516d37b..684314faef 100644
--- a/source/libs/parser/src/parInsertSql.c
+++ b/source/libs/parser/src/parInsertSql.c
@@ -55,6 +55,7 @@ typedef struct SInsertParseContext {
   bool           usingDuplicateTable;
   bool           forceUpdate;
   bool           needTableTagVal;
+  bool           needRequest;  // whether a request to the server is needed
 } SInsertParseContext;
 
 typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
@@ -652,6 +653,10 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifyOpStm
 }
 
 static int32_t buildCreateTbReq(SVnodeModifyOpStmt* pStmt, STag* pTag, SArray* pTagName) {
+  if (pStmt->pCreateTblReq) {
+    tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
+    taosMemoryFreeClear(pStmt->pCreateTblReq);
+  }
   pStmt->pCreateTblReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
   if (NULL == pStmt->pCreateTblReq) {
     return TSDB_CODE_OUT_OF_MEMORY;
@@ -1992,7 +1997,7 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt
       (*pNumOfRows)++;
     }
 
-    if (TSDB_CODE_SUCCESS == code && (*pNumOfRows) > tsMaxInsertBatchRows) {
+    if (TSDB_CODE_SUCCESS == code && (*pNumOfRows) >= tsMaxInsertBatchRows) {
       pStmt->fileProcessing = true;
       break;
     }
@@ -2003,7 +2008,7 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt
 
   parserDebug("0x%" PRIx64 " %d rows have been parsed", pCxt->pComCxt->requestId, *pNumOfRows);
 
-  if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) &&
+  if (TSDB_CODE_SUCCESS == code && 0 == (*pNumOfRows) && 0 == pStmt->totalRowsNum &&
       (!TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_STMT_INSERT)) && !pStmt->fileProcessing) {
     code = buildSyntaxErrMsg(&pCxt->msg, "no any data points", NULL);
   }
@@ -2022,7 +2027,22 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt
     } else {
       parserDebug("0x%" PRIx64 " insert from csv. File is too large, do it in batches.", pCxt->pComCxt->requestId);
     }
+    if (pStmt->insertType != TSDB_QUERY_TYPE_FILE_INSERT) {
+      return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is exclusive", NULL);
+    }
   }
+
+  // just record pTableCxt whose data come from file
+  if (numOfRows > 0) {
+    if (NULL == pStmt->pTableCxtHashObj) {
+      pStmt->pTableCxtHashObj =
+          taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
+    }
+    void* pData = rowsDataCxt.pTableDataCxt;
+    taosHashPut(pStmt->pTableCxtHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid), &pData,
+                POINTER_BYTES);
+  }
+
   return code;
 }
@@ -2061,6 +2081,9 @@ static int32_t parseDataClause(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
   NEXT_TOKEN(pStmt->pSql, token);
   switch (token.type) {
     case TK_VALUES:
+      if (TSDB_QUERY_HAS_TYPE(pStmt->insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
+        return buildSyntaxErrMsg(&pCxt->msg, "keyword VALUES or FILE is exclusive", token.z);
+      }
       return parseValuesClause(pCxt, pStmt, rowsDataCxt, &token);
     case TK_FILE:
       return parseFileClause(pCxt, pStmt, rowsDataCxt, &token);
@@ -2275,8 +2298,25 @@ static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifyOpSt
     return setStmtInfo(pCxt, pStmt);
   }
 
+  // release old array allocated by merge
+  pStmt->freeArrayFunc(pStmt->pVgDataBlocks);
+  pStmt->pVgDataBlocks = NULL;
+
+  bool fileOnly = (pStmt->insertType == TSDB_QUERY_TYPE_FILE_INSERT);
+  if (fileOnly) {
+    // none data, skip merge & buildvgdata
+    if (0 == taosHashGetSize(pStmt->pTableCxtHashObj)) {
+      pCxt->needRequest = false;
+      return TSDB_CODE_SUCCESS;
+    }
+  }
+
   // merge according to vgId
-  int32_t code = insMergeTableDataCxt(pStmt->pTableBlockHashObj, &pStmt->pVgDataBlocks);
+  int32_t code = insMergeTableDataCxt(fileOnly ? pStmt->pTableCxtHashObj : pStmt->pTableBlockHashObj,
+                                      &pStmt->pVgDataBlocks, pStmt->fileProcessing);
+  // clear tmp hashobj only
+  taosHashClear(pStmt->pTableCxtHashObj);
+
   if (TSDB_CODE_SUCCESS == code) {
     code = insBuildVgDataBlocks(pStmt->pVgroupsHashObj, pStmt->pVgDataBlocks, &pStmt->pDataBlocks);
   }
@@ -2718,6 +2758,7 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
       .msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
       .missCache = false,
       .usingDuplicateTable = false,
+      .needRequest = true,
       .forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)};
 
   int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
@@ -2732,5 +2773,10 @@ int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatal
     code = setRefreshMeta(*pQuery);
   }
   insDestroyBoundColInfo(&context.tags);
+
+  // if there is no data to insert, set empty mode to avoid requesting the server
+  if (!context.needRequest) {
+    (*pQuery)->execMode = QUERY_EXEC_MODE_EMPTY_RESULT;
+  }
   return code;
 }
diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c
index 5137deca2e..a88aec20b3 100644
--- a/source/libs/parser/src/parInsertStmt.c
+++ b/source/libs/parser/src/parInsertStmt.c
@@ -58,7 +58,7 @@ int32_t qBuildStmtOutput(SQuery* pQuery, SHashObj* pVgHash, SHashObj* pBlockHash
 
   // merge according to vgId
   if (taosHashGetSize(pBlockHash) > 0) {
-    code = insMergeTableDataCxt(pBlockHash, &pVgDataBlocks);
+    code = insMergeTableDataCxt(pBlockHash, &pVgDataBlocks, true);
   }
   if (TSDB_CODE_SUCCESS == code) {
     code = insBuildVgDataBlocks(pVgHash, pVgDataBlocks, &pStmt->pDataBlocks);
   }
diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c
index 21b093c76c..a924ed68b0 100644
--- a/source/libs/parser/src/parInsertUtil.c
+++ b/source/libs/parser/src/parInsertUtil.c
@@ -289,6 +289,14 @@ static int32_t rebuildTableData(SSubmitTbData* pSrc, SSubmitTbData** pDst) {
     pTmp->uid = pSrc->uid;
     pTmp->sver = pSrc->sver;
     pTmp->pCreateTbReq = NULL;
+    if (pTmp->flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
+      if (pSrc->pCreateTbReq) {
+        cloneSVreateTbReq(pSrc->pCreateTbReq, &pTmp->pCreateTbReq);
+      } else {
+        pTmp->flags &= ~SUBMIT_REQ_AUTO_CREATE_TABLE;
+      }
+    }
+
     if (pTmp->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
       pTmp->aCol = taosArrayInit(128, sizeof(SColData));
       if (NULL == pTmp->aCol) {
@@ -416,15 +424,21 @@ void insDestroyTableDataCxtHashMap(SHashObj* pTableCxtHash) {
   taosHashCleanup(pTableCxtHash);
 }
 
-static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt) {
+static int32_t fillVgroupDataCxt(STableDataCxt* pTableCxt, SVgroupDataCxt* pVgCxt, bool isRebuild) {
   if (NULL == pVgCxt->pData->aSubmitTbData) {
     pVgCxt->pData->aSubmitTbData = taosArrayInit(128, sizeof(SSubmitTbData));
    if (NULL == pVgCxt->pData->aSubmitTbData) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }
+
+  // push data to submit, rebuild empty data for next submit
   taosArrayPush(pVgCxt->pData->aSubmitTbData, pTableCxt->pData);
-  rebuildTableData(pTableCxt->pData, &pTableCxt->pData);
+  if (isRebuild) {
+    rebuildTableData(pTableCxt->pData, &pTableCxt->pData);
+  } else {
+    taosMemoryFreeClear(pTableCxt->pData);
+  }
 
   qDebug("add tableDataCxt uid:%" PRId64 " to vgId:%d", pTableCxt->pMeta->uid, pVgCxt->vgId);
@@ -467,7 +481,7 @@ int insColDataComp(const void* lp, const void* rp) {
   return 0;
 }
 
-int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
+int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks, bool isRebuild) {
   SHashObj* pVgroupHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
   SArray*   pVgroupList = taosArrayInit(8, POINTER_BYTES);
   if (NULL == pVgroupHash || NULL == pVgroupList) {
@@ -502,6 +516,13 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
         tColDataSortMerge(pTableCxt->pData->aCol);
       }
     } else {
+      // skip tables that have no data to insert
+      // eg: import a csv without valid data
+      // if (0 == taosArrayGetSize(pTableCxt->pData->aRowP)) {
+      //   qWarn("no row in tableDataCxt uid:%" PRId64 " ", pTableCxt->pMeta->uid);
+      //   p = taosHashIterate(pTableHash, p);
+      //   continue;
+      // }
       if (!pTableCxt->ordered) {
         code = tRowSort(pTableCxt->pData->aRowP);
       }
@@ -520,7 +541,7 @@ int32_t insMergeTableDataCxt(SHashObj* pTableHash, SArray** pVgDataBlocks) {
         pVgCxt = *(SVgroupDataCxt**)pp;
       }
       if (TSDB_CODE_SUCCESS == code) {
-        code = fillVgroupDataCxt(pTableCxt, pVgCxt);
+        code = fillVgroupDataCxt(pTableCxt, pVgCxt, isRebuild);
       }
     }
     if (TSDB_CODE_SUCCESS == code) {
diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task
index 7a47df97a9..d039495351 100644
--- a/tests/parallel_test/cases.task
+++ b/tests/parallel_test/cases.task
@@ -300,6 +300,7 @@ e
 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/precisionUS.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/precisionNS.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4219.py
+,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/ts-4272.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_ts4295.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_td27388.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py
diff --git a/tests/system-test/1-insert/ts-4272.py b/tests/system-test/1-insert/ts-4272.py
new file mode 100644
index 0000000000..4e837d646d
--- /dev/null
+++ b/tests/system-test/1-insert/ts-4272.py
@@ -0,0 +1,158 @@
+
+import csv
+from datetime import datetime
+
+import taos
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import *
+from util.common import *
+
+class TDTestCase:
+    def init(self, conn, logSql, replicaVar=1):
+        self.replicaVar = int(replicaVar)
+        self.testcasePath = os.path.split(__file__)[0]
+        self.testcaseFilename = os.path.split(__file__)[-1]
+        self.ts = 1700638570000 # 2023-11-22T07:36:10.000Z
+        self.tb1 = 'd001'
+        self.tb2 = 'd002'
+        self.tag1 = 'using meters(groupId) tags(1)'
+        self.tag2 = 'using meters(groupId) tags(2)'
+        self.file1 = f"{self.testcasePath}/b.csv"
+        self.file2 = f"{self.testcasePath}/c.csv"
+
+        os.system("rm -rf %s/b.csv" %self.testcasePath)
+        tdLog.debug(f"start to execute {__file__}")
+        tdSql.init(conn.cursor(), logSql)
+
+    def check_count(self, rows, records):
+        tdSql.execute(f"use d1;")
+        tdSql.query(f"select tbname,count(*) from meters group by tbname order by tbname;")
+        tdSql.checkRows(rows)
+        for i in range(rows):
+            tdSql.checkData(i, 1, records[i])
+
+    def reset_tb(self):
+        # create database and tables
+        # os.system("taos -s 'drop database if exists d1;'")
+        # os.system("taos -s 'create database d1;use d1;CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);'")
+        # os.system(f"taos -s 'use d1;Create table d2001 using meters(groupId) tags(5);'")
+        # res = os.system(f"taos -s 'use d1;Create table d2002 using meters(groupId) tags(6);'")
+        # if (0 != res):
+        #     tdLog.exit(f"create tb error")
+
+        tdSql.execute("drop database if exists d1;")
+        tdSql.execute("create database d1;")
+        tdSql.execute("use d1;")
+        tdSql.execute("CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);")
+        tdSql.execute("Create table d2001 using meters(groupId) tags(5);")
+        tdSql.execute("Create table d2002 using meters(groupId) tags(6);")
+
+    def test(self, sql):
+        sql = "use d1;" + sql
+        res = os.system(f'taos -s "{sql}"')
+        # if (0 != res):
+        #     tdLog.exit(f"taos sql error")
+
+
+    def check(self):
+        # same table, auto create + create
+        sql = f"INSERT INTO {self.tb1} {self.tag1} file '{self.file1}' {self.tb1} {self.tag1} file '{self.file2}';"
+        self.test(sql)
+
+        # same table, create + insert
+        sql = f"INSERT INTO {self.tb1} {self.tag1} file '{self.file1}' {self.tb1} file '{self.file2}';"
+        self.test(sql)
+
+        # same table, insert + create
+        sql = f"INSERT INTO {self.tb1} file '{self.file1}' {self.tb1} {self.tag1} file '{self.file2}';"
+        self.test(sql)
+
+        # same table, insert + insert
+        sql = f"INSERT INTO {self.tb1} file '{self.file1}' {self.tb1} file '{self.file2}';"
+        self.test(sql)
+
+        # diff table auto create + create
+        sql = f"INSERT INTO {self.tb1} {self.tag1} file '{self.file1}' {self.tb2} {self.tag2} file '{self.file2}';"
+        self.test(sql)
+
+        # diff table, create + insert
+        sql = f"INSERT INTO {self.tb1} {self.tag1} file '{self.file1}' {self.tb2} file '{self.file2}';"
+        self.test(sql)
+
+        # diff table, insert + create
+        sql = f"INSERT INTO {self.tb1} file '{self.file1}' {self.tb2} {self.tag2} file '{self.file2}';"
+        self.test(sql)
+
+        # diff table, insert + insert
+        sql = f"INSERT INTO {self.tb1} file '{self.file1}' {self.tb2} file '{self.file2}';"
+        self.test(sql)
+
+        # bigNum = 1010000
+        # self.check_count(5, [2100, 2100, bigNum, bigNum, bigNum])
+
+        result = os.popen("taos -s 'select count(*) from d1.%s'" %self.tb1)
+        res = result.read()
+        if (f"OK" in res):
+            tdLog.info(f"check count success")
+
+    def make_csv(self, filepath, once, qtime, startts):
+        f = open(filepath, 'w')
+        with f:
+            writer = csv.writer(f)
+            for j in range(qtime):
+                ts = startts + j*once
+                rows = []
+                for i in range(once):
+                    rows.append([ts + i, 0.3 + (i%10)/100.0, 210 + i%10, 10.0 + (i%20)/20.0])
+                writer.writerows(rows)
+        f.close()
+        print(datetime.now(), filepath, " ready!")
+
+    def test_mix(self):
+        #forbid use both value and file in one insert
+        result = os.popen(f"insert into {self.tb1} file '{self.testcasePath}/csv/2k.csv' {self.tb2} values('2021-07-13 14:06:34.630', 10.2, 219, 0.32);")
+        res = result.read()
+        if (f"error" in res):
+            tdLog.info(f"forbid success")
+
+    def test_bigcsv(self):
+        # prepare csv
+        print("start csv data prepare")
+        once = 10000
+        qtime1 = 101
+        qtime2 = 100
+        rowNum1 = qtime1 * once
+        rowNum2 = qtime2 * once
+        self.make_csv(self.file1, once, qtime1, self.ts - 86400000)
+        self.make_csv(self.file2, once, qtime2, self.ts)
+        print("end csv data prepare")
+
+        # auto create + insert
+        sql = f"INSERT INTO d2001 using meters(groupId) tags(5) FILE '{self.file1}';"
+        self.test(sql)
+
+        # only insert
+        sql = f"INSERT INTO d2002 FILE '{self.file2}';"
+        self.test(sql)
+
+        #tdSql.execute(f"use d1;")
+        tdSql.query(f"select tbname,count(*) from meters group by tbname order by tbname;")
+        tdSql.checkRows(2)
+        tdSql.checkData(0, 1, rowNum1)
+        tdSql.checkData(1, 1, rowNum2)
+
+    def run(self):
+        tdSql.prepare()
+        self.reset_tb()
+        self.test_bigcsv()
+        self.test_mix()
+        self.check()
+
+    def stop(self):
+        tdSql.close()
+        tdLog.success(f"{__file__} successfully executed")
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())

From 28b73b415cb38ba37f6d04fc5548d9fd56675022 Mon Sep 17 00:00:00 2001
From: Bob Liu
Date: Wed, 29 Nov 2023 02:44:55 +0800
Subject: [PATCH 2/5] fix insert child table by file
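
The file-only merge path added by the previous commit relies on
pTableCxtHashObj, which super-table (stbSyntax) file inserts do not
populate; restrict both the recording and the fast path to plain
child-table inserts until the stable syntax is handled.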
---
 source/libs/parser/src/parInsertSql.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c
index 684314faef..d58eeed7ef 100644
--- a/source/libs/parser/src/parInsertSql.c
+++ b/source/libs/parser/src/parInsertSql.c
@@ -2033,7 +2033,7 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt
   }
 
   // just record pTableCxt whose data come from file
-  if (numOfRows > 0) {
+  if (!pStmt->stbSyntax && numOfRows > 0) {
     if (NULL == pStmt->pTableCxtHashObj) {
       pStmt->pTableCxtHashObj =
           taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
@@ -2302,7 +2302,7 @@ static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifyOpSt
   pStmt->freeArrayFunc(pStmt->pVgDataBlocks);
   pStmt->pVgDataBlocks = NULL;
 
-  bool fileOnly = (pStmt->insertType == TSDB_QUERY_TYPE_FILE_INSERT);
+  bool fileOnly = (!pStmt->stbSyntax && pStmt->insertType == TSDB_QUERY_TYPE_FILE_INSERT);
   if (fileOnly) {
     // none data, skip merge & buildvgdata
     if (0 == taosHashGetSize(pStmt->pTableCxtHashObj)) {

From 9d3ce107f278f1130054f81d5f64add6428bd1d2 Mon Sep 17 00:00:00 2001
From: Bob Liu
Date: Wed, 29 Nov 2023 15:32:40 +0800
Subject: [PATCH 3/5] support batched insert of stable child tables by file
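
Extend the batched csv path to the super-table syntax: initialize
pTableCxtHashObj once per file in parseDataFromFileImpl, record every
child-table data context resolved by parseOneStbRow, and let the
file-only merge path handle stbSyntax again. This enables statements of
the form (illustrative names, matching the test case extended in the
next commit):

    INSERT INTO db1.stb_1(tbname, ts, q_int, q_binary) FILE '/path/b.csv';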
---
 source/libs/parser/src/parInsertSql.c | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c
index d58eeed7ef..24cdf40d31 100644
--- a/source/libs/parser/src/parInsertSql.c
+++ b/source/libs/parser/src/parInsertSql.c
@@ -1824,6 +1824,10 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpSt
     if (TSDB_CODE_SUCCESS == code) {
       insCheckTableDataOrder(pTableDataCxt, TD_ROW_KEY(*pRow));
     }
+
+    void* pData = pTableDataCxt;
+    taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), &pData,
+                POINTER_BYTES);
   }
 
   if (code == TSDB_CODE_SUCCESS) {
@@ -2016,6 +2020,12 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt
 }
 
 static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SRowsDataContext rowsDataCxt) {
+  // init only for file
+  if (NULL == pStmt->pTableCxtHashObj) {
+    pStmt->pTableCxtHashObj =
+        taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
+  }
+
   int32_t numOfRows = 0;
   int32_t code = parseCsvFile(pCxt, pStmt, rowsDataCxt, &numOfRows);
   if (TSDB_CODE_SUCCESS == code) {
@@ -2034,10 +2044,6 @@ static int32_t parseDataFromFileImpl(SInsertParseContext* pCxt, SVnodeModifyOpSt
 
   // just record pTableCxt whose data come from file
   if (!pStmt->stbSyntax && numOfRows > 0) {
-    if (NULL == pStmt->pTableCxtHashObj) {
-      pStmt->pTableCxtHashObj =
-          taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
-    }
     void* pData = rowsDataCxt.pTableDataCxt;
     taosHashPut(pStmt->pTableCxtHashObj, &pStmt->pTableMeta->uid, sizeof(pStmt->pTableMeta->uid), &pData,
                 POINTER_BYTES);
@@ -2302,7 +2308,7 @@ static int32_t parseInsertBodyBottom(SInsertParseContext* pCxt, SVnodeModifyOpSt
   pStmt->freeArrayFunc(pStmt->pVgDataBlocks);
   pStmt->pVgDataBlocks = NULL;
 
-  bool fileOnly = (!pStmt->stbSyntax && pStmt->insertType == TSDB_QUERY_TYPE_FILE_INSERT);
+  bool fileOnly = (pStmt->insertType == TSDB_QUERY_TYPE_FILE_INSERT);
   if (fileOnly) {
     // none data, skip merge & buildvgdata
     if (0 == taosHashGetSize(pStmt->pTableCxtHashObj)) {

From 3069069e0f1e4dcf08aaf28e6e8686c5ac9fadb2 Mon Sep 17 00:00:00 2001
From: Bob Liu
Date: Wed, 29 Nov 2023 22:47:31 +0800
Subject: [PATCH 4/5] adjust parseOneStbRow; add test case for stable
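
Pass the whole SRowsDataContext into parseOneStbRow (turning the union
into a struct so the resolved STableDataCxt can be handed back), move
the child-table hash bookkeeping from parseOneStbRow into parseCsvFile
so it only happens for file inserts, and extend ts-4272.py with
super-table csv cases.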
---
 source/libs/parser/src/parInsertSql.c |  26 ++++---
 tests/system-test/1-insert/ts-4272.py | 107 ++++++++++++++++++--------
 2 files changed, 92 insertions(+), 41 deletions(-)

diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c
index 24cdf40d31..41d4a70675 100644
--- a/source/libs/parser/src/parInsertSql.c
+++ b/source/libs/parser/src/parInsertSql.c
@@ -1584,7 +1584,7 @@ typedef struct SStbRowsDataContext {
   bool                 isJsonTag;
 } SStbRowsDataContext;
 
-typedef union SRowsDataContext{
+typedef struct SRowsDataContext{
   STableDataCxt*       pTableDataCxt;
   SStbRowsDataContext* pStbRowsCxt;
 } SRowsDataContext;
@@ -1803,8 +1803,9 @@ static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
 }
 
 static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
-                              SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) {
+                              SRowsDataContext* pRowsDataCxt, bool* pGotRow, SToken* pToken) {
   bool bFirstTable = false;
+  SStbRowsDataContext* pStbRowsCxt = pRowsDataCxt->pStbRowsCxt;
   int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken, &bFirstTable);
   if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
     return code;
@@ -1814,20 +1815,16 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpSt
     code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
   }
 
-  STableDataCxt* pTableDataCxt = NULL;
   code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
-                            pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, &pTableDataCxt, false, true);
-  initTableColSubmitData(pTableDataCxt);
+                            pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, &pRowsDataCxt->pTableDataCxt, false, true);
+  initTableColSubmitData(pRowsDataCxt->pTableDataCxt);
   if (code == TSDB_CODE_SUCCESS) {
+    STableDataCxt* pTableDataCxt = pRowsDataCxt->pTableDataCxt;
     SRow** pRow = taosArrayReserve(pTableDataCxt->pData->aRowP, 1);
     code = tRowBuild(pStbRowsCxt->aColVals, pTableDataCxt->pSchema, pRow);
     if (TSDB_CODE_SUCCESS == code) {
       insCheckTableDataOrder(pTableDataCxt, TD_ROW_KEY(*pRow));
     }
-
-    void* pData = pTableDataCxt;
-    taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), &pData,
-                POINTER_BYTES);
   }
 
   if (code == TSDB_CODE_SUCCESS) {
@@ -1924,7 +1921,7 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
       if (!pStmt->stbSyntax) {
         code = parseOneRow(pCxt, &pStmt->pSql, rowsDataCxt.pTableDataCxt, &gotRow, pToken);
       } else {
-        code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, rowsDataCxt.pStbRowsCxt, &gotRow, pToken);
+        code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, &rowsDataCxt, &gotRow, pToken);
       }
     }
 
@@ -1988,7 +1985,14 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt
       if (!pStmt->stbSyntax) {
        code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token);
      } else {
-        code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token);
+        rowsDataCxt.pTableDataCxt = NULL;
+        code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, &rowsDataCxt, &gotRow, &token);
+        if (code == TSDB_CODE_SUCCESS) {
+          SStbRowsDataContext* pStbRowsCxt = rowsDataCxt.pStbRowsCxt;
+          void* pData = rowsDataCxt.pTableDataCxt;
+          taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), &pData,
+                      POINTER_BYTES);
+        }
       }
       if (code && firstLine) {
         firstLine = false;
diff --git a/tests/system-test/1-insert/ts-4272.py b/tests/system-test/1-insert/ts-4272.py
index 4e837d646d..bb81305eb3 100644
--- a/tests/system-test/1-insert/ts-4272.py
+++ b/tests/system-test/1-insert/ts-4272.py
@@ -13,22 +13,26 @@ class TDTestCase:
     def init(self, conn, logSql, replicaVar=1):
         self.replicaVar = int(replicaVar)
         self.testcasePath = os.path.split(__file__)[0]
-        self.testcaseFilename = os.path.split(__file__)[-1]
+        self.testcasefilename = os.path.split(__file__)[-1]
         self.ts = 1700638570000 # 2023-11-22T07:36:10.000Z
+        self.db = 'db1'
         self.tb1 = 'd001'
         self.tb2 = 'd002'
-        self.tag1 = 'using meters(groupId) tags(1)'
-        self.tag2 = 'using meters(groupId) tags(2)'
+        self.stable0 = "meters"
+        self.stable1 = "stb_1"
+        self.stable2 = "stb_null"
+        self.tag1 = f'using {self.stable0}(groupId) tags(1)'
+        self.tag2 = f'using {self.stable0}(groupId) tags(2)'
         self.file1 = f"{self.testcasePath}/b.csv"
         self.file2 = f"{self.testcasePath}/c.csv"
 
-        os.system("rm -rf %s/b.csv" %self.testcasePath)
+        #os.system("rm -rf %s/b.csv" %self.testcasePath)
         tdLog.debug(f"start to execute {__file__}")
         tdSql.init(conn.cursor(), logSql)
 
     def check_count(self, rows, records):
-        tdSql.execute(f"use d1;")
-        tdSql.query(f"select tbname,count(*) from meters group by tbname order by tbname;")
+        tdSql.execute(f"use {self.db};")
+        tdSql.query(f"select tbname,count(*) from {self.stable0} group by tbname order by tbname;")
         tdSql.checkRows(rows)
         for i in range(rows):
             tdSql.checkData(i, 1, records[i])
@@ -36,21 +40,23 @@ class TDTestCase:
     def reset_tb(self):
         # create database and tables
         # os.system("taos -s 'drop database if exists d1;'")
         # os.system("taos -s 'create database d1;use d1;CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);'")
         # os.system(f"taos -s 'use d1;Create table d2001 using meters(groupId) tags(5);'")
         # res = os.system(f"taos -s 'use d1;Create table d2002 using meters(groupId) tags(6);'")
         # if (0 != res):
         #     tdLog.exit(f"create tb error")
 
-        tdSql.execute("drop database if exists d1;")
-        tdSql.execute("create database d1;")
-        tdSql.execute("use d1;")
-        tdSql.execute("CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);")
-        tdSql.execute("Create table d2001 using meters(groupId) tags(5);")
-        tdSql.execute("Create table d2002 using meters(groupId) tags(6);")
+        tdSql.execute(f"drop database if exists {self.db};")
+        tdSql.execute(f"create database {self.db};")
+        tdSql.execute(f"use {self.db};")
+        tdSql.execute(f"create stable {self.stable0} (ts timestamp, current float, voltage int, phase float) tags (location binary(64), groupId int);")
+        tdSql.execute(f"create table {self.tb1} {self.tag1};")
+        tdSql.execute(f"create table {self.tb2} {self.tag2};")
+        tdSql.execute(f"create stable {self.stable1} (ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) tags(loc nchar(100) , t_int int , t_bigint bigint , t_smallint smallint , t_tinyint tinyint, t_bool bool , t_binary binary(100) , t_nchar nchar(100) ,t_float float , t_double double , t_ts timestamp);")
+        tdSql.execute(f"create stable {self.stable2} (ts timestamp , q_int int , q_bigint bigint , q_smallint smallint , q_tinyint tinyint , q_float float , q_double double , q_bool bool , q_binary binary(100) , q_nchar nchar(100) , q_ts timestamp , q_int_null int , q_bigint_null bigint , q_smallint_null smallint , q_tinyint_null tinyint, q_float_null float , q_double_null double , q_bool_null bool , q_binary_null binary(20) , q_nchar_null nchar(20) , q_ts_null timestamp) tags(loc nchar(100) , t_int int , t_bigint bigint , t_smallint smallint , t_tinyint tinyint, t_bool bool , t_binary binary(100) , t_nchar nchar(100) ,t_float float , t_double double , t_ts timestamp);")
 
     def test(self, sql):
-        sql = "use d1;" + sql
+        sql = f"use {self.db};" + sql
         res = os.system(f'taos -s "{sql}"')
         # if (0 != res):
         #     tdLog.exit(f"taos sql error")
@@ -58,41 +64,41 @@ class TDTestCase:
 
     def check(self):
         # same table, auto create + create
-        sql = f"INSERT INTO {self.tb1} {self.tag1} file '{self.file1}' {self.tb1} {self.tag1} file '{self.file2}';"
+        sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}' {self.tb1} {self.tag1} file '{self.file2}';"
         self.test(sql)
 
         # same table, create + insert
-        sql = f"INSERT INTO {self.tb1} {self.tag1} file '{self.file1}' {self.tb1} file '{self.file2}';"
+        sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}' {self.tb1} file '{self.file2}';"
         self.test(sql)
 
         # same table, insert + create
-        sql = f"INSERT INTO {self.tb1} file '{self.file1}' {self.tb1} {self.tag1} file '{self.file2}';"
+        sql = f"insert into {self.tb1} file '{self.file1}' {self.tb1} {self.tag1} file '{self.file2}';"
         self.test(sql)
 
         # same table, insert + insert
-        sql = f"INSERT INTO {self.tb1} file '{self.file1}' {self.tb1} file '{self.file2}';"
+        sql = f"insert into {self.tb1} file '{self.file1}' {self.tb1} file '{self.file2}';"
         self.test(sql)
 
         # diff table auto create + create
-        sql = f"INSERT INTO {self.tb1} {self.tag1} file '{self.file1}' {self.tb2} {self.tag2} file '{self.file2}';"
+        sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}' {self.tb2} {self.tag2} file '{self.file2}';"
         self.test(sql)
 
         # diff table, create + insert
-        sql = f"INSERT INTO {self.tb1} {self.tag1} file '{self.file1}' {self.tb2} file '{self.file2}';"
+        sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}' {self.tb2} file '{self.file2}';"
         self.test(sql)
 
         # diff table, insert + create
-        sql = f"INSERT INTO {self.tb1} file '{self.file1}' {self.tb2} {self.tag2} file '{self.file2}';"
+        sql = f"insert into {self.tb1} file '{self.file1}' {self.tb2} {self.tag2} file '{self.file2}';"
         self.test(sql)
 
         # diff table, insert + insert
-        sql = f"INSERT INTO {self.tb1} file '{self.file1}' {self.tb2} file '{self.file2}';"
+        sql = f"insert into {self.tb1} file '{self.file1}' {self.tb2} file '{self.file2}';"
         self.test(sql)
 
         # bigNum = 1010000
         # self.check_count(5, [2100, 2100, bigNum, bigNum, bigNum])
 
-        result = os.popen("taos -s 'select count(*) from d1.%s'" %self.tb1)
+        result = os.popen("taos -s 'select count(*) from %s.%s'" %(self.db, self.tb1))
         res = result.read()
         if (f"OK" in res):
             tdLog.info(f"check count success")
@@ -112,7 +118,7 @@ class TDTestCase:
 
     def test_mix(self):
         #forbid use both value and file in one insert
-        result = os.popen(f"insert into {self.tb1} file '{self.testcasePath}/csv/2k.csv' {self.tb2} values('2021-07-13 14:06:34.630', 10.2, 219, 0.32);")
+        result = os.popen(f"insert into {self.tb1} file '{self.file2}' {self.tb2} values('2021-07-13 14:06:34.630', 10.2, 219, 0.32);")
         res = result.read()
         if (f"error" in res):
             tdLog.info(f"forbid success")
@@ -130,28 +136,69 @@ class TDTestCase:
         print("end csv data prepare")
 
         # auto create + insert
-        sql = f"INSERT INTO d2001 using meters(groupId) tags(5) FILE '{self.file1}';"
+        sql = f"insert into {self.tb1} {self.tag1} file '{self.file1}';"
         self.test(sql)
 
         # only insert
-        sql = f"INSERT INTO d2002 FILE '{self.file2}';"
+        sql = f"insert into {self.tb2} file '{self.file2}';"
         self.test(sql)
+        print("end insert to table")
 
         #tdSql.execute(f"use d1;")
-        tdSql.query(f"select tbname,count(*) from meters group by tbname order by tbname;")
+        tdSql.query(f"select tbname,count(*) from {self.stable0} group by tbname order by tbname;")
         tdSql.checkRows(2)
         tdSql.checkData(0, 1, rowNum1)
         tdSql.checkData(1, 1, rowNum2)
+        print("check insert file to table success")
+
+    def make_stable_csv(self, filepath, once, qtime, startts, table_name):
+        f = open(filepath, 'w')
+        with f:
+            writer = csv.writer(f)
+            for j in range(qtime):
+                offset = j*once
+                ts = startts + offset
+                rows = []
+                for i in range(once):
+                    rows.append([table_name, ts + i, offset + i, 'NULL'])
+                writer.writerows(rows)
+        f.close()
+        print(datetime.now(), filepath, " ready!")
+
+    def test_stable_csv(self):
+        # prepare csv
+        print("start stable_csv data prepare")
+        once = 10000
+        qtime1 = 101
+        qtime2 = 100
+        # rowNum1 = qtime1 * once
+        # rowNum2 = qtime2 * once
+        child_1 = f"{self.stable1}_1"
+        child_2 = f"{self.stable2}_1"
+        self.make_stable_csv(self.file1, once, qtime1, self.ts - 86400000, child_1)
+        self.make_stable_csv(self.file2, once, qtime2, self.ts, child_2)
+        print("end stable_csv data prepare")
+
+        # insert create child table of stable
+        sql = f"insert into {self.db}.{self.stable1}(tbname,ts,q_int,q_binary) file '{self.file1}' {self.db}.{self.stable2}(tbname,ts,q_int,q_binary) file '{self.file2}';"
+        self.test(sql)
+        print("end insert to stable")
+
+        #tdSql.execute(f"insert into {self.db}.{child_1}(ts, q_int) values(now, 1);")
+        tdSql.query(f"select tbname,count(*) from {self.stable1} group by tbname order by tbname;")
+        tdSql.checkRows(0)
+        print("check stable success")
 
     def run(self):
         tdSql.prepare()
         self.reset_tb()
+        self.test_stable_csv()
         self.test_bigcsv()
         self.test_mix()
         self.check()
+        tdSql.close()
 
     def stop(self):
-        tdSql.close()
         tdLog.success(f"{__file__} successfully executed")
 
 tdCases.addLinux(__file__, TDTestCase())

From 35ae8a4c0d8f9df9dfe9b1581ee53f550a4edc10 Mon Sep 17 00:00:00 2001
From: Bob Liu
Date: Thu, 30 Nov 2023 00:25:05 +0800
Subject: [PATCH 5/5] adjust parseOneStbRow
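
Restore SRowsDataContext to a union: instead of passing the context
struct by pointer, return the resolved child-table context from
parseOneStbRow through a dedicated out parameter (ppTableDataCxt) and
let parseCsvFile store it in pTableCxtHashObj.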
---
 source/libs/parser/src/parInsertSql.c | 30 +++++++++++++--------------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c
index 41d4a70675..31b016458a 100644
--- a/source/libs/parser/src/parInsertSql.c
+++ b/source/libs/parser/src/parInsertSql.c
@@ -1584,7 +1584,7 @@ typedef struct SStbRowsDataContext {
   bool                 isJsonTag;
 } SStbRowsDataContext;
 
-typedef struct SRowsDataContext{
+typedef union SRowsDataContext{
   STableDataCxt*       pTableDataCxt;
   SStbRowsDataContext* pStbRowsCxt;
 } SRowsDataContext;
@@ -1802,10 +1802,10 @@ static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
   taosMemoryFreeClear(pStbRowsCxt->pCreateCtbReq);
 }
 
-static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
-                              SRowsDataContext* pRowsDataCxt, bool* pGotRow, SToken* pToken) {
-  bool bFirstTable = false;
-  SStbRowsDataContext* pStbRowsCxt = pRowsDataCxt->pStbRowsCxt;
+static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
+                              SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken,
+                              STableDataCxt** ppTableDataCxt) {
+  bool bFirstTable = false;
   int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken, &bFirstTable);
   if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
     return code;
@@ -1816,14 +1816,13 @@ static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpSt
     code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
   }
 
   code = insGetTableDataCxt(pStmt->pTableBlockHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid),
-                            pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, &pRowsDataCxt->pTableDataCxt, false, true);
-  initTableColSubmitData(pRowsDataCxt->pTableDataCxt);
+                            pStbRowsCxt->pCtbMeta, &pStbRowsCxt->pCreateCtbReq, ppTableDataCxt, false, true);
+  initTableColSubmitData(*ppTableDataCxt);
   if (code == TSDB_CODE_SUCCESS) {
-    STableDataCxt* pTableDataCxt = pRowsDataCxt->pTableDataCxt;
-    SRow** pRow = taosArrayReserve(pTableDataCxt->pData->aRowP, 1);
-    code = tRowBuild(pStbRowsCxt->aColVals, pTableDataCxt->pSchema, pRow);
+    SRow** pRow = taosArrayReserve((*ppTableDataCxt)->pData->aRowP, 1);
+    code = tRowBuild(pStbRowsCxt->aColVals, (*ppTableDataCxt)->pSchema, pRow);
     if (TSDB_CODE_SUCCESS == code) {
-      insCheckTableDataOrder(pTableDataCxt, TD_ROW_KEY(*pRow));
+      insCheckTableDataOrder(*ppTableDataCxt, TD_ROW_KEY(*pRow));
     }
   }
@@ -1921,7 +1920,8 @@ static int32_t parseValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
       if (!pStmt->stbSyntax) {
         code = parseOneRow(pCxt, &pStmt->pSql, rowsDataCxt.pTableDataCxt, &gotRow, pToken);
       } else {
-        code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, &rowsDataCxt, &gotRow, pToken);
+        STableDataCxt* pTableDataCxt = NULL;
+        code = parseOneStbRow(pCxt, pStmt, &pStmt->pSql, rowsDataCxt.pStbRowsCxt, &gotRow, pToken, &pTableDataCxt);
       }
     }
 
@@ -1985,11 +1985,11 @@ static int32_t parseCsvFile(SInsertParseContext* pCxt, SVnodeModifyOpStmt
       if (!pStmt->stbSyntax) {
         code = parseOneRow(pCxt, (const char**)&pRow, rowsDataCxt.pTableDataCxt, &gotRow, &token);
       } else {
-        rowsDataCxt.pTableDataCxt = NULL;
-        code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, &rowsDataCxt, &gotRow, &token);
+        STableDataCxt* pTableDataCxt = NULL;
+        code = parseOneStbRow(pCxt, pStmt, (const char**)&pRow, rowsDataCxt.pStbRowsCxt, &gotRow, &token, &pTableDataCxt);
         if (code == TSDB_CODE_SUCCESS) {
           SStbRowsDataContext* pStbRowsCxt = rowsDataCxt.pStbRowsCxt;
-          void* pData = rowsDataCxt.pTableDataCxt;
+          void* pData = pTableDataCxt;
           taosHashPut(pStmt->pTableCxtHashObj, &pStbRowsCxt->pCtbMeta->uid, sizeof(pStbRowsCxt->pCtbMeta->uid), &pData,
                       POINTER_BYTES);
         }