diff --git a/Jenkinsfile2 b/Jenkinsfile2 index d3fc05a1d2..e9372ab686 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -306,7 +306,7 @@ def pre_test_build_win() { cd %WIN_CONNECTOR_ROOT% python.exe -m pip install --upgrade pip python -m pip uninstall taospy -y - python -m pip install taospy==2.7.12 + python -m pip install taospy==2.7.13 python -m pip uninstall taos-ws-py -y python -m pip install taos-ws-py==0.3.1 xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32 diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5cf18fbb66..69677e6bc1 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3331,7 +3331,7 @@ typedef struct { SMsgHead head; int64_t streamId; int32_t taskId; -} SVPauseStreamTaskReq, SVResetStreamTaskReq, SVDropHTaskReq; +} SVPauseStreamTaskReq, SVResetStreamTaskReq; typedef struct { int8_t reserved; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index f389bc1a61..3bf22ec339 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -343,7 +343,6 @@ TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) //7 << 8 TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY_FINISH, "vnode-stream-scan-history-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL) diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 50458d684f..dc145819ca 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -23,8 +23,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg); -int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index be11d04ff8..f78b7a3126 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -210,7 +210,6 @@ void* qExtractReaderFromStreamScanner(void* scanner); int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo); -int32_t qResetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo); int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7ff47d2d59..9b3ce36bdd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -628,17 +628,6 @@ typedef struct { int8_t igUntreated; } SStreamScanHistoryReq; -typedef struct { - int64_t streamId; - int32_t upstreamTaskId; - int32_t downstreamTaskId; - int32_t upstreamNodeId; - int32_t childId; -} SStreamScanHistoryFinishReq; - -int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq); -int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq); - // mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished typedef struct { int64_t streamId; @@ -713,17 +702,6 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp); void streamMetaClearHbMsg(SStreamHbMsg* pMsg); -typedef struct { - int64_t streamId; - int32_t upstreamTaskId; - int32_t upstreamNodeId; - int32_t downstreamId; - int32_t downstreamNode; -} SStreamCompleteHistoryMsg; - -int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq); -int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq); - typedef struct SNodeUpdateInfo { int32_t nodeId; SEpSet prevEp; @@ -820,7 +798,6 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); -int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, void* pFn); int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event); void streamTaskRestoreStatus(SStreamTask* pTask); @@ -829,7 +806,6 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* SRpcHandleInfo* pRpcInfo, int32_t taskId); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); -int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); @@ -859,11 +835,6 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st); -int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); - -// agg level -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo); -int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask); // stream task meta void streamMetaInit(); diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 8ac9550aca..36a3e50aef 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -406,10 +406,6 @@ int32_t stmtGetFromCache(STscStmt* pStmt) { if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) { if (pStmt->bInfo.inExecCache) { - if (ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1)) { - tscError("stmtGetFromCache error"); - return TSDB_CODE_TSC_STMT_CACHE_ERROR; - } pStmt->bInfo.needParse = false; tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName); return TSDB_CODE_SUCCESS; diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 02dfbfebfe..b98c89542a 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -24,6 +24,7 @@ static int32_t (*tColDataAppendValueImpl[8][3])(SColData *pColData, uint8_t *pDa static int32_t (*tColDataUpdateValueImpl[8][3])(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward); // SBuffer ================================ +#ifdef BUILD_NO_CALL void tBufferDestroy(SBuffer *pBuffer) { tFree(pBuffer->pBuf); pBuffer->pBuf = NULL; @@ -55,7 +56,7 @@ int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData) { return code; } - +#endif // ================================ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson); @@ -1148,6 +1149,7 @@ static int tTagValJsonCmprFn(const void *p1, const void *p2) { return strcmp(((STagVal *)p1)[0].pKey, ((STagVal *)p2)[0].pKey); } +#ifdef TD_DEBUG_PRINT_TAG static void debugPrintTagVal(int8_t type, const void *val, int32_t vlen, const char *tag, int32_t ln) { switch (type) { case TSDB_DATA_TYPE_VARBINARY: @@ -1239,6 +1241,7 @@ void debugPrintSTag(STag *pTag, const char *tag, int32_t ln) { } printf("\n"); } +#endif static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) { int32_t n = 0; @@ -2576,6 +2579,7 @@ _exit: return code; } +#ifdef BUILD_NO_CALL static int32_t tColDataSwapValue(SColData *pColData, int32_t i, int32_t j) { int32_t code = 0; @@ -2658,6 +2662,7 @@ static void tColDataSwap(SColData *pColData, int32_t i, int32_t j) { break; } } +#endif static int32_t tColDataCopyRowCell(SColData *pFromColData, int32_t iFromRow, SColData *pToColData, int32_t iToRow) { int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index a1af11f2ec..1488df3cb1 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -86,8 +86,6 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 6781947849..a2f0b7aced 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -828,8 +828,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 8dfd03622f..b056d561c7 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -707,13 +707,6 @@ int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver); void tFreeStreamObj(SStreamObj* pObj); -// typedef struct { -// char streamName[TSDB_STREAM_FNAME_LEN]; -// int64_t uid; -// int64_t streamUid; -// SArray* childInfo; // SArray -// } SStreamCheckpointObj; - #define VIEW_TYPE_UPDATABLE (1 << 0) #define VIEW_TYPE_MATERIALIZED (1 << 1) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 871e12c5e6..d884227249 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -33,6 +33,11 @@ typedef struct SStreamTransInfo { int32_t transId; } SStreamTransInfo; +typedef struct SVgroupChangeInfo { + SHashObj *pDBMap; + SArray *pUpdateNodeList; // SArray +} SVgroupChangeInfo; + // time to generated the checkpoint, if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discard // to avoid too many checkpoints for a taskk in the waiting list typedef struct SCheckpointCandEntry { @@ -64,12 +69,6 @@ typedef struct SNodeEntry { int64_t hbTimestamp; // second } SNodeEntry; -typedef struct SFailedCheckpointInfo { - int64_t streamUid; - int64_t checkpointId; - int32_t transId; -} SFailedCheckpointInfo; - #define MND_STREAM_CREATE_NAME "stream-create" #define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint" #define MND_STREAM_PAUSE_NAME "stream-pause" @@ -92,27 +91,35 @@ int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId) bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock); int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid); +typedef struct SOrphanTask { + int64_t streamId; + int32_t taskId; + int32_t nodeId; +} SOrphanTask; + // for sma // TODO refactor -int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); -int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); - int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams); int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream); SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady); void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName); -void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode); +int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, + int32_t retryCode); STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg); int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); +void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo); +int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); + SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t mndProcessStreamHb(SRpcMsg *pReq); void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode); int32_t initStreamNodeList(SMnode *pMnode); -int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated); -int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated); +int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); +int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index a89136e7d3..e6027a0332 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -865,7 +865,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p sdbRelease(pMnode->pSdb, pStream); goto _OVER; } else { - if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); sdbRelease(pMnode->pSdb, pStream); goto _OVER; @@ -917,7 +917,7 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p SStreamObj *pStream = mndAcquireStream(pMnode, streamName); if (pStream != NULL && pStream->smaId == pSma->uid) { - if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); mndReleaseStream(pMnode, pStream); goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 771aaaf211..8df5bc1b46 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -29,10 +29,9 @@ #define MND_STREAM_MAX_NUM 60 -typedef struct SVgroupChangeInfo { - SHashObj *pDBMap; - SArray *pUpdateNodeList; // SArray -} SVgroupChangeInfo; +typedef struct SMStreamNodeCheckMsg { + int8_t placeHolder; // // to fix windows compile error, define place holder +} SMStreamNodeCheckMsg; static int32_t mndNodeCheckSentinel = 0; SStreamExecInfo execInfo; @@ -60,14 +59,11 @@ static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); -static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans); -static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); -static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); -static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); - -static void freeCheckpointCandEntry(void *); -static void freeTaskList(void *param); - +static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); +static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot); +static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len); +static void freeCheckpointCandEntry(void *); +static void freeTaskList(void *param); static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream); @@ -470,10 +466,8 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) { tEncodeStreamTask(&encoder, pTask); tEncoderClear(&encoder); - STransAction action = {0}; - action.mTraceId = pTrans->mTraceId; - initTransAction(&action, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0); + if (code != 0) { taosMemoryFree(buf); return -1; } @@ -614,59 +608,6 @@ _OVER: return -1; } - - -static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { - SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); - if (pReq == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pReq->head.vgId = htonl(pTask->info.nodeId); - pReq->taskId = pTask->id.taskId; - pReq->streamId = pTask->id.streamId; - - STransAction action = {0}; - SEpSet epset = {0}; - bool hasEpset = false; - - int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; - } - - // no valid epset, return directly without redoAction - if (!hasEpset) { - return TSDB_CODE_SUCCESS; - } - - // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. - initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pReq); - return -1; - } - - return 0; -} - -int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - int32_t lv = taosArrayGetSize(pStream->tasks); - for (int32_t i = 0; i < lv; i++) { - SArray *pTasks = taosArrayGetP(pStream->tasks, i); - int32_t sz = taosArrayGetSize(pTasks); - for (int32_t j = 0; j < sz; j++) { - SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndPersistTaskDropReq(pMnode, pTrans, pTask) < 0) { - return -1; - } - } - } - return 0; -} - static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks int32_t numOfStream = 0; SStreamObj *pStream = NULL; @@ -757,17 +698,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_CREATE_NAME); + STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes"); if (pTrans == NULL) { - mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); - goto _OVER; - } - - mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); - - mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetSTbName); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - mndTransDrop(pTrans); goto _OVER; } @@ -813,6 +745,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); taosThreadMutexLock(&execInfo.lock); + mDebug("stream tasks register into node list"); saveStreamTasksInfo(&streamObj, &execInfo); taosThreadMutexUnlock(&execInfo.lock); @@ -945,20 +878,14 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, MND_STREAM_CHECKPOINT_NAME); + STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_CHECKPOINT_NAME, "gen checkpoint for stream"); if (pTrans == NULL) { - return -1; - } - - mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid); - - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); - if (mndTrancCheckConflict(pMnode, pTrans) != 0) { mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); goto _ERR; } + mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid); mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId); taosWLockLatch(&pStream->lock); @@ -975,27 +902,26 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); - if (pVgObj == NULL) { - taosWUnLockLatch(&pStream->lock); - goto _ERR; - } - void *buf; int32_t tlen; if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, - pTask->id.taskId, pTrans->id, mndTrigger) < 0) { - mndReleaseVgroup(pMnode, pVgObj); + pTask->id.taskId, pTrans->id, mndTrigger) < 0) { taosWUnLockLatch(&pStream->lock); goto _ERR; } - STransAction act = {0}; - SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); - mndReleaseVgroup(pMnode, pVgObj); + SEpSet epset = {0}; + bool hasEpset = false; + code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS || !hasEpset) { + taosMemoryFree(buf); + taosWUnLockLatch(&pStream->lock); + goto _ERR; + } - initTransAction(&act, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY); - if (mndTransAppendRedoAction(pTrans, &act) != 0) { + code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, + TSDB_CODE_SYN_PROPOSE_NOT_READY); + if (code != 0) { taosMemoryFree(buf); taosWUnLockLatch(&pStream->lock); goto _ERR; @@ -1224,7 +1150,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_DROP_NAME); + STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_DROP_NAME, "drop stream"); if (pTrans == NULL) { mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1232,20 +1158,10 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - mInfo("trans:%d used to drop stream:%s", pTrans->id, dropReq.name); - - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - tFreeMDropStreamReq(&dropReq); - return -1; - } - int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid); // drop all tasks - if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1309,7 +1225,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { return -1; } else { #if 0 - if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); sdbRelease(pMnode->pSdb, pStream); sdbCancelFetch(pSdb, pIter); @@ -1568,18 +1484,6 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS return TSDB_CODE_SUCCESS; } -static int32_t getNumOfTasks(SArray *pTaskList) { - int32_t numOfLevels = taosArrayGetSize(pTaskList); - - int32_t count = 0; - for (int32_t i = 0; i < numOfLevels; i++) { - SArray *pLevel = taosArrayGetP(pTaskList, i); - count += taosArrayGetSize(pLevel); - } - - return count; -} - static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1595,7 +1499,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock // lock taosRLockLatch(&pStream->lock); - int32_t count = getNumOfTasks(pStream->tasks); + int32_t count = mndGetNumOfStreamTasks(pStream); if (numOfRows + count > rowsCapacity) { blockDataEnsureCapacity(pBlock, numOfRows + count); } @@ -1628,21 +1532,6 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } -static int32_t mndPersistStreamLog(STrans *pTrans, SStreamObj *pStream, int8_t status) { - taosWLockLatch(&pStream->lock); - pStream->status = status; - SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); - - taosWUnLockLatch(&pStream->lock); - if (pCommitRaw == NULL) return -1; - if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { - mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); - return -1; - } - (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - return 0; -} - static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; @@ -1657,10 +1546,10 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { if (pStream == NULL) { if (pauseReq.igNotExists) { - mInfo("stream:%s, not exist 1, if exist is set", pauseReq.name); + mInfo("stream:%s, not exist, not pause stream", pauseReq.name); return 0; } else { - mInfo("stream:%s, not exist 2, if exist is set,%p,%d,%p", pauseReq.name, pReq->pCont, pReq->contLen, pReq); + mError("stream:%s not exist, failed to pause stream", pauseReq.name); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; return -1; } @@ -1689,26 +1578,17 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_PAUSE_NAME); + STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_PAUSE_NAME, "pause the stream"); if (pTrans == NULL) { mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); return -1; } - mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name); - - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - return -1; - } - int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid); // if nodeUpdate happened, not send pause trans - if (mndPauseStreamTasks(pMnode, pTrans, pStream) < 0) { + if (mndStreamSetPauseAction(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1716,12 +1596,18 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { } // pause stream - if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__PAUSE) < 0) { + taosWLockLatch(&pStream->lock); + pStream->status = STREAM_STATUS__PAUSE; + if (mndPersistTransLog(pStream, pTrans,SDB_STATUS_READY) < 0) { + taosWUnLockLatch(&pStream->lock); + sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return -1; } + taosWUnLockLatch(&pStream->lock); + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1754,10 +1640,11 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { if (pStream == NULL) { if (resumeReq.igNotExists) { - mInfo("stream:%s, not exist, if exist is set", resumeReq.name); + mInfo("stream:%s not exist, not resume stream", resumeReq.name); sdbRelease(pMnode->pSdb, pStream); return 0; } else { + mError("stream:%s not exist, failed to resume stream", resumeReq.name); terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; return -1; } @@ -1780,26 +1667,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return -1; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_RESUME_NAME); + STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream"); if (pTrans == NULL) { mError("stream:%s, failed to resume stream since %s", resumeReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); return -1; } - mInfo("trans:%d used to resume stream:%s", pTrans->id, resumeReq.name); - - mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); - if (mndTransCheckConflict(pMnode, pTrans) != 0) { - sdbRelease(pMnode->pSdb, pStream); - mndTransDrop(pTrans); - return -1; - } - int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid); - // resume all tasks - if (mndResumeStreamTasks(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) { + // set the resume action + if (mndStreamSetResumeAction(pTrans, pMnode, pStream, resumeReq.igUntreated) < 0) { mError("stream:%s, failed to drop task since %s", resumeReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -1807,12 +1685,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } // resume stream - if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__NORMAL) < 0) { + taosWLockLatch(&pStream->lock); + pStream->status = STREAM_STATUS__NORMAL; + if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY) < 0) { + taosWUnLockLatch(&pStream->lock); + sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); return -1; } + taosWUnLockLatch(&pStream->lock); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); @@ -1826,91 +1709,6 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId, - int32_t transId) { - pMsg->streamId = pId->streamId; - pMsg->taskId = pId->taskId; - pMsg->transId = transId; - pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); - taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); -} - -static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, - SStreamTaskId *pId, int32_t transId) { - SStreamTaskNodeUpdateMsg req = {0}; - initNodeUpdateMsg(&req, pInfo, pId, transId); - - int32_t code = 0; - int32_t blen; - - tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code); - if (code < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosArrayDestroy(req.pNodeList); - return -1; - } - - int32_t tlen = sizeof(SMsgHead) + blen; - - void *buf = taosMemoryMalloc(tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - taosArrayDestroy(req.pNodeList); - return -1; - } - - void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - tEncodeStreamTaskUpdateMsg(&encoder, &req); - - SMsgHead *pMsgHead = (SMsgHead *)buf; - pMsgHead->contLen = htonl(tlen); - pMsgHead->vgId = htonl(nodeId); - - tEncoderClear(&encoder); - - *pBuf = buf; - *pLen = tlen; - - taosArrayDestroy(req.pNodeList); - return TSDB_CODE_SUCCESS; -} - -// todo extract method: traverse stream tasks -// build trans to update the epset -static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { - mDebug("start to build stream:0x%" PRIx64 " tasks epset update", pStream->uid); - - taosWLockLatch(&pStream->lock); - int32_t numOfLevels = taosArrayGetSize(pStream->tasks); - - for (int32_t j = 0; j < numOfLevels; ++j) { - SArray *pLevel = taosArrayGetP(pStream->tasks, j); - - int32_t numOfTasks = taosArrayGetSize(pLevel); - for (int32_t k = 0; k < numOfTasks; ++k) { - SStreamTask *pTask = taosArrayGetP(pLevel, k); - - void *pBuf = NULL; - int32_t len = 0; - streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); - doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); - - STransAction action = {0}; - initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - taosMemoryFree(pBuf); - taosWUnLockLatch(&pStream->lock); - return -1; - } - } - } - - taosWUnLockLatch(&pStream->lock); - return 0; -} - static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) { const SEp *pEp = GET_ACTIVE_EP(pPrevEpset); const SEp *p = GET_ACTIVE_EP(pCurrent); @@ -2008,7 +1806,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange return terrno; } - mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid); + mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid); } void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); @@ -2022,7 +1820,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, pStream->name, pTrans->id); - int32_t code = createStreamUpdateTrans(pStream, pChangeInfo, pTrans); + int32_t code = mndStreamSetUpdateEpsetAction(pStream, pChangeInfo, pTrans); // todo: not continue, drop all and retry again if (code != TSDB_CODE_SUCCESS) { @@ -2182,26 +1980,6 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) { return 0; } -// kill all trans in the dst DB -static void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { - mDebug("start to clear checkpoints in all Dbs"); - - void *pIter = NULL; - while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) { - char *pDb = (char *)pIter; - - size_t len = 0; - void *pKey = taosHashGetKey(pDb, &len); - char *p = strndup(pKey, len); - - mDebug("clear checkpoint trans in Db:%s", p); - doKillCheckpointTrans(pMnode, pKey, len); - taosMemoryFree(p); - } - - mDebug("complete clear checkpoints in Dbs"); -} - // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int32_t code = 0; @@ -2271,10 +2049,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } -typedef struct SMStreamNodeCheckMsg { - int8_t placeHolder; // // to fix windows compile error, define place holder -} SMStreamNodeCheckMsg; - static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -2346,26 +2120,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) { ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); } -int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { - // data in the hash table will be removed automatically, no need to remove it here. - SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len); - if (pTransInfo == NULL) { - return TSDB_CODE_SUCCESS; - } - - // not checkpoint trans, ignore - if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) { - mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId); - return TSDB_CODE_SUCCESS; - } - - char *pDupDBName = strndup(pDBName, len); - mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName); - taosMemoryFree(pDupDBName); - - return TSDB_CODE_SUCCESS; -} - void freeCheckpointCandEntry(void *param) { SCheckpointCandEntry *pEntry = param; taosMemoryFreeClear(pEntry->pName); @@ -2412,8 +2166,15 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId); - int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); + if (pStream == NULL) { + mError("failed to find the stream:0x%"PRIx64" not handle the checkpoint req", req.streamId); + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + taosThreadMutexUnlock(&execInfo.lock); + return -1; + } + + int32_t numOfTasks = mndGetNumOfStreamTasks(pStream); SArray **pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId)); if (pReqTaskList == NULL) { SArray *pList = taosArrayInit(4, sizeof(int32_t)); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 6e29389ac8..0cad0a8226 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -16,6 +16,12 @@ #include "mndStream.h" #include "mndTrans.h" +typedef struct SFailedCheckpointInfo { + int64_t streamUid; + int64_t checkpointId; + int32_t transId; +} SFailedCheckpointInfo; + static void doExtractTasksFromStream(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; @@ -65,6 +71,8 @@ static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) return terrno; } + /*int32_t code = */mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid); + taosWLockLatch(&pStream->lock); int32_t numOfLevels = taosArrayGetSize(pStream->tasks); @@ -92,19 +100,13 @@ static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) SEpSet epset = {0}; bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || !hasEpset) { taosMemoryFree(pReq); continue; } - if (!hasEpset) { - taosMemoryFree(pReq); - continue; - } - - STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0); + if (code != 0) { taosMemoryFree(pReq); taosWUnLockLatch(&pStream->lock); mndTransDrop(pTrans); @@ -181,6 +183,45 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) { return TSDB_CODE_SUCCESS; } +static int32_t mndDropOrphanTasks(SMnode* pMnode, SArray* pList) { + SOrphanTask* pTask = taosArrayGet(pList, 0); + + // check if it is conflict with other trans in both sourceDb and targetDb. + bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); + if (conflict) { + return -1; + } + + SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; + STrans* pTrans = doCreateTrans(pMnode, &dummyObj, NULL, MND_STREAM_DROP_NAME, "drop stream"); + if (pTrans == NULL) { + mError("failed to create trans to drop orphan tasks since %s", terrstr()); + return -1; + } + + int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId); + + // drop all tasks + if (mndStreamSetDropActionFromList(pMnode, pTrans, pList) < 0) { + mError("failed to create trans to drop orphan tasks since %s", terrstr()); + mndTransDrop(pTrans); + return -1; + } + + // drop stream + if (mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED) < 0) { + mndTransDrop(pTrans); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + return 0; +} + int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){ SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; @@ -217,7 +258,8 @@ int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo* info){ int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; - SArray *pList = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); + SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); + SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask)); if(grantCheck(TSDB_GRANT_STREAMS) < 0){ if(suspendAllStreams(pMnode, &pReq->info) < 0){ @@ -241,8 +283,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexLock(&execInfo.lock); // extract stream task list - int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap); - if (numOfExisted == 0) { + if (taosHashGetSize(execInfo.pTaskMap) == 0) { doExtractTasksFromStream(pMnode); } @@ -261,6 +302,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id)); if (pTaskEntry == NULL) { mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId); + + SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId}; + taosArrayPush(pOrphanTasks, &oTask); continue; } @@ -283,15 +327,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } streamTaskStatusCopy(pTaskEntry, p); - if (p->checkpointId != 0) { - if (p->checkpointFailed) { - mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, - p->checkpointId, p->chkpointTransId); + if ((p->checkpointId != 0) && p->checkpointFailed) { + mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, + p->checkpointId, p->chkpointTransId); - SFailedCheckpointInfo info = { - .transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId}; - addIntoCheckpointList(pList, &info); - } + SFailedCheckpointInfo info = { + .transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId}; + addIntoCheckpointList(pFailedTasks, &info); } } @@ -309,15 +351,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // current checkpoint is failed, rollback from the checkpoint trans // kill the checkpoint trans and then set all tasks status to be normal - if (taosArrayGetSize(pList) > 0) { + if (taosArrayGetSize(pFailedTasks) > 0) { bool allReady = true; SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); taosArrayDestroy(p); if (allReady || snodeChanged) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal - for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { - SFailedCheckpointInfo *pInfo = taosArrayGet(pList, i); + for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) { + SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i); mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", pInfo->checkpointId, pInfo->transId); @@ -328,9 +370,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } + // handle the orphan tasks that are invalid but not removed in some vnodes or snode due to some unknown errors. + if (taosArrayGetSize(pOrphanTasks) > 0) { + mndDropOrphanTasks(pMnode, pOrphanTasks); + } + taosThreadMutexUnlock(&execInfo.lock); streamMetaClearHbMsg(&req); - taosArrayDestroy(pList); + taosArrayDestroy(pFailedTasks); + taosArrayDestroy(pOrphanTasks); + return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 959f69944c..0a7397827e 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -169,7 +169,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const return NULL; } - mDebug("s-task:0x%" PRIx64 " start to build trans %s", pStream->uid, pMsg); + mInfo("s-task:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, pTrans->id); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName); if (mndTransCheckConflict(pMnode, pTrans) != 0) { @@ -255,11 +255,132 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status) return 0; } -void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, - int32_t retryCode) { - pAction->epSet = *pEpset; - pAction->contLen = contLen; - pAction->pCont = pCont; - pAction->msgType = msgType; - pAction->retryCode = retryCode; +int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset, + int32_t retryCode) { + STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode}; + return mndTransAppendRedoAction(pTrans, &action); +} + +int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) { + // data in the hash table will be removed automatically, no need to remove it here. + SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len); + if (pTransInfo == NULL) { + return TSDB_CODE_SUCCESS; + } + + // not checkpoint trans, ignore + if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) { + mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId); + return TSDB_CODE_SUCCESS; + } + + char *pDupDBName = strndup(pDBName, len); + mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName); + taosMemoryFree(pDupDBName); + + return TSDB_CODE_SUCCESS; +} + +// kill all trans in the dst DB +void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { + mDebug("start to clear checkpoints in all Dbs"); + + void *pIter = NULL; + while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) { + char *pDb = (char *)pIter; + + size_t len = 0; + void *pKey = taosHashGetKey(pDb, &len); + char *p = strndup(pKey, len); + + mDebug("clear checkpoint trans in Db:%s", p); + doKillCheckpointTrans(pMnode, pKey, len); + taosMemoryFree(p); + } + + mDebug("complete clear checkpoints in Dbs"); +} + +static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId, + int32_t transId) { + pMsg->streamId = pId->streamId; + pMsg->taskId = pId->taskId; + pMsg->transId = transId; + pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); + taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); +} + +static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, + SStreamTaskId *pId, int32_t transId) { + SStreamTaskNodeUpdateMsg req = {0}; + initNodeUpdateMsg(&req, pInfo, pId, transId); + + int32_t code = 0; + int32_t blen; + + tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code); + if (code < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(req.pNodeList); + return -1; + } + + int32_t tlen = sizeof(SMsgHead) + blen; + + void *buf = taosMemoryMalloc(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + taosArrayDestroy(req.pNodeList); + return -1; + } + + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + tEncodeStreamTaskUpdateMsg(&encoder, &req); + + SMsgHead *pMsgHead = (SMsgHead *)buf; + pMsgHead->contLen = htonl(tlen); + pMsgHead->vgId = htonl(nodeId); + + tEncoderClear(&encoder); + + *pBuf = buf; + *pLen = tlen; + + taosArrayDestroy(req.pNodeList); + return TSDB_CODE_SUCCESS; +} + +// todo extract method: traverse stream tasks +// build trans to update the epset +int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) { + mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid); + + taosWLockLatch(&pStream->lock); + int32_t numOfLevels = taosArrayGetSize(pStream->tasks); + + for (int32_t j = 0; j < numOfLevels; ++j) { + SArray *pLevel = taosArrayGetP(pStream->tasks, j); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t k = 0; k < numOfTasks; ++k) { + SStreamTask *pTask = taosArrayGetP(pLevel, k); + + void *pBuf = NULL; + int32_t len = 0; + streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); + doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); + + int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(pBuf); + taosWUnLockLatch(&pStream->lock); + return -1; + } + } + } + + taosWUnLockLatch(&pStream->lock); + return 0; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index b8bd323fa3..5d6e34856b 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -18,6 +18,66 @@ #include "tmisce.h" #include "mndVgroup.h" +typedef struct SStreamTaskIter { + SStreamObj *pStream; + int32_t level; + int32_t ordinalIndex; + int32_t totalLevel; + SStreamTask *pTask; +} SStreamTaskIter; + +SStreamTaskIter* createTaskIter(SStreamObj* pStream) { + SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter)); + if (pIter == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pIter->level = -1; + pIter->ordinalIndex = 0; + pIter->pStream = pStream; + pIter->totalLevel = taosArrayGetSize(pStream->tasks); + pIter->pTask = NULL; + + return pIter; +} + +bool taskIterNextTask(SStreamTaskIter* pIter) { + if (pIter->level >= pIter->totalLevel) { + pIter->pTask = NULL; + return false; + } + + if (pIter->level == -1) { + pIter->level += 1; + } + + while(pIter->level < pIter->totalLevel) { + SArray *pList = taosArrayGetP(pIter->pStream->tasks, pIter->level); + if (pIter->ordinalIndex >= taosArrayGetSize(pList)) { + pIter->level += 1; + pIter->ordinalIndex = 0; + pIter->pTask = NULL; + continue; + } + + pIter->pTask = taosArrayGetP(pList, pIter->ordinalIndex); + pIter->ordinalIndex += 1; + return true; + } + + pIter->pTask = NULL; + return false; +} + +SStreamTask* taskIterGetCurrent(SStreamTaskIter* pIter) { + return pIter->pTask; +} + +void destroyTaskIter(SStreamTaskIter* pIter) { + taosMemoryFree(pIter); +} + SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; @@ -143,7 +203,7 @@ int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t } } -static int32_t doResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) { +static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) { SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq)); if (pReq == NULL) { mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq), @@ -160,15 +220,14 @@ static int32_t doResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *p SEpSet epset = {0}; bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { terrno = code; taosMemoryFree(pReq); return -1; } - STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0); + if (code != 0) { taosMemoryFree(pReq); return -1; } @@ -201,14 +260,14 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) { return num; } -int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) { +int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) { int32_t size = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < size; i++) { SArray *pTasks = taosArrayGetP(pStream->tasks, i); int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (doResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) { + if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) { return -1; } @@ -220,7 +279,7 @@ int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream return 0; } -static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { +static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq)); if (pReq == NULL) { mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq), @@ -233,49 +292,122 @@ static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pT pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; - SEpSet epset = {0}; - mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps); + SEpSet epset = {0}; bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || !hasEpset) { terrno = code; taosMemoryFree(pReq); - return -1; + return code; } - // no valid epset, return directly without redoAction - if (!hasEpset) { - taosMemoryFree(pReq); - return TSDB_CODE_SUCCESS; - } - - STransAction action = {0}; - initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); - if (mndTransAppendRedoAction(pTrans, &action) != 0) { + mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps); + code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0); + if (code != 0) { taosMemoryFree(pReq); return -1; } return 0; } -int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { - SArray *tasks = pStream->tasks; +int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + SStreamTaskIter *pIter = createTaskIter(pStream); - int32_t size = taosArrayGetSize(tasks); - for (int32_t i = 0; i < size; i++) { - SArray *pTasks = taosArrayGetP(tasks, i); - int32_t sz = taosArrayGetSize(pTasks); - for (int32_t j = 0; j < sz; j++) { - SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (doPauseStreamTask(pMnode, pTrans, pTask) < 0) { - return -1; - } - - if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) { - atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); - } + while (taskIterNextTask(pIter)) { + SStreamTask *pTask = taskIterGetCurrent(pIter); + if (doSetPauseAction(pMnode, pTrans, pTask) < 0) { + destroyTaskIter(pIter); + return -1; } + + if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) { + atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE); + } + } + + destroyTaskIter(pIter); + return 0; +} + +static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { + SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pReq->head.vgId = htonl(pTask->info.nodeId); + pReq->taskId = pTask->id.taskId; + pReq->streamId = pTask->id.streamId; + + SEpSet epset = {0}; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); + if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction + terrno = code; + return -1; + } + + // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); + if (code != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + +int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + SStreamTaskIter *pIter = createTaskIter(pStream); + + while(taskIterNextTask(pIter)) { + SStreamTask *pTask = taskIterGetCurrent(pIter); + if (doSetDropAction(pMnode, pTrans, pTask) < 0) { + destroyTaskIter(pIter); + return -1; + } + } + destroyTaskIter(pIter); + return 0; +} + +static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) { + SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pReq->head.vgId = htonl(pTask->nodeId); + pReq->taskId = pTask->taskId; + pReq->streamId = pTask->streamId; + + SEpSet epset = {0}; + bool hasEpset = false; + int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->taskId, pTask->nodeId); + if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { // no valid epset, return directly without redoAction + terrno = code; + taosMemoryFree(pReq); + return -1; + } + + // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. + code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0); + if (code != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + +int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) { + for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + SOrphanTask* pTask = taosArrayGet(pList, i); + mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId); + doSetDropActionFromId(pMnode, pTrans, pTask); } return 0; } \ No newline at end of file diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 4284b0838e..f173c327c7 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -171,10 +171,6 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamTaskProcessRetrieveReq(pSnode->pMeta, pMsg); case TDMT_STREAM_RETRIEVE_RSP: // 1036 break; - case TDMT_VND_STREAM_SCAN_HISTORY_FINISH: - return tqStreamTaskProcessScanHistoryFinishReq(pSnode->pMeta, pMsg); - case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: - return tqStreamTaskProcessScanHistoryFinishRsp(pSnode->pMeta, pMsg); case TDMT_VND_STREAM_TASK_CHECK: return tqStreamTaskProcessCheckReq(pSnode->pMeta, pMsg); case TDMT_VND_STREAM_TASK_CHECK_RSP: diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 38c3441d43..23f79158c3 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -267,8 +267,6 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg); // sma int32_t smaInit(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2e947e4a4c..a689932754 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1043,15 +1043,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return code; } -// only the agg tasks and the sink tasks will receive this message from upstream tasks -int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamTaskProcessScanHistoryFinishReq(pTq->pStreamMeta, pMsg); -} - -int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamTaskProcessScanHistoryFinishRsp(pTq->pStreamMeta, pMsg); -} - int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTaskRunReq* pReq = pMsg->pCont; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index cdb5cc26f8..280c110711 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -23,7 +23,7 @@ static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool taskReadyForDataFromWal(SStreamTask* pTask); -static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems); +static bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems); static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. @@ -300,21 +300,21 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) { return true; } -bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { +bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { const char* id = pTask->id.idStr; int32_t numOfNewItems = 0; - while(1) { + while (1) { if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { *numOfItems += numOfNewItems; return numOfNewItems > 0; } SStreamQueueItem* pItem = NULL; - int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); + int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer); + bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer); if (itemInFillhistory) { numOfNewItems += 1; } @@ -334,7 +334,9 @@ bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* num break; } } else { - tqError("s-task:%s append input queue failed, code: too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer); + walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); + tqError("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id, + pTask->chkInfo.nextProcessVer); break; } } @@ -399,7 +401,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - bool hasNewData = doPutDataIntoInputQFromWal(pTask, maxVer, &numOfItems); + bool hasNewData = doPutDataIntoInputQ(pTask, maxVer, &numOfItems); taosThreadMutexUnlock(&pTask->lock); if ((numOfItems > 0) || hasNewData) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index ac1818f877..21bc09eba0 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -328,74 +328,6 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { return 0; } -int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - // deserialize - SStreamScanHistoryFinishReq req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - tDecodeStreamScanHistoryFinishReq(&decoder, &req); - tDecoderClear(&decoder); - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId); - if (pTask == NULL) { - tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", pMeta->vgId, - req.downstreamTaskId); - return -1; - } - - tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId); - - int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info); - streamMetaReleaseTask(pMeta, pTask); - return code; -} - -int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - int32_t code = TSDB_CODE_SUCCESS; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - - // deserialize - SStreamCompleteHistoryMsg req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); - tDecodeCompleteHistoryDataMsg(&decoder, &req); - tDecoderClear(&decoder); - - if (pMeta->role == NODE_ROLE_FOLLOWER) { - tqError("s-task:0x%x (vgId:%d) not handle the scan-history finish rsp, since it becomes follower", - req.upstreamTaskId, pMeta->vgId); - return TASK_DOWNSTREAM_NOT_LEADER; - } - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); - if (pTask == NULL) { - tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pMeta->vgId, - req.upstreamTaskId); - return -1; - } - - int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); - if (remain > 0) { - tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d", - pTask->id.idStr, req.downstreamId, remain); - } else { - tqDebug( - "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " - "completed msg", - pTask->id.idStr, req.downstreamId); - code = streamProcessScanHistoryFinishRsp(pTask); - } - - streamMetaReleaseTask(pMeta, pTask); - return code; -} - int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7e470bafbe..3ec6adee41 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -789,10 +789,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY: return tqProcessTaskScanHistory(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_SCAN_HISTORY_FINISH: - return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: - return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY: return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); default: diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 365acf2bff..436d1cefb8 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -204,6 +204,10 @@ void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), vo */ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple); +/** + * @brief set the merge limit reached callback. it calls mergeLimitReached param with tableUid and param +*/ +void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index e872c87237..3472f4e14d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1027,57 +1027,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { return 0; } -int32_t qResetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - SOperatorInfo* pOperator = pTaskInfo->pRoot; - - while (1) { - int32_t type = pOperator->operatorType; - if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { - SStreamIntervalOperatorInfo* pInfo = pOperator->info; - STimeWindowAggSupp* pSup = &pInfo->twAggSup; - - pSup->calTriggerSaved = 0; - pSup->deleteMarkSaved = 0; - qInfo("reset stream param for interval: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); - - } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { - SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - STimeWindowAggSupp* pSup = &pInfo->twAggSup; - - pSup->calTriggerSaved = 0; - pSup->deleteMarkSaved = 0; - qInfo("reset stream param for session: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); - - } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { - SStreamStateAggOperatorInfo* pInfo = pOperator->info; - STimeWindowAggSupp* pSup = &pInfo->twAggSup; - - pSup->calTriggerSaved = 0; - pSup->deleteMarkSaved = 0; - qInfo("reset stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); - - } else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT) { - SStreamEventAggOperatorInfo* pInfo = pOperator->info; - STimeWindowAggSupp* pSup = &pInfo->twAggSup; - - pSup->calTriggerSaved = 0; - pSup->deleteMarkSaved = 0; - qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark); - } - - // iterate operator tree - if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) { - return 0; - } else { - pOperator = pOperator->pDownstream[0]; - } - } -} - int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; const char* id = GET_TASKID(pTaskInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9243f385d0..ec1d6b9f75 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3311,26 +3311,16 @@ _error: return NULL; } -static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) { - int64_t nRows = 0; - void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid)); - if (pNum == NULL) { - nRows = pBlock->info.rows; - tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows)); - } else { - *(int64_t*)pNum = *(int64_t*)pNum + pBlock->info.rows; - nRows = *(int64_t*)pNum; - } - - if (nRows >= pInfo->mergeLimit) { - if (pInfo->mSkipTables == NULL) { +static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) { + STableMergeScanInfo* pInfo = pTableMergeScanInfo; + if (pInfo->mSkipTables == NULL) { pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); - } - int bSkip = 1; - taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip)); } - return TSDB_CODE_SUCCESS; + int bSkip = 1; + if (pInfo->mSkipTables != NULL) { + taosHashPut(pInfo->mSkipTables, &uid, sizeof(uid), &bSkip, sizeof(bSkip)); + } } static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) { @@ -3446,10 +3436,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) { } pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid); - if (pInfo->mergeLimit != -1) { - tableMergeScanDoSkipTable(pInfo, pBlock); - } - pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; return pBlock; @@ -3516,6 +3502,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit); + tsortSetMergeLimitReachedFp(pInfo->pSortHandle, tableMergeScanDoSkipTable, pInfo); tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo); tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL); @@ -3647,7 +3634,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* terrno = TSDB_CODE_TSC_QUERY_CANCELLED; T_LONG_JMP(pOperator->pTaskInfo->env, terrno); } - + bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, pInfo->limitInfo.numOfOutputRows); @@ -3743,8 +3730,6 @@ void destroyTableMergeScanOperatorInfo(void* param) { taosArrayDestroy(pTableScanInfo->sortSourceParams); tsortDestroySortHandle(pTableScanInfo->pSortHandle); pTableScanInfo->pSortHandle = NULL; - tSimpleHashCleanup(pTableScanInfo->mTableNumRows); - pTableScanInfo->mTableNumRows = NULL; taosHashCleanup(pTableScanInfo->mSkipTables); pTableScanInfo->mSkipTables = NULL; destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); @@ -3836,8 +3821,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo); - pInfo->mTableNumRows = tSimpleHashInit(1024, - taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); + pInfo->mergeLimit = -1; bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1; if (hasLimit) { diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index ee1d831a24..db9266cb8f 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -75,6 +75,9 @@ struct SSortHandle { bool (*abortCheckFn)(void* param); void* abortCheckParam; + + void (*mergeLimitReachedFn)(uint64_t tableUid, void* param); + void* mergeLimitReachedParam; }; void tsortSetSingleTableMerge(SSortHandle* pHandle) { @@ -885,7 +888,7 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk, int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t); ASSERT(size <= getBufPageSize(pHandle->pBuf)); - + blockDataToBuf(pPage, blk); setBufPageDirty(pPage, true); @@ -1040,6 +1043,39 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO return 0; } +static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk, bool* pExtractedBlock) { + int64_t nRows = 0; + int64_t prevRows = 0; + void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid)); + if (pNum == NULL) { + prevRows = 0; + nRows = pOrigBlk->info.rows; + tSimpleHashPut(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid), &nRows, sizeof(nRows)); + } else { + prevRows = *(int64_t*)pNum; + *(int64_t*)pNum = *(int64_t*)pNum + pOrigBlk->info.rows; + nRows = *(int64_t*)pNum; + } + + int64_t keepRows = pOrigBlk->info.rows; + if (nRows >= pHandle->mergeLimit) { + if (pHandle->mergeLimitReachedFn) { + pHandle->mergeLimitReachedFn(pOrigBlk->info.id.uid, pHandle->mergeLimitReachedParam); + } + keepRows = pHandle->mergeLimit - prevRows; + } + + SSDataBlock* pBlock = NULL; + if (keepRows != pOrigBlk->info.rows) { + pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows); + *pExtractedBlock = true; + } else { + *pExtractedBlock = false; + pBlock = pOrigBlk; + } + return pBlock; +} + static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0); size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource); @@ -1062,10 +1098,18 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { pHandle->currMergeLimitTs = INT64_MIN; } + SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES); SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT)); while (1) { SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param); + + int64_t p = taosGetTimestampUs(); + bool bExtractedBlock = false; + if (pBlk != NULL && pHandle->mergeLimit > 0) { + pBlk = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock); + } + if (pBlk != NULL) { SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId); int64_t firstRowTs = *(int64_t*)tsCol->pData; @@ -1074,6 +1118,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { continue; } } + if (pBlk != NULL) { szSort += blockDataGetSize(pBlk); @@ -1081,8 +1126,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if (ppBlk != NULL) { SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk); blockDataMerge(tBlk, pBlk); + if (bExtractedBlock) { + blockDataDestroy(pBlk); + } } else { - SSDataBlock* tBlk = createOneDataBlock(pBlk, true); + SSDataBlock* tBlk = (bExtractedBlock) ? pBlk : createOneDataBlock(pBlk, true); tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES); taosArrayPush(aBlkSort, &tBlk); } @@ -1091,7 +1139,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) { tSimpleHashClear(mUidBlk); - int64_t p = taosGetTimestampUs(); code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc); if (code != TSDB_CODE_SUCCESS) { tSimpleHashCleanup(mUidBlk); @@ -1131,7 +1178,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); } taosArrayDestroy(aExtSrc); - + tSimpleHashCleanup(mTableNumRows); pHandle->type = SORT_SINGLESOURCE_SORT; return TSDB_CODE_SUCCESS; } @@ -1610,3 +1657,8 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke } return ret; } + +void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReachedCb)(uint64_t tableUid, void* param), void* param) { + pHandle->mergeLimitReachedFn = mergeLimitReachedCb; + pHandle->mergeLimitReachedParam = param; +} diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index bd5bddcece..0534f2c30c 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -123,8 +123,6 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t STaskId streamTaskGetTaskId(const SStreamTask* pTask); void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo); void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); -int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, - int32_t* pLen); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); void streamClearChkptReadyMsg(SStreamTask* pTask); @@ -134,10 +132,7 @@ int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); const char* streamQueueItemGetTypeStr(int32_t type); SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); - -int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); -int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); -int32_t streamTransferStateToStreamTask(SStreamTask* pTask); +int32_t streamTransferStateToStreamTask(SStreamTask* pTask); SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b51845d152..6b7c0fc69a 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -34,9 +34,6 @@ static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* p static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq); static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, int64_t groupId); -static int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, - SEpSet* pEpSet); - static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, int64_t dstTaskId, int32_t type); @@ -676,41 +673,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { - SStreamScanHistoryFinishReq req = { - .streamId = pTask->id.streamId, - .childId = pTask->info.selfChildId, - .upstreamTaskId = pTask->id.taskId, - .upstreamNodeId = pTask->pMeta->vgId, - }; - - // serialize - if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - pTask->notReadyTasks = 1; - doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, - &pTask->outputInfo.fixedDispatcher.epSet); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t numOfVgs = taosArrayGetSize(vgInfo); - pTask->notReadyTasks = numOfVgs; - - SStreamTaskState* pState = streamTaskGetStatus(pTask); - stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", - pTask->id.idStr, numOfVgs, pState->name); - for (int32_t i = 0; i < numOfVgs; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.downstreamTaskId = pVgInfo->taskId; - doDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); - } - } else { - stDebug("s-task:%s no downstream tasks, invoke scan-history finish rsp directly", pTask->id.idStr); - streamProcessScanHistoryFinishRsp(pTask); - } - - return 0; -} - // this function is usually invoked by sink/agg task int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->pReadyMsgList); @@ -782,48 +744,6 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch return 0; } -int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, - SEpSet* pEpSet) { - void* buf = NULL; - int32_t code = -1; - SRpcMsg msg = {0}; - - int32_t tlen; - tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code); - if (code < 0) { - return -1; - } - - buf = rpcMallocCont(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - ((SMsgHead*)buf)->vgId = htonl(vgId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) { - if (buf) { - rpcFreeCont(buf); - } - return code; - } - - tEncoderClear(&encoder); - - initRpcMsg(&msg, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead)); - - tmsgSendReq(pEpSet, &msg); - - SStreamTaskState* pState = streamTaskGetStatus(pTask); - stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pState->name, - pReq->downstreamTaskId, vgId); - return 0; -} - int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; @@ -989,109 +909,6 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) { taosArrayClear(pTask->pReadyMsgList); } -int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamNode) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pRsp) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->downstreamNode) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - -int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, - int32_t* pLen) { - int32_t len = 0; - int32_t code = 0; - SEncoder encoder; - - SStreamCompleteHistoryMsg msg = { - .streamId = pReq->streamId, - .upstreamTaskId = pReq->upstreamTaskId, - .upstreamNodeId = pReq->upstreamNodeId, - .downstreamId = pReq->downstreamTaskId, - .downstreamNode = pTask->pMeta->vgId, - }; - - tEncodeSize(tEncodeCompleteHistoryDataMsg, &msg, len, code); - if (code < 0) { - return code; - } - - void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len); - if (pBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - ((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId); - - void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); - - tEncoderInit(&encoder, (uint8_t*)abuf, len); - tEncodeCompleteHistoryDataMsg(&encoder, &msg); - tEncoderClear(&encoder); - - *pBuffer = pBuf; - *pLen = len; - return 0; -} - -int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) { - void* pBuf = NULL; - int32_t len = 0; - - streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len); - SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); - - SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet}; - initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len); - info.msg.info = *pRpcInfo; - - taosThreadMutexLock(&pTask->lock); - - if (pTask->pRspMsgList == NULL) { - pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); - } - taosArrayPush(pTask->pRspMsgList, &info); - taosThreadMutexUnlock(&pTask->lock); - - int32_t num = taosArrayGetSize(pTask->pRspMsgList); - stDebug("s-task:%s add scan-history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId, - num); - return TSDB_CODE_SUCCESS; -} - -int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK); - - const char* id = pTask->id.idStr; - int32_t level = pTask->info.taskLevel; - - int32_t num = taosArrayGetSize(pTask->pRspMsgList); - for (int32_t i = 0; i < num; ++i) { - SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i); - tmsgSendRsp(&pInfo->msg); - - stDebug("s-task:%s level:%d notify upstream:0x%x continuing handle data in WAL", id, level, pInfo->taskId); - } - - taosArrayClear(pTask->pRspMsgList); - stDebug("s-task:%s level:%d continue process msg sent to all %d upstreams", id, level, num); - return 0; -} - // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 140a22ee73..20fdcff7d9 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -592,108 +592,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) { - void* exec = pTask->exec.pExecutor; - if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) { - return -1; - } - - if (qStreamRecoverFinish(exec) < 0) { - return -1; - } - return 0; -} - -int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, - SRpcHandleInfo* pRpcInfo) { - int32_t taskLevel = pTask->info.taskLevel; - ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK); - - const char* id = pTask->id.idStr; - SStreamTaskState* p = streamTaskGetStatus(pTask); - - if (p->state != TASK_STATUS__SCAN_HISTORY) { - stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, - p->name, pReq->upstreamTaskId); - - void* pBuf = NULL; - int32_t len = 0; - streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len); - - SRpcMsg msg = {.info = *pRpcInfo}; - initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len); - - tmsgSendRsp(&msg); - stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id, taskLevel, - pReq->upstreamTaskId, pReq->upstreamNodeId); - return 0; - } - - // sink tasks do not send end of scan history msg to its upstream, which is agg task. - streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq); - - int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1); - ASSERT(left >= 0); - - if (left == 0) { - int32_t numOfTasks = taosArrayGetSize(pTask->upstreamInfo.pList); - if (taskLevel == TASK_LEVEL__AGG) { - stDebug( - "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data processing " - "and send rsp to all upstream tasks", - id, numOfTasks); - streamAggUpstreamScanHistoryFinish(pTask); - } else { - stDebug("s-task:%s all %d upstream task(s) finish scan-history data, and rsp to all upstream tasks", id, - numOfTasks); - } - - // all upstream tasks have completed the scan-history task in the stream time window, let's start to extract data - // from the WAL files, which contains the real time stream data. - streamNotifyUpstreamContinue(pTask); - - // mnode will not send the pause/resume message to the sink task, so no need to enable the pause for sink tasks. - if (taskLevel == TASK_LEVEL__AGG) { - /*int32_t code = */ streamTaskScanHistoryDataComplete(pTask); - } else { // for sink task, set normal - streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); - } - } else { - stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", id, - pReq->upstreamTaskId, pReq->childId, left); - } - - return 0; -} - -int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) { - ETaskStatus status = streamTaskGetStatus(pTask)->state; - - // task restart now, not handle the scan-history finish rsp - if (status == TASK_STATUS__UNINIT) { - return TSDB_CODE_INVALID_MSG; - } - - ASSERT(status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/); - SStreamMeta* pMeta = pTask->pMeta; - - // execute in the scan history complete call back msg, ready to process data from inputQ - int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE); - streamTaskSetSchedStatusInactive(pTask); - - streamMetaWLock(pMeta); - streamMetaSaveTask(pMeta, pTask); - streamMetaCommit(pMeta); - streamMetaWUnLock(pMeta); - - // for source tasks, let's continue execute. - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - streamSchedExec(pTask); - } - - return TSDB_CODE_SUCCESS; -} - static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) { SDataRange* pRange = &pHTask->dataRange; @@ -946,29 +844,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } } -int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { - if (streamTaskGetStatus(pTask)->state == TASK_STATUS__DROPPING) { - return 0; - } - - // restore param - int32_t code = 0; - if (pTask->info.fillHistory) { - code = streamRestoreParam(pTask); - if (code < 0) { - return -1; - } - } - - // dispatch scan-history finish req to all related downstream task - code = streamDispatchScanHistoryFinishMsg(pTask); - if (code < 0) { - return -1; - } - - return 0; -} - int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; return qStreamInfoResetTimewindowFilter(exec); @@ -1072,28 +947,6 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint return 0; } -int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) { - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; - tEndDecode(pDecoder); - return 0; -} - void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 11f8df4c93..0a7842194e 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -994,6 +994,12 @@ void *taosCacheIterGetKey(const SCacheIter *pIter, size_t *len) { } void taosCacheDestroyIter(SCacheIter *pIter) { + for (int32_t i = 0; i < pIter->numOfObj; ++i) { + if (!pIter->pCurrent[i]) continue; + char *p = pIter->pCurrent[i]->data; + taosCacheRelease(pIter->pCacheObj, (void **)&p, false); + pIter->pCurrent[i] = NULL; + } taosMemoryFreeClear(pIter->pCurrent); taosMemoryFreeClear(pIter); } diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index bd6c37a7b5..505ce61eca 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -573,6 +573,9 @@ void taosPrintSlowLog(const char *format, ...) { len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 - len, format, argpointer); va_end(argpointer); + if (len < 0 || len > LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2) { + len = LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2; + } buffer[len++] = '\n'; buffer[len] = 0; diff --git a/tests/army/community/cluster/splitVgroupByLearner.json b/tests/army/community/cluster/splitVgroupByLearner.json new file mode 100644 index 0000000000..d02cf50fda --- /dev/null +++ b/tests/army/community/cluster/splitVgroupByLearner.json @@ -0,0 +1,62 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "connection_pool_size": 8, + "num_of_records_per_req": 3000, + "prepared_rand": 3000, + "thread_count": 2, + "create_table_thread_count": 1, + "confirm_parameter_prompt": "no", + "continue_if_fail": "yes", + "databases": [ + { + "dbinfo": { + "name": "db", + "drop": "yes", + "vgroups": 2, + "replica": 3, + "duration":"1d", + "wal_retention_period": 1, + "wal_retention_size": 1, + "keep": "3d,6d,30d" + }, + "super_tables": [ + { + "name": "stb", + "child_table_exists": "no", + "childtable_count": 10, + "insert_rows": 100000000, + "childtable_prefix": "d", + "insert_mode": "taosc", + "timestamp_step": 10000, + "start_timestamp":"now-12d", + "columns": [ + { "type": "bool", "name": "bc"}, + { "type": "float", "name": "fc" }, + { "type": "double", "name": "dc"}, + { "type": "tinyint", "name": "ti"}, + { "type": "smallint", "name": "si" }, + { "type": "int", "name": "ic" }, + { "type": "bigint", "name": "bi" }, + { "type": "utinyint", "name": "uti"}, + { "type": "usmallint", "name": "usi"}, + { "type": "uint", "name": "ui" }, + { "type": "ubigint", "name": "ubi"}, + { "type": "binary", "name": "bin", "len": 16}, + { "type": "nchar", "name": "nch", "len": 32} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, + {"name": "location","type": "binary", "len": 16, "values": + ["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + } + ] + } + ] +} diff --git a/tests/army/community/cluster/splitVgroupByLearner.py b/tests/army/community/cluster/splitVgroupByLearner.py new file mode 100644 index 0000000000..5f75db2db5 --- /dev/null +++ b/tests/army/community/cluster/splitVgroupByLearner.py @@ -0,0 +1,133 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import time +import random + +import taos +import frame +import frame.etool +import json +import threading + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame import * +from frame.autogen import * +from frame.srvCtl import * + + +class TDTestCase(TBase): + + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to init {__file__}") + self.replicaVar = int(replicaVar) + tdSql.init(conn.cursor(), logSql) # output sql.txt file + self.configJsonFile('splitVgroupByLearner.json', 'db', 1, 1, 'splitVgroupByLearner.json', 100000) + + def configJsonFile(self, fileName, dbName, vgroups, replica, newFileName='', insert_rows=100000, + timestamp_step=10000): + tdLog.debug(f"configJsonFile {fileName}") + filePath = etool.curFile(__file__, fileName) + with open(filePath, 'r') as f: + data = json.load(f) + + if len(newFileName) == 0: + newFileName = fileName + + data['databases'][0]['dbinfo']['name'] = dbName + data['databases'][0]['dbinfo']['vgroups'] = vgroups + data['databases'][0]['dbinfo']['replica'] = replica + data['databases'][0]['super_tables'][0]['insert_rows'] = insert_rows + data['databases'][0]['super_tables'][0]['timestamp_step'] = timestamp_step + json_data = json.dumps(data) + filePath = etool.curFile(__file__, newFileName) + with open(filePath, "w") as file: + file.write(json_data) + + tdLog.debug(f"configJsonFile {json_data}") + + def splitVgroupThread(self, configFile, event): + # self.insertData(configFile) + event.wait() + time.sleep(5) + tdLog.debug("splitVgroupThread start") + tdSql.execute('ALTER DATABASE db REPLICA 3') + time.sleep(5) + tdSql.execute('use db') + rowLen = tdSql.query('show vgroups') + if rowLen > 0: + vgroupId = tdSql.getData(0, 0) + tdLog.debug(f"splitVgroupThread vgroupId:{vgroupId}") + tdSql.execute(f"split vgroup {vgroupId}") + else: + tdLog.exit("get vgroupId fail!") + # self.configJsonFile(configFile, 'db1', 1, 1, configFile, 100000000) + # self.insertData(configFile) + + def dnodeNodeStopThread(self, event): + event.wait() + tdLog.debug("dnodeNodeStopThread start") + time.sleep(10) + on = 2 + for i in range(5): + if i % 2 == 0: + on = 2 + else: + on = 3 + sc.dnodeStop(on) + time.sleep(5) + sc.dnodeStart(on) + time.sleep(5) + + def dbInsertThread(self, configFile, event): + tdLog.debug(f"dbInsertThread start {configFile}") + self.insertData(configFile) + + event.set() + tdLog.debug(f"dbInsertThread first end {event}") + self.configJsonFile(configFile, 'db', 2, 3, configFile, 100000) + self.insertData(configFile) + + def insertData(self, configFile): + tdLog.info(f"insert data.") + # taosBenchmark run + jfile = etool.curFile(__file__, configFile) + etool.benchMark(json=jfile) + + # run + def run(self): + tdLog.debug(f"start to excute {__file__}") + event = threading.Event() + t1 = threading.Thread(target=self.splitVgroupThread, args=('splitVgroupByLearner.json', event)) + t2 = threading.Thread(target=self.dbInsertThread, args=('splitVgroupByLearner.json', event)) + t3 = threading.Thread(target=self.dnodeNodeStopThread, args=(event)) + t1.start() + t2.start() + t3.start() + tdLog.debug("threading started!!!!!") + t1.join() + t2.join() + t3.join() + tdLog.success(f"{__file__} successfully executed") + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/army/community/cmdline/fullopt.py b/tests/army/community/cmdline/fullopt.py index 39d1d581ed..d1d4421018 100644 --- a/tests/army/community/cmdline/fullopt.py +++ b/tests/army/community/cmdline/fullopt.py @@ -91,8 +91,7 @@ class TDTestCase(TBase): # -C etool.exeBinFile("taosd", "-C") # -k - rets = etool.runBinFile("taosd", "-C") - self.checkListNotEmpty(rets) + etool.exeBinFile("taosd", "-k", False) # -V rets = etool.runBinFile("taosd", "-V") self.checkListNotEmpty(rets) diff --git a/tests/army/community/query/query_basic.py b/tests/army/community/query/query_basic.py index 588ac707eb..bfe88e483e 100644 --- a/tests/army/community/query/query_basic.py +++ b/tests/army/community/query/query_basic.py @@ -375,6 +375,8 @@ class TDTestCase(TBase): sql = f"select stateduration(9.9,'{ops[i]}',11.1,1s);" #tdSql.checkFirstValue(sql, vals[i]) bug need fix tdSql.execute(sql) + sql = "select statecount(9,'EQAAAA',10);" + tdSql.error(sql) # histogram check crash sqls = [ @@ -396,6 +398,26 @@ class TDTestCase(TBase): sql = "select first(100-90-1),last(2*5),first(11.1),last(22.2)" tdSql.checkDataMem(sql, [[9, 10, 11.1, 22.2]]) + # sample + sql = "select sample(6, 1);" + tdSql.checkFirstValue(sql, 6) + + # spread + sql = "select spread(12);" + tdSql.checkFirstValue(sql, 0) + + # percentile + sql = "select percentile(10.1,100);" + tdSql.checkFirstValue(sql, 10.1) + sql = "select percentile(10, 0);" + tdSql.checkFirstValue(sql, 10) + sql = "select percentile(100, 60, 70, 80);" + tdSql.execute(sql) + + # apercentile + sql = "select apercentile(10.1,100);" + tdSql.checkFirstValue(sql, 10.1) + # run def run(self): tdLog.debug(f"start to excute {__file__}") diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 0dfbdf3038..9bc2827157 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -21,7 +21,7 @@ fi ,,y,army,./pytest.sh python3 ./test.py -f community/query/fill/fill_desc.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f community/cluster/incSnapshot.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3 - +,,y,army,./pytest.sh python3 ./test.py -f community/cluster/splitVgroupByLearner.py -N 3 ,,n,army,python3 ./test.py -f community/cmdline/fullopt.py @@ -292,7 +292,7 @@ fi ,,n,system-test,python3 ./test.py -f 0-others/timeRangeWise.py -N 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/delete_check.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_hot_refresh_configurations.py -,,n,system-test,./pytest.sh python3 ./test.py -f 0-others/subscribe_stream_privilege.py +,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/subscribe_stream_privilege.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/insert_double.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py diff --git a/tests/parallel_test/run_case.sh b/tests/parallel_test/run_case.sh index 7c80ecdbb7..7429c9976b 100755 --- a/tests/parallel_test/run_case.sh +++ b/tests/parallel_test/run_case.sh @@ -79,7 +79,7 @@ md5sum /home/TDinternal/debug/build/lib/libtaos.so #define taospy 2.7.10 pip3 list|grep taospy pip3 uninstall taospy -y -pip3 install --default-timeout=120 taospy==2.7.12 +pip3 install --default-timeout=120 taospy==2.7.13 #define taos-ws-py 0.3.1 pip3 list|grep taos-ws-py diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index c4885747d1..61cb770a10 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -139,7 +139,7 @@ class TDCom: self.stream_suffix = "_stream" self.range_count = 5 self.default_interval = 5 - self.stream_timeout = 12 + self.stream_timeout = 60 self.create_stream_sleep = 0.5 self.record_history_ts = str() self.precision = "ms" @@ -1688,8 +1688,8 @@ class TDCom: res1 = self.round_handle(res1) res2 = self.round_handle(res2) if latency < self.stream_timeout: - latency += 0.2 - time.sleep(0.2) + latency += 0.5 + time.sleep(0.5) else: if latency == 0: return False diff --git a/tests/script/coverage_test.sh b/tests/script/coverage_test.sh index c5e0c31f83..d8f1999b26 100644 --- a/tests/script/coverage_test.sh +++ b/tests/script/coverage_test.sh @@ -219,7 +219,7 @@ function lcovFunc { # generate result echo "generate result" - lcov -l --branch-coverage --function-coverage coverage.info | tee -a $TDENGINE_COVERAGE_REPORT + lcov -l coverage.info --branch-coverage --function-coverage | tee -a $TDENGINE_COVERAGE_REPORT sed -i 's/\/root\/TDengine\/sql.c/\/root\/TDengine\/source\/libs\/parser\/inc\/sql.c/g' coverage.info sed -i 's/\/root\/TDengine\/sql.y/\/root\/TDengine\/source\/libs\/parser\/inc\/sql.y/g' coverage.info @@ -289,4 +289,4 @@ lcovFunc stopTaosd date >> $WORK_DIR/cron.log -echo "End of Coverage Test" | tee -a $WORK_DIR/cron.log \ No newline at end of file +echo "End of Coverage Test" | tee -a $WORK_DIR/cron.log diff --git a/tests/script/tsim/parser/limit_stb.sim b/tests/script/tsim/parser/limit_stb.sim index 7d6aff3b51..2e8f029260 100644 --- a/tests/script/tsim/parser/limit_stb.sim +++ b/tests/script/tsim/parser/limit_stb.sim @@ -129,6 +129,7 @@ endi $offset = $tbNum * $rowNum $offset = $offset - 1 +print select * from $stb order by ts limit 2 offset $offset sql select * from $stb order by ts limit 2 offset $offset if $rows != 1 then return -1