fix: fix submit response message handling

This commit is contained in:
dapan1121 2022-12-01 19:26:26 +08:00
parent 70c133a117
commit 2e75de8368
9 changed files with 84 additions and 88 deletions

View File

@ -3234,7 +3234,6 @@ void tDestroySSubmitTbData(SSubmitTbData* pTbData, int32_t flag);
void tDestroySSubmitReq2(SSubmitReq2* pReq, int32_t flag); void tDestroySSubmitReq2(SSubmitReq2* pReq, int32_t flag);
typedef struct { typedef struct {
int32_t code;
int32_t affectedRows; int32_t affectedRows;
SArray* aCreateTbRsp; // SArray<SVCreateTbRsp> SArray* aCreateTbRsp; // SArray<SVCreateTbRsp>
} SSubmitRsp2; } SSubmitRsp2;

View File

@ -104,6 +104,8 @@ void destroyBoundColumnInfo(void* pBoundInfo);
int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf, int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char* msgBuf,
int32_t msgBufLen); int32_t msgBufLen);
void qDestroyBoundColInfo(void* pInfo);
void* smlInitHandle(SQuery* pQuery); void* smlInitHandle(SQuery* pQuery);
void smlDestroyHandle(void* pHandle); void smlDestroyHandle(void* pHandle);
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,

View File

@ -734,46 +734,21 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
} }
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) { int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
int32_t code = 0;
SArray* pArray = NULL; SArray* pArray = NULL;
SSubmitRsp* pRsp = (SSubmitRsp*)res; SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
if (pRsp->nBlocks <= 0) { if (NULL == pRsp->aCreateTbRsp) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pArray = taosArrayInit(pRsp->nBlocks, sizeof(STbSVersion)); int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
if (NULL == pArray) { for (int32_t i = 0; i < tbNum; ++i) {
terrno = TSDB_CODE_OUT_OF_MEMORY; SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
return TSDB_CODE_OUT_OF_MEMORY; if (pTbRsp->pMeta) {
handleCreateTbExecRes(pTbRsp->pMeta, pCatalog);
}
} }
for (int32_t i = 0; i < pRsp->nBlocks; ++i) { return TSDB_CODE_SUCCESS;
SSubmitBlkRsp* blk = pRsp->pBlocks + i;
if (blk->pMeta) {
handleCreateTbExecRes(blk->pMeta, pCatalog);
tFreeSTableMetaRsp(blk->pMeta);
taosMemoryFreeClear(blk->pMeta);
}
if (NULL == blk->tblFName || 0 == blk->tblFName[0]) {
continue;
}
STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
taosArrayPush(pArray, &tbSver);
}
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self,
.mgmtEps = *epset};
code = catalogChkTbMetaVersion(pCatalog, &conn, pArray);
_return:
taosArrayDestroy(pArray);
return code;
} }
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) { int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {

View File

@ -273,7 +273,7 @@ int32_t stmtCleanBindInfo(STscStmt* pStmt) {
pStmt->bInfo.tbName[0] = 0; pStmt->bInfo.tbName[0] = 0;
pStmt->bInfo.tbFName[0] = 0; pStmt->bInfo.tbFName[0] = 0;
if (!pStmt->bInfo.tagsCached) { if (!pStmt->bInfo.tagsCached) {
destroyBoundColumnInfo(pStmt->bInfo.boundTags); qDestroyBoundColInfo(pStmt->bInfo.boundTags);
taosMemoryFreeClear(pStmt->bInfo.boundTags); taosMemoryFreeClear(pStmt->bInfo.boundTags);
} }
memset(pStmt->bInfo.stbFName, 0, TSDB_TABLE_FNAME_LEN); memset(pStmt->bInfo.stbFName, 0, TSDB_TABLE_FNAME_LEN);
@ -293,12 +293,14 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
char* key = taosHashGetKey(pIter, &keyLen); char* key = taosHashGetKey(pIter, &keyLen);
STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks); STableMeta* pMeta = qGetTableMetaInDataBlock(pBlocks);
/*
if (keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0) { if (keepTable && (strlen(pStmt->bInfo.tbFName) == keyLen) && strncmp(pStmt->bInfo.tbFName, key, keyLen) == 0) {
STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false)); STMT_ERR_RET(qResetStmtDataBlock(pBlocks, false));
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);
continue; continue;
} }
*/
qDestroyStmtDataBlock(pBlocks); qDestroyStmtDataBlock(pBlocks);
taosHashRemove(pStmt->exec.pBlockHash, key, keyLen); taosHashRemove(pStmt->exec.pBlockHash, key, keyLen);
@ -308,13 +310,11 @@ int32_t stmtCleanExecInfo(STscStmt* pStmt, bool keepTable, bool deepClean) {
pStmt->exec.autoCreateTbl = false; pStmt->exec.autoCreateTbl = false;
if (keepTable) { if (!keepTable) {
return TSDB_CODE_SUCCESS; taosHashCleanup(pStmt->exec.pBlockHash);
pStmt->exec.pBlockHash = NULL;
} }
taosHashCleanup(pStmt->exec.pBlockHash);
pStmt->exec.pBlockHash = NULL;
STMT_ERR_RET(stmtCleanBindInfo(pStmt)); STMT_ERR_RET(stmtCleanBindInfo(pStmt));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -334,7 +334,7 @@ int32_t stmtCleanSQLInfo(STscStmt* pStmt) {
SStmtTableCache* pCache = (SStmtTableCache*)pIter; SStmtTableCache* pCache = (SStmtTableCache*)pIter;
qDestroyStmtDataBlock(pCache->pDataCtx); qDestroyStmtDataBlock(pCache->pDataCtx);
destroyBoundColumnInfo(pCache->boundTags); qDestroyBoundColInfo(pCache->boundTags);
taosMemoryFreeClear(pCache->boundTags); taosMemoryFreeClear(pCache->boundTags);
pIter = taosHashIterate(pStmt->sql.pTableCache, pIter); pIter = taosHashIterate(pStmt->sql.pTableCache, pIter);

View File

@ -6876,7 +6876,6 @@ void tDestroySSubmitReq2(SSubmitReq2 *pReq, int32_t flag) {
int32_t tEncodeSSubmitRsp2(SEncoder *pCoder, const SSubmitRsp2 *pRsp) { int32_t tEncodeSSubmitRsp2(SEncoder *pCoder, const SSubmitRsp2 *pRsp) {
if (tStartEncode(pCoder) < 0) return -1; if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI32v(pCoder, pRsp->code) < 0) return -1;
if (tEncodeI32v(pCoder, pRsp->affectedRows) < 0) return -1; if (tEncodeI32v(pCoder, pRsp->affectedRows) < 0) return -1;
if (tEncodeU64v(pCoder, taosArrayGetSize(pRsp->aCreateTbRsp)) < 0) return -1; if (tEncodeU64v(pCoder, taosArrayGetSize(pRsp->aCreateTbRsp)) < 0) return -1;
@ -6899,11 +6898,6 @@ int32_t tDecodeSSubmitRsp2(SDecoder *pCoder, SSubmitRsp2 *pRsp) {
goto _exit; goto _exit;
} }
if (tDecodeI32v(pCoder, &pRsp->code) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
if (tDecodeI32v(pCoder, &pRsp->affectedRows) < 0) { if (tDecodeI32v(pCoder, &pRsp->affectedRows) < 0) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _exit; goto _exit;
@ -6943,7 +6937,22 @@ _exit:
} }
void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) { void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) {
if (TSDB_MSG_FLG_ENCODE) { if (NULL == pRsp) {
return;
}
if (flag & TSDB_MSG_FLG_ENCODE) {
if (pRsp->aCreateTbRsp) {
int32_t nCreateTbRsp = TARRAY_SIZE(pRsp->aCreateTbRsp);
SVCreateTbRsp *aCreateTbRsp = TARRAY_DATA(pRsp->aCreateTbRsp);
for (int32_t i = 0; i < nCreateTbRsp; ++i) {
if (aCreateTbRsp[i].pMeta) {
taosMemoryFree(aCreateTbRsp[i].pMeta);
}
}
taosArrayDestroy(pRsp->aCreateTbRsp);
}
} else if (flag & TSDB_MSG_FLG_DECODE) {
if (pRsp->aCreateTbRsp) { if (pRsp->aCreateTbRsp) {
int32_t nCreateTbRsp = TARRAY_SIZE(pRsp->aCreateTbRsp); int32_t nCreateTbRsp = TARRAY_SIZE(pRsp->aCreateTbRsp);
SVCreateTbRsp *aCreateTbRsp = TARRAY_DATA(pRsp->aCreateTbRsp); SVCreateTbRsp *aCreateTbRsp = TARRAY_DATA(pRsp->aCreateTbRsp);
@ -6954,8 +6963,5 @@ void tDestroySSubmitRsp2(SSubmitRsp2 *pRsp, int32_t flag) {
} }
taosArrayDestroy(pRsp->aCreateTbRsp); taosArrayDestroy(pRsp->aCreateTbRsp);
} }
} else if (TSDB_MSG_FLG_DECODE) {
// TODO
taosArrayDestroy(pRsp->aCreateTbRsp);
} }
} }

View File

@ -53,7 +53,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; SBoundColInfo* tags = (SBoundColInfo*)boundTags;
if (NULL == tags) { if (NULL == tags) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
@ -79,7 +79,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
continue; continue;
} }
SSchema* pTagSchema = &pSchema[tags->boundColumns[c]]; SSchema* pTagSchema = &pSchema[tags->pColIndex[c]];
int32_t colLen = pTagSchema->bytes; int32_t colLen = pTagSchema->bytes;
if (IS_VAR_DATA_TYPE(pTagSchema->type)) { if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
colLen = bind[c].length[0]; colLen = bind[c].length[0];
@ -231,7 +231,7 @@ int32_t buildBoundFields(int32_t numOfBound, int16_t* boundColumns, SSchema* pSc
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) { int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields) {
STableDataCxt* pDataBlock = (STableDataCxt*)pBlock; STableDataCxt* pDataBlock = (STableDataCxt*)pBlock;
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; SBoundColInfo* tags = (SBoundColInfo*)boundTags;
if (NULL == tags) { if (NULL == tags) {
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
@ -248,7 +248,7 @@ int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TA
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
CHECK_CODE(buildBoundFields(tags->numOfBound, tags->boundColumns, pSchema, fieldNum, fields, 0)); CHECK_CODE(buildBoundFields(tags->numOfBound, tags->pColIndex, pSchema, fieldNum, fields, 0));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -194,6 +194,17 @@ void destroyBoundColumnInfo(void* pBoundInfo) {
taosMemoryFreeClear(pColList->colIdxInfo); taosMemoryFreeClear(pColList->colIdxInfo);
} }
void qDestroyBoundColInfo(void* pInfo) {
if (NULL == pInfo) {
return;
}
SBoundColInfo* pBoundInfo = (SBoundColInfo*)pInfo;
taosMemoryFreeClear(pBoundInfo->pColIndex);
}
static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, STableMeta* pTableMeta, static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, STableMeta* pTableMeta,
STableDataBlocks** dataBlocks) { STableDataBlocks** dataBlocks) {
STableDataBlocks* dataBuf = (STableDataBlocks*)taosMemoryCalloc(1, sizeof(STableDataBlocks)); STableDataBlocks* dataBuf = (STableDataBlocks*)taosMemoryCalloc(1, sizeof(STableDataBlocks));
@ -999,7 +1010,7 @@ void insCheckTableDataOrder(STableDataCxt* pTableCxt, TSKEY tsKey) {
return; return;
} }
static void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); } void destroyBoundColInfo(SBoundColInfo* pInfo) { taosMemoryFreeClear(pInfo->pColIndex); }
static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput, bool colMode) { static int32_t createTableDataCxt(STableMeta* pTableMeta, SVCreateTbReq** pCreateTbReq, STableDataCxt** pOutput, bool colMode) {
STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt)); STableDataCxt* pTableCxt = taosMemoryCalloc(1, sizeof(STableDataCxt));

View File

@ -243,7 +243,7 @@ void destroyQueryExecRes(SExecResult* pRes) {
break; break;
} }
case TDMT_VND_SUBMIT: { case TDMT_VND_SUBMIT: {
tFreeSSubmitRsp((SSubmitRsp*)pRes->res); tDestroySSubmitRsp2((SSubmitRsp2*)pRes->res, TSDB_MSG_FLG_DECODE);
break; break;
} }
case TDMT_SCH_QUERY: case TDMT_SCH_QUERY:

View File

@ -259,46 +259,49 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
if (msg) { if (msg) {
SDecoder coder = {0}; SDecoder coder = {0};
SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp)); SSubmitRsp2 *rsp = taosMemoryMalloc(sizeof(*rsp));
tDecoderInit(&coder, msg, msgSize); tDecoderInit(&coder, msg, msgSize);
code = tDecodeSSubmitRsp(&coder, rsp); code = tDecodeSSubmitRsp2(&coder, rsp);
if (code) { if (code) {
SCH_TASK_ELOG("decode submitRsp failed, code:%d", code); SCH_TASK_ELOG("tDecodeSSubmitRsp2 failed, code:%d", code);
tFreeSSubmitRsp(rsp); tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE);
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} }
if (rsp->nBlocks > 0) {
for (int32_t i = 0; i < rsp->nBlocks; ++i) {
SSubmitBlkRsp *blk = rsp->pBlocks + i;
if (TSDB_CODE_SUCCESS != blk->code) {
code = blk->code;
tFreeSSubmitRsp(rsp);
SCH_ERR_JRET(code);
}
}
}
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
SCH_TASK_DLOG("submit succeed, affectedRows:%d, blocks:%d", rsp->affectedRows, rsp->nBlocks);
SCH_LOCK(SCH_WRITE, &pJob->resLock); int32_t createTbRspNum = taosArrayGetSize(rsp->aCreateTbRsp);
if (pJob->execRes.res) { SCH_TASK_DLOG("submit succeed, affectedRows:%d, createTbRspNum:%d", rsp->affectedRows, createTbRspNum);
SSubmitRsp *sum = pJob->execRes.res;
sum->affectedRows += rsp->affectedRows; if (rsp->aCreateTbRsp && taosArrayGetSize(rsp->aCreateTbRsp) > 0) {
sum->nBlocks += rsp->nBlocks; SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (rsp->nBlocks > 0 && rsp->pBlocks) { if (pJob->execRes.res) {
sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks)); SSubmitRsp2 *sum = pJob->execRes.res;
memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks)); sum->affectedRows += rsp->affectedRows;
if (sum->aCreateTbRsp) {
taosArrayAddAll(sum->aCreateTbRsp, rsp->aCreateTbRsp);
taosArrayDestroy(rsp->aCreateTbRsp);
taosMemoryFree(rsp);
} else {
TSWAP(sum->aCreateTbRsp, rsp->aCreateTbRsp);
taosMemoryFree(rsp);
}
} else {
pJob->execRes.res = rsp;
pJob->execRes.msgType = TDMT_VND_SUBMIT;
} }
taosMemoryFree(rsp->pBlocks); pJob->execRes.numOfBytes += pTask->msgLen;
taosMemoryFree(rsp); SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
} else { } else {
pJob->execRes.res = rsp; SCH_LOCK(SCH_WRITE, &pJob->resLock);
pJob->execRes.msgType = TDMT_VND_SUBMIT; pJob->execRes.numOfBytes += pTask->msgLen;
if (NULL == pJob->execRes.res) {
TSWAP(pJob->execRes.res, rsp);
pJob->execRes.msgType = TDMT_VND_SUBMIT;
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
tDestroySSubmitRsp2(rsp, TSDB_MSG_FLG_DECODE);
} }
pJob->execRes.numOfBytes += pTask->msgLen;
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
} }
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);