From fc84b5c95572c2b4ad28c5b879d033c9179b5ba1 Mon Sep 17 00:00:00 2001 From: Shengliang Date: Wed, 11 May 2022 11:37:13 +0800 Subject: [PATCH 01/16] fix: alter stb --- source/dnode/mnode/impl/src/mndStb.c | 13 +++++++++---- tests/script/tsim/stable/alter1.sim | 1 + 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index ca0ae111a3..12e89277f4 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -318,6 +318,7 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) { pOld->updateTime = pNew->updateTime; pOld->version = pNew->version; pOld->nextColId = pNew->nextColId; + pOld->ttl = pNew->ttl; pOld->numOfColumns = pNew->numOfColumns; pOld->numOfTags = pNew->numOfTags; memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema)); @@ -832,7 +833,7 @@ static int32_t mndProcessVCreateStbRsp(SNodeMsg *pRsp) { } static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) { - if (pAlter->commentLen != 0) return 0; + if (pAlter->commentLen != 0 || pAlter->ttl != 0) return 0; if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) { terrno = TSDB_CODE_MND_INVALID_STB_OPTION; @@ -883,7 +884,8 @@ static int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) { return 0; } -static int32_t mndUpdateStbComment(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen) { +static int32_t mndUpdateStbCommentAndTTL(const SStbObj *pOld, SStbObj *pNew, char *pComment, int32_t commentLen, + int32_t ttl) { if (commentLen > 0) { pNew->commentLen = commentLen; pNew->comment = taosMemoryCalloc(1, commentLen); @@ -893,6 +895,9 @@ static int32_t mndUpdateStbComment(const SStbObj *pOld, SStbObj *pNew, char *pCo } memcpy(pNew->comment, pComment, commentLen); } + if (ttl >= 0) { + pNew->ttl = ttl; + } if (mndAllocStbSchemas(pOld, pNew) != 0) { return -1; @@ -1232,7 +1237,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SNodeMsg *pReq, const SMAlterStbReq * code = mndAlterStbColumnBytes(pOld, &stbObj, pField0); break; case TSDB_ALTER_TABLE_UPDATE_OPTIONS: - code = mndUpdateStbComment(pOld, &stbObj, pAlter->comment, pAlter->commentLen); + code = mndUpdateStbCommentAndTTL(pOld, &stbObj, pAlter->comment, pAlter->commentLen, pAlter->ttl); break; default: terrno = TSDB_CODE_OPS_NOT_SUPPORT; @@ -1723,7 +1728,7 @@ static int32_t mndRetrieveStb(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlo pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pStb->updateTime, false); // number of tables - char *p = taosMemoryMalloc(pStb->commentLen + VARSTR_HEADER_SIZE); // check malloc failures + char *p = taosMemoryCalloc(1, pStb->commentLen + 1 + VARSTR_HEADER_SIZE); // check malloc failures if (p != NULL) { if (pStb->commentLen != 0) { STR_TO_VARSTR(p, pStb->comment); diff --git a/tests/script/tsim/stable/alter1.sim b/tests/script/tsim/stable/alter1.sim index 5cee10756c..1205f50f6e 100644 --- a/tests/script/tsim/stable/alter1.sim +++ b/tests/script/tsim/stable/alter1.sim @@ -159,6 +159,7 @@ sql alter table db.stb rename tag t1 tx print ========== alter common sql alter table db.stb comment 'abcde' ; +sql alter table db.stb ttl 10 ; sql show db.stables; if $data[0][6] != abcde then From a24318d4a62c890a86bbcfebd52399c786348ad1 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Wed, 11 May 2022 21:56:16 +0800 Subject: [PATCH 02/16] Merge branch '3.0' of github.com:taosdata/TDengine into test/chr/TD-14699 --- tests/system-test/1-insert/insertWithMoreVgroup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/system-test/1-insert/insertWithMoreVgroup.py b/tests/system-test/1-insert/insertWithMoreVgroup.py index d8050c53c5..c9acc93c59 100644 --- a/tests/system-test/1-insert/insertWithMoreVgroup.py +++ b/tests/system-test/1-insert/insertWithMoreVgroup.py @@ -346,8 +346,10 @@ class TDTestCase: return def test_case3(self): + self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 8, 1*10000) + self.taosBenchCreate("test209","no","db1", "stb1", 1, 8, 1*10000) + # self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*10000) - self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*1000) # self.taosBenchCreate("db1", "stb1", 4, 5, 100*10000) # self.taosBenchCreate("db1", "stb1", 1, 5, 100*10000) From 4765cb9e9a1254366d4bb9afe06703d45475039b Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 12 May 2022 09:42:53 +0800 Subject: [PATCH 03/16] test:modify testcase that using taosBenchmark to test multi-process table building and data inserting --- tests/system-test/1-insert/insertWithMoreVgroup.py | 2 +- tests/system-test/1-insert/manyVgroups.json | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/system-test/1-insert/insertWithMoreVgroup.py b/tests/system-test/1-insert/insertWithMoreVgroup.py index c9acc93c59..d3da4f2c59 100644 --- a/tests/system-test/1-insert/insertWithMoreVgroup.py +++ b/tests/system-test/1-insert/insertWithMoreVgroup.py @@ -347,7 +347,7 @@ class TDTestCase: def test_case3(self): self.taosBenchCreate("127.0.0.1","no","db1", "stb1", 1, 8, 1*10000) - self.taosBenchCreate("test209","no","db1", "stb1", 1, 8, 1*10000) + # self.taosBenchCreate("test209","no","db2", "stb2", 1, 8, 1*10000) # self.taosBenchCreate("chenhaoran02","no","db1", "stb1", 1, 8, 1*10000) diff --git a/tests/system-test/1-insert/manyVgroups.json b/tests/system-test/1-insert/manyVgroups.json index 5487dff708..e6719aedc9 100644 --- a/tests/system-test/1-insert/manyVgroups.json +++ b/tests/system-test/1-insert/manyVgroups.json @@ -29,8 +29,8 @@ "batch_create_tbl_num": 50000, "data_source": "rand", "insert_mode": "taosc", - "insert_rows": 0, - "interlace_rows": 0, + "insert_rows": 10, + "interlace_rows": 100000, "insert_interval": 0, "max_sql_len": 10000000, "disorder_ratio": 0, From d37d4e5b5eda68ae568c919c57a38dba7c4a2180 Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Thu, 12 May 2022 09:57:43 +0800 Subject: [PATCH 04/16] error processing of executor when calling aggregate function --- source/libs/executor/inc/executorimpl.h | 4 +- source/libs/executor/src/executorimpl.c | 39 +++++++++++++------ source/libs/executor/src/groupoperator.c | 8 ++-- source/libs/executor/src/timewindowoperator.c | 26 ++++++------- 4 files changed, 47 insertions(+), 30 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 93e81aa70e..4881f23134 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -616,10 +616,10 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); -void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf); +void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf); void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); -void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, +void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 33f0c440ec..9c285856d3 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -155,7 +155,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, void operatorDummyCloseFn(void* param, int32_t numOfCols) {} -static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, +static int32_t doCopyToSDataBlock(SExecTaskInfo *taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx); @@ -579,7 +579,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); } -void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, +void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) { for (int32_t k = 0; k < numOfOutput; ++k) { pCtx[k].startTs = pWin->skey; @@ -618,9 +618,14 @@ void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pEntryInfo->numOfRes = 1; continue; } - + int32_t code = TSDB_CODE_SUCCESS; if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { - pCtx[k].fpSet.process(&pCtx[k]); + code = pCtx[k].fpSet.process(&pCtx[k]); + if (code != TSDB_CODE_SUCCESS) { + qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); + taskInfo->code = code; + longjmp(taskInfo->env, code); + } } // restore it @@ -806,7 +811,13 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction // this can be set during create the struct // todo add a dummy funtion to avoid process check if (pCtx[k].fpSet.process != NULL) { - pCtx[k].fpSet.process(&pCtx[k]); + int32_t code = pCtx[k].fpSet.process(&pCtx[k]); + if (code != TSDB_CODE_SUCCESS) { + qError("%s call aggregate function error happens, code : %s", + GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); + pOperator->pTaskInfo->code = code; + longjmp(pOperator->pTaskInfo->env, code); + } } } } @@ -2176,7 +2187,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p * @param pQInfo * @param result */ -int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, +int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfResult = pBlock->info.rows; // there are already exists result rows @@ -2215,8 +2226,14 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased int32_t slotId = pExprInfo[j].base.resSchema.slotId; pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset); - if (pCtx[j].fpSet.process) { - pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + if (pCtx[j].fpSet.finalize) { + int32_t code = TSDB_CODE_SUCCESS; + code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + if (code != TSDB_CODE_SUCCESS) { + qError("%s build result data block error, code %s", GET_TASKID(taskInfo), tstrerror(code)); + taskInfo->code = code; + longjmp(taskInfo->env, code); + } } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { // do nothing, todo refactor } else { @@ -2243,7 +2260,7 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased return 0; } -void doBuildResultDatablock(SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, +void doBuildResultDatablock(SExecTaskInfo *taskInfo, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); @@ -2257,7 +2274,7 @@ void doBuildResultDatablock(SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo } int32_t orderType = TSDB_ORDER_ASC; - doCopyToSDataBlock(pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx); + doCopyToSDataBlock(taskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx); // add condition (pBlock->info.rows >= 1) just to runtime happy blockDataUpdateTsWindow(pBlock); @@ -3749,7 +3766,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { } blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pInfo, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf); + doBuildResultDatablock(pTaskInfo, pInfo, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf); if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index e3a507bf7c..5d22c13ec6 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -234,7 +234,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = j - num; - doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex); @@ -252,7 +252,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = pBlock->info.rows - num; - doApplyFunctions(pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); + doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->numOfExprs, TSDB_ORDER_ASC); doAssignGroupKeys(pCtx, pOperator->numOfExprs, pBlock->info.rows, rowIndex); } } @@ -268,7 +268,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->binfo.pRes; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -317,7 +317,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false); while(1) { - doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pRes); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 738f4821bd..0f3b1bda20 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -703,7 +703,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe pInfo->order, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); STimeWindow nextWin = win; @@ -740,7 +740,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe pInfo->order, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } @@ -855,7 +855,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // here we start a new session window @@ -874,7 +874,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } @@ -888,7 +888,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -921,7 +921,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -948,7 +948,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { } blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); - doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -998,7 +998,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -1035,7 +1035,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator->pTaskInfo, &pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); // TODO: remove for stream /*ASSERT(pInfo->binfo.pRes->info.rows > 0);*/ @@ -1233,7 +1233,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // pInfo->numOfRows data belong to the current session window updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - doApplyFunctions(pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); // here we start a new session window @@ -1252,7 +1252,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } @@ -1265,7 +1265,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { SOptrBasicInfo* pBInfo = &pInfo->binfo; if (pOperator->status == OP_RES_TO_RETURN) { - doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; @@ -1298,7 +1298,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, true); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); - doBuildResultDatablock(pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); + doBuildResultDatablock(pOperator->pTaskInfo, pBInfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf); if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } From 85ee4f7df60f90d7154817d253368e22ee88b533 Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Thu, 12 May 2022 10:43:54 +0800 Subject: [PATCH 05/16] fix finalize function call return value >=0 not error, <0 error --- source/libs/executor/src/executorimpl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9c285856d3..a5bc1fdf58 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2229,7 +2229,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn if (pCtx[j].fpSet.finalize) { int32_t code = TSDB_CODE_SUCCESS; code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); - if (code != TSDB_CODE_SUCCESS) { + if (TAOS_FAILED(code)) { qError("%s build result data block error, code %s", GET_TASKID(taskInfo), tstrerror(code)); taskInfo->code = code; longjmp(taskInfo->env, code); From 7ab2e76c3535b12f7f7e75b83f3fe8e4c2ed45dd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 12 May 2022 11:31:56 +0800 Subject: [PATCH 06/16] feat(tmq): support get tb name --- example/src/tmq.c | 9 +++++++-- include/client/taos.h | 5 +---- include/common/tmsg.h | 10 ++++++++++ source/client/src/tmq.c | 13 +++++++++++++ source/dnode/mnode/impl/src/mndTopic.c | 4 ++-- source/dnode/vnode/src/inc/meta.h | 10 +++++----- source/dnode/vnode/src/tq/tq.c | 26 ++++++++++++++++++++++++++ 7 files changed, 64 insertions(+), 13 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 976d658fa6..2ee91c254c 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -32,6 +32,11 @@ static void msg_process(TAOS_RES* msg) { int32_t numOfFields = taos_field_count(msg); taos_print_row(buf, row, fields, numOfFields); printf("%s\n", buf); + + const char* tbName = tmq_get_table_name(msg); + if (tbName) { + printf("from tb: %s\n", tbName); + } } } @@ -101,8 +106,8 @@ int32_t create_topic() { } taos_free_result(pRes); - pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); - /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/ + /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ + pRes = taos_query(pConn, "create topic topic_ctb_column with table as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/client/taos.h b/include/client/taos.h index 26d4d18234..486d5f5fef 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -257,10 +257,7 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); -// TODO -#if 0 -DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res); -#endif +DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res); #if 0 DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b50367af03..d34892a278 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2480,6 +2480,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i); tlen += taosEncodeSSchemaWrapper(buf, pSW); } + if (pRsp->withTbName) { + char* tbName = (char*)taosArrayGetP(pRsp->blockTbName, i); + tlen += taosEncodeString(buf, tbName); + } } } return tlen; @@ -2492,6 +2496,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p buf = taosDecodeFixedI32(buf, &pRsp->blockNum); pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*)); + pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*)); if (pRsp->blockNum != 0) { buf = taosDecodeFixedI8(buf, &pRsp->withTbName); @@ -2510,6 +2515,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p buf = taosDecodeSSchemaWrapper(buf, pSW); taosArrayPush(pRsp->blockSchema, &pSW); } + if (pRsp->withTbName) { + char* name = NULL; + buf = taosDecodeString(buf, &name); + taosArrayPush(pRsp->blockTbName, &name); + } } } return (void*)buf; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 0ce689f19c..b42f072e54 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -1346,3 +1346,16 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { return -1; } } + +const char* tmq_get_table_name(TAOS_RES* res) { + if (TD_RES_TMQ(res)) { + SMqRspObj* pRspObj = (SMqRspObj*)res; + if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || + pRspObj->resIter >= pRspObj->rsp.blockNum) { + return NULL; + } + const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); + return name; + } + return NULL; +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 41d4d5f406..00379ecda1 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -297,8 +297,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq topicObj.ast = strdup(pCreate->ast); topicObj.astLen = strlen(pCreate->ast) + 1; topicObj.subType = TOPIC_SUB_TYPE__TABLE; - topicObj.withTbName = 0; - topicObj.withSchema = 0; + topicObj.withTbName = pCreate->withTbName; + topicObj.withSchema = pCreate->withSchema; SNode *pAst = NULL; if (nodesStringToNode(pCreate->ast, &pAst) != 0) { diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 96feee3d7d..6dd9ac0fed 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -111,10 +111,10 @@ int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur); // SMetaDB int metaOpenDB(SMeta* pMeta); void metaCloseDB(SMeta* pMeta); -int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); -int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); -int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); -int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); +// int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); +int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); +int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); +int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); #endif #endif @@ -123,4 +123,4 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); } #endif -#endif /*_TD_VNODE_META_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_META_H_*/ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4b9551b250..29fabc0f9f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -427,9 +427,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqDataBlkRsp rsp = {0}; rsp.reqOffset = pReq->currentOffset; rsp.withSchema = pExec->withSchema; + rsp.withTbName = pExec->withTbName; + rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); rsp.blockSchema = taosArrayInit(0, sizeof(void*)); + rsp.blockTbName = taosArrayInit(0, sizeof(void*)); while (1) { consumerEpoch = atomic_load_32(&pExec->epoch); @@ -535,6 +538,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosArrayPush(rsp.blockSchema, &pSW); } + if (pExec->withTbName) { + SMetaReader mr = {0}; + metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; + if (metaGetTableEntryByUid(&mr, uid) < 0) { + ASSERT(0); + } + char* tbName = strdup(mr.me.name); + taosArrayPush(rsp.blockTbName, &tbName); + metaReaderClear(&mr); + } + rsp.blockNum++; } // db subscribe @@ -563,6 +578,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ASSERT(actualLen <= dataStrLen); taosArrayPush(rsp.blockDataLen, &actualLen); taosArrayPush(rsp.blockData, &buf); + if (pExec->withTbName) { + SMetaReader mr = {0}; + metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) { + ASSERT(0); + } + char* tbName = strdup(mr.me.name); + taosArrayPush(rsp.blockTbName, &tbName); + metaReaderClear(&mr); + } SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper); taosArrayPush(rsp.blockSchema, &pSW); @@ -614,6 +639,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosArrayDestroy(rsp.blockData); taosArrayDestroy(rsp.blockDataLen); taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); + taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree); return 0; } From 72f9a1c4a4cff5ce41bf43316a2f47bb206cc494 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 12 May 2022 03:34:38 +0000 Subject: [PATCH 07/16] fix --- source/dnode/vnode/src/meta/metaQuery.c | 10 +++++++++- source/libs/tdb/src/db/tdbPCache.c | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 8d2a4ebcf3..7d0a87d79d 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -158,7 +158,9 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo skmDbKey.sver = sver; pKey = &skmDbKey; kLen = sizeof(skmDbKey); + metaRLock(pMeta); ret = tdbDbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen); + metaULock(pMeta); if (ret < 0) { return NULL; } @@ -181,6 +183,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo } struct SMCtbCursor { + SMeta *pMeta; TDBC *pCur; tb_uid_t suid; void *pKey; @@ -200,9 +203,13 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { return NULL; } + pCtbCur->pMeta = pMeta; pCtbCur->suid = uid; + metaRLock(pMeta); + ret = tdbDbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL); if (ret < 0) { + metaULock(pMeta); taosMemoryFree(pCtbCur); return NULL; } @@ -220,6 +227,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) { if (pCtbCur) { + if (pCtbCur->pMeta) metaULock(pCtbCur->pMeta); if (pCtbCur->pCur) { tdbDbcClose(pCtbCur->pCur); @@ -269,7 +277,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { pSW = metaGetTableSchema(pMeta, quid, sver, 0); if (!pSW) return NULL; - + tdInitTSchemaBuilder(&sb, 0); for (int i = 0; i < pSW->nCols; i++) { pSchema = pSW->pSchema + i; diff --git a/source/libs/tdb/src/db/tdbPCache.c b/source/libs/tdb/src/db/tdbPCache.c index aa05687426..8574e071f2 100644 --- a/source/libs/tdb/src/db/tdbPCache.c +++ b/source/libs/tdb/src/db/tdbPCache.c @@ -242,7 +242,7 @@ static void tdbPCacheRemovePageFromHash(SPCache *pCache, SPage *pPage) { int h; h = tdbPCachePageHash(&(pPage->pgid)); - for (ppPage = &(pCache->pgHash[h % pCache->nHash]); *ppPage != pPage; ppPage = &((*ppPage)->pHashNext)) + for (ppPage = &(pCache->pgHash[h % pCache->nHash]); (*ppPage) && *ppPage != pPage; ppPage = &((*ppPage)->pHashNext)) ; ASSERT(*ppPage == pPage); *ppPage = pPage->pHashNext; From 51094f3434549cdebf0fbfec3a1c2e9c0a7a814f Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 12 May 2022 11:53:47 +0800 Subject: [PATCH 08/16] fix: table would be null when destory commit handle --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 031d200a66..07a68d780a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -485,8 +485,10 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) { for (int i = 1; i < pCommith->niters; i++) { tSkipListDestroyIter(pCommith->iters[i].pIter); - tdFreeSchema(pCommith->iters[i].pTable->pSchema); - taosMemoryFree(pCommith->iters[i].pTable); + if (pCommith->iters[i].pTable) { + tdFreeSchema(pCommith->iters[i].pTable->pSchema); + taosMemoryFreeClear(pCommith->iters[i].pTable); + } } taosMemoryFree(pCommith->iters); From 67ae3c2efa8c254b8877a1b12e57a824072c0b9c Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 12 May 2022 11:55:58 +0800 Subject: [PATCH 09/16] fix: table would be null when destroy commit handle --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 031d200a66..07a68d780a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -485,8 +485,10 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) { for (int i = 1; i < pCommith->niters; i++) { tSkipListDestroyIter(pCommith->iters[i].pIter); - tdFreeSchema(pCommith->iters[i].pTable->pSchema); - taosMemoryFree(pCommith->iters[i].pTable); + if (pCommith->iters[i].pTable) { + tdFreeSchema(pCommith->iters[i].pTable->pSchema); + taosMemoryFreeClear(pCommith->iters[i].pTable); + } } taosMemoryFree(pCommith->iters); From 8b9a2c20f31224c7b9602e7910ebdafd18056a1b Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 12 May 2022 12:05:58 +0800 Subject: [PATCH 10/16] enh(sync): add error log, linux api error, %X --- source/dnode/vnode/src/vnd/vnodeSync.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 1260f9a3e7..f0f5338c4d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -72,6 +72,7 @@ int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t ret = 0; SMsgCb *pMsgCb = rpcHandle; if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) { + pMsg->noResp = 1; tmsgSendReq(rpcHandle, pEpSet, pMsg); } else { vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE); From 77f4c3b5ab6ca568fc716bf3cb4fb9af048feebc Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Thu, 12 May 2022 13:30:05 +0800 Subject: [PATCH 11/16] fix: set commit table when move blk idx between different last file --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 42 ++++++++++++++++++++---- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 031d200a66..3521e9b1f5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -70,6 +70,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static void tsdbResetCommitFile(SCommitH *pCommith); static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbCommitToTable(SCommitH *pCommith, int tid); +static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx); static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx); static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); static int tsdbComparKeyBlock(const void *arg1, const void *arg2); @@ -889,9 +890,11 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { } static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { - SReadH *pReadh = &pCommith->readh; - int nBlocks = pIdx->numOfBlocks; - int bidx = 0; + SReadH *pReadh = &pCommith->readh; + STsdb *pTsdb = TSDB_READ_REPO(pReadh); + STSchema *pTSchema = NULL; + int nBlocks = pIdx->numOfBlocks; + int bidx = 0; tsdbResetCommitTable(pCommith); @@ -901,30 +904,49 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { return -1; } + STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL}; + pCommith->pTable = &table; + while (bidx < nBlocks) { + if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) { + // Set commit table + pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, 0); // TODO: schema version + if (!pTSchema) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + table.pSchema = pTSchema; + if (tsdbSetCommitTable(pCommith, &table) < 0) { + taosMemoryFreeClear(pTSchema); + return -1; + } + } + if (tsdbMoveBlock(pCommith, bidx) < 0) { tsdbError("vgId:%d failed to move block into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); + taosMemoryFreeClear(pTSchema); return -1; } + ++bidx; } - STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL}; - TSDB_COMMIT_TABLE(pCommith) = &table; - if (tsdbWriteBlockInfo(pCommith) < 0) { tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); + taosMemoryFreeClear(pTSchema); return -1; } + taosMemoryFreeClear(pTSchema); return 0; } static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); + pCommith->pTable = pTable; if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) { @@ -1321,6 +1343,14 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { return 0; } +static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx) { + SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + if (pBlock->last) { + return pCommith->isLFileSame; + } + return pCommith->isDFileSame; +} + static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; SDFile *pDFile; From c32a3400553cdfe65937be86e67c9173cbf404de Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 12 May 2022 06:13:59 +0000 Subject: [PATCH 12/16] fix memory leak --- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 10 ++++++++++ source/libs/tdb/src/db/tdbBtree.c | 1 + 2 files changed, 11 insertions(+) diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index d40a73eb67..1e8fbb48c7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -62,6 +62,16 @@ int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) { void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) { if (pMemTable) { taosHashCleanup(pMemTable->pHashIdx); + SSkipListIterator *pIter = tSkipListCreateIter(pMemTable->pSlIdx); + SSkipListNode *pNode = NULL; + STbData *pTbData = NULL; + for (;;) { + if (!tSkipListIterNext(pIter)) break; + pNode = tSkipListIterGet(pIter); + pTbData = (STbData *)pNode->pData; + tsdbFreeTbData(pTbData); + } + tSkipListDestroyIter(pIter); tSkipListDestroy(pMemTable->pSlIdx); taosMemoryFree(pMemTable); } diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index cf7dd50103..fffda68731 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -114,6 +114,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, tdb_cmpr_fn_t kcmpr, SB int tdbBtreeClose(SBTree *pBt) { if (pBt) { + tdbFree(pBt->pBuf); tdbOsFree(pBt); } return 0; From 1dbe0650e03d8316c185fea2fb2ef3cd531c9b2e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 12 May 2022 14:20:50 +0800 Subject: [PATCH 13/16] enh(wal): set errno code --- source/libs/wal/src/walRead.c | 13 ++++++++++--- source/libs/wal/src/walWrite.c | 1 + 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 0cfe75bf33..7dfe1b8989 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -130,6 +130,7 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) { } } + // code set inner if (walReadSeekFilePos(pRead, pRet->firstVer, ver) < 0) { return -1; } @@ -249,16 +250,22 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { // TODO: check wal life if (pRead->curVersion != ver) { if (walReadSeekVer(pRead, ver) < 0) { + terrno = TSDB_CODE_WAL_INVALID_VER; + wError("unexpected wal log version: % " PRId64 ", since seek error", ver); return -1; } } - if (!taosValidFile(pRead->pReadLogTFile)) { - return -1; - } + /*if (!taosValidFile(pRead->pReadLogTFile)) {*/ + /*return -1;*/ + /*}*/ code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead)); if (code != sizeof(SWalHead)) { + if (code < 0) + terrno = TAOS_SYSTEM_ERROR(errno); + else + terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index da1c36dcc4..2e43997584 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -225,6 +225,7 @@ int walRoll(SWal *pWal) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } + // terrno set inner code = walRollFileInfo(pWal); if (code != 0) { return -1; From 71e43677f74b7c6c217e979934b8c324985f661b Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 May 2022 14:38:56 +0800 Subject: [PATCH 14/16] feat(query): add csum function --- source/libs/function/inc/builtinsimpl.h | 3 ++ source/libs/function/src/builtins.c | 41 ++++++++++++++++++ source/libs/function/src/builtinsimpl.c | 57 ++++++++++++++++++++++++- 3 files changed, 99 insertions(+), 2 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index e3b7127efe..e1e30d37ea 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -94,6 +94,9 @@ bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t stateCountFunction(SqlFunctionCtx* pCtx); int32_t stateDurationFunction(SqlFunctionCtx* pCtx); +bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +int32_t csumFunction(SqlFunctionCtx* pCtx); + bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); #ifdef __cplusplus diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 07ad0f7d1f..c9136433e0 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -308,6 +308,37 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32 return TSDB_CODE_SUCCESS; } +static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return TSDB_CODE_SUCCESS; + } + + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN != nodeType(pPara)) { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "The input parameter of CSUM function can only be column"); + } + + uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + uint8_t resType; + if (!IS_NUMERIC_TYPE(colType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } else { + if (IS_SIGNED_NUMERIC_TYPE(colType)) { + resType = TSDB_DATA_TYPE_BIGINT; + } else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) { + resType = TSDB_DATA_TYPE_UBIGINT; + } else if (IS_FLOAT_TYPE(colType)) { + resType = TSDB_DATA_TYPE_DOUBLE; + } else { + ASSERT(0); + } + } + + pFunc->node.resType = (SDataType) { .bytes = tDataTypes[resType].bytes, .type = resType}; + return TSDB_CODE_SUCCESS; +} + static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // todo return TSDB_CODE_SUCCESS; @@ -742,6 +773,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = stateDurationFunction, .finalizeFunc = NULL }, + { + .name = "csum", + .type = FUNCTION_TYPE_CSUM, + .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateCsum, + .getEnvFunc = getCsumFuncEnv, + .initFunc = functionSetup, + .processFunc = csumFunction, + .finalizeFunc = NULL + }, { .name = "abs", .type = FUNCTION_TYPE_ABS, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index efc2992075..173dc9fd57 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2815,7 +2815,6 @@ int32_t stateCountFunction(SqlFunctionCtx* pCtx) { SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - SColumnInfoData* pTsOutput = pCtx->pTsOutput; int32_t numOfElems = 0; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; @@ -2853,7 +2852,6 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) { TSKEY* tsList = (int64_t*)pInput->pPTS->pData; SColumnInfoData* pInputCol = pInput->pData[0]; - SColumnInfoData* pTsOutput = pCtx->pTsOutput; int32_t numOfElems = 0; SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; @@ -2893,3 +2891,58 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx) { return numOfElems; } + +bool getCsumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SSumRes); + return true; +} + +int32_t csumFunction(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SSumRes* pSumRes = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + SColumnInfoData* pInputCol = pInput->pData[0]; + SColumnInfoData* pTsOutput = pCtx->pTsOutput; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + + int32_t numOfElems = 0; + int32_t type = pInputCol->info.type; + int32_t startOffset = pCtx->offset; + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + int32_t pos = startOffset + numOfElems; + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + //colDataAppendNULL(pOutput, i); + continue; + } + + char* data = colDataGetData(pInputCol, i); + if (IS_SIGNED_NUMERIC_TYPE(type)) { + int64_t v; + GET_TYPED_DATA(v, int64_t, type, data); + pSumRes->isum += v; + colDataAppend(pOutput, pos, (char *)&pSumRes->isum, false); + } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { + uint64_t v; + GET_TYPED_DATA(v, uint64_t, type, data); + pSumRes->usum += v; + colDataAppend(pOutput, pos, (char *)&pSumRes->usum, false); + } else if (IS_FLOAT_TYPE(type)) { + double v; + GET_TYPED_DATA(v, double, type, data); + pSumRes->dsum += v; + colDataAppend(pOutput, pos, (char *)&pSumRes->dsum, false); + } + + //TODO: remove this after pTsOutput is handled + if (pTsOutput != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); + } + + numOfElems++; + } + + return numOfElems; +} From d7bd682237e710f1a6d319150d113d82dc6d53dd Mon Sep 17 00:00:00 2001 From: "slzhou@taodata.com" Date: Thu, 12 May 2022 14:42:57 +0800 Subject: [PATCH 15/16] column has member hasNull --- include/libs/function/tudf.h | 2 ++ include/util/tcoding.h | 15 +++++++++++++++ source/common/src/tdatablock.c | 2 ++ source/libs/function/src/tudf.c | 2 ++ 4 files changed, 21 insertions(+) diff --git a/include/libs/function/tudf.h b/include/libs/function/tudf.h index bdccd29acf..8e156edcd2 100644 --- a/include/libs/function/tudf.h +++ b/include/libs/function/tudf.h @@ -89,6 +89,7 @@ typedef struct SUdfColumnData { typedef struct SUdfColumn { SUdfColumnMeta colMeta; + bool hasNull; SUdfColumnData colData; } SUdfColumn; @@ -232,6 +233,7 @@ static FORCE_INLINE void udfColDataSetNull(SUdfColumn* pColumn, int32_t row) { } else { udfColDataSetNull_f(pColumn, row); } + pColumn->hasNull = true; } static FORCE_INLINE int32_t udfColDataSet(SUdfColumn* pColumn, uint32_t currentRow, const char* pData, bool isNull) { diff --git a/include/util/tcoding.h b/include/util/tcoding.h index 3f00c79f46..74e64d5292 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -59,6 +59,21 @@ static FORCE_INLINE void *taosDecodeFixedI8(const void *buf, int8_t *value) { static FORCE_INLINE void *taosSkipFixedLen(const void *buf, size_t len) { return POINTER_SHIFT(buf, len); } +// --- Bool + +static FORCE_INLINE int32_t taosEncodeFixedBool(void **buf, bool value) { + if (buf != NULL) { + ((int8_t *)(*buf))[0] = value ? 1 : 0; + *buf = POINTER_SHIFT(*buf, sizeof(int8_t)); + } + return (int32_t)sizeof(int8_t); +} + +static FORCE_INLINE void *taosDecodeFixedBool(const void *buf, bool *value) { + *value = ((int8_t *)buf)[0] == 0 ? false : true; + return POINTER_SHIFT(buf, sizeof(int8_t)); +} + // ---- Fixed U16 static FORCE_INLINE int32_t taosEncodeFixedU16(void **buf, uint16_t value) { if (buf != NULL) { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b58e4bd1dd..9053101938 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1311,6 +1311,7 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { tlen += taosEncodeFixedI16(buf, pColData->info.colId); tlen += taosEncodeFixedI16(buf, pColData->info.type); tlen += taosEncodeFixedI32(buf, pColData->info.bytes); + tlen += taosEncodeFixedBool(buf, pColData->hasNull); if (IS_VAR_DATA_TYPE(pColData->info.type)) { tlen += taosEncodeBinary(buf, pColData->varmeta.offset, sizeof(int32_t) * rows); @@ -1340,6 +1341,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { buf = taosDecodeFixedI16(buf, &data.info.colId); buf = taosDecodeFixedI16(buf, &data.info.type); buf = taosDecodeFixedI32(buf, &data.info.bytes); + buf = taosDecodeFixedBool(buf, &data.hasNull); if (IS_VAR_DATA_TYPE(data.info.type)) { buf = taosDecodeBinary(buf, (void**)&data.varmeta.offset, pBlock->info.rows * sizeof(int32_t)); diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index a577ea200f..9d90dca63e 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -695,6 +695,7 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfCol->colMeta.scale = col->info.scale; udfCol->colMeta.precision = col->info.precision; udfCol->colData.numOfRows = udfBlock->numOfRows; + udfCol->hasNull = col->hasNull; if (IS_VAR_DATA_TYPE(udfCol->colMeta.type)) { udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows; udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen); @@ -731,6 +732,7 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { col->info.bytes = meta->bytes; col->info.scale = meta->scale; col->info.type = meta->type; + col->hasNull = udfCol->hasNull; SUdfColumnData *data = &udfCol->colData; if (!IS_VAR_DATA_TYPE(meta->type)) { From 62b2ccedf13d245b1b1e5e44a0f7fce53d4fb6b8 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 12 May 2022 07:09:27 +0000 Subject: [PATCH 16/16] return errno when table not exits --- include/common/tmsg.h | 3 ++- source/common/src/tmsg.c | 7 ++++--- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 11 +++++++++++ source/dnode/vnode/src/vnd/vnodeSvr.c | 9 ++++----- 4 files changed, 21 insertions(+), 9 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d655b82a08..08d5765246 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -252,6 +252,7 @@ STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); int32_t tPrintFixedSchemaSubmitReq(const SSubmitReq* pReq, STSchema* pSchema); typedef struct { + int32_t code; int8_t hashMeta; int64_t uid; char* tblFName; @@ -271,7 +272,7 @@ typedef struct { int32_t tEncodeSSubmitRsp(SEncoder* pEncoder, const SSubmitRsp* pRsp); int32_t tDecodeSSubmitRsp(SDecoder* pDecoder, SSubmitRsp* pRsp); -void tFreeSSubmitRsp(SSubmitRsp *pRsp); +void tFreeSSubmitRsp(SSubmitRsp* pRsp); #define COL_SMA_ON ((int8_t)0x1) #define COL_IDX_ON ((int8_t)0x2) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ebd81c7da3..021ee8455e 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4032,6 +4032,7 @@ int32_t tDecodeSVSubmitReq(SDecoder *pCoder, SVSubmitReq *pReq) { static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBlock) { if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI32(pEncoder, pBlock->code) < 0) return -1; if (tEncodeI8(pEncoder, pBlock->hashMeta) < 0) return -1; if (pBlock->hashMeta) { if (tEncodeI64(pEncoder, pBlock->uid) < 0) return -1; @@ -4047,10 +4048,11 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) { if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI32(pDecoder, &pBlock->code) < 0) return -1; if (tDecodeI8(pDecoder, &pBlock->hashMeta) < 0) return -1; if (pBlock->hashMeta) { if (tDecodeI64(pDecoder, &pBlock->uid) < 0) return -1; - pBlock->tblFName= taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1); + pBlock->tblFName = taosMemoryCalloc(TSDB_TABLE_FNAME_LEN, 1); if (NULL == pBlock->tblFName) return -1; if (tDecodeCStrTo(pDecoder, pBlock->tblFName) < 0) return -1; } @@ -4089,7 +4091,7 @@ int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) { if (tDecodeSSubmitBlkRsp(pDecoder, pRsp->pBlocks + iBlock) < 0) return -1; } - tEndDecode(pDecoder); + tEndDecode(pDecoder); tDecoderClear(pDecoder); return 0; } @@ -4108,4 +4110,3 @@ void tFreeSSubmitRsp(SSubmitRsp *pRsp) { taosMemoryFree(pRsp); } - diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 1e8fbb48c7..037b099345 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -310,6 +310,17 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo TSKEY keyMax; SSubmitBlk *pBlkCopy; + // check if table exists + SMetaReader mr = {0}; + SMetaEntry me = {0}; + metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0); + if (metaGetTableEntryByUid(&mr, pMsgIter->uid) < 0) { + metaReaderClear(&mr); + terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST; + return -1; + } + metaReaderClear(&mr); + // create container is nedd tptr = taosHashGet(pMemTable->pHashIdx, &(pMsgIter->uid), sizeof(pMsgIter->uid)); if (tptr == NULL) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fc2b6fe676..158e14f2ad 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -502,7 +502,7 @@ _exit: return 0; } -static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) { +static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) { SSubmitBlkIter blkIter = {0}; STSchema *pSchema = NULL; tb_uid_t suid = 0; @@ -544,7 +544,7 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char while (true) { if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; if (pBlock == NULL) break; - + vnodeDebugPrintSingleSubmitMsg(pMeta, pBlock, &msgIter, tags); } @@ -595,7 +595,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) { if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { - pRsp->code = terrno; + submitBlkRsp.code = terrno; tDecoderClear(&decoder); goto _exit; } @@ -617,8 +617,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in } if (tsdbInsertTableData(pVnode->pTsdb, &msgIter, pBlock, &submitBlkRsp) < 0) { - pRsp->code = terrno; - goto _exit; + submitBlkRsp.code = terrno; } submitRsp.numOfRows += submitBlkRsp.numOfRows;