enh:[TD-31070] Handling return value of clientImpl.c
This commit is contained in:
parent
ca30014396
commit
47be646de4
|
@ -519,7 +519,7 @@ STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientCon
|
||||||
void releaseTscObj(int64_t rid) {
|
void releaseTscObj(int64_t rid) {
|
||||||
int32_t code = taosReleaseRef(clientConnRefPool, rid);
|
int32_t code = taosReleaseRef(clientConnRefPool, rid);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
tscWarn("failed to release TscObj, rid:%lld, code:%s", rid, tstrerror(code));
|
tscWarn("failed to release TscObj, code:%s", tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
#include "tsched.h"
|
#include "tsched.h"
|
||||||
#include "tversion.h"
|
#include "tversion.h"
|
||||||
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
|
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
|
||||||
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
|
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo);
|
||||||
|
|
||||||
static bool stringLengthCheck(const char* str, size_t maxsize) {
|
static bool stringLengthCheck(const char* str, size_t maxsize) {
|
||||||
if (str == NULL) {
|
if (str == NULL) {
|
||||||
|
@ -408,11 +408,7 @@ int compareQueryNodeLoad(const void* elem1, const void* elem2) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
|
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
|
||||||
int32_t code = taosThreadMutexLock(&pInfo->qnodeMutex);
|
TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
if (pInfo->pQnodeList) {
|
if (pInfo->pQnodeList) {
|
||||||
taosArrayDestroy(pInfo->pQnodeList);
|
taosArrayDestroy(pInfo->pQnodeList);
|
||||||
pInfo->pQnodeList = NULL;
|
pInfo->pQnodeList = NULL;
|
||||||
|
@ -425,11 +421,7 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
|
||||||
tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
|
tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
|
||||||
taosArrayGetSize(pInfo->pQnodeList));
|
taosArrayGetSize(pInfo->pQnodeList));
|
||||||
}
|
}
|
||||||
code = taosThreadMutexUnlock(&pInfo->qnodeMutex);
|
TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -444,17 +436,9 @@ int32_t qnodeRequired(SRequestObj* pRequest, bool *required) {
|
||||||
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
|
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
|
||||||
*required = false;
|
*required = false;
|
||||||
|
|
||||||
code = taosThreadMutexLock(&pInfo->qnodeMutex);
|
TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
*required = (NULL == pInfo->pQnodeList);
|
*required = (NULL == pInfo->pQnodeList);
|
||||||
code = taosThreadMutexUnlock(&pInfo->qnodeMutex);
|
TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -462,19 +446,11 @@ int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
|
||||||
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
|
SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
code = taosThreadMutexLock(&pInfo->qnodeMutex);
|
TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
tscError("failed to lock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
if (pInfo->pQnodeList) {
|
if (pInfo->pQnodeList) {
|
||||||
*pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
|
*pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
|
||||||
}
|
}
|
||||||
code = taosThreadMutexUnlock(&pInfo->qnodeMutex);
|
TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
|
||||||
tscError("failed to unlock qnode mutex, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
if (NULL == *pNodeList) {
|
if (NULL == *pNodeList) {
|
||||||
SCatalog* pCatalog = NULL;
|
SCatalog* pCatalog = NULL;
|
||||||
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
|
@ -689,7 +665,10 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
|
||||||
case QUERY_POLICY_CLIENT: {
|
case QUERY_POLICY_CLIENT: {
|
||||||
if (pResultMeta) {
|
if (pResultMeta) {
|
||||||
pDbVgList = taosArrayInit(4, POINTER_BYTES);
|
pDbVgList = taosArrayInit(4, POINTER_BYTES);
|
||||||
|
if (NULL == pDbVgList) {
|
||||||
|
code = terrno;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
|
int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
|
||||||
for (int32_t i = 0; i < dbNum; ++i) {
|
for (int32_t i = 0; i < dbNum; ++i) {
|
||||||
SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
|
SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
|
||||||
|
@ -697,7 +676,10 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pDbVgList, &pRes->pRes);
|
if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -712,14 +694,22 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
|
||||||
pQnodeList = NULL;
|
pQnodeList = NULL;
|
||||||
} else {
|
} else {
|
||||||
pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
|
pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
|
||||||
|
if (NULL == pQnodeList) {
|
||||||
|
code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
|
SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
|
||||||
taosThreadMutexLock(&pInst->qnodeMutex);
|
TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
|
||||||
if (pInst->pQnodeList) {
|
if (pInst->pQnodeList) {
|
||||||
pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
|
pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
|
||||||
|
if (NULL == pQnodeList) {
|
||||||
|
code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pInst->qnodeMutex);
|
TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
|
||||||
}
|
}
|
||||||
|
|
||||||
code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
|
code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
|
||||||
|
@ -730,6 +720,7 @@ int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray
|
||||||
return TSDB_CODE_APP_ERROR;
|
return TSDB_CODE_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_return:
|
||||||
taosArrayDestroy(pDbVgList);
|
taosArrayDestroy(pDbVgList);
|
||||||
taosArrayDestroy(pQnodeList);
|
taosArrayDestroy(pQnodeList);
|
||||||
|
|
||||||
|
@ -759,6 +750,10 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray*
|
||||||
}
|
}
|
||||||
|
|
||||||
pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
|
pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
|
||||||
|
if (NULL == pDbVgList) {
|
||||||
|
code = terrno;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
SArray* pVgList = NULL;
|
SArray* pVgList = NULL;
|
||||||
for (int32_t i = 0; i < dbNum; ++i) {
|
for (int32_t i = 0; i < dbNum; ++i) {
|
||||||
char* dbFName = taosArrayGet(pRequest->dbList, i);
|
char* dbFName = taosArrayGet(pRequest->dbList, i);
|
||||||
|
@ -767,12 +762,16 @@ int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray*
|
||||||
.requestObjRefId = pRequest->self,
|
.requestObjRefId = pRequest->self,
|
||||||
.mgmtEps = getEpSet_s(&pInst->mgmtEp)};
|
.mgmtEps = getEpSet_s(&pInst->mgmtEp)};
|
||||||
|
|
||||||
|
// catalogGetDBVgList will handle dbFName == null.
|
||||||
code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
|
code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pDbVgList, &pVgList);
|
if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -825,7 +824,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
||||||
|
|
||||||
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
|
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
|
||||||
memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
|
(void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||||
|
@ -841,7 +840,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
||||||
if (TDMT_VND_SUBMIT == pRequest->type) {
|
if (TDMT_VND_SUBMIT == pRequest->type) {
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
|
(void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
schedulerFreeJob(&pRequest->body.queryJob, 0);
|
||||||
|
@ -863,7 +862,7 @@ int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog
|
||||||
for (int32_t i = 0; i < tbNum; ++i) {
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
|
SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
|
||||||
if (pTbRsp->pMeta) {
|
if (pTbRsp->pMeta) {
|
||||||
handleCreateTbExecRes(pTbRsp->pMeta, pCatalog);
|
TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -887,8 +886,15 @@ int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog,
|
||||||
|
|
||||||
for (int32_t i = 0; i < tbNum; ++i) {
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
|
STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
|
||||||
|
if (NULL == tbInfo) {
|
||||||
|
code = TSDB_CODE_OUT_OF_RANGE;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion};
|
STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion};
|
||||||
taosArrayPush(pArray, &tbSver);
|
if (NULL == taosArrayPush(pArray, &tbSver)) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
|
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
|
||||||
|
@ -939,6 +945,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
|
||||||
int32_t num = taosArrayGetSize(pList);
|
int32_t num = taosArrayGetSize(pList);
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
void* res = taosArrayGetP(pList, i);
|
void* res = taosArrayGetP(pList, i);
|
||||||
|
// handleCreateTbExecRes will handle res == null
|
||||||
code = handleCreateTbExecRes(res, pCatalog);
|
code = handleCreateTbExecRes(res, pCatalog);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -948,7 +955,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_VND_SUBMIT: {
|
case TDMT_VND_SUBMIT: {
|
||||||
atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
|
(void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
|
||||||
|
|
||||||
code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
|
code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
|
||||||
break;
|
break;
|
||||||
|
@ -985,7 +992,7 @@ void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
|
||||||
code = qContinuePlanPostQuery(pRequest->pPostPlan);
|
code = qContinuePlanPostQuery(pRequest->pPostPlan);
|
||||||
}
|
}
|
||||||
|
|
||||||
nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
|
code = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
|
||||||
handleQueryAnslyseRes(pWrapper, NULL, code);
|
handleQueryAnslyseRes(pWrapper, NULL, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1001,7 +1008,7 @@ void returnToUser(SRequestObj* pRequest) {
|
||||||
pUserReq->code = pRequest->code;
|
pUserReq->code = pRequest->code;
|
||||||
// return to client
|
// return to client
|
||||||
doRequestCallback(pUserReq, pUserReq->code);
|
doRequestCallback(pUserReq, pUserReq->code);
|
||||||
releaseRequest(pRequest->relation.userRefId);
|
(void)releaseRequest(pRequest->relation.userRefId);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
tscError("0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self,
|
tscError("0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self,
|
||||||
|
@ -1009,20 +1016,32 @@ void returnToUser(SRequestObj* pRequest) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* createResultBlock(TAOS_RES* pRes, int32_t numOfRows) {
|
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock**pBlock) {
|
||||||
int64_t lastTs = 0;
|
int64_t lastTs = 0;
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
|
TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
|
||||||
int32_t numOfFields = taos_num_fields(pRes);
|
int32_t numOfFields = taos_num_fields(pRes);
|
||||||
|
|
||||||
SSDataBlock* pBlock = createDataBlock();
|
*pBlock = createDataBlock();
|
||||||
|
if (NULL == *pBlock) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfFields; ++i) {
|
for(int32_t i = 0; i < numOfFields; ++i) {
|
||||||
SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
|
SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
|
||||||
blockDataAppendColInfo(pBlock, &colInfoData);
|
code = blockDataAppendColInfo(*pBlock, &colInfoData);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
blockDataDestroy(*pBlock);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataEnsureCapacity(pBlock, numOfRows);
|
code = blockDataEnsureCapacity(*pBlock, numOfRows);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
blockDataDestroy(*pBlock);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||||
TAOS_ROW pRow = taos_fetch_row(pRes);
|
TAOS_ROW pRow = taos_fetch_row(pRes);
|
||||||
|
@ -1032,18 +1051,22 @@ static SSDataBlock* createResultBlock(TAOS_RES* pRes, int32_t numOfRows) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int32_t j = 0; j < numOfFields; ++j) {
|
for(int32_t j = 0; j < numOfFields; ++j) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j);
|
SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
|
||||||
colDataSetVal(pColInfoData, i, pRow[j], false);
|
code = colDataSetVal(pColInfoData, i, pRow[j], false);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
blockDataDestroy(*pBlock);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1], *(int64_t*)pRow[2]);
|
tscDebug("lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1], *(int64_t*)pRow[2]);
|
||||||
}
|
}
|
||||||
|
|
||||||
pBlock->info.window.ekey = lastTs;
|
(*pBlock)->info.window.ekey = lastTs;
|
||||||
pBlock->info.rows = numOfRows;
|
(*pBlock)->info.rows = numOfRows;
|
||||||
|
|
||||||
tscDebug("lastKey:%"PRId64" numOfRows:%d from all vgroups", lastTs, numOfRows);
|
tscDebug("lastKey:%"PRId64" numOfRows:%d from all vgroups", lastTs, numOfRows);
|
||||||
return pBlock;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
|
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
|
||||||
|
@ -1053,7 +1076,12 @@ void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = createResultBlock(res, rowNum);
|
SSDataBlock* pBlock = NULL;
|
||||||
|
if (TSDB_CODE_SUCCESS != createResultBlock(res, rowNum, &pBlock)) {
|
||||||
|
tscError("0x%" PRIx64 ", create result block failed, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
|
SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
|
||||||
if (pNextReq) {
|
if (pNextReq) {
|
||||||
continuePostSubQuery(pNextReq, pBlock);
|
continuePostSubQuery(pNextReq, pBlock);
|
||||||
|
@ -1076,7 +1104,7 @@ void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
|
||||||
SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
|
SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
|
||||||
if (pNextReq) {
|
if (pNextReq) {
|
||||||
continuePostSubQuery(pNextReq, NULL);
|
continuePostSubQuery(pNextReq, NULL);
|
||||||
releaseRequest(pRequest->relation.nextRefId);
|
(void)releaseRequest(pRequest->relation.nextRefId);
|
||||||
} else {
|
} else {
|
||||||
tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self,
|
tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, reqId:0x%" PRIx64, pRequest->self,
|
||||||
pRequest->relation.nextRefId, pRequest->requestId);
|
pRequest->relation.nextRefId, pRequest->requestId);
|
||||||
|
@ -1092,7 +1120,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
||||||
pRequest->code = code;
|
pRequest->code = code;
|
||||||
if (pResult) {
|
if (pResult) {
|
||||||
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
|
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
|
||||||
memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
|
(void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t type = pRequest->type;
|
int32_t type = pRequest->type;
|
||||||
|
@ -1103,7 +1131,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
||||||
// record the insert rows
|
// record the insert rows
|
||||||
if (TDMT_VND_SUBMIT == type) {
|
if (TDMT_VND_SUBMIT == type) {
|
||||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
|
(void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1117,14 +1145,14 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
||||||
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
|
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
|
||||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self,
|
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self,
|
||||||
tstrerror(code), pRequest->retry, pRequest->requestId);
|
tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||||
removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
|
(void)removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
|
||||||
restartAsyncQuery(pRequest, code);
|
restartAsyncQuery(pRequest, code);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type));
|
tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type));
|
||||||
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
|
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
|
||||||
removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
|
(void)removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
|
pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
|
||||||
|
@ -1161,9 +1189,9 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||||
if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
|
if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
|
||||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
|
(void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
|
||||||
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
|
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
|
||||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
|
(void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1186,15 +1214,20 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
||||||
break;
|
break;
|
||||||
case QUERY_EXEC_MODE_SCHEDULE: {
|
case QUERY_EXEC_MODE_SCHEDULE: {
|
||||||
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||||
|
if (NULL == pMnodeList) {
|
||||||
|
code = terrno;
|
||||||
|
break;
|
||||||
|
}
|
||||||
SQueryPlan* pDag = NULL;
|
SQueryPlan* pDag = NULL;
|
||||||
code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
|
code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pRequest->body.subplanNum = pDag->numOfSubplans;
|
pRequest->body.subplanNum = pDag->numOfSubplans;
|
||||||
if (!pRequest->validateOnly) {
|
if (!pRequest->validateOnly) {
|
||||||
SArray* pNodeList = NULL;
|
SArray* pNodeList = NULL;
|
||||||
buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
|
code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = scheduleQuery(pRequest, pDag, pNodeList);
|
code = scheduleQuery(pRequest, pDag, pNodeList);
|
||||||
|
}
|
||||||
taosArrayDestroy(pNodeList);
|
taosArrayDestroy(pNodeList);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1213,10 +1246,12 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
|
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
|
||||||
removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
|
(void)removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
|
||||||
}
|
}
|
||||||
|
|
||||||
handleQueryExecRsp(pRequest);
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = handleQueryExecRsp(pRequest);
|
||||||
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
pRequest->code = terrno;
|
pRequest->code = terrno;
|
||||||
|
@ -1240,7 +1275,9 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
||||||
|
|
||||||
if (!pRequest->parseOnly) {
|
if (!pRequest->parseOnly) {
|
||||||
pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
|
||||||
|
if (NULL == pMnodeList) {
|
||||||
|
code = terrno;
|
||||||
|
}
|
||||||
SPlanContext cxt = {.queryId = pRequest->requestId,
|
SPlanContext cxt = {.queryId = pRequest->requestId,
|
||||||
.acctId = pRequest->pTscObj->acctId,
|
.acctId = pRequest->pTscObj->acctId,
|
||||||
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
|
||||||
|
@ -1253,8 +1290,9 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
||||||
.pUser = pRequest->pTscObj->user,
|
.pUser = pRequest->pTscObj->user,
|
||||||
.sysInfo = pRequest->pTscObj->sysInfo,
|
.sysInfo = pRequest->pTscObj->sysInfo,
|
||||||
.allocatorId = pRequest->allocatorRefId};
|
.allocatorId = pRequest->allocatorRefId};
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
|
code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
|
||||||
|
}
|
||||||
if (code) {
|
if (code) {
|
||||||
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
|
@ -1270,8 +1308,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
||||||
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
|
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
|
||||||
SArray* pNodeList = NULL;
|
SArray* pNodeList = NULL;
|
||||||
if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
|
if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
|
||||||
//TODO(smj) fuckthis
|
code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
|
||||||
buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
|
SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
|
||||||
|
@ -1293,7 +1330,9 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
|
||||||
.pExecRes = NULL,
|
.pExecRes = NULL,
|
||||||
.source = pRequest->source,
|
.source = pRequest->source,
|
||||||
};
|
};
|
||||||
code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = schedulerExecJob(&req, &pRequest->body.queryJob);
|
||||||
|
}
|
||||||
taosArrayDestroy(pNodeList);
|
taosArrayDestroy(pNodeList);
|
||||||
} else {
|
} else {
|
||||||
qDestroyQueryPlan(pDag);
|
qDestroyQueryPlan(pDag);
|
||||||
|
@ -1333,9 +1372,9 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
|
||||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||||
if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
|
if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
|
||||||
(0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
|
(0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
|
||||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
|
(void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
|
||||||
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
|
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
|
||||||
atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
|
(void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1384,6 +1423,7 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
|
||||||
for (int32_t i = 0; i < dbNum; ++i) {
|
for (int32_t i = 0; i < dbNum; ++i) {
|
||||||
char* dbFName = taosArrayGet(pRequest->dbList, i);
|
char* dbFName = taosArrayGet(pRequest->dbList, i);
|
||||||
|
|
||||||
|
// catalogRefreshDBVgInfo will handle dbFName == null.
|
||||||
code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
|
code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -1393,6 +1433,7 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
|
||||||
for (int32_t i = 0; i < tblNum; ++i) {
|
for (int32_t i = 0; i < tblNum; ++i) {
|
||||||
SName* tableName = taosArrayGet(pRequest->tableList, i);
|
SName* tableName = taosArrayGet(pRequest->tableList, i);
|
||||||
|
|
||||||
|
// catalogRefreshTableMeta will handle tableName == null.
|
||||||
code = catalogRefreshTableMeta(pCatalog, &conn, tableName, -1);
|
code = catalogRefreshTableMeta(pCatalog, &conn, tableName, -1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -1414,20 +1455,23 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
|
||||||
for (int32_t i = 0; i < tbNum; ++i) {
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
SName* pViewName = taosArrayGet(tbList, i);
|
SName* pViewName = taosArrayGet(tbList, i);
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
tNameGetFullDbName(pViewName, dbFName);
|
if (NULL == pViewName) {
|
||||||
catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0);
|
continue;
|
||||||
|
}
|
||||||
|
(void)tNameGetFullDbName(pViewName, dbFName);
|
||||||
|
(void)catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = 0; i < tbNum; ++i) {
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
SName* pTbName = taosArrayGet(tbList, i);
|
SName* pTbName = taosArrayGet(tbList, i);
|
||||||
catalogRemoveTableMeta(pCatalog, pTbName);
|
(void)catalogRemoveTableMeta(pCatalog, pTbName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
|
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
|
||||||
pEpSet->version = 0;
|
pEpSet->version = 0;
|
||||||
|
|
||||||
// init mnode ip set
|
// init mnode ip set
|
||||||
|
@ -1451,7 +1495,7 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
|
||||||
if (code) {
|
if (code) {
|
||||||
tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
|
tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
|
||||||
tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
|
tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
|
||||||
memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
|
(void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
|
||||||
} else {
|
} else {
|
||||||
mgmtEpSet->numOfEps++;
|
mgmtEpSet->numOfEps++;
|
||||||
}
|
}
|
||||||
|
@ -1460,16 +1504,19 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
|
||||||
if (secondEp && secondEp[0] != 0) {
|
if (secondEp && secondEp[0] != 0) {
|
||||||
if (strlen(secondEp) >= TSDB_EP_LEN) {
|
if (strlen(secondEp) >= TSDB_EP_LEN) {
|
||||||
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
||||||
return -1;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
|
int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
uint32_t addr = 0;
|
uint32_t addr = 0;
|
||||||
int32_t code = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
|
code = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
|
||||||
if (code) {
|
if (code) {
|
||||||
tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
|
tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
|
||||||
tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
|
tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
|
||||||
memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
|
(void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
|
||||||
} else {
|
} else {
|
||||||
mgmtEpSet->numOfEps++;
|
mgmtEpSet->numOfEps++;
|
||||||
}
|
}
|
||||||
|
@ -1503,11 +1550,20 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta
|
||||||
pRequest->sqlLen = strlen(pRequest->sqlstr);
|
pRequest->sqlLen = strlen(pRequest->sqlstr);
|
||||||
}
|
}
|
||||||
|
|
||||||
SMsgSendInfo* body = buildConnectMsg(pRequest);
|
SMsgSendInfo* body = NULL;
|
||||||
|
code = buildConnectMsg(pRequest, &body);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
destroyTscObj(*pTscObj);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
destroyTscObj(*pTscObj);
|
||||||
|
tscError("failed to send connect msg to server, code:%s", tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
(void)tsem_wait(&pRequest->body.rspSem);
|
(void)tsem_wait(&pRequest->body.rspSem);
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
|
const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
|
||||||
|
@ -1525,21 +1581,24 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
|
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo) {
|
||||||
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
*pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||||
if (pMsgSendInfo == NULL) {
|
if (*pMsgSendInfo == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pMsgSendInfo->msgType = TDMT_MND_CONNECT;
|
(*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;
|
||||||
|
|
||||||
pMsgSendInfo->requestObjRefId = pRequest->self;
|
(*pMsgSendInfo)->requestObjRefId = pRequest->self;
|
||||||
pMsgSendInfo->requestId = pRequest->requestId;
|
(*pMsgSendInfo)->requestId = pRequest->requestId;
|
||||||
pMsgSendInfo->fp = getMsgRspHandle(pMsgSendInfo->msgType);
|
(*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
|
||||||
pMsgSendInfo->param = taosMemoryCalloc(1, sizeof(pRequest->self));
|
(*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
|
||||||
|
if (NULL == (*pMsgSendInfo)->param) {
|
||||||
|
taosMemoryFree(*pMsgSendInfo);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
*(int64_t*)pMsgSendInfo->param = pRequest->self;
|
*(int64_t*)(*pMsgSendInfo)->param = pRequest->self;
|
||||||
|
|
||||||
SConnectReq connectReq = {0};
|
SConnectReq connectReq = {0};
|
||||||
STscObj* pObj = pRequest->pTscObj;
|
STscObj* pObj = pRequest->pTscObj;
|
||||||
|
@ -1561,11 +1620,20 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
|
||||||
|
|
||||||
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
|
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
|
||||||
void* pReq = taosMemoryMalloc(contLen);
|
void* pReq = taosMemoryMalloc(contLen);
|
||||||
tSerializeSConnectReq(pReq, contLen, &connectReq);
|
if (NULL == pReq) {
|
||||||
|
taosMemoryFree(*pMsgSendInfo);
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
pMsgSendInfo->msgInfo.len = contLen;
|
if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
|
||||||
pMsgSendInfo->msgInfo.pData = pReq;
|
taosMemoryFree(*pMsgSendInfo);
|
||||||
return pMsgSendInfo;
|
taosMemoryFree(pReq);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
(*pMsgSendInfo)->msgInfo.len = contLen;
|
||||||
|
(*pMsgSendInfo)->msgInfo.pData = pReq;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||||
|
@ -1603,7 +1671,12 @@ void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg,
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
|
code = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tscError("fail to update catalog vg epset, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId,
|
||||||
|
tstrerror(code));
|
||||||
|
return;
|
||||||
|
}
|
||||||
taosMemoryFreeClear(pSendInfo->target.dbFName);
|
taosMemoryFreeClear(pSendInfo->target.dbFName);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1642,7 +1715,7 @@ int32_t doProcessMsgFromServer(void* param) {
|
||||||
tscError("doProcessMsgFromServer pRequest->self:%" PRId64 " != pSendInfo->requestObjRefId:%" PRId64,
|
tscError("doProcessMsgFromServer pRequest->self:%" PRId64 " != pSendInfo->requestObjRefId:%" PRId64,
|
||||||
pRequest->self, pSendInfo->requestObjRefId);
|
pRequest->self, pSendInfo->requestObjRefId);
|
||||||
|
|
||||||
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
|
(void)taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
|
||||||
taosMemoryFree(arg->pEpset);
|
taosMemoryFree(arg->pEpset);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
destroySendMsgInfo(pSendInfo);
|
destroySendMsgInfo(pSendInfo);
|
||||||
|
@ -1668,14 +1741,14 @@ int32_t doProcessMsgFromServer(void* param) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
} else {
|
} else {
|
||||||
memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
|
(void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
|
(void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
|
||||||
|
|
||||||
if (pTscObj) {
|
if (pTscObj) {
|
||||||
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
|
(void)taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
@ -1689,7 +1762,13 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||||
SEpSet* tEpSet = NULL;
|
SEpSet* tEpSet = NULL;
|
||||||
if (pEpSet != NULL) {
|
if (pEpSet != NULL) {
|
||||||
tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
|
tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
|
||||||
memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
|
if (NULL == tEpSet) {
|
||||||
|
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
destroySendMsgInfo(pMsg->info.ahandle);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
(void)memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
|
||||||
}
|
}
|
||||||
|
|
||||||
// pMsg is response msg
|
// pMsg is response msg
|
||||||
|
@ -1708,6 +1787,12 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
|
AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
|
||||||
|
if (NULL == arg) {
|
||||||
|
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
destroySendMsgInfo(pMsg->info.ahandle);
|
||||||
|
return;
|
||||||
|
}
|
||||||
arg->msg = *pMsg;
|
arg->msg = *pMsg;
|
||||||
arg->pEpset = tEpSet;
|
arg->pEpset = tEpSet;
|
||||||
|
|
||||||
|
@ -1735,6 +1820,9 @@ TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, cons
|
||||||
int32_t code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj);
|
int32_t code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
|
int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
|
||||||
|
if (NULL == rid) {
|
||||||
|
tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
|
||||||
|
}
|
||||||
*rid = pObj->id;
|
*rid = pObj->id;
|
||||||
return (TAOS*)rid;
|
return (TAOS*)rid;
|
||||||
}
|
}
|
||||||
|
@ -1749,10 +1837,10 @@ TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, c
|
||||||
char userStr[TSDB_USER_LEN] = {0};
|
char userStr[TSDB_USER_LEN] = {0};
|
||||||
char passStr[TSDB_PASSWORD_LEN] = {0};
|
char passStr[TSDB_PASSWORD_LEN] = {0};
|
||||||
|
|
||||||
strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
|
(void)strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
|
||||||
strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
|
(void)strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
|
||||||
strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
|
(void)strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
|
||||||
strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
|
(void)strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
|
||||||
return taos_connect(ipStr, userStr, passStr, dbStr, port);
|
return taos_connect(ipStr, userStr, passStr, dbStr, port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1819,7 +1907,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
|
||||||
|
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||||
atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
|
(void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
|
||||||
|
|
||||||
if (pResultInfo->numOfRows == 0) {
|
if (pResultInfo->numOfRows == 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1836,7 +1924,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
|
||||||
|
|
||||||
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
|
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
|
||||||
tsem_t* sem = param;
|
tsem_t* sem = param;
|
||||||
tsem_post(sem);
|
(void)tsem_post(sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
||||||
|
@ -1855,10 +1943,10 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
|
||||||
// convert ucs4 to native multi-bytes string
|
// convert ucs4 to native multi-bytes string
|
||||||
pResultInfo->convertUcs4 = convertUcs4;
|
pResultInfo->convertUcs4 = convertUcs4;
|
||||||
tsem_t sem;
|
tsem_t sem;
|
||||||
tsem_init(&sem, 0, 0);
|
(void)tsem_init(&sem, 0, 0);
|
||||||
taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
|
taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
|
||||||
tsem_wait(&sem);
|
(void)tsem_wait(&sem);
|
||||||
tsem_destroy(&sem);
|
(void)tsem_destroy(&sem);
|
||||||
pRequest->inCallback = false;
|
pRequest->inCallback = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2041,7 +2129,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = getVersion1BlockMetaSize(p, numOfCols);
|
int32_t len = getVersion1BlockMetaSize(p, numOfCols);
|
||||||
memcpy(p1, p, len);
|
(void)memcpy(p1, p, len);
|
||||||
|
|
||||||
p += len;
|
p += len;
|
||||||
p1 += len;
|
p1 += len;
|
||||||
|
@ -2050,7 +2138,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
||||||
len = sizeof(int32_t) * numOfCols;
|
len = sizeof(int32_t) * numOfCols;
|
||||||
int32_t* colLength = (int32_t*)p;
|
int32_t* colLength = (int32_t*)p;
|
||||||
int32_t* colLength1 = (int32_t*)p1;
|
int32_t* colLength1 = (int32_t*)p1;
|
||||||
memcpy(p1, p, len);
|
(void)memcpy(p1, p, len);
|
||||||
p += len;
|
p += len;
|
||||||
p1 += len;
|
p1 += len;
|
||||||
totalLen += len;
|
totalLen += len;
|
||||||
|
@ -2068,7 +2156,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
||||||
int32_t* offset = (int32_t*)pStart;
|
int32_t* offset = (int32_t*)pStart;
|
||||||
int32_t* offset1 = (int32_t*)pStart1;
|
int32_t* offset1 = (int32_t*)pStart1;
|
||||||
len = numOfRows * sizeof(int32_t);
|
len = numOfRows * sizeof(int32_t);
|
||||||
memcpy(pStart1, pStart, len);
|
(void)memcpy(pStart1, pStart, len);
|
||||||
pStart += len;
|
pStart += len;
|
||||||
pStart1 += len;
|
pStart1 += len;
|
||||||
totalLen += len;
|
totalLen += len;
|
||||||
|
@ -2084,7 +2172,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
||||||
char* jsonInnerData = data + CHAR_BYTES;
|
char* jsonInnerData = data + CHAR_BYTES;
|
||||||
char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
|
char dst[TSDB_MAX_JSON_TAG_LEN] = {0};
|
||||||
if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
|
if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
|
||||||
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
|
(void)sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
|
||||||
varDataSetLen(dst, strlen(varDataVal(dst)));
|
varDataSetLen(dst, strlen(varDataVal(dst)));
|
||||||
} else if (tTagIsJson(data)) {
|
} else if (tTagIsJson(data)) {
|
||||||
char* jsonString = NULL;
|
char* jsonString = NULL;
|
||||||
|
@ -2103,10 +2191,10 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
||||||
*(char*)POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES) = '\"';
|
*(char*)POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES) = '\"';
|
||||||
} else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
|
} else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
double jsonVd = *(double*)(jsonInnerData);
|
double jsonVd = *(double*)(jsonInnerData);
|
||||||
sprintf(varDataVal(dst), "%.9lf", jsonVd);
|
(void)sprintf(varDataVal(dst), "%.9lf", jsonVd);
|
||||||
varDataSetLen(dst, strlen(varDataVal(dst)));
|
varDataSetLen(dst, strlen(varDataVal(dst)));
|
||||||
} else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
|
} else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
|
||||||
sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
|
(void)sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
|
||||||
varDataSetLen(dst, strlen(varDataVal(dst)));
|
varDataSetLen(dst, strlen(varDataVal(dst)));
|
||||||
} else {
|
} else {
|
||||||
tscError("doConvertJson error: invalid type:%d", jsonInnerType);
|
tscError("doConvertJson error: invalid type:%d", jsonInnerType);
|
||||||
|
@ -2114,7 +2202,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
||||||
}
|
}
|
||||||
|
|
||||||
offset1[j] = len;
|
offset1[j] = len;
|
||||||
memcpy(pStart1 + len, dst, varDataTLen(dst));
|
(void)memcpy(pStart1 + len, dst, varDataTLen(dst));
|
||||||
len += varDataTLen(dst);
|
len += varDataTLen(dst);
|
||||||
}
|
}
|
||||||
colLen1 = len;
|
colLen1 = len;
|
||||||
|
@ -2122,20 +2210,20 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
||||||
colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
|
colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
|
||||||
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
||||||
len = numOfRows * sizeof(int32_t);
|
len = numOfRows * sizeof(int32_t);
|
||||||
memcpy(pStart1, pStart, len);
|
(void)memcpy(pStart1, pStart, len);
|
||||||
pStart += len;
|
pStart += len;
|
||||||
pStart1 += len;
|
pStart1 += len;
|
||||||
totalLen += len;
|
totalLen += len;
|
||||||
totalLen += colLen;
|
totalLen += colLen;
|
||||||
memcpy(pStart1, pStart, colLen);
|
(void)memcpy(pStart1, pStart, colLen);
|
||||||
} else {
|
} else {
|
||||||
len = BitmapLen(pResultInfo->numOfRows);
|
len = BitmapLen(pResultInfo->numOfRows);
|
||||||
memcpy(pStart1, pStart, len);
|
(void)memcpy(pStart1, pStart, len);
|
||||||
pStart += len;
|
pStart += len;
|
||||||
pStart1 += len;
|
pStart1 += len;
|
||||||
totalLen += len;
|
totalLen += len;
|
||||||
totalLen += colLen;
|
totalLen += colLen;
|
||||||
memcpy(pStart1, pStart, colLen);
|
(void)memcpy(pStart1, pStart, colLen);
|
||||||
}
|
}
|
||||||
pStart += colLen;
|
pStart += colLen;
|
||||||
pStart1 += colLen1;
|
pStart1 += colLen1;
|
||||||
|
@ -2244,13 +2332,13 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
|
||||||
|
|
||||||
char* getDbOfConnection(STscObj* pObj) {
|
char* getDbOfConnection(STscObj* pObj) {
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
taosThreadMutexLock(&pObj->mutex);
|
(void)taosThreadMutexLock(&pObj->mutex);
|
||||||
size_t len = strlen(pObj->db);
|
size_t len = strlen(pObj->db);
|
||||||
if (len > 0) {
|
if (len > 0) {
|
||||||
p = strndup(pObj->db, tListLen(pObj->db));
|
p = strndup(pObj->db, tListLen(pObj->db));
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pObj->mutex);
|
(void)taosThreadMutexUnlock(&pObj->mutex);
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2260,9 +2348,9 @@ void setConnectionDB(STscObj* pTscObj, const char* db) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&pTscObj->mutex);
|
(void)taosThreadMutexLock(&pTscObj->mutex);
|
||||||
tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
|
tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
|
||||||
taosThreadMutexUnlock(&pTscObj->mutex);
|
(void)taosThreadMutexUnlock(&pTscObj->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
void resetConnectDB(STscObj* pTscObj) {
|
void resetConnectDB(STscObj* pTscObj) {
|
||||||
|
@ -2270,9 +2358,9 @@ void resetConnectDB(STscObj* pTscObj) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&pTscObj->mutex);
|
(void)taosThreadMutexLock(&pTscObj->mutex);
|
||||||
pTscObj->db[0] = 0;
|
pTscObj->db[0] = 0;
|
||||||
taosThreadMutexUnlock(&pTscObj->mutex);
|
(void)taosThreadMutexUnlock(&pTscObj->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
|
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
|
||||||
|
@ -2294,6 +2382,11 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
|
||||||
if (pRsp->compressed) {
|
if (pRsp->compressed) {
|
||||||
if (pResultInfo->decompBuf == NULL) {
|
if (pResultInfo->decompBuf == NULL) {
|
||||||
pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
|
pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
|
||||||
|
if (pResultInfo->decompBuf == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
pResultInfo->decompBufSize = payloadLen;
|
pResultInfo->decompBufSize = payloadLen;
|
||||||
} else {
|
} else {
|
||||||
if (pResultInfo->decompBufSize < payloadLen) {
|
if (pResultInfo->decompBufSize < payloadLen) {
|
||||||
|
@ -2319,6 +2412,10 @@ int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableR
|
||||||
if (pRsp->compressed && compLen < rawLen) {
|
if (pRsp->compressed && compLen < rawLen) {
|
||||||
int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
|
int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
|
||||||
ASSERT(len == rawLen);
|
ASSERT(len == rawLen);
|
||||||
|
if (len < 0) {
|
||||||
|
tscError("tsDecompressString failed");
|
||||||
|
return terrno ? terrno : TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
pResultInfo->pData = pResultInfo->decompBuf;
|
pResultInfo->pData = pResultInfo->decompBuf;
|
||||||
pResultInfo->payloadLen = rawLen;
|
pResultInfo->payloadLen = rawLen;
|
||||||
|
@ -2361,7 +2458,10 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
|
||||||
connLimitNum = TMIN(connLimitNum, 500);
|
connLimitNum = TMIN(connLimitNum, 500);
|
||||||
rpcInit.connLimitNum = connLimitNum;
|
rpcInit.connLimitNum = connLimitNum;
|
||||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
if (TSDB_CODE_SUCCESS != taosVersionStrToInt(version, &(rpcInit.compatibilityVer))) {
|
||||||
|
tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
clientRpc = rpcOpen(&rpcInit);
|
clientRpc = rpcOpen(&rpcInit);
|
||||||
if (clientRpc == NULL) {
|
if (clientRpc == NULL) {
|
||||||
|
@ -2379,7 +2479,11 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
|
||||||
|
|
||||||
tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
|
tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
|
||||||
epSet.eps[0].port = (uint16_t)port;
|
epSet.eps[0].port = (uint16_t)port;
|
||||||
rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
|
int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
|
||||||
|
if (TSDB_CODE_SUCCESS != ret) {
|
||||||
|
tscError("failed to send recv since %s", tstrerror(ret));
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
|
if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
|
||||||
tscError("failed to send server status req since %s", terrstr());
|
tscError("failed to send server status req since %s", terrstr());
|
||||||
|
@ -2446,13 +2550,20 @@ int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2,
|
||||||
|
|
||||||
STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
|
STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
|
||||||
if (pDb) {
|
if (pDb) {
|
||||||
taosArrayPush(pDb->pTables, &name);
|
if (NULL == taosArrayPush(pDb->pTables, &name)) {
|
||||||
|
return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
STablesReq db;
|
STablesReq db;
|
||||||
db.pTables = taosArrayInit(20, sizeof(SName));
|
db.pTables = taosArrayInit(20, sizeof(SName));
|
||||||
strcpy(db.dbFName, dbFName);
|
if (NULL == db.pTables) {
|
||||||
taosArrayPush(db.pTables, &name);
|
return terrno;
|
||||||
taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db));
|
}
|
||||||
|
(void)strcpy(db.dbFName, dbFName);
|
||||||
|
if (NULL == taosArrayPush(db.pTables, &name)) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
TSC_ERR_RET(taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db)));
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2461,7 +2572,6 @@ int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2,
|
||||||
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
|
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
|
||||||
SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
if (NULL == pHash) {
|
if (NULL == pHash) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2473,8 +2583,8 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
|
||||||
int32_t vPos[2];
|
int32_t vPos[2];
|
||||||
int32_t vLen[2];
|
int32_t vLen[2];
|
||||||
|
|
||||||
memset(vPos, -1, sizeof(vPos));
|
(void)memset(vPos, -1, sizeof(vPos));
|
||||||
memset(vLen, 0, sizeof(vLen));
|
(void)memset(vLen, 0, sizeof(vLen));
|
||||||
|
|
||||||
for (int32_t i = 0;; ++i) {
|
for (int32_t i = 0;; ++i) {
|
||||||
if (0 == *(tbList + i)) {
|
if (0 == *(tbList + i)) {
|
||||||
|
@ -2537,8 +2647,8 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(vPos, -1, sizeof(vPos));
|
(void)memset(vPos, -1, sizeof(vPos));
|
||||||
memset(vLen, 0, sizeof(vLen));
|
(void)memset(vLen, 0, sizeof(vLen));
|
||||||
vIdx = 0;
|
vIdx = 0;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -2566,10 +2676,15 @@ int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName,
|
||||||
|
|
||||||
int32_t dbNum = taosHashGetSize(pHash);
|
int32_t dbNum = taosHashGetSize(pHash);
|
||||||
*pReq = taosArrayInit(dbNum, sizeof(STablesReq));
|
*pReq = taosArrayInit(dbNum, sizeof(STablesReq));
|
||||||
|
if (NULL == pReq) {
|
||||||
|
TSC_ERR_JRET(terrno);
|
||||||
|
}
|
||||||
pIter = taosHashIterate(pHash, NULL);
|
pIter = taosHashIterate(pHash, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
STablesReq* pDb = (STablesReq*)pIter;
|
STablesReq* pDb = (STablesReq*)pIter;
|
||||||
taosArrayPush(*pReq, pDb);
|
if (NULL == taosArrayPush(*pReq, pDb)) {
|
||||||
|
TSC_ERR_JRET(terrno);
|
||||||
|
}
|
||||||
pIter = taosHashIterate(pHash, pIter);
|
pIter = taosHashIterate(pHash, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2597,7 +2712,7 @@ void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
|
||||||
SSyncQueryParam* pParam = param;
|
SSyncQueryParam* pParam = param;
|
||||||
pParam->pRequest->code = code;
|
pParam->pRequest->code = code;
|
||||||
|
|
||||||
tsem_post(&pParam->sem);
|
(void)tsem_post(&pParam->sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncQueryFn(void* param, void* res, int32_t code) {
|
void syncQueryFn(void* param, void* res, int32_t code) {
|
||||||
|
@ -2608,7 +2723,7 @@ void syncQueryFn(void* param, void* res, int32_t code) {
|
||||||
pParam->pRequest->code = code;
|
pParam->pRequest->code = code;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsem_post(&pParam->sem);
|
(void)tsem_post(&pParam->sem);
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int8_t source) {
|
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly, int8_t source) {
|
||||||
|
@ -2685,10 +2800,10 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t s
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
tsem_init(¶m->sem, 0, 0);
|
(void)tsem_init(¶m->sem, 0, 0);
|
||||||
|
|
||||||
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
|
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
|
||||||
tsem_wait(¶m->sem);
|
(void)tsem_wait(¶m->sem);
|
||||||
|
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
if (param->pRequest != NULL) {
|
if (param->pRequest != NULL) {
|
||||||
|
@ -2714,10 +2829,10 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
tsem_init(¶m->sem, 0, 0);
|
(void)tsem_init(¶m->sem, 0, 0);
|
||||||
|
|
||||||
taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
|
taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
|
||||||
tsem_wait(¶m->sem);
|
(void)tsem_wait(¶m->sem);
|
||||||
SRequestObj* pRequest = NULL;
|
SRequestObj* pRequest = NULL;
|
||||||
if (param->pRequest != NULL) {
|
if (param->pRequest != NULL) {
|
||||||
param->pRequest->syncQuery = true;
|
param->pRequest->syncQuery = true;
|
||||||
|
@ -2764,7 +2879,7 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
|
||||||
|
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
|
||||||
atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
|
(void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
|
pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
|
||||||
|
@ -2807,7 +2922,9 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param
|
||||||
.cbParam = pRequest,
|
.cbParam = pRequest,
|
||||||
};
|
};
|
||||||
|
|
||||||
schedulerFetchRows(pRequest->body.queryJob, &req);
|
if (TSDB_CODE_SUCCESS != schedulerFetchRows(pRequest->body.queryJob, &req)) {
|
||||||
|
tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->self);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
|
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
|
||||||
|
|
|
@ -1231,7 +1231,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_schemalessInsert
|
||||||
}
|
}
|
||||||
|
|
||||||
int numLines = (*env)->GetArrayLength(env, lines);
|
int numLines = (*env)->GetArrayLength(env, lines);
|
||||||
char **c_lines = C(numLines, sizeof(char *));
|
char **c_lines = taosMemoryCalloc(numLines, sizeof(char *));
|
||||||
if (c_lines == NULL) {
|
if (c_lines == NULL) {
|
||||||
jniError("c_lines:%p, alloc memory failed", c_lines);
|
jniError("c_lines:%p, alloc memory failed", c_lines);
|
||||||
return JNI_OUT_OF_MEMORY;
|
return JNI_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -119,6 +119,10 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
|
||||||
int32_t code = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY, &pObj);
|
int32_t code = taos_connect_internal(ip, user, pass, NULL, db, port, CONN_TYPE__QUERY, &pObj);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t));
|
int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t));
|
||||||
|
if (NULL == rid) {
|
||||||
|
tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
*rid = pObj->id;
|
*rid = pObj->id;
|
||||||
return (TAOS *)rid;
|
return (TAOS *)rid;
|
||||||
}
|
}
|
||||||
|
@ -1247,7 +1251,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
|
||||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||||
refreshMeta(pRequest->pTscObj, pRequest);
|
(void)refreshMeta(pRequest->pTscObj, pRequest); //ignore return code,try again
|
||||||
pRequest->prevCode = code;
|
pRequest->prevCode = code;
|
||||||
doAsyncQuery(pRequest, true);
|
doAsyncQuery(pRequest, true);
|
||||||
return;
|
return;
|
||||||
|
|
Loading…
Reference in New Issue