diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index f8fd69b89a..247468274e 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -13,11 +13,11 @@ * along with this program. If not, see . */ -#include -#include +#include "os.h" +#include "catalog.h" +#include "tname.h" #include "clientInt.h" #include "clientLog.h" -#include "os.h" #include "tmsgtype.h" #include "trpc.h" @@ -29,16 +29,6 @@ void tscProcessActivityTimer(void *handle, void *tmrId); static int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo); -static int32_t minMsgSize() { return tsRpcHeadSize + 100; } -static int32_t getWaitingTimeInterval(int32_t count) { - int32_t initial = 100; // 100 ms by default - if (count <= 1) { - return 0; - } - - return initial * ((2u)<<(count - 2)); -} - static int32_t vgIdCompare(const void *lhs, const void *rhs) { int32_t left = *(int32_t *)lhs; int32_t right = *(int32_t *)rhs; @@ -298,36 +288,6 @@ void tscProcessActivityTimer(void *handle, void *tmrId) { taosReleaseRef(tscRefId, rid); } -int tscSendMsgToServer(SSqlObj *pSql) { - STscObj* pObj = pSql->pTscObj; - SSqlCmd* pCmd = &pSql->cmd; - - char *pMsg = rpcMallocCont(pCmd->payloadLen); - if (NULL == pMsg) { - tscError("0x%"PRIx64" msg:%s malloc failed", pSql->self, taosMsg[pSql->cmd.msgType]); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - // set the mgmt ip list - if (pSql->cmd.command >= TSDB_SQL_MGMT) { - tscDumpMgmtEpSet(pSql); - } - - memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen); - - SRpcMsg rpcMsg = { - .msgType = pSql->cmd.msgType, - .pCont = pMsg, - .contLen = pSql->cmd.payloadLen, - .ahandle = (void*)pSql->self, - .handle = NULL, - .code = 0 - }; - - rpcSendRequest(pObj->pRpcObj->pDnodeConn, &pSql->epSet, &rpcMsg, &pSql->rpcRid); - return TSDB_CODE_SUCCESS; -} - // handle three situation // 1. epset retry, only return last failure ep // 2. no epset retry, like 'taos -h invalidFqdn', return invalidFqdn @@ -354,176 +314,6 @@ void tscSetFqdnErrorMsg(SSqlObj* pSql, SRpcEpSet* pEpSet) { } } -void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { - TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; - SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); - if (pSql == NULL) { - rpcFreeCont(rpcMsg->pCont); - return; - } - - assert(pSql->self == handle); - - STscObj *pObj = pSql->pTscObj; - SSqlRes *pRes = &pSql->res; - SSqlCmd *pCmd = &pSql->cmd; - - pSql->rpcRid = -1; - if (pObj->signature != pObj) { - tscDebug("0x%"PRIx64" DB connection is closed, cmd:%d pObj:%p signature:%p", pSql->self, pCmd->command, pObj, pObj->signature); - - taosRemoveRef(tscObjRef, handle); - taosReleaseRef(tscObjRef, handle); - rpcFreeCont(rpcMsg->pCont); - return; - } - - SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); - if (pQueryInfo != NULL && pQueryInfo->type == TSDB_QUERY_TYPE_FREE_RESOURCE) { - tscDebug("0x%"PRIx64" sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p", - pSql->self, pCmd->command, pQueryInfo->type, pObj, pObj->signature); - - taosRemoveRef(tscObjRef, handle); - taosReleaseRef(tscObjRef, handle); - rpcFreeCont(rpcMsg->pCont); - return; - } - - if (pEpSet) { - if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) { - if (pCmd->command < TSDB_SQL_MGMT) { - tscUpdateVgroupInfo(pSql, pEpSet); - } else { - tscUpdateMgmtEpSet(pSql, pEpSet); - } - } - } - - int32_t cmd = pCmd->command; - - // set the flag to denote that sql string needs to be re-parsed and build submit block with table schema - if (cmd == TSDB_SQL_INSERT && rpcMsg->code == TSDB_CODE_TDB_TABLE_RECONFIGURE) { - pSql->cmd.insertParam.schemaAttached = 1; - } - - // single table query error need to be handled here. - if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAG_VAL) && - (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || - rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { - - // 1. super table subquery - // 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer - if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | - TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) && - !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) || - (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) { - // do nothing in case of super table subquery - } else { - pSql->retry += 1; - tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry); - - pSql->res.code = rpcMsg->code; // keep the previous error code - if (pSql->retry > pSql->maxRetry) { - tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry); - } else { - // wait for a little bit moment and then retry - // todo do not sleep in rpc callback thread, add this process into queue to process - if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { - int32_t duration = getWaitingTimeInterval(pSql->retry); - taosMsleep(duration); - } - - pSql->retryReason = rpcMsg->code; - rpcMsg->code = tscRenewTableMeta(pSql, 0); - // if there is an error occurring, proceed to the following error handling procedure. - if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - taosReleaseRef(tscObjRef, handle); - rpcFreeCont(rpcMsg->pCont); - return; - } - } - } - } - - pRes->rspLen = 0; - - if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) { - tscDebug("0x%"PRIx64" query is cancelled, code:%s", pSql->self, tstrerror(pRes->code)); - } else { - pRes->code = rpcMsg->code; - } - - if (pRes->code == TSDB_CODE_SUCCESS) { - tscDebug("0x%"PRIx64" reset retry counter to be 0 due to success rsp, old:%d", pSql->self, pSql->retry); - pSql->retry = 0; - } - - if (pRes->code != TSDB_CODE_TSC_QUERY_CANCELLED) { - assert(rpcMsg->msgType == pCmd->msgType + 1); - pRes->code = rpcMsg->code; - pRes->rspType = rpcMsg->msgType; - pRes->rspLen = rpcMsg->contLen; - - if (pRes->rspLen > 0 && rpcMsg->pCont) { - char *tmp = (char *)realloc(pRes->pRsp, pRes->rspLen); - if (tmp == NULL) { - pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY; - } else { - pRes->pRsp = tmp; - memcpy(pRes->pRsp, rpcMsg->pCont, pRes->rspLen); - } - } else { - tfree(pRes->pRsp); - } - - /* - * There is not response callback function for submit response. - * The actual inserted number of points is the first number. - */ - if (rpcMsg->msgType == TSDB_MSG_TYPE_SUBMIT_RSP && pRes->pRsp != NULL) { - SShellSubmitRspMsg *pMsg = (SShellSubmitRspMsg*)pRes->pRsp; - pMsg->code = htonl(pMsg->code); - pMsg->numOfRows = htonl(pMsg->numOfRows); - pMsg->affectedRows = htonl(pMsg->affectedRows); - pMsg->failedRows = htonl(pMsg->failedRows); - pMsg->numOfFailedBlocks = htonl(pMsg->numOfFailedBlocks); - - pRes->numOfRows += pMsg->affectedRows; - tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s inserted rows:%d rspLen:%d", pSql->self, sqlCmd[pCmd->command], - tstrerror(pRes->code), pMsg->affectedRows, pRes->rspLen); - } else { - tscDebug("0x%"PRIx64" SQL cmd:%s, code:%s rspLen:%d", pSql->self, sqlCmd[pCmd->command], tstrerror(pRes->code), pRes->rspLen); - } - } - - if (pRes->code == TSDB_CODE_SUCCESS && tscProcessMsgRsp[pCmd->command]) { - rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); - } - - bool shouldFree = tscShouldBeFreed(pSql); - if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - if (rpcMsg->code != TSDB_CODE_SUCCESS) { - pRes->code = rpcMsg->code; - } - - rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code; - if (rpcMsg->code == TSDB_CODE_RPC_FQDN_ERROR) { - tscAllocPayload(pCmd, TSDB_FQDN_LEN + 64); - tscSetFqdnErrorMsg(pSql, pEpSet); - } - - (*pSql->fp)(pSql->param, pSql, rpcMsg->code); - } - - if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it - tscDebug("0x%"PRIx64" sqlObj is automatically freed", pSql->self); - taosRemoveRef(tscObjRef, handle); - } - - taosReleaseRef(tscObjRef, handle); - rpcFreeCont(rpcMsg->pCont); -} - int doBuildAndSendMsg(SSqlObj *pSql) { SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -2987,51 +2777,6 @@ int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo) { return code; } -static void freeElem(void* p) { - tfree(*(char**)p); -} - -/** - * retrieve table meta from mnode, and then update the local table meta hashmap. - * @param pSql sql object - * @param tableIndex table index - * @return status code - */ -int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { - SSqlCmd* pCmd = &pSql->cmd; - - SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); - STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, tableIndex); - - char name[TSDB_TABLE_FNAME_LEN] = {0}; - int32_t code = tNameExtractFullName(&pTableMetaInfo->name, name); - if (code != TSDB_CODE_SUCCESS) { - tscError("0x%"PRIx64" failed to generate the table full name", pSql->self); - return TSDB_CODE_TSC_INVALID_OPERATION; - } - - STableMeta* pTableMeta = pTableMetaInfo->pTableMeta; - if (pTableMeta) { - tscDebug("0x%"PRIx64" update table meta:%s, old meta numOfTags:%d, numOfCols:%d, uid:%" PRIu64, pSql->self, name, - tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid); - } - - - // remove stored tableMeta info in hash table - tscResetSqlCmd(pCmd, true, pSql->self); - - SArray* pNameList = taosArrayInit(1, POINTER_BYTES); - SArray* vgroupList = taosArrayInit(1, POINTER_BYTES); - - char* n = strdup(name); - taosArrayPush(pNameList, &n); - code = getMultiTableMetaFromMnode(pSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true); - taosArrayDestroyEx(pNameList, freeElem); - taosArrayDestroyEx(vgroupList, freeElem); - - return code; -} - static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); @@ -3044,53 +2789,6 @@ static bool allVgroupInfoRetrieved(SQueryInfo* pQueryInfo) { return true; } -int tscGetSTableVgroupInfo(SSqlObj *pSql, SQueryInfo* pQueryInfo) { - int32_t code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - if (allVgroupInfoRetrieved(pQueryInfo)) { - return TSDB_CODE_SUCCESS; - } - SSqlObj *pNew = calloc(1, sizeof(SSqlObj)); - pNew->pTscObj = pSql->pTscObj; - pNew->signature = pNew; - - pNew->cmd.command = TSDB_SQL_STABLEVGROUP; - - // TODO TEST IT - SQueryInfo *pNewQueryInfo = tscGetQueryInfoS(&pNew->cmd); - if (pNewQueryInfo == NULL) { - tscFreeSqlObj(pNew); - return code; - } - - for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { - STableMetaInfo *pMInfo = tscGetMetaInfo(pQueryInfo, i); - STableMeta* pTableMeta = tscTableMetaDup(pMInfo->pTableMeta); - tscAddTableMetaInfo(pNewQueryInfo, &pMInfo->name, pTableMeta, NULL, pMInfo->tagColList, pMInfo->pVgroupTables); - } - - if ((code = tscAllocPayload(&pNew->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) { - tscFreeSqlObj(pNew); - return code; - } - - pNewQueryInfo->numOfTables = pQueryInfo->numOfTables; - registerSqlObj(pNew); - - tscDebug("0x%"PRIx64" svgroupRid from %" PRId64 " to %" PRId64 , pSql->self, pSql->svgroupRid, pNew->self); - - pSql->svgroupRid = pNew->self; - tscDebug("0x%"PRIx64" new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql->self, pNew, pNewQueryInfo->numOfTables); - - pNew->fp = tscTableMetaCallBack; - pNew->param = (void *)pSql->self; - code = tscBuildAndSendRequest(pNew, NULL); - if (code == TSDB_CODE_SUCCESS) { - code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; - } - - return code; -} - #endif int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { @@ -3205,36 +2903,6 @@ int32_t doBuildMsgSupp(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) { } } -STableMeta* createTableMetaFromMsg(STableMetaMsg* pTableMetaMsg) { - assert(pTableMetaMsg != NULL && pTableMetaMsg->numOfColumns >= 2); - - size_t schemaSize = (pTableMetaMsg->numOfColumns + pTableMetaMsg->numOfTags) * sizeof(SSchema); - STableMeta* pTableMeta = calloc(1, sizeof(STableMeta) + schemaSize); - - pTableMeta->tableType = pTableMetaMsg->tableType; - pTableMeta->vgId = pTableMetaMsg->vgId; - pTableMeta->suid = pTableMetaMsg->suid; - pTableMeta->uid = pTableMetaMsg->tuid; - - pTableMeta->tableInfo = (STableComInfo) { - .numOfTags = pTableMetaMsg->numOfTags, - .precision = pTableMetaMsg->precision, - .numOfColumns = pTableMetaMsg->numOfColumns, - }; - - pTableMeta->sversion = pTableMetaMsg->sversion; - pTableMeta->tversion = pTableMetaMsg->tversion; - - memcpy(pTableMeta->schema, pTableMetaMsg->pSchema, schemaSize); - - int32_t numOfTotalCols = pTableMeta->tableInfo.numOfColumns; - for(int32_t i = 0; i < numOfTotalCols; ++i) { - pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes; - } - - return pTableMeta; -} - int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) { SShowRsp* pShow = (SShowRsp *)pMsg; pShow->showId = htonl(pShow->showId);