fix res issue
This commit is contained in:
parent
a353b1a1fb
commit
3d5073e785
|
@ -39,12 +39,6 @@ enum {
|
||||||
SCH_WRITE,
|
SCH_WRITE,
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef enum {
|
|
||||||
SCH_RES_TYPE_QUERY,
|
|
||||||
SCH_RES_TYPE_FETCH,
|
|
||||||
} SCH_RES_TYPE;
|
|
||||||
|
|
||||||
|
|
||||||
typedef struct SSchTrans {
|
typedef struct SSchTrans {
|
||||||
void *transInst;
|
void *transInst;
|
||||||
void *transHandle;
|
void *transHandle;
|
||||||
|
@ -197,7 +191,7 @@ typedef struct SSchJob {
|
||||||
int32_t errCode;
|
int32_t errCode;
|
||||||
SArray *errList; // SArray<SQueryErrorInfo>
|
SArray *errList; // SArray<SQueryErrorInfo>
|
||||||
SRWLatch resLock;
|
SRWLatch resLock;
|
||||||
SCH_RES_TYPE resType;
|
void *queryRes;
|
||||||
void *resData; //TODO free it or not
|
void *resData; //TODO free it or not
|
||||||
int32_t resNumOfRows;
|
int32_t resNumOfRows;
|
||||||
const char *sql;
|
const char *sql;
|
||||||
|
|
|
@ -1058,8 +1058,6 @@ _return:
|
||||||
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
|
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
|
||||||
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
|
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
|
||||||
|
|
||||||
pJob->resType = SCH_RES_TYPE_FETCH;
|
|
||||||
|
|
||||||
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
|
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
|
||||||
atomic_store_ptr(&pJob->resData, pRsp);
|
atomic_store_ptr(&pJob->resData, pRsp);
|
||||||
|
|
||||||
|
@ -1072,9 +1070,9 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs
|
||||||
|
|
||||||
int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp) {
|
int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp) {
|
||||||
if (rsp->tbFName[0]) {
|
if (rsp->tbFName[0]) {
|
||||||
if (NULL == pJob->resData) {
|
if (NULL == pJob->queryRes) {
|
||||||
pJob->resData = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
|
pJob->queryRes = taosArrayInit(pJob->taskNum, sizeof(STbVerInfo));
|
||||||
if (NULL == pJob->resData) {
|
if (NULL == pJob->queryRes) {
|
||||||
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1084,7 +1082,7 @@ int32_t schSaveJobQueryRes(SSchJob *pJob, SResReadyRsp *rsp) {
|
||||||
tbInfo.sversion = rsp->sversion;
|
tbInfo.sversion = rsp->sversion;
|
||||||
tbInfo.tversion = rsp->tversion;
|
tbInfo.tversion = rsp->tversion;
|
||||||
|
|
||||||
taosArrayPush((SArray *)pJob->resData, &tbInfo);
|
taosArrayPush((SArray *)pJob->queryRes, &tbInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1201,10 +1199,9 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
||||||
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
|
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
|
||||||
SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);
|
SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);
|
||||||
|
|
||||||
pJob->resType = SCH_RES_TYPE_QUERY;
|
|
||||||
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
||||||
if (pJob->resData) {
|
if (pJob->queryRes) {
|
||||||
SSubmitRsp *sum = pJob->resData;
|
SSubmitRsp *sum = pJob->queryRes;
|
||||||
sum->affectedRows += rsp->affectedRows;
|
sum->affectedRows += rsp->affectedRows;
|
||||||
sum->nBlocks += rsp->nBlocks;
|
sum->nBlocks += rsp->nBlocks;
|
||||||
sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks));
|
sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks));
|
||||||
|
@ -1212,7 +1209,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
||||||
taosMemoryFree(rsp->pBlocks);
|
taosMemoryFree(rsp->pBlocks);
|
||||||
taosMemoryFree(rsp);
|
taosMemoryFree(rsp);
|
||||||
} else {
|
} else {
|
||||||
pJob->resData = rsp;
|
pJob->queryRes = rsp;
|
||||||
}
|
}
|
||||||
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||||
}
|
}
|
||||||
|
@ -1246,7 +1243,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
||||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
SCH_ERR_JRET(rsp->code);
|
SCH_ERR_JRET(rsp->code);
|
||||||
pJob->resType = SCH_RES_TYPE_QUERY;
|
|
||||||
|
|
||||||
SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp));
|
SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp));
|
||||||
|
|
||||||
|
@ -2424,6 +2420,12 @@ void schFreeJobImpl(void *job) {
|
||||||
|
|
||||||
qExplainFreeCtx(pJob->explainCtx);
|
qExplainFreeCtx(pJob->explainCtx);
|
||||||
|
|
||||||
|
if (SCH_IS_QUERY_JOB(pJob)) {
|
||||||
|
taosArrayDestroy((SArray *)pJob->queryRes);
|
||||||
|
} else {
|
||||||
|
tFreeSSubmitRsp((SSubmitRsp*)pJob->queryRes);
|
||||||
|
}
|
||||||
|
|
||||||
taosMemoryFreeClear(pJob->resData);
|
taosMemoryFreeClear(pJob->resData);
|
||||||
taosMemoryFreeClear(pJob);
|
taosMemoryFreeClear(pJob);
|
||||||
|
|
||||||
|
@ -2486,8 +2488,6 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa
|
||||||
|
|
||||||
SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));
|
SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));
|
||||||
|
|
||||||
pJob->resType = SCH_RES_TYPE_FETCH;
|
|
||||||
|
|
||||||
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
|
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
|
||||||
if (refId < 0) {
|
if (refId < 0) {
|
||||||
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
|
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
|
||||||
|
@ -2582,10 +2582,8 @@ _return:
|
||||||
|
|
||||||
pRes->code = atomic_load_32(&job->errCode);
|
pRes->code = atomic_load_32(&job->errCode);
|
||||||
pRes->numOfRows = job->resNumOfRows;
|
pRes->numOfRows = job->resNumOfRows;
|
||||||
if (SCH_RES_TYPE_QUERY == job->resType) {
|
pRes->res = job->queryRes;
|
||||||
pRes->res = job->resData;
|
job->queryRes = NULL;
|
||||||
job->resData = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
schReleaseJob(*pJob);
|
schReleaseJob(*pJob);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue