enh: support dynamic table scan

This commit is contained in:
dapan1121 2023-07-06 09:54:45 +08:00
parent 0718859c0a
commit e7c200ad6e
5 changed files with 64 additions and 8 deletions

View File

@ -309,6 +309,8 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
if (pTableScanNode->scan.node.dynamicOp) {
pTaskInfo->dynamicTask = true;
pTableListInfo->idInfo.suid = pTableScanNode->scan.suid;
pTableListInfo->idInfo.tableType = pTableScanNode->scan.tableType;
} else {
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, pHandle,
pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);

View File

@ -784,8 +784,33 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
return NULL;
}
static int32_t createTableListInfoFromParam(STableScanInfo* pInfo, STableScanOperatorParam* pParam) {
static int32_t createTableListInfoFromParam(SOperatorInfo* pOperator) {
STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = 0;
STableListInfo* pListInfo = pInfo->base.pTableListInfo;
int32_t num = taosArrayGetSize(pOperator->pOperatorParam->pUidList);
if (num <= 0) {
qError("empty table scan uid list");
return TSDB_CODE_INVALID_PARA;
}
qDebug("add total %d dynamic tables to scan", num);
for (int32_t i = 0; i < num; ++i) {
uint64_t* pUid = taosArrayGet(pOperator->pOperatorParam->pUidList, i);
STableKeyInfo info = {.uid = *pUid, .groupId = 0};
void* p = taosArrayPush(pListInfo->pTableList, &info);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashPut(pListInfo->map, pUid, sizeof(uint64_t), &i, sizeof(int32_t));
qTrace("add dynamic table scan uid:%" PRIu64 ", %s", info.uid, GET_TASKID(pTaskInfo));
}
pOperator->pOperatorParam = NULL;
return code;
}
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
@ -794,7 +819,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
if (pOperator->pOperatorParam) {
int32_t code = createTableListInfoFromParam(pInfo, (STableScanOperatorParam*)pOperator->pOperatorParam);
int32_t code = createTableListInfoFromParam(pOperator);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);

View File

@ -120,6 +120,7 @@ typedef struct SQWTaskCtx {
int8_t explain;
int8_t needFetch;
int8_t localExec;
int8_t dynamicTask;
int32_t queryMsgType;
int32_t fetchMsgType;
int32_t level;

View File

@ -412,11 +412,13 @@ int32_t qwHandleDynamicTaskEnd(QW_FPARAMS_DEF) {
QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
}
if (!qIsDynamicExecTask(ctx->taskHandle)) {
if (!ctx->dynamicTask) {
return TSDB_CODE_SUCCESS;
}
qwHandleTaskComplete(QW_FPARAMS_DEF, ctx);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
qwHandleTaskComplete(QW_FPARAMS(), ctx);
return TSDB_CODE_SUCCESS;
}

View File

@ -203,7 +203,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
}
if (numOfResBlock == 0 || (hasMore == false)) {
if (!qIsDynamicExecTask(taskHandle)) {
if (!ctx->dynamicTask) {
if (numOfResBlock == 0) {
QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds);
} else {
@ -217,6 +217,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
} else {
QW_TASK_DLOG("dyn task qExecTask done, useconds:%" PRIu64, useconds);
}
ctx->queryExecDone = true;
}
dsEndPut(sinkHandle, useconds);
@ -341,7 +343,10 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
pOutput->numOfRows);
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
if (!ctx->dynamicTask) {
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC);
}
if (NULL == rsp) {
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &rsp));
*pOutput = output;
@ -481,6 +486,22 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx * ctx, SQWMsg *qwMsg, i
return TSDB_CODE_SUCCESS;
}
int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg) {
if (!ctx->queryExecDone || !ctx->queryEnd) {
QW_TASK_ELOG("dynamic task prev exec not finished, execDone:%d, queryEnd:%d", ctx->queryExecDone, ctx->queryEnd);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
qUpdateOperatorParam(ctx->taskHandle);
atomic_store_8((int8_t *)&ctx->queryInQueue, 1);
QW_ERR_RET(qwBuildAndSendCQueryMsg(QW_FPARAMS(), &qwMsg->connInfo));
return TSDB_CODE_SUCCESS;
}
int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) {
int32_t code = 0;
SQWTaskCtx *ctx = NULL;
@ -734,13 +755,17 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
//qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true);
ctx->level = plan->level;
ctx->dynamicTask = qIsDynamicExecTask(pTaskInfo)
atomic_store_ptr(&ctx->taskHandle, pTaskInfo);
atomic_store_ptr(&ctx->sinkHandle, sinkHandle);
qwSaveTbVersionInfo(pTaskInfo, ctx);
if (!qIsDynamicExecTask(pTaskInfo)) {
if (!ctx->dynamicTask) {
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL));
} else {
ctx->queryExecDone = true;
ctx->queryEnd = true;
}
_return:
@ -862,7 +887,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
ctx->dataConnInfo = qwMsg->connInfo;
if (qwMsg->msg) {
qUpdateOperatorParam(ctx->taskHandle);
code = qwStartDynamicTaskNewExec(QW_FPARAMS(), ctx, qwMsg);
goto _return;
}
SOutputData sOutput = {0};