diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 8b5bf1abb1..f79a0a0718 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -21,12 +21,12 @@ extern "C" { #endif +#include "systable.h" #include "tarray.h" #include "thash.h" #include "tlog.h" #include "tmsg.h" #include "tmsgcb.h" -#include "systable.h" typedef enum { JOB_TASK_STATUS_NULL = 0, @@ -90,8 +90,7 @@ typedef struct SExecResult { void* res; } SExecResult; - -#pragma pack(push, 1) +#pragma pack(push, 1) typedef struct SCTableMeta { uint64_t uid; uint64_t suid; @@ -100,8 +99,7 @@ typedef struct SCTableMeta { } SCTableMeta; #pragma pack(pop) - -#pragma pack(push, 1) +#pragma pack(push, 1) typedef struct STableMeta { // BEGIN: KEEP THIS PART SAME WITH SCTableMeta uint64_t uid; @@ -137,8 +135,8 @@ typedef struct SDBVgInfo { int8_t hashMethod; int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT int64_t stateTs; - SHashObj* vgHash; // key:vgId, value:SVgroupInfo - SArray* vgArray; // SVgroupInfo + SHashObj* vgHash; // key:vgId, value:SVgroupInfo + SArray* vgArray; // SVgroupInfo } SDBVgInfo; typedef struct SUseDbOutput { @@ -173,6 +171,7 @@ typedef struct SDataBuf { void* pData; uint32_t len; void* handle; + int64_t handleRefId; SEpSet* pEpSet; } SDataBuf; @@ -283,7 +282,7 @@ void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType* int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst); int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst); void freeVgInfo(SDBVgInfo* vgInfo); -void freeDbCfgInfo(SDbCfgInfo *pInfo); +void freeDbCfgInfo(SDbCfgInfo* pInfo); extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen, void* (*mallocFp)(int64_t)); @@ -314,7 +313,9 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) #define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_MNODE_NOT_FOUND) -#define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) +#define NO_RET_REDIRECT_ERROR(_code) \ + ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ + (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) #define NEED_REDIRECT_ERROR(_code) \ (NO_RET_REDIRECT_ERROR(_code) || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \ diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 28a29c2138..d4f89d6b54 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -285,8 +285,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { SRetrieveTableRsp* pRsp = NULL; - int8_t biMode = atomic_load_8(&pRequest->pTscObj->biMode); - int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode); + int8_t biMode = atomic_load_8(&pRequest->pTscObj->biMode); + int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode); if (TSDB_CODE_SUCCESS == code && NULL != pRsp) { code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, true); } @@ -324,7 +324,8 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { return; } - int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, atomic_load_8(&pRequest->pTscObj->biMode)); + int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, + atomic_load_8(&pRequest->pTscObj->biMode)); if (TSDB_CODE_SUCCESS == code && NULL != pRsp) { code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, true); } @@ -492,7 +493,8 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t pResInfo->userFields[i].bytes = pSchema[i].bytes; pResInfo->userFields[i].type = pSchema[i].type; - if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || pSchema[i].type == TSDB_DATA_TYPE_VARBINARY || pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) { + if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || pSchema[i].type == TSDB_DATA_TYPE_VARBINARY || + pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) { pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE; } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) { pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; @@ -1103,9 +1105,9 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat SSqlCallbackWrapper* pWrapper) { int32_t code = TSDB_CODE_SUCCESS; pRequest->type = pQuery->msgType; - SArray* pMnodeList = NULL; - SQueryPlan* pDag = NULL; - int64_t st = taosGetTimestampUs(); + SArray* pMnodeList = NULL; + SQueryPlan* pDag = NULL; + int64_t st = taosGetTimestampUs(); if (!pRequest->parseOnly) { pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); @@ -1278,7 +1280,7 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) { if (isView) { for (int32_t i = 0; i < tbNum; ++i) { SName* pViewName = taosArrayGet(tbList, i); - char dbFName[TSDB_DB_FNAME_LEN]; + char dbFName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pViewName, dbFName); catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0); } @@ -1510,8 +1512,12 @@ int32_t doProcessMsgFromServer(void* param) { updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet); - SDataBuf buf = { - .msgType = pMsg->msgType, .len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet}; + SDataBuf buf = {.msgType = pMsg->msgType, + .len = pMsg->contLen, + .pData = NULL, + .handle = pMsg->info.handle, + .handleRefId = pMsg->info.refId, + .pEpSet = pEpSet}; if (pMsg->contLen > 0) { buf.pData = taosMemoryCalloc(1, pMsg->contLen); @@ -2538,13 +2544,13 @@ static void fetchCallback(void* pResult, void* param, int32_t code) { if (code != TSDB_CODE_SUCCESS) { pRequest->code = code; taosMemoryFreeClear(pResultInfo->pData); - pRequest->body.fetchFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, 0); + pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, 0); return; } if (pRequest->code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pResultInfo->pData); - pRequest->body.fetchFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, 0); + pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, 0); return; } @@ -2565,12 +2571,12 @@ static void fetchCallback(void* pResult, void* param, int32_t code) { atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); } - pRequest->body.fetchFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows); + pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows); } void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) { pRequest->body.fetchFp = fp; - ((SSyncQueryParam *)pRequest->body.interParam)->userParam = param; + ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param; SReqResultInfo* pResultInfo = &pRequest->body.resInfo; @@ -2611,7 +2617,7 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param void doRequestCallback(SRequestObj* pRequest, int32_t code) { pRequest->inCallback = true; int64_t this = pRequest->self; - pRequest->body.queryFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, code); + pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code); SRequestObj* pReq = acquireRequest(this); if (pReq != NULL) { pReq->inCallback = false; @@ -2619,11 +2625,11 @@ void doRequestCallback(SRequestObj* pRequest, int32_t code) { } } -int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser, SParseSqlRes* pRes) { +int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser, + SParseSqlRes* pRes) { #ifndef TD_ENTERPRISE return TSDB_CODE_SUCCESS; #else return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes); #endif } - diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 3b7a76bfc7..1c90e61ea3 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -52,10 +52,10 @@ typedef enum { SCH_ALL, } SCH_POLICY; -#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000 -#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 -#define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ -#define SCHEDULE_DEFAULT_MAX_NODE_NUM 20 +#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000 +#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 +#define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ +#define SCHEDULE_DEFAULT_MAX_NODE_NUM 20 #define SCH_DEFAULT_TASK_TIMEOUT_USEC 5000000 #define SCH_MAX_TASK_TIMEOUT_USEC 300000000 @@ -68,8 +68,9 @@ typedef struct SSchDebug { } SSchDebug; typedef struct SSchTrans { - void *pTrans; - void *pHandle; + void *pTrans; + void *pHandle; + int64_t pHandleId; } SSchTrans; typedef struct SSchHbTrans { @@ -299,7 +300,7 @@ typedef struct SSchJob { void *fetchRes; // TODO free it or not bool fetched; bool noMoreRetry; - int64_t resNumOfRows; // from int32_t to int64_t + int64_t resNumOfRows; // from int32_t to int64_t SSchResInfo userRes; char *sql; SQueryProfileSummary summary; @@ -334,14 +335,15 @@ extern SSchedulerMgmt schMgmt; (!SCH_IS_DATA_BIND_QRY_TASK(_task))) #define SCH_UPDATE_REDIRECT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code) -#define SCH_GET_REDIRECT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode) +#define SCH_GET_REDIRECT_CODE(job, _code) \ + (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) #define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task)) #define SCH_TASK_ALREADY_LAUNCHED(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_EXEC) -#define SCH_TASK_EXEC_DONE(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_PART_SUCC) +#define SCH_TASK_EXEC_DONE(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_PART_SUCC) #define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL) #define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle)) @@ -362,8 +364,8 @@ extern SSchedulerMgmt schMgmt; #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) \ (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) -#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) -#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY) +#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) +#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY) #define SCH_MULTI_LEVEL_LAUNCHED(_job) ((_job)->levelIdx != ((_job)->levelNum - 1)) #define SCH_SET_JOB_TYPE(_job, type) \ @@ -380,25 +382,26 @@ extern SSchedulerMgmt schMgmt; #define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job)) #define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job)) #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) -#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) +#define SCH_NETWORK_ERR(_code) \ + ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ + (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) #define SCH_REDIRECT_MSGTYPE(_msgType) \ ((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \ (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH) #define SCH_LOW_LEVEL_NETWORK_ERR(_job, _task, _code) \ - (SCH_NETWORK_ERR(_code) && ((_task)->level->level == (_job)->levelIdx)) + (SCH_NETWORK_ERR(_code) && ((_task)->level->level == (_job)->levelIdx)) #define SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code) \ - (SCH_NETWORK_ERR(_code) && ((_task)->level->level > (_job)->levelIdx)) -#define SCH_TASK_RETRY_NETWORK_ERR(_task, _code) \ - (SCH_NETWORK_ERR(_code) && (_task)->redirectCtx.inRedirect) + (SCH_NETWORK_ERR(_code) && ((_task)->level->level > (_job)->levelIdx)) +#define SCH_TASK_RETRY_NETWORK_ERR(_task, _code) (SCH_NETWORK_ERR(_code) && (_task)->redirectCtx.inRedirect) -#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \ - (SCH_REDIRECT_MSGTYPE(_msgType) && SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code)) -#define SCH_TASKSET_NEED_RETRY(_job, _task, _msgType, _code) \ - (SCH_REDIRECT_MSGTYPE(_msgType) && \ - (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || SCH_TASK_RETRY_NETWORK_ERR((_task), (_code)))) +#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \ + (SCH_REDIRECT_MSGTYPE(_msgType) && SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code)) +#define SCH_TASKSET_NEED_RETRY(_job, _task, _msgType, _code) \ + (SCH_REDIRECT_MSGTYPE(_msgType) && \ + (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || \ + SCH_TASK_RETRY_NETWORK_ERR((_task), (_code)))) #define SCH_TASK_NEED_RETRY(_msgType, _code) \ - ((SCH_REDIRECT_MSGTYPE(_msgType) && SCH_NETWORK_ERR(_code)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) - + ((SCH_REDIRECT_MSGTYPE(_msgType) && SCH_NETWORK_ERR(_code)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse]) @@ -488,50 +491,51 @@ extern SSchedulerMgmt schMgmt; #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 -#define SCH_LOCK(type, _lock) \ - do { \ - if (SCH_READ == (type)) { \ - ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before read lock"); \ - SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - taosRLockLatch(_lock); \ - SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32(_lock) > 0, "invalid lock value after read lock"); \ - } else { \ - ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before write lock"); \ - SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - taosWLockLatch(_lock); \ - SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \ - } \ +#define SCH_LOCK(type, _lock) \ + do { \ + if (SCH_READ == (type)) { \ + ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before read lock"); \ + SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosRLockLatch(_lock); \ + SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + ASSERTS(atomic_load_32(_lock) > 0, "invalid lock value after read lock"); \ + } else { \ + ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before write lock"); \ + SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosWLockLatch(_lock); \ + SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + ASSERTS(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \ + } \ } while (0) -#define SCH_UNLOCK(type, _lock) \ - do { \ - if (SCH_READ == (type)) { \ - ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \ - SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - taosRUnLockLatch(_lock); \ - SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \ - } else { \ - ASSERTS(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \ - SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - taosWUnLockLatch(_lock); \ - SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \ - } \ +#define SCH_UNLOCK(type, _lock) \ + do { \ + if (SCH_READ == (type)) { \ + ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \ + SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosRUnLockLatch(_lock); \ + SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \ + } else { \ + ASSERTS(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \ + SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + taosWUnLockLatch(_lock); \ + SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \ + } \ } while (0) -#define SCH_RESET_JOB_LEVEL_IDX(_job) do { \ - (_job)->levelIdx = (_job)->levelNum - 1; \ - SCH_JOB_DLOG("set job levelIdx to %d", (_job)->levelIdx); \ -} while (0) +#define SCH_RESET_JOB_LEVEL_IDX(_job) \ + do { \ + (_job)->levelIdx = (_job)->levelNum - 1; \ + SCH_JOB_DLOG("set job levelIdx to %d", (_job)->levelIdx); \ + } while (0) void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask); void schCleanClusterHb(void *pTrans); int32_t schLaunchTask(SSchJob *job, SSchTask *task); int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask); -int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void* param); +int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void *param); SSchJob *schAcquireJob(int64_t refId); int32_t schReleaseJob(int64_t refId); void schFreeFlowCtrl(SSchJob *pJob); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index f987420ece..5c67c7974f 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -505,7 +505,7 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) { taosMemoryFree(pMsg->pEpSet); SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param; - SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL}; + SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL, .pHandleId = 0}; SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans)); SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId, NULL)); @@ -537,6 +537,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) { SSchTrans trans = {0}; trans.pTrans = pParam->pTrans; trans.pHandle = pMsg->handle; + trans.pHandleId = pMsg->handleRefId; SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans)); SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus)); diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index df9cde6b7a..39c54ea731 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -42,7 +42,7 @@ char *schDumpEpSet(SEpSet *pEpSet) { } int32_t maxSize = 1024; - char *str = taosMemoryMalloc(maxSize); + char *str = taosMemoryMalloc(maxSize); if (NULL == str) { return NULL; } @@ -73,7 +73,7 @@ char *schGetOpStr(SCH_OP_TYPE type) { } void schFreeHbTrans(SSchHbTrans *pTrans) { - rpcReleaseHandle(pTrans->trans.pHandle, TAOS_CONN_CLIENT); + rpcReleaseHandle((void *)pTrans->trans.pHandleId, TAOS_CONN_CLIENT); schFreeRpcCtx(&pTrans->rpcCtx); } @@ -209,7 +209,7 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) { if (!tsEnableQueryHb) { return TSDB_CODE_SUCCESS; } - + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SQueryNodeEpId epId = {0}; @@ -234,7 +234,8 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) { hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId)); if (NULL == hb) { SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); - qInfo("taosHashGet hb connection not exists, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port); + qInfo("taosHashGet hb connection not exists, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, + epId->ep.port); SCH_ERR_RET(TSDB_CODE_APP_ERROR); } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 246605694b..15b35030d3 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -414,6 +414,7 @@ void cliHandleResp(SCliConn* conn) { // buf's mem alread translated to transMsg.pCont if (!CONN_NO_PERSIST_BY_APP(conn)) { transMsg.info.handle = (void*)conn->refId; + transMsg.info.refId = (int64_t)(void*)conn->refId; tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); }