fix: some problems of parser

This commit is contained in:
Xiaoyu Wang 2022-05-12 18:59:02 +08:00
parent 50ad5a096f
commit d6f03eb7da
2 changed files with 41 additions and 16 deletions

View File

@ -53,6 +53,7 @@ typedef struct SInsertParseContext {
SHashObj* pTableBlockHashObj; // global SHashObj* pTableBlockHashObj; // global
SHashObj* pSubTableHashObj; // global SHashObj* pSubTableHashObj; // global
SArray* pVgDataBlocks; // global SArray* pVgDataBlocks; // global
SHashObj* pTableNameHashObj; // global
int32_t totalNum; int32_t totalNum;
SVnodeModifOpStmt* pOutput; SVnodeModifOpStmt* pOutput;
SStmtCallback* pStmtCb; SStmtCallback* pStmtCb;
@ -1111,6 +1112,8 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg); createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
tNameExtractFullName(&name, tbFName); tNameExtractFullName(&name, tbFName);
CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
// USING cluase // USING cluase
if (TK_USING == sToken.type) { if (TK_USING == sToken.type) {
CHECK_CODE(parseUsingClause(pCxt, &name, tbFName)); CHECK_CODE(parseUsingClause(pCxt, &name, tbFName));
@ -1193,7 +1196,8 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
.pSql = (char*)pContext->pSql, .pSql = (char*)pContext->pSql,
.msg = {.buf = pContext->pMsg, .len = pContext->msgLen}, .msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
.pTableMeta = NULL, .pTableMeta = NULL,
.pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false), .pSubTableHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
.pTableNameHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK),
.totalNum = 0, .totalNum = 0,
.pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT), .pOutput = (SVnodeModifOpStmt*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT),
.pStmtCb = pContext->pStmtCb}; .pStmtCb = pContext->pStmtCb};
@ -1202,12 +1206,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
(*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj, (*pContext->pStmtCb->getExecInfoFn)(pContext->pStmtCb->pStmt, &context.pVgroupsHashObj,
&context.pTableBlockHashObj); &context.pTableBlockHashObj);
} else { } else {
context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); context.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
context.pTableBlockHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); context.pTableBlockHashObj =
taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
} }
if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj || if (NULL == context.pVgroupsHashObj || NULL == context.pTableBlockHashObj || NULL == context.pSubTableHashObj ||
NULL == context.pOutput) { NULL == context.pTableNameHashObj || NULL == context.pOutput) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
@ -1220,6 +1225,10 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
if (NULL == *pQuery) { if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
(*pQuery)->pTableList = taosArrayInit(taosHashGetSize(context.pTableNameHashObj), sizeof(SName));
if (NULL == (*pQuery)->pTableList) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE; (*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
(*pQuery)->haveResultSet = false; (*pQuery)->haveResultSet = false;
(*pQuery)->msgType = TDMT_VND_SUBMIT; (*pQuery)->msgType = TDMT_VND_SUBMIT;
@ -1232,6 +1241,13 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = parseInsertBody(&context); code = parseInsertBody(&context);
} }
if (TSDB_CODE_SUCCESS == code) {
SName* pTable = taosHashIterate(context.pTableNameHashObj, NULL);
while (NULL != pTable) {
taosArrayPush((*pQuery)->pTableList, pTable);
pTable = taosHashIterate(context.pTableNameHashObj, pTable);
}
}
destroyInsertParseContext(&context); destroyInsertParseContext(&context);
return code; return code;
} }

View File

@ -1092,11 +1092,10 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) { if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
for (int32_t i = 0; i < batchRsp.nRsps; ++i) { for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
SVCreateTbRsp *rsp = batchRsp.pRsps + i; SVCreateTbRsp *rsp = batchRsp.pRsps + i;
if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) { if (TSDB_CODE_SUCCESS != rsp->code) {
tDecoderClear(&coder);
SCH_ERR_JRET(rsp->code);
} else if (TSDB_CODE_SUCCESS != rsp->code) {
code = rsp->code; code = rsp->code;
tDecoderClear(&coder);
SCH_ERR_JRET(code);
} }
} }
} }
@ -1117,11 +1116,10 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) { if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
for (int32_t i = 0; i < batchRsp.nRsps; ++i) { for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
SVDropTbRsp *rsp = batchRsp.pRsps + i; SVDropTbRsp *rsp = batchRsp.pRsps + i;
if (NEED_CLIENT_HANDLE_ERROR(rsp->code)) { if (TSDB_CODE_SUCCESS != rsp->code) {
tDecoderClear(&coder);
SCH_ERR_JRET(rsp->code);
} else if (TSDB_CODE_SUCCESS != rsp->code) {
code = rsp->code; code = rsp->code;
tDecoderClear(&coder);
SCH_ERR_JRET(code);
} }
} }
} }
@ -1137,7 +1135,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(rspCode); SCH_ERR_JRET(rspCode);
if (msg) { if (msg) {
SDecoder coder = {0}; SDecoder coder = {0};
SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp)); SSubmitRsp *rsp = taosMemoryMalloc(sizeof(*rsp));
tDecoderInit(&coder, msg, msgSize); tDecoderInit(&coder, msg, msgSize);
code = tDecodeSSubmitRsp(&coder, rsp); code = tDecodeSSubmitRsp(&coder, rsp);
@ -1147,6 +1145,17 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} }
if (rsp->nBlocks > 0) {
for (int32_t i = 0; i < rsp->nBlocks; ++i) {
SSubmitBlkRsp *blk = rsp->pBlocks + i;
if (TSDB_CODE_SUCCESS != blk->code) {
code = blk->code;
tFreeSSubmitRsp(rsp);
SCH_ERR_JRET(code);
}
}
}
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows); SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);