commit
043260fab1
|
@ -307,7 +307,6 @@ typedef struct STscObj {
|
||||||
SRpcCorEpSet *tscCorMgmtEpSet;
|
SRpcCorEpSet *tscCorMgmtEpSet;
|
||||||
void* pDnodeConn;
|
void* pDnodeConn;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
T_REF_DECLARE()
|
|
||||||
} STscObj;
|
} STscObj;
|
||||||
|
|
||||||
typedef struct SSubqueryState {
|
typedef struct SSubqueryState {
|
||||||
|
@ -483,7 +482,6 @@ extern int tscObjRef;
|
||||||
extern void * tscTmr;
|
extern void * tscTmr;
|
||||||
extern void * tscQhandle;
|
extern void * tscQhandle;
|
||||||
extern int tscKeepConn[];
|
extern int tscKeepConn[];
|
||||||
extern int tsInsertHeadSize;
|
|
||||||
extern int tscNumOfThreads;
|
extern int tscNumOfThreads;
|
||||||
extern int tscRefId;
|
extern int tscRefId;
|
||||||
|
|
||||||
|
|
|
@ -115,9 +115,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
|
||||||
|
|
||||||
pObj->signature = pObj;
|
pObj->signature = pObj;
|
||||||
pObj->pDnodeConn = pDnodeConn;
|
pObj->pDnodeConn = pDnodeConn;
|
||||||
T_REF_INIT_VAL(pObj, 1);
|
|
||||||
|
|
||||||
|
|
||||||
tstrncpy(pObj->user, user, sizeof(pObj->user));
|
tstrncpy(pObj->user, user, sizeof(pObj->user));
|
||||||
secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass));
|
secretEncryptLen = MIN(secretEncryptLen, sizeof(pObj->pass));
|
||||||
memcpy(pObj->pass, secretEncrypt, secretEncryptLen);
|
memcpy(pObj->pass, secretEncrypt, secretEncryptLen);
|
||||||
|
@ -172,11 +170,9 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
|
||||||
if (taos != NULL) {
|
if (taos != NULL) {
|
||||||
*taos = pObj;
|
*taos = pObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
registerSqlObj(pSql);
|
|
||||||
tsInsertHeadSize = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
|
|
||||||
|
|
||||||
pObj->rid = taosAddRef(tscRefId, pObj);
|
pObj->rid = taosAddRef(tscRefId, pObj);
|
||||||
|
registerSqlObj(pSql);
|
||||||
|
|
||||||
return pSql;
|
return pSql;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -288,34 +284,19 @@ void taos_close(TAOS *taos) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure that the close connection can only be executed once.
|
SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
|
||||||
pObj->signature = NULL;
|
if (pHb != NULL) {
|
||||||
taosTmrStopA(&(pObj->pTimer));
|
if (pHb->rpcRid > 0) { // wait for rsp from dnode
|
||||||
|
rpcCancelRequest(pHb->rpcRid);
|
||||||
if (pObj->hbrid > 0) {
|
pHb->rpcRid = -1;
|
||||||
SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid);
|
|
||||||
if (pHb != NULL) {
|
|
||||||
if (pHb->rpcRid > 0) { // wait for rsp from dnode
|
|
||||||
rpcCancelRequest(pHb->rpcRid);
|
|
||||||
pHb->rpcRid = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tscDebug("%p HB is freed", pHb);
|
|
||||||
taos_free_result(pHb);
|
|
||||||
taosReleaseRef(tscObjRef, pHb->self);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
int32_t ref = T_REF_DEC(pObj);
|
tscDebug("%p HB is freed", pHb);
|
||||||
assert(ref >= 0);
|
taosReleaseRef(tscObjRef, pHb->self);
|
||||||
|
taos_free_result(pHb);
|
||||||
if (ref > 0) {
|
|
||||||
tscDebug("%p %d remain sqlObjs, not free tscObj and dnodeConn:%p", pObj, ref, pObj->pDnodeConn);
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
|
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
|
||||||
|
|
||||||
taosRemoveRef(tscRefId, pObj->rid);
|
taosRemoveRef(tscRefId, pObj->rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,7 +36,6 @@ int tscObjRef = -1;
|
||||||
void * tscTmr;
|
void * tscTmr;
|
||||||
void * tscQhandle;
|
void * tscQhandle;
|
||||||
void * tscCheckDiskUsageTmr;
|
void * tscCheckDiskUsageTmr;
|
||||||
int tsInsertHeadSize;
|
|
||||||
int tscRefId = -1;
|
int tscRefId = -1;
|
||||||
|
|
||||||
int tscNumOfThreads;
|
int tscNumOfThreads;
|
||||||
|
|
|
@ -460,15 +460,7 @@ void tscFreeRegisteredSqlObj(void *pSql) {
|
||||||
|
|
||||||
assert(p->self != 0);
|
assert(p->self != 0);
|
||||||
tscFreeSqlObj(p);
|
tscFreeSqlObj(p);
|
||||||
|
taosReleaseRef(tscRefId, pTscObj->rid);
|
||||||
int32_t ref = T_REF_DEC(pTscObj);
|
|
||||||
assert(ref >= 0);
|
|
||||||
|
|
||||||
tscDebug("%p free sqlObj completed, tscObj:%p ref:%d", p, pTscObj, ref);
|
|
||||||
if (ref == 0) {
|
|
||||||
tscDebug("%p all sqlObj freed, free tscObj:%p", p, pTscObj);
|
|
||||||
taosRemoveRef(tscRefId, pTscObj->rid);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscFreeTableMetaHelper(void *pTableMeta) {
|
void tscFreeTableMetaHelper(void *pTableMeta) {
|
||||||
|
@ -810,6 +802,7 @@ static void extractTableMeta(SSqlCmd* pCmd) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
|
int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
|
||||||
|
const int INSERT_HEAD_SIZE = sizeof(SMsgDesc) + sizeof(SSubmitMsg);
|
||||||
SSqlCmd* pCmd = &pSql->cmd;
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
|
||||||
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
|
||||||
|
@ -824,7 +817,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql) {
|
||||||
STableDataBlocks* dataBuf = NULL;
|
STableDataBlocks* dataBuf = NULL;
|
||||||
|
|
||||||
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
|
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
|
||||||
tsInsertHeadSize, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
|
INSERT_HEAD_SIZE, 0, pOneTableBlock->tableId, pOneTableBlock->pTableMeta, &dataBuf, pVnodeDataBlockList);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
|
tscError("%p failed to prepare the data block buffer for merging table data, code:%d", pSql, ret);
|
||||||
taosHashCleanup(pVnodeDataBlockHashList);
|
taosHashCleanup(pVnodeDataBlockHashList);
|
||||||
|
@ -1917,9 +1910,7 @@ void tscResetForNextRetrieve(SSqlRes* pRes) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void registerSqlObj(SSqlObj* pSql) {
|
void registerSqlObj(SSqlObj* pSql) {
|
||||||
int32_t ref = T_REF_INC(pSql->pTscObj);
|
taosAcquireRef(tscRefId, pSql->pTscObj->rid);
|
||||||
tscDebug("%p add to tscObj:%p, ref:%d", pSql, pSql->pTscObj, ref);
|
|
||||||
|
|
||||||
pSql->self = taosAddRef(tscObjRef, pSql);
|
pSql->self = taosAddRef(tscObjRef, pSql);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2629,4 +2620,4 @@ int32_t copyTagData(STagData* dst, const STagData* src) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue