feature/qnode

This commit is contained in:
dapan1121 2022-01-20 19:34:46 +08:00
parent b9033a7501
commit 36bfc3763b
13 changed files with 637 additions and 222 deletions

View File

@ -875,6 +875,7 @@ typedef struct SSubQueryMsg {
uint64_t sId;
uint64_t queryId;
uint64_t taskId;
int8_t taskType;
uint32_t contentLen;
char msg[];
} SSubQueryMsg;

View File

@ -84,6 +84,13 @@ void* qGetResultRetrieveMsg(qTaskInfo_t qinfo);
*/
int32_t qKillTask(qTaskInfo_t qinfo);
/**
* kill the ongoing query asynchronously
* @param qinfo qhandle
* @return
*/
int32_t qAsyncKillTask(qTaskInfo_t qinfo);
/**
* return whether query is completed or not
* @param qinfo

View File

@ -38,6 +38,11 @@ enum {
JOB_TASK_STATUS_FREEING,
};
enum {
TASK_TYPE_PERSISTENT = 1,
TASK_TYPE_TEMP,
};
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision

View File

@ -362,6 +362,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation")
#define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error")
#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A) //"Job freed")
#define TSDB_CODE_QRY_TASK_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x071B) //"Task status error")
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired")

View File

@ -317,6 +317,19 @@ int32_t qKillTask(qTaskInfo_t qinfo) {
return TSDB_CODE_SUCCESS;
}
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
SQInfo *pQInfo = (SQInfo *)qinfo;
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qDebug("QInfo:0x%"PRIx64" query async killed", pQInfo->qId);
setQueryKilled(pQInfo);
return TSDB_CODE_SUCCESS;
}
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;

View File

@ -84,6 +84,7 @@ typedef struct SQWMsg {
typedef struct SQWPhaseInput {
int8_t status;
int8_t taskType;
int32_t code;
qTaskInfo_t taskHandle;
DataSinkHandle sinkHandle;
@ -91,8 +92,7 @@ typedef struct SQWPhaseInput {
typedef struct SQWPhaseOutput {
int32_t rspCode;
bool needStop;
bool needRsp;
bool needStop;
} SQWPhaseOutput;
@ -104,10 +104,15 @@ typedef struct SQWTaskStatus {
typedef struct SQWTaskCtx {
SRWLatch lock;
int8_t phase;
int8_t taskType;
void *readyConnection;
void *dropConnection;
void *cancelConnection;
bool emptyRes;
int8_t queryContinue;
int8_t inQueue;
int8_t queryInQueue;
int32_t rspCode;
int8_t events[QW_EVENT_MAX];
@ -170,6 +175,10 @@ typedef struct SQWorkerMgmt {
#define QW_TASK_WLOG(param, ...) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) qError("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_WLOG_E(param) qWarn("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_TASK_DLOG_E(param) qDebug("QW:%p QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, qId, tId)
#define QW_SCH_TASK_ELOG(param, ...) qError("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) qWarn("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) qDebug("QW:%p SID:%"PRIx64",QID:%"PRIx64",TID:%"PRIx64" " param, mgmt, sId, qId, tId, __VA_ARGS__)

View File

@ -23,7 +23,7 @@ extern "C" {
#include "qworkerInt.h"
#include "dataSinkMgt.h"
int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg, int8_t taskType);
int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);
int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg);

View File

@ -141,7 +141,8 @@ int32_t qwAcquireSchedulerImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, u
} else if (QW_NOT_EXIST_RET_ERR == nOpt) {
QW_RET(TSDB_CODE_QRY_SCH_NOT_EXIST);
} else {
assert(0);
QW_TASK_ELOG("unknown notExistOpt:%d", nOpt);
QW_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
}
@ -260,7 +261,7 @@ int32_t qwGetTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tI
*ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id));
if (NULL == (*ctx)) {
QW_TASK_ELOG("ctx not in ctxHash, id:%s", id);
QW_TASK_ELOG("ctx not in ctxHash, ctxHashSize:%d", taosHashGetSize(mgmt->ctxHash));
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
}
@ -271,8 +272,6 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId);
printf("%"PRIx64", tid:%"PRIx64"\n", qId, tId);
SQWTaskCtx nctx = {0};
QW_LOCK(QW_WRITE, &mgmt->ctxLock);
@ -324,11 +323,11 @@ void qwReleaseTaskCtx(int32_t rwType, SQWorkerMgmt *mgmt) {
QW_UNLOCK(rwType, &mgmt->ctxLock);
}
void qwFreeTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) {
// RC WARNING
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
qDestroyTask(taskHandle);
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) {
qDestroyTask(otaskHandle);
}
}
@ -337,7 +336,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
// RC WARNING
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
code = qKillTask(taskHandle);
code = qAsyncKillTask(taskHandle);
atomic_store_ptr(&ctx->taskHandle, taskHandle);
}
@ -346,7 +345,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
qwFreeTaskHandle(QW_FPARAMS(), ctx);
qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
if (ctx->sinkHandle) {
dsDestroyDataSinker(ctx->sinkHandle);
@ -369,7 +368,7 @@ int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t
octx = *ctx;
if (taosHashRemove(mgmt->ctxHash, id, sizeof(id))) {
QW_TASK_ELOG("taosHashRemove from ctx hash failed, id:%s", id);
QW_TASK_ELOG_E("taosHashRemove from ctx hash failed");
QW_ERR_RET(TSDB_CODE_QRY_RES_CACHE_NOT_EXIST);
}
@ -380,6 +379,8 @@ int32_t qwDropTaskCtx(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t t
if (octx.sinkHandle) {
dsDestroyDataSinker(octx.sinkHandle);
}
QW_TASK_DLOG_E("task ctx dropped");
return TSDB_CODE_SUCCESS;
}
@ -394,23 +395,23 @@ int32_t qwDropTaskStatus(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
QW_SET_QTID(id, qId, tId);
if (qwAcquireScheduler(QW_FPARAMS(), QW_WRITE, &sch)) {
QW_TASK_WLOG("scheduler does not exist, id:%s", id);
QW_TASK_WLOG_E("scheduler does not exist");
return TSDB_CODE_SUCCESS;
}
if (qwAcquireTaskStatus(QW_FPARAMS(), QW_WRITE, sch, &task)) {
qwReleaseScheduler(QW_WRITE, mgmt);
QW_TASK_WLOG("task does not exist, id:%s", id);
QW_TASK_WLOG_E("task does not exist");
return TSDB_CODE_SUCCESS;
}
if (taosHashRemove(sch->tasksHash, id, sizeof(id))) {
QW_TASK_ELOG("taosHashRemove task from hash failed, task:%p", task);
QW_TASK_ELOG_E("taosHashRemove task from hash failed");
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
QW_TASK_DLOG("task dropped, id:%s", id);
QW_TASK_DLOG_E("task status dropped");
_return:
@ -444,7 +445,7 @@ int32_t qwDropTask(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId,
SQWTaskCtx *ctx = NULL;
bool locked = false;
QW_ERR_JRET(qwAddGetTaskCtx(QW_FPARAMS(), &ctx));
QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), QW_WRITE, &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
@ -484,7 +485,7 @@ _return:
QW_RET(code);
}
int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) {
int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle, DataSinkHandle sinkHandle, int8_t taskType) {
int32_t code = 0;
bool qcontinue = true;
SSDataBlock* pRes = NULL;
@ -494,7 +495,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHa
while (true) {
QW_TASK_DLOG("start to execTask in executor, loopIdx:%d", i++);
code = qExecTask(taskHandle, &pRes, &useconds);
code = qExecTask(*taskHandle, &pRes, &useconds);
if (code) {
QW_TASK_ELOG("qExecTask failed, code:%x", code);
QW_ERR_JRET(code);
@ -502,7 +503,12 @@ int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHa
if (NULL == pRes) {
QW_TASK_DLOG("task query done, useconds:%"PRIu64, useconds);
dsEndPut(sinkHandle, useconds);
if (TASK_TYPE_TEMP == taskType) {
qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
}
break;
}
@ -608,11 +614,14 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
}
int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0;
int8_t status = 0;
SQWTaskCtx *ctx = NULL;
bool locked = false;
void *readyConnection = NULL;
void *dropConnection = NULL;
void *cancelConnection = NULL;
QW_SCH_TASK_DLOG("handle event at phase %d", phase);
@ -620,17 +629,24 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
case QW_PHASE_PRE_QUERY: {
QW_ERR_JRET(qwAddGetTaskCtx(QW_FPARAMS(), &ctx));
ctx->phase = phase;
atomic_store_32(&ctx->phase, phase);
atomic_store_8(&ctx->taskType, input->taskType);
assert(!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL));
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_ELOG("task already cancelled at wrong phase, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
break;
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
output->needStop = true;
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
dropConnection = ctx->dropConnection;
// Note: ctx freed, no need to unlock it
locked = false;
@ -664,18 +680,17 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
}
if (input->code) {
QW_SET_RSP_CODE(ctx, input->code);
output->rspCode = input->code;
}
assert(!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL));
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
output->needStop = true;
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
dropConnection = ctx->dropConnection;
// Note: ctx freed, no need to unlock it
locked = false;
@ -689,11 +704,9 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
output->needRsp = true;
readyConnection = ctx->readyConnection;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
output->rspCode = input->code;
}
if (!output->needStop) {
@ -701,84 +714,14 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
}
break;
}
case QW_PHASE_PRE_CQUERY: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
ctx->phase = phase;
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task is dropping, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPING;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task is cancelling, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLING;
}
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase);
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
break;
}
case QW_PHASE_POST_CQUERY: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
if (input->code) {
QW_SET_RSP_CODE(ctx, input->code);
}
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task is dropping, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPING;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task is cancelling, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLING;
}
if (ctx->rspCode) {
QW_TASK_ELOG("task failed, code:%x, phase:%d", ctx->rspCode, phase);
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
break;
}
case QW_PHASE_PRE_FETCH: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
ctx->phase = phase;
atomic_store_32(&ctx->phase, phase);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
@ -788,13 +731,15 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task is dropping, phase:%d", phase);
QW_TASK_ELOG("drop event at wrong phase, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPING;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task is cancelling, phase:%d", phase);
QW_TASK_ELOG("cancel event at wrong phase, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLING;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
@ -827,7 +772,7 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
locked = true;
if (input->code) {
QW_SET_RSP_CODE(ctx, input->code);
output->rspCode = input->code;
}
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
@ -838,13 +783,27 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("task is dropping, phase:%d", phase);
QW_TASK_WLOG("start to drop task, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_DROPPING;
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
dropConnection = ctx->dropConnection;
// Note: ctx freed, no need to unlock it
locked = false;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task is cancelling, phase:%d", phase);
QW_TASK_WLOG("start to cancel task, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLING;
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
cancelConnection = ctx->cancelConnection;
}
if (ctx->rspCode) {
@ -855,21 +814,127 @@ int32_t qwHandleTaskEvent(QW_FPARAMS_DEF, int32_t phase, SQWPhaseInput *input, S
}
break;
}
case QW_PHASE_PRE_CQUERY: {
QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), QW_READ, &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
atomic_store_32(&ctx->phase, phase);
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_ELOG("drop event at wrong phase, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_ELOG("cancel event at wrong phase, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (ctx->rspCode) {
QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase);
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
break;
}
case QW_PHASE_POST_CQUERY: {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
QW_LOCK(QW_WRITE, &ctx->lock);
locked = true;
if (input->code) {
output->rspCode = input->code;
}
if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("task already cancelled, phase:%d", phase);
output->needStop = true;
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG("start to drop task, phase:%d", phase);
output->needStop = true;
QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS()));
QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS()));
output->rspCode = TSDB_CODE_QRY_TASK_DROPPED;
dropConnection = ctx->dropConnection;
// Note: ctx freed, no need to unlock it
locked = false;
} else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) {
QW_TASK_WLOG("start to cancel task, phase:%d", phase);
output->needStop = true;
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED));
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL);
output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED;
cancelConnection = ctx->cancelConnection;
}
if (ctx->rspCode) {
QW_TASK_ELOG("task failed, code:%x, phase:%d", ctx->rspCode, phase);
output->needStop = true;
output->rspCode = ctx->rspCode;
QW_ERR_JRET(output->rspCode);
}
break;
}
}
_return:
if (output->rspCode) {
QW_SET_RSP_CODE(ctx, output->rspCode);
}
if (locked) {
ctx->phase = phase;
atomic_store_32(&ctx->phase, phase);
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
if (readyConnection) {
qwBuildAndSendReadyRsp(readyConnection, output->rspCode);
QW_TASK_DLOG("ready msg rsped, code:%x", output->rspCode);
}
if (dropConnection) {
qwBuildAndSendDropRsp(dropConnection, output->rspCode);
QW_TASK_DLOG("drop msg rsped, code:%x", output->rspCode);
}
if (cancelConnection) {
qwBuildAndSendCancelRsp(cancelConnection, output->rspCode);
QW_TASK_DLOG("cancel msg rsped, code:%x", output->rspCode);
}
QW_RET(code);
}
int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg) {
int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, SQWMsg *qwMsg, int8_t taskType) {
int32_t code = 0;
bool queryRsped = false;
bool needStop = false;
@ -877,6 +942,10 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
int32_t rspCode = 0;
SQWPhaseInput input = {0};
SQWPhaseOutput output = {0};
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
input.taskType = taskType;
QW_ERR_JRET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, &output));
@ -893,15 +962,19 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_TASK_ELOG("task string to subplan failed, code:%x", code);
QW_ERR_JRET(code);
}
qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL;
code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle);
if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x", code);
QW_ERR_JRET(code);
}
if ((pTaskInfo && NULL == sinkHandle) || (NULL == pTaskInfo && sinkHandle)) {
QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
//TODO OPTIMIZE EMTYP RESULT QUERY RSP TO AVOID FURTHER FETCH
QW_ERR_JRET(qwBuildAndSendQueryRsp(qwMsg->connection, code));
QW_TASK_DLOG("query msg rsped, code:%d", code);
@ -909,8 +982,9 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
queryRsped = true;
if (pTaskInfo && sinkHandle) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), pTaskInfo, sinkHandle));
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &pTaskInfo, sinkHandle, taskType));
}
_return:
if (code) {
@ -943,11 +1017,6 @@ _return:
}
QW_ERR_RET(qwHandleTaskEvent(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, &output));
if (queryRsped && output.needRsp) {
qwBuildAndSendReadyRsp(qwMsg->connection, output.rspCode);
QW_TASK_DLOG("ready msg rsped, code:%x", output.rspCode);
}
QW_RET(rspCode);
}
@ -956,6 +1025,8 @@ int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
int8_t phase = 0;
bool needRsp = false;
int32_t rspCode = 0;
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
@ -968,16 +1039,19 @@ int32_t qwProcessReady(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_TASK_DLOG("ready msg not rsped, phase:%d", phase);
} else if (phase == QW_PHASE_POST_QUERY) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
QW_ERR_JRET(qwBuildAndSendReadyRsp(qwMsg->connection, ctx->rspCode));
QW_TASK_DLOG("ready msg rsped, code:%x", ctx->rspCode);
needRsp = true;
rspCode = ctx->rspCode;
} else {
QW_TASK_ELOG("invalid phase when got ready msg, phase:%d", phase);
assert(0);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
needRsp = true;
rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR;
QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR);
}
_return:
if (code) {
if (code && ctx) {
QW_SET_RSP_CODE(ctx, code);
}
@ -985,6 +1059,11 @@ _return:
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
if (needRsp) {
qwBuildAndSendReadyRsp(qwMsg->connection, rspCode);
QW_TASK_DLOG("ready msg rsped, code:%x", rspCode);
}
QW_RET(code);
}
@ -1013,12 +1092,11 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
atomic_store_8(&ctx->inQueue, 0);
atomic_store_8(&ctx->queryInQueue, 0);
qTaskInfo_t taskHandle = ctx->taskHandle;
DataSinkHandle sinkHandle = ctx->sinkHandle;
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), taskHandle, sinkHandle));
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), &ctx->taskHandle, sinkHandle, ctx->taskType));
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
SOutputData sOutput = {0};
@ -1110,10 +1188,10 @@ int32_t qwProcessFetch(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
// RC WARNING
if (QW_IN_EXECUTOR(ctx)) {
atomic_store_8(&ctx->queryContinue, 1);
} else if (0 == atomic_load_8(&ctx->inQueue)) {
} else if (0 == atomic_load_8(&ctx->queryInQueue)) {
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING));
atomic_store_8(&ctx->inQueue, 1);
atomic_store_8(&ctx->queryInQueue, 1);
QW_ERR_JRET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), qwMsg->connection));
}
@ -1197,10 +1275,10 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
mgmt->ctxHash = taosHashInit(mgmt->cfg.maxTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == mgmt->ctxHash) {
qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
taosHashCleanup(mgmt->schHash);
mgmt->schHash = NULL;
tfree(mgmt);
qError("init %d task ctx hash failed", mgmt->cfg.maxTaskNum);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}

View File

@ -290,7 +290,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_SCH_TASK_DLOG("processQuery start, node:%p", node);
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg));
QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
@ -374,7 +374,9 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SQWorkerMgmt *mgmt = (SQWorkerMgmt *)qWorkerMgmt;
msg->sId = htobe64(msg->sId);
uint64_t sId = msg->sId;
SSchedulerStatusRsp *sStatus = NULL;

View File

@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(qworkerTest ${SOURCE_LIST})
TARGET_LINK_LIBRARIES(
qworkerTest
PUBLIC os util common transport gtest qcom planner qworker
PUBLIC os util common transport gtest qcom planner qworker executor
)
TARGET_INCLUDE_DIRECTORIES(

View File

@ -32,11 +32,75 @@
#include "qworker.h"
#include "stub.h"
#include "addr_any.h"
#include "executor.h"
#include "dataSinkMgt.h"
namespace {
bool testStop = false;
bool qwtTestEnableSleep = true;
bool qwtTestStop = false;
bool qwtTestDeadLoop = true;
int32_t qwtTestMTRunSec = 10;
int32_t qwtTestPrintNum = 100000;
int32_t qwtTestCaseIdx = 0;
int32_t qwtTestCaseNum = 4;
void qwtInitLogFile() {
const char *defaultLogFileNamePrefix = "taosdlog";
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
qDebugFlag = 159;
char temp[128] = {0};
sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix);
if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) {
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc->pCont = queryMsg;
queryRpc->contLen = sizeof(SSubQueryMsg) + 100;
}
void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) {
readyMsg->sId = htobe64(1);
readyMsg->queryId = htobe64(1);
readyMsg->taskId = htobe64(1);
readyRpc->pCont = readyMsg;
readyRpc->contLen = sizeof(SResReadyReq);
}
void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
fetchMsg->sId = htobe64(1);
fetchMsg->queryId = htobe64(1);
fetchMsg->taskId = htobe64(1);
fetchRpc->pCont = fetchMsg;
fetchRpc->contLen = sizeof(SResFetchReq);
}
void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
dropMsg->sId = htobe64(1);
dropMsg->queryId = htobe64(1);
dropMsg->taskId = htobe64(1);
dropRpc->pCont = dropMsg;
dropRpc->contLen = sizeof(STaskDropReq);
}
void qwtBuildStatusReqMsg(SSchTasksStatusReq *statusMsg, SRpcMsg *statusRpc) {
statusMsg->sId = htobe64(1);
statusRpc->pCont = statusMsg;
statusRpc->contLen = sizeof(SSchTasksStatusReq);
statusRpc->msgType = TDMT_VND_TASKS_STATUS;
}
int32_t qwtStringToPlan(const char* str, SSubplan** subplan) {
return 0;
@ -48,6 +112,7 @@ int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) {
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
/*
if (TDMT_VND_TASKS_STATUS_RSP == pRsp->msgType) {
SSchedulerStatusRsp *rsp = (SSchedulerStatusRsp *)pRsp->pCont;
printf("task num:%d\n", rsp->num);
@ -56,9 +121,63 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
printf("qId:%"PRIx64",tId:%"PRIx64",status:%d\n", task->queryId, task->taskId, task->status);
}
}
*/
return;
}
int32_t qwtCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
int32_t idx = qwtTestCaseIdx % qwtTestCaseNum;
if (0 == idx) {
*pTaskInfo = qwtTestCaseIdx;
*handle = qwtTestCaseIdx+1;
} else if (1 == idx) {
*pTaskInfo = NULL;
*handle = NULL;
} else if (2 == idx) {
*pTaskInfo = qwtTestCaseIdx;
*handle = NULL;
} else if (3 == idx) {
*pTaskInfo = NULL;
*handle = qwtTestCaseIdx;
}
++qwtTestCaseIdx;
return 0;
}
int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
return 0;
}
int32_t qwtKillTask(qTaskInfo_t qinfo) {
return 0;
}
void qwtDestroyTask(qTaskInfo_t qHandle) {
}
int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) {
return 0;
}
void qwtEndPut(DataSinkHandle handle, uint64_t useconds) {
}
void qwtGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) {
}
int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) {
return 0;
}
void qwtDestroyDataSinker(DataSinkHandle handle) {
}
void stubSetStringToPlan() {
@ -74,11 +193,118 @@ void stubSetStringToPlan() {
}
}
void stubSetExecTask() {
static Stub stub;
stub.set(qExecTask, qwtExecTask);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^qExecTask$", result);
for (const auto& f : result) {
stub.set(f.second, qwtExecTask);
}
}
}
void stubSetCreateExecTask() {
static Stub stub;
stub.set(qCreateExecTask, qwtCreateExecTask);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^qCreateExecTask$", result);
for (const auto& f : result) {
stub.set(f.second, qwtCreateExecTask);
}
}
}
void stubSetAsyncKillTask() {
static Stub stub;
stub.set(qAsyncKillTask, qwtKillTask);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^qAsyncKillTask$", result);
for (const auto& f : result) {
stub.set(f.second, qwtKillTask);
}
}
}
void stubSetDestroyTask() {
static Stub stub;
stub.set(qDestroyTask, qwtDestroyTask);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^qDestroyTask$", result);
for (const auto& f : result) {
stub.set(f.second, qwtDestroyTask);
}
}
}
void stubSetDestroyDataSinker() {
static Stub stub;
stub.set(dsDestroyDataSinker, qwtDestroyDataSinker);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^dsDestroyDataSinker$", result);
for (const auto& f : result) {
stub.set(f.second, qwtDestroyDataSinker);
}
}
}
void stubSetGetDataLength() {
static Stub stub;
stub.set(dsGetDataLength, qwtGetDataLength);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^dsGetDataLength$", result);
for (const auto& f : result) {
stub.set(f.second, qwtGetDataLength);
}
}
}
void stubSetEndPut() {
static Stub stub;
stub.set(dsEndPut, qwtEndPut);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^dsEndPut$", result);
for (const auto& f : result) {
stub.set(f.second, qwtEndPut);
}
}
}
void stubSetPutDataBlock() {
static Stub stub;
stub.set(dsPutDataBlock, qwtPutDataBlock);
{
AddrAny any("libexecutor.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^dsPutDataBlock$", result);
for (const auto& f : result) {
stub.set(f.second, qwtPutDataBlock);
}
}
}
void stubSetRpcSendResponse() {
static Stub stub;
stub.set(rpcSendResponse, qwtRpcSendResponse);
{
AddrAny any("libplanner.so");
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^rpcSendResponse$", result);
for (const auto& f : result) {
@ -87,24 +313,35 @@ void stubSetRpcSendResponse() {
}
}
void stubSetGetDataBlock() {
static Stub stub;
stub.set(dsGetDataBlock, qwtGetDataBlock);
{
AddrAny any("libtransport.so");
std::map<std::string,void*> result;
any.get_global_func_addr_dynsym("^dsGetDataBlock$", result);
for (const auto& f : result) {
stub.set(f.second, qwtGetDataBlock);
}
}
}
void *queryThread(void *param) {
SRpcMsg queryRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
while (!testStop) {
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
while (!qwtTestStop) {
qwtBuildQueryReqMsg(&queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
free(queryRpc.pCont);
if (qwtTestEnableSleep) {
usleep(rand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("query:%d\n", n);
}
}
@ -119,16 +356,14 @@ void *readyThread(void *param) {
void *mockPointer = (void *)0x1;
void *mgmt = param;
SResReadyReq readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyReq);
while (!testStop) {
while (!qwtTestStop) {
qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
if (qwtTestEnableSleep) {
usleep(rand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("ready:%d\n", n);
}
}
@ -143,16 +378,14 @@ void *fetchThread(void *param) {
void *mockPointer = (void *)0x1;
void *mgmt = param;
SResFetchReq fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchReq);
while (!testStop) {
while (!qwtTestStop) {
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
if (qwtTestEnableSleep) {
usleep(rand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("fetch:%d\n", n);
}
}
@ -167,16 +400,14 @@ void *dropThread(void *param) {
void *mockPointer = (void *)0x1;
void *mgmt = param;
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropReq);
while (!testStop) {
while (!qwtTestStop) {
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
if (qwtTestEnableSleep) {
usleep(rand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("drop:%d\n", n);
}
}
@ -191,16 +422,14 @@ void *statusThread(void *param) {
void *mockPointer = (void *)0x1;
void *mgmt = param;
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
while (!testStop) {
statusMsg.sId = htobe64(1);
while (!qwtTestStop) {
qwtBuildStatusReqMsg(&statusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
usleep(rand()%5);
if (++n % 50000 == 0) {
if (qwtTestEnableSleep) {
usleep(rand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("status:%d\n", n);
}
}
@ -209,6 +438,27 @@ void *statusThread(void *param) {
}
void *controlThread(void *param) {
SRpcMsg queryRpc = {0};
int32_t code = 0;
uint32_t n = 0;
void *mockPointer = (void *)0x1;
void *mgmt = param;
while (!qwtTestStop) {
qwtBuildQueryReqMsg(&queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
free(queryRpc.pCont);
if (qwtTestEnableSleep) {
usleep(rand()%5);
}
if (++n % qwtTestPrintNum == 0) {
printf("query:%d\n", n);
}
}
return NULL;
}
@ -224,6 +474,8 @@ TEST(seqTest, normalCase) {
SRpcMsg fetchRpc = {0};
SRpcMsg dropRpc = {0};
SRpcMsg statusRpc = {0};
qwtInitLogFile();
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
@ -262,6 +514,15 @@ TEST(seqTest, normalCase) {
stubSetStringToPlan();
stubSetRpcSendResponse();
stubSetExecTask();
stubSetCreateExecTask();
stubSetAsyncKillTask();
stubSetDestroyTask();
stubSetDestroyDataSinker();
stubSetGetDataLength();
stubSetEndPut();
stubSetPutDataBlock();
stubSetGetDataBlock();
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
@ -308,6 +569,8 @@ TEST(seqTest, cancelFirst) {
SRpcMsg queryRpc = {0};
SRpcMsg dropRpc = {0};
SRpcMsg statusRpc = {0};
qwtInitLogFile();
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
@ -348,7 +611,7 @@ TEST(seqTest, cancelFirst) {
ASSERT_EQ(code, 0);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
ASSERT_EQ(code, 0);
ASSERT_EQ(code, TSDB_CODE_QRY_TASK_DROPPED);
statusMsg.sId = htobe64(1);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
@ -366,44 +629,16 @@ TEST(seqTest, randCase) {
SRpcMsg fetchRpc = {0};
SRpcMsg dropRpc = {0};
SRpcMsg statusRpc = {0};
SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100);
queryMsg->queryId = htobe64(1);
queryMsg->sId = htobe64(1);
queryMsg->taskId = htobe64(1);
queryMsg->contentLen = htonl(100);
queryRpc.pCont = queryMsg;
queryRpc.contLen = sizeof(SSubQueryMsg) + 100;
SResReadyReq readyMsg = {0};
readyMsg.sId = htobe64(1);
readyMsg.queryId = htobe64(1);
readyMsg.taskId = htobe64(1);
readyRpc.pCont = &readyMsg;
readyRpc.contLen = sizeof(SResReadyReq);
SResFetchReq fetchMsg = {0};
fetchMsg.sId = htobe64(1);
fetchMsg.queryId = htobe64(1);
fetchMsg.taskId = htobe64(1);
fetchRpc.pCont = &fetchMsg;
fetchRpc.contLen = sizeof(SResFetchReq);
STaskDropReq dropMsg = {0};
dropMsg.sId = htobe64(1);
dropMsg.queryId = htobe64(1);
dropMsg.taskId = htobe64(1);
dropRpc.pCont = &dropMsg;
dropRpc.contLen = sizeof(STaskDropReq);
SSchTasksStatusReq statusMsg = {0};
statusMsg.sId = htobe64(1);
statusRpc.pCont = &statusMsg;
statusRpc.contLen = sizeof(SSchTasksStatusReq);
statusRpc.msgType = TDMT_VND_TASKS_STATUS;
qwtInitLogFile();
stubSetStringToPlan();
stubSetRpcSendResponse();
stubSetCreateExecTask();
srand(time(NULL));
@ -416,20 +651,25 @@ TEST(seqTest, randCase) {
int32_t r = rand() % maxr;
if (r >= 0 && r < maxr/5) {
printf("Query,%d\n", t++);
printf("Query,%d\n", t++);
qwtBuildQueryReqMsg(&queryRpc);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc);
free(queryRpc.pCont);
} else if (r >= maxr/5 && r < maxr * 2/5) {
printf("Ready,%d\n", t++);
qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc);
} else if (r >= maxr * 2/5 && r < maxr* 3/5) {
printf("Fetch,%d\n", t++);
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc);
} else if (r >= maxr * 3/5 && r < maxr * 4/5) {
printf("Drop,%d\n", t++);
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc);
} else if (r >= maxr * 4/5 && r < maxr-1) {
printf("Status,%d\n", t++);
statusMsg.sId = htobe64(1);
qwtBuildStatusReqMsg(&statusMsg, &statusRpc);
code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc);
ASSERT_EQ(code, 0);
} else {
@ -445,6 +685,8 @@ TEST(seqTest, multithreadRand) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
qwtInitLogFile();
stubSetStringToPlan();
stubSetRpcSendResponse();
@ -464,15 +706,69 @@ TEST(seqTest, multithreadRand) {
pthread_create(&(t4), &thattr, dropThread, NULL);
pthread_create(&(t5), &thattr, statusThread, NULL);
int32_t t = 0;
int32_t maxr = 10001;
sleep(300);
testStop = true;
sleep(1);
while (true) {
if (qwtTestDeadLoop) {
sleep(1);
} else {
sleep(qwtTestMTRunSec);
break;
}
}
qwtTestStop = true;
sleep(3);
qWorkerDestroy(&mgmt);
}
TEST(rcTest, multithread) {
void *mgmt = NULL;
int32_t code = 0;
void *mockPointer = (void *)0x1;
qwtInitLogFile();
stubSetStringToPlan();
stubSetRpcSendResponse();
stubSetExecTask();
stubSetCreateExecTask();
stubSetAsyncKillTask();
stubSetDestroyTask();
stubSetDestroyDataSinker();
stubSetGetDataLength();
stubSetEndPut();
stubSetPutDataBlock();
stubSetGetDataBlock();
srand(time(NULL));
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue);
ASSERT_EQ(code, 0);
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t t1,t2,t3,t4,t5;
pthread_create(&(t1), &thattr, controlThread, mgmt);
pthread_create(&(t2), &thattr, queryQueueThread, NULL);
pthread_create(&(t3), &thattr, fetchQueueThread, NULL);
while (true) {
if (qwtTestDeadLoop) {
sleep(1);
} else {
sleep(qwtTestMTRunSec);
break;
}
}
qwtTestStop = true;
sleep(3);
qWorkerDestroy(&mgmt);
}
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);

View File

@ -1102,6 +1102,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
pMsg->taskType = TASK_TYPE_TEMP;
pMsg->contentLen = htonl(pTask->msgLen);
memcpy(pMsg->msg, pTask->msg, pTask->msgLen);
break;
@ -1487,6 +1488,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(plan->id.queryId);
pMsg->taskId = htobe64(schGenUUID());
pMsg->taskType = TASK_TYPE_PERSISTENT;
pMsg->contentLen = htonl(msgLen);
memcpy(pMsg->msg, msg, msgLen);

View File

@ -361,6 +361,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPING, "Task dropping")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUPLICATTED_OPERATION, "Duplicatted operation")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_FREED, "Job already freed")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_STATUS_ERROR, "Task status error")