cache use trav_fn callback
This commit is contained in:
parent
afb9962c2f
commit
85be955001
|
@ -59,7 +59,7 @@ void tscCheckDiskUsage(void *UNUSED_PARAM(para), void *UNUSED_PARAM(param)) {
|
||||||
taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
|
taosTmrReset(tscCheckDiskUsage, 20 * 1000, NULL, tscTmr, &tscCheckDiskUsageTmr);
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscFreeRpcObj(void *param, void* param1) {
|
void tscFreeRpcObj(void *param) {
|
||||||
assert(param);
|
assert(param);
|
||||||
SRpcObj *pRpcObj = (SRpcObj *)(param);
|
SRpcObj *pRpcObj = (SRpcObj *)(param);
|
||||||
tscDebug("free rpcObj:%p and free pDnodeConn: %p", pRpcObj, pRpcObj->pDnodeConn);
|
tscDebug("free rpcObj:%p and free pDnodeConn: %p", pRpcObj, pRpcObj->pDnodeConn);
|
||||||
|
|
|
@ -46,7 +46,7 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
|
||||||
static void mnodeCancelGetNextConn(void *pIter);
|
static void mnodeCancelGetNextConn(void *pIter);
|
||||||
static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
|
||||||
static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn);
|
||||||
static void mnodeFreeConn(void *data, void* param1);
|
static void mnodeFreeConn(void *data);
|
||||||
static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg);
|
static int32_t mnodeProcessKillQueryMsg(SMnodeMsg *pMsg);
|
||||||
static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg);
|
static int32_t mnodeProcessKillStreamMsg(SMnodeMsg *pMsg);
|
||||||
static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg);
|
static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg);
|
||||||
|
@ -135,7 +135,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po
|
||||||
return pConn;
|
return pConn;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mnodeFreeConn(void *data, void* param1) {
|
static void mnodeFreeConn(void *data) {
|
||||||
SConnObj *pConn = data;
|
SConnObj *pConn = data;
|
||||||
tfree(pConn->pQueries);
|
tfree(pConn->pQueries);
|
||||||
tfree(pConn->pStreams);
|
tfree(pConn->pStreams);
|
||||||
|
|
|
@ -46,7 +46,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *mnodeMsg);
|
||||||
static int32_t mnodeProcessConnectMsg(SMnodeMsg *mnodeMsg);
|
static int32_t mnodeProcessConnectMsg(SMnodeMsg *mnodeMsg);
|
||||||
static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg);
|
static int32_t mnodeProcessUseMsg(SMnodeMsg *mnodeMsg);
|
||||||
|
|
||||||
static void mnodeFreeShowObj(void *data, void* param1);
|
static void mnodeFreeShowObj(void *data);
|
||||||
static bool mnodeAccquireShowObj(SShowObj *pShow);
|
static bool mnodeAccquireShowObj(SShowObj *pShow);
|
||||||
static bool mnodeCheckShowFinished(SShowObj *pShow);
|
static bool mnodeCheckShowFinished(SShowObj *pShow);
|
||||||
static void *mnodePutShowObj(SShowObj *pShow);
|
static void *mnodePutShowObj(SShowObj *pShow);
|
||||||
|
@ -420,7 +420,7 @@ static void* mnodePutShowObj(SShowObj *pShow) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mnodeFreeShowObj(void *data, void* param1) {
|
static void mnodeFreeShowObj(void *data) {
|
||||||
SShowObj *pShow = *(SShowObj **)data;
|
SShowObj *pShow = *(SShowObj **)data;
|
||||||
if (tsMnodeShowFreeIterFp[pShow->type] != NULL) {
|
if (tsMnodeShowFreeIterFp[pShow->type] != NULL) {
|
||||||
if (pShow->pVgIter != NULL) {
|
if (pShow->pVgIter != NULL) {
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
#include "httpContext.h"
|
#include "httpContext.h"
|
||||||
#include "httpParser.h"
|
#include "httpParser.h"
|
||||||
|
|
||||||
static void httpDestroyContext(void *data, void* param1);
|
static void httpDestroyContext(void *data);
|
||||||
|
|
||||||
static void httpRemoveContextFromEpoll(HttpContext *pContext) {
|
static void httpRemoveContextFromEpoll(HttpContext *pContext) {
|
||||||
HttpThread *pThread = pContext->pThread;
|
HttpThread *pThread = pContext->pThread;
|
||||||
|
@ -44,7 +44,7 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void httpDestroyContext(void *data, void* param1) {
|
static void httpDestroyContext(void *data) {
|
||||||
HttpContext *pContext = *(HttpContext **)data;
|
HttpContext *pContext = *(HttpContext **)data;
|
||||||
if (pContext->fd > 0) taosCloseSocket(pContext->fd);
|
if (pContext->fd > 0) taosCloseSocket(pContext->fd);
|
||||||
|
|
||||||
|
|
|
@ -95,7 +95,7 @@ void httpReleaseSession(HttpContext *pContext) {
|
||||||
pContext->session = NULL;
|
pContext->session = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void httpDestroySession(void *data, void* param1) {
|
static void httpDestroySession(void *data) {
|
||||||
HttpSession *session = data;
|
HttpSession *session = data;
|
||||||
httpDebug("session:%p:%p, is destroyed, sessionRef:%d", session, session->taos, session->refCount);
|
httpDebug("session:%p:%p, is destroyed, sessionRef:%d", session, session->taos, session->refCount);
|
||||||
|
|
||||||
|
|
|
@ -40,7 +40,7 @@ static void queryMgmtKillQueryFn(void* handle, void* param1) {
|
||||||
qKillQuery(*fp);
|
qKillQuery(*fp);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void freeqinfoFn(void *qhandle, void* param1) {
|
static void freeqinfoFn(void *qhandle) {
|
||||||
void** handle = qhandle;
|
void** handle = qhandle;
|
||||||
if (handle == NULL || *handle == NULL) {
|
if (handle == NULL || *handle == NULL) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -48,8 +48,8 @@ int32_t tsdbInsertNewBlock(STsdbRepo * pRepo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// switch anther thread to run
|
// switch anther thread to run
|
||||||
void* cbKillQueryFree(void* param1) {
|
void* cbKillQueryFree(void* param) {
|
||||||
STsdbRepo* pRepo = (STsdbRepo*)param1;
|
STsdbRepo* pRepo = (STsdbRepo*)param;
|
||||||
// vnode
|
// vnode
|
||||||
if(pRepo->appH.notifyStatus) {
|
if(pRepo->appH.notifyStatus) {
|
||||||
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS);
|
pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_NOBLOCK, TSDB_CODE_SUCCESS);
|
||||||
|
|
|
@ -32,7 +32,8 @@ extern "C" {
|
||||||
#define TSDB_CACHE_PTR_TYPE int64_t
|
#define TSDB_CACHE_PTR_TYPE int64_t
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef void (*__cache_free_fn_t)(void*, void*);
|
typedef void (*__cache_free_fn_t)(void*);
|
||||||
|
typedef void (*__cache_trav_fn_t)(void*, void*);
|
||||||
|
|
||||||
typedef struct SCacheStatis {
|
typedef struct SCacheStatis {
|
||||||
int64_t missCount;
|
int64_t missCount;
|
||||||
|
@ -176,7 +177,7 @@ void taosCacheCleanup(SCacheObj *pCacheObj);
|
||||||
* @param fp
|
* @param fp
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp, void* param1);
|
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* stop background refresh worker thread
|
* stop background refresh worker thread
|
||||||
|
|
|
@ -140,7 +140,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNo
|
||||||
pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
|
pCacheObj->name, pNode->key, pNode->data, pNode->size, size - 1, pCacheObj->totalSize);
|
||||||
|
|
||||||
if (pCacheObj->freeFp) {
|
if (pCacheObj->freeFp) {
|
||||||
pCacheObj->freeFp(pNode->data, NULL);
|
pCacheObj->freeFp(pNode->data);
|
||||||
}
|
}
|
||||||
|
|
||||||
free(pNode);
|
free(pNode);
|
||||||
|
@ -174,7 +174,7 @@ static FORCE_INLINE STrashElem* doRemoveElemInTrashcan(SCacheObj* pCacheObj, STr
|
||||||
|
|
||||||
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) {
|
static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) {
|
||||||
if (pCacheObj->freeFp) {
|
if (pCacheObj->freeFp) {
|
||||||
pCacheObj->freeFp(pElem->pData->data, NULL);
|
pCacheObj->freeFp(pElem->pData->data);
|
||||||
}
|
}
|
||||||
|
|
||||||
free(pElem->pData);
|
free(pElem->pData);
|
||||||
|
@ -249,7 +249,7 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
if (T_REF_VAL_GET(p) == 0) {
|
if (T_REF_VAL_GET(p) == 0) {
|
||||||
if (pCacheObj->freeFp) {
|
if (pCacheObj->freeFp) {
|
||||||
pCacheObj->freeFp(p->data, NULL);
|
pCacheObj->freeFp(p->data);
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_sub_fetch_64(&pCacheObj->totalSize, p->size);
|
atomic_sub_fetch_64(&pCacheObj->totalSize, p->size);
|
||||||
|
@ -458,7 +458,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize);
|
pCacheObj->name, pNode->key, pNode->data, pNode->size, size, pCacheObj->totalSize);
|
||||||
|
|
||||||
if (pCacheObj->freeFp) {
|
if (pCacheObj->freeFp) {
|
||||||
pCacheObj->freeFp(pNode->data, NULL);
|
pCacheObj->freeFp(pNode->data);
|
||||||
}
|
}
|
||||||
|
|
||||||
free(pNode);
|
free(pNode);
|
||||||
|
@ -503,7 +503,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
typedef struct SHashTravSupp {
|
typedef struct SHashTravSupp {
|
||||||
SCacheObj* pCacheObj;
|
SCacheObj* pCacheObj;
|
||||||
int64_t time;
|
int64_t time;
|
||||||
__cache_free_fn_t fp;
|
__cache_trav_fn_t fp;
|
||||||
void* param1;
|
void* param1;
|
||||||
} SHashTravSupp;
|
} SHashTravSupp;
|
||||||
|
|
||||||
|
@ -671,7 +671,7 @@ bool travHashTableFn(void* param, void* data) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp, void* param1) {
|
static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_trav_fn_t fp, void* param1) {
|
||||||
assert(pCacheObj != NULL);
|
assert(pCacheObj != NULL);
|
||||||
|
|
||||||
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1};
|
SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time, .param1 = param1};
|
||||||
|
@ -755,7 +755,7 @@ void* taosCacheTimedRefresh(void *handle) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_free_fn_t fp, void* param1) {
|
void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void* param1) {
|
||||||
if (pCacheObj == NULL) {
|
if (pCacheObj == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue