From f5b52749c288eff98799dd0f8db5dd677a84f688 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 9 Apr 2023 01:39:09 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/executor/executor.h | 16 +- include/libs/qcom/query.h | 1 + include/libs/stream/tstream.h | 44 +-- include/libs/wal/wal.h | 1 + source/client/inc/clientInt.h | 36 +-- source/client/src/clientImpl.c | 4 +- source/client/src/clientTmq.c | 47 ++- source/dnode/mnode/impl/src/mndDef.c | 10 +- source/dnode/mnode/impl/src/mndScheduler.c | 8 +- source/dnode/mnode/impl/src/mndStream.c | 4 +- source/dnode/snode/src/snode.c | 11 +- source/dnode/vnode/inc/vnode.h | 6 +- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/meta/metaCache.c | 11 +- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/dnode/vnode/src/tq/tq.c | 284 +++++++++++++----- source/dnode/vnode/src/tq/tqMeta.c | 10 +- source/dnode/vnode/src/tq/tqOffset.c | 25 +- source/dnode/vnode/src/tq/tqPush.c | 3 +- source/dnode/vnode/src/tq/tqRead.c | 85 ++++-- source/dnode/vnode/src/tq/tqScan.c | 18 +- source/dnode/vnode/src/tq/tqSink.c | 12 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 - source/libs/executor/src/executor.c | 54 ++-- source/libs/executor/src/executorimpl.c | 6 +- source/libs/executor/src/scanoperator.c | 11 +- source/libs/executor/src/timewindowoperator.c | 7 +- source/libs/stream/inc/streamInc.h | 2 +- source/libs/stream/src/stream.c | 43 ++- source/libs/stream/src/streamData.c | 25 +- source/libs/stream/src/streamDispatch.c | 26 +- source/libs/stream/src/streamExec.c | 120 ++++---- source/libs/stream/src/streamMeta.c | 19 +- source/libs/stream/src/streamRecover.c | 36 +-- source/libs/stream/src/streamState.c | 2 +- source/libs/stream/src/streamTask.c | 51 +++- 36 files changed, 625 insertions(+), 419 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 33172a4f86..ee8ee1050d 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -26,6 +26,7 @@ extern "C" { typedef void* qTaskInfo_t; typedef void* DataSinkHandle; + struct SRpcMsg; struct SSubplan; @@ -118,7 +119,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, * @param isAdd * @return */ -int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd); +int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd, SArray* pList); /** * Create the exec task object according to task json @@ -162,6 +163,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); * @return */ int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); + int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode); bool qTaskIsExecuting(qTaskInfo_t qinfo); @@ -181,21 +183,11 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len); int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len); STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key); -/** - * return the scan info, in the form of tuple of two items, including table uid and current timestamp - * @param tinfo - * @param uid - * @param ts - * @return - */ -int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts); -int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts); +SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); -// int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t ver); -// int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit); void qStreamSetOpen(qTaskInfo_t tinfo); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index b6ada5a0c7..cfc6ef2025 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -194,6 +194,7 @@ typedef struct SRequestConnInfo { typedef void (*__freeFunc)(void* param); +// todo add creator/destroyer function typedef struct SMsgSendInfo { __async_send_cb_fn_t fp; // async callback function STargetInfo target; // for update epset diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5b1d1fa1bc..d9b82c8c59 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -225,15 +225,15 @@ static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { void* streamQueueNextItem(SStreamQueue* queue); -SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit); +SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type); void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit); SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit); typedef struct { - char* qmsg; - // followings are not applicable to encoder and decoder - void* executor; + char* qmsg; + void* pExecutor; // not applicable to encoder and decoder + struct STqReader* pTqReader; // not applicable to encoder and decoder } STaskExec; typedef struct { @@ -280,16 +280,20 @@ typedef struct { SEpSet epSet; } SStreamChildEpInfo; -struct SStreamTask { - int64_t streamId; - int32_t taskId; - int32_t totalLevel; - int8_t taskLevel; - int8_t outputType; - int16_t dispatchMsgType; +typedef struct SStreamId { + int64_t streamId; + int32_t taskId; + const char* idStr; +} SStreamId; - int8_t taskStatus; - int8_t schedStatus; +struct SStreamTask { + SStreamId id; + int32_t totalLevel; + int8_t taskLevel; + int8_t outputType; + int16_t dispatchMsgType; + int8_t taskStatus; + int8_t schedStatus; // node info int32_t selfChildId; @@ -319,11 +323,8 @@ struct SStreamTask { STaskSinkFetch fetchSink; }; - int8_t inputStatus; - int8_t outputStatus; - - // STaosQueue* inputQueue1; - // STaosQall* inputQall; + int8_t inputStatus; + int8_t outputStatus; SStreamQueue* inputQueue; SStreamQueue* outputQueue; @@ -345,8 +346,8 @@ struct SStreamTask { SArray* checkReqIds; // shuffle int32_t refCnt; - int64_t checkpointingId; - int32_t checkpointAlignCnt; + int64_t checkpointingId; + int32_t checkpointAlignCnt; }; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -355,8 +356,9 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); SStreamTask* tNewSStreamTask(int64_t streamId); int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); -void tFreeSStreamTask(SStreamTask* pTask); +void tFreeStreamTask(SStreamTask* pTask); int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem); +bool tInputQueueIsFull(const SStreamTask* pTask); static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index ccbc53fa5d..fdd21c7092 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -138,6 +138,7 @@ typedef struct { int8_t enableRef; } SWalFilterCond; +// todo hide this struct typedef struct { SWal *pWal; int64_t readerId; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 86db35b412..41f87379a9 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -36,14 +36,6 @@ extern "C" { #include "tconfig.h" -#define CHECK_CODE_GOTO(expr, label) \ - do { \ - code = expr; \ - if (TSDB_CODE_SUCCESS != code) { \ - goto label; \ - } \ - } while (0) - #define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define HEARTBEAT_INTERVAL 1500 // ms @@ -286,28 +278,7 @@ static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { return (SReqResultInfo*)&msg->resInfo; } -static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { - SMqRspObj* pRspObj = (SMqRspObj*)res; - pRspObj->resIter++; - - if (pRspObj->resIter < pRspObj->rsp.blockNum) { - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter); - if (pRspObj->rsp.withSchema) { - SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter); - setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols); - taosMemoryFreeClear(pRspObj->resInfo.row); - taosMemoryFreeClear(pRspObj->resInfo.pCol); - taosMemoryFreeClear(pRspObj->resInfo.length); - taosMemoryFreeClear(pRspObj->resInfo.convertBuf); - taosMemoryFreeClear(pRspObj->resInfo.convertJson); - } - - setQueryResultFromRsp(&pRspObj->resInfo, pRetrieve, convertUcs4, false); - return &pRspObj->resInfo; - } - - return NULL; -} +SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4); static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) { if (TD_RES_QUERY(res)) return &(((SRequestObj*)res)->body.resInfo); @@ -320,7 +291,6 @@ extern int32_t clientConnRefPool; extern int32_t timestampDeltaLimit; extern int64_t lastClusterId; - __async_send_cb_fn_t getMsgRspHandle(int32_t msgType); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj); @@ -373,7 +343,6 @@ void taos_close_internal(void* taos); // global, called by mgmt int hbMgrInit(); void hbMgrCleanUp(); -int hbHandleRsp(SClientHbBatchRsp* hbRsp); // cluster level SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); @@ -386,9 +355,6 @@ void stopAllRequests(SHashObj* pRequests); int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); void hbDeregisterConn(SAppHbMgr* pAppHbMgr, SClientHbKey connKey); -// --- mq -void hbMgrInitMqHbRspHandle(); - typedef struct SSqlCallbackWrapper { SParseContext* pParseCtx; SCatalogReq* pCatalogReq; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 9e5d9080b4..8ced67c9b2 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1039,8 +1039,7 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat .sysInfo = pRequest->pTscObj->sysInfo, .allocatorId = pRequest->allocatorRefId}; - SAppInstInfo* pAppInfo = getAppInfo(pRequest); - SQueryPlan* pDag = NULL; + SQueryPlan* pDag = NULL; int64_t st = taosGetTimestampUs(); int32_t code = qCreateQueryPlan(&cxt, &pDag, pMnodeList); @@ -1052,7 +1051,6 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat } pRequest->metric.execStart = taosGetTimestampUs(); - pRequest->metric.planCostUs = pRequest->metric.execStart - st; if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e295ec93af..f90c004eec 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -154,7 +154,7 @@ typedef struct { typedef struct { int8_t tmqRspType; - int32_t epoch; // epoch can be used to guard the vgHandle + int32_t epoch; // epoch can be used to guard the vgHandle int32_t vgId; SMqClientVg* vgHandle; SMqClientTopic* topicHandle; @@ -210,6 +210,11 @@ typedef struct { tmq_t* pTmq; } SMqCommitCbParam; +typedef struct SSyncCommitInfo { + tsem_t sem; + int32_t code; +} SSyncCommitInfo; + static int32_t doAskEp(tmq_t* tmq); static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg); static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet); @@ -521,11 +526,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN return TSDB_CODE_OUT_OF_MEMORY; } - pMsgSendInfo->msgInfo = (SDataBuf){ - .pData = buf, - .len = sizeof(SMsgHead) + len, - .handle = NULL, - }; + pMsgSendInfo->msgInfo = (SDataBuf) { .pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL }; pMsgSendInfo->requestId = generateRequestId(); pMsgSendInfo->requestObjRefId = 0; @@ -786,11 +787,7 @@ void tmqSendHbReq(void* param, void* tmrId) { goto OVER; } - sendInfo->msgInfo = (SDataBuf){ - .pData = pReq, - .len = tlen, - .handle = NULL, - }; + sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL }; sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; @@ -2115,11 +2112,6 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* } } -typedef struct SSyncCommitInfo { - tsem_t sem; - int32_t code; -} SSyncCommitInfo; - static void commitCallBackFn(tmq_t *pTmq, int32_t code, void* param) { SSyncCommitInfo* pInfo = (SSyncCommitInfo*) param; pInfo->code = code; @@ -2298,3 +2290,26 @@ void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, cons waitingRspNum); } } + +SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) { + SMqRspObj* pRspObj = (SMqRspObj*)res; + pRspObj->resIter++; + + if (pRspObj->resIter < pRspObj->rsp.blockNum) { + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter); + if (pRspObj->rsp.withSchema) { + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter); + setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols); + taosMemoryFreeClear(pRspObj->resInfo.row); + taosMemoryFreeClear(pRspObj->resInfo.pCol); + taosMemoryFreeClear(pRspObj->resInfo.length); + taosMemoryFreeClear(pRspObj->resInfo.convertBuf); + taosMemoryFreeClear(pRspObj->resInfo.convertJson); + } + + setQueryResultFromRsp(&pRspObj->resInfo, pRetrieve, convertUcs4, false); + return &pRspObj->resInfo; + } + + return NULL; +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index fb81a764f1..e221a64619 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -158,7 +158,10 @@ void tFreeStreamObj(SStreamObj *pStream) { taosMemoryFree(pStream->sql); taosMemoryFree(pStream->ast); taosMemoryFree(pStream->physicalPlan); - if (pStream->outputSchema.nCols) taosMemoryFree(pStream->outputSchema.pSchema); + + if (pStream->outputSchema.nCols) { + taosMemoryFree(pStream->outputSchema.pSchema); + } int32_t sz = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < sz; i++) { @@ -166,11 +169,14 @@ void tFreeStreamObj(SStreamObj *pStream) { int32_t taskSz = taosArrayGetSize(pLevel); for (int32_t j = 0; j < taskSz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - tFreeSStreamTask(pTask); + tFreeStreamTask(pTask); } + taosArrayDestroy(pLevel); } + taosArrayDestroy(pStream->tasks); + // tagSchema.pSchema if (pStream->tagSchema.nCols > 0) { taosMemoryFree(pStream->tagSchema.pSchema); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index d1671aa12a..504749df49 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -138,7 +138,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream for (int32_t j = 0; j < sinkLvSize; j++) { SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j); if (pLastLevelTask->nodeId == pVgInfo->vgId) { - pVgInfo->taskId = pLastLevelTask->taskId; + pVgInfo->taskId = pLastLevelTask->id.taskId; break; } } @@ -149,7 +149,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream SArray* pArray = taosArrayGetP(pStream->tasks, 0); // one sink only SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0); - pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; + pTask->fixedEpDispatcher.taskId = lastLevelTask->id.taskId; pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } @@ -440,7 +440,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; - pTask->fixedEpDispatcher.taskId = pInnerTask->taskId; + pTask->fixedEpDispatcher.taskId = pInnerTask->id.taskId; pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId; pTask->fixedEpDispatcher.epSet = pInnerTask->epSet; @@ -460,7 +460,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { pEpInfo->childId = pTask->selfChildId; pEpInfo->epSet = pTask->epSet; pEpInfo->nodeId = pTask->nodeId; - pEpInfo->taskId = pTask->taskId; + pEpInfo->taskId = pTask->id.taskId; taosArrayPush(pInnerTask->childEpInfo, &pEpInfo); sdbRelease(pSdb, pVgroup); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b7f80f6b0e..ab83f29ef9 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -600,7 +600,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { return -1; } pReq->head.vgId = htonl(pTask->nodeId); - pReq->taskId = pTask->taskId; + pReq->taskId = pTask->id.taskId; STransAction action = {0}; memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); action.pCont = pReq; @@ -1208,7 +1208,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // task id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pTask->taskId, false); + colDataSetVal(pColInfo, numOfRows, (const char *)&pTask->id.taskId, false); // node type char nodeType[20 + VARSTR_HEADER_SIZE] = {0}; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 3d1b356f8c..447c90eb58 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -83,14 +83,11 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { return -1; } - SReadHandle mgHandle = { - .vnode = NULL, - .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), - .pStateBackend = pTask->pState, - }; + int32_t numOfChildEp = taosArrayGetSize(pTask->childEpInfo); + SReadHandle mgHandle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState }; - pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, 0); - ASSERT(pTask->exec.executor); + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, 0); + ASSERT(pTask->exec.pExecutor); streamSetupTrigger(pTask); return 0; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a9e5fe628b..d62eebd2e1 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -256,15 +256,15 @@ void tqCloseReader(STqReader *); void tqReaderSetColIdList(STqReader *pReader, SArray *pColIdList); int32_t tqReaderSetTbUidList(STqReader *pReader, const SArray *tbUidList); -int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *tbUidList); +int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); void tqNextBlock(STqReader *pReader, SFetchRet *ret); -int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); +int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); // int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); -bool tqNextDataBlock2(STqReader *pReader); +bool tqNextDataBlock(STqReader *pReader); bool tqNextDataBlockFilterOut2(STqReader *pReader, SHashObj *filterOutUids); int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 9037644602..884c01d397 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -80,7 +80,7 @@ typedef struct { typedef struct { int8_t subType; - STqReader* pExecReader; + STqReader* pTqReader; qTaskInfo_t task; union { STqExecCol execCol; diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c index 9501bf4b8e..795f281ab2 100644 --- a/source/dnode/vnode/src/meta/metaCache.c +++ b/source/dnode/vnode/src/meta/metaCache.c @@ -531,10 +531,11 @@ static void freePayload(const void* key, size_t keyLen, void* value) { return; } - SHashObj* pHashObj = (SHashObj*)p[0]; + SHashObj* pHashObj = (SHashObj*)p[0]; + STagFilterResEntry** pEntry = taosHashGet(pHashObj, &p[1], sizeof(uint64_t)); - { + if (pEntry != NULL && (*pEntry) != NULL) { int64_t st = taosGetTimestampUs(); SListIter iter = {0}; @@ -547,9 +548,9 @@ static void freePayload(const void* key, size_t keyLen, void* value) { void* tmp = tdListPopNode(&((*pEntry)->list), pNode); taosMemoryFree(tmp); - int64_t et = taosGetTimestampUs(); - metaInfo("clear items in cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)), - (et - st) / 1000.0); + double el = (taosGetTimestampUs() - st) / 1000.0; + metaInfo("clear items in meta-cache, remain cached item:%d, elapsed time:%.2fms", listNEles(&((*pEntry)->list)), + el); break; } } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index c75c675ec3..8aeb705d90 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -168,7 +168,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pRSmaInfo->taskInfo[i]) { - if ((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) { + if ((terrno = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd, NULL)) < 0) { tdReleaseRSmaInfo(pSma, pRSmaInfo); smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i, terrstr()); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 66e6ac4cd8..44d5e26603 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -15,6 +15,8 @@ #include "tq.h" +#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) + int32_t tqInit() { int8_t old; while (1) { @@ -57,12 +59,12 @@ static void destroyTqHandle(void* data) { if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { taosMemoryFreeClear(pData->execHandle.execCol.qmsg); } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) { - tqCloseReader(pData->execHandle.pExecReader); + tqCloseReader(pData->execHandle.pTqReader); walCloseReader(pData->pWalReader); taosHashCleanup(pData->execHandle.execDb.pFilterOutTbUid); } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { walCloseReader(pData->pWalReader); - tqCloseReader(pData->execHandle.pExecReader); + tqCloseReader(pData->execHandle.pTqReader); } } @@ -78,12 +80,33 @@ static void tqPushEntryFree(void* data) { taosMemoryFree(p); } +static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { + return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG && + pLeft->val.version <= pRight->val.version; +} + +// stream_task:stream_id:task_id +static void createStreamTaskOffsetKey(char* dst, uint64_t streamId, uint32_t taskId) { + int32_t n = 12; + char* p = dst; + + memcpy(p, "stream_task:", n); + p += n; + + int32_t inc = tintToHex(streamId, p); + p += inc; + + *(p++) = ':'; + tintToHex(taskId, p); +} + STQ* tqOpen(const char* path, SVnode* pVnode) { STQ* pTq = taosMemoryCalloc(1, sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pTq->path = taosStrdup(path); pTq->pVnode = pVnode; pTq->walLogLastVer = pVnode->pWal->vers.lastVer; @@ -249,11 +272,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con return 0; } -static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { - return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG && - pLeft->val.version <= pRight->val.version; -} - int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { STqOffset offset = {0}; int32_t vgId = TD_VID(pTq->pVnode); @@ -432,8 +450,6 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } -#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) - static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* pOffset) { uint64_t consumerId = pRequest->consumerId; @@ -805,10 +821,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, req.newConsumerId); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.task, &scanner); - pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); + pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL); - pHandle->execHandle.pExecReader = tqOpenReader(pVnode); + pHandle->execHandle.pTqReader = tqOpenReader(pVnode); pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); @@ -827,8 +843,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); } - pHandle->execHandle.pExecReader = tqOpenReader(pVnode); - tqReaderSetTbUidList(pHandle->execHandle.pExecReader, tbUidList); + pHandle->execHandle.pTqReader = tqOpenReader(pVnode); + tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList); taosArrayDestroy(tbUidList); buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta, @@ -886,16 +902,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { -#if 0 - if (pTask->taskLevel == TASK_LEVEL__AGG) { - A(taosArrayGetSize(pTask->childEpInfo) != 0); - } -#endif + // todo extract method + char buf[128] = {0}; + sprintf(buf, "0x%"PRIx64"-%d", pTask->id.streamId, pTask->id.taskId); int32_t vgId = TD_VID(pTq->pVnode); + pTask->id.idStr = taosStrdup(buf); pTask->refCnt = 1; pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; - pTask->inputQueue = streamQueueOpen(); pTask->outputQueue = streamQueueOpen(); @@ -920,14 +934,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } SReadHandle handle = { - .meta = pTq->pVnode->pMeta, - .vnode = pTq->pVnode, - .initTqReader = 1, - .pStateBackend = pTask->pState, - }; + .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState}; - pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); - if (pTask->exec.executor == NULL) { + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); + if (pTask->exec.pExecutor == NULL) { return -1; } @@ -936,14 +946,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { if (pTask->pState == NULL) { return -1; } - SReadHandle mgHandle = { - .vnode = NULL, - .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), - .pStateBackend = pTask->pState, - }; - pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId); - if (pTask->exec.executor == NULL) { + int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo); + SReadHandle mgHandle = { .vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState}; + + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId); + if (pTask->exec.pExecutor == NULL) { return -1; } } @@ -964,15 +972,26 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { ver1 = info.skmVer; } - pTask->tbSink.pTSchema = - tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, ver1); + SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper; + pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1); if(pTask->tbSink.pTSchema == NULL) { return -1; } } + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + pTask->exec.pTqReader = tqOpenReader(pTq->pVnode); + if (pTask->exec.pTqReader == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + SArray* pList = qGetQueriedTableListInfo(pTask->exec.pExecutor); + tqReaderAddTbUidList(pTask->exec.pTqReader, pList); + } + streamSetupTrigger(pTask); - tqInfo("expand stream task on vg %d, task id %d, child id %d, level %d", vgId, pTask->taskId, pTask->selfChildId, pTask->taskLevel); + tqInfo("vgId:%d expand stream task, s-task:%s, child id %d, level %d", vgId, pTask->id.idStr, pTask->selfChildId, pTask->taskLevel); return 0; } @@ -995,6 +1014,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { .upstreamNodeId = req.upstreamNodeId, .upstreamTaskId = req.upstreamTaskId, }; + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask && atomic_load_8(&pTask->taskStatus) == TASK_STATUS__NORMAL) { rsp.status = 1; @@ -1085,6 +1105,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms taosMemoryFree(pTask); return -1; } + tDecoderClear(&decoder); // 2.save task @@ -1298,7 +1319,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; - qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver); + qDebug("delete req enqueue stream task: %d, ver: %" PRId64, pTask->id.taskId, ver); if (!failed) { SStreamRefDataBlock* pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); @@ -1308,7 +1329,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { atomic_add_fetch_32(pRefBlock->dataRef, 1); if (tAppendDataForStream(pTask, (SStreamQueueItem*)pRefBlock) < 0) { - qError("stream task input del failed, task id %d", pTask->taskId); + qError("stream task input del failed, task id %d", pTask->id.taskId); atomic_sub_fetch_32(pRef, 1); taosFreeQitem(pRefBlock); @@ -1316,7 +1337,7 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { } if (streamSchedExec(pTask) < 0) { - qError("stream task launch failed, task id %d", pTask->taskId); + qError("stream task launch failed, task id %d", pTask->id.taskId); continue; } @@ -1343,12 +1364,12 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { if (!failed) { if (tAppendDataForStream(pTask, (SStreamQueueItem*)pStreamBlock) < 0) { - qError("stream task input del failed, task id %d", pTask->taskId); + qError("stream task input del failed, task id %d", pTask->id.taskId); continue; } if (streamSchedExec(pTask) < 0) { - qError("stream task launch failed, task id %d", pTask->taskId); + qError("stream task launch failed, task id %d", pTask->id.taskId); continue; } } else { @@ -1361,15 +1382,82 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { return 0; } +static int32_t doAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) { + int32_t code = tAppendDataForStream(pTask, pQueueItem); + if (code < 0) { + tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver); + return -1; + } + + if (streamSchedExec(pTask) < 0) { + tqError("stream task:%d failed to be launched, code:%s", pTask->id.taskId, tstrerror(terrno)); + return -1; + } + + return TSDB_CODE_SUCCESS; +} + +static void doSaveTaskOffset(STqOffsetStore* pOffsetStore, const char* pKey, int64_t ver) { + STqOffset offset = {0}; + tqOffsetResetToLog(&offset.val, ver); + + tstrncpy(offset.subKey, pKey, tListLen(offset.subKey)); + + // keep the offset info in the offset store + tqOffsetWrite(pOffsetStore, &offset); +} + +static int32_t addSubmitBlockNLaunchTask(STqOffsetStore* pOffsetStore, SStreamTask* pTask, SStreamDataSubmit2* pSubmit, + const char* key, int64_t ver) { + doSaveTaskOffset(pOffsetStore, key, ver); + int32_t code = doAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pSubmit, ver); + + // remove the offset, if all functions are completed successfully. + if (code == TSDB_CODE_SUCCESS) { + tqOffsetDelete(pOffsetStore, key); + } + return TSDB_CODE_SUCCESS; +} + +static void saveOffsetForAllTasks(STQ* pTq, SPackedData submit) { + void* pIter = NULL; + + while(1) { + pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); + if (pIter == NULL) { + break; + } + + SStreamTask* pTask = *(SStreamTask**)pIter; + if (pTask->taskLevel != TASK_LEVEL__SOURCE) { + continue; + } + + if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { + tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, + pTask->taskStatus); + continue; + } + + char key[128] = {0}; + createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); + + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); + if (pOffset == NULL) { + doSaveTaskOffset(pTq->pOffsetStore, key, submit.ver); + } + } +} + int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { void* pIter = NULL; - bool succ = true; - SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit); + SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit, STREAM_INPUT__DATA_SUBMIT); if (pSubmit == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("failed to create data submit for stream since out of memory"); - succ = false; + saveOffsetForAllTasks(pTq, submit); + return -1; } while (1) { @@ -1384,35 +1472,85 @@ int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { } if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->taskId, pTask->taskStatus); + tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, + pTask->taskStatus); continue; } - tqDebug("data submit enqueue stream task:%d, ver: %" PRId64, pTask->taskId, submit.ver); - if (succ) { - int32_t code = tAppendDataForStream(pTask, (SStreamQueueItem*)pSubmit); - if (code < 0) { - // let's handle the back pressure + // check if offset value exists + char key[128] = {0}; + createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); - tqError("stream task:%d failed to put into queue for, too many", pTask->taskId); - continue; + if (tInputQueueIsFull(pTask)) { + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); + + int64_t ver = submit.ver; + if (pOffset == NULL) { + doSaveTaskOffset(pTq->pOffsetStore, key, submit.ver); + } else { + ver = pOffset->val.version; } - if (streamSchedExec(pTask) < 0) { - tqError("stream task:%d launch failed, code:%s", pTask->taskId, tstrerror(terrno)); - continue; + tqDebug("s-task:%s input queue is full, do nothing, start ver:%" PRId64, pTask->id.idStr, ver); + continue; + } + + // check if offset value exists + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); + if (pOffset != NULL) { + // seek the stored version and extract data from WAL + int32_t code = tqSeekVer(pTask->exec.pTqReader, pOffset->val.version, ""); + + // all data has been retrieved from WAL, let's try submit block directly. + if (code == TSDB_CODE_SUCCESS) { // all data retrieved, abort + // append the data for the stream + SFetchRet ret = {0}; + terrno = 0; + + tqNextBlock(pTask->exec.pTqReader, &ret); + if (ret.fetchType == FETCH_TYPE__DATA) { + SStreamDataBlock* pBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); + if (pBlocks == NULL) { // failed, do nothing + terrno = TSDB_CODE_OUT_OF_MEMORY; + continue; + } + + ret.data.info.type = STREAM_NORMAL; + pBlocks->type = STREAM_INPUT__DATA_BLOCK; + pBlocks->sourceVer = pOffset->val.version; + pBlocks->blocks = taosArrayInit(0, sizeof(SSDataBlock)); + taosArrayPush(pBlocks->blocks, &ret.data); + + int64_t* ts = (int64_t*)(((SColumnInfoData*)ret.data.pDataBlock->pData)->pData); +// tqDebug("-----------%ld\n", ts[0]); + + code = doAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pBlocks, pBlocks->sourceVer); + if (code == TSDB_CODE_SUCCESS) { + pOffset->val.version = pTask->exec.pTqReader->pWalReader->curVersion; + tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, + pOffset->val.version); + } + + } else { // FETCH_TYPE__NONE, let's try submit block directly + tqDebug("s-task:%s data in WAL are all consumed, try data in submit message", pTask->id.idStr); + addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver); + } + + // do nothing if failed, since the offset value is kept already + } else { // failed to seek to the WAL version + // todo handle the case where offset has been deleted in WAL, due to stream computing too slow + tqDebug("s-task:%s data in WAL are all consumed, try data in submit msg", pTask->id.idStr); + addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver); } } else { - streamTaskInputFail(pTask); + addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver); } } - if (pSubmit != NULL) { - streamDataSubmitDestroy(pSubmit); - taosFreeQitem(pSubmit); - } + streamDataSubmitDestroy(pSubmit); + taosFreeQitem(pSubmit); - return succ ? 0 : -1; + return 0; } int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { @@ -1420,6 +1558,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int32_t taskId = pReq->taskId; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask) { + tqDebug("stream task:%d start to process run req", pTask->id.taskId); streamProcessRunReq(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; @@ -1456,7 +1595,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = ntohl(pRsp->upstreamTaskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - tqDebug("recv dispatch rsp, code: %x", pMsg->code); + tqDebug("recv dispatch rsp, code:%x", pMsg->code); if (pTask) { streamProcessDispatchRsp(pTask, pRsp, pMsg->code); streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -1484,10 +1623,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { int32_t taskId = req.dstTaskId; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask) { - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; + SRpcMsg rsp = { .info = pMsg->info, .code = 0 }; streamProcessRetrieveReq(pTask, &req, &rsp); streamMetaReleaseTask(pTq->pStreamMeta, pTask); tDeleteStreamRetrieveReq(&req); @@ -1523,10 +1659,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask) { - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; + SRpcMsg rsp = { .info = pMsg->info, .code = 0 }; streamProcessDispatchReq(pTask, &req, &rsp, false); streamMetaReleaseTask(pTq->pStreamMeta, pTask); rpcFreeCont(pMsg->pCont); @@ -1543,10 +1676,7 @@ FAIL: SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); if (pRspHead == NULL) { - SRpcMsg rsp = { - .code = TSDB_CODE_OUT_OF_MEMORY, - .info = pMsg->info, - }; + SRpcMsg rsp = { .code = TSDB_CODE_OUT_OF_MEMORY, .info = pMsg->info }; tqDebug("send dispatch error rsp, code: %x", code); tmsgSendRsp(&rsp); rpcFreeCont(pMsg->pCont); @@ -1564,11 +1694,7 @@ FAIL: pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL; SRpcMsg rsp = { - .code = code, - .info = pMsg->info, - .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), - .pCont = pRspHead, - }; + .code = code, .info = pMsg->info, .contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp), .pCont = pRspHead}; tqDebug("send dispatch error rsp, code: %x", code); tmsgSendRsp(&rsp); rpcFreeCont(pMsg->pCont); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index a273f2edec..8f8ee78e97 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -320,15 +320,15 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { code = -1; goto end; } - handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); - if (handle.execHandle.pExecReader == NULL) { + handle.execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner); + if (handle.execHandle.pTqReader == NULL) { tqError("cannot extract exec reader for %s", handle.subKey); code = -1; goto end; } } else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) { handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); + handle.execHandle.pTqReader = tqOpenReader(pTq->pVnode); buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta, (SSnapContext**)(&reader.sContext)); @@ -343,8 +343,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid); } - handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode); - tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList); + handle.execHandle.pTqReader = tqOpenReader(pTq->pVnode); + tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList); taosArrayDestroy(tbUidList); buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType, diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 66d1ac2c7e..e8051a1406 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -128,31 +128,35 @@ int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) { } int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { - if (!pStore->needCommit) return 0; + if (!pStore->needCommit) { + return 0; + } + // TODO file name should be with a newer version char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - - int32_t err = terrno; - const char* errStr = tstrerror(err); - int32_t sysErr = errno; - const char* sysErrStr = strerror(errno); - tqError("vgId:%d, cannot open file %s when commit offset since %s", pStore->pTq->pVnode->config.vgId, fname, - sysErrStr); + const char* err = strerror(errno); + tqError("vgId:%d, failed to open offset file %s, since %s", TD_VID(pStore->pTq->pVnode), fname, err); taosMemoryFree(fname); return -1; } + taosMemoryFree(fname); + void* pIter = NULL; while (1) { pIter = taosHashIterate(pStore->pHash, pIter); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + STqOffset* pOffset = (STqOffset*)pIter; int32_t bodyLen; int32_t code; tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code); + if (code < 0) { taosHashCancelIterate(pStore->pHash, pIter); return -1; @@ -166,6 +170,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { SEncoder encoder; tEncoderInit(&encoder, abuf, bodyLen); tEncodeSTqOffset(&encoder, pOffset); + // write file int64_t writeLen; if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) { @@ -174,8 +179,10 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore) { taosMemoryFree(buf); return -1; } + taosMemoryFree(buf); } + // close and rename file taosCloseFile(&pFile); pStore->needCommit = 0; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 9dfcf43e4d..dd003aec98 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -331,6 +331,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v if (msgType == TDMT_VND_SUBMIT) { void* data = taosMemoryMalloc(len); if (data == NULL) { + // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", vgId); return -1; @@ -339,7 +340,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v memcpy(data, pReq, len); SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver}; - tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq); + tqDebug("tq copy submit msg:%p len:%d ver:%" PRId64 " from %p for stream", data, len, ver, pReq); tqProcessSubmitReq(pTq, submit); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 54e4e393ec..24546168e5 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -113,7 +113,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) { } SMetaReader mr = {0}; - metaReaderInit(&mr, pHandle->execHandle.pExecReader->pVnodeMeta, 0); + metaReaderInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0); if (metaGetTableEntryByName(&mr, req.tbName) < 0) { metaReaderClear(&mr); @@ -262,8 +262,6 @@ STqReader* tqOpenReader(SVnode* pVnode) { } pReader->pVnodeMeta = pVnode->pMeta; - /*pReader->pMsg = NULL;*/ -// pReader->ver = -1; pReader->pColIdList = NULL; pReader->cachedSchemaVer = 0; pReader->cachedSchemaSuid = 0; @@ -296,10 +294,10 @@ void tqCloseReader(STqReader* pReader) { int32_t tqSeekVer(STqReader* pReader, int64_t ver, const char* id) { if (walReadSeekVer(pReader->pWalReader, ver) < 0) { - tqDebug("tmq poll: wal reader failed to seek to ver:%"PRId64" code:%s, %s", ver, tstrerror(terrno), id); + tqDebug("wal reader failed to seek to ver:%"PRId64" code:%s, %s", ver, tstrerror(terrno), id); return -1; } - tqDebug("tmq poll: wal reader seek to ver:%"PRId64" %s", ver, id); + tqDebug("wal reader seek to ver:%"PRId64" %s", ver, id); return 0; } @@ -310,26 +308,28 @@ void tqNextBlock(STqReader* pReader, SFetchRet* ret) { ret->fetchType = FETCH_TYPE__NONE; return; } - void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); + + void* pBody = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); int64_t ver = pReader->pWalReader->pHead->head.version; - tqReaderSetSubmitReq2(pReader, body, bodyLen, ver); + tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver); } - while (tqNextDataBlock2(pReader)) { + while (tqNextDataBlock(pReader)) { memset(&ret->data, 0, sizeof(SSDataBlock)); int32_t code = tqRetrieveDataBlock2(&ret->data, pReader, NULL); if (code != 0 || ret->data.info.rows == 0) { continue; } + ret->fetchType = FETCH_TYPE__DATA; return; } } } -int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { +int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { pReader->msg2.msgStr = msgStr; pReader->msg2.msgLen = msgLen; pReader->msg2.ver = ver; @@ -346,7 +346,7 @@ int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, return 0; } -bool tqNextDataBlock2(STqReader* pReader) { +bool tqNextDataBlock(STqReader* pReader) { if (pReader->msg2.msgStr == NULL) { return false; } @@ -355,13 +355,20 @@ bool tqNextDataBlock2(STqReader* pReader) { while (pReader->nextBlk < blockSz) { tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen, pReader->msg2.ver, pReader->nextBlk); + SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); - if (pReader->tbIdHash == NULL) return true; + if (pReader->tbIdHash == NULL) { + return true; + } void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); if (ret != NULL) { + tqDebug("tq reader block found, ver:%"PRId64", uid:%"PRId64, pReader->msg2.ver, pSubmitTbData->uid); return true; + } else { + tqDebug("tq reader discard block, uid:%"PRId64", continue", pSubmitTbData->uid); } + pReader->nextBlk++; } @@ -901,7 +908,7 @@ int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) { return 0; } -int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) { +int tqReaderAddTbUidList(STqReader* pReader, const SArray* pTableUidList) { if (pReader->tbIdHash == NULL) { pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); if (pReader->tbIdHash == NULL) { @@ -910,8 +917,9 @@ int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) { } } - for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { - int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); + int32_t numOfTables = taosArrayGetSize(pTableUidList); + for (int i = 0; i < numOfTables; i++) { + int64_t* pKey = (int64_t*)taosArrayGet(pTableUidList, i); taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0); } @@ -927,30 +935,34 @@ int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) { return 0; } +// todo update the table list in wal reader int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { - void* pIter = NULL; + void* pIter = NULL; + int32_t vgId = TD_VID(pTq->pVnode); + + // update the table list for each consumer handle while (1) { pIter = taosHashIterate(pTq->pHandle, pIter); if (pIter == NULL) { break; } - STqHandle* pExec = (STqHandle*)pIter; - if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - int32_t code = qUpdateQualifiedTableId(pExec->execHandle.task, tbUidList, isAdd); + STqHandle* pTqHandle = (STqHandle*)pIter; + if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd, NULL); if (code != 0) { - tqError("update qualified table error for %s", pExec->subKey); + tqError("update qualified table error for %s", pTqHandle->subKey); continue; } - } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__DB) { + } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { if (!isAdd) { int32_t sz = taosArrayGetSize(tbUidList); for (int32_t i = 0; i < sz; i++) { int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i); - taosHashPut(pExec->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0); + taosHashPut(pTqHandle->execHandle.execDb.pFilterOutTbUid, &tbUid, sizeof(int64_t), NULL, 0); } } - } else if (pExec->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { + } else if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { if (isAdd) { SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); SMetaReader mr = {0}; @@ -965,35 +977,50 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } tDecoderClear(&mr.coder); - - if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pExec->execHandle.execTb.suid) { + if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pTqHandle->execHandle.execTb.suid) { tqDebug("table uid %" PRId64 " does not add to tq handle", *id); continue; } + tqDebug("table uid %" PRId64 " add to tq handle", *id); taosArrayPush(qa, id); } + metaReaderClear(&mr); if (taosArrayGetSize(qa) > 0) { - tqReaderAddTbUidList(pExec->execHandle.pExecReader, qa); + tqReaderAddTbUidList(pTqHandle->execHandle.pTqReader, qa); } + taosArrayDestroy(qa); } else { - tqReaderRemoveTbUidList(pExec->execHandle.pExecReader, tbUidList); + tqReaderRemoveTbUidList(pTqHandle->execHandle.pTqReader, tbUidList); } } } + + // update the table list handle for each stream scanner/wal reader while (1) { pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd); + SArray* pList = NULL; + int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd, pList); if (code != 0) { - tqError("update qualified table error for stream task %d", pTask->taskId); + tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr); continue; } + + if (isAdd) { // only add qualified tables + tqReaderAddTbUidList(pTask->exec.pTqReader, pList); + } else { + tqReaderRemoveTbUidList(pTask->exec.pTqReader, tbUidList); + } } } + return 0; } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index b4e50312fd..5633dc37a3 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -38,7 +38,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t } static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) { - SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper); + SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pTqReader->pSchemaWrapper); if (pSW == NULL) { return -1; } @@ -135,7 +135,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta if (pDataBlock != NULL && pDataBlock->info.rows > 0) { if (pRsp->withTbName) { if (pOffset->type == TMQ_OFFSET__LOG) { - int64_t uid = pExec->pExecReader->lastBlkUid; + int64_t uid = pExec->pTqReader->lastBlkUid; if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) { continue; } @@ -200,9 +200,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR SArray* pSchemas = taosArrayInit(0, sizeof(void*)); if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { - STqReader* pReader = pExec->pExecReader; - tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver); - while (tqNextDataBlock2(pReader)) { + STqReader* pReader = pExec->pTqReader; + tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver); + while (tqNextDataBlock(pReader)) { taosArrayClear(pBlocks); taosArrayClear(pSchemas); SSubmitTbData* pSubmitTbDataRet = NULL; @@ -210,7 +210,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { - int64_t uid = pExec->pExecReader->lastBlkUid; + int64_t uid = pExec->pTqReader->lastBlkUid; if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); @@ -259,8 +259,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR } } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { - STqReader* pReader = pExec->pExecReader; - tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver); + STqReader* pReader = pExec->pTqReader; + tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver); while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) { taosArrayClear(pBlocks); taosArrayClear(pSchemas); @@ -269,7 +269,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { - int64_t uid = pExec->pExecReader->lastBlkUid; + int64_t uid = pExec->pTqReader->lastBlkUid; if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 8a89cb6bd7..22f0387d4e 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -87,7 +87,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d return; } - tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz); + tqDebug("vgId:%d, s-task:%s write into table, block num: %d", TD_VID(pVnode), pTask->id.idStr, blockSz); for (int32_t i = 0; i < blockSz; i++) { bool createTb = true; SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); @@ -382,7 +382,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* int32_t blockSz = taosArrayGetSize(pBlocks); - tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz); + tqDebug("vgId:%d, s-task:%s write results blocks:%d into table", TD_VID(pVnode), pTask->id.idStr, blockSz); void* pBuf = NULL; SArray* tagArray = NULL; @@ -475,11 +475,9 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* } for (int32_t tagId = UD_TAG_COLUMN_INDEX, step = 1; tagId < size; tagId++, step++) { SColumnInfoData* pTagData = taosArrayGet(pDataBlock->pDataBlock, tagId); - STagVal tagVal = { - .cid = pTSchema->numOfCols + step, - .type = pTagData->info.type, - }; - void* pData = colDataGetData(pTagData, rowId); + + STagVal tagVal = {.cid = pTSchema->numOfCols + step, .type = pTagData->info.type}; + void* pData = colDataGetData(pTagData, rowId); if (colDataIsNull_s(pTagData, rowId)) { continue; } else if (IS_VAR_DATA_TYPE(pTagData->info.type)) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 3d2b032156..e353988d4c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -447,13 +447,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp walApplyVer(pVnode->pWal, version); - /*vInfo("vgId:%d, push msg begin", pVnode->config.vgId);*/ if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } - /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ // commit if need if (needCommit) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b4431a7c3b..4389cc7fdf 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -127,7 +127,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pOperator->status = OP_NOT_OPENED; SStreamScanInfo* pInfo = pOperator->info; - qDebug("task stream set total blocks:%d %s", (int32_t)numOfBlocks, id); + qDebug("s-task set source blocks:%d %s", (int32_t)numOfBlocks, id); ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0); if (type == STREAM_INPUT__MERGED_SUBMIT) { @@ -363,27 +363,28 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S return qa; } -int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) { +int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd, SArray* pList) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + const char* id = GET_TASKID(pTaskInfo); + int32_t code = 0; if (isAdd) { - qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), pTaskInfo->id.str); + qDebug("add %d tables id into query list, %s", (int32_t)taosArrayGetSize(tableIdList), id); } // traverse to the stream scanner node to add this table id - SOperatorInfo* pInfo = pTaskInfo->pRoot; - while (pInfo->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - pInfo = pInfo->pDownstream[0]; - } - - int32_t code = 0; + SOperatorInfo* pInfo = extractOperatorInTree(pTaskInfo->pRoot, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, id); SStreamScanInfo* pScanInfo = pInfo->info; + if (isAdd) { // add new table id SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo)); int32_t numOfQualifiedTables = taosArrayGetSize(qa); - qDebug(" %d qualified child tables added into stream scanner", numOfQualifiedTables); + if (pList != NULL) { + taosArrayAddAll(pList, qa); + } + qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id); code = tqReaderAddTbUidList(pScanInfo->tqReader, qa); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(qa); @@ -424,19 +425,6 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } } -#if 0 - bool exists = false; - for (int32_t k = 0; k < taosArrayGetSize(pListInfo->pTableList); ++k) { - STableKeyInfo* pKeyInfo = taosArrayGet(pListInfo->pTableList, k); - if (pKeyInfo->uid == keyInfo.uid) { - qWarn("ignore duplicated query table uid:%" PRIu64 " added, %s", pKeyInfo->uid, pTaskInfo->id.str); - exists = true; - } - } - - if (!exists) { -#endif - tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId); } @@ -447,7 +435,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo taosArrayDestroy(qa); } else { // remove the table id in current list - qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList)); + qDebug("%d remove child tables from the stream scanner, %s", (int32_t)taosArrayGetSize(tableIdList), id); taosWLockLatch(&pTaskInfo->lock); code = tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList); taosWUnLockLatch(&pTaskInfo->lock); @@ -1263,3 +1251,21 @@ void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { destroySendMsgInfo(pSendInfo); } +SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = tinfo; + SArray* plist = getTableListInfo(pTaskInfo); + + // only extract table in the first elements + STableListInfo* pTableListInfo = taosArrayGetP(plist, 0); + + SArray* pUidList = taosArrayInit(10, sizeof(uint64_t)); + + int32_t numOfTables = tableListGetSize(pTableListInfo); + for(int32_t i = 0; i < numOfTables; ++i) { + STableKeyInfo* pKeyInfo = tableListGetInfo(pTableListInfo, i); + taosArrayPush(pUidList, &pKeyInfo->uid); + } + + taosArrayDestroy(plist); + return pUidList; +} diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 11753c181c..52b749efee 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2001,7 +2001,11 @@ void qStreamCloseTsdbReader(void* task) { } static void extractTableList(SArray* pList, const SOperatorInfo* pOperator) { - if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + SStreamScanInfo* pScanInfo = pOperator->info; + STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info; + taosArrayPush(pList, &pTableScanInfo->base.pTableListInfo); + } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pScanInfo = pOperator->info; taosArrayPush(pList, &pScanInfo->base.pTableListInfo); } else { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 29990f2d06..21e3c75924 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1635,7 +1635,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.submit.msgStr != NULL) { if (pInfo->tqReader->msg2.msgStr == NULL) { SPackedData submit = pTaskInfo->streamInfo.submit; - if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { + if (tqReaderSetSubmitMsg(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { qError("submit msg messed up when initing stream submit block %p", submit.msgStr); return NULL; } @@ -1644,7 +1644,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pRes); SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - while (tqNextDataBlock2(pInfo->tqReader)) { + while (tqNextDataBlock(pInfo->tqReader)) { SSDataBlock block = {0}; int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); @@ -1805,7 +1805,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { /*resetTableScanInfo(pTSInfo, pWin);*/ tsdbReaderClose(pTSInfo->base.dataReader); - qDebug("4"); pTSInfo->base.dataReader = NULL; pInfo->pTableScanOp->status = OP_OPENED; @@ -1888,7 +1887,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; tsdbReaderClose(pTSInfo->base.dataReader); - qDebug("5"); pTSInfo->base.dataReader = NULL; @@ -1915,6 +1913,7 @@ FETCH_NEXT_BLOCK: if (pBlock->info.parTbName[0]) { streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, pBlock->info.parTbName); } + // TODO move into scan pBlock->info.calWin.skey = INT64_MIN; pBlock->info.calWin.ekey = INT64_MAX; @@ -2057,7 +2056,7 @@ FETCH_NEXT_BLOCK: int32_t current = pInfo->validBlockIndex++; SPackedData* pSubmit = taosArrayGet(pInfo->pBlockLists, current); - if (tqReaderSetSubmitReq2(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { + if (tqReaderSetSubmitMsg(pInfo->tqReader, pSubmit->msgStr, pSubmit->msgLen, pSubmit->ver) < 0) { qError("submit msg messed up when initing stream submit block %p, current %d, total %d", pSubmit, current, totBlockNum); continue; @@ -2066,7 +2065,7 @@ FETCH_NEXT_BLOCK: blockDataCleanup(pInfo->pRes); - while (tqNextDataBlock2(pInfo->tqReader)) { + while (tqNextDataBlock(pInfo->tqReader)) { SSDataBlock block = {0}; int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 1a1fb6208d..880de7d6bf 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2432,10 +2432,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pSDataBlock->info.rows, numOfOutput); - SWinKey key = { - .ts = nextWin.skey, - .groupId = groupId, - }; + + SWinKey key = { .ts = nextWin.skey, .groupId = groupId }; saveOutputBuf(pInfo->pState, &key, pResult, pInfo->aggSup.resultRowSize); releaseOutputBuf(pInfo->pState, &key, pResult); if (pInfo->delKey.ts > key.ts) { @@ -4771,6 +4769,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { pInfo->numOfDatapack = 0; break; } + pInfo->numOfDatapack++; printDataBlock(pBlock, "single interval recv"); diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 66496f11f8..876b80697a 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -44,7 +44,7 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, SEpSet* pEpSet); -SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); +SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); #ifdef __cplusplus } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 361cd2cacc..59ac8a61d6 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,6 +16,8 @@ #include "streamInc.h" #include "ttimer.h" +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 2 + int32_t streamInit() { int8_t old; while (1) { @@ -96,13 +98,14 @@ int32_t streamSchedExec(SStreamTask* pTask) { if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE); return -1; } pRunReq->head.vgId = pTask->nodeId; - pRunReq->streamId = pTask->streamId; - pRunReq->taskId = pTask->taskId; + pRunReq->streamId = pTask->id.streamId; + pRunReq->taskId = pTask->id.taskId; SRpcMsg msg = { .msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq) }; tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); @@ -142,7 +145,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, const SStreamDispatchReq* pReq, SR pCont->upstreamNodeId = htonl(pReq->upstreamNodeId); pCont->upstreamTaskId = htonl(pReq->upstreamTaskId); pCont->downstreamNodeId = htonl(pTask->nodeId); - pCont->downstreamTaskId = htonl(pTask->taskId); + pCont->downstreamTaskId = htonl(pTask->id.taskId); pRsp->pCont = buf; pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); tmsgSendRsp(pRsp); @@ -155,7 +158,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, // enqueue if (pData != NULL) { - qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->taskId, pTask->selfChildId, + qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId, pReq->srcTaskId, pReq->reqId); pData->type = STREAM_INPUT__DATA_RETRIEVE; @@ -205,7 +208,7 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { } int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("task %d receive dispatch req from node %d task %d", pTask->taskId, pReq->upstreamNodeId, + qDebug("task %d receive dispatch req from node %d task %d", pTask->id.taskId, pReq->upstreamNodeId, pReq->upstreamTaskId); streamTaskEnqueue(pTask, pReq, pRsp); @@ -228,12 +231,11 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED); - - qDebug("task %d receive dispatch rsp, code: %x", pTask->taskId, code); + qDebug("task %d receive dispatch rsp, code: %x", pTask->id.taskId, code); if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); - qDebug("task %d is shuffle, left waiting rsp %d", pTask->taskId, leftRsp); + qDebug("task %d is shuffle, left waiting rsp %d", pTask->id.taskId, leftRsp); if (leftRsp > 0) return 0; } @@ -261,7 +263,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { } int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { - qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId); + qDebug("task %d receive retrieve req from node %d task %d", pTask->id.taskId, pReq->srcNodeId, pReq->srcTaskId); streamTaskEnqueueRetrieve(pTask, pReq, pRsp); @@ -275,26 +277,43 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S return 0; } +bool tInputQueueIsFull(const SStreamTask* pTask) { + return taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; +} + int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) { int8_t type = pItem->type; if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit2* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit2*)pItem); if (pSubmitBlock == NULL) { - qDebug("task %d %p submit enqueue failed since out of memory", pTask->taskId, pTask); + qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask); terrno = TSDB_CODE_OUT_OF_MEMORY; atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); return -1; } int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; - qDebug("stream task:%d %p submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->taskId, - pTask, pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, + qDebug("s-task:%s submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->id.idStr, + pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, total); + if (total > STREAM_TASK_INPUT_QUEUEU_CAPACITY) { + qDebug("s-task:%s input queue is full, capacity:%d, abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY); + streamDataSubmitDestroy(pSubmitBlock); + return -1; + } + taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { + int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1; + if (total > 2) { + qDebug("stream task input queue is full, abort"); + return -1; + } + + qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total); taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 3fba1cb556..63d15f134d 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -67,7 +67,7 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock return 0; } -SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit) { +SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, 0); if (pDataSubmit == NULL) { @@ -82,7 +82,7 @@ SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit) { pDataSubmit->submit = submit; *pDataSubmit->dataRef = 1; // initialize the reference count to be 1 - pDataSubmit->type = STREAM_INPUT__DATA_SUBMIT; + pDataSubmit->type = type; return pDataSubmit; } @@ -139,28 +139,27 @@ SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) { return pSubmitClone; } -SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { - ASSERT(elem); - if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) { +SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem) { + if (dst->type == STREAM_INPUT__DATA_BLOCK && pElem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; - SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)elem; + SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem; taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks); taosArrayDestroy(pBlockSrc->blocks); - taosFreeQitem(elem); + taosFreeQitem(pElem); return dst; - } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) { + } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)dst; - SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)elem; + SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)pElem; streamMergeSubmit(pMerged, pBlockSrc); - taosFreeQitem(elem); + taosFreeQitem(pElem); return dst; - } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) { + } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit2* pMerged = streamMergedSubmitNew(); ASSERT(pMerged); streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst); - streamMergeSubmit(pMerged, (SStreamDataSubmit2*)elem); + streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem); taosFreeQitem(dst); - taosFreeQitem(elem); + taosFreeQitem(pElem); return (SStreamQueueItem*)pMerged; } else { return NULL; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7e7c23f98a..4e491f906a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -121,9 +121,9 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); SStreamRetrieveReq req = { - .streamId = pTask->streamId, + .streamId = pTask->id.streamId, .srcNodeId = pTask->nodeId, - .srcTaskId = pTask->taskId, + .srcTaskId = pTask->id.taskId, .pRetrieve = pRetrieve, .retrieveLen = dataStrLen, }; @@ -168,7 +168,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) } buf = NULL; - qDebug("task %d(child %d) send retrieve req to task %d at node %d, reqId %" PRId64, pTask->taskId, + qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId %" PRId64, pTask->id.idStr, pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId); } code = 0; @@ -238,7 +238,7 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* msg.pCont = buf; msg.msgType = TDMT_STREAM_TASK_CHECK; - qDebug("dispatch from task %d to task %d node %d: check msg", pTask->taskId, pReq->downstreamTaskId, nodeId); + qDebug("dispatch from task %d to task %d node %d: check msg", pTask->id.taskId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); @@ -282,7 +282,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov tmsgSendReq(pEpSet, &msg); - qDebug("dispatch from task %d to task %d node %d: recover finish msg", pTask->taskId, pReq->taskId, vgId); + qDebug("dispatch from task %d to task %d node %d: recover finish msg", pTask->id.taskId, pReq->taskId, vgId); return 0; FAIL: @@ -319,7 +319,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p msg.pCont = buf; msg.msgType = pTask->dispatchMsgType; - qDebug("dispatch from task %d to task %d node %d: data msg", pTask->taskId, pReq->taskId, vgId); + qDebug("dispatch from task %d to task %d node %d: data msg", pTask->id.taskId, pReq->taskId, vgId); tmsgSendReq(pEpSet, &msg); @@ -382,9 +382,9 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { SStreamDispatchReq req = { - .streamId = pTask->streamId, + .streamId = pTask->id.streamId, .dataSrcVgId = pData->srcVgId, - .upstreamTaskId = pTask->taskId, + .upstreamTaskId = pTask->id.taskId, .upstreamChildId = pTask->selfChildId, .upstreamNodeId = pTask->nodeId, .blockNum = blockNum, @@ -408,7 +408,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat req.taskId = downstreamTaskId; - qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId, + qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->id.taskId, pTask->selfChildId, downstreamTaskId, vgId); if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) { @@ -432,9 +432,9 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat } for (int32_t i = 0; i < vgSz; i++) { - pReqs[i].streamId = pTask->streamId; + pReqs[i].streamId = pTask->id.streamId; pReqs[i].dataSrcVgId = pData->srcVgId; - pReqs[i].upstreamTaskId = pTask->taskId; + pReqs[i].upstreamTaskId = pTask->id.taskId; pReqs[i].upstreamChildId = pTask->selfChildId; pReqs[i].upstreamNodeId = pTask->nodeId; pReqs[i].blockNum = 0; @@ -503,13 +503,13 @@ int32_t streamDispatch(SStreamTask* pTask) { SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); if (pBlock == NULL) { - qDebug("stream stop dispatching since no output: task %d", pTask->taskId); + qDebug("stream stop dispatching since no output: task %d", pTask->id.taskId); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); return 0; } ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); - qDebug("stream dispatching: task %d", pTask->taskId); + qDebug("stream dispatching: task %d", pTask->id.taskId); int32_t code = 0; if (streamDispatchAllBlocks(pTask, pBlock) < 0) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 6ef327049c..d23590a08b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -18,8 +18,9 @@ #define STREAM_EXEC_MAX_BATCH_NUM 100 static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { - int32_t code; - void* exec = pTask->exec.executor; + int32_t code = TSDB_CODE_SUCCESS; + void* pExecutor = pTask->exec.pExecutor; + while(pTask->taskLevel == TASK_LEVEL__SOURCE && atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { qError("stream task wait for the end of fill history"); taosMsleep(2); @@ -30,31 +31,35 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* const SStreamQueueItem* pItem = (const SStreamQueueItem*)data; if (pItem->type == STREAM_INPUT__GET_RES) { const SStreamTrigger* pTrigger = (const SStreamTrigger*)data; - qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); + qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; - qDebug("stream task:%d %p set submit input %p %p %d %" PRId64, pTask->taskId, pTask, pSubmit, pSubmit->submit.msgStr, + qDebug("s-task:%s set submit blocks as input %p %p %d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); - qSetMultiStreamInput(exec, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); + qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data; - SArray* blocks = pBlock->blocks; - qDebug("task %d %p set ssdata input", pTask->taskId, pTask); - qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK); + + SArray* pBlockList = pBlock->blocks; + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); + qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data; - SArray* blocks = pMerged->submits; - qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size); - qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT); + + SArray* pBlockList = pMerged->submits; + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + qDebug("st-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); + qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data; - qSetMultiStreamInput(exec, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); + qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else { ASSERT(0); } - // exec + // pExecutor while (1) { if (pTask->taskStatus == TASK_STATUS__DROPPING) { return 0; @@ -62,26 +67,28 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* SSDataBlock* output = NULL; uint64_t ts = 0; - if ((code = qExecTask(exec, &output, &ts)) < 0) { + if ((code = qExecTask(pExecutor, &output, &ts)) < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { - resetTaskInfo(exec); + resetTaskInfo(pExecutor); } - /*ASSERT(false);*/ - qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId, - terrstr()); + + qError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, terrstr()); continue; } + if (output == NULL) { if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { - SSDataBlock block = {0}; + SSDataBlock block = {0}; + const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data; ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1); + assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); block.info.type = STREAM_PULL_OVER; block.info.childId = pTask->selfChildId; taosArrayPush(pRes, &block); - qDebug("task %d(child %d) processed retrieve, reqId %" PRId64, pTask->taskId, pTask->selfChildId, + qDebug("task %d(child %d) processed retrieve, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId, pRetrieveBlock->reqId); } break; @@ -94,20 +101,21 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* continue; } - qDebug("task %d(child %d) executed and get block", pTask->taskId, pTask->selfChildId); + qDebug("task %d(child %d) executed and get block", pTask->id.taskId, pTask->selfChildId); SSDataBlock block = {0}; assignOneDataBlock(&block, output); block.info.childId = pTask->selfChildId; taosArrayPush(pRes, &block); } + return 0; } int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); - void* exec = pTask->exec.executor; + void* exec = pTask->exec.pExecutor; qSetStreamOpOpen(exec); bool finished = false; @@ -147,17 +155,17 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { batchCnt++; - qDebug("task %d scan exec block num %d, block limit %d", pTask->taskId, batchCnt, batchSz); + qDebug("task %d scan exec block num %d, block limit %d", pTask->id.taskId, batchCnt, batchSz); if (batchCnt >= batchSz) break; } if (taosArrayGetSize(pRes) == 0) { if (finished) { taosArrayDestroy(pRes); - qDebug("task %d finish recover exec task ", pTask->taskId); + qDebug("task %d finish recover exec task ", pTask->id.taskId); break; } else { - qDebug("task %d continue recover exec task ", pTask->taskId); + qDebug("task %d continue recover exec task ", pTask->id.taskId); continue; } } @@ -173,7 +181,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { streamTaskOutput(pTask, qRes); if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - qDebug("task %d scan exec dispatch block num %d", pTask->taskId, batchCnt); + qDebug("task %d scan exec dispatch block num %d", pTask->id.taskId, batchCnt); streamDispatch(pTask); } if (finished) break; @@ -186,7 +194,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { // fetch all queue item, merge according to batchLimit int32_t numOfItems = taosReadAllQitems(pTask->inputQueue1, pTask->inputQall); if (numOfItems == 0) { - qDebug("task: %d, stream task exec over, queue empty", pTask->taskId); + qDebug("task: %d, stream task exec over, queue empty", pTask->id.taskId); return 0; } SStreamQueueItem* pMerged = NULL; @@ -221,30 +229,33 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { int32_t streamExecForAll(SStreamTask* pTask) { while (1) { - int32_t batchCnt = 1; - void* input = NULL; + int32_t batchSize = 1; + void* pInput = NULL; + + // merge multiple input data if possible in the input queue. while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { - qDebug("stream task exec over, queue empty, task: %d", pTask->taskId); + qDebug("stream task exec over, queue empty, task: %d", pTask->id.taskId); break; } - if (input == NULL) { - input = qItem; + + if (pInput == NULL) { + pInput = qItem; streamQueueProcessSuccess(pTask->inputQueue); if (pTask->taskLevel == TASK_LEVEL__SINK) { break; } } else { - void* newRet; - if ((newRet = streamMergeQueueItem(input, qItem)) == NULL) { + void* newRet = NULL; + if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) { streamQueueProcessFail(pTask->inputQueue); break; } else { - batchCnt++; - input = newRet; + batchSize++; + pInput = newRet; streamQueueProcessSuccess(pTask->inputQueue); - if (batchCnt > STREAM_EXEC_MAX_BATCH_NUM) { + if (batchSize > STREAM_EXEC_MAX_BATCH_NUM) { break; } } @@ -252,75 +263,82 @@ int32_t streamExecForAll(SStreamTask* pTask) { } if (pTask->taskStatus == TASK_STATUS__DROPPING) { - if (input) streamFreeQitem(input); + if (pInput) { + streamFreeQitem(pInput); + } return 0; } - if (input == NULL) { + if (pInput == NULL) { break; } if (pTask->taskLevel == TASK_LEVEL__SINK) { - ASSERT(((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_BLOCK); - streamTaskOutput(pTask, input); + ASSERT(((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_BLOCK); + streamTaskOutput(pTask, pInput); continue; } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + qDebug("s-task:%s exec begin, msg batch: %d", pTask->id.idStr, batchSize); - qDebug("stream task:%d exec begin, msg batch: %d", pTask->taskId, batchCnt); - streamTaskExecImpl(pTask, input, pRes); + streamTaskExecImpl(pTask, pInput, pRes); - qDebug("stream task:%d exec end", pTask->taskId); + qDebug("s-task:%s exec end", pTask->id.idStr); if (taosArrayGetSize(pRes) != 0) { SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); if (qRes == NULL) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - streamFreeQitem(input); + streamFreeQitem(pInput); return -1; } + qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->blocks = pRes; - if (((SStreamQueueItem*)input)->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)input; + if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)pInput; qRes->childId = pTask->selfChildId; qRes->sourceVer = pSubmit->ver; - } else if (((SStreamQueueItem*)input)->type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)input; + } else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) { + SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)pInput; qRes->childId = pTask->selfChildId; qRes->sourceVer = pMerged->ver; } if (streamTaskOutput(pTask, qRes) < 0) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - streamFreeQitem(input); + streamFreeQitem(pInput); taosFreeQitem(qRes); return -1; } } else { taosArrayDestroy(pRes); } - streamFreeQitem(input); + streamFreeQitem(pInput); } return 0; } int32_t streamTryExec(SStreamTask* pTask) { + // this function may be executed by multi-threads, so status check is required. int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE); + if (schedStatus == TASK_SCHED_STATUS__WAITING) { int32_t code = streamExecForAll(pTask); if (code < 0) { atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__FAILED); return -1; } + atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE); if (!taosQueueEmpty(pTask->inputQueue->queue)) { streamSchedExec(pTask); } } + return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 66d98e90bf..577f6d6e00 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -23,6 +23,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + int32_t len = strlen(path) + 20; char* streamPath = taosMemoryCalloc(1, len); sprintf(streamPath, "%s/%s", path, "stream"); @@ -83,7 +84,7 @@ void streamMetaClose(SStreamMeta* pMeta) { taosTmrStop(pTask->timer); pTask->timer = NULL; } - tFreeSStreamTask(pTask); + tFreeStreamTask(pTask); /*streamMetaReleaseTask(pMeta, pTask);*/ } taosHashCleanup(pMeta->pTasks); @@ -111,12 +112,12 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, goto FAIL; } - if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { + if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { goto FAIL; } - if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) { - taosHashRemove(pMeta->pTasks, &pTask->taskId, sizeof(int32_t)); + if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), msg, msgLen, pMeta->txn) < 0) { + taosHashRemove(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t)); ASSERT(0); goto FAIL; } @@ -124,7 +125,7 @@ int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t ver, char* msg, return 0; FAIL: - if (pTask) tFreeSStreamTask(pTask); + if (pTask) tFreeStreamTask(pTask); return -1; } #endif @@ -147,7 +148,7 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { tEncodeSStreamTask(&encoder, pTask); tEncoderClear(&encoder); - if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) { + if (tdbTbUpsert(pMeta->pTaskDb, &pTask->id.taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) { return -1; } @@ -165,7 +166,7 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { return -1; } - taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); + taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)); return 0; } @@ -207,7 +208,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { ASSERT(left >= 0); if (left == 0) { ASSERT(atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING); - tFreeSStreamTask(pTask); + tFreeStreamTask(pTask); } } @@ -297,7 +298,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } - if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { + if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 87058bf490..3e7a02b8d5 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -16,7 +16,7 @@ #include "streamInc.h" int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { - qDebug("task %d at node %d launch recover", pTask->taskId, pTask->nodeId); + qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId); if (pTask->taskLevel == TASK_LEVEL__SOURCE) { atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_PREPARE); streamSetParamForRecover(pTask); @@ -56,8 +56,8 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { // checkstatus int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { SStreamTaskCheckReq req = { - .streamId = pTask->streamId, - .upstreamTaskId = pTask->taskId, + .streamId = pTask->id.streamId, + .upstreamTaskId = pTask->id.taskId, .upstreamNodeId = pTask->nodeId, .childId = pTask->selfChildId, }; @@ -68,7 +68,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; pTask->checkReqId = req.reqId; - qDebug("task %d at node %d check downstream task %d at node %d", pTask->taskId, pTask->nodeId, req.downstreamTaskId, + qDebug("task %d at node %d check downstream task %d at node %d", pTask->id.taskId, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchOneCheckReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -83,12 +83,12 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("task %d at node %d check downstream task %d at node %d (shuffle)", pTask->taskId, pTask->nodeId, + qDebug("task %d at node %d check downstream task %d at node %d (shuffle)", pTask->id.taskId, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchOneCheckReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { - qDebug("task %d at node %d direct launch recover since no downstream", pTask->taskId, pTask->nodeId); + qDebug("task %d at node %d direct launch recover since no downstream", pTask->id.taskId, pTask->nodeId); streamTaskLaunchRecover(pTask, version); } return 0; @@ -104,7 +104,7 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp .downstreamNodeId = pRsp->downstreamNodeId, .childId = pRsp->childId, }; - qDebug("task %d at node %d check downstream task %d at node %d (recheck)", pTask->taskId, pTask->nodeId, + qDebug("task %d at node %d check downstream task %d at node %d (recheck)", pTask->id.taskId, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); @@ -160,11 +160,11 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* // common int32_t streamSetParamForRecover(SStreamTask* pTask) { - void* exec = pTask->exec.executor; + void* exec = pTask->exec.pExecutor; return qStreamSetParamForRecover(exec); } int32_t streamRestoreParam(SStreamTask* pTask) { - void* exec = pTask->exec.executor; + void* exec = pTask->exec.pExecutor; return qStreamRestoreParam(exec); } int32_t streamSetStatusNormal(SStreamTask* pTask) { @@ -174,14 +174,14 @@ int32_t streamSetStatusNormal(SStreamTask* pTask) { // source int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver) { - void* exec = pTask->exec.executor; + void* exec = pTask->exec.pExecutor; return qStreamSourceRecoverStep1(exec, ver); } int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq) { pReq->msgHead.vgId = pTask->nodeId; - pReq->streamId = pTask->streamId; - pReq->taskId = pTask->taskId; + pReq->streamId = pTask->id.streamId; + pReq->taskId = pTask->id.taskId; return 0; } @@ -192,13 +192,13 @@ int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) { int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq) { pReq->msgHead.vgId = pTask->nodeId; - pReq->streamId = pTask->streamId; - pReq->taskId = pTask->taskId; + pReq->streamId = pTask->id.streamId; + pReq->taskId = pTask->id.taskId; return 0; } int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { - void* exec = pTask->exec.executor; + void* exec = pTask->exec.pExecutor; if (qStreamSourceRecoverStep2(exec, ver) < 0) { } return streamScanExec(pTask, 100); @@ -206,7 +206,7 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { SStreamRecoverFinishReq req = { - .streamId = pTask->streamId, + .streamId = pTask->id.streamId, .childId = pTask->selfChildId, }; // serialize @@ -227,13 +227,13 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { // agg int32_t streamAggRecoverPrepare(SStreamTask* pTask) { - void* exec = pTask->exec.executor; + void* exec = pTask->exec.pExecutor; pTask->recoverWaitingUpstream = taosArrayGetSize(pTask->childEpInfo); return 0; } int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) { - void* exec = pTask->exec.executor; + void* exec = pTask->exec.pExecutor; if (qStreamRestoreParam(exec) < 0) { return -1; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 411726075e..04a1414438 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -121,7 +121,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int char statePath[1024]; if (!specPath) { - sprintf(statePath, "%s/%d", path, pTask->taskId); + sprintf(statePath, "%s/%d", path, pTask->id.taskId); } else { memset(statePath, 0, 1024); tstrncpy(statePath, path, 1024); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index e9aba0bc39..c1f50178dd 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -21,8 +21,14 @@ SStreamTask* tNewSStreamTask(int64_t streamId) { if (pTask == NULL) { return NULL; } - pTask->taskId = tGenIdPI32(); - pTask->streamId = streamId; + + pTask->id.taskId = tGenIdPI32(); + pTask->id.streamId = streamId; + + char buf[128] = {0}; + sprintf(buf, "0x%"PRIx64"-%d", pTask->id.streamId, pTask->id.taskId); + + pTask->id.idStr = taosStrdup(buf); pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; @@ -50,8 +56,8 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1; if (tEncodeI8(pEncoder, pTask->taskLevel) < 0) return -1; if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1; @@ -103,8 +109,8 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1; @@ -162,24 +168,43 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { return 0; } -void tFreeSStreamTask(SStreamTask* pTask) { - qDebug("free stream task %d", pTask->taskId); - if (pTask->inputQueue) streamQueueClose(pTask->inputQueue); - if (pTask->outputQueue) streamQueueClose(pTask->outputQueue); - if (pTask->exec.qmsg) taosMemoryFree(pTask->exec.qmsg); - if (pTask->exec.executor) qDestroyTask(pTask->exec.executor); +void tFreeStreamTask(SStreamTask* pTask) { + qDebug("free s-task:%s", pTask->id.idStr); + + if (pTask->inputQueue) { + streamQueueClose(pTask->inputQueue); + } + if (pTask->outputQueue) { + streamQueueClose(pTask->outputQueue); + } + if (pTask->exec.qmsg) { + taosMemoryFree(pTask->exec.qmsg); + } + + if (pTask->exec.pExecutor) { + qDestroyTask(pTask->exec.pExecutor); + pTask->exec.pExecutor = NULL; + } + + if (pTask->exec.pTqReader != NULL) { + pTask->exec.pTqReader = NULL; + } + taosArrayDestroyP(pTask->childEpInfo, taosMemoryFree); if (pTask->outputType == TASK_OUTPUT__TABLE) { tDeleteSSchemaWrapper(pTask->tbSink.pSchemaWrapper); taosMemoryFree(pTask->tbSink.pTSchema); } + if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; } - if (pTask->pState) streamStateClose(pTask->pState); + if (pTask->pState) { + streamStateClose(pTask->pState); + } taosMemoryFree(pTask); }