Merge pull request #24083 from taosdata/enh/xsren/TD-27847

enh: call fetch_rows in taos_query_a callback
This commit is contained in:
dapan1121 2023-12-20 16:12:31 +08:00 committed by GitHub
commit e90fe43f93
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 23 additions and 4 deletions

View File

@ -273,6 +273,7 @@ typedef struct SRequestObj {
bool killed;
bool inRetry;
bool isSubReq;
bool inCallback;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry;
int64_t allocatorRefId;

View File

@ -336,6 +336,7 @@ void *createRequest(uint64_t connId, int32_t type, int64_t reqid) {
pRequest->pDb = getDbOfConnection(pTscObj);
pRequest->pTscObj = pTscObj;
pRequest->inCallback = false;
pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE);
pRequest->msgBufLen = ERROR_MSG_BUF_DEFAULT_SIZE;

View File

@ -1711,6 +1711,7 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
tsem_wait(&sem);
tsem_destroy(&sem);
pRequest->inCallback = false;
}
if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
@ -2490,6 +2491,7 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
if (param->pRequest != NULL) {
param->pRequest->syncQuery = true;
pRequest = param->pRequest;
param->pRequest->inCallback = false;
}
taosMemoryFree(param);
@ -2607,7 +2609,14 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param
}
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
pRequest->inCallback = true;
int64_t this = pRequest->self;
pRequest->body.queryFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, code);
SRequestObj* pReq = acquireRequest(this);
if (pReq != NULL) {
pReq->inCallback = false;
releaseRequest(this);
}
}
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser, SParseSqlRes* pRes) {

View File

@ -418,6 +418,12 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return NULL;
}
if(pRequest->inCallback) {
tscError("can not call taos_fetch_row before query callback ends.");
terrno = TSDB_CODE_TSC_INVALID_OPERATION;
return NULL;
}
return doAsyncFetchRows(pRequest, true, true);
} else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
SMqRspObj *msg = ((SMqRspObj *)res);

View File

@ -190,10 +190,12 @@ void queryCallback2(void* param, void* res, int32_t code) {
ASSERT_TRUE(param == pUserParam);
// After using taos_query_a to query, using taos_fetch_row in the callback will cause blocking.
// Reason: schProcessOnCbBegin SCH_LOCK_TASK(pTask)
/* TAOS_ROW row;
while ((row = taos_fetch_row(res))) {
getRecordCounts++;
} */
TAOS_ROW row;
row = taos_fetch_row(res);
ASSERT_TRUE(row == NULL);
int* errCode = taosGetErrno();
ASSERT_TRUE(*errCode = TSDB_CODE_TSC_INVALID_OPERATION);
tsem_post(&query_sem);
taos_free_result(res);
}