Merge pull request #7475 from taosdata/feature/TD-6214
[TD-6214]<feature> improve insert performance
This commit is contained in:
commit
83c4bc5fe7
|
@ -337,16 +337,189 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
|
||||
SRpcMsg* rpcMsg = pSchedMsg->ahandle;
|
||||
SRpcEpSet* pEpSet = pSchedMsg->thandle;
|
||||
//static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
|
||||
// SRpcMsg* rpcMsg = pSchedMsg->ahandle;
|
||||
// SRpcEpSet* pEpSet = pSchedMsg->thandle;
|
||||
//
|
||||
// TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
|
||||
// SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
|
||||
// if (pSql == NULL) {
|
||||
// rpcFreeCont(rpcMsg->pCont);
|
||||
// free(rpcMsg);
|
||||
// free(pEpSet);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// assert(pSql->self == handle);
|
||||
//
|
||||
// STscObj *pObj = pSql->pTscObj;
|
||||
// SSqlRes *pRes = &pSql->res;
|
||||
// SSqlCmd *pCmd = &pSql->cmd;
|
||||
//
|
||||
// pSql->rpcRid = -1;
|
||||
//
|
||||
// if (pObj->signature != pObj) {
|
||||
// tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature);
|
||||
//
|
||||
// taosRemoveRef(tscObjRef, handle);
|
||||
// taosReleaseRef(tscObjRef, handle);
|
||||
// rpcFreeCont(rpcMsg->pCont);
|
||||
// free(rpcMsg);
|
||||
// free(pEpSet);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
|
||||
// if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) {
|
||||
// tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
|
||||
// pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
|
||||
//
|
||||
// taosRemoveRef(tscObjRef, handle);
|
||||
// taosReleaseRef(tscObjRef, handle);
|
||||
// rpcFreeCont(rpcMsg->pCont);
|
||||
// free(rpcMsg);
|
||||
// free(pEpSet);
|
||||
// return;
|
||||
// }
|
||||
//
|
||||
// if (pEpSet) {
|
||||
// if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
|
||||
// if (pCmd->command < TSDB_SQL_MGMT) {
|
||||
// tscUpdateVgroupInfo(pSql, pEpSet);
|
||||
// } else {
|
||||
// tscUpdateMgmtEpSet(pSql, pEpSet);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// int32_t cmd = pCmd->command;
|
||||
//
|
||||
// // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema
|
||||
// if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
|
||||
// pSql->cmd.insertParam.schemaAttached = 1;
|
||||
// }
|
||||
//
|
||||
// // single table query error need to be handled here.
|
||||
// if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
|
||||
// (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
|
||||
// rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
|
||||
//
|
||||
// // 1. super table subquery
|
||||
// // 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer
|
||||
// if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
|
||||
// TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
|
||||
// !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
|
||||
// (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) {
|
||||
// // do nothing in case of super table subquery
|
||||
// } else {
|
||||
// pSql->retry += 1;
|
||||
// tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry);
|
||||
//
|
||||
// pSql->res.code = rpcMsg->code; // keep the previous error code
|
||||
// if (pSql->retry > pSql->maxRetry) {
|
||||
// tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry);
|
||||
// } else {
|
||||
// // wait for a little bit moment and then retry
|
||||
// // todo do not sleep in rpc callback thread, add this process into queue to process
|
||||
// if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
|
||||
// int32_t duration = getWaitingTimeInterval(pSql->retry);
|
||||
// taosMsleep(duration);
|
||||
// }
|
||||
//
|
||||
// pSql->retryReason = rpcMsg->code;
|
||||
// rpcMsg->code = tscRenewTableMeta(pSql, 0);
|
||||
// // if there is an error occurring, proceed to the following error handling procedure.
|
||||
// if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
// taosReleaseRef(tscObjRef, handle);
|
||||
// rpcFreeCont(rpcMsg->pCont);
|
||||
// free(rpcMsg);
|
||||
// free(pEpSet);
|
||||
// return;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// pRes->rspLen = 0;
|
||||
//
|
||||
// if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
|
||||
// tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code));
|
||||
// } else {
|
||||
// pRes->code = rpcMsg->code;
|
||||
// }
|
||||
//
|
||||
// if (pRes->code == TSDB_CODE_SUCCESS) {
|
||||
// tscDebug("0x%"PRIx64" reset retry counter to be 0 due to success rsp, old:%d", pSql->self, pSql->retry);
|
||||
// pSql->retry = 0;
|
||||
// }
|
||||
//
|
||||
// if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) {
|
||||
// assert(rpcMsg->msgType == pCmd->msgType + 1);
|
||||
// pRes->code = rpcMsg->code;
|
||||
// pRes->rspType = rpcMsg->msgType;
|
||||
// pRes->rspLen = rpcMsg->contLen;
|
||||
//
|
||||
// if (pRes->rspLen > 0 && rpcMsg->pCont) {
|
||||
// char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen);
|
||||
// if (tmp == NULL) {
|
||||
// pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
// } else {
|
||||
// pRes->pRsp = tmp;
|
||||
// memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen);
|
||||
// }
|
||||
// } else {
|
||||
// tfree(pRes->pRsp);
|
||||
// }
|
||||
//
|
||||
// /*
|
||||
// * There is not response callback function for submit response.
|
||||
// * The actual inserted number of points is the first number.
|
||||
// */
|
||||
// if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) {
|
||||
// SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp;
|
||||
// pMsg->code = htonl(pMsg->code);
|
||||
// pMsg->numOfRows = htonl(pMsg->numOfRows);
|
||||
// pMsg->affectedRows = htonl(pMsg->affectedRows);
|
||||
// pMsg->failedRows = htonl(pMsg->failedRows);
|
||||
// pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks);
|
||||
//
|
||||
// pRes->numOfRows += pMsg->affectedRows;
|
||||
// tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command],
|
||||
// tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen);
|
||||
// } else {
|
||||
// tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen);
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) {
|
||||
// rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
|
||||
// }
|
||||
//
|
||||
// bool shouldFree = tscShouldBeFreed(pSql);
|
||||
// if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
// if (rpcMsg->code != TSDB_CODE_SUCCESS) {
|
||||
// pRes->code = rpcMsg->code;
|
||||
// }
|
||||
// rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
|
||||
// (*pSql->fp)(pSql->param, pSql, rpcMsg->code);
|
||||
// }
|
||||
//
|
||||
// if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
|
||||
// tscDebug("0x%"PRIx64" sqlObj is automatically freed", pSql->self);
|
||||
// taosRemoveRef(tscObjRef, handle);
|
||||
// }
|
||||
//
|
||||
// taosReleaseRef(tscObjRef, handle);
|
||||
// rpcFreeCont(rpcMsg->pCont);
|
||||
// free(rpcMsg);
|
||||
// free(pEpSet);
|
||||
//}
|
||||
|
||||
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
|
||||
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
|
||||
if (pSql == NULL) {
|
||||
rpcFreeCont(rpcMsg->pCont);
|
||||
free(rpcMsg);
|
||||
free(pEpSet);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -357,15 +530,12 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
|
|||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
pSql->rpcRid = -1;
|
||||
|
||||
if (pObj->signature != pObj) {
|
||||
tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature);
|
||||
|
||||
taosRemoveRef(tscObjRef, handle);
|
||||
taosReleaseRef(tscObjRef, handle);
|
||||
rpcFreeCont(rpcMsg->pCont);
|
||||
free(rpcMsg);
|
||||
free(pEpSet);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -377,8 +547,6 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
|
|||
taosRemoveRef(tscObjRef, handle);
|
||||
taosReleaseRef(tscObjRef, handle);
|
||||
rpcFreeCont(rpcMsg->pCont);
|
||||
free(rpcMsg);
|
||||
free(pEpSet);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -432,8 +600,6 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
|
|||
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
taosReleaseRef(tscObjRef, handle);
|
||||
rpcFreeCont(rpcMsg->pCont);
|
||||
free(rpcMsg);
|
||||
free(pEpSet);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -511,29 +677,6 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
|
|||
|
||||
taosReleaseRef(tscObjRef, handle);
|
||||
rpcFreeCont(rpcMsg->pCont);
|
||||
free(rpcMsg);
|
||||
free(pEpSet);
|
||||
}
|
||||
|
||||
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||
SSchedMsg schedMsg = {0};
|
||||
|
||||
schedMsg.fp = doProcessMsgFromServer;
|
||||
|
||||
SRpcMsg* rpcMsgCopy = calloc(1, sizeof(SRpcMsg));
|
||||
memcpy(rpcMsgCopy, rpcMsg, sizeof(struct SRpcMsg));
|
||||
schedMsg.ahandle = (void*)rpcMsgCopy;
|
||||
|
||||
SRpcEpSet* pEpSetCopy = NULL;
|
||||
if (pEpSet != NULL) {
|
||||
pEpSetCopy = calloc(1, sizeof(SRpcEpSet));
|
||||
memcpy(pEpSetCopy, pEpSet, sizeof(SRpcEpSet));
|
||||
}
|
||||
|
||||
schedMsg.thandle = (void*)pEpSetCopy;
|
||||
schedMsg.msg = NULL;
|
||||
|
||||
taosScheduleTask(tscQhandle, &schedMsg);
|
||||
}
|
||||
|
||||
int doBuildAndSendMsg(SSqlObj *pSql) {
|
||||
|
|
Loading…
Reference in New Issue