From b9c6e0cdf9db43c6b56b3cf7ec7757ea2ee79843 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 9 Aug 2022 13:55:51 +0800 Subject: [PATCH] refactor: stream invoke previous APIs. --- include/libs/executor/executor.h | 3 +- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/dnode/vnode/src/tq/tqExec.c | 13 +----- source/libs/executor/src/executor.c | 59 +++++++++++++++++++++++++- source/libs/qworker/src/qworker.c | 2 +- source/libs/stream/src/streamExec.c | 23 +++------- 6 files changed, 70 insertions(+), 32 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index a3cc7aaa8a..a64815f14f 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -123,7 +123,8 @@ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* table * @param handle * @return */ -int32_t qExecTask(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds); +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds); +int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pBlock, uint64_t* useconds); /** * kill the ongoing query and free the query handle and corresponding resources automatically diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ef9a64e946..6b513f0242 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -612,7 +612,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm while (1) { uint64_t ts; - int32_t code = qExecTask(taskInfo, pResList, &ts); + int32_t code = qExecTaskOpt(taskInfo, pResList, &ts); if (code < 0) { smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, pItem->level, terrstr(code)); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 09a5f1a8c6..435bbb77b8 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -80,22 +80,14 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa } int32_t rowCnt = 0; - SArray* pResList = taosArrayInit(4, POINTER_BYTES); - while (1) { - taosArrayClear(pResList); - SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; tqDebug("task start to execute"); - if (qExecTask(task, pResList, &ts) < 0) { + if (qExecTask(task, &pDataBlock, &ts) < 0) { ASSERT(0); } - - if (taosArrayGetSize(pResList) > 0) { - pDataBlock = taosArrayGet(pResList, 0); - tqDebug("task execute end, get %p", pDataBlock); - } + tqDebug("task execute end, get %p", pDataBlock); if (pDataBlock != NULL) { if (pRsp->withTbName) { @@ -151,7 +143,6 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa break; } - taosArrayDestroy(pResList); return 0; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5bbf5928da..328a65bec4 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -427,7 +427,7 @@ static void freeBlock(void* param) { blockDataDestroy(pBlock); } -int32_t qExecTask(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { +int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); @@ -496,6 +496,63 @@ int32_t qExecTask(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) { return pTaskInfo->code; } +int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + int64_t threadId = taosGetSelfPthreadId(); + + *pRes = NULL; + int64_t curOwner = 0; + if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { + qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); + pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; + return pTaskInfo->code; + } + + if (pTaskInfo->cost.start == 0) { + pTaskInfo->cost.start = taosGetTimestampMs(); + } + + if (isTaskKilled(pTaskInfo)) { + atomic_store_64(&pTaskInfo->owner, 0); + qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); + return TSDB_CODE_SUCCESS; + } + + // error occurs, record the error code and return to client + int32_t ret = setjmp(pTaskInfo->env); + if (ret != TSDB_CODE_SUCCESS) { + pTaskInfo->code = ret; + cleanUpUdfs(); + qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); + atomic_store_64(&pTaskInfo->owner, 0); + + return pTaskInfo->code; + } + + qDebug("%s execTask is launched", GET_TASKID(pTaskInfo)); + + int64_t st = taosGetTimestampUs(); + + *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); + uint64_t el = (taosGetTimestampUs() - st); + + pTaskInfo->cost.elapsedTime += el; + if (NULL == *pRes) { + *useconds = pTaskInfo->cost.elapsedTime; + } + + cleanUpUdfs(); + + int32_t current = (*pRes != NULL) ? (*pRes)->info.rows : 0; + uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows; + + qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", + GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0); + + atomic_store_64(&pTaskInfo->owner, 0); + return pTaskInfo->code; +} + int32_t qKillTask(qTaskInfo_t qinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; if (pTaskInfo == NULL) { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 85a1b0444c..e06b752862 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -88,7 +88,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { // if *taskHandle is NULL, it's killed right now if (taskHandle) { qwDbgSimulateSleep(); - code = qExecTask(taskHandle, pResList, &useconds); + code = qExecTaskOpt(taskHandle, pResList, &useconds); if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b92f658de0..7512f792c1 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -43,17 +43,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* } // exec - SArray* pResList = taosArrayInit(4, POINTER_BYTES); while (1) { SSDataBlock* output = NULL; uint64_t ts = 0; - - taosArrayClear(pResList); - if (qExecTask(exec, pResList, &ts) < 0) { + if (qExecTask(exec, &output, &ts) < 0) { ASSERT(false); } - - if (taosArrayGetSize(pResList) == 0) { + if (output == NULL) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { SSDataBlock block = {0}; const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data; @@ -69,7 +65,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* break; } - output = taosArrayGetP(pResList, 0); if (output->info.type == STREAM_RETRIEVE) { if (streamBroadcastToChildren(pTask, output) < 0) { // TODO @@ -84,8 +79,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* block.info.childId = pTask->selfChildId; taosArrayPush(pRes, &block); } - - taosArrayDestroy(pResList); return 0; } @@ -105,7 +98,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { void* exec = pTask->exec.executor; - SArray* pResList = taosArrayInit(4, POINTER_BYTES); while (1) { SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) { @@ -115,17 +107,14 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { int32_t batchCnt = 0; while (1) { - uint64_t ts = 0; - taosArrayClear(pResList); - if (qExecTask(exec, pResList, &ts) < 0) { + SSDataBlock* output = NULL; + uint64_t ts = 0; + if (qExecTask(exec, &output, &ts) < 0) { ASSERT(0); } - - if (taosArrayGetSize(pResList) == 0) break; + if (output == NULL) break; SSDataBlock block = {0}; - SSDataBlock* output = taosArrayGetP(pResList, 0); - assignOneDataBlock(&block, output); block.info.childId = pTask->selfChildId; taosArrayPush(pRes, &block);