Merge pull request #14860 from taosdata/feature/stream
refactor(stream): simple batch
This commit is contained in:
commit
b7365fb0c0
|
@ -186,9 +186,12 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset);
|
||||||
|
|
||||||
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
|
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
void* qExtractReaderFromStreamScanner(void* scanner);
|
void* qExtractReaderFromStreamScanner(void* scanner);
|
||||||
|
|
||||||
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
|
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
|
||||||
|
|
||||||
|
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#include "executor.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
@ -120,7 +121,6 @@ static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue
|
||||||
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
|
static FORCE_INLINE void* streamQueueNextItem(SStreamQueue* queue) {
|
||||||
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
|
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
|
||||||
if (dequeueFlag == STREAM_QUEUE__FAILED) {
|
if (dequeueFlag == STREAM_QUEUE__FAILED) {
|
||||||
ASSERT(0);
|
|
||||||
ASSERT(queue->qItem != NULL);
|
ASSERT(queue->qItem != NULL);
|
||||||
return streamQueueCurItem(queue);
|
return streamQueueCurItem(queue);
|
||||||
} else {
|
} else {
|
||||||
|
@ -307,14 +307,18 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
|
||||||
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
|
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
qInfo("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
|
qDebug("task %d %p submit enqueue %p %p %p", pTask->taskId, pTask, pItem, pSubmitClone, pSubmitClone->data);
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
|
taosWriteQitem(pTask->inputQueue->queue, pSubmitClone);
|
||||||
|
// qStreamInput(pTask->exec.executor, pSubmitClone);
|
||||||
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
// qStreamInput(pTask->exec.executor, pItem);
|
||||||
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
|
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
// qStreamInput(pTask->exec.executor, pItem);
|
||||||
} else if (pItem->type == STREAM_INPUT__TRIGGER) {
|
} else if (pItem->type == STREAM_INPUT__TRIGGER) {
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
// qStreamInput(pTask->exec.executor, pItem);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pItem->type != STREAM_INPUT__TRIGGER && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
|
if (pItem->type != STREAM_INPUT__TRIGGER && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
|
||||||
|
|
|
@ -357,16 +357,16 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -8,7 +8,7 @@ add_library(executor STATIC ${EXECUTOR_SRC})
|
||||||
# )
|
# )
|
||||||
|
|
||||||
target_link_libraries(executor
|
target_link_libraries(executor
|
||||||
PRIVATE os util common function parser planner qcom vnode scalar nodes index
|
PRIVATE os util common function parser planner qcom vnode scalar nodes index stream
|
||||||
)
|
)
|
||||||
|
|
||||||
target_include_directories(
|
target_include_directories(
|
||||||
|
|
|
@ -39,6 +39,7 @@ extern "C" {
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tpagedbuf.h"
|
#include "tpagedbuf.h"
|
||||||
#include "tstreamUpdate.h"
|
#include "tstreamUpdate.h"
|
||||||
|
#include "tstream.h"
|
||||||
|
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "executorInt.h"
|
#include "executorInt.h"
|
||||||
|
@ -139,12 +140,14 @@ typedef struct STaskIdInfo {
|
||||||
} STaskIdInfo;
|
} STaskIdInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
//TODO remove prepareStatus
|
||||||
STqOffsetVal prepareStatus; // for tmq
|
STqOffsetVal prepareStatus; // for tmq
|
||||||
STqOffsetVal lastStatus; // for tmq
|
STqOffsetVal lastStatus; // for tmq
|
||||||
void* metaBlk; // for tmq fetching meta
|
void* metaBlk; // for tmq fetching meta
|
||||||
SSDataBlock* pullOverBlk; // for streaming
|
SSDataBlock* pullOverBlk; // for streaming
|
||||||
SWalFilterCond cond;
|
SWalFilterCond cond;
|
||||||
int64_t lastScanUid;
|
int64_t lastScanUid;
|
||||||
|
SStreamQueue* inputQueue;
|
||||||
} SStreamTaskInfo;
|
} SStreamTaskInfo;
|
||||||
|
|
||||||
typedef struct SExecTaskInfo {
|
typedef struct SExecTaskInfo {
|
||||||
|
|
|
@ -60,8 +60,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
|
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
|
||||||
taosArrayPush(pInfo->pBlockLists, &p);
|
taosArrayPush(pInfo->pBlockLists, &p);
|
||||||
}
|
}
|
||||||
/*} else if (type == STREAM_INPUT__TABLE_SCAN) {*/
|
|
||||||
/*ASSERT(pInfo->blockType == STREAM_INPUT__TABLE_SCAN);*/
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,6 +44,13 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (model == OPTR_EXEC_MODEL_STREAM) {
|
||||||
|
(*pTask)->streamInfo.inputQueue = streamQueueOpen();
|
||||||
|
if ((*pTask)->streamInfo.inputQueue == NULL) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
|
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
|
||||||
code = dsDataSinkMgtInit(&cfg);
|
code = dsDataSinkMgtInit(&cfg);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -252,6 +259,13 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM);
|
||||||
|
taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void* qExtractReaderFromStreamScanner(void* scanner) {
|
void* qExtractReaderFromStreamScanner(void* scanner) {
|
||||||
SStreamScanInfo* pInfo = scanner;
|
SStreamScanInfo* pInfo = scanner;
|
||||||
return (void*)pInfo->tqReader;
|
return (void*)pInfo->tqReader;
|
||||||
|
|
|
@ -1202,15 +1202,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
taosArrayDestroy(pBlock->pDataBlock);
|
taosArrayDestroy(pBlock->pDataBlock);
|
||||||
|
|
||||||
ASSERT(pInfo->pRes->pDataBlock != NULL);
|
ASSERT(pInfo->pRes->pDataBlock != NULL);
|
||||||
#if 0
|
|
||||||
if (pInfo->pRes->pDataBlock == NULL) {
|
|
||||||
// TODO add log
|
|
||||||
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
|
|
||||||
pOperator->status = OP_EXEC_DONE;
|
|
||||||
pTaskInfo->code = terrno;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (pInfo->numOfPseudoExpr > 0) {
|
if (pInfo->numOfPseudoExpr > 0) {
|
||||||
|
@ -1231,11 +1222,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
/*pTaskInfo->code = pOperator->fpSet._openFn(pOperator);*/
|
|
||||||
/*if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {*/
|
|
||||||
/*return NULL;*/
|
|
||||||
/*}*/
|
|
||||||
|
|
||||||
qDebug("stream scan called");
|
qDebug("stream scan called");
|
||||||
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
|
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -1425,15 +1411,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
qDebug("scan rows: %d", pBlockInfo->rows);
|
qDebug("scan rows: %d", pBlockInfo->rows);
|
||||||
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
||||||
|
|
||||||
#if 0
|
|
||||||
} else if (pInfo->blockType == STREAM_INPUT__TABLE_SCAN) {
|
|
||||||
ASSERT(0);
|
|
||||||
// check reader last status
|
|
||||||
// if not match, reset status
|
|
||||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
|
||||||
return pResult && pResult->info.rows > 0 ? pResult : NULL;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -13,4 +13,4 @@ target_link_libraries(qworker
|
||||||
|
|
||||||
if(${BUILD_TEST})
|
if(${BUILD_TEST})
|
||||||
ADD_SUBDIRECTORY(test)
|
ADD_SUBDIRECTORY(test)
|
||||||
endif(${BUILD_TEST})
|
endif(${BUILD_TEST})
|
||||||
|
|
|
@ -42,6 +42,9 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
|
||||||
|
|
||||||
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
||||||
|
|
||||||
|
int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem);
|
||||||
|
void streamFreeQitem(SStreamQueueItem* data);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -97,3 +97,29 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) {
|
||||||
taosMemoryFree(pDataSubmit->dataRef);
|
taosMemoryFree(pDataSubmit->dataRef);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) {
|
||||||
|
ASSERT(elem);
|
||||||
|
if (dst->type == elem->type && dst->type == STREAM_INPUT__DATA_BLOCK) {
|
||||||
|
SStreamDataBlock* pBlock = (SStreamDataBlock*)dst;
|
||||||
|
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)elem;
|
||||||
|
taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamFreeQitem(SStreamQueueItem* data) {
|
||||||
|
int8_t type = data->type;
|
||||||
|
if (type == STREAM_INPUT__TRIGGER) {
|
||||||
|
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
|
||||||
|
taosFreeQitem(data);
|
||||||
|
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||||
|
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
|
||||||
|
taosFreeQitem(data);
|
||||||
|
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
|
streamDataSubmitRefDec((SStreamDataSubmit*)data);
|
||||||
|
taosFreeQitem(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -251,8 +251,8 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data,
|
||||||
ASSERT(vgId > 0 || vgId == SNODE_HANDLE);
|
ASSERT(vgId > 0 || vgId == SNODE_HANDLE);
|
||||||
req.taskId = downstreamTaskId;
|
req.taskId = downstreamTaskId;
|
||||||
|
|
||||||
qInfo("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
|
qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId,
|
||||||
downstreamTaskId, vgId);
|
downstreamTaskId, vgId);
|
||||||
|
|
||||||
// serialize
|
// serialize
|
||||||
int32_t tlen;
|
int32_t tlen;
|
||||||
|
@ -298,6 +298,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
|
|
||||||
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
|
SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
|
qDebug("stream stop dispatching since no output: task %d", pTask->taskId);
|
||||||
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,10 +75,35 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
||||||
|
|
||||||
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
||||||
while (1) {
|
while (1) {
|
||||||
void* data = streamQueueNextItem(pTask->inputQueue);
|
int32_t cnt = 0;
|
||||||
|
void* data = NULL;
|
||||||
|
while (1) {
|
||||||
|
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
||||||
|
if (qItem == NULL) {
|
||||||
|
qDebug("stream exec over, queue empty");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (data == NULL) {
|
||||||
|
data = qItem;
|
||||||
|
streamQueueProcessSuccess(pTask->inputQueue);
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
if (streamAppendQueueItem(data, qItem) < 0) {
|
||||||
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
cnt++;
|
||||||
|
streamQueueProcessSuccess(pTask->inputQueue);
|
||||||
|
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
|
||||||
|
taosFreeQitem(qItem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
if (data == NULL) break;
|
if (data == NULL) break;
|
||||||
|
|
||||||
|
qDebug("stream task %d exec begin, batch msg: %d", pTask->taskId, cnt);
|
||||||
streamTaskExecImpl(pTask, data, pRes);
|
streamTaskExecImpl(pTask, data, pRes);
|
||||||
|
qDebug("stream task %d exec end", pTask->taskId);
|
||||||
|
|
||||||
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
|
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
|
||||||
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
|
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
|
||||||
|
@ -95,27 +120,16 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
||||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||||
qRes->blocks = pRes;
|
qRes->blocks = pRes;
|
||||||
if (streamTaskOutput(pTask, qRes) < 0) {
|
if (streamTaskOutput(pTask, qRes) < 0) {
|
||||||
streamQueueProcessFail(pTask->inputQueue);
|
/*streamQueueProcessFail(pTask->inputQueue);*/
|
||||||
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
|
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
|
||||||
taosFreeQitem(qRes);
|
taosFreeQitem(qRes);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
streamQueueProcessSuccess(pTask->inputQueue);
|
/*streamQueueProcessSuccess(pTask->inputQueue);*/
|
||||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t type = ((SStreamQueueItem*)data)->type;
|
streamFreeQitem(data);
|
||||||
if (type == STREAM_INPUT__TRIGGER) {
|
|
||||||
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
|
|
||||||
taosFreeQitem(data);
|
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
|
|
||||||
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
|
|
||||||
taosFreeQitem(data);
|
|
||||||
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
|
|
||||||
ASSERT(pTask->isDataScan);
|
|
||||||
streamDataSubmitRefDec((SStreamDataSubmit*)data);
|
|
||||||
taosFreeQitem(data);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return pRes;
|
return pRes;
|
||||||
}
|
}
|
||||||
|
@ -129,6 +143,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING);
|
atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING);
|
||||||
if (execStatus == TASK_EXEC_STATUS__IDLE) {
|
if (execStatus == TASK_EXEC_STATUS__IDLE) {
|
||||||
// first run
|
// first run
|
||||||
|
qDebug("stream exec, enter exec status");
|
||||||
pRes = streamExecForQall(pTask, pRes);
|
pRes = streamExecForQall(pTask, pRes);
|
||||||
if (pRes == NULL) goto FAIL;
|
if (pRes == NULL) goto FAIL;
|
||||||
|
|
||||||
|
@ -136,11 +151,13 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
|
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
|
||||||
|
|
||||||
// second run, make sure inputQ and qall are cleared
|
// second run, make sure inputQ and qall are cleared
|
||||||
|
qDebug("stream exec, enter closing status");
|
||||||
pRes = streamExecForQall(pTask, pRes);
|
pRes = streamExecForQall(pTask, pRes);
|
||||||
if (pRes == NULL) goto FAIL;
|
if (pRes == NULL) goto FAIL;
|
||||||
|
|
||||||
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
|
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
|
||||||
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
|
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
|
||||||
|
qDebug("stream exec, return result");
|
||||||
return 0;
|
return 0;
|
||||||
} else if (execStatus == TASK_EXEC_STATUS__CLOSING) {
|
} else if (execStatus == TASK_EXEC_STATUS__CLOSING) {
|
||||||
continue;
|
continue;
|
||||||
|
|
Loading…
Reference in New Issue