diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index c0723e210a..06308f9e0c 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -337,16 +337,189 @@ int tscSendMsgToServer(SSqlObj *pSql) { return TSDB_CODE_SUCCESS; } -static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) { - SRpcMsg* rpcMsg = pSchedMsg->ahandle; - SRpcEpSet* pEpSet = pSchedMsg->thandle; +//static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) { +// SRpcMsg* rpcMsg = pSchedMsg->ahandle; +// SRpcEpSet* pEpSet = pSchedMsg->thandle; +// +// TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; +// SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); +// if (pSql == NULL) { +// rpcFreeCont(rpcMsg->pCont); +// free(rpcMsg); +// free(pEpSet); +// return; +// } +// +// assert(pSql->self == handle); +// +// STscObj *pObj = pSql->pTscObj; +// SSqlRes *pRes = &pSql->res; +// SSqlCmd *pCmd = &pSql->cmd; +// +// pSql->rpcRid = -1; +// +// if (pObj->signature != pObj) { +// tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature); +// +// taosRemoveRef(tscObjRef, handle); +// taosReleaseRef(tscObjRef, handle); +// rpcFreeCont(rpcMsg->pCont); +// free(rpcMsg); +// free(pEpSet); +// return; +// } +// +// SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); +// if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { +// tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", +// pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature); +// +// taosRemoveRef(tscObjRef, handle); +// taosReleaseRef(tscObjRef, handle); +// rpcFreeCont(rpcMsg->pCont); +// free(rpcMsg); +// free(pEpSet); +// return; +// } +// +// if (pEpSet) { +// if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) { +// if (pCmd->command < TSDB_SQL_MGMT) { +// tscUpdateVgroupInfo(pSql, pEpSet); +// } else { +// tscUpdateMgmtEpSet(pSql, pEpSet); +// } +// } +// } +// +// int32_t cmd = pCmd->command; +// +// // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema +// if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { +// pSql->cmd.insertParam.schemaAttached = 1; +// } +// +// // single table query error need to be handled here. +// if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && +// (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || +// rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { +// +// // 1. super table subquery +// // 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer +// if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | +// TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) && +// !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) || +// (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) { +// // do nothing in case of super table subquery +// } else { +// pSql->retry += 1; +// tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry); +// +// pSql->res.code = rpcMsg->code; // keep the previous error code +// if (pSql->retry > pSql->maxRetry) { +// tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry); +// } else { +// // wait for a little bit moment and then retry +// // todo do not sleep in rpc callback thread, add this process into queue to process +// if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { +// int32_t duration = getWaitingTimeInterval(pSql->retry); +// taosMsleep(duration); +// } +// +// pSql->retryReason = rpcMsg->code; +// rpcMsg->code = tscRenewTableMeta(pSql, 0); +// // if there is an error occurring, proceed to the following error handling procedure. +// if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { +// taosReleaseRef(tscObjRef, handle); +// rpcFreeCont(rpcMsg->pCont); +// free(rpcMsg); +// free(pEpSet); +// return; +// } +// } +// } +// } +// +// pRes->rspLen = 0; +// +// if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { +// tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code)); +// } else { +// pRes->code = rpcMsg->code; +// } +// +// if (pRes->code == TSDB_CODE_SUCCESS) { +// tscDebug("0x%"PRIx64" reset retry counter to be 0 due to success rsp, old:%d", pSql->self, pSql->retry); +// pSql->retry = 0; +// } +// +// if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) { +// assert(rpcMsg->msgType == pCmd->msgType + 1); +// pRes->code = rpcMsg->code; +// pRes->rspType = rpcMsg->msgType; +// pRes->rspLen = rpcMsg->contLen; +// +// if (pRes->rspLen > 0 && rpcMsg->pCont) { +// char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); +// if (tmp == NULL) { +// pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; +// } else { +// pRes->pRsp = tmp; +// memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen); +// } +// } else { +// tfree(pRes->pRsp); +// } +// +// /* +// * There is not response callback function for submit response. +// * The actual inserted number of points is the first number. +// */ +// if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) { +// SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp; +// pMsg->code = htonl(pMsg->code); +// pMsg->numOfRows = htonl(pMsg->numOfRows); +// pMsg->affectedRows = htonl(pMsg->affectedRows); +// pMsg->failedRows = htonl(pMsg->failedRows); +// pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks); +// +// pRes->numOfRows += pMsg->affectedRows; +// tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command], +// tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen); +// } else { +// tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen); +// } +// } +// +// if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) { +// rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); +// } +// +// bool shouldFree = tscShouldBeFreed(pSql); +// if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { +// if (rpcMsg->code != TSDB_CODE_SUCCESS) { +// pRes->code = rpcMsg->code; +// } +// rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code; +// (*pSql->fp)(pSql->param, pSql, rpcMsg->code); +// } +// +// if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it +// tscDebug("0x%"PRIx64" sqlObj is automatically freed", pSql->self); +// taosRemoveRef(tscObjRef, handle); +// } +// +// taosReleaseRef(tscObjRef, handle); +// rpcFreeCont(rpcMsg->pCont); +// free(rpcMsg); +// free(pEpSet); +//} +void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); if (pSql == NULL) { rpcFreeCont(rpcMsg->pCont); - free(rpcMsg); - free(pEpSet); return; } @@ -357,15 +530,12 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) { SSqlCmd *pCmd = &pSql->cmd; pSql->rpcRid = -1; - if (pObj->signature != pObj) { tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature); taosRemoveRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); - free(rpcMsg); - free(pEpSet); return; } @@ -377,8 +547,6 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) { taosRemoveRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); - free(rpcMsg); - free(pEpSet); return; } @@ -432,8 +600,6 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) { if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); - free(rpcMsg); - free(pEpSet); return; } } @@ -511,29 +677,6 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) { taosReleaseRef(tscObjRef, handle); rpcFreeCont(rpcMsg->pCont); - free(rpcMsg); - free(pEpSet); -} - -void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { - SSchedMsg schedMsg = {0}; - - schedMsg.fp = doProcessMsgFromServer; - - SRpcMsg* rpcMsgCopy = calloc(1, sizeof(SRpcMsg)); - memcpy(rpcMsgCopy, rpcMsg, sizeof(struct SRpcMsg)); - schedMsg.ahandle = (void*)rpcMsgCopy; - - SRpcEpSet* pEpSetCopy = NULL; - if (pEpSet != NULL) { - pEpSetCopy = calloc(1, sizeof(SRpcEpSet)); - memcpy(pEpSetCopy, pEpSet, sizeof(SRpcEpSet)); - } - - schedMsg.thandle = (void*)pEpSetCopy; - schedMsg.msg = NULL; - - taosScheduleTask(tscQhandle, &schedMsg); } int doBuildAndSendMsg(SSqlObj *pSql) {