first version
This commit is contained in:
parent
6c0e357291
commit
ceeec650a1
|
@ -30,6 +30,7 @@ extern "C" {
|
||||||
#include "tsqlfunction.h"
|
#include "tsqlfunction.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "tcache.h"
|
#include "tcache.h"
|
||||||
|
#include "tref.h"
|
||||||
|
|
||||||
#include "qExecutor.h"
|
#include "qExecutor.h"
|
||||||
#include "qSqlparser.h"
|
#include "qSqlparser.h"
|
||||||
|
@ -446,7 +447,7 @@ void tscFreeSqlObj(SSqlObj *pSql);
|
||||||
void tscFreeRegisteredSqlObj(void *pSql);
|
void tscFreeRegisteredSqlObj(void *pSql);
|
||||||
void tscFreeTableMetaHelper(void *pTableMeta);
|
void tscFreeTableMetaHelper(void *pTableMeta);
|
||||||
|
|
||||||
void tscCloseTscObj(STscObj *pObj);
|
void tscCloseTscObj(void *pObj);
|
||||||
|
|
||||||
// todo move to taos? or create a new file: taos_internal.h
|
// todo move to taos? or create a new file: taos_internal.h
|
||||||
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
|
TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int),
|
||||||
|
@ -516,6 +517,7 @@ extern void * tscQhandle;
|
||||||
extern int tscKeepConn[];
|
extern int tscKeepConn[];
|
||||||
extern int tsInsertHeadSize;
|
extern int tsInsertHeadSize;
|
||||||
extern int tscNumOfThreads;
|
extern int tscNumOfThreads;
|
||||||
|
extern int tscRefId;
|
||||||
|
|
||||||
extern SRpcCorEpSet tscMgmtEpSet;
|
extern SRpcCorEpSet tscMgmtEpSet;
|
||||||
|
|
||||||
|
|
|
@ -190,18 +190,19 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
||||||
|
|
||||||
void tscProcessActivityTimer(void *handle, void *tmrId) {
|
void tscProcessActivityTimer(void *handle, void *tmrId) {
|
||||||
STscObj *pObj = (STscObj *)handle;
|
STscObj *pObj = (STscObj *)handle;
|
||||||
if (pObj == NULL || pObj->signature != pObj) {
|
|
||||||
|
int ret = taosAcquireRef(tscRefId, pObj);
|
||||||
|
if (ret < 0) {
|
||||||
|
tscTrace("%p failed to acquire TSC obj, reason:%s", pObj, tstrerror(ret));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSqlObj* pHB = pObj->pHb;
|
SSqlObj* pHB = pObj->pHb;
|
||||||
if (pObj->pTimer != tmrId || pHB == NULL) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
|
void** p = taosCacheAcquireByKey(tscObjCache, &pHB, sizeof(TSDB_CACHE_PTR_TYPE));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
tscWarn("%p HB object has been released already", pHB);
|
tscWarn("%p HB object has been released already", pHB);
|
||||||
|
taosReleaseRef(tscRefId, pObj);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,6 +214,8 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
|
tscError("%p failed to sent HB to server, reason:%s", pHB, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosReleaseRef(tscRefId, pObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
int tscSendMsgToServer(SSqlObj *pSql) {
|
int tscSendMsgToServer(SSqlObj *pSql) {
|
||||||
|
|
|
@ -161,6 +161,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
|
||||||
registerSqlObj(pSql);
|
registerSqlObj(pSql);
|
||||||
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
|
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
|
||||||
|
|
||||||
|
taosAddRef(tscRefId, pObj);
|
||||||
return pSql;
|
return pSql;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -296,7 +297,8 @@ void taos_close(TAOS *taos) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
|
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
|
||||||
tscCloseTscObj(pObj);
|
|
||||||
|
taosRemoveRef(tscRefId, pObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
|
void waitForQueryRsp(void *param, TAOS_RES *tres, int code) {
|
||||||
|
|
|
@ -36,6 +36,7 @@ void * tscTmr;
|
||||||
void * tscQhandle;
|
void * tscQhandle;
|
||||||
void * tscCheckDiskUsageTmr;
|
void * tscCheckDiskUsageTmr;
|
||||||
int tsInsertHeadSize;
|
int tsInsertHeadSize;
|
||||||
|
int tscRefId;
|
||||||
|
|
||||||
int tscNumOfThreads;
|
int tscNumOfThreads;
|
||||||
|
|
||||||
|
@ -146,6 +147,8 @@ void taos_init_imp(void) {
|
||||||
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj");
|
tscObjCache = taosCacheInit(TSDB_CACHE_PTR_KEY, refreshTime / 2, false, tscFreeRegisteredSqlObj, "sqlObj");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tscRefId = taosOpenRef(200, tscCloseTscObj);
|
||||||
|
|
||||||
tscDebug("client is initialized successfully");
|
tscDebug("client is initialized successfully");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,6 +168,7 @@ void taos_cleanup() {
|
||||||
tscQhandle = NULL;
|
tscQhandle = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosCloseRef(tscRefId);
|
||||||
taosCleanupKeywordsTable();
|
taosCleanupKeywordsTable();
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
|
|
||||||
|
|
|
@ -404,7 +404,7 @@ void tscFreeRegisteredSqlObj(void *pSql) {
|
||||||
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref);
|
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", *p, pTscObj, ref);
|
||||||
if (ref == 0) {
|
if (ref == 0) {
|
||||||
tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj);
|
tscDebug("%p all sqlObj freed, free tscObj:%p", *p, pTscObj);
|
||||||
tscCloseTscObj(pTscObj);
|
taosRemoveRef(tscRefId, pTscObj);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -786,8 +786,8 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: all subqueries should be freed correctly before close this connection.
|
// TODO: all subqueries should be freed correctly before close this connection.
|
||||||
void tscCloseTscObj(STscObj* pObj) {
|
void tscCloseTscObj(void *param) {
|
||||||
assert(pObj != NULL);
|
STscObj *pObj = param;
|
||||||
|
|
||||||
pObj->signature = NULL;
|
pObj->signature = NULL;
|
||||||
taosTmrStopA(&(pObj->pTimer));
|
taosTmrStopA(&(pObj->pTimer));
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 8d7bf743852897110cbdcc7c4322cd7a74d4167b
|
Subproject commit 8c58c512b6acda8bcdfa48fdc7140227b5221766
|
|
@ -20,6 +20,7 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "lz4.h"
|
#include "lz4.h"
|
||||||
|
#include "tref.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tsocket.h"
|
#include "tsocket.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
@ -72,7 +73,6 @@ typedef struct {
|
||||||
SRpcInfo *pRpc; // associated SRpcInfo
|
SRpcInfo *pRpc; // associated SRpcInfo
|
||||||
SRpcEpSet epSet; // ip list provided by app
|
SRpcEpSet epSet; // ip list provided by app
|
||||||
void *ahandle; // handle provided by app
|
void *ahandle; // handle provided by app
|
||||||
void *signature; // for validation
|
|
||||||
struct SRpcConn *pConn; // pConn allocated
|
struct SRpcConn *pConn; // pConn allocated
|
||||||
char msgType; // message type
|
char msgType; // message type
|
||||||
uint8_t *pCont; // content provided by app
|
uint8_t *pCont; // content provided by app
|
||||||
|
@ -132,6 +132,10 @@ int tsRpcMaxRetry;
|
||||||
int tsRpcHeadSize;
|
int tsRpcHeadSize;
|
||||||
int tsRpcOverhead;
|
int tsRpcOverhead;
|
||||||
|
|
||||||
|
static int tsRpcRefId = -1;
|
||||||
|
static int32_t tsRpcNum = 0;
|
||||||
|
static pthread_once_t tsRpcInit = PTHREAD_ONCE_INIT;
|
||||||
|
|
||||||
// server:0 client:1 tcp:2 udp:0
|
// server:0 client:1 tcp:2 udp:0
|
||||||
#define RPC_CONN_UDPS 0
|
#define RPC_CONN_UDPS 0
|
||||||
#define RPC_CONN_UDPC 1
|
#define RPC_CONN_UDPC 1
|
||||||
|
@ -211,14 +215,21 @@ static void rpcUnlockConn(SRpcConn *pConn);
|
||||||
static void rpcAddRef(SRpcInfo *pRpc);
|
static void rpcAddRef(SRpcInfo *pRpc);
|
||||||
static void rpcDecRef(SRpcInfo *pRpc);
|
static void rpcDecRef(SRpcInfo *pRpc);
|
||||||
|
|
||||||
void *rpcOpen(const SRpcInit *pInit) {
|
static void rpcInit(void) {
|
||||||
SRpcInfo *pRpc;
|
|
||||||
|
|
||||||
tsProgressTimer = tsRpcTimer/2;
|
tsProgressTimer = tsRpcTimer/2;
|
||||||
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
|
tsRpcMaxRetry = tsRpcMaxTime * 1000/tsProgressTimer;
|
||||||
tsRpcHeadSize = RPC_MSG_OVERHEAD;
|
tsRpcHeadSize = RPC_MSG_OVERHEAD;
|
||||||
tsRpcOverhead = sizeof(SRpcReqContext);
|
tsRpcOverhead = sizeof(SRpcReqContext);
|
||||||
|
|
||||||
|
tsRpcRefId = taosOpenRef(200, free);
|
||||||
|
}
|
||||||
|
|
||||||
|
void *rpcOpen(const SRpcInit *pInit) {
|
||||||
|
SRpcInfo *pRpc;
|
||||||
|
|
||||||
|
pthread_once(&tsRpcInit, rpcInit);
|
||||||
|
|
||||||
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
|
pRpc = (SRpcInfo *)calloc(1, sizeof(SRpcInfo));
|
||||||
if (pRpc == NULL) return NULL;
|
if (pRpc == NULL) return NULL;
|
||||||
|
|
||||||
|
@ -237,6 +248,8 @@ void *rpcOpen(const SRpcInit *pInit) {
|
||||||
pRpc->afp = pInit->afp;
|
pRpc->afp = pInit->afp;
|
||||||
pRpc->refCount = 1;
|
pRpc->refCount = 1;
|
||||||
|
|
||||||
|
atomic_add_fetch_32(&tsRpcNum, 1);
|
||||||
|
|
||||||
size_t size = sizeof(SRpcConn) * pRpc->sessions;
|
size_t size = sizeof(SRpcConn) * pRpc->sessions;
|
||||||
pRpc->connList = (SRpcConn *)calloc(1, size);
|
pRpc->connList = (SRpcConn *)calloc(1, size);
|
||||||
if (pRpc->connList == NULL) {
|
if (pRpc->connList == NULL) {
|
||||||
|
@ -363,7 +376,6 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
|
int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
|
||||||
pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
|
pContext = (SRpcReqContext *) ((char*)pMsg->pCont-sizeof(SRpcHead)-sizeof(SRpcReqContext));
|
||||||
pContext->ahandle = pMsg->ahandle;
|
pContext->ahandle = pMsg->ahandle;
|
||||||
pContext->signature = pContext;
|
|
||||||
pContext->pRpc = (SRpcInfo *)shandle;
|
pContext->pRpc = (SRpcInfo *)shandle;
|
||||||
pContext->epSet = *pEpSet;
|
pContext->epSet = *pEpSet;
|
||||||
pContext->contLen = contLen;
|
pContext->contLen = contLen;
|
||||||
|
@ -386,6 +398,7 @@ void rpcSendRequest(void *shandle, const SRpcEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
// set the handle to pContext, so app can cancel the request
|
// set the handle to pContext, so app can cancel the request
|
||||||
if (pMsg->handle) *((void **)pMsg->handle) = pContext;
|
if (pMsg->handle) *((void **)pMsg->handle) = pContext;
|
||||||
|
|
||||||
|
taosAddRef(tsRpcRefId, pContext);
|
||||||
rpcSendReqToServer(pRpc, pContext);
|
rpcSendReqToServer(pRpc, pContext);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
@ -536,14 +549,15 @@ int rpcReportProgress(void *handle, char *pCont, int contLen) {
|
||||||
void rpcCancelRequest(void *handle) {
|
void rpcCancelRequest(void *handle) {
|
||||||
SRpcReqContext *pContext = handle;
|
SRpcReqContext *pContext = handle;
|
||||||
|
|
||||||
// signature is used to check if pContext is freed.
|
int code = taosAcquireRef(tsRpcRefId, pContext);
|
||||||
// pContext may have been released just before app calls the rpcCancelRequest
|
if (code < 0) return;
|
||||||
if (pContext == NULL || pContext->signature != pContext) return;
|
|
||||||
|
|
||||||
if (pContext->pConn) {
|
if (pContext->pConn) {
|
||||||
tDebug("%s, app tries to cancel request", pContext->pConn->info);
|
tDebug("%s, app tries to cancel request", pContext->pConn->info);
|
||||||
rpcCloseConn(pContext->pConn);
|
rpcCloseConn(pContext->pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosReleaseRef(tsRpcRefId, pContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcFreeMsg(void *msg) {
|
static void rpcFreeMsg(void *msg) {
|
||||||
|
@ -612,7 +626,7 @@ static void rpcReleaseConn(SRpcConn *pConn) {
|
||||||
// if there is an outgoing message, free it
|
// if there is an outgoing message, free it
|
||||||
if (pConn->outType && pConn->pReqMsg) {
|
if (pConn->outType && pConn->pReqMsg) {
|
||||||
if (pConn->pContext) pConn->pContext->pConn = NULL;
|
if (pConn->pContext) pConn->pContext->pConn = NULL;
|
||||||
rpcFreeMsg(pConn->pReqMsg);
|
taosRemoveRef(tsRpcRefId, pConn->pContext);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1068,7 +1082,6 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
|
||||||
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
|
static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
|
||||||
SRpcInfo *pRpc = pContext->pRpc;
|
SRpcInfo *pRpc = pContext->pRpc;
|
||||||
|
|
||||||
pContext->signature = NULL;
|
|
||||||
pContext->pConn = NULL;
|
pContext->pConn = NULL;
|
||||||
if (pContext->pRsp) {
|
if (pContext->pRsp) {
|
||||||
// for synchronous API
|
// for synchronous API
|
||||||
|
@ -1085,7 +1098,7 @@ static void rpcNotifyClient(SRpcReqContext *pContext, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// free the request message
|
// free the request message
|
||||||
rpcFreeCont(pContext->pCont);
|
taosRemoveRef(tsRpcRefId, pContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
|
static void rpcProcessIncomingMsg(SRpcConn *pConn, SRpcHead *pHead, SRpcReqContext *pContext) {
|
||||||
|
@ -1593,6 +1606,12 @@ static void rpcDecRef(SRpcInfo *pRpc)
|
||||||
pthread_mutex_destroy(&pRpc->mutex);
|
pthread_mutex_destroy(&pRpc->mutex);
|
||||||
tDebug("%s rpc resources are released", pRpc->label);
|
tDebug("%s rpc resources are released", pRpc->label);
|
||||||
taosTFree(pRpc);
|
taosTFree(pRpc);
|
||||||
|
|
||||||
|
int count = atomic_sub_fetch_32(&tsRpcNum, 1);
|
||||||
|
if (count == 0) {
|
||||||
|
taosCloseRef(tsRpcRefId);
|
||||||
|
tsRpcInit = PTHREAD_ONCE_INIT;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue