Merge pull request #3615 from taosdata/bugfix/cache
[TD-1529]<fix>: use cache in subscribe & prepare
This commit is contained in:
commit
a1ce0a2628
|
@ -546,6 +546,10 @@ int taos_stmt_prepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
|
||||||
pSql->cmd.numOfParams = 0;
|
pSql->cmd.numOfParams = 0;
|
||||||
pSql->cmd.batchSize = 0;
|
pSql->cmd.batchSize = 0;
|
||||||
|
|
||||||
|
uint64_t handle = (uint64_t) pSql;
|
||||||
|
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
|
||||||
|
T_REF_INC(pSql->pTscObj);
|
||||||
|
|
||||||
int32_t code = tsParseSql(pSql, true);
|
int32_t code = tsParseSql(pSql, true);
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
// wait for the callback function to post the semaphore
|
// wait for the callback function to post the semaphore
|
||||||
|
@ -574,7 +578,7 @@ int taos_stmt_close(TAOS_STMT* stmt) {
|
||||||
free(normal->sql);
|
free(normal->sql);
|
||||||
}
|
}
|
||||||
|
|
||||||
tscFreeSqlObj(pStmt->pSql);
|
taos_free_result(pStmt->pSql);
|
||||||
free(pStmt);
|
free(pStmt);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -612,7 +612,6 @@ void taos_close_stream(TAOS_STREAM *handle) {
|
||||||
* Here, we need a check before release memory
|
* Here, we need a check before release memory
|
||||||
*/
|
*/
|
||||||
if (pSql->signature == pSql) {
|
if (pSql->signature == pSql) {
|
||||||
T_REF_DEC(pSql->pTscObj);
|
|
||||||
tscRemoveFromStreamList(pStream, pSql);
|
tscRemoveFromStreamList(pStream, pSql);
|
||||||
|
|
||||||
taosTmrStopA(&(pStream->pTimer));
|
taosTmrStopA(&(pStream->pTimer));
|
||||||
|
@ -621,7 +620,7 @@ void taos_close_stream(TAOS_STREAM *handle) {
|
||||||
// notify CQ to release the pStream object
|
// notify CQ to release the pStream object
|
||||||
pStream->fp(pStream->param, NULL, NULL);
|
pStream->fp(pStream->param, NULL, NULL);
|
||||||
|
|
||||||
tscFreeSqlObj(pSql);
|
taos_free_result(pSql);
|
||||||
pStream->pSql = NULL;
|
pStream->pSql = NULL;
|
||||||
|
|
||||||
taosTFree(pStream);
|
taosTFree(pStream);
|
||||||
|
|
|
@ -152,6 +152,10 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t handle = (uint64_t) pSql;
|
||||||
|
pSql->self = taosCachePut(tscObjCache, &handle, sizeof(uint64_t), &pSql, sizeof(uint64_t), 2*3600*1000);
|
||||||
|
T_REF_INC(pSql->pTscObj);
|
||||||
|
|
||||||
code = tsParseSql(pSql, false);
|
code = tsParseSql(pSql, false);
|
||||||
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
tsem_wait(&pSub->sem);
|
tsem_wait(&pSub->sem);
|
||||||
|
@ -173,7 +177,11 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
|
||||||
fail:
|
fail:
|
||||||
tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code));
|
tscError("tscCreateSubscription failed at line %d, reason: %s", line, tstrerror(code));
|
||||||
if (pSql != NULL) {
|
if (pSql != NULL) {
|
||||||
tscFreeSqlObj(pSql);
|
if (pSql->self != NULL) {
|
||||||
|
taos_free_result(pSql);
|
||||||
|
} else {
|
||||||
|
tscFreeSqlObj(pSql);
|
||||||
|
}
|
||||||
pSql = NULL;
|
pSql = NULL;
|
||||||
}
|
}
|
||||||
if (pSub != NULL) {
|
if (pSub != NULL) {
|
||||||
|
@ -494,6 +502,10 @@ void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pSub->pSql != NULL) {
|
||||||
|
taos_free_result(pSub->pSql);
|
||||||
|
}
|
||||||
|
|
||||||
tscFreeSqlObj(pSub->pSql);
|
tscFreeSqlObj(pSub->pSql);
|
||||||
taosArrayDestroy(pSub->progress);
|
taosArrayDestroy(pSub->progress);
|
||||||
tsem_destroy(&pSub->sem);
|
tsem_destroy(&pSub->sem);
|
||||||
|
|
Loading…
Reference in New Issue