[TD-825] adjust callback sequence in async query

This commit is contained in:
Shengliang Guan 2020-07-14 14:10:40 +08:00
parent 045129ed01
commit 4b8428855f
5 changed files with 12 additions and 10 deletions

View File

@ -198,6 +198,10 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
// user-defined callback function is stored in fetchFp
pSql->fetchFp = fp;
pSql->fp = tscAsyncFetchRowsProxy;
if (pRes->qhandle == 0) {
tscError("qhandle is NULL");
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
@ -205,10 +209,6 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
return;
}
// user-defined callback function is stored in fetchFp
pSql->fetchFp = fp;
pSql->fp = tscAsyncFetchRowsProxy;
pSql->param = param;
tscResetForNextRetrieve(pRes);

View File

@ -137,7 +137,7 @@ void httpReleaseContext(HttpContext *pContext) {
assert(refCount >= 0);
HttpContext **ppContext = pContext->ppContext;
httpDebug("context:%p, is releasd, data:%p refCount:%d", pContext, ppContext, refCount);
httpDebug("context:%p, is released, data:%p refCount:%d", pContext, ppContext, refCount);
if (tsHttpServer.contextCache != NULL) {
taosCacheRelease(tsHttpServer.contextCache, (void **)(&ppContext), false);

View File

@ -226,9 +226,9 @@ void httpProcessSingleSqlRetrieveCallBack(void *param, TAOS_RES *result, int num
if (numOfRows < 0) {
httpError("context:%p, fd:%d, ip:%s, user:%s, retrieve failed, code:%s", pContext, pContext->fd, pContext->ipstr,
pContext->user, tstrerror(numOfRows));
}
taos_free_result(result);
}
taos_free_result(result);
if (encode->stopJsonFp) {
(encode->stopJsonFp)(pContext, &pContext->singleCmd);

View File

@ -6545,13 +6545,14 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
SQueryMgmt *pQueryMgmt = pMgmt;
if (pQueryMgmt->qinfoPool == NULL) {
qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo);
return NULL;
}
pthread_mutex_lock(&pQueryMgmt->lock);
if (pQueryMgmt->closed) {
pthread_mutex_unlock(&pQueryMgmt->lock);
qError("QInfo:%p failed to add qhandle into cache, since qMgmt is colsing", (void *)qInfo);
return NULL;
} else {
uint64_t handleVal = (uint64_t) qInfo;

View File

@ -108,9 +108,10 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (code == TSDB_CODE_SUCCESS) {
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
if (handle == NULL) { // failed to register qhandle
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
tstrerror(pRsp->code));
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
qDestroyQueryInfo(pQInfo); // destroy it directly
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void*) pQInfo, tstrerror(pRsp->code));
} else {
assert(*handle == pQInfo);
pRsp->qhandle = htobe64((uint64_t) pQInfo);