diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d32414680b..2d85ee296c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3234,7 +3234,6 @@ void tDestroySSubmitTbData(SSubmitTbData* pTbData, int32_t flag); void tDestroySSubmitReq2(SSubmitReq2* pReq, int32_t flag); typedef struct { - int32_t code; int32_t affectedRows; SArray* aCreateTbRsp; // SArray } SSubmitRsp2; diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 912763314b..4c13f76e24 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -104,6 +104,8 @@ void destroyBoundColumnInfo(void* pBoundInfo); int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf, int32_t msgBufLen); +void qDestroyBoundColInfo(void* pInfo); + void* smlInitHandle(SQuery* pQuery); void smlDestroyHandle(void* pHandle); int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 846accf51b..ca9f559f6b 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -734,46 +734,21 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList } int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) { - int32_t code = 0; SArray* pArray = NULL; - SSubmitRsp* pRsp = (SSubmitRsp*)res; - if (pRsp->nBlocks <= 0) { + SSubmitRsp2* pRsp = (SSubmitRsp2*)res; + if (NULL == pRsp->aCreateTbRsp) { return TSDB_CODE_SUCCESS; } - pArray = taosArrayInit(pRsp->nBlocks, sizeof(STbSVersion)); - if (NULL == pArray) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_OUT_OF_MEMORY; + int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp); + for (int32_t i = 0; i < tbNum; ++i) { + SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i); + if (pTbRsp->pMeta) { + handleCreateTbExecRes(pTbRsp->pMeta, pCatalog); + } } - for (int32_t i = 0; i < pRsp->nBlocks; ++i) { - SSubmitBlkRsp* blk = pRsp->pBlocks + i; - if (blk->pMeta) { - handleCreateTbExecRes(blk->pMeta, pCatalog); - tFreeSTableMetaRsp(blk->pMeta); - taosMemoryFreeClear(blk->pMeta); - } - - if (NULL == blk->tblFName || 0 == blk->tblFName[0]) { - continue; - } - - STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver}; - taosArrayPush(pArray, &tbSver); - } - - SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = *epset}; - - code = catalogChkTbMetaVersion(pCatalog, &conn, pArray); - -_return: - - taosArrayDestroy(pArray); - return code; + return TSDB_CODE_SUCCESS; } int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) { diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 3af9bb2160..b9887f28d4 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -273,7 +273,7 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) { pStmt->bInfo.tbName[0] = 0; pStmt->bInfo.tbFName[0] = 0; if (!pStmt->bInfo.tagsCached) { - destroyBoundColumnInfo(pStmt->bInfo.boundTags); + qDestroyBoundColInfo(pStmt->bInfo.boundTags); taosMemoryFreeClear(pStmt->bInfo.boundTags); } memset(pStmt->bInfo.stbFName, 0, TSDB_TABLE_FNAME_LEN); @@ -293,12 +293,14 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) { char* key = taosHashGetKey(pIter, &keyLen); STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks); +/* if (keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0) { STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false)); pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); continue; } +*/ qDestroyStmtDataBlock(pBlocks); taosHashRemove(pStmt->exec.pBlockHash, key, keyLen); @@ -308,13 +310,11 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) { pStmt->exec.autoCreateTbl = false; - if (keepTable) { - return TSDB_CODE_SUCCESS; + if (!keepTable) { + taosHashCleanup(pStmt->exec.pBlockHash); + pStmt->exec.pBlockHash = NULL; } - taosHashCleanup(pStmt->exec.pBlockHash); - pStmt->exec.pBlockHash = NULL; - STMT_ERR_RET(stmtCleanBindInfo(pStmt)); return TSDB_CODE_SUCCESS; @@ -334,7 +334,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) { SStmtTableCache* pCache = (SStmtTableCache*)pIter; qDestroyStmtDataBlock(pCache->pDataCtx); - destroyBoundColumnInfo(pCache->boundTags); + qDestroyBoundColInfo(pCache->boundTags); taosMemoryFreeClear(pCache->boundTags); pIter = taosHashIterate(pStmt->sql.pTableCache, pIter); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 73a034c23f..6eb77cba1d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6876,7 +6876,6 @@ void tDestroySSubmitReq2(SSubmitReq2 *pReq, int32_t flag) { int32_t tEncodeSSubmitRsp2(SEncoder *pCoder, const SSubmitRsp2 *pRsp) { if (tStartEncode(pCoder) < 0) return -1; - if (tEncodeI32v(pCoder, pRsp->code) < 0) return -1; if (tEncodeI32v(pCoder, pRsp->affectedRows) < 0) return -1; if (tEncodeU64v(pCoder, taosArrayGetSize(pRsp->aCreateTbRsp)) < 0) return -1; @@ -6899,11 +6898,6 @@ int32_t tDecodeSSubmitRsp2(SDecoder *pCoder, SSubmitRsp2 *pRsp) { goto _exit; } - if (tDecodeI32v(pCoder, &pRsp->code) < 0) { - code = TSDB_CODE_INVALID_MSG; - goto _exit; - } - if (tDecodeI32v(pCoder, &pRsp->affectedRows) < 0) { code = TSDB_CODE_INVALID_MSG; goto _exit; @@ -6943,7 +6937,22 @@ _exit: } void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) { - if (TSDB_MSG_FLG_ENCODE) { + if (NULL == pRsp) { + return; + } + + if (flag & TSDB_MSG_FLG_ENCODE) { + if (pRsp->aCreateTbRsp) { + int32_t nCreateTbRsp = TARRAY_SIZE(pRsp->aCreateTbRsp); + SVCreateTbRsp *aCreateTbRsp = TARRAY_DATA(pRsp->aCreateTbRsp); + for (int32_t i = 0; i < nCreateTbRsp; ++i) { + if (aCreateTbRsp[i].pMeta) { + taosMemoryFree(aCreateTbRsp[i].pMeta); + } + } + taosArrayDestroy(pRsp->aCreateTbRsp); + } + } else if (flag & TSDB_MSG_FLG_DECODE) { if (pRsp->aCreateTbRsp) { int32_t nCreateTbRsp = TARRAY_SIZE(pRsp->aCreateTbRsp); SVCreateTbRsp *aCreateTbRsp = TARRAY_DATA(pRsp->aCreateTbRsp); @@ -6954,8 +6963,5 @@ void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) { } taosArrayDestroy(pRsp->aCreateTbRsp); } - } else if (TSDB_MSG_FLG_DECODE) { - // TODO - taosArrayDestroy(pRsp->aCreateTbRsp); } } diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 0727b3bcf4..7d27c9fae7 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -53,7 +53,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; int32_t code = TSDB_CODE_SUCCESS; - SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; + SBoundColInfo* tags = (SBoundColInfo*)boundTags; if (NULL == tags) { return TSDB_CODE_QRY_APP_ERROR; } @@ -79,7 +79,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch continue; } - SSchema* pTagSchema = &pSchema[tags->boundColumns[c]]; + SSchema* pTagSchema = &pSchema[tags->pColIndex[c]]; int32_t colLen = pTagSchema->bytes; if (IS_VAR_DATA_TYPE(pTagSchema->type)) { colLen = bind[c].length[0]; @@ -231,7 +231,7 @@ int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSc int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) { STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; - SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; + SBoundColInfo* tags = (SBoundColInfo*)boundTags; if (NULL == tags) { return TSDB_CODE_QRY_APP_ERROR; } @@ -248,7 +248,7 @@ int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TA return TSDB_CODE_SUCCESS; } - CHECK_CODE(buildBoundFields(tags->numOfBound, tags->boundColumns, pSchema, fieldNum, fields, 0)); + CHECK_CODE(buildBoundFields(tags->numOfBound, tags->pColIndex, pSchema, fieldNum, fields, 0)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 487593470f..66e5550eb7 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -194,6 +194,17 @@ void destroyBoundColumnInfo(void* pBoundInfo) { taosMemoryFreeClear(pColList->colIdxInfo); } +void qDestroyBoundColInfo(void* pInfo) { + if (NULL == pInfo) { + return; + } + + SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo; + + taosMemoryFreeClear(pBoundInfo->pColIndex); +} + + static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, STableMeta* pTableMeta, STableDataBlocks** dataBlocks) { STableDataBlocks* dataBuf = (STableDataBlocks*)taosMemoryCalloc(1, sizeof(STableDataBlocks)); @@ -999,7 +1010,7 @@ void insCheckTableDataOrder(STableDataCxt* pTableCxt, TSKEY tsKey) { return; } -static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); } +void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); } static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput, bool colMode) { STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt)); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 6eadf80e3d..55c47f9e55 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -243,7 +243,7 @@ void destroyQueryExecRes(SExecResult* pRes) { break; } case TDMT_VND_SUBMIT: { - tFreeSSubmitRsp((SSubmitRsp*)pRes->res); + tDestroySSubmitRsp2((SSubmitRsp2*)pRes->res, TSDB_MSG_FLG_DECODE); break; } case TDMT_SCH_QUERY: diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 1a6d7df349..4915bba085 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -259,46 +259,49 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa if (msg) { SDecoder coder = {0}; - SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp)); + SSubmitRsp2 *rsp = taosMemoryMalloc(sizeof(*rsp)); tDecoderInit(&coder, msg, msgSize); - code = tDecodeSSubmitRsp(&coder, rsp); + code = tDecodeSSubmitRsp2(&coder, rsp); if (code) { - SCH_TASK_ELOG("decode submitRsp failed, code:%d", code); - tFreeSSubmitRsp(rsp); + SCH_TASK_ELOG("tDecodeSSubmitRsp2 failed, code:%d", code); + tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE); SCH_ERR_JRET(code); } - if (rsp->nBlocks > 0) { - for (int32_t i = 0; i < rsp->nBlocks; ++i) { - SSubmitBlkRsp *blk = rsp->pBlocks + i; - if (TSDB_CODE_SUCCESS != blk->code) { - code = blk->code; - tFreeSSubmitRsp(rsp); - SCH_ERR_JRET(code); - } - } - } - atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); - SCH_TASK_DLOG("submit succeed, affectedRows:%d, blocks:%d", rsp->affectedRows, rsp->nBlocks); - SCH_LOCK(SCH_WRITE, &pJob->resLock); - if (pJob->execRes.res) { - SSubmitRsp *sum = pJob->execRes.res; - sum->affectedRows += rsp->affectedRows; - sum->nBlocks += rsp->nBlocks; - if (rsp->nBlocks > 0 && rsp->pBlocks) { - sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks)); - memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks)); + int32_t createTbRspNum = taosArrayGetSize(rsp->aCreateTbRsp); + SCH_TASK_DLOG("submit succeed, affectedRows:%d, createTbRspNum:%d", rsp->affectedRows, createTbRspNum); + + if (rsp->aCreateTbRsp && taosArrayGetSize(rsp->aCreateTbRsp) > 0) { + SCH_LOCK(SCH_WRITE, &pJob->resLock); + if (pJob->execRes.res) { + SSubmitRsp2 *sum = pJob->execRes.res; + sum->affectedRows += rsp->affectedRows; + if (sum->aCreateTbRsp) { + taosArrayAddAll(sum->aCreateTbRsp, rsp->aCreateTbRsp); + taosArrayDestroy(rsp->aCreateTbRsp); + taosMemoryFree(rsp); + } else { + TSWAP(sum->aCreateTbRsp, rsp->aCreateTbRsp); + taosMemoryFree(rsp); + } + } else { + pJob->execRes.res = rsp; + pJob->execRes.msgType = TDMT_VND_SUBMIT; } - taosMemoryFree(rsp->pBlocks); - taosMemoryFree(rsp); + pJob->execRes.numOfBytes += pTask->msgLen; + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); } else { - pJob->execRes.res = rsp; - pJob->execRes.msgType = TDMT_VND_SUBMIT; + SCH_LOCK(SCH_WRITE, &pJob->resLock); + pJob->execRes.numOfBytes += pTask->msgLen; + if (NULL == pJob->execRes.res) { + TSWAP(pJob->execRes.res, rsp); + pJob->execRes.msgType = TDMT_VND_SUBMIT; + } + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE); } - pJob->execRes.numOfBytes += pTask->msgLen; - SCH_UNLOCK(SCH_WRITE, &pJob->resLock); } taosMemoryFreeClear(msg);