commit
21a0c74106
|
@ -333,7 +333,7 @@ typedef struct STscObj {
|
|||
char superAuth : 1;
|
||||
uint32_t connId;
|
||||
uint64_t rid; // ref ID returned by taosAddRef
|
||||
struct SSqlObj * pHb;
|
||||
int64_t hbrid;
|
||||
struct SSqlObj * sqlList;
|
||||
struct SSqlStream *streamList;
|
||||
void* pDnodeConn;
|
||||
|
@ -373,7 +373,7 @@ typedef struct SSqlObj {
|
|||
struct SSqlObj **pSubs;
|
||||
|
||||
struct SSqlObj *prev, *next;
|
||||
struct SSqlObj **self;
|
||||
int64_t self;
|
||||
} SSqlObj;
|
||||
|
||||
typedef struct SSqlStream {
|
||||
|
@ -507,7 +507,7 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
|
|||
}
|
||||
|
||||
extern SCacheObj* tscMetaCache;
|
||||
extern SCacheObj* tscObjCache;
|
||||
extern int tscObjRef;
|
||||
extern void * tscTmr;
|
||||
extern void * tscQhandle;
|
||||
extern int tscKeepConn[];
|
||||
|
|
|
@ -825,8 +825,11 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) {
|
|||
static int32_t tscProcessServStatus(SSqlObj *pSql) {
|
||||
STscObj* pObj = pSql->pTscObj;
|
||||
|
||||
if (pObj->pHb != NULL) {
|
||||
if (pObj->pHb->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||
SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
|
||||
if (pHb != NULL) {
|
||||
int32_t code = pHb->res.code;
|
||||
taosReleaseRef(tscObjRef, pObj->hbrid);
|
||||
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
|
||||
pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||
return pSql->res.code;
|
||||
}
|
||||
|
|
|
@ -175,10 +175,10 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
|||
if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId));
|
||||
}
|
||||
} else {
|
||||
tscDebug("%p heartbeat failed, code:%s", pObj->pHb, tstrerror(code));
|
||||
tscDebug("%" PRId64 " heartbeat failed, code:%s", pObj->hbrid, tstrerror(code));
|
||||
}
|
||||
|
||||
if (pObj->pHb != NULL) {
|
||||
if (pObj->hbrid != 0) {
|
||||
int32_t waitingDuring = tsShellActivityTimer * 500;
|
||||
tscDebug("%p send heartbeat in %dms", pSql, waitingDuring);
|
||||
|
||||
|
@ -193,20 +193,12 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
|
|||
STscObj *pObj = taosAcquireRef(tscRefId, rid);
|
||||
if (pObj == NULL) return;
|
||||
|
||||
SSqlObj* pHB = pObj->pHb;
|
||||
|
||||
void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
|
||||
if (p == NULL) {
|
||||
tscWarn("%p HB object has been released already", pHB);
|
||||
taosReleaseRef(tscRefId, pObj->rid);
|
||||
return;
|
||||
}
|
||||
|
||||
assert(*pHB->self == pHB);
|
||||
SSqlObj* pHB = taosAcquireRef(tscObjRef, pObj->hbrid);
|
||||
assert(pHB->self == pObj->hbrid);
|
||||
|
||||
pHB->retry = 0;
|
||||
int32_t code = tscProcessSql(pHB);
|
||||
taosCacheRelease(tscObjCache, (void**) &p, false);
|
||||
taosReleaseRef(tscObjRef, pObj->hbrid);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
|
||||
|
@ -236,7 +228,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
|||
.msgType = pSql->cmd.msgType,
|
||||
.pCont = pMsg,
|
||||
.contLen = pSql->cmd.payloadLen,
|
||||
.ahandle = pSql,
|
||||
.ahandle = (void*)pSql->self,
|
||||
.handle = NULL,
|
||||
.code = 0
|
||||
};
|
||||
|
@ -247,26 +239,24 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
|||
|
||||
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
|
||||
void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(TSDB_CACHE_PTR_TYPE));
|
||||
if (p == NULL) {
|
||||
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
|
||||
if (pSql == NULL) {
|
||||
rpcFreeCont(rpcMsg->pCont);
|
||||
return;
|
||||
}
|
||||
|
||||
SSqlObj* pSql = *p;
|
||||
assert(pSql != NULL);
|
||||
assert(pSql->self == handle);
|
||||
|
||||
STscObj *pObj = pSql->pTscObj;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
|
||||
assert(*pSql->self == pSql);
|
||||
pSql->rpcRid = -1;
|
||||
|
||||
if (pObj->signature != pObj) {
|
||||
tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
|
||||
|
||||
taosCacheRelease(tscObjCache, (void**) &p, true);
|
||||
taosRemoveRef(tscObjRef, pSql->self);
|
||||
taosReleaseRef(tscObjRef, pSql->self);
|
||||
rpcFreeCont(rpcMsg->pCont);
|
||||
return;
|
||||
}
|
||||
|
@ -276,10 +266,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
|||
tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
|
||||
pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
|
||||
|
||||
void** p1 = p;
|
||||
taosCacheRelease(tscObjCache, (void**) &p1, false);
|
||||
|
||||
taosCacheRelease(tscObjCache, (void**) &p, true);
|
||||
taosRemoveRef(tscObjRef, pSql->self);
|
||||
taosReleaseRef(tscObjRef, pSql->self);
|
||||
rpcFreeCont(rpcMsg->pCont);
|
||||
return;
|
||||
}
|
||||
|
@ -322,7 +310,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
|||
|
||||
// if there is an error occurring, proceed to the following error handling procedure.
|
||||
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||
taosCacheRelease(tscObjCache, (void**) &p, false);
|
||||
taosReleaseRef(tscObjRef, pSql->self);
|
||||
rpcFreeCont(rpcMsg->pCont);
|
||||
return;
|
||||
}
|
||||
|
@ -390,11 +378,10 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
|||
(*pSql->fp)(pSql->param, pSql, rpcMsg->code);
|
||||
}
|
||||
|
||||
void** p1 = p;
|
||||
taosCacheRelease(tscObjCache, (void**) &p1, false);
|
||||
taosReleaseRef(tscObjRef, pSql->self);
|
||||
|
||||
if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
|
||||
taosCacheRelease(tscObjCache, (void **)&p, true);
|
||||
taosRemoveRef(tscObjRef, pSql->self);
|
||||
tscDebug("%p sqlObj is automatically freed", pSql);
|
||||
}
|
||||
|
||||
|
@ -2020,7 +2007,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
|
|||
|
||||
// TODO multithread problem
|
||||
static void createHBObj(STscObj* pObj) {
|
||||
if (pObj->pHb != NULL) {
|
||||
if (pObj->hbrid != 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -2052,7 +2039,7 @@ static void createHBObj(STscObj* pObj) {
|
|||
registerSqlObj(pSql);
|
||||
tscDebug("%p HB is allocated, pObj:%p", pSql, pObj);
|
||||
|
||||
pObj->pHb = pSql;
|
||||
pObj->hbrid = pSql->self;
|
||||
}
|
||||
|
||||
int tscProcessConnectRsp(SSqlObj *pSql) {
|
||||
|
|
|
@ -276,8 +276,8 @@ void taos_close(TAOS *taos) {
|
|||
pObj->signature = NULL;
|
||||
taosTmrStopA(&(pObj->pTimer));
|
||||
|
||||
SSqlObj* pHb = pObj->pHb;
|
||||
if (pHb != NULL && atomic_val_compare_exchange_ptr(&pObj->pHb, pHb, 0) == pHb) {
|
||||
SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
|
||||
if (pHb != NULL) {
|
||||
if (pHb->rpcRid > 0) { // wait for rsp from dnode
|
||||
rpcCancelRequest(pHb->rpcRid);
|
||||
pHb->rpcRid = -1;
|
||||
|
@ -285,6 +285,7 @@ void taos_close(TAOS *taos) {
|
|||
|
||||
tscDebug("%p HB is freed", pHb);
|
||||
taos_free_result(pHb);
|
||||
taosReleaseRef(tscObjRef, pHb->self);
|
||||
}
|
||||
|
||||
int32_t ref = T_REF_DEC(pObj);
|
||||
|
@ -606,8 +607,7 @@ void taos_free_result(TAOS_RES *res) {
|
|||
bool freeNow = tscKillQueryInDnode(pSql);
|
||||
if (freeNow) {
|
||||
tscDebug("%p free sqlObj in cache", pSql);
|
||||
SSqlObj** p = pSql->self;
|
||||
taosCacheRelease(tscObjCache, (void**) &p, true);
|
||||
taosReleaseRef(tscObjRef, pSql->self);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -700,13 +700,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
|
|||
continue;
|
||||
}
|
||||
|
||||
void** p = taosCacheAcquireByKey(tscObjCache, &pSub, sizeof(TSDB_CACHE_PTR_TYPE));
|
||||
if (p == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SSqlObj* pSubObj = (SSqlObj*) (*p);
|
||||
assert(pSubObj->self == (SSqlObj**) p);
|
||||
SSqlObj* pSubObj = pSub;
|
||||
|
||||
pSubObj->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||
if (pSubObj->rpcRid > 0) {
|
||||
|
@ -715,7 +709,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
tscQueueAsyncRes(pSubObj);
|
||||
taosCacheRelease(tscObjCache, (void**) &p, false);
|
||||
taosReleaseRef(tscObjRef, pSubObj->self);
|
||||
}
|
||||
|
||||
tscDebug("%p super table query cancelled", pSql);
|
||||
|
|
|
@ -179,8 +179,8 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
|
|||
fail:
|
||||
tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code));
|
||||
if (pSql != NULL) {
|
||||
if (pSql->self != NULL) {
|
||||
taos_free_result(pSql);
|
||||
if (pSql->self != 0) {
|
||||
taosReleaseRef(tscObjRef, pSql->self);
|
||||
} else {
|
||||
tscFreeSqlObj(pSql);
|
||||
}
|
||||
|
|
|
@ -2198,6 +2198,9 @@ int32_t tscHandleInsertRetry(SSqlObj* pSql) {
|
|||
STableDataBlocks* pTableDataBlock = taosArrayGetP(pCmd->pDataBlocks, pSupporter->index);
|
||||
int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock);
|
||||
|
||||
// free the data block created from insert sql string
|
||||
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
|
||||
|
||||
if ((pRes->code = code)!= TSDB_CODE_SUCCESS) {
|
||||
tscQueueAsyncRes(pSql);
|
||||
return code; // here the pSql may have been released already.
|
||||
|
|
|
@ -15,7 +15,7 @@
|
|||
|
||||
#include "os.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tcache.h"
|
||||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
#include "tsystem.h"
|
||||
#include "ttimer.h"
|
||||
|
@ -31,7 +31,7 @@
|
|||
|
||||
// global, not configurable
|
||||
SCacheObj* tscMetaCache;
|
||||
SCacheObj* tscObjCache;
|
||||
int tscObjRef = -1;
|
||||
void * tscTmr;
|
||||
void * tscQhandle;
|
||||
void * tscCheckDiskUsageTmr;
|
||||
|
@ -144,7 +144,7 @@ void taos_init_imp(void) {
|
|||
int64_t refreshTime = 10; // 10 seconds by default
|
||||
if (tscMetaCache == NULL) {
|
||||
tscMetaCache = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, tscFreeTableMetaHelper, "tableMeta");
|
||||
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj");
|
||||
tscObjRef = taosOpenRef(4096, tscFreeRegisteredSqlObj);
|
||||
}
|
||||
|
||||
tscRefId = taosOpenRef(200, tscCloseTscObj);
|
||||
|
@ -167,9 +167,9 @@ void taos_cleanup(void) {
|
|||
taosCacheCleanup(m);
|
||||
}
|
||||
|
||||
m = tscObjCache;
|
||||
if (m != NULL && atomic_val_compare_exchange_ptr(&tscObjCache, m, 0) == m) {
|
||||
taosCacheCleanup(m);
|
||||
int refId = atomic_exchange_32(&tscObjRef, -1);
|
||||
if (refId != -1) {
|
||||
taosCloseRef(refId);
|
||||
}
|
||||
|
||||
m = tscQhandle;
|
||||
|
|
|
@ -447,20 +447,18 @@ static void tscFreeSubobj(SSqlObj* pSql) {
|
|||
void tscFreeRegisteredSqlObj(void *pSql) {
|
||||
assert(pSql != NULL);
|
||||
|
||||
SSqlObj** p = (SSqlObj**)pSql;
|
||||
STscObj* pTscObj = (*p)->pTscObj;
|
||||
SSqlObj* p = *(SSqlObj**)pSql;
|
||||
STscObj* pTscObj = p->pTscObj;
|
||||
|
||||
assert((*p)->self != 0 && (*p)->self == (p));
|
||||
|
||||
SSqlObj* ptr = *p;
|
||||
tscFreeSqlObj(*p);
|
||||
assert(p->self != 0);
|
||||
tscFreeSqlObj(p);
|
||||
|
||||
int32_t ref = T_REF_DEC(pTscObj);
|
||||
assert(ref >= 0);
|
||||
|
||||
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", ptr, pTscObj, ref);
|
||||
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", p, pTscObj, ref);
|
||||
if (ref == 0) {
|
||||
tscDebug("%p all sqlObj freed, free tscObj:%p", ptr, pTscObj);
|
||||
tscDebug("%p all sqlObj freed, free tscObj:%p", p, pTscObj);
|
||||
taosRemoveRef(tscRefId, pTscObj->rid);
|
||||
}
|
||||
}
|
||||
|
@ -840,7 +838,6 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
|||
|
||||
// the length does not include the SSubmitBlk structure
|
||||
pBlocks->dataLen = htonl(finalLen);
|
||||
|
||||
dataBuf->numOfTables += 1;
|
||||
}
|
||||
|
||||
|
@ -1565,19 +1562,6 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
void tscSetFreeHeatBeat(STscObj* pObj) {
|
||||
if (pObj == NULL || pObj->signature != pObj || pObj->pHb == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
SSqlObj* pHeatBeat = pObj->pHb;
|
||||
assert(pHeatBeat == pHeatBeat->signature);
|
||||
|
||||
// to denote the heart-beat timer close connection and free all allocated resources
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pHeatBeat->cmd, 0);
|
||||
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
|
||||
}
|
||||
|
||||
/*
|
||||
* the following four kinds of SqlObj should not be freed
|
||||
* 1. SqlObj for stream computing
|
||||
|
@ -1596,7 +1580,7 @@ bool tscShouldBeFreed(SSqlObj* pSql) {
|
|||
}
|
||||
|
||||
STscObj* pTscObj = pSql->pTscObj;
|
||||
if (pSql->pStream != NULL || pTscObj->pHb == pSql || pSql->pSubscription != NULL) {
|
||||
if (pSql->pStream != NULL || pTscObj->hbrid == pSql->self || pSql->pSubscription != NULL) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1888,13 +1872,10 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
|
|||
}
|
||||
|
||||
void registerSqlObj(SSqlObj* pSql) {
|
||||
int32_t DEFAULT_LIFE_TIME = 2 * 600 * 1000; // 1200 sec
|
||||
|
||||
int32_t ref = T_REF_INC(pSql->pTscObj);
|
||||
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
|
||||
|
||||
TSDB_CACHE_PTR_TYPE p = (TSDB_CACHE_PTR_TYPE) pSql;
|
||||
pSql->self = taosCachePut(tscObjCache, &p, sizeof(TSDB_CACHE_PTR_TYPE), &p, sizeof(TSDB_CACHE_PTR_TYPE), DEFAULT_LIFE_TIME);
|
||||
pSql->self = taosAddRef(tscObjRef, pSql);
|
||||
}
|
||||
|
||||
SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cmd) {
|
||||
|
|
Loading…
Reference in New Issue