reschedule timeout task
This commit is contained in:
parent
c2c2defc7c
commit
9bd195d94d
|
@ -1982,6 +1982,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
SClientHbKey connKey;
|
||||
int64_t clusterId;
|
||||
SQueryHbReqBasic* query;
|
||||
SHashObj* info; // hash<Skv.key, Skv>
|
||||
} SClientHbReq;
|
||||
|
|
|
@ -104,6 +104,8 @@ void schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param
|
|||
|
||||
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub);
|
||||
|
||||
void schedulerStopQueryHb(void *pTrans);
|
||||
|
||||
|
||||
/**
|
||||
* Cancel query job
|
||||
|
|
|
@ -58,11 +58,6 @@ enum {
|
|||
|
||||
typedef struct SAppInstInfo SAppInstInfo;
|
||||
|
||||
typedef struct {
|
||||
void* param;
|
||||
SClientHbReq* req;
|
||||
} SHbConnInfo;
|
||||
|
||||
typedef struct {
|
||||
char* key;
|
||||
// statistics
|
||||
|
@ -72,11 +67,8 @@ typedef struct {
|
|||
int64_t startTime;
|
||||
// ctl
|
||||
SRWLatch lock; // lock is used in serialization
|
||||
// connection
|
||||
SAppInstInfo* pAppInstInfo;
|
||||
// info
|
||||
SHashObj* activeInfo; // hash<SClientHbKey, SClientHbReq>
|
||||
SHashObj* connInfo; // hash<SClientHbKey, SHbConnInfo>
|
||||
} SAppHbMgr;
|
||||
|
||||
typedef int32_t (*FHbRspHandle)(SAppHbMgr* pAppHbMgr, SClientHbRsp* pRsp);
|
||||
|
@ -326,8 +318,6 @@ void appHbMgrCleanup(void);
|
|||
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
|
||||
void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey);
|
||||
|
||||
int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* value, int32_t keyLen, int32_t valueLen);
|
||||
|
||||
// --- mq
|
||||
void hbMgrInitMqHbRspHandle();
|
||||
|
||||
|
|
|
@ -130,8 +130,12 @@ void destroyTscObj(void *pObj) {
|
|||
|
||||
SClientHbKey connKey = {.tscRid = pTscObj->id, .connType = pTscObj->connType};
|
||||
hbDeregisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey);
|
||||
atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
||||
int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
|
||||
closeAllRequests(pTscObj->pRequests);
|
||||
schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
|
||||
if (0 == connNum) {
|
||||
closeTransporter(pTscObj);
|
||||
}
|
||||
tscDebug("connObj 0x%" PRIx64 " destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj->pAppInfo->numOfConns);
|
||||
taosThreadMutexDestroy(&pTscObj->mutex);
|
||||
taosMemoryFreeClear(pTscObj);
|
||||
|
|
|
@ -129,9 +129,9 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
|
|||
}
|
||||
|
||||
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
||||
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
|
||||
if (NULL == info) {
|
||||
tscWarn("fail to get connInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
|
||||
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
|
||||
if (NULL == pReq) {
|
||||
tscWarn("pReq to get activeInfo, may be dropped, refId:%" PRIx64 ", type:%d", pRsp->connKey.tscRid,
|
||||
pRsp->connKey.connType);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -181,12 +181,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
break;
|
||||
}
|
||||
|
||||
int64_t *clusterId = (int64_t *)info->param;
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
|
||||
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
|
||||
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -199,12 +198,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
break;
|
||||
}
|
||||
|
||||
int64_t *clusterId = (int64_t *)info->param;
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
|
||||
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
|
||||
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -217,12 +215,11 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
|
|||
break;
|
||||
}
|
||||
|
||||
int64_t *clusterId = (int64_t *)info->param;
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
|
||||
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
|
||||
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", *clusterId, tstrerror(code));
|
||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -547,13 +544,10 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
|
|||
|
||||
pOneReq = taosArrayPush(pBatchReq->reqs, pOneReq);
|
||||
|
||||
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &pOneReq->connKey, sizeof(SClientHbKey));
|
||||
if (info) {
|
||||
code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, info->param, pOneReq);
|
||||
if (code) {
|
||||
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
|
||||
continue;
|
||||
}
|
||||
code = (*clientHbMgr.reqHandle[pOneReq->connKey.connType])(&pOneReq->connKey, &pOneReq->clusterId, pOneReq);
|
||||
if (code) {
|
||||
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
|
||||
continue;
|
||||
}
|
||||
|
||||
//hbClearClientHbReq(pOneReq);
|
||||
|
@ -569,23 +563,6 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
|
|||
return pBatchReq;
|
||||
}
|
||||
|
||||
/*
 * Reset the per-connection payload of every request stored in
 * pAppHbMgr->activeInfo.
 *
 * For each SClientHbReq: the key/value entries in `info` are freed and the
 * hash is emptied; when a `query` payload is attached, its queryDesc array
 * is destroyed and the payload itself is released and cleared.
 */
void hbClearReqInfo(SAppHbMgr *pAppHbMgr) {
  for (void *pCur = taosHashIterate(pAppHbMgr->activeInfo, NULL); pCur != NULL;
       pCur = taosHashIterate(pAppHbMgr->activeInfo, pCur)) {
    SClientHbReq *pReq = pCur;

    tFreeReqKvHash(pReq->info);
    taosHashClear(pReq->info);

    // Nothing more to release when no query payload is attached.
    if (NULL == pReq->query) {
      continue;
    }

    taosArrayDestroy(pReq->query->queryDesc);
    taosMemoryFreeClear(pReq->query);
  }
}
|
||||
|
||||
void hbThreadFuncUnexpectedStopped(void) {
|
||||
atomic_store_8(&clientHbMgr.threadStop, 2);
|
||||
}
|
||||
|
@ -715,14 +692,6 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
|
|||
}
|
||||
|
||||
taosHashSetFreeFp(pAppHbMgr->activeInfo, tFreeClientHbReq);
|
||||
// init getInfoFunc
|
||||
pAppHbMgr->connInfo = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||
|
||||
if (pAppHbMgr->connInfo == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosMemoryFree(pAppHbMgr);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&clientHbMgr.lock);
|
||||
taosArrayPush(clientHbMgr.appHbMgrs, &pAppHbMgr);
|
||||
|
@ -745,15 +714,6 @@ void appHbMgrCleanup(void) {
|
|||
taosHashCleanup(pTarget->activeInfo);
|
||||
pTarget->activeInfo = NULL;
|
||||
|
||||
pIter = taosHashIterate(pTarget->connInfo, NULL);
|
||||
while (pIter != NULL) {
|
||||
SHbConnInfo *info = pIter;
|
||||
taosMemoryFree(info->param);
|
||||
pIter = taosHashIterate(pTarget->connInfo, pIter);
|
||||
}
|
||||
taosHashCleanup(pTarget->connInfo);
|
||||
pTarget->connInfo = NULL;
|
||||
|
||||
taosMemoryFree(pTarget->key);
|
||||
taosMemoryFree(pTarget);
|
||||
}
|
||||
|
@ -791,7 +751,7 @@ void hbMgrCleanUp() {
|
|||
clientHbMgr.appHbMgrs = NULL;
|
||||
}
|
||||
|
||||
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
|
||||
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, int64_t clusterId) {
|
||||
// init hash in activeinfo
|
||||
void *data = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
|
||||
if (data != NULL) {
|
||||
|
@ -799,17 +759,11 @@ int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
|
|||
}
|
||||
SClientHbReq hbReq = {0};
|
||||
hbReq.connKey = connKey;
|
||||
hbReq.clusterId = clusterId;
|
||||
//hbReq.info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
|
||||
|
||||
taosHashPut(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey), &hbReq, sizeof(SClientHbReq));
|
||||
|
||||
// init hash
|
||||
if (info != NULL) {
|
||||
SClientHbReq *pReq = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
|
||||
info->req = pReq;
|
||||
taosHashPut(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey), info, sizeof(SHbConnInfo));
|
||||
}
|
||||
|
||||
atomic_add_fetch_32(&pAppHbMgr->connKeyCnt, 1);
|
||||
return 0;
|
||||
}
|
||||
|
@ -819,15 +773,10 @@ int hbRegisterConn(SAppHbMgr *pAppHbMgr, int64_t tscRefId, int64_t clusterId, in
|
|||
.tscRid = tscRefId,
|
||||
.connType = connType,
|
||||
};
|
||||
SHbConnInfo info = {0};
|
||||
|
||||
switch (connType) {
|
||||
case CONN_TYPE__QUERY: {
|
||||
int64_t *pClusterId = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pClusterId = clusterId;
|
||||
|
||||
info.param = pClusterId;
|
||||
return hbRegisterConnImpl(pAppHbMgr, connKey, &info);
|
||||
return hbRegisterConnImpl(pAppHbMgr, connKey, clusterId);
|
||||
}
|
||||
case CONN_TYPE__TMQ: {
|
||||
return 0;
|
||||
|
@ -844,26 +793,10 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
|
|||
taosHashRemove(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
|
||||
}
|
||||
|
||||
SHbConnInfo *info = taosHashGet(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
|
||||
if (info) {
|
||||
taosMemoryFree(info->param);
|
||||
taosHashRemove(pAppHbMgr->connInfo, &connKey, sizeof(SClientHbKey));
|
||||
}
|
||||
|
||||
if (NULL == pReq || NULL == info) {
|
||||
if (NULL == pReq) {
|
||||
return;
|
||||
}
|
||||
|
||||
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
|
||||
}
|
||||
|
||||
/*
 * Attach one key/value pair to the heartbeat request of a connection.
 *
 * pAppHbMgr: manager whose activeInfo table holds the request.
 * connKey:   key of the connection; its request must already be registered
 *            (enforced with ASSERT).
 * key/keyLen, value/valueLen: pair handed to taosHashPut on the request's
 *            `info` hash.
 *
 * Always returns 0; the result of taosHashPut is not inspected.
 */
int hbAddConnInfo(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, void *key, void *value, int32_t keyLen,
                  int32_t valueLen) {
  // Look up the request that belongs to this connection.
  SClientHbReq *pTarget = taosHashGet(pAppHbMgr->activeInfo, &connKey, sizeof(SClientHbKey));
  ASSERT(pTarget != NULL);

  taosHashPut(pTarget->info, key, keyLen, value, valueLen);
  return 0;
}
|
||||
|
|
|
@ -741,20 +741,6 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
|
|||
do {
|
||||
destroyRequest(pRequest);
|
||||
pRequest = launchQuery(pTscObj, sql, sqlLen);
|
||||
if (*sql == 'y') {
|
||||
SCatalog *pCatalog = NULL;
|
||||
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||
ctgdLaunchAsyncCall(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, pRequest->requestId, false);
|
||||
break;
|
||||
} else if (*sql == 'z') {
|
||||
SCatalog *pCatalog = NULL;
|
||||
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||
SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||
ctgdLaunchAsyncCall(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, pRequest->requestId, false);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -2332,6 +2332,8 @@ static int32_t isSchemalessDb(SSmlHandle* info){
|
|||
smlBuildInvalidDataMsg(&info->msgBuf, "catalogGetDBCfg error, code:", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
taosArrayDestroy(pInfo.pRetensions);
|
||||
|
||||
if (!pInfo.schemaless){
|
||||
info->pRequest->code = TSDB_CODE_SML_INVALID_DB_CONF;
|
||||
smlBuildInvalidDataMsg(&info->msgBuf, "can not insert into schemaless db:", dbFname);
|
||||
|
|
|
@ -132,6 +132,21 @@ void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
prepare SQL:
|
||||
create database db1;
|
||||
use db1;
|
||||
create stable st1 (ts timestamp, f1 int) tags(t1 int);
|
||||
create table tb1 using st1 tags(1);
|
||||
insert into tb1 values (now, 1);
|
||||
create qnode on dnode 1;
|
||||
create user user1 pass "abc";
|
||||
create database db2;
|
||||
grant write on db2.* to user1;
|
||||
create function udf1 as '/tmp/libudf1.so' outputtype int;
|
||||
create aggregate function udf2 as '/tmp/libudf2.so' outputtype int;
|
||||
*/
|
||||
int32_t ctgdLaunchAsyncCall(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, bool forceUpdate) {
|
||||
int32_t code = 0;
|
||||
SCatalogReq req = {0};
|
||||
|
|
|
@ -1424,6 +1424,8 @@ int32_t isNotSchemalessDb(SParseContext* pContext, char *dbName){
|
|||
parserError("catalogGetDBCfg error, code:%s, dbFName:%s", tstrerror(code), dbFname);
|
||||
return code;
|
||||
}
|
||||
taosArrayDestroy(pInfo.pRetensions);
|
||||
|
||||
if (pInfo.schemaless){
|
||||
parserError("can not insert into schemaless db:%s", dbFname);
|
||||
return TSDB_CODE_SML_INVALID_DB_CONF;
|
||||
|
|
|
@ -34,6 +34,7 @@ extern "C" {
|
|||
#define QW_DEFAULT_SCH_TASK_NUMBER 10000
|
||||
#define QW_DEFAULT_SHORT_RUN_TIMES 2
|
||||
#define QW_DEFAULT_HEARTBEAT_MSEC 5000
|
||||
#define QW_SCH_TIMEOUT_MSEC 180000
|
||||
|
||||
enum {
|
||||
QW_PHASE_PRE_QUERY = 1,
|
||||
|
@ -137,7 +138,7 @@ typedef struct SQWTaskCtx {
|
|||
} SQWTaskCtx;
|
||||
|
||||
typedef struct SQWSchStatus {
|
||||
int32_t lastAccessTs; // timestamp in second
|
||||
int64_t hbBrokenTs; // timestamp in msecond
|
||||
SRWLatch hbConnLock;
|
||||
SRpcHandleInfo hbConnInfo;
|
||||
SQueryNodeEpId hbEpId;
|
||||
|
@ -354,6 +355,8 @@ int32_t qwOpenRef(void);
|
|||
void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
|
||||
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
|
||||
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
|
||||
void qwClearExpiredSch(SArray* pExpiredSch);
|
||||
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
|
||||
|
||||
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
|
||||
int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore);
|
||||
|
|
|
@ -535,3 +535,9 @@ int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
|
||||
/*
 * Placeholder for cleaning up expired schedulers.
 *
 * pExpiredSch: array of scheduler ids (uint64_t) collected by the heartbeat
 *              timer handler. The body is intentionally empty: no cleanup is
 *              implemented yet, so the ids are currently ignored.
 */
void qwClearExpiredSch(SArray* pExpiredSch) {
  (void)pExpiredSch;  // unused until the cleanup logic is written
}
|
||||
|
||||
|
||||
|
|
|
@ -21,10 +21,12 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
|
|||
SSchedulerHbRsp rsp = {0};
|
||||
SQWSchStatus *sch = NULL;
|
||||
|
||||
QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
|
||||
QW_ERR_RET(qwAcquireScheduler(mgmt, req->sId, QW_READ, &sch));
|
||||
|
||||
QW_LOCK(QW_WRITE, &sch->hbConnLock);
|
||||
|
||||
sch->hbBrokenTs = taosGetTimestampMs();
|
||||
|
||||
if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
|
||||
tmsgReleaseHandle(&sch->hbConnInfo, TAOS_CONN_SERVER);
|
||||
sch->hbConnInfo.handle = NULL;
|
||||
|
@ -794,6 +796,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
SQWSchStatus *sch = NULL;
|
||||
int32_t taskNum = 0;
|
||||
SQWHbInfo *rspList = NULL;
|
||||
SArray *pExpiredSch = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
qwDbgDumpMgmtInfo(mgmt);
|
||||
|
@ -809,8 +812,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
}
|
||||
|
||||
rspList = taosMemoryCalloc(schNum, sizeof(SQWHbInfo));
|
||||
if (NULL == rspList) {
|
||||
pExpiredSch = taosArrayInit(schNum, sizeof(uint64_t));
|
||||
if (NULL == rspList || NULL == pExpiredSch) {
|
||||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||
taosMemoryFree(rspList);
|
||||
taosArrayDestroy(pExpiredSch);
|
||||
QW_ELOG("calloc %d SQWHbInfo failed", schNum);
|
||||
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
|
||||
qwRelease(refId);
|
||||
|
@ -820,6 +826,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
void *key = NULL;
|
||||
size_t keyLen = 0;
|
||||
int32_t i = 0;
|
||||
int64_t currentMs = taosGetTimestampMs();
|
||||
|
||||
void *pIter = taosHashIterate(mgmt->schHash, NULL);
|
||||
while (pIter) {
|
||||
|
@ -827,6 +834,11 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
if (NULL == sch->hbConnInfo.handle) {
|
||||
uint64_t *sId = taosHashGetKey(pIter, NULL);
|
||||
QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
|
||||
|
||||
if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && taosHashGetSize(sch->tasksHash) <= 0) {
|
||||
taosArrayPush(pExpiredSch, sId);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(mgmt->schHash, pIter);
|
||||
continue;
|
||||
}
|
||||
|
@ -852,7 +864,12 @@ _return:
|
|||
tFreeSSchedulerHbRsp(&rspList[j].rsp);
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pExpiredSch) > 0) {
|
||||
qwClearExpiredSch(pExpiredSch);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(rspList);
|
||||
taosArrayDestroy(pExpiredSch);
|
||||
|
||||
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
|
||||
qwRelease(refId);
|
||||
|
|
|
@ -102,11 +102,11 @@ typedef struct SSchedulerMgmt {
|
|||
uint64_t taskId; // sequential taksId
|
||||
uint64_t sId; // schedulerId
|
||||
SSchedulerCfg cfg;
|
||||
SRWLatch lock;
|
||||
bool exit;
|
||||
int32_t jobRef;
|
||||
int32_t jobNum;
|
||||
SSchStat stat;
|
||||
SRWLatch hbLock;
|
||||
SHashObj *hbConnections;
|
||||
} SSchedulerMgmt;
|
||||
|
||||
|
@ -320,6 +320,8 @@ extern SSchedulerMgmt schMgmt;
|
|||
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
|
||||
|
||||
|
||||
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
|
||||
void schCleanClusterHb(void* pTrans);
|
||||
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
|
||||
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
|
||||
SSchJob *schAcquireJob(int64_t refId);
|
||||
|
|
|
@ -126,30 +126,6 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
/*
 * Undo a task's heartbeat registration.
 *
 * Does nothing unless pTask->registerdHb is set. Otherwise the endpoint id of
 * the task's current candidate address is rebuilt, the matching entry in
 * schMgmt.hbConnections has its taskNum decremented, and the task's
 * registerdHb flag is cleared. If no entry exists, an error is logged and the
 * flag is left untouched.
 */
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
  if (pTask->registerdHb) {
    SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
    SQueryNodeEpId  nodeEpId = {0};

    nodeEpId.nodeId = pAddr->nodeId;

    SEp *pCurEp = SCH_GET_CUR_EP(pAddr);
    strcpy(nodeEpId.ep.fqdn, pCurEp->fqdn);
    nodeEpId.ep.port = pCurEp->port;

    SSchHbTrans *pHb = taosHashGet(schMgmt.hbConnections, &nodeEpId, sizeof(SQueryNodeEpId));
    if (NULL == pHb) {
      SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", nodeEpId.nodeId, nodeEpId.ep.fqdn, nodeEpId.ep.port);
      return;
    }

    atomic_sub_fetch_64(&pHb->taskNum, 1);
    pTask->registerdHb = false;
  }
}
|
||||
|
||||
void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
|
||||
schDeregisterTaskHb(pJob, pTask);
|
||||
|
@ -1484,9 +1460,10 @@ void schFreeJobImpl(void *job) {
|
|||
|
||||
qDebug("QID:0x%" PRIx64 " job freed, refId:%" PRIx64 ", pointer:%p", queryId, refId, pJob);
|
||||
|
||||
atomic_sub_fetch_32(&schMgmt.jobNum, 1);
|
||||
|
||||
schCloseJobRef();
|
||||
int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1);
|
||||
if (jobNum == 0) {
|
||||
schCloseJobRef();
|
||||
}
|
||||
}
|
||||
|
||||
int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
|
||||
|
|
|
@ -648,31 +648,6 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
/*
 * Insert a new heartbeat connection entry for the endpoint `epId`.
 *
 * A fresh SSchHbTrans is built from the job's transport plus an RPC context
 * created by schMakeHbRpcCtx, then stored in schMgmt.hbConnections.
 *
 * exist: set to true when the table reports the key is already present; the
 *        function still returns success in that case. It is not written on
 *        any other path.
 *
 * Returns TSDB_CODE_SUCCESS on insert or on "already exists"; otherwise the
 * error from schMakeHbRpcCtx or taosHashPut. The locally built RPC context is
 * freed whenever the insert does not succeed.
 */
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
  SSchHbTrans newHb = {0};
  newHb.trans.pTrans = pJob->pTrans;

  SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &newHb.rpcCtx));

  int32_t code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &newHb, sizeof(SSchHbTrans));
  if (0 == code) {
    return TSDB_CODE_SUCCESS;
  }

  // Insert did not take the entry: the context we built is ours to release.
  schFreeRpcCtx(&newHb.rpcCtx);

  if (HASH_NODE_EXIST(code)) {
    *exist = true;
    return TSDB_CODE_SUCCESS;
  }

  qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
  SCH_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}
|
||||
|
||||
|
||||
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
|
||||
SSchedulerHbReq req = {0};
|
||||
int32_t code = 0;
|
||||
|
@ -684,17 +659,20 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray* taskAction) {
|
|||
req.sId = schMgmt.sId;
|
||||
memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
|
||||
|
||||
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
|
||||
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
|
||||
if (NULL == hb) {
|
||||
qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
|
||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||
qError("hb connection no longer exist, nodeId:%d, fqdn:%s, port:%d", nodeEpId->nodeId, nodeEpId->ep.fqdn,
|
||||
nodeEpId->ep.port);
|
||||
SCH_ERR_RET(code);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &hb->lock);
|
||||
code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
|
||||
memcpy(&trans, &hb->trans, sizeof(trans));
|
||||
SCH_UNLOCK(SCH_WRITE, &hb->lock);
|
||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||
|
||||
SCH_ERR_RET(code);
|
||||
|
||||
|
@ -764,60 +742,6 @@ _return:
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
|
||||
/*
 * Make sure a heartbeat connection exists for the task's current candidate
 * endpoint and count the task against it.
 *
 * The endpoint id is built from the candidate address; while no entry is found
 * in schMgmt.hbConnections, one is registered (and, if this call created it,
 * an initial heartbeat message is sent) and the lookup is retried. Once an
 * entry is found its taskNum is incremented and pTask->registerdHb is set.
 *
 * Returns TSDB_CODE_SUCCESS, or the first error from registration / sending.
 */
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId  nodeEpId = {0};

  nodeEpId.nodeId = pAddr->nodeId;

  SEp *pCurEp = SCH_GET_CUR_EP(pAddr);
  strcpy(nodeEpId.ep.fqdn, pCurEp->fqdn);
  nodeEpId.ep.port = pCurEp->port;

  SSchHbTrans *pHb = taosHashGet(schMgmt.hbConnections, &nodeEpId, sizeof(SQueryNodeEpId));
  while (NULL == pHb) {
    bool exist = false;
    SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &nodeEpId, &exist));
    if (!exist) {
      // This call created the entry, so it also kicks off the first heartbeat.
      SCH_ERR_RET(schBuildAndSendHbMsg(&nodeEpId, NULL));
    }

    pHb = taosHashGet(schMgmt.hbConnections, &nodeEpId, sizeof(SQueryNodeEpId));
  }

  atomic_add_fetch_64(&pHb->taskNum, 1);
  pTask->registerdHb = true;

  return TSDB_CODE_SUCCESS;
}
|
||||
|
||||
/*
 * Overwrite the transport info stored for the heartbeat connection `epId`.
 *
 * The entry is looked up in schMgmt.hbConnections; `*trans` is copied into it
 * while holding the entry's own write lock.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR (via SCH_ERR_RET)
 * when no entry exists for the endpoint.
 */
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
  SSchHbTrans *pHb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (NULL == pHb) {
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &pHb->lock);
  memcpy(&pHb->trans, trans, sizeof(*trans));
  SCH_UNLOCK(SCH_WRITE, &pHb->lock);

  qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId,
         epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle);

  return TSDB_CODE_SUCCESS;
}
|
||||
|
||||
int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
|
||||
SSchedulerHbRsp rsp = {0};
|
||||
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
||||
|
|
|
@ -21,17 +21,187 @@
|
|||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
|
||||
|
||||
void schCleanClusterHb(void* pTrans) {
|
||||
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
|
||||
SSchHbTrans *hb = taosHashIterate(schMgmt.hbConnections, NULL);
|
||||
while (hb) {
|
||||
if (hb->trans.pTrans == pTrans) {
|
||||
SQueryNodeEpId* pEpId = taosHashGetKey(hb, NULL);
|
||||
rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT);
|
||||
taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
|
||||
}
|
||||
|
||||
hb = taosHashIterate(schMgmt.hbConnections, hb);
|
||||
}
|
||||
|
||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
}
|
||||
|
||||
/*
 * Remove the heartbeat connection of endpoint `epId` once no task uses it.
 *
 * Runs under the write side of schMgmt.hbLock. The entry's taskNum is
 * re-read under the lock; only when it is <= 0 is the client RPC handle
 * released, the entry's RPC context freed, and the entry removed from
 * schMgmt.hbConnections. A missing entry is logged and treated as success.
 *
 * pJob / pTask: used by the task-level log macro.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) {
  SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
  SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (NULL == hb) {
    SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
    SCH_TASK_ELOG("nodeId %d fqdn %s port %d not in hb connections", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    return TSDB_CODE_SUCCESS;
  }

  // Another task may have registered between the caller's decrement and this
  // point, so the count is checked again while holding the write lock.
  int64_t taskNum = atomic_load_64(&hb->taskNum);
  if (taskNum <= 0) {
    rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT);

    // The stored entry carries the RPC context built in schAddHbConnection;
    // release it before the entry is removed so it does not leak.
    // NOTE(review): assumes no free callback is installed on hbConnections
    // that already releases rpcCtx — confirm at the table's init site.
    schFreeRpcCtx(&hb->rpcCtx);

    taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  }
  SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);

  return TSDB_CODE_SUCCESS;
}
|
||||
|
||||
|
||||
/*
 * Insert a new heartbeat connection entry for the endpoint `epId`.
 *
 * The entry starts with taskNum = 1 (the calling task), the job's transport,
 * and an RPC context created by schMakeHbRpcCtx. The insert itself runs under
 * the write side of schMgmt.hbLock.
 *
 * exist: set to true when the table reports the key is already present; the
 *        function still returns success in that case. It is not written on
 *        any other path.
 *
 * Returns TSDB_CODE_SUCCESS on insert or on "already exists"; otherwise the
 * error from schMakeHbRpcCtx or taosHashPut. The locally built RPC context is
 * freed whenever the insert does not succeed.
 */
int32_t schAddHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId, bool *exist) {
  SSchHbTrans newHb = {0};
  newHb.trans.pTrans = pJob->pTrans;
  newHb.taskNum = 1;

  SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &newHb.rpcCtx));

  SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
  int32_t code = taosHashPut(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId), &newHb, sizeof(SSchHbTrans));
  // The lock only protects the insert; every path releases it right after.
  SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);

  if (0 == code) {
    return TSDB_CODE_SUCCESS;
  }

  schFreeRpcCtx(&newHb.rpcCtx);

  if (HASH_NODE_EXIST(code)) {
    *exist = true;
    return TSDB_CODE_SUCCESS;
  }

  qError("taosHashPut hb trans failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
  SCH_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}
|
||||
|
||||
/*
 * Count the calling task against the heartbeat connection of `pEpId`,
 * creating the connection first when it does not exist.
 *
 * Each attempt looks the entry up under the read side of schMgmt.hbLock:
 *  - found: taskNum is incremented while the read lock is still held;
 *  - missing: the lock is dropped and schAddHbConnection is tried. If this
 *    call created the entry (which already starts at taskNum = 1), the first
 *    heartbeat message is sent and its result is returned directly. If the
 *    entry turned out to exist already, the lookup is retried.
 *
 * NOTE(review): when the initial send fails, the freshly added entry appears
 * to stay in the table with taskNum = 1 — confirm whether callers clean up.
 *
 * Returns TSDB_CODE_SUCCESS or the first error from add / send.
 */
int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *pEpId) {
  SSchHbTrans *pHb = NULL;

  for (;;) {
    SCH_LOCK(SCH_READ, &schMgmt.hbLock);
    pHb = taosHashGet(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
    if (pHb != NULL) {
      break;  // leave the loop with the read lock still held
    }

    bool exist = false;
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
    SCH_ERR_RET(schAddHbConnection(pJob, pTask, pEpId, &exist));
    if (!exist) {
      SCH_RET(schBuildAndSendHbMsg(pEpId, NULL));
    }
  }

  atomic_add_fetch_64(&pHb->taskNum, 1);

  SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);

  return TSDB_CODE_SUCCESS;
}
|
||||
|
||||
/*
 * Undo a task's heartbeat registration.
 *
 * Does nothing unless pTask->registerdHb is set. Otherwise the endpoint id of
 * the task's current candidate address is rebuilt and, under the read side of
 * schMgmt.hbLock, the matching entry's taskNum is decremented. When the count
 * reaches exactly 0 the connection is handed to schRemoveHbConnection (after
 * the read lock has been released). Finally the task's registerdHb flag is
 * cleared. If no entry exists, a warning is logged and the flag is left
 * untouched.
 */
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
  if (!pTask->registerdHb) {
    return;
  }

  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId  nodeEpId = {0};

  nodeEpId.nodeId = pAddr->nodeId;

  SEp *pCurEp = SCH_GET_CUR_EP(pAddr);
  strcpy(nodeEpId.ep.fqdn, pCurEp->fqdn);
  nodeEpId.ep.port = pCurEp->port;

  SCH_LOCK(SCH_READ, &schMgmt.hbLock);
  SSchHbTrans *pHb = taosHashGet(schMgmt.hbConnections, &nodeEpId, sizeof(SQueryNodeEpId));
  if (NULL == pHb) {
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
    SCH_TASK_WLOG("nodeId %d fqdn %s port %d not in hb connections", nodeEpId.nodeId, nodeEpId.ep.fqdn, nodeEpId.ep.port);
    return;
  }

  int64_t remaining = atomic_sub_fetch_64(&pHb->taskNum, 1);
  // The read lock is released on both outcomes before anything else happens.
  SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);

  if (0 == remaining) {
    schRemoveHbConnection(pJob, pTask, &nodeEpId);
  }

  pTask->registerdHb = false;
}
|
||||
|
||||
|
||||
|
||||
/*
 * Make sure the task is counted against a heartbeat connection for its
 * current candidate endpoint.
 *
 * Builds the endpoint id (node id, fqdn, port) from the candidate address,
 * delegates to schRegisterHbConnection, and on success marks the task with
 * registerdHb = true.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from schRegisterHbConnection (in
 * which case the flag is not set).
 */
int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
  SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
  SQueryNodeEpId  nodeEpId = {0};

  nodeEpId.nodeId = pAddr->nodeId;

  SEp *pCurEp = SCH_GET_CUR_EP(pAddr);
  strcpy(nodeEpId.ep.fqdn, pCurEp->fqdn);
  nodeEpId.ep.port = pCurEp->port;

  SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &nodeEpId));

  pTask->registerdHb = true;
  return TSDB_CODE_SUCCESS;
}
|
||||
|
||||
/*
 * Overwrite the transport info stored for the heartbeat connection `epId`.
 *
 * The lookup and the copy both happen under the read side of schMgmt.hbLock;
 * the copy itself additionally takes the entry's own write lock.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_APP_ERROR (via SCH_ERR_RET)
 * when no entry exists for the endpoint.
 */
int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
  SCH_LOCK(SCH_READ, &schMgmt.hbLock);

  SSchHbTrans *pHb = taosHashGet(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
  if (NULL == pHb) {
    SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
    qError("taosHashGet hb connection failed, nodeId:%d, fqdn:%s, port:%d", epId->nodeId, epId->ep.fqdn, epId->ep.port);
    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
  }

  SCH_LOCK(SCH_WRITE, &pHb->lock);
  memcpy(&pHb->trans, trans, sizeof(*trans));
  SCH_UNLOCK(SCH_WRITE, &pHb->lock);

  SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);

  qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId,
         epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle);

  return TSDB_CODE_SUCCESS;
}
|
||||
|
||||
|
||||
void schCloseJobRef(void) {
|
||||
if (!atomic_load_8((int8_t *)&schMgmt.exit)) {
|
||||
return;
|
||||
}
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &schMgmt.lock);
|
||||
if (atomic_load_32(&schMgmt.jobNum) <= 0 && schMgmt.jobRef >= 0) {
|
||||
if (schMgmt.jobRef >= 0) {
|
||||
taosCloseRef(schMgmt.jobRef);
|
||||
schMgmt.jobRef = -1;
|
||||
}
|
||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.lock);
|
||||
}
|
||||
|
||||
// Hands out a task id: atomically adds 1 to schMgmt.taskId and returns the
// result of that add-and-fetch.
uint64_t schGenTaskId(void) {
  uint64_t nextId = atomic_add_fetch_64(&schMgmt.taskId, 1);
  return nextId;
}
|
||||
|
@ -88,4 +258,3 @@ void schFreeRpcCtx(SRpcCtx *pCtx) {
|
|||
(*pCtx->freeFunc)(pCtx->brokenVal.val);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -182,6 +182,14 @@ int32_t scheduleCancelJob(int64_t job) {
|
|||
SCH_RET(code);
|
||||
}
|
||||
|
||||
/*
 * Stop query heartbeats that run over the given transport.
 *
 * pTrans: transport pointer; NULL is accepted and ignored. Any other value is
 *         forwarded to schCleanClusterHb.
 */
void schedulerStopQueryHb(void *pTrans) {
  if (pTrans != NULL) {
    schCleanClusterHb(pTrans);
  }
}
|
||||
|
||||
void schedulerFreeJob(int64_t job) {
|
||||
SSchJob *pJob = schAcquireJob(job);
|
||||
if (NULL == pJob) {
|
||||
|
@ -220,6 +228,7 @@ void schedulerDestroy(void) {
|
|||
}
|
||||
}
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
if (schMgmt.hbConnections) {
|
||||
void *pIter = taosHashIterate(schMgmt.hbConnections, NULL);
|
||||
while (pIter != NULL) {
|
||||
|
@ -230,4 +239,5 @@ void schedulerDestroy(void) {
|
|||
taosHashCleanup(schMgmt.hbConnections);
|
||||
schMgmt.hbConnections = NULL;
|
||||
}
|
||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue