distinguish between user parameters and internal parameters

This commit is contained in:
facetosea 2023-11-04 17:20:24 +08:00
parent 31cd7b0bd5
commit 567a55a737
6 changed files with 47 additions and 67 deletions

View File

@ -205,8 +205,7 @@ typedef struct SRequestSendRecvBody {
__taos_async_fn_t queryFp; __taos_async_fn_t queryFp;
__taos_async_fn_t fetchFp; __taos_async_fn_t fetchFp;
EQueryExecMode execMode; EQueryExecMode execMode;
void* param; void* interParam;
bool paramCreatedInternal;
SDataBuf requestMsg; SDataBuf requestMsg;
int64_t queryJob; // query job, created according to sql query DAG. int64_t queryJob; // query job, created according to sql query DAG.
int32_t subplanNum; int32_t subplanNum;
@ -285,6 +284,7 @@ typedef struct SRequestObj {
typedef struct SSyncQueryParam { typedef struct SSyncQueryParam {
tsem_t sem; tsem_t sem;
SRequestObj* pRequest; SRequestObj* pRequest;
void* userParam;
} SSyncQueryParam; } SSyncQueryParam;
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);

View File

@ -316,6 +316,15 @@ void *createRequest(uint64_t connId, int32_t type, int64_t reqid) {
terrno = TSDB_CODE_TSC_DISCONNECTED; terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL; return NULL;
} }
SSyncQueryParam *interParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
if (interParam == NULL) {
doDestroyRequest(pRequest);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tsem_init(&interParam->sem, 0, 0);
interParam->pRequest = pRequest;
pRequest->body.interParam = interParam;
pRequest->resType = RES_TYPE__QUERY; pRequest->resType = RES_TYPE__QUERY;
pRequest->requestId = reqid == 0 ? generateRequestId() : reqid; pRequest->requestId = reqid == 0 ? generateRequestId() : reqid;
@ -437,12 +446,10 @@ void doDestroyRequest(void *p) {
deregisterRequest(pRequest); deregisterRequest(pRequest);
} }
if (pRequest->syncQuery || pRequest->body.paramCreatedInternal) { if (pRequest->body.interParam) {
if (pRequest->body.param) { tsem_destroy(&((SSyncQueryParam *)pRequest->body.interParam)->sem);
tsem_destroy(&((SSyncQueryParam *)pRequest->body.param)->sem);
}
taosMemoryFree(pRequest->body.param);
} }
taosMemoryFree(pRequest->body.interParam);
qDestroyQuery(pRequest->pQuery); qDestroyQuery(pRequest->pQuery);
nodesDestroyAllocator(pRequest->allocatorRefId); nodesDestroyAllocator(pRequest->allocatorRefId);

View File

@ -196,22 +196,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
(*pRequest)->sqlLen = sqlLen; (*pRequest)->sqlLen = sqlLen;
(*pRequest)->validateOnly = validateSql; (*pRequest)->validateOnly = validateSql;
SSyncQueryParam* newpParam = NULL; ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
if (param == NULL) {
newpParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
if (newpParam == NULL) {
destroyRequest(*pRequest);
*pRequest = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
tsem_init(&newpParam->sem, 0, 0);
newpParam->pRequest = (*pRequest);
param = newpParam;
(*pRequest)->body.paramCreatedInternal = true;
}
(*pRequest)->body.param = param;
STscObj* pTscObj = (*pRequest)->pTscObj; STscObj* pTscObj = (*pRequest)->pTscObj;
int32_t err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self, int32_t err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
@ -219,7 +204,6 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
if (err) { if (err) {
tscError("%" PRId64 " failed to add to request container, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s", tscError("%" PRId64 " failed to add to request container, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s",
(*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql); (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
freeQueryParam(newpParam);
destroyRequest(*pRequest); destroyRequest(*pRequest);
*pRequest = NULL; *pRequest = NULL;
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -231,7 +215,6 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) { nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
tscError("%" PRId64 " failed to create node allocator, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s", tscError("%" PRId64 " failed to create node allocator, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s",
(*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql); (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
freeQueryParam(newpParam);
destroyRequest(*pRequest); destroyRequest(*pRequest);
*pRequest = NULL; *pRequest = NULL;
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -1684,8 +1667,8 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
} }
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) { static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
SSyncQueryParam* pParam = param; tsem_t* sem = param;
tsem_post(&pParam->sem); tsem_post(sem);
} }
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
@ -1703,10 +1686,11 @@ void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertU
// convert ucs4 to native multi-bytes string // convert ucs4 to native multi-bytes string
pResultInfo->convertUcs4 = convertUcs4; pResultInfo->convertUcs4 = convertUcs4;
tsem_t sem;
SSyncQueryParam* pParam = pRequest->body.param; tsem_init(&sem, 0, 0);
taos_fetch_rows_a(pRequest, syncFetchFn, pParam); taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
tsem_wait(&pParam->sem); tsem_wait(&sem);
tsem_destroy(&sem);
} }
if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) { if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
@ -2473,6 +2457,10 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
tscDebug("taos_query start with sql:%s", sql); tscDebug("taos_query start with sql:%s", sql);
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
if (NULL == param) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tsem_init(&param->sem, 0, 0); tsem_init(&param->sem, 0, 0);
taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly); taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly);
@ -2482,9 +2470,8 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
if (param->pRequest != NULL) { if (param->pRequest != NULL) {
param->pRequest->syncQuery = true; param->pRequest->syncQuery = true;
pRequest = param->pRequest; pRequest = param->pRequest;
} else {
taosMemoryFree(param);
} }
taosMemoryFree(param);
tscDebug("taos_query end with sql:%s", sql); tscDebug("taos_query end with sql:%s", sql);
@ -2498,19 +2485,20 @@ TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly,
} }
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam)); SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
if (param == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tsem_init(&param->sem, 0, 0); tsem_init(&param->sem, 0, 0);
taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid); taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
tsem_wait(&param->sem); tsem_wait(&param->sem);
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
if (param->pRequest != NULL) { if (param->pRequest != NULL) {
param->pRequest->syncQuery = true; param->pRequest->syncQuery = true;
pRequest = param->pRequest; pRequest = param->pRequest;
} else {
taosMemoryFree(param);
} }
taosMemoryFree(param);
return pRequest; return pRequest;
} }
@ -2528,13 +2516,13 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code; pRequest->code = code;
taosMemoryFreeClear(pResultInfo->pData); taosMemoryFreeClear(pResultInfo->pData);
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); pRequest->body.fetchFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, 0);
return; return;
} }
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pResultInfo->pData); taosMemoryFreeClear(pResultInfo->pData);
pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); pRequest->body.fetchFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, 0);
return; return;
} }
@ -2555,21 +2543,12 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen); atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
} }
pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); pRequest->body.fetchFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
} }
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) { void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
if ((pRequest->syncQuery || pRequest->body.paramCreatedInternal) && pRequest->body.param != param) {
if (pRequest->body.param) {
tsem_destroy(&((SSyncQueryParam *)pRequest->body.param)->sem);
}
taosMemoryFree(pRequest->body.param);
pRequest->syncQuery = false;
pRequest->body.paramCreatedInternal = false;
}
pRequest->body.fetchFp = fp; pRequest->body.fetchFp = fp;
pRequest->body.param = param; ((SSyncQueryParam *)pRequest->body.interParam)->userParam = param;
SReqResultInfo* pResultInfo = &pRequest->body.resInfo; SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
@ -2608,9 +2587,5 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param
} }
void doRequestCallback(SRequestObj* pRequest, int32_t code) { void doRequestCallback(SRequestObj* pRequest, int32_t code) {
if (pRequest->body.paramCreatedInternal) { pRequest->body.queryFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, code);
pRequest->body.queryFp(NULL, pRequest, code);
} else {
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}
} }

View File

@ -1561,12 +1561,12 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); conn.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, pRequest->body.param, NULL); code = catalogAsyncGetAllMeta(pCtg, &conn, &catalogReq, syncCatalogFn, pRequest->body.interParam, NULL);
if (code) { if (code) {
goto _return; goto _return;
} }
SSyncQueryParam *pParam = pRequest->body.param; SSyncQueryParam *pParam = pRequest->body.interParam;
tsem_wait(&pParam->sem); tsem_wait(&pParam->sem);
_return: _return:

View File

@ -455,7 +455,6 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
tstrerror(rspCode)); tstrerror(rspCode));
SCH_ERR_JRET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId)); SCH_ERR_JRET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId));
code = schHandleResponseMsg(pJob, pTask, pParam->execId, pMsg, rspCode); code = schHandleResponseMsg(pJob, pTask, pParam->execId, pMsg, rspCode);
pMsg->pData = NULL; pMsg->pData = NULL;

View File

@ -188,10 +188,9 @@ TEST_F(taoscTest, taos_query_test) {
void queryCallback2(void* param, void* res, int32_t code) { void queryCallback2(void* param, void* res, int32_t code) {
ASSERT_TRUE(code == 0); ASSERT_TRUE(code == 0);
ASSERT_TRUE(param == pUserParam); ASSERT_TRUE(param == pUserParam);
// After using taos_query_a to query, using taos_fetch_row // After using taos_query_a to query, using taos_fetch_row in the callback will cause blocking.
// in the callback will cause blocking. // Reason: schProcessOnCbBegin SCH_LOCK_TASK(pTask)
/* /* TAOS_ROW row;
TAOS_ROW row;
while ((row = taos_fetch_row(res))) { while ((row = taos_fetch_row(res))) {
getRecordCounts++; getRecordCounts++;
} */ } */
@ -251,14 +250,14 @@ TEST_F(taoscTest, taos_query_a_fetch_row) {
getRecordCounts = 0; getRecordCounts = 0;
TAOS_ROW row; TAOS_ROW row;
printf("taos_query_a_fetch_row taos_fetch_row start...\n"); printf("taos_query_a_fetch_row taos_fetch_row start...\n");
// will cause heap-buffer-overfow
// while ((row = taos_fetch_row(*pres))) { while ((row = taos_fetch_row(*pres))) {
// getRecordCounts++; getRecordCounts++;
// } }
printf("taos_query_a_fetch_row taos_fetch_row end. %p record count:%d.\n", *pres, getRecordCounts); printf("taos_query_a_fetch_row taos_fetch_row end. %p record count:%d.\n", *pres, getRecordCounts);
taos_free_result(*pres); taos_free_result(*pres);
ASSERT_NE(getRecordCounts, insertCounts); ASSERT_EQ(getRecordCounts, insertCounts);
taos_close(taos); taos_close(taos);
printf("taos_query_a_fetch_row test finished.\n"); printf("taos_query_a_fetch_row test finished.\n");