From ad9731bd6f2afd091883ddd52f9cf09900d09227 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 12 Apr 2022 11:00:09 +0800 Subject: [PATCH] merge from 3.0 --- source/client/inc/clientInt.h | 5 +- source/client/src/clientImpl.c | 84 +++++++++++++++++----------------- source/client/src/clientMain.c | 6 +-- source/client/src/tmq.c | 17 ++++--- 4 files changed, 58 insertions(+), 54 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index aa27e3bafc..604e6a49ae 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -282,9 +282,10 @@ int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj* void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void doSetOneRowPtr(SReqResultInfo* pResultInfo); -int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4); +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, + bool convertUcs4); void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); -int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp); +int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4); // --- heartbeat // global, called by mgmt diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a348c252d0..71c5df6a09 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -41,7 +41,6 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo); -static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, uint16_t port) { @@ -173,7 +172,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) { int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { SRetrieveTableRsp* pRsp = NULL; - int32_t code = qExecCommand(pQuery->pRoot, &pRsp); + int32_t code = qExecCommand(pQuery->pRoot, &pRsp); if (TSDB_CODE_SUCCESS == code && NULL != pRsp) { code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false); } @@ -189,7 +188,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg; pRequest->type = pMsgInfo->msgType; pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL}; - pMsgInfo->pMsg = NULL; // pMsg transferred to SMsgSendInfo management + pMsgInfo->pMsg = NULL; // pMsg transferred to SMsgSendInfo management STscObj* pTscObj = pRequest->pTscObj; SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest); @@ -210,14 +209,12 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) { pRequest->type = pQuery->msgType; - SPlanContext cxt = { - .queryId = pRequest->requestId, - .acctId = pRequest->pTscObj->acctId, - .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), - .pAstRoot = pQuery->pRoot, - .showRewrite = pQuery->showRewrite - }; - int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList); + SPlanContext cxt = {.queryId = pRequest->requestId, + .acctId = pRequest->pTscObj->acctId, + .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), + .pAstRoot = pQuery->pRoot, + .showRewrite = pQuery->showRewrite}; + int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList); if (code != 0) { return code; } @@ -233,10 +230,10 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { pResInfo->fields[i].bytes = pSchema[i].bytes; - pResInfo->fields[i].type = pSchema[i].type; + pResInfo->fields[i].type = pSchema[i].type; pResInfo->userFields[i].bytes = pSchema[i].bytes; - pResInfo->userFields[i].type = pSchema[i].type; + pResInfo->userFields[i].type = pSchema[i].type; if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) { pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE; @@ -253,7 +250,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; - int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, &res); + int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, + pRequest->metric.start, &res); if (code != TSDB_CODE_SUCCESS) { if (pRequest->body.queryJob != 0) { schedulerFreeJob(pRequest->body.queryJob); @@ -273,14 +271,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList } pRequest->code = res.code; - terrno = res.code; + terrno = res.code; return pRequest->code; } SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* pRequest = NULL; - SQuery* pQuery = NULL; - SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); + SQuery* pQuery = NULL; + SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr)); int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest); if (TSDB_CODE_SUCCESS == code) { @@ -319,15 +317,15 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) { } int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { - SCatalog *pCatalog = NULL; - int32_t code = 0; - int32_t dbNum = taosArrayGetSize(pRequest->dbList); - int32_t tblNum = taosArrayGetSize(pRequest->tableList); + SCatalog* pCatalog = NULL; + int32_t code = 0; + int32_t dbNum = taosArrayGetSize(pRequest->dbList); + int32_t tblNum = taosArrayGetSize(pRequest->tableList); if (dbNum <= 0 && tblNum <= 0) { return TSDB_CODE_QRY_APP_ERROR; } - + code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog); if (code != TSDB_CODE_SUCCESS) { return code; @@ -336,8 +334,8 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); for (int32_t i = 0; i < dbNum; ++i) { - char *dbFName = taosArrayGet(pRequest->dbList, i); - + char* dbFName = taosArrayGet(pRequest->dbList, i); + code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName); if (code != TSDB_CODE_SUCCESS) { return code; @@ -345,7 +343,7 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { } for (int32_t i = 0; i < tblNum; ++i) { - SName *tableName = taosArrayGet(pRequest->tableList, i); + SName* tableName = taosArrayGet(pRequest->tableList, i); code = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, tableName, -1); if (code != TSDB_CODE_SUCCESS) { @@ -356,11 +354,10 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) { return code; } - SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { SRequestObj* pRequest = NULL; - int32_t retryNum = 0; - int32_t code = 0; + int32_t retryNum = 0; + int32_t code = 0; while (retryNum++ < REQUEST_MAX_TRY_TIMES) { pRequest = execQueryImpl(pTscObj, sql, sqlLen); @@ -376,7 +373,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { destroyRequest(pRequest); } - + return pRequest; } @@ -508,7 +505,8 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { } bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) { - return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP || msgType == TDMT_VND_QUERY_HEARTBEAT_RSP; + return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP || + msgType == TDMT_VND_QUERY_HEARTBEAT_RSP; } void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { @@ -535,10 +533,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start; if (pMsg->code == TSDB_CODE_SUCCESS) { tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self, - TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed/1000, pRequest->requestId); + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId); } else { tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self, - TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed/1000, pRequest->requestId); + TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId); } taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId); @@ -721,8 +719,8 @@ _return: static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) { if (pResInfo->row == NULL) { - pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES); - pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn)); + pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES); + pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn)); pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t)); pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES); @@ -769,7 +767,8 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int return TSDB_CODE_SUCCESS; } -int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4) { +int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, + bool convertUcs4) { assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL); if (numOfRows == 0) { return TSDB_CODE_SUCCESS; @@ -840,15 +839,16 @@ void resetConnectDB(STscObj* pTscObj) { int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) { assert(pResultInfo != NULL && pRsp != NULL); - pResultInfo->pRspMsg = (const char*)pRsp; - pResultInfo->pData = (void*)pRsp->data; - pResultInfo->numOfRows = htonl(pRsp->numOfRows); - pResultInfo->current = 0; - pResultInfo->completed = (pRsp->completed == 1); + pResultInfo->pRspMsg = (const char*)pRsp; + pResultInfo->pData = (void*)pRsp->data; + pResultInfo->numOfRows = htonl(pRsp->numOfRows); + pResultInfo->current = 0; + pResultInfo->completed = (pRsp->completed == 1); pResultInfo->payloadLen = htonl(pRsp->compLen); - pResultInfo->precision = pRsp->precision; + pResultInfo->precision = pRsp->precision; // TODO handle the compressed case pResultInfo->totalRows += pResultInfo->numOfRows; - return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4); + return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, + convertUcs4); } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 99efecdb1b..577049ebc8 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -168,7 +168,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return NULL; } - return doFetchRow(pRequest, true); + return doFetchRow(pRequest, true, false); } else if (TD_RES_TMQ(res)) { tmq_message_t *msg = ((tmq_message_t *)res); @@ -430,7 +430,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { return 0; } - doFetchRow(pRequest, false); + doFetchRow(pRequest, false, false); // TODO refactor SReqResultInfo *pResultInfo = &pRequest->body.resInfo; @@ -465,7 +465,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { return 0; } - doFetchRow(pRequest, false); + doFetchRow(pRequest, false, false); SReqResultInfo *pResultInfo = &pRequest->body.resInfo; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 84e3515539..b3e0cc6fd9 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -835,7 +835,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { if (msgEpoch < tmqEpoch) { /*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/ /*tsem_post(&tmq->rspSem);*/ - tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); + tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch, + tmqEpoch); return 0; } @@ -872,8 +873,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->msg); - pRsp->iter.curBlock = 0; - pRsp->iter.curRow = 0; + /*pRsp->iter.curBlock = 0;*/ + /*pRsp->iter.curRow = 0;*/ // TODO: alloc mem /*pRsp->*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ @@ -885,8 +886,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { } #endif - tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pParam->pVg->vgId, pRsp->msg.reqOffset, - pRsp->msg.rspOffset); + tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pParam->pVg->vgId, + pRsp->msg.reqOffset, pRsp->msg.rspOffset); pRsp->vg = pParam->pVg; taosWriteQitem(tmq->mqueue, pRsp); @@ -907,7 +908,8 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { bool set = false; int32_t topicNumGet = taosArrayGetSize(pRsp->topics); char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; - tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch, topicNumGet); + tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch, + topicNumGet); SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); if (newTopics == NULL) { return false; @@ -1275,7 +1277,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { int64_t transporterId = 0; /*printf("send poll\n");*/ atomic_add_fetch_32(&tmq->waitingRequest, 1); - tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); + tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, + pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); pVg->pollCnt++;