[td-1319]
This commit is contained in:
parent
a28d714d98
commit
2b1519c167
|
@ -29,6 +29,7 @@ extern "C" {
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "tsqlfunction.h"
|
#include "tsqlfunction.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
#include "tcache.h"
|
||||||
|
|
||||||
#include "qExecutor.h"
|
#include "qExecutor.h"
|
||||||
#include "qSqlparser.h"
|
#include "qSqlparser.h"
|
||||||
|
@ -359,6 +360,8 @@ typedef struct SSqlObj {
|
||||||
uint16_t numOfSubs;
|
uint16_t numOfSubs;
|
||||||
struct SSqlObj **pSubs;
|
struct SSqlObj **pSubs;
|
||||||
struct SSqlObj * prev, *next;
|
struct SSqlObj * prev, *next;
|
||||||
|
|
||||||
|
struct SSqlObj **self;
|
||||||
} SSqlObj;
|
} SSqlObj;
|
||||||
|
|
||||||
typedef struct SSqlStream {
|
typedef struct SSqlStream {
|
||||||
|
@ -413,7 +416,6 @@ int32_t tscTansformSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
|
||||||
void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
|
void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
|
||||||
|
|
||||||
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
|
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
|
||||||
void tscDestroyResPointerInfo(SSqlRes *pRes);
|
|
||||||
|
|
||||||
void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache);
|
void tscResetSqlCmdObj(SSqlCmd *pCmd, bool removeFromCache);
|
||||||
|
|
||||||
|
@ -425,17 +427,19 @@ void tscFreeSqlResult(SSqlObj *pSql);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* only free part of resources allocated during query.
|
* only free part of resources allocated during query.
|
||||||
|
* TODO remove it later
|
||||||
* Note: this function is multi-thread safe.
|
* Note: this function is multi-thread safe.
|
||||||
* @param pObj
|
* @param pObj
|
||||||
*/
|
*/
|
||||||
void tscPartiallyFreeSqlObj(SSqlObj *pObj);
|
void tscPartiallyFreeSqlObj(SSqlObj *pSql);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* free sql object, release allocated resource
|
* free sql object, release allocated resource
|
||||||
* @param pObj Free metric/meta information, dynamically allocated payload, and
|
* @param pObj
|
||||||
* response buffer, object itself
|
|
||||||
*/
|
*/
|
||||||
void tscFreeSqlObj(SSqlObj *pObj);
|
void tscFreeSqlObj(SSqlObj *pSql);
|
||||||
|
|
||||||
|
void tscFreeSqlObjInCache(void *pSql);
|
||||||
|
|
||||||
void tscCloseTscObj(STscObj *pObj);
|
void tscCloseTscObj(STscObj *pObj);
|
||||||
|
|
||||||
|
@ -451,9 +455,6 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen)
|
||||||
bool tscIsUpdateQuery(SSqlObj* pSql);
|
bool tscIsUpdateQuery(SSqlObj* pSql);
|
||||||
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
|
bool tscHasReachLimitation(SQueryInfo *pQueryInfo, SSqlRes *pRes);
|
||||||
|
|
||||||
// todo remove this function.
|
|
||||||
bool tscResultsetFetchCompleted(TAOS_RES *result);
|
|
||||||
|
|
||||||
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
|
char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
|
||||||
|
|
||||||
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
|
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
|
||||||
|
@ -502,7 +503,8 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extern void * tscCacheHandle;
|
extern SCacheObj* tscCacheHandle;
|
||||||
|
extern SCacheObj* tscObjCache;
|
||||||
extern void * tscTmr;
|
extern void * tscTmr;
|
||||||
extern void * tscQhandle;
|
extern void * tscQhandle;
|
||||||
extern int tscKeepConn[];
|
extern int tscKeepConn[];
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
#include "tnote.h"
|
#include "tnote.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
#include "tcache.h"
|
||||||
#include "tscLog.h"
|
#include "tscLog.h"
|
||||||
#include "tscSubquery.h"
|
#include "tscSubquery.h"
|
||||||
#include "tscLocalMerge.h"
|
#include "tscLocalMerge.h"
|
||||||
|
@ -40,6 +41,8 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows);
|
||||||
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
|
static void tscAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows);
|
||||||
|
|
||||||
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
|
void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const char* sqlstr, size_t sqlLen) {
|
||||||
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
|
||||||
pSql->signature = pSql;
|
pSql->signature = pSql;
|
||||||
pSql->param = param;
|
pSql->param = param;
|
||||||
pSql->pTscObj = pObj;
|
pSql->pTscObj = pObj;
|
||||||
|
@ -59,7 +62,10 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
|
||||||
strntolower(pSql->sqlstr, sqlstr, (int32_t)sqlLen);
|
strntolower(pSql->sqlstr, sqlstr, (int32_t)sqlLen);
|
||||||
|
|
||||||
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
|
tscDebugL("%p SQL: %s", pSql, pSql->sqlstr);
|
||||||
pSql->cmd.curSql = pSql->sqlstr;
|
pCmd->curSql = pSql->sqlstr;
|
||||||
|
|
||||||
|
uint64_t handle = (uint64_t) pSql;
|
||||||
|
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
|
||||||
|
|
||||||
int32_t code = tsParseSql(pSql, true);
|
int32_t code = tsParseSql(pSql, true);
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
|
||||||
|
@ -69,7 +75,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
|
||||||
tscQueueAsyncRes(pSql);
|
tscQueueAsyncRes(pSql);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDoQuery(pSql);
|
tscDoQuery(pSql);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -472,10 +472,8 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("%p start to free local reducer", pSql);
|
|
||||||
SSqlRes *pRes = &(pSql->res);
|
SSqlRes *pRes = &(pSql->res);
|
||||||
if (pRes->pLocalReducer == NULL) {
|
if (pRes->pLocalReducer == NULL) {
|
||||||
tscDebug("%p local reducer has been freed, abort", pSql);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -27,10 +27,7 @@
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "tlockfree.h"
|
#include "tlockfree.h"
|
||||||
|
|
||||||
#define TSC_MGMT_VNODE 999
|
|
||||||
|
|
||||||
SRpcCorEpSet tscMgmtEpSet;
|
SRpcCorEpSet tscMgmtEpSet;
|
||||||
SRpcEpSet tscDnodeEpSet;
|
|
||||||
|
|
||||||
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
|
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
|
||||||
|
|
||||||
|
@ -236,12 +233,17 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
SSqlObj *pSql = (SSqlObj *)rpcMsg->ahandle;
|
uint64_t handle = (uint64_t) rpcMsg->ahandle;
|
||||||
if (pSql == NULL || pSql->signature != pSql) {
|
|
||||||
tscError("%p sql is already released", pSql);
|
void** p = taosCacheAcquireByKey(tscObjCache, &handle, sizeof(uint64_t));
|
||||||
|
if (p == NULL) {
|
||||||
|
rpcFreeCont(rpcMsg->pCont);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSqlObj* pSql = *p;
|
||||||
|
assert(pSql != NULL);
|
||||||
|
|
||||||
STscObj *pObj = pSql->pTscObj;
|
STscObj *pObj = pSql->pTscObj;
|
||||||
SSqlRes *pRes = &pSql->res;
|
SSqlRes *pRes = &pSql->res;
|
||||||
SSqlCmd *pCmd = &pSql->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
|
@ -249,7 +251,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
if (pObj->signature != pObj) {
|
if (pObj->signature != pObj) {
|
||||||
tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
|
tscDebug("%p DB connection is closed, cmd:%d pObj:%p signature:%p", pSql, pCmd->command, pObj, pObj->signature);
|
||||||
|
|
||||||
tscFreeSqlObj(pSql);
|
taosCacheRelease(tscObjCache, (void**) &p, true);
|
||||||
rpcFreeCont(rpcMsg->pCont);
|
rpcFreeCont(rpcMsg->pCont);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -261,18 +263,18 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
|
tscDebug("%p sqlObj needs to be released or DB connection is closed, cmd:%d type:%d, pObj:%p signature:%p",
|
||||||
pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
|
pSql, pCmd->command, pQueryInfo->type, pObj, pObj->signature);
|
||||||
|
|
||||||
tscFreeSqlObj(pSql);
|
taosCacheRelease(tscObjCache, (void**) &p, true);
|
||||||
rpcFreeCont(rpcMsg->pCont);
|
rpcFreeCont(rpcMsg->pCont);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pEpSet) {
|
if (pEpSet) {
|
||||||
if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
|
if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
|
||||||
if(pCmd->command < TSDB_SQL_MGMT) {
|
if (pCmd->command < TSDB_SQL_MGMT) {
|
||||||
tscUpdateVgroupInfo(pSql, pEpSet);
|
tscUpdateVgroupInfo(pSql, pEpSet);
|
||||||
} else {
|
} else {
|
||||||
tscUpdateMgmtEpSet(pEpSet);
|
tscUpdateMgmtEpSet(pEpSet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -294,7 +296,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
if (pSql->retry > pSql->maxRetry) {
|
if (pSql->retry > pSql->maxRetry) {
|
||||||
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
|
tscError("%p max retry %d reached, give up", pSql, pSql->maxRetry);
|
||||||
} else {
|
} else {
|
||||||
// wait for a little bit moment and then retry
|
// wait for a little bit moment and then retry, todo do not sleep in rpc callback thread
|
||||||
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
|
if (rpcMsg->code == TSDB_CODE_APP_NOT_READY || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID) {
|
||||||
int32_t duration = getWaitingTimeInterval(pSql->retry);
|
int32_t duration = getWaitingTimeInterval(pSql->retry);
|
||||||
taosMsleep(duration);
|
taosMsleep(duration);
|
||||||
|
@ -304,6 +306,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
|
|
||||||
// if there is an error occurring, proceed to the following error handling procedure.
|
// if there is an error occurring, proceed to the following error handling procedure.
|
||||||
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
|
taosCacheRelease(tscObjCache, (void**) &p, false);
|
||||||
rpcFreeCont(rpcMsg->pCont);
|
rpcFreeCont(rpcMsg->pCont);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -365,18 +368,21 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
|
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool shouldFree = false;
|
||||||
if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
|
rpcMsg->code = (pRes->code == TSDB_CODE_SUCCESS) ? (int32_t)pRes->numOfRows : pRes->code;
|
||||||
|
|
||||||
bool shouldFree = tscShouldBeFreed(pSql);
|
shouldFree = tscShouldBeFreed(pSql);
|
||||||
(*pSql->fp)(pSql->param, pSql, rpcMsg->code);
|
(*pSql->fp)(pSql->param, pSql, rpcMsg->code);
|
||||||
|
|
||||||
if (shouldFree) {
|
if (shouldFree) {
|
||||||
|
void** p1 = p;
|
||||||
|
taosCacheRelease(tscObjCache, (void **)&p1, true);
|
||||||
tscDebug("%p sqlObj is automatically freed", pSql);
|
tscDebug("%p sqlObj is automatically freed", pSql);
|
||||||
tscFreeSqlObj(pSql);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosCacheRelease(tscObjCache, (void**) &p, false);
|
||||||
rpcFreeCont(rpcMsg->pCont);
|
rpcFreeCont(rpcMsg->pCont);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2000,7 +2006,7 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
|
||||||
|
|
||||||
createHBObj(pObj);
|
createHBObj(pObj);
|
||||||
|
|
||||||
taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
|
// taosTmrReset(tscProcessActivityTimer, tsShellActivityTimer * 500, pObj, tscTmr, &pObj->pTimer);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -2164,6 +2170,10 @@ static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInf
|
||||||
pNew->fp = tscTableMetaCallBack;
|
pNew->fp = tscTableMetaCallBack;
|
||||||
pNew->param = pSql;
|
pNew->param = pSql;
|
||||||
|
|
||||||
|
// TODO add test case on x86 platform
|
||||||
|
uint64_t adr = (uint64_t) pNew;
|
||||||
|
pNew->self = taosCachePut(tscObjCache, &adr, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2*60*1000);
|
||||||
|
|
||||||
int32_t code = tscProcessSql(pNew);
|
int32_t code = tscProcessSql(pNew);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify upper application that current process need to be terminated
|
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify upper application that current process need to be terminated
|
||||||
|
@ -2265,6 +2275,9 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
|
pNewQueryInfo->numOfTables = pQueryInfo->numOfTables;
|
||||||
|
|
||||||
|
uint64_t p = (uint64_t) pNew;
|
||||||
|
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
|
||||||
tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
|
tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables);
|
||||||
|
|
||||||
pNew->fp = tscTableMetaCallBack;
|
pNew->fp = tscTableMetaCallBack;
|
||||||
|
|
|
@ -100,6 +100,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
||||||
}
|
}
|
||||||
|
|
||||||
pObj->signature = pObj;
|
pObj->signature = pObj;
|
||||||
|
pObj->pDnodeConn = pDnodeConn;
|
||||||
|
|
||||||
tstrncpy(pObj->user, user, sizeof(pObj->user));
|
tstrncpy(pObj->user, user, sizeof(pObj->user));
|
||||||
secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass));
|
secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass));
|
||||||
|
@ -132,20 +133,15 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSql->pTscObj = pObj;
|
pSql->pTscObj = pObj;
|
||||||
pSql->signature = pSql;
|
pSql->signature = pSql;
|
||||||
pSql->maxRetry = TSDB_MAX_REPLICA;
|
pSql->maxRetry = TSDB_MAX_REPLICA;
|
||||||
tsem_init(&pSql->rspSem, 0, 0);
|
pSql->fp = fp;
|
||||||
|
pSql->param = param;
|
||||||
pObj->pDnodeConn = pDnodeConn;
|
|
||||||
|
|
||||||
pSql->fp = fp;
|
|
||||||
pSql->param = param;
|
|
||||||
if (taos != NULL) {
|
|
||||||
*taos = pObj;
|
|
||||||
}
|
|
||||||
|
|
||||||
pSql->cmd.command = TSDB_SQL_CONNECT;
|
pSql->cmd.command = TSDB_SQL_CONNECT;
|
||||||
|
|
||||||
|
tsem_init(&pSql->rspSem, 0, 0);
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
|
if (TSDB_CODE_SUCCESS != tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE)) {
|
||||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
rpcClose(pDnodeConn);
|
rpcClose(pDnodeConn);
|
||||||
|
@ -154,7 +150,14 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (taos != NULL) {
|
||||||
|
*taos = pObj;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t key = (uint64_t) pSql;
|
||||||
|
pSql->self = taosCachePut(tscObjCache, &key, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
|
||||||
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
|
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
|
||||||
|
|
||||||
return pSql;
|
return pSql;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -533,60 +536,72 @@ int taos_select_db(TAOS *taos, const char *db) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// send free message to vnode to free qhandle and corresponding resources in vnode
|
// send free message to vnode to free qhandle and corresponding resources in vnode
|
||||||
static bool tscKillQueryInVnode(SSqlObj* pSql) {
|
//static bool tscKillQueryInVnode(SSqlObj* pSql) {
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
// SSqlCmd* pCmd = &pSql->cmd;
|
||||||
SSqlRes* pRes = &pSql->res;
|
// SSqlRes* pRes = &pSql->res;
|
||||||
|
//
|
||||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
// STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||||
|
//
|
||||||
if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && !tscIsTwoStageSTableQuery(pQueryInfo, 0) &&
|
// if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
|
||||||
(pCmd->command == TSDB_SQL_SELECT ||
|
// return false;
|
||||||
pCmd->command == TSDB_SQL_SHOW ||
|
// }
|
||||||
pCmd->command == TSDB_SQL_RETRIEVE ||
|
//
|
||||||
pCmd->command == TSDB_SQL_FETCH) &&
|
// if (pRes->code == TSDB_CODE_SUCCESS && pRes->completed == false && (pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL) &&
|
||||||
(pSql->pStream == NULL && pTableMetaInfo->pTableMeta != NULL)) {
|
// (pCmd->command == TSDB_SQL_SELECT ||
|
||||||
|
// pCmd->command == TSDB_SQL_SHOW ||
|
||||||
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
// pCmd->command == TSDB_SQL_RETRIEVE ||
|
||||||
tscDebug("%p send msg to dnode to free qhandle ASAP, command:%s, ", pSql, sqlCmd[pCmd->command]);
|
// pCmd->command == TSDB_SQL_FETCH)) {
|
||||||
tscProcessSql(pSql);
|
//
|
||||||
return true;
|
// pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
|
||||||
}
|
// tscDebug("%p send msg to dnode to free qhandle ASAP, command:%s, ", pSql, sqlCmd[pCmd->command]);
|
||||||
|
// tscProcessSql(pSql);
|
||||||
return false;
|
// return true;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
|
// return false;
|
||||||
|
//}
|
||||||
|
|
||||||
void taos_free_result(TAOS_RES *res) {
|
void taos_free_result(TAOS_RES *res) {
|
||||||
SSqlObj *pSql = (SSqlObj *)res;
|
if (res == NULL) {
|
||||||
|
|
||||||
if (pSql == NULL || pSql->signature != pSql) {
|
|
||||||
tscDebug("%p sqlObj has been freed", pSql);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// The semaphore can not be changed while freeing async sub query objects.
|
|
||||||
SSqlRes *pRes = &pSql->res;
|
|
||||||
if (pRes == NULL || pRes->qhandle == 0) {
|
|
||||||
tscFreeSqlObj(pSql);
|
|
||||||
tscDebug("%p SqlObj is freed by app, qhandle is null", pSql);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// set freeFlag to 1 in retrieve message if there are un-retrieved results data in node
|
SSqlObj* pSql = (SSqlObj*) res;
|
||||||
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
taosCacheRelease(tscObjCache, (void**) &pSql->self, true);
|
||||||
if (pQueryInfo == NULL) {
|
|
||||||
tscFreeSqlObj(pSql);
|
|
||||||
tscDebug("%p SqlObj is freed by app", pSql);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
|
|
||||||
if (!tscKillQueryInVnode(pSql)) {
|
|
||||||
tscFreeSqlObj(pSql);
|
|
||||||
tscDebug("%p sqlObj is freed by app", pSql);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//static void doFreeResult(TAOS_RES *res) {
|
||||||
|
// SSqlObj *pSql = (SSqlObj *)res;
|
||||||
|
//
|
||||||
|
// if (pSql == NULL || pSql->signature != pSql) {
|
||||||
|
// tscDebug("%p sqlObj has been freed", pSql);
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // The semaphore can not be changed while freeing async sub query objects.
|
||||||
|
// SSqlRes *pRes = &pSql->res;
|
||||||
|
// if (pRes == NULL || pRes->qhandle == 0) {
|
||||||
|
// tscFreeSqlObj(pSql);
|
||||||
|
// tscDebug("%p SqlObj is freed by app, qhandle is null", pSql);
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // set freeFlag to 1 in retrieve message if there are un-retrieved results data in node
|
||||||
|
// SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||||
|
// if (pQueryInfo == NULL) {
|
||||||
|
// tscFreeSqlObj(pSql);
|
||||||
|
// tscDebug("%p SqlObj is freed by app", pSql);
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
|
||||||
|
// if (!tscKillQueryInVnode(pSql)) {
|
||||||
|
// tscFreeSqlObj(pSql);
|
||||||
|
// tscDebug("%p sqlObj is freed by app", pSql);
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
|
||||||
int taos_errno(TAOS_RES *tres) {
|
int taos_errno(TAOS_RES *tres) {
|
||||||
SSqlObj *pSql = (SSqlObj *) tres;
|
SSqlObj *pSql = (SSqlObj *) tres;
|
||||||
if (pSql == NULL || pSql->signature != pSql) {
|
if (pSql == NULL || pSql->signature != pSql) {
|
||||||
|
|
|
@ -1512,9 +1512,9 @@ static void tscFreeSubSqlObj(SRetrieveSupport *trsupport, SSqlObj *pSql) {
|
||||||
SSqlObj *pParentSql = trsupport->pParentSql;
|
SSqlObj *pParentSql = trsupport->pParentSql;
|
||||||
|
|
||||||
assert(pSql == pParentSql->pSubs[index]);
|
assert(pSql == pParentSql->pSubs[index]);
|
||||||
pParentSql->pSubs[index] = NULL;
|
// pParentSql->pSubs[index] = NULL;
|
||||||
|
//
|
||||||
taos_free_result(pSql);
|
// taos_free_result(pSql);
|
||||||
taosTFree(trsupport->localBuffer);
|
taosTFree(trsupport->localBuffer);
|
||||||
taosTFree(trsupport);
|
taosTFree(trsupport);
|
||||||
}
|
}
|
||||||
|
@ -1728,10 +1728,6 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
|
||||||
|
|
||||||
assert(tres != NULL);
|
assert(tres != NULL);
|
||||||
SSqlObj *pSql = (SSqlObj *)tres;
|
SSqlObj *pSql = (SSqlObj *)tres;
|
||||||
// if (pSql == NULL) { // sql object has been released in error process, return immediately
|
|
||||||
// tscDebug("%p subquery has been released, idx:%d, abort", pParentSql, idx);
|
|
||||||
// return;
|
|
||||||
// }
|
|
||||||
|
|
||||||
SSubqueryState* pState = trsupport->pState;
|
SSubqueryState* pState = trsupport->pState;
|
||||||
assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
|
assert(pState->numOfRemain <= pState->numOfTotal && pState->numOfRemain >= 0 && pParentSql->numOfSubs == pState->numOfTotal);
|
||||||
|
@ -1907,9 +1903,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
|
||||||
pParentObj->res.code = pSql->res.code;
|
pParentObj->res.code = pSql->res.code;
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(tres);
|
|
||||||
taosTFree(pSupporter);
|
taosTFree(pSupporter);
|
||||||
|
|
||||||
if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
|
if (atomic_sub_fetch_32(&pState->numOfRemain, 1) > 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -2029,11 +2023,6 @@ int32_t tscHandleMultivnodeInsert(SSqlObj *pSql) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
for(int32_t j = 0; j < numOfSub; ++j) {
|
|
||||||
taosTFree(pSql->pSubs[j]->param);
|
|
||||||
taos_free_result(pSql->pSubs[j]);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosTFree(pState);
|
taosTFree(pState);
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,8 @@
|
||||||
#include "tlocale.h"
|
#include "tlocale.h"
|
||||||
|
|
||||||
// global, not configurable
|
// global, not configurable
|
||||||
void * tscCacheHandle;
|
SCacheObj* tscCacheHandle;
|
||||||
|
SCacheObj* tscObjCache;
|
||||||
void * tscTmr;
|
void * tscTmr;
|
||||||
void * tscQhandle;
|
void * tscQhandle;
|
||||||
void * tscCheckDiskUsageTmr;
|
void * tscCheckDiskUsageTmr;
|
||||||
|
@ -146,6 +147,7 @@ void taos_init_imp(void) {
|
||||||
|
|
||||||
if (tscCacheHandle == NULL) {
|
if (tscCacheHandle == NULL) {
|
||||||
tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
|
tscCacheHandle = taosCacheInit(TSDB_DATA_TYPE_BINARY, refreshTime, false, NULL, "tableMeta");
|
||||||
|
tscObjCache = taosCacheInit(TSDB_DATA_TYPE_BIGINT, refreshTime, false, tscFreeSqlObjInCache, "sqlObjHandle");
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("client is initialized successfully");
|
tscDebug("client is initialized successfully");
|
||||||
|
@ -157,6 +159,9 @@ void taos_cleanup() {
|
||||||
if (tscCacheHandle != NULL) {
|
if (tscCacheHandle != NULL) {
|
||||||
taosCacheCleanup(tscCacheHandle);
|
taosCacheCleanup(tscCacheHandle);
|
||||||
tscCacheHandle = NULL;
|
tscCacheHandle = NULL;
|
||||||
|
|
||||||
|
taosCacheCleanup(tscObjCache);
|
||||||
|
tscObjCache = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tscQhandle != NULL) {
|
if (tscQhandle != NULL) {
|
||||||
|
|
|
@ -252,11 +252,11 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
||||||
if (pRes->tsrow == NULL) {
|
if (pRes->tsrow == NULL) {
|
||||||
int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
|
int32_t numOfOutput = pQueryInfo->fieldsInfo.numOfOutput;
|
||||||
pRes->numOfCols = numOfOutput;
|
pRes->numOfCols = numOfOutput;
|
||||||
|
|
||||||
pRes->tsrow = calloc(numOfOutput, POINTER_BYTES);
|
pRes->tsrow = calloc(numOfOutput, POINTER_BYTES);
|
||||||
pRes->length = calloc(numOfOutput, sizeof(int32_t));
|
pRes->length = calloc(numOfOutput, sizeof(int32_t));
|
||||||
pRes->buffer = calloc(numOfOutput, POINTER_BYTES);
|
pRes->buffer = calloc(numOfOutput, POINTER_BYTES);
|
||||||
|
|
||||||
// not enough memory
|
// not enough memory
|
||||||
if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
|
if (pRes->tsrow == NULL || (pRes->buffer == NULL && pRes->numOfCols > 0)) {
|
||||||
taosTFree(pRes->tsrow);
|
taosTFree(pRes->tsrow);
|
||||||
|
@ -268,7 +268,7 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscDestroyResPointerInfo(SSqlRes* pRes) {
|
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
|
||||||
if (pRes->buffer != NULL) { // free all buffers containing the multibyte string
|
if (pRes->buffer != NULL) { // free all buffers containing the multibyte string
|
||||||
for (int i = 0; i < pRes->numOfCols; i++) {
|
for (int i = 0; i < pRes->numOfCols; i++) {
|
||||||
taosTFree(pRes->buffer[i]);
|
taosTFree(pRes->buffer[i]);
|
||||||
|
@ -367,12 +367,36 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) {
|
||||||
tscResetSqlCmdObj(pCmd, false);
|
tscResetSqlCmdObj(pCmd, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void tscFreeSubobj(SSqlObj* pSql) {
|
||||||
|
if (pSql->numOfSubs == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
tscDebug("%p start to free sub SqlObj, numOfSub:%d", pSql, pSql->numOfSubs);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < pSql->numOfSubs; ++i) {
|
||||||
|
tscDebug("%p free sub SqlObj:%p, index:%d", pSql, pSql->pSubs[i], i);
|
||||||
|
taos_free_result(pSql->pSubs[i]);
|
||||||
|
pSql->pSubs[i] = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pSql->numOfSubs = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tscFreeSqlObjInCache(void *pSql) {
|
||||||
|
assert(pSql != NULL);
|
||||||
|
SSqlObj** p = (SSqlObj**) pSql;
|
||||||
|
|
||||||
|
tscFreeSqlObj(*p);
|
||||||
|
}
|
||||||
|
|
||||||
void tscFreeSqlObj(SSqlObj* pSql) {
|
void tscFreeSqlObj(SSqlObj* pSql) {
|
||||||
if (pSql == NULL || pSql->signature != pSql) {
|
if (pSql == NULL || pSql->signature != pSql) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("%p start to free sql object", pSql);
|
tscDebug("%p start to free sql object", pSql);
|
||||||
|
tscFreeSubobj(pSql);
|
||||||
tscPartiallyFreeSqlObj(pSql);
|
tscPartiallyFreeSqlObj(pSql);
|
||||||
|
|
||||||
pSql->signature = NULL;
|
pSql->signature = NULL;
|
||||||
|
@ -724,13 +748,25 @@ void tscCloseTscObj(STscObj* pObj) {
|
||||||
|
|
||||||
pObj->signature = NULL;
|
pObj->signature = NULL;
|
||||||
taosTmrStopA(&(pObj->pTimer));
|
taosTmrStopA(&(pObj->pTimer));
|
||||||
pthread_mutex_destroy(&pObj->mutex);
|
|
||||||
|
// wait for all sqlObjs created according to this connect closed
|
||||||
|
while(1) {
|
||||||
|
pthread_mutex_lock(&pObj->mutex);
|
||||||
|
void* p = pObj->sqlList;
|
||||||
|
pthread_mutex_unlock(&pObj->mutex);
|
||||||
|
|
||||||
|
if (p == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (pObj->pDnodeConn != NULL) {
|
if (pObj->pDnodeConn != NULL) {
|
||||||
rpcClose(pObj->pDnodeConn);
|
rpcClose(pObj->pDnodeConn);
|
||||||
pObj->pDnodeConn = NULL;
|
pObj->pDnodeConn = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_mutex_destroy(&pObj->mutex);
|
||||||
|
|
||||||
tscDebug("%p DB connection is closed, dnodeConn:%p", pObj, pObj->pDnodeConn);
|
tscDebug("%p DB connection is closed, dnodeConn:%p", pObj, pObj->pDnodeConn);
|
||||||
taosTFree(pObj);
|
taosTFree(pObj);
|
||||||
}
|
}
|
||||||
|
@ -1721,6 +1757,9 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
|
||||||
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
|
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, pSql->cmd.clauseIndex, 0);
|
||||||
|
|
||||||
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL);
|
tscAddTableMetaInfo(pQueryInfo, pMasterTableMetaInfo->name, NULL, NULL, NULL);
|
||||||
|
|
||||||
|
uint64_t p = (uint64_t) pNew;
|
||||||
|
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 1000);
|
||||||
return pNew;
|
return pNew;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1960,6 +1999,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
||||||
tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex);
|
tscDebug("%p new sub insertion: %p, vnodeIdx:%d", pSql, pNew, pTableMetaInfo->vgroupIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t p = (uint64_t) pNew;
|
||||||
|
pNew->self = taosCachePut(tscObjCache, &p, sizeof(uint64_t), &pNew, sizeof(uint64_t), 2 * 600 * 10);
|
||||||
return pNew;
|
return pNew;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
@ -2101,11 +2142,6 @@ bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) {
|
||||||
return (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit);
|
return (pQueryInfo->clauseLimit > 0 && pRes->numOfClauseTotal >= pQueryInfo->clauseLimit);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tscResultsetFetchCompleted(TAOS_RES *result) {
|
|
||||||
SSqlRes* pRes = result;
|
|
||||||
return pRes->completed;
|
|
||||||
}
|
|
||||||
|
|
||||||
char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }
|
char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -50,10 +50,6 @@ void httpProcessMultiSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int n
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (tscResultsetFetchCompleted(result)) {
|
|
||||||
// isContinue = false;
|
|
||||||
// }
|
|
||||||
|
|
||||||
if (isContinue) {
|
if (isContinue) {
|
||||||
// retrieve next batch of rows
|
// retrieve next batch of rows
|
||||||
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, continue retrieve, numOfRows:%d, sql:%s",
|
httpDebug("context:%p, fd:%d, ip:%s, user:%s, process pos:%d, continue retrieve, numOfRows:%d, sql:%s",
|
||||||
|
@ -223,15 +219,6 @@ void httpProcessSingleSqlRetrieveCallBackImp(void *param, TAOS_RES *result, int
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
// todo refactor
|
|
||||||
if (tscResultsetFetchCompleted(result)) {
|
|
||||||
httpDebug("context:%p, fd:%d, ip:%s, user:%s, resultset fetch completed", pContext, pContext->fd, pContext->ipstr,
|
|
||||||
pContext->user);
|
|
||||||
isContinue = false;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (isContinue) {
|
if (isContinue) {
|
||||||
// retrieve next batch of rows
|
// retrieve next batch of rows
|
||||||
httpDebug("context:%p, fd:%d, ip:%s, user:%s, continue retrieve, numOfRows:%d", pContext, pContext->fd,
|
httpDebug("context:%p, fd:%d, ip:%s, user:%s, continue retrieve, numOfRows:%d", pContext, pContext->fd,
|
||||||
|
|
|
@ -248,6 +248,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
||||||
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
||||||
assert(pMeta != NULL && sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
|
assert(pMeta != NULL && sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
|
||||||
|
|
||||||
|
// todo apply the lastkey of table check to avoid to load header file
|
||||||
for (int32_t i = 0; i < sizeOfGroup; ++i) {
|
for (int32_t i = 0; i < sizeOfGroup; ++i) {
|
||||||
SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);
|
SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);
|
||||||
|
|
||||||
|
|
|
@ -165,7 +165,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// set free cache node callback function for hash table
|
// set free cache node callback function
|
||||||
pCacheObj->freeFp = fn;
|
pCacheObj->freeFp = fn;
|
||||||
pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
|
pCacheObj->refreshTime = refreshTimeInSeconds * 1000;
|
||||||
pCacheObj->extendLifespan = extendLifespan;
|
pCacheObj->extendLifespan = extendLifespan;
|
||||||
|
@ -322,7 +322,12 @@ void *taosCacheTransfer(SCacheObj *pCacheObj, void **data) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
|
||||||
if (pCacheObj == NULL || (*data) == NULL || (taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0)) {
|
if (taosHashGetSize(pCacheObj->pHashTable) + pCacheObj->numOfElemsInTrash == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCacheObj == NULL || (*data) == NULL) {
|
||||||
|
uError("cache:%s, NULL data to release", pCacheObj->name);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -423,6 +423,8 @@ if $data97 != @group_tb0@ then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print ---------------------------------> group by binary|nchar data add cases
|
||||||
|
|
||||||
|
|
||||||
#=========================== group by multi tags ======================
|
#=========================== group by multi tags ======================
|
||||||
sql create table st (ts timestamp, c int) tags (t1 int, t2 int, t3 int, t4 int);
|
sql create table st (ts timestamp, c int) tags (t1 int, t2 int, t3 int, t4 int);
|
||||||
|
|
Loading…
Reference in New Issue