Merge pull request #24484 from taosdata/fix/fixInvalidReadOnQuery

fix invalid read write
This commit is contained in:
wade zhang 2024-01-17 10:23:46 +08:00 committed by GitHub
commit e908679a32
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 103 additions and 89 deletions

View File

@ -21,12 +21,12 @@
extern "C" {
#endif
#include "systable.h"
#include "tarray.h"
#include "thash.h"
#include "tlog.h"
#include "tmsg.h"
#include "tmsgcb.h"
#include "systable.h"
typedef enum {
JOB_TASK_STATUS_NULL = 0,
@ -90,8 +90,7 @@ typedef struct SExecResult {
void* res;
} SExecResult;
#pragma pack(push, 1)
#pragma pack(push, 1)
typedef struct SCTableMeta {
uint64_t uid;
uint64_t suid;
@ -100,8 +99,7 @@ typedef struct SCTableMeta {
} SCTableMeta;
#pragma pack(pop)
#pragma pack(push, 1)
#pragma pack(push, 1)
typedef struct STableMeta {
// BEGIN: KEEP THIS PART SAME WITH SCTableMeta
uint64_t uid;
@ -137,8 +135,8 @@ typedef struct SDBVgInfo {
int8_t hashMethod;
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
int64_t stateTs;
SHashObj* vgHash; // key:vgId, value:SVgroupInfo
SArray* vgArray; // SVgroupInfo
SHashObj* vgHash; // key:vgId, value:SVgroupInfo
SArray* vgArray; // SVgroupInfo
} SDBVgInfo;
typedef struct SUseDbOutput {
@ -173,6 +171,7 @@ typedef struct SDataBuf {
void* pData;
uint32_t len;
void* handle;
int64_t handleRefId;
SEpSet* pEpSet;
} SDataBuf;
@ -283,7 +282,7 @@ void getColumnTypeFromMeta(STableMeta* pMeta, char* pName, ETableColumnType*
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);
int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst);
void freeVgInfo(SDBVgInfo* vgInfo);
void freeDbCfgInfo(SDbCfgInfo *pInfo);
void freeDbCfgInfo(SDbCfgInfo* pInfo);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen,
void* (*mallocFp)(int64_t));
@ -314,7 +313,9 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_RESTORING || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_MNODE_NOT_FOUND)
#define NO_RET_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED)
#define NO_RET_REDIRECT_ERROR(_code) \
((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED)
#define NEED_REDIRECT_ERROR(_code) \
(NO_RET_REDIRECT_ERROR(_code) || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \

View File

@ -285,8 +285,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
SRetrieveTableRsp* pRsp = NULL;
int8_t biMode = atomic_load_8(&pRequest->pTscObj->biMode);
int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode);
int8_t biMode = atomic_load_8(&pRequest->pTscObj->biMode);
int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, true);
}
@ -324,7 +324,8 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
return;
}
int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, atomic_load_8(&pRequest->pTscObj->biMode));
int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp,
atomic_load_8(&pRequest->pTscObj->biMode));
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, true);
}
@ -492,7 +493,8 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
pResInfo->userFields[i].bytes = pSchema[i].bytes;
pResInfo->userFields[i].type = pSchema[i].type;
if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || pSchema[i].type == TSDB_DATA_TYPE_VARBINARY || pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) {
if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || pSchema[i].type == TSDB_DATA_TYPE_VARBINARY ||
pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) {
pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
} else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) {
pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
@ -1103,9 +1105,9 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
SSqlCallbackWrapper* pWrapper) {
int32_t code = TSDB_CODE_SUCCESS;
pRequest->type = pQuery->msgType;
SArray* pMnodeList = NULL;
SQueryPlan* pDag = NULL;
int64_t st = taosGetTimestampUs();
SArray* pMnodeList = NULL;
SQueryPlan* pDag = NULL;
int64_t st = taosGetTimestampUs();
if (!pRequest->parseOnly) {
pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
@ -1278,7 +1280,7 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
if (isView) {
for (int32_t i = 0; i < tbNum; ++i) {
SName* pViewName = taosArrayGet(tbList, i);
char dbFName[TSDB_DB_FNAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pViewName, dbFName);
catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0);
}
@ -1510,8 +1512,12 @@ int32_t doProcessMsgFromServer(void* param) {
updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
SDataBuf buf = {
.msgType = pMsg->msgType, .len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet};
SDataBuf buf = {.msgType = pMsg->msgType,
.len = pMsg->contLen,
.pData = NULL,
.handle = pMsg->info.handle,
.handleRefId = pMsg->info.refId,
.pEpSet = pEpSet};
if (pMsg->contLen > 0) {
buf.pData = taosMemoryCalloc(1, pMsg->contLen);
@ -2538,13 +2544,13 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
taosMemoryFreeClear(pResultInfo->pData);
pRequest->body.fetchFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, 0);
pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, 0);
return;
}
if (pRequest->code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pResultInfo->pData);
pRequest->body.fetchFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, 0);
pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, 0);
return;
}
@ -2565,12 +2571,12 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
}
pRequest->body.fetchFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
}
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
pRequest->body.fetchFp = fp;
((SSyncQueryParam *)pRequest->body.interParam)->userParam = param;
((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
@ -2611,7 +2617,7 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
pRequest->inCallback = true;
int64_t this = pRequest->self;
pRequest->body.queryFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, code);
pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
SRequestObj* pReq = acquireRequest(this);
if (pReq != NULL) {
pReq->inCallback = false;
@ -2619,11 +2625,11 @@ void doRequestCallback(SRequestObj* pRequest, int32_t code) {
}
}
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser, SParseSqlRes* pRes) {
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
SParseSqlRes* pRes) {
#ifndef TD_ENTERPRISE
return TSDB_CODE_SUCCESS;
#else
return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
#endif
}

View File

@ -52,10 +52,10 @@ typedef enum {
SCH_ALL,
} SCH_POLICY;
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ
#define SCHEDULE_DEFAULT_MAX_NODE_NUM 20
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
#define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ
#define SCHEDULE_DEFAULT_MAX_NODE_NUM 20
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 5000000
#define SCH_MAX_TASK_TIMEOUT_USEC 300000000
@ -68,8 +68,9 @@ typedef struct SSchDebug {
} SSchDebug;
typedef struct SSchTrans {
void *pTrans;
void *pHandle;
void *pTrans;
void *pHandle;
int64_t pHandleId;
} SSchTrans;
typedef struct SSchHbTrans {
@ -299,7 +300,7 @@ typedef struct SSchJob {
void *fetchRes; // TODO free it or not
bool fetched;
bool noMoreRetry;
int64_t resNumOfRows; // from int32_t to int64_t
int64_t resNumOfRows; // from int32_t to int64_t
SSchResInfo userRes;
char *sql;
SQueryProfileSummary summary;
@ -334,14 +335,15 @@ extern SSchedulerMgmt schMgmt;
(!SCH_IS_DATA_BIND_QRY_TASK(_task)))
#define SCH_UPDATE_REDIRECT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code)
#define SCH_GET_REDIRECT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)
#define SCH_GET_REDIRECT_CODE(job, _code) \
(((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))
#define SCH_TASK_ALREADY_LAUNCHED(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_EXEC)
#define SCH_TASK_EXEC_DONE(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_PART_SUCC)
#define SCH_TASK_EXEC_DONE(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_PART_SUCC)
#define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL)
#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle))
@ -362,8 +364,8 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) \
(SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_MULTI_LEVEL_LAUNCHED(_job) ((_job)->levelIdx != ((_job)->levelNum - 1))
#define SCH_SET_JOB_TYPE(_job, type) \
@ -380,25 +382,26 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_NEED_WAIT(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED)
#define SCH_NETWORK_ERR(_code) \
((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
(_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED)
#define SCH_REDIRECT_MSGTYPE(_msgType) \
((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \
(_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
#define SCH_LOW_LEVEL_NETWORK_ERR(_job, _task, _code) \
(SCH_NETWORK_ERR(_code) && ((_task)->level->level == (_job)->levelIdx))
(SCH_NETWORK_ERR(_code) && ((_task)->level->level == (_job)->levelIdx))
#define SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code) \
(SCH_NETWORK_ERR(_code) && ((_task)->level->level > (_job)->levelIdx))
#define SCH_TASK_RETRY_NETWORK_ERR(_task, _code) \
(SCH_NETWORK_ERR(_code) && (_task)->redirectCtx.inRedirect)
(SCH_NETWORK_ERR(_code) && ((_task)->level->level > (_job)->levelIdx))
#define SCH_TASK_RETRY_NETWORK_ERR(_task, _code) (SCH_NETWORK_ERR(_code) && (_task)->redirectCtx.inRedirect)
#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \
(SCH_REDIRECT_MSGTYPE(_msgType) && SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code))
#define SCH_TASKSET_NEED_RETRY(_job, _task, _msgType, _code) \
(SCH_REDIRECT_MSGTYPE(_msgType) && \
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || SCH_TASK_RETRY_NETWORK_ERR((_task), (_code))))
#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \
(SCH_REDIRECT_MSGTYPE(_msgType) && SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code))
#define SCH_TASKSET_NEED_RETRY(_job, _task, _msgType, _code) \
(SCH_REDIRECT_MSGTYPE(_msgType) && \
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || \
SCH_TASK_RETRY_NETWORK_ERR((_task), (_code))))
#define SCH_TASK_NEED_RETRY(_msgType, _code) \
((SCH_REDIRECT_MSGTYPE(_msgType) && SCH_NETWORK_ERR(_code)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
((SCH_REDIRECT_MSGTYPE(_msgType) && SCH_NETWORK_ERR(_code)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
@ -488,50 +491,51 @@ extern SSchedulerMgmt schMgmt;
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define SCH_LOCK(type, _lock) \
do { \
if (SCH_READ == (type)) { \
ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before read lock"); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32(_lock) > 0, "invalid lock value after read lock"); \
} else { \
ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before write lock"); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \
} \
#define SCH_LOCK(type, _lock) \
do { \
if (SCH_READ == (type)) { \
ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before read lock"); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32(_lock) > 0, "invalid lock value after read lock"); \
} else { \
ASSERTS(atomic_load_32(_lock) >= 0, "invalid lock value before write lock"); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32(_lock) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \
} \
} while (0)
#define SCH_UNLOCK(type, _lock) \
do { \
if (SCH_READ == (type)) { \
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \
} else { \
ASSERTS(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \
} \
#define SCH_UNLOCK(type, _lock) \
do { \
if (SCH_READ == (type)) { \
ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \
} else { \
ASSERTS(atomic_load_32((_lock)) & TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
SCH_LOCK_DEBUG("SCH WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \
} \
} while (0)
#define SCH_RESET_JOB_LEVEL_IDX(_job) do { \
(_job)->levelIdx = (_job)->levelNum - 1; \
SCH_JOB_DLOG("set job levelIdx to %d", (_job)->levelIdx); \
} while (0)
#define SCH_RESET_JOB_LEVEL_IDX(_job) \
do { \
(_job)->levelIdx = (_job)->levelNum - 1; \
SCH_JOB_DLOG("set job levelIdx to %d", (_job)->levelIdx); \
} while (0)
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
void schCleanClusterHb(void *pTrans);
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void* param);
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void *param);
SSchJob *schAcquireJob(int64_t refId);
int32_t schReleaseJob(int64_t refId);
void schFreeFlowCtrl(SSchJob *pJob);

View File

@ -505,7 +505,7 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
taosMemoryFree(pMsg->pEpSet);
SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL};
SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL, .pHandleId = 0};
SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans));
SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId, NULL));
@ -537,6 +537,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchTrans trans = {0};
trans.pTrans = pParam->pTrans;
trans.pHandle = pMsg->handle;
trans.pHandleId = pMsg->handleRefId;
SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));

View File

@ -42,7 +42,7 @@ char *schDumpEpSet(SEpSet *pEpSet) {
}
int32_t maxSize = 1024;
char *str = taosMemoryMalloc(maxSize);
char *str = taosMemoryMalloc(maxSize);
if (NULL == str) {
return NULL;
}
@ -73,7 +73,7 @@ char *schGetOpStr(SCH_OP_TYPE type) {
}
void schFreeHbTrans(SSchHbTrans *pTrans) {
rpcReleaseHandle(pTrans->trans.pHandle, TAOS_CONN_CLIENT);
rpcReleaseHandle((void *)pTrans->trans.pHandleId, TAOS_CONN_CLIENT);
schFreeRpcCtx(&pTrans->rpcCtx);
}
@ -209,7 +209,7 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
if (!tsEnableQueryHb) {
return TSDB_CODE_SUCCESS;
}
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
SQueryNodeEpId epId = {0};
@ -234,7 +234,8 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
if (NULL == hb) {
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
qInfo("taosHashGet hb connection not exists, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
qInfo("taosHashGet hb connection not exists, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn,
epId->ep.port);
SCH_ERR_RET(TSDB_CODE_APP_ERROR);
}

View File

@ -414,6 +414,7 @@ void cliHandleResp(SCliConn* conn) {
// buf's mem alread translated to transMsg.pCont
if (!CONN_NO_PERSIST_BY_APP(conn)) {
transMsg.info.handle = (void*)conn->refId;
transMsg.info.refId = (int64_t)(void*)conn->refId;
tDebug("%s conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
}