Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30837
This commit is contained in:
commit
f5b6e800f3
|
@ -36,6 +36,7 @@ typedef struct SFuncExecEnv {
|
||||||
} SFuncExecEnv;
|
} SFuncExecEnv;
|
||||||
|
|
||||||
typedef bool (*FExecGetEnv)(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
|
typedef bool (*FExecGetEnv)(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
|
||||||
|
typedef void (*FExecCleanUp)(struct SqlFunctionCtx* pCtx);
|
||||||
typedef int32_t (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
|
typedef int32_t (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo);
|
||||||
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
|
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
|
||||||
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
|
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
|
||||||
|
@ -54,6 +55,7 @@ typedef struct SFuncExecFuncs {
|
||||||
FExecProcess process;
|
FExecProcess process;
|
||||||
FExecFinalize finalize;
|
FExecFinalize finalize;
|
||||||
FExecCombine combine;
|
FExecCombine combine;
|
||||||
|
FExecCleanUp cleanup;
|
||||||
processFuncByRow processFuncByRow;
|
processFuncByRow processFuncByRow;
|
||||||
} SFuncExecFuncs;
|
} SFuncExecFuncs;
|
||||||
|
|
||||||
|
|
|
@ -16,10 +16,10 @@
|
||||||
#include "catalog.h"
|
#include "catalog.h"
|
||||||
#include "clientInt.h"
|
#include "clientInt.h"
|
||||||
#include "clientLog.h"
|
#include "clientLog.h"
|
||||||
#include "scheduler.h"
|
|
||||||
#include "trpc.h"
|
|
||||||
#include "tglobal.h"
|
|
||||||
#include "clientMonitor.h"
|
#include "clientMonitor.h"
|
||||||
|
#include "scheduler.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
union {
|
union {
|
||||||
|
@ -244,11 +244,9 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
TSC_ERR_JRET(catalogUpdateDBVgInfo(pCatalog,
|
TSC_ERR_JRET(catalogUpdateDBVgInfo(
|
||||||
(rsp->useDbRsp->db[0] == 'i') ?
|
pCatalog, (rsp->useDbRsp->db[0] == 'i') ? TSDB_PERFORMANCE_SCHEMA_DB : TSDB_INFORMATION_SCHEMA_DB,
|
||||||
TSDB_PERFORMANCE_SCHEMA_DB :
|
rsp->useDbRsp->uid, vgInfo));
|
||||||
TSDB_INFORMATION_SCHEMA_DB,
|
|
||||||
rsp->useDbRsp->uid, vgInfo));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -556,7 +554,6 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
taosHashRelease(pAppHbMgr->activeInfo, pReq);
|
taosHashRelease(pAppHbMgr->activeInfo, pReq);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -609,8 +606,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pInst->monitorParas = pRsp.monitorParas;
|
pInst->monitorParas = pRsp.monitorParas;
|
||||||
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
|
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", pInst->clusterId,
|
||||||
pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
|
pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
|
||||||
|
|
||||||
if (rspNum) {
|
if (rspNum) {
|
||||||
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
|
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum,
|
||||||
|
@ -1108,7 +1105,8 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
|
||||||
if (clientHbMgr.appHbHash) {
|
if (clientHbMgr.appHbHash) {
|
||||||
code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0);
|
code = taosHashPut(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(uint64_t), NULL, 0);
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId, tstrerror(code));
|
tscWarn("hbQueryHbReqHandle put clusterId failed, clusterId:%" PRIx64 ", error:%s", hbParam->clusterId,
|
||||||
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1261,7 +1259,7 @@ int32_t hbGatherAppInfo(void) {
|
||||||
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
SAppHbMgr *pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
||||||
if (pAppHbMgr == NULL) continue;
|
if (pAppHbMgr == NULL) continue;
|
||||||
|
|
||||||
int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
|
int64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
|
||||||
SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
|
SAppHbReq *pApp = taosHashGet(clientHbMgr.appSummary, &clusterId, sizeof(clusterId));
|
||||||
if (NULL == pApp) {
|
if (NULL == pApp) {
|
||||||
(void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
|
(void)memcpy(&req.summary, &pAppHbMgr->pAppInstInfo->summary, sizeof(req.summary));
|
||||||
|
@ -1303,8 +1301,7 @@ static void *hbThreadFunc(void *param) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (sz > 1 && !clientHbMgr.appHbHash) {
|
if (sz > 1 && !clientHbMgr.appHbHash) {
|
||||||
clientHbMgr.appHbHash =
|
clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
|
||||||
taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
|
|
||||||
if (NULL == clientHbMgr.appHbHash) {
|
if (NULL == clientHbMgr.appHbHash) {
|
||||||
tscError("taosHashInit failed");
|
tscError("taosHashInit failed");
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -1324,13 +1321,13 @@ static void *hbThreadFunc(void *param) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SClientHbBatchReq *pReq = NULL;
|
SClientHbBatchReq *pReq = NULL;
|
||||||
int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq);
|
int32_t code = hbGatherAllInfo(pAppHbMgr, &pReq);
|
||||||
if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
|
if (TSDB_CODE_SUCCESS != code || taosArrayGetP(clientHbMgr.appHbMgrs, i) == NULL) {
|
||||||
terrno = code ? code : TSDB_CODE_OUT_OF_RANGE;
|
terrno = code ? code : TSDB_CODE_OUT_OF_RANGE;
|
||||||
tFreeClientHbBatchReq(pReq);
|
tFreeClientHbBatchReq(pReq);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
|
int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
|
||||||
if (tlen == -1) {
|
if (tlen == -1) {
|
||||||
tFreeClientHbBatchReq(pReq);
|
tFreeClientHbBatchReq(pReq);
|
||||||
break;
|
break;
|
||||||
|
@ -1368,9 +1365,8 @@ static void *hbThreadFunc(void *param) {
|
||||||
pInfo->requestObjRefId = 0;
|
pInfo->requestObjRefId = 0;
|
||||||
|
|
||||||
SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
|
SAppInstInfo *pAppInstInfo = pAppHbMgr->pAppInstInfo;
|
||||||
int64_t transporterId = 0;
|
|
||||||
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
|
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
|
||||||
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo)) {
|
if (TSDB_CODE_SUCCESS != asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, NULL, pInfo)) {
|
||||||
tscWarn("failed to async send msg to server");
|
tscWarn("failed to async send msg to server");
|
||||||
}
|
}
|
||||||
tFreeClientHbBatchReq(pReq);
|
tFreeClientHbBatchReq(pReq);
|
||||||
|
@ -1389,7 +1385,7 @@ static void *hbThreadFunc(void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t hbCreateThread() {
|
static int32_t hbCreateThread() {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
TdThreadAttr thAttr;
|
TdThreadAttr thAttr;
|
||||||
TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
|
TSC_ERR_JRET(taosThreadAttrInit(&thAttr));
|
||||||
TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
|
TSC_ERR_JRET(taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE));
|
||||||
|
@ -1467,9 +1463,9 @@ int32_t appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key, SAppHbMgr **pAppHbMg
|
||||||
|
|
||||||
TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock));
|
TSC_ERR_JRET(taosThreadMutexLock(&clientHbMgr.lock));
|
||||||
if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) {
|
if (taosArrayPush(clientHbMgr.appHbMgrs, &(*pAppHbMgr)) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
(void)taosThreadMutexUnlock(&clientHbMgr.lock);
|
(void)taosThreadMutexUnlock(&clientHbMgr.lock);
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
(*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;
|
(*pAppHbMgr)->idx = taosArrayGetSize(clientHbMgr.appHbMgrs) - 1;
|
||||||
TSC_ERR_JRET(taosThreadMutexUnlock(&clientHbMgr.lock));
|
TSC_ERR_JRET(taosThreadMutexUnlock(&clientHbMgr.lock));
|
||||||
|
|
|
@ -1567,9 +1567,8 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
// int64_t transporterId = 0;
|
||||||
code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, &transporterId,
|
code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, NULL, body);
|
||||||
body);
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
destroyTscObj(*pTscObj);
|
destroyTscObj(*pTscObj);
|
||||||
tscError("failed to send connect msg to server, code:%s", tstrerror(code));
|
tscError("failed to send connect msg to server, code:%s", tstrerror(code));
|
||||||
|
|
|
@ -530,8 +530,7 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob,
|
||||||
pMsgSendInfo->msgInfo.handle = NULL;
|
pMsgSendInfo->msgInfo.handle = NULL;
|
||||||
pMsgSendInfo->msgType = msgType;
|
pMsgSendInfo->msgType = msgType;
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, NULL, pMsgSendInfo);
|
||||||
code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pMsgSendInfo);
|
|
||||||
pMsgSendInfo = NULL;
|
pMsgSendInfo = NULL;
|
||||||
if (code) {
|
if (code) {
|
||||||
ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code));
|
ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code));
|
||||||
|
|
|
@ -326,6 +326,9 @@ int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
if (pCtx[k].fpSet.cleanup != NULL) {
|
||||||
|
pCtx[k].fpSet.cleanup(&pCtx[k]);
|
||||||
|
}
|
||||||
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -640,6 +643,9 @@ void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pC
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
if (pCtx[k].fpSet.cleanup != NULL) {
|
||||||
|
pCtx[k].fpSet.cleanup(&pCtx[k]);
|
||||||
|
}
|
||||||
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
|
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
|
||||||
taskInfo->code = code;
|
taskInfo->code = code;
|
||||||
T_LONG_JMP(taskInfo->env, code);
|
T_LONG_JMP(taskInfo->env, code);
|
||||||
|
|
|
@ -1919,10 +1919,10 @@ _return:
|
||||||
if (pInfo != NULL) {
|
if (pInfo != NULL) {
|
||||||
destroyMergeJoinOperator(pInfo);
|
destroyMergeJoinOperator(pInfo);
|
||||||
}
|
}
|
||||||
|
destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum);
|
||||||
if (newDownstreams) {
|
if (newDownstreams) {
|
||||||
taosMemoryFree(pDownstream);
|
taosMemoryFree(pDownstream);
|
||||||
}
|
}
|
||||||
destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum);
|
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -1057,6 +1057,9 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
|
|
||||||
code = pfCtx->fpSet.process(pfCtx);
|
code = pfCtx->fpSet.process(pfCtx);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
if (pCtx[k].fpSet.cleanup != NULL) {
|
||||||
|
pCtx[k].fpSet.cleanup(&pCtx[k]);
|
||||||
|
}
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -40,6 +40,7 @@ typedef struct SBuiltinFuncDefinition {
|
||||||
FExecProcess processFunc;
|
FExecProcess processFunc;
|
||||||
FScalarExecProcess sprocessFunc;
|
FScalarExecProcess sprocessFunc;
|
||||||
FExecFinalize finalizeFunc;
|
FExecFinalize finalizeFunc;
|
||||||
|
FExecCleanUp cleanupFunc;
|
||||||
#ifdef BUILD_NO_CALL
|
#ifdef BUILD_NO_CALL
|
||||||
FExecProcess invertFunc;
|
FExecProcess invertFunc;
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -239,6 +239,7 @@ bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
|
int32_t modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t modeFunction(SqlFunctionCtx* pCtx);
|
int32_t modeFunction(SqlFunctionCtx* pCtx);
|
||||||
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
void modeFunctionCleanupExt(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
|
int32_t twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
|
|
|
@ -3658,6 +3658,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.processFunc = modeFunction,
|
.processFunc = modeFunction,
|
||||||
.sprocessFunc = modeScalarFunction,
|
.sprocessFunc = modeScalarFunction,
|
||||||
.finalizeFunc = modeFinalize,
|
.finalizeFunc = modeFinalize,
|
||||||
|
.cleanupFunc = modeFunctionCleanupExt
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "abs",
|
.name = "abs",
|
||||||
|
|
|
@ -6019,6 +6019,12 @@ static void modeFunctionCleanup(SModeInfo * pInfo) {
|
||||||
taosMemoryFreeClear(pInfo->buf);
|
taosMemoryFreeClear(pInfo->buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void modeFunctionCleanupExt(SqlFunctionCtx* pCtx) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
modeFunctionCleanup(pInfo);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t saveModeTupleData(SqlFunctionCtx* pCtx, char* data, SModeInfo *pInfo, STuplePos* pPos) {
|
static int32_t saveModeTupleData(SqlFunctionCtx* pCtx, char* data, SModeInfo *pInfo, STuplePos* pPos) {
|
||||||
if (IS_VAR_DATA_TYPE(pInfo->colType)) {
|
if (IS_VAR_DATA_TYPE(pInfo->colType)) {
|
||||||
(void)memcpy(pInfo->buf, data, varDataTLen(data));
|
(void)memcpy(pInfo->buf, data, varDataTLen(data));
|
||||||
|
|
|
@ -142,6 +142,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
|
||||||
pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc;
|
pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc;
|
||||||
pFpSet->combine = funcMgtBuiltins[funcId].combineFunc;
|
pFpSet->combine = funcMgtBuiltins[funcId].combineFunc;
|
||||||
pFpSet->processFuncByRow = funcMgtBuiltins[funcId].processFuncByRow;
|
pFpSet->processFuncByRow = funcMgtBuiltins[funcId].processFuncByRow;
|
||||||
|
pFpSet->cleanup = funcMgtBuiltins[funcId].cleanupFunc;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -586,7 +586,7 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
|
||||||
}
|
}
|
||||||
memcpy(*pDst, pSrc, sizeof(*pSrc));
|
memcpy(*pDst, pSrc, sizeof(*pSrc));
|
||||||
(*pDst)->vgArray = NULL;
|
(*pDst)->vgArray = NULL;
|
||||||
|
|
||||||
if (pSrc->vgHash) {
|
if (pSrc->vgHash) {
|
||||||
(*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
|
(*pDst)->vgHash = taosHashInit(taosHashGetSize(pSrc->vgHash), taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true,
|
||||||
HASH_ENTRY_LOCK);
|
HASH_ENTRY_LOCK);
|
||||||
|
|
|
@ -982,6 +982,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
|
||||||
SCH_ERR_JRET(code);
|
SCH_ERR_JRET(code);
|
||||||
}
|
}
|
||||||
trans->pHandle = (void *)refId;
|
trans->pHandle = (void *)refId;
|
||||||
|
pMsgSendInfo->msgInfo.handle =trans->pHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pJob && pTask) {
|
if (pJob && pTask) {
|
||||||
|
@ -996,8 +997,7 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
|
||||||
pTask->lastMsgType = msgType;
|
pTask->lastMsgType = msgType;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
code = asyncSendMsgToServerExt(trans->pTrans, epSet, NULL, pMsgSendInfo, persistHandle, ctx);
|
||||||
code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
|
|
||||||
pMsgSendInfo = NULL;
|
pMsgSendInfo = NULL;
|
||||||
if (code) {
|
if (code) {
|
||||||
SCH_ERR_JRET(code);
|
SCH_ERR_JRET(code);
|
||||||
|
|
|
@ -1496,9 +1496,10 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
|
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
|
||||||
int32_t code = taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem));
|
int32_t code =
|
||||||
|
taosHashPut(pThrd->failFastCache, pConn->dstAddr, strlen(pConn->dstAddr), &item, sizeof(SFailFastItem));
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tError("failed to put fail-fast item to cache, reason:%s", tstrerror(code));
|
tError("failed to put fail-fast item to cache, reason:%s", tstrerror(code));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2980,7 +2981,9 @@ int32_t transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, S
|
||||||
|
|
||||||
QUEUE_PUSH(&exh->q, &pCliMsg->seqq);
|
QUEUE_PUSH(&exh->q, &pCliMsg->seqq);
|
||||||
taosWUnLockLatch(&exh->latch);
|
taosWUnLockLatch(&exh->latch);
|
||||||
|
|
||||||
tDebug("msg refId: %" PRId64 "", handle);
|
tDebug("msg refId: %" PRId64 "", handle);
|
||||||
|
(void)transReleaseExHandle(transGetRefMgt(), handle);
|
||||||
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue