feature/qnode
This commit is contained in:
parent
7ee4403d84
commit
913455684f
|
@ -70,7 +70,7 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
|
||||||
*/
|
*/
|
||||||
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue);
|
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue);
|
||||||
|
|
||||||
void dsEndPut(DataSinkHandle handle, int64_t useconds);
|
void dsEndPut(DataSinkHandle handle, uint64_t useconds);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the length of the data returned by the next call to dsGetDataBlock.
|
* Get the length of the data returned by the next call to dsGetDataBlock.
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include "common.h"
|
||||||
|
|
||||||
typedef void* qTaskInfo_t;
|
typedef void* qTaskInfo_t;
|
||||||
typedef void* DataSinkHandle;
|
typedef void* DataSinkHandle;
|
||||||
struct SSubplan;
|
struct SSubplan;
|
||||||
|
@ -34,7 +36,7 @@ struct SSubplan;
|
||||||
* @param qId
|
* @param qId
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo);
|
int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The main task execution function, including query on both table and multiple tables,
|
* The main task execution function, including query on both table and multiple tables,
|
||||||
|
@ -44,7 +46,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskI
|
||||||
* @param handle
|
* @param handle
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
struct SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle);
|
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve the produced results information, if current query is not paused or completed,
|
* Retrieve the produced results information, if current query is not paused or completed,
|
||||||
|
|
|
@ -32,7 +32,7 @@ typedef struct SDataSinkManager {
|
||||||
} SDataSinkManager;
|
} SDataSinkManager;
|
||||||
|
|
||||||
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
|
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
|
||||||
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds);
|
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds);
|
||||||
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd);
|
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd);
|
||||||
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
|
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
|
||||||
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
|
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
|
||||||
|
|
|
@ -44,7 +44,7 @@ typedef struct SDataDispatchHandle {
|
||||||
SDataDispatchBuf nextOutput;
|
SDataDispatchBuf nextOutput;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
bool queryEnd;
|
bool queryEnd;
|
||||||
int64_t useconds;
|
uint64_t useconds;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
} SDataDispatchHandle;
|
} SDataDispatchHandle;
|
||||||
|
|
||||||
|
@ -158,7 +158,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) {
|
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
|
||||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
pthread_mutex_lock(&pDispatcher->mutex);
|
pthread_mutex_lock(&pDispatcher->mutex);
|
||||||
pDispatcher->queryEnd = true;
|
pDispatcher->queryEnd = true;
|
||||||
|
|
|
@ -37,7 +37,7 @@ int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pC
|
||||||
return pHandleImpl->fPut(pHandleImpl, pInput, pContinue);
|
return pHandleImpl->fPut(pHandleImpl, pInput, pContinue);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dsEndPut(DataSinkHandle handle, int64_t useconds) {
|
void dsEndPut(DataSinkHandle handle, uint64_t useconds) {
|
||||||
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
||||||
return pHandleImpl->fEndPut(pHandleImpl, useconds);
|
return pHandleImpl->fEndPut(pHandleImpl, useconds);
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,7 +68,7 @@ void freeParam(STaskParam *param) {
|
||||||
tfree(param->prevResult);
|
tfree(param->prevResult);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo) {
|
int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
|
||||||
assert(tsdb != NULL && pSubplan != NULL);
|
assert(tsdb != NULL && pSubplan != NULL);
|
||||||
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
||||||
|
|
||||||
|
@ -85,6 +85,8 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
|
||||||
|
|
||||||
code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle);
|
code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle);
|
||||||
|
|
||||||
|
*handle = (*pTask)->dsHandle;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
// if failed to add ref for all tables in this query, abort current query
|
// if failed to add ref for all tables in this query, abort current query
|
||||||
return code;
|
return code;
|
||||||
|
@ -135,16 +137,18 @@ int waitMoment(SQInfo* pQInfo){
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
|
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
int64_t threadId = taosGetSelfPthreadId();
|
int64_t threadId = taosGetSelfPthreadId();
|
||||||
|
|
||||||
|
*pRes = NULL;
|
||||||
|
|
||||||
int64_t curOwner = 0;
|
int64_t curOwner = 0;
|
||||||
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
||||||
qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
|
qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
|
||||||
(void*)curOwner);
|
(void*)curOwner);
|
||||||
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
||||||
return NULL;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTaskInfo->cost.start == 0) {
|
if (pTaskInfo->cost.start == 0) {
|
||||||
|
@ -153,7 +157,7 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
|
||||||
|
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo));
|
qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo));
|
||||||
return NULL;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv;
|
// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv;
|
||||||
|
@ -170,7 +174,7 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
|
||||||
pTaskInfo->code = ret;
|
pTaskInfo->code = ret;
|
||||||
qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
|
qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
|
||||||
tstrerror(pTaskInfo->code));
|
tstrerror(pTaskInfo->code));
|
||||||
return NULL;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo));
|
qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo));
|
||||||
|
@ -179,21 +183,21 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
|
||||||
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
int64_t st = 0;
|
int64_t st = 0;
|
||||||
|
|
||||||
if (handle) {
|
|
||||||
*handle = pTaskInfo->dsHandle;
|
|
||||||
}
|
|
||||||
|
|
||||||
st = taosGetTimestampUs();
|
st = taosGetTimestampUs();
|
||||||
SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
|
*pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
|
||||||
|
|
||||||
pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st);
|
pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st);
|
||||||
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||||
|
|
||||||
|
if (NULL == *pRes) {
|
||||||
|
*useconds = pTaskInfo->cost.elapsedTime;
|
||||||
|
}
|
||||||
|
|
||||||
qDebug("QInfo:0x%" PRIx64 " query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d",
|
qDebug("QInfo:0x%" PRIx64 " query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d",
|
||||||
GET_TASKID(pTaskInfo), 0, 0L, 0);
|
GET_TASKID(pTaskInfo), 0, 0L, 0);
|
||||||
|
|
||||||
atomic_store_64(&pTaskInfo->owner, 0);
|
atomic_store_64(&pTaskInfo->owner, 0);
|
||||||
return pRes;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) {
|
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) {
|
||||||
|
|
|
@ -217,5 +217,6 @@ TEST(testCase, build_executor_tree_Test) {
|
||||||
"}";
|
"}";
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = nullptr;
|
SExecTaskInfo* pTaskInfo = nullptr;
|
||||||
int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo);
|
DataSinkHandle sinkHandle = nullptr;
|
||||||
|
int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo, &sinkHandle);
|
||||||
}
|
}
|
|
@ -458,6 +458,37 @@ _return:
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) {
|
||||||
|
int32_t code = 0;
|
||||||
|
bool qcontinue = true;
|
||||||
|
SSDataBlock* pRes = NULL;
|
||||||
|
uint64_t useconds = 0;
|
||||||
|
|
||||||
|
while (qcontinue) {
|
||||||
|
code = qExecTask(taskHandle, &pRes, &useconds);
|
||||||
|
if (code) {
|
||||||
|
QW_TASK_ELOG("qExecTask failed, code:%x", code);
|
||||||
|
QW_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == pRes) {
|
||||||
|
QW_TASK_DLOG("query done, useconds:%"PRIu64, useconds);
|
||||||
|
dsEndPut(sinkHandle, useconds);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL};
|
||||||
|
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
|
||||||
|
if (code) {
|
||||||
|
QW_TASK_ELOG("dsPutDataBlock failed, code:%x", code);
|
||||||
|
QW_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
QW_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
|
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
|
||||||
|
@ -733,7 +764,9 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
}
|
}
|
||||||
|
|
||||||
qTaskInfo_t pTaskInfo = NULL;
|
qTaskInfo_t pTaskInfo = NULL;
|
||||||
code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo);
|
DataSinkHandle sinkHandle = NULL;
|
||||||
|
|
||||||
|
code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle);
|
||||||
if (code) {
|
if (code) {
|
||||||
QW_TASK_ELOG("qCreateExecTask failed, code:%x", code);
|
QW_TASK_ELOG("qCreateExecTask failed, code:%x", code);
|
||||||
QW_ERR_JRET(code);
|
QW_ERR_JRET(code);
|
||||||
|
@ -743,12 +776,7 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
|
|
||||||
queryRsped = true;
|
queryRsped = true;
|
||||||
|
|
||||||
DataSinkHandle sinkHandle = NULL;
|
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), pTaskInfo, sinkHandle));
|
||||||
SSDataBlock* pRes = qExecTask(pTaskInfo, &sinkHandle);
|
|
||||||
if (code) {
|
|
||||||
QW_TASK_ELOG("qExecTask failed, code:%x", code);
|
|
||||||
QW_ERR_JRET(code);
|
|
||||||
}
|
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
@ -840,11 +868,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
qTaskInfo_t taskHandle = ctx->taskHandle;
|
qTaskInfo_t taskHandle = ctx->taskHandle;
|
||||||
DataSinkHandle sinkHandle = ctx->sinkHandle;
|
DataSinkHandle sinkHandle = ctx->sinkHandle;
|
||||||
|
|
||||||
code = qExecTask(taskHandle, &sinkHandle);
|
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), taskHandle, sinkHandle));
|
||||||
if (code) {
|
|
||||||
QW_TASK_ELOG("qExecTask failed, code:%x", code);
|
|
||||||
QW_ERR_JRET(code);
|
|
||||||
}
|
|
||||||
|
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY);
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY);
|
||||||
|
|
||||||
|
|
|
@ -412,6 +412,8 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
||||||
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
|
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
++addNum;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -792,6 +794,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
||||||
if (rspCode != TSDB_CODE_SUCCESS) {
|
if (rspCode != TSDB_CODE_SUCCESS) {
|
||||||
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
|
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg;
|
||||||
|
if (rsp) {
|
||||||
|
pJob->resNumOfRows += rsp->affectedRows;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||||
|
@ -1355,9 +1362,9 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru
|
||||||
|
|
||||||
SSchJob *job = NULL;
|
SSchJob *job = NULL;
|
||||||
|
|
||||||
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, &job, true));
|
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true));
|
||||||
|
|
||||||
*pJob = job;
|
job = *pJob;
|
||||||
|
|
||||||
pRes->code = atomic_load_32(&job->errCode);
|
pRes->code = atomic_load_32(&job->errCode);
|
||||||
pRes->numOfRows = job->resNumOfRows;
|
pRes->numOfRows = job->resNumOfRows;
|
||||||
|
|
|
@ -34,10 +34,12 @@
|
||||||
#include "stub.h"
|
#include "stub.h"
|
||||||
#include "addr_any.h"
|
#include "addr_any.h"
|
||||||
|
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
|
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
|
||||||
|
|
||||||
|
|
||||||
void schtInitLogFile() {
|
void schtInitLogFile() {
|
||||||
const char *defaultLogFileNamePrefix = "taoslog";
|
const char *defaultLogFileNamePrefix = "taoslog";
|
||||||
const int32_t maxLogFileNum = 10;
|
const int32_t maxLogFileNum = 10;
|
||||||
|
@ -113,9 +115,9 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
||||||
dag->queryId = qId;
|
dag->queryId = qId;
|
||||||
dag->numOfSubplans = 2;
|
dag->numOfSubplans = 2;
|
||||||
dag->pSubplans = taosArrayInit(1, POINTER_BYTES);
|
dag->pSubplans = taosArrayInit(1, POINTER_BYTES);
|
||||||
SArray *inserta = taosArrayInit(dag->numOfSubplans, sizeof(SSubplan));
|
SArray *inserta = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
|
||||||
|
|
||||||
SSubplan insertPlan[2] = {0};
|
SSubplan *insertPlan = (SSubplan *)calloc(2, sizeof(SSubplan));
|
||||||
|
|
||||||
insertPlan[0].id.queryId = qId;
|
insertPlan[0].id.queryId = qId;
|
||||||
insertPlan[0].id.templateId = 0x0000000000000003;
|
insertPlan[0].id.templateId = 0x0000000000000003;
|
||||||
|
@ -131,6 +133,7 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
||||||
insertPlan[0].pParents = NULL;
|
insertPlan[0].pParents = NULL;
|
||||||
insertPlan[0].pNode = NULL;
|
insertPlan[0].pNode = NULL;
|
||||||
insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
||||||
|
insertPlan[0].msgType = TDMT_VND_SUBMIT;
|
||||||
|
|
||||||
insertPlan[1].id.queryId = qId;
|
insertPlan[1].id.queryId = qId;
|
||||||
insertPlan[1].id.templateId = 0x0000000000000003;
|
insertPlan[1].id.templateId = 0x0000000000000003;
|
||||||
|
@ -146,10 +149,11 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
||||||
insertPlan[1].pParents = NULL;
|
insertPlan[1].pParents = NULL;
|
||||||
insertPlan[1].pNode = NULL;
|
insertPlan[1].pNode = NULL;
|
||||||
insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
||||||
|
insertPlan[1].msgType = TDMT_VND_SUBMIT;
|
||||||
|
|
||||||
|
taosArrayPush(inserta, &insertPlan);
|
||||||
taosArrayPush(inserta, &insertPlan[0]);
|
insertPlan += 1;
|
||||||
taosArrayPush(inserta, &insertPlan[1]);
|
taosArrayPush(inserta, &insertPlan);
|
||||||
|
|
||||||
taosArrayPush(dag->pSubplans, &inserta);
|
taosArrayPush(dag->pSubplans, &inserta);
|
||||||
}
|
}
|
||||||
|
@ -210,6 +214,24 @@ void schtSetRpcSendRequest() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void schtSetAsyncSendMsgToServer() {
|
||||||
|
static Stub stub;
|
||||||
|
stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);
|
||||||
|
{
|
||||||
|
AddrAny any("libtransport.so");
|
||||||
|
std::map<std::string,void*> result;
|
||||||
|
any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
|
||||||
|
for (const auto& f : result) {
|
||||||
|
stub.set(f.second, schtAsyncSendMsgToServer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void *schtSendRsp(void *param) {
|
void *schtSendRsp(void *param) {
|
||||||
SSchJob *job = NULL;
|
SSchJob *job = NULL;
|
||||||
|
@ -230,7 +252,7 @@ void *schtSendRsp(void *param) {
|
||||||
|
|
||||||
SShellSubmitRsp rsp = {0};
|
SShellSubmitRsp rsp = {0};
|
||||||
rsp.affectedRows = 10;
|
rsp.affectedRows = 10;
|
||||||
schHandleResponseMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0);
|
schHandleResponseMsg(job, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||||
|
|
||||||
pIter = taosHashIterate(job->execTasks, pIter);
|
pIter = taosHashIterate(job->execTasks, pIter);
|
||||||
}
|
}
|
||||||
|
@ -238,6 +260,23 @@ void *schtSendRsp(void *param) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *schtCreateFetchRspThread(void *param) {
|
||||||
|
struct SSchJob* job = (struct SSchJob*)param;
|
||||||
|
|
||||||
|
sleep(1);
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp));
|
||||||
|
rsp->completed = 1;
|
||||||
|
rsp->numOfRows = 10;
|
||||||
|
code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(rsp), 0);
|
||||||
|
|
||||||
|
assert(code == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
struct SSchJob *pInsertJob = NULL;
|
struct SSchJob *pInsertJob = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,6 +305,7 @@ TEST(queryTest, normalCase) {
|
||||||
|
|
||||||
schtSetPlanToString();
|
schtSetPlanToString();
|
||||||
schtSetExecNode();
|
schtSetExecNode();
|
||||||
|
schtSetAsyncSendMsgToServer();
|
||||||
|
|
||||||
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
|
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
@ -276,7 +316,7 @@ TEST(queryTest, normalCase) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
|
|
||||||
SQueryTableRsp rsp = {0};
|
SQueryTableRsp rsp = {0};
|
||||||
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
|
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||||
|
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
pIter = taosHashIterate(job->execTasks, pIter);
|
pIter = taosHashIterate(job->execTasks, pIter);
|
||||||
|
@ -287,8 +327,8 @@ TEST(queryTest, normalCase) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
|
|
||||||
SResReadyRsp rsp = {0};
|
SResReadyRsp rsp = {0};
|
||||||
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
|
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||||
|
printf("code:%d", code);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
pIter = taosHashIterate(job->execTasks, pIter);
|
pIter = taosHashIterate(job->execTasks, pIter);
|
||||||
}
|
}
|
||||||
|
@ -298,7 +338,7 @@ TEST(queryTest, normalCase) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
|
|
||||||
SQueryTableRsp rsp = {0};
|
SQueryTableRsp rsp = {0};
|
||||||
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
|
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||||
|
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
pIter = taosHashIterate(job->execTasks, pIter);
|
pIter = taosHashIterate(job->execTasks, pIter);
|
||||||
|
@ -309,22 +349,19 @@ TEST(queryTest, normalCase) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
|
|
||||||
SResReadyRsp rsp = {0};
|
SResReadyRsp rsp = {0};
|
||||||
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
|
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
pIter = taosHashIterate(job->execTasks, pIter);
|
pIter = taosHashIterate(job->execTasks, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
SRetrieveTableRsp rsp = {0};
|
pthread_attr_t thattr;
|
||||||
rsp.completed = 1;
|
pthread_attr_init(&thattr);
|
||||||
rsp.numOfRows = 10;
|
|
||||||
code = schHandleResponseMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0);
|
|
||||||
|
|
||||||
ASSERT_EQ(code, 0);
|
|
||||||
|
|
||||||
|
pthread_t thread1;
|
||||||
|
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job);
|
||||||
|
|
||||||
void *data = NULL;
|
void *data = NULL;
|
||||||
|
|
||||||
code = scheduleFetchRows(job, &data);
|
code = scheduleFetchRows(job, &data);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
@ -340,6 +377,8 @@ TEST(queryTest, normalCase) {
|
||||||
scheduleFreeJob(pJob);
|
scheduleFreeJob(pJob);
|
||||||
|
|
||||||
schtFreeQueryDag(&dag);
|
schtFreeQueryDag(&dag);
|
||||||
|
|
||||||
|
schedulerDestroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -369,6 +408,7 @@ TEST(insertTest, normalCase) {
|
||||||
schtBuildInsertDag(&dag);
|
schtBuildInsertDag(&dag);
|
||||||
|
|
||||||
schtSetPlanToString();
|
schtSetPlanToString();
|
||||||
|
schtSetAsyncSendMsgToServer();
|
||||||
|
|
||||||
pthread_attr_t thattr;
|
pthread_attr_t thattr;
|
||||||
pthread_attr_init(&thattr);
|
pthread_attr_init(&thattr);
|
||||||
|
@ -382,6 +422,8 @@ TEST(insertTest, normalCase) {
|
||||||
ASSERT_EQ(res.numOfRows, 20);
|
ASSERT_EQ(res.numOfRows, 20);
|
||||||
|
|
||||||
scheduleFreeJob(pInsertJob);
|
scheduleFreeJob(pInsertJob);
|
||||||
|
|
||||||
|
schedulerDestroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(multiThread, forceFree) {
|
TEST(multiThread, forceFree) {
|
||||||
|
|
Loading…
Reference in New Issue