[td-225] fix bug in error process.
This commit is contained in:
parent
9e0ac1ebef
commit
47665e5694
|
@ -420,15 +420,15 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
|
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
|
pRes->code = code;
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pRes->code = code;
|
tscError("%p ge tableMeta failed, code:%s", pSql, tstrerror(code));
|
||||||
tscQueueAsyncRes(pSql);
|
goto _error;
|
||||||
return;
|
} else {
|
||||||
|
tscDebug("%p get tableMeta successfully", pSql);
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("%p get tableMeta successfully", pSql);
|
|
||||||
|
|
||||||
if (pSql->pStream == NULL) {
|
if (pSql->pStream == NULL) {
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||||
|
|
||||||
|
@ -453,11 +453,9 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
|
assert(pParObj->signature == pParObj && trs->subqueryIndex == pTableMetaInfo->vgroupIndex &&
|
||||||
pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL);
|
pTableMetaInfo->vgroupIndex >= 0 && pTableMetaInfo->vgroupList != NULL);
|
||||||
|
|
||||||
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
|
// tscProcessSql can add error into async res
|
||||||
return;
|
tscProcessSql(pSql);
|
||||||
}
|
return;
|
||||||
|
|
||||||
goto _error;
|
|
||||||
} else { // continue to process normal async query
|
} else { // continue to process normal async query
|
||||||
if (pCmd->parseFinished) {
|
if (pCmd->parseFinished) {
|
||||||
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);
|
tscDebug("%p update table meta in local cache, continue to process sql and send corresponding query", pSql);
|
||||||
|
@ -481,26 +479,21 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
|
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
return;
|
return;
|
||||||
|
} else if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
/*
|
||||||
/*
|
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
|
||||||
* Discard previous built submit blocks, and then parse the sql string again and build up all submit blocks,
|
* and send the required submit block according to index value in supporter to server.
|
||||||
* and send the required submit block according to index value in supporter to server.
|
*/
|
||||||
*/
|
pSql->fp = pSql->fetchFp; // restore the fp
|
||||||
pSql->fp = pSql->fetchFp; // restore the fp
|
tscHandleInsertRetry(pSql);
|
||||||
if ((code = tscHandleInsertRetry(pSql)) == TSDB_CODE_SUCCESS) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {// in case of other query type, continue
|
} else {// in case of other query type, continue
|
||||||
if ((code = tscProcessSql(pSql)) == TSDB_CODE_SUCCESS) {
|
tscProcessSql(pSql);
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
goto _error;
|
return;
|
||||||
} else {
|
} else {
|
||||||
tscDebug("%p continue parse sql after get table meta", pSql);
|
tscDebug("%p continue parse sql after get table meta", pSql);
|
||||||
|
|
||||||
|
@ -538,7 +531,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||||
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
|
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -412,7 +412,7 @@ int tscProcessSql(SSqlObj *pSql) {
|
||||||
return pSql->res.code;
|
return pSql->res.code;
|
||||||
}
|
}
|
||||||
} else if (pCmd->command < TSDB_SQL_LOCAL) {
|
} else if (pCmd->command < TSDB_SQL_LOCAL) {
|
||||||
pSql->ipList = tscMgmtIpSet; //?
|
pSql->ipList = tscMgmtIpSet;
|
||||||
} else { // local handler
|
} else { // local handler
|
||||||
return (*tscProcessMsgRsp[pCmd->command])(pSql);
|
return (*tscProcessMsgRsp[pCmd->command])(pSql);
|
||||||
}
|
}
|
||||||
|
@ -1372,7 +1372,6 @@ static int tscLocalResultCommonBuilder(SSqlObj *pSql, int32_t numOfRes) {
|
||||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
|
||||||
|
|
||||||
pRes->code = TSDB_CODE_SUCCESS;
|
pRes->code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
if (pRes->rspType == 0) {
|
if (pRes->rspType == 0) {
|
||||||
pRes->numOfRows = numOfRes;
|
pRes->numOfRows = numOfRes;
|
||||||
pRes->row = 0;
|
pRes->row = 0;
|
||||||
|
|
|
@ -487,19 +487,8 @@ static bool tscFreeQhandleInVnode(SSqlObj* pSql) {
|
||||||
(pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) {
|
(pCmd->command == TSDB_SQL_SELECT && pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) {
|
||||||
|
|
||||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||||
tscDebug("%p send msg to dnode to free qhandle ASAP, command:%s", pSql, sqlCmd[pCmd->command]);
|
tscDebug("%p send msg to dnode to free qhandle ASAP, command:%s, ", pSql, sqlCmd[pCmd->command]);
|
||||||
tscProcessSql(pSql);
|
tscProcessSql(pSql);
|
||||||
|
|
||||||
// in case of sync model query, waits for response and then goes on
|
|
||||||
// if (pSql->fp == waitForQueryRsp || pSql->fp == waitForRetrieveRsp) {
|
|
||||||
// sem_wait(&pSql->rspSem);
|
|
||||||
|
|
||||||
// tscFreeSqlObj(pSql);
|
|
||||||
// tscDebug("%p sqlObj is freed by app", pSql);
|
|
||||||
// } else {
|
|
||||||
tscDebug("%p sqlObj will be freed while rsp received", pSql);
|
|
||||||
// }
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1895,9 +1895,11 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) {
|
||||||
assert(pSupporter->index < pSupporter->pState->numOfTotal);
|
assert(pSupporter->index < pSupporter->pState->numOfTotal);
|
||||||
|
|
||||||
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index);
|
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index);
|
||||||
pRes->code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
|
int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
|
||||||
if (pRes->code != TSDB_CODE_SUCCESS) {
|
|
||||||
return pRes->code;
|
if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
|
||||||
|
tscQueueAsyncRes(pSql);
|
||||||
|
return code; // here the pSql may have been released already.
|
||||||
}
|
}
|
||||||
|
|
||||||
return tscProcessSql(pSql);
|
return tscProcessSql(pSql);
|
||||||
|
|
Loading…
Reference in New Issue