Merge pull request #13017 from taosdata/feature/qnode
feat: scheduler async api
This commit is contained in:
commit
47ea043d5f
|
@ -23,6 +23,8 @@ extern "C" {
|
|||
#include "catalog.h"
|
||||
#include "planner.h"
|
||||
|
||||
extern tsem_t schdRspSem;
|
||||
|
||||
typedef struct SSchedulerCfg {
|
||||
uint32_t maxJobNum;
|
||||
int32_t maxNodeTableNum;
|
||||
|
@ -62,6 +64,11 @@ typedef struct STaskInfo {
|
|||
SSubQueryMsg *msg;
|
||||
} STaskInfo;
|
||||
|
||||
typedef struct SSchdFetchParam {
|
||||
void **pData;
|
||||
int32_t* code;
|
||||
} SSchdFetchParam;
|
||||
|
||||
typedef void (*schedulerExecCallback)(SQueryResult* pResult, void* param, int32_t code);
|
||||
typedef void (*schedulerFetchCallback)(void* pResult, void* param, int32_t code);
|
||||
|
||||
|
@ -113,23 +120,8 @@ void schedulerFreeJob(int64_t job);
|
|||
|
||||
void schedulerDestroy(void);
|
||||
|
||||
/**
|
||||
* convert dag to task list
|
||||
* @param pDag
|
||||
* @param pTasks SArray**<STaskInfo>
|
||||
* @return
|
||||
*/
|
||||
int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks);
|
||||
|
||||
/**
|
||||
* make one task info's multiple copies
|
||||
* @param src
|
||||
* @param dst SArray**<STaskInfo>
|
||||
* @return
|
||||
*/
|
||||
int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum);
|
||||
|
||||
void schedulerFreeTaskList(SArray *taskList);
|
||||
void schdExecCallback(SQueryResult* pResult, void* param, int32_t code);
|
||||
void schdFetchCallback(void* pResult, void* param, int32_t code);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -289,6 +289,52 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
|
|||
pResInfo->precision = precision;
|
||||
}
|
||||
|
||||
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
|
||||
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
|
||||
|
||||
tsem_init(&schdRspSem, 0, 0);
|
||||
|
||||
SQueryResult res = {.code = 0, .numOfRows = 0};
|
||||
int32_t code = schedulerAsyncExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
|
||||
pRequest->metric.start, schdExecCallback, &res);
|
||||
while (true) {
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (pRequest->body.queryJob != 0) {
|
||||
schedulerFreeJob(pRequest->body.queryJob);
|
||||
}
|
||||
|
||||
*pRes = res.res;
|
||||
|
||||
pRequest->code = code;
|
||||
terrno = code;
|
||||
return pRequest->code;
|
||||
} else {
|
||||
tsem_wait(&schdRspSem);
|
||||
|
||||
if (res.code) {
|
||||
code = res.code;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
|
||||
pRequest->body.resInfo.numOfRows = res.numOfRows;
|
||||
|
||||
if (pRequest->body.queryJob != 0) {
|
||||
schedulerFreeJob(pRequest->body.queryJob);
|
||||
}
|
||||
}
|
||||
|
||||
*pRes = res.res;
|
||||
|
||||
pRequest->code = res.code;
|
||||
terrno = res.code;
|
||||
return pRequest->code;
|
||||
}
|
||||
|
||||
|
||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
|
||||
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
|
||||
|
||||
|
@ -796,7 +842,58 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
|
|||
}
|
||||
}
|
||||
|
||||
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
||||
assert(pRequest != NULL);
|
||||
|
||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
|
||||
// All data has returned to App already, no need to try again
|
||||
if (pResultInfo->completed) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tsem_init(&schdRspSem, 0, 0);
|
||||
|
||||
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
|
||||
SSchdFetchParam param = {.pData = (void**)&pResInfo->pData, .code = &pRequest->code};
|
||||
pRequest->code = schedulerAsyncFetchRows(pRequest->body.queryJob, schdFetchCallback, ¶m);
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tsem_wait(&schdRspSem);
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
pResultInfo->numOfRows = 0;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
|
||||
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
|
||||
|
||||
if (pResultInfo->numOfRows == 0) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (setupOneRowPtr) {
|
||||
doSetOneRowPtr(pResultInfo);
|
||||
pResultInfo->current += 1;
|
||||
}
|
||||
|
||||
return pResultInfo->row;
|
||||
}
|
||||
|
||||
|
||||
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
|
||||
//return doAsyncFetchRows(pRequest, setupOneRowPtr, convertUcs4);
|
||||
assert(pRequest != NULL);
|
||||
|
||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||
|
|
|
@ -2791,6 +2791,7 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt,
|
|||
int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg);
|
||||
int32_t num = taosArrayGetSize(dbCfg.pRetensions);
|
||||
if (TSDB_CODE_SUCCESS != code || num < 2) {
|
||||
taosArrayDestroy(dbCfg.pRetensions);
|
||||
return code;
|
||||
}
|
||||
for (int32_t i = 1; i < num; ++i) {
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "query.h"
|
||||
#include "schedulerInt.h"
|
||||
|
||||
tsem_t schdRspSem;
|
||||
|
||||
void schdExecCallback(SQueryResult* pResult, void* param, int32_t code) {
|
||||
if (code) {
|
||||
pResult->code = code;
|
||||
}
|
||||
|
||||
*(SQueryResult*)param = *pResult;
|
||||
|
||||
taosMemoryFree(pResult);
|
||||
|
||||
tsem_post(&schdRspSem);
|
||||
}
|
||||
|
||||
void schdFetchCallback(void* pResult, void* param, int32_t code) {
|
||||
SSchdFetchParam* fParam = (SSchdFetchParam*)param;
|
||||
|
||||
*fParam->pData = pResult;
|
||||
*fParam->code = code;
|
||||
|
||||
tsem_post(&schdRspSem);
|
||||
}
|
||||
|
||||
|
|
@ -856,7 +856,12 @@ _return:
|
|||
|
||||
void schProcessOnDataFetched(SSchJob *job) {
|
||||
atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0);
|
||||
tsem_post(&job->rspSem);
|
||||
|
||||
if (job->attr.syncSchedule) {
|
||||
tsem_post(&job->rspSem);
|
||||
} else if (SCH_FETCH_CB == atomic_val_compare_exchange_32(&job->userCb, SCH_FETCH_CB, 0)) {
|
||||
schNotifyUserFetchRes(job);
|
||||
}
|
||||
}
|
||||
|
||||
// Note: no more task error processing, handled in function internal
|
||||
|
|
|
@ -66,6 +66,7 @@ void schFreeRpcCtxVal(const void *arg) {
|
|||
|
||||
SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
|
||||
taosMemoryFreeClear(pMsgSendInfo->param);
|
||||
taosMemoryFreeClear(pMsgSendInfo->msgInfo.pData);
|
||||
taosMemoryFreeClear(pMsgSendInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -124,7 +124,7 @@ int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* pa
|
|||
pJob->userRes.fetchFp = fp;
|
||||
pJob->userRes.userParam = param;
|
||||
|
||||
code = schFetchRows(pJob);
|
||||
code = schAsyncFetchRows(pJob);
|
||||
|
||||
schReleaseJob(job);
|
||||
|
||||
|
|
Loading…
Reference in New Issue