Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/TD-22023

This commit is contained in:
54liuyao 2024-01-29 16:13:39 +08:00
commit d7fb8db086
41 changed files with 779 additions and 955 deletions

View File

@ -306,7 +306,7 @@ def pre_test_build_win() {
cd %WIN_CONNECTOR_ROOT%
python.exe -m pip install --upgrade pip
python -m pip uninstall taospy -y
python -m pip install taospy==2.7.12
python -m pip install taospy==2.7.13
python -m pip uninstall taos-ws-py -y
python -m pip install taos-ws-py==0.3.1
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32

View File

@ -3334,7 +3334,7 @@ typedef struct {
SMsgHead head;
int64_t streamId;
int32_t taskId;
} SVPauseStreamTaskReq, SVResetStreamTaskReq, SVDropHTaskReq;
} SVPauseStreamTaskReq, SVResetStreamTaskReq;
typedef struct {
int8_t reserved;

View File

@ -343,7 +343,6 @@
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) //7 << 8
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY_FINISH, "vnode-stream-scan-history-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL)

View File

@ -23,8 +23,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);

View File

@ -210,7 +210,6 @@ void* qExtractReaderFromStreamScanner(void* scanner);
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
int32_t qResetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);

View File

@ -628,17 +628,6 @@ typedef struct {
int8_t igUntreated;
} SStreamScanHistoryReq;
typedef struct {
int64_t streamId;
int32_t upstreamTaskId;
int32_t downstreamTaskId;
int32_t upstreamNodeId;
int32_t childId;
} SStreamScanHistoryFinishReq;
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq);
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq);
// mndTrigger: denote if this checkpoint is triggered by mnode or as requested from tasks when transfer-state finished
typedef struct {
int64_t streamId;
@ -713,17 +702,6 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
void streamMetaClearHbMsg(SStreamHbMsg* pMsg);
typedef struct {
int64_t streamId;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
int32_t downstreamId;
int32_t downstreamNode;
} SStreamCompleteHistoryMsg;
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq);
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq);
typedef struct SNodeUpdateInfo {
int32_t nodeId;
SEpSet prevEp;
@ -820,7 +798,6 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, void* pFn);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
void streamTaskRestoreStatus(SStreamTask* pTask);
@ -829,7 +806,6 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq*
SRpcHandleInfo* pRpcInfo, int32_t taskId);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated);
int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration);
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer);
@ -859,11 +835,6 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
// agg level
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, SRpcHandleInfo* pInfo);
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask);
// stream task meta
void streamMetaInit();

View File

@ -406,10 +406,6 @@ int32_t stmtGetFromCache(STscStmt* pStmt) {
if (NULL == pStmt->sql.pTableCache || taosHashGetSize(pStmt->sql.pTableCache) <= 0) {
if (pStmt->bInfo.inExecCache) {
if (ASSERT(taosHashGetSize(pStmt->exec.pBlockHash) == 1)) {
tscError("stmtGetFromCache error");
return TSDB_CODE_TSC_STMT_CACHE_ERROR;
}
pStmt->bInfo.needParse = false;
tscDebug("reuse stmt block for tb %s in execBlock", pStmt->bInfo.tbFName);
return TSDB_CODE_SUCCESS;

View File

@ -24,6 +24,7 @@ static int32_t (*tColDataAppendValueImpl[8][3])(SColData *pColData, uint8_t *pDa
static int32_t (*tColDataUpdateValueImpl[8][3])(SColData *pColData, uint8_t *pData, uint32_t nData, bool forward);
// SBuffer ================================
#ifdef BUILD_NO_CALL
void tBufferDestroy(SBuffer *pBuffer) {
tFree(pBuffer->pBuf);
pBuffer->pBuf = NULL;
@ -55,7 +56,7 @@ int32_t tBufferReserve(SBuffer *pBuffer, int64_t nData, void **ppData) {
return code;
}
#endif
// ================================
static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson);
@ -1148,6 +1149,7 @@ static int tTagValJsonCmprFn(const void *p1, const void *p2) {
return strcmp(((STagVal *)p1)[0].pKey, ((STagVal *)p2)[0].pKey);
}
#ifdef TD_DEBUG_PRINT_TAG
static void debugPrintTagVal(int8_t type, const void *val, int32_t vlen, const char *tag, int32_t ln) {
switch (type) {
case TSDB_DATA_TYPE_VARBINARY:
@ -1239,6 +1241,7 @@ void debugPrintSTag(STag *pTag, const char *tag, int32_t ln) {
}
printf("\n");
}
#endif
static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) {
int32_t n = 0;
@ -2576,6 +2579,7 @@ _exit:
return code;
}
#ifdef BUILD_NO_CALL
static int32_t tColDataSwapValue(SColData *pColData, int32_t i, int32_t j) {
int32_t code = 0;
@ -2658,6 +2662,7 @@ static void tColDataSwap(SColData *pColData, int32_t i, int32_t j) {
break;
}
}
#endif
static int32_t tColDataCopyRowCell(SColData *pFromColData, int32_t iFromRow, SColData *pToColData, int32_t iToRow) {
int32_t code = TSDB_CODE_SUCCESS;

View File

@ -86,8 +86,6 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;

View File

@ -828,8 +828,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -707,13 +707,6 @@ int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver);
void tFreeStreamObj(SStreamObj* pObj);
// typedef struct {
// char streamName[TSDB_STREAM_FNAME_LEN];
// int64_t uid;
// int64_t streamUid;
// SArray* childInfo; // SArray<SStreamChildEpInfo>
// } SStreamCheckpointObj;
#define VIEW_TYPE_UPDATABLE (1 << 0)
#define VIEW_TYPE_MATERIALIZED (1 << 1)

View File

@ -33,6 +33,11 @@ typedef struct SStreamTransInfo {
int32_t transId;
} SStreamTransInfo;
typedef struct SVgroupChangeInfo {
SHashObj *pDBMap;
SArray *pUpdateNodeList; // SArray<SNodeUpdateInfo>
} SVgroupChangeInfo;
// time to generated the checkpoint, if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discard
// to avoid too many checkpoints for a taskk in the waiting list
typedef struct SCheckpointCandEntry {
@ -64,12 +69,6 @@ typedef struct SNodeEntry {
int64_t hbTimestamp; // second
} SNodeEntry;
typedef struct SFailedCheckpointInfo {
int64_t streamUid;
int64_t checkpointId;
int32_t transId;
} SFailedCheckpointInfo;
#define MND_STREAM_CREATE_NAME "stream-create"
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_PAUSE_NAME "stream-pause"
@ -92,27 +91,35 @@ int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId)
bool mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock);
int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamUid);
typedef struct SOrphanTask {
int64_t streamId;
int32_t taskId;
int32_t nodeId;
} SOrphanTask;
// for sma
// TODO refactor
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams);
int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream);
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
void mndKillTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName);
void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode);
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode);
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo);
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
int32_t mndProcessStreamHb(SRpcMsg *pReq);
void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
int32_t initStreamNodeList(SMnode *pMnode);
int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated);
int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj* pStream, int8_t igUntreated);
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray *pList);
#ifdef __cplusplus
}

View File

@ -865,7 +865,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
} else {
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
goto _OVER;
@ -917,7 +917,7 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p
SStreamObj *pStream = mndAcquireStream(pMnode, streamName);
if (pStream != NULL && pStream->smaId == pSma->uid) {
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
mndReleaseStream(pMnode, pStream);
goto _OVER;

View File

@ -29,10 +29,9 @@
#define MND_STREAM_MAX_NUM 60
typedef struct SVgroupChangeInfo {
SHashObj *pDBMap;
SArray *pUpdateNodeList; // SArray<SNodeUpdateInfo>
} SVgroupChangeInfo;
typedef struct SMStreamNodeCheckMsg {
int8_t placeHolder; // // to fix windows compile error, define place holder
} SMStreamNodeCheckMsg;
static int32_t mndNodeCheckSentinel = 0;
SStreamExecInfo execInfo;
@ -60,14 +59,11 @@ static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static void freeCheckpointCandEntry(void *);
static void freeTaskList(void *param);
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static void freeCheckpointCandEntry(void *);
static void freeTaskList(void *param);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream);
@ -470,10 +466,8 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, SStreamTask *pTask) {
tEncodeStreamTask(&encoder, pTask);
tEncoderClear(&encoder);
STransAction action = {0};
action.mTraceId = pTrans->mTraceId;
initTransAction(&action, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
int32_t code = setTransAction(pTrans, buf, tlen, TDMT_STREAM_TASK_DEPLOY, &pTask->info.epSet, 0);
if (code != 0) {
taosMemoryFree(buf);
return -1;
}
@ -614,59 +608,6 @@ _OVER:
return -1;
}
static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
STransAction action = {0};
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return -1;
}
// no valid epset, return directly without redoAction
if (!hasEpset) {
return TSDB_CODE_SUCCESS;
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
initTransAction(&action, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
int32_t lv = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < lv; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (mndPersistTaskDropReq(pMnode, pTrans, pTask) < 0) {
return -1;
}
}
}
return 0;
}
static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
@ -752,17 +693,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_CREATE_NAME);
STrans *pTrans = doCreateTrans(pMnode, &streamObj, pReq, MND_STREAM_CREATE_NAME, "create stream tasks on dnodes");
if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
}
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetSTbName);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mndTransDrop(pTrans);
goto _OVER;
}
@ -808,6 +740,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mndTransDrop(pTrans);
taosThreadMutexLock(&execInfo.lock);
mDebug("stream tasks register into node list");
saveStreamTasksInfo(&streamObj, &execInfo);
taosThreadMutexUnlock(&execInfo.lock);
@ -940,20 +873,14 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, MND_STREAM_CHECKPOINT_NAME);
STrans *pTrans = doCreateTrans(pMnode, pStream, NULL, MND_STREAM_CHECKPOINT_NAME, "gen checkpoint for stream");
if (pTrans == NULL) {
return -1;
}
mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
goto _ERR;
}
mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
mDebug("start to trigger checkpoint for stream:%s, checkpoint: %" PRId64 "", pStream->name, checkpointId);
taosWLockLatch(&pStream->lock);
@ -970,27 +897,26 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
if (pVgObj == NULL) {
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
void *buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
mndReleaseVgroup(pMnode, pVgObj);
pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
STransAction act = {0};
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
SEpSet epset = {0};
bool hasEpset = false;
code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(buf);
taosWUnLockLatch(&pStream->lock);
goto _ERR;
}
initTransAction(&act, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY);
if (mndTransAppendRedoAction(pTrans, &act) != 0) {
code = setTransAction(pTrans, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
TSDB_CODE_SYN_PROPOSE_NOT_READY);
if (code != 0) {
taosMemoryFree(buf);
taosWUnLockLatch(&pStream->lock);
goto _ERR;
@ -1219,7 +1145,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_DROP_NAME);
STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_DROP_NAME, "drop stream");
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -1227,20 +1153,10 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
mInfo("trans:%d used to drop stream:%s", pTrans->id, dropReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeMDropStreamReq(&dropReq);
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@ -1304,7 +1220,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
return -1;
} else {
#if 0
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
sdbCancelFetch(pSdb, pIter);
@ -1563,18 +1479,6 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
return TSDB_CODE_SUCCESS;
}
static int32_t getNumOfTasks(SArray *pTaskList) {
int32_t numOfLevels = taosArrayGetSize(pTaskList);
int32_t count = 0;
for (int32_t i = 0; i < numOfLevels; i++) {
SArray *pLevel = taosArrayGetP(pTaskList, i);
count += taosArrayGetSize(pLevel);
}
return count;
}
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -1590,7 +1494,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// lock
taosRLockLatch(&pStream->lock);
int32_t count = getNumOfTasks(pStream->tasks);
int32_t count = mndGetNumOfStreamTasks(pStream);
if (numOfRows + count > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + count);
}
@ -1623,21 +1527,6 @@ static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter);
}
static int32_t mndPersistStreamLog(STrans *pTrans, SStreamObj *pStream, int8_t status) {
taosWLockLatch(&pStream->lock);
pStream->status = status;
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
taosWUnLockLatch(&pStream->lock);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
return 0;
}
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
@ -1652,9 +1541,10 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
if (pStream == NULL) {
if (pauseReq.igNotExists) {
mInfo("stream:%s, not exist, if exist is set", pauseReq.name);
mInfo("stream:%s, not exist, not pause stream", pauseReq.name);
return 0;
} else {
mError("stream:%s not exist, failed to pause stream", pauseReq.name);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
}
@ -1683,26 +1573,17 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_PAUSE_NAME);
STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_PAUSE_NAME, "pause the stream");
if (pTrans == NULL) {
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
// if nodeUpdate happened, not send pause trans
if (mndPauseStreamTasks(pMnode, pTrans, pStream) < 0) {
if (mndStreamSetPauseAction(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@ -1710,12 +1591,18 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
}
// pause stream
if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__PAUSE) < 0) {
taosWLockLatch(&pStream->lock);
pStream->status = STREAM_STATUS__PAUSE;
if (mndPersistTransLog(pStream, pTrans,SDB_STATUS_READY) < 0) {
taosWUnLockLatch(&pStream->lock);
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
taosWUnLockLatch(&pStream->lock);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -1743,10 +1630,11 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
if (pStream == NULL) {
if (pauseReq.igNotExists) {
mInfo("stream:%s, not exist, if exist is set", pauseReq.name);
mInfo("stream:%s not exist, not resume stream", pauseReq.name);
sdbRelease(pMnode->pSdb, pStream);
return 0;
} else {
mError("stream:%s not exist, failed to resume stream", pauseReq.name);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
return -1;
}
@ -1769,26 +1657,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_RESUME_NAME);
STrans* pTrans = doCreateTrans(pMnode, pStream, pReq, MND_STREAM_RESUME_NAME, "resume the stream");
if (pTrans == NULL) {
mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
mInfo("trans:%d used to resume stream:%s", pTrans->id, pauseReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
// resume all tasks
if (mndResumeStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
// set the resume action
if (mndStreamSetResumeAction(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
mError("stream:%s, failed to drop task since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@ -1796,12 +1675,17 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
}
// resume stream
if (mndPersistStreamLog(pTrans, pStream, STREAM_STATUS__NORMAL) < 0) {
taosWLockLatch(&pStream->lock);
pStream->status = STREAM_STATUS__NORMAL;
if (mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY) < 0) {
taosWUnLockLatch(&pStream->lock);
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
taosWUnLockLatch(&pStream->lock);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -1815,91 +1699,6 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
int32_t transId) {
pMsg->streamId = pId->streamId;
pMsg->taskId = pId->taskId;
pMsg->transId = transId;
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
}
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
SStreamTaskId *pId, int32_t transId) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo, pId, transId);
int32_t code = 0;
int32_t blen;
tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamTaskUpdateMsg(&encoder, &req);
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(nodeId);
tEncoderClear(&encoder);
*pBuf = buf;
*pLen = tlen;
taosArrayDestroy(req.pNodeList);
return TSDB_CODE_SUCCESS;
}
// todo extract method: traverse stream tasks
// build trans to update the epset
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
mDebug("start to build stream:0x%" PRIx64 " tasks epset update", pStream->uid);
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
void *pBuf = NULL;
int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
STransAction action = {0};
initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pBuf);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
}
taosWUnLockLatch(&pStream->lock);
return 0;
}
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
const SEp *p = GET_ACTIVE_EP(pCurrent);
@ -1997,7 +1796,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
return terrno;
}
mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid);
}
void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
@ -2011,7 +1810,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
pStream->name, pTrans->id);
int32_t code = createStreamUpdateTrans(pStream, pChangeInfo, pTrans);
int32_t code = mndStreamSetUpdateEpsetAction(pStream, pChangeInfo, pTrans);
// todo: not continue, drop all and retry again
if (code != TSDB_CODE_SUCCESS) {
@ -2171,26 +1970,6 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
return 0;
}
// kill all trans in the dst DB
static void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
mDebug("start to clear checkpoints in all Dbs");
void *pIter = NULL;
while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
char *pDb = (char *)pIter;
size_t len = 0;
void *pKey = taosHashGetKey(pDb, &len);
char *p = strndup(pKey, len);
mDebug("clear checkpoint trans in Db:%s", p);
doKillCheckpointTrans(pMnode, pKey, len);
taosMemoryFree(p);
}
mDebug("complete clear checkpoints in Dbs");
}
// this function runs by only one thread, so it is not multi-thread safe
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int32_t code = 0;
@ -2260,10 +2039,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
return 0;
}
typedef struct SMStreamNodeCheckMsg {
int8_t placeHolder; // // to fix windows compile error, define place holder
} SMStreamNodeCheckMsg;
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
@ -2335,26 +2110,6 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
// data in the hash table will be removed automatically, no need to remove it here.
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
if (pTransInfo == NULL) {
return TSDB_CODE_SUCCESS;
}
// not checkpoint trans, ignore
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId);
return TSDB_CODE_SUCCESS;
}
char *pDupDBName = strndup(pDBName, len);
mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName);
taosMemoryFree(pDupDBName);
return TSDB_CODE_SUCCESS;
}
void freeCheckpointCandEntry(void *param) {
SCheckpointCandEntry *pEntry = param;
taosMemoryFreeClear(pEntry->pName);
@ -2402,8 +2157,15 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
if (pStream == NULL) {
mError("failed to find the stream:0x%"PRIx64" not handle the checkpoint req", req.streamId);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
taosThreadMutexUnlock(&execInfo.lock);
return -1;
}
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
SArray **pReqTaskList = (SArray**)taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
if (pReqTaskList == NULL) {
SArray *pList = taosArrayInit(4, sizeof(int32_t));

View File

@ -16,6 +16,12 @@
#include "mndStream.h"
#include "mndTrans.h"
typedef struct SFailedCheckpointInfo {
int64_t streamUid;
int64_t checkpointId;
int32_t transId;
} SFailedCheckpointInfo;
static void doExtractTasksFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
@ -65,6 +71,8 @@ static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream)
return terrno;
}
/*int32_t code = */mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_RESET_NAME, pStream->uid);
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
@ -92,19 +100,13 @@ static int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream)
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
taosMemoryFree(pReq);
continue;
}
if (!hasEpset) {
taosMemoryFree(pReq);
continue;
}
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
code = setTransAction(pTrans, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
if (code != 0) {
taosMemoryFree(pReq);
taosWUnLockLatch(&pStream->lock);
mndTransDrop(pTrans);
@ -181,10 +183,51 @@ static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
return TSDB_CODE_SUCCESS;
}
static int32_t mndDropOrphanTasks(SMnode* pMnode, SArray* pList) {
SOrphanTask* pTask = taosArrayGet(pList, 0);
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false);
if (conflict) {
return -1;
}
SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""};
STrans* pTrans = doCreateTrans(pMnode, &dummyObj, NULL, MND_STREAM_DROP_NAME, "drop stream");
if (pTrans == NULL) {
mError("failed to create trans to drop orphan tasks since %s", terrstr());
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId);
// drop all tasks
if (mndStreamSetDropActionFromList(pMnode, pTrans, pList) < 0) {
mError("failed to create trans to drop orphan tasks since %s", terrstr());
mndTransDrop(pTrans);
return -1;
}
// drop stream
if (mndPersistTransLog(&dummyObj, pTrans, SDB_STATUS_DROPPED) < 0) {
mndTransDrop(pTrans);
return -1;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
return 0;
}
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0};
SArray *pList = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
SArray *pFailedTasks = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
SArray *pOrphanTasks = taosArrayInit(3, sizeof(SOrphanTask));
SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
@ -202,8 +245,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
// extract stream task list
int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap);
if (numOfExisted == 0) {
if (taosHashGetSize(execInfo.pTaskMap) == 0) {
doExtractTasksFromStream(pMnode);
}
@ -222,6 +264,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
if (pTaskEntry == NULL) {
mError("s-task:0x%" PRIx64 " not found in mnode task list", p->id.taskId);
SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
taosArrayPush(pOrphanTasks, &oTask);
continue;
}
@ -244,15 +289,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
streamTaskStatusCopy(pTaskEntry, p);
if (p->checkpointId != 0) {
if (p->checkpointFailed) {
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
p->checkpointId, p->chkpointTransId);
if ((p->checkpointId != 0) && p->checkpointFailed) {
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
p->checkpointId, p->chkpointTransId);
SFailedCheckpointInfo info = {
.transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
addIntoCheckpointList(pList, &info);
}
SFailedCheckpointInfo info = {
.transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId};
addIntoCheckpointList(pFailedTasks, &info);
}
}
@ -270,15 +313,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (taosArrayGetSize(pList) > 0) {
if (taosArrayGetSize(pFailedTasks) > 0) {
bool allReady = true;
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p);
if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SFailedCheckpointInfo *pInfo = taosArrayGet(pList, i);
for(int32_t i = 0; i < taosArrayGetSize(pFailedTasks); ++i) {
SFailedCheckpointInfo *pInfo = taosArrayGet(pFailedTasks, i);
mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status",
pInfo->checkpointId, pInfo->transId);
@ -289,9 +332,16 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
}
// handle the orphan tasks that are invalid but not removed in some vnodes or snode due to some unknown errors.
if (taosArrayGetSize(pOrphanTasks) > 0) {
mndDropOrphanTasks(pMnode, pOrphanTasks);
}
taosThreadMutexUnlock(&execInfo.lock);
streamMetaClearHbMsg(&req);
taosArrayDestroy(pList);
taosArrayDestroy(pFailedTasks);
taosArrayDestroy(pOrphanTasks);
return TSDB_CODE_SUCCESS;
}

View File

@ -169,7 +169,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const
return NULL;
}
mDebug("s-task:0x%" PRIx64 " start to build trans %s", pStream->uid, pMsg);
mInfo("s-task:0x%" PRIx64 " start to build trans %s, transId:%d", pStream->uid, pMsg, pTrans->id);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -255,11 +255,132 @@ int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans, int32_t status)
return 0;
}
void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode) {
pAction->epSet = *pEpset;
pAction->contLen = contLen;
pAction->pCont = pCont;
pAction->msgType = msgType;
pAction->retryCode = retryCode;
int32_t setTransAction(STrans *pTrans, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode) {
STransAction action = {.epSet = *pEpset, .contLen = contLen, .pCont = pCont, .msgType = msgType, .retryCode = retryCode};
return mndTransAppendRedoAction(pTrans, &action);
}
int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
// data in the hash table will be removed automatically, no need to remove it here.
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
if (pTransInfo == NULL) {
return TSDB_CODE_SUCCESS;
}
// not checkpoint trans, ignore
if (strcmp(pTransInfo->name, MND_STREAM_CHECKPOINT_NAME) != 0) {
mDebug("not checkpoint trans, not kill it, name:%s, transId:%d", pTransInfo->name, pTransInfo->transId);
return TSDB_CODE_SUCCESS;
}
char *pDupDBName = strndup(pDBName, len);
mndKillTransImpl(pMnode, pTransInfo->transId, pDupDBName);
taosMemoryFree(pDupDBName);
return TSDB_CODE_SUCCESS;
}
// kill all trans in the dst DB
void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
mDebug("start to clear checkpoints in all Dbs");
void *pIter = NULL;
while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
char *pDb = (char *)pIter;
size_t len = 0;
void *pKey = taosHashGetKey(pDb, &len);
char *p = strndup(pKey, len);
mDebug("clear checkpoint trans in Db:%s", p);
doKillCheckpointTrans(pMnode, pKey, len);
taosMemoryFree(p);
}
mDebug("complete clear checkpoints in Dbs");
}
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
int32_t transId) {
pMsg->streamId = pId->streamId;
pMsg->taskId = pId->taskId;
pMsg->transId = transId;
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
}
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
SStreamTaskId *pId, int32_t transId) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo, pId, transId);
int32_t code = 0;
int32_t blen;
tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(req.pNodeList);
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamTaskUpdateMsg(&encoder, &req);
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(nodeId);
tEncoderClear(&encoder);
*pBuf = buf;
*pLen = tlen;
taosArrayDestroy(req.pNodeList);
return TSDB_CODE_SUCCESS;
}
// todo extract method: traverse stream tasks
// build trans to update the epset
int32_t mndStreamSetUpdateEpsetAction(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans) {
mDebug("stream:0x%" PRIx64 " set tasks epset update action", pStream->uid);
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
void *pBuf = NULL;
int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
int32_t code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet, 0);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pBuf);
taosWUnLockLatch(&pStream->lock);
return -1;
}
}
}
taosWUnLockLatch(&pStream->lock);
return 0;
}

View File

@ -18,6 +18,66 @@
#include "tmisce.h"
#include "mndVgroup.h"
typedef struct SStreamTaskIter {
SStreamObj *pStream;
int32_t level;
int32_t ordinalIndex;
int32_t totalLevel;
SStreamTask *pTask;
} SStreamTaskIter;
SStreamTaskIter* createTaskIter(SStreamObj* pStream) {
SStreamTaskIter* pIter = taosMemoryCalloc(1, sizeof(SStreamTaskIter));
if (pIter == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pIter->level = -1;
pIter->ordinalIndex = 0;
pIter->pStream = pStream;
pIter->totalLevel = taosArrayGetSize(pStream->tasks);
pIter->pTask = NULL;
return pIter;
}
bool taskIterNextTask(SStreamTaskIter* pIter) {
if (pIter->level >= pIter->totalLevel) {
pIter->pTask = NULL;
return false;
}
if (pIter->level == -1) {
pIter->level += 1;
}
while(pIter->level < pIter->totalLevel) {
SArray *pList = taosArrayGetP(pIter->pStream->tasks, pIter->level);
if (pIter->ordinalIndex >= taosArrayGetSize(pList)) {
pIter->level += 1;
pIter->ordinalIndex = 0;
pIter->pTask = NULL;
continue;
}
pIter->pTask = taosArrayGetP(pList, pIter->ordinalIndex);
pIter->ordinalIndex += 1;
return true;
}
pIter->pTask = NULL;
return false;
}
SStreamTask* taskIterGetCurrent(SStreamTaskIter* pIter) {
return pIter->pTask;
}
void destroyTaskIter(SStreamTaskIter* pIter) {
taosMemoryFree(pIter);
}
SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
@ -143,7 +203,7 @@ int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t
}
}
static int32_t doResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
@ -160,15 +220,14 @@ static int32_t doResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *p
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) {
terrno = code;
taosMemoryFree(pReq);
return -1;
}
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
if (code != 0) {
taosMemoryFree(pReq);
return -1;
}
@ -201,14 +260,14 @@ int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
return num;
}
int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
int32_t mndStreamSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream, int8_t igUntreated) {
int32_t size = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (doResumeStreamTask(pTrans, pMnode, pTask, igUntreated) < 0) {
if (doSetResumeAction(pTrans, pMnode, pTask, igUntreated) < 0) {
return -1;
}
@ -220,7 +279,7 @@ int32_t mndResumeStreamTasks(STrans *pTrans, SMnode *pMnode, SStreamObj *pStream
return 0;
}
static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
@ -233,49 +292,122 @@ static int32_t doPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *pT
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps);
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS || !hasEpset) {
terrno = code;
taosMemoryFree(pReq);
return -1;
return code;
}
// no valid epset, return directly without redoAction
if (!hasEpset) {
taosMemoryFree(pReq);
return TSDB_CODE_SUCCESS;
}
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
mDebug("pause node:%d, epset:%d", pTask->info.nodeId, epset.numOfEps);
code = setTransAction(pTrans, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
if (code != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndPauseStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SArray *tasks = pStream->tasks;
int32_t mndStreamSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = createTaskIter(pStream);
int32_t size = taosArrayGetSize(tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (doPauseStreamTask(pMnode, pTrans, pTask) < 0) {
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
}
while (taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
if (doSetPauseAction(pMnode, pTrans, pTask) < 0) {
destroyTaskIter(pIter);
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.statusBackup, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
}
}
destroyTaskIter(pIter);
return 0;
}
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction
terrno = code;
return -1;
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
if (code != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndStreamSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
SStreamTaskIter *pIter = createTaskIter(pStream);
while(taskIterNextTask(pIter)) {
SStreamTask *pTask = taskIterGetCurrent(pIter);
if (doSetDropAction(pMnode, pTrans, pTask) < 0) {
destroyTaskIter(pIter);
return -1;
}
}
destroyTaskIter(pIter);
return 0;
}
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->taskId = pTask->taskId;
pReq->streamId = pTask->streamId;
SEpSet epset = {0};
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->taskId, pTask->nodeId);
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { // no valid epset, return directly without redoAction
terrno = code;
taosMemoryFree(pReq);
return -1;
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0);
if (code != 0) {
taosMemoryFree(pReq);
return -1;
}
return 0;
}
int32_t mndStreamSetDropActionFromList(SMnode *pMnode, STrans *pTrans, SArray* pList) {
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SOrphanTask* pTask = taosArrayGet(pList, i);
mDebug("add drop task:0x%x action to drop orphan task", pTask->taskId);
doSetDropActionFromId(pMnode, pTrans, pTask);
}
return 0;
}

View File

@ -171,10 +171,6 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
return tqStreamTaskProcessRetrieveReq(pSnode->pMeta, pMsg);
case TDMT_STREAM_RETRIEVE_RSP: // 1036
break;
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH:
return tqStreamTaskProcessScanHistoryFinishReq(pSnode->pMeta, pMsg);
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP:
return tqStreamTaskProcessScanHistoryFinishRsp(pSnode->pMeta, pMsg);
case TDMT_VND_STREAM_TASK_CHECK:
return tqStreamTaskProcessCheckReq(pSnode->pMeta, pMsg);
case TDMT_VND_STREAM_TASK_CHECK_RSP:

View File

@ -267,8 +267,6 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
// sma
int32_t smaInit();

View File

@ -1043,15 +1043,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return code;
}
// only the agg tasks and the sink tasks will receive this message from upstream tasks
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessScanHistoryFinishReq(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessScanHistoryFinishRsp(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRunReq* pReq = pMsg->pCont;

View File

@ -23,7 +23,7 @@ static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
static bool taskReadyForDataFromWal(SStreamTask* pTask);
static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems);
static bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems);
static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration);
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
@ -300,21 +300,21 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
return true;
}
bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
const char* id = pTask->id.idStr;
int32_t numOfNewItems = 0;
while(1) {
while (1) {
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
*numOfItems += numOfNewItems;
return numOfNewItems > 0;
}
SStreamQueueItem* pItem = NULL;
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id);
if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue
int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer);
if (itemInFillhistory) {
numOfNewItems += 1;
}
@ -334,7 +334,9 @@ bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* num
break;
}
} else {
tqError("s-task:%s append input queue failed, code: too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer);
walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
tqError("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id,
pTask->chkInfo.nextProcessVer);
break;
}
}
@ -399,7 +401,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
bool hasNewData = doPutDataIntoInputQFromWal(pTask, maxVer, &numOfItems);
bool hasNewData = doPutDataIntoInputQ(pTask, maxVer, &numOfItems);
taosThreadMutexUnlock(&pTask->lock);
if ((numOfItems > 0) || hasNewData) {

View File

@ -328,74 +328,6 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return 0;
}
int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize
SStreamScanHistoryFinishReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", pMeta->vgId,
req.downstreamTaskId);
return -1;
}
tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId);
int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t code = TSDB_CODE_SUCCESS;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize
SStreamCompleteHistoryMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
tDecodeCompleteHistoryDataMsg(&decoder, &req);
tDecoderClear(&decoder);
if (pMeta->role == NODE_ROLE_FOLLOWER) {
tqError("s-task:0x%x (vgId:%d) not handle the scan-history finish rsp, since it becomes follower",
req.upstreamTaskId, pMeta->vgId);
return TASK_DOWNSTREAM_NOT_LEADER;
}
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pMeta->vgId,
req.upstreamTaskId);
return -1;
}
int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
if (remain > 0) {
tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d",
pTask->id.idStr, req.downstreamId, remain);
} else {
tqDebug(
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
"completed msg",
pTask->id.idStr, req.downstreamId);
code = streamProcessScanHistoryFinishRsp(pTask);
}
streamMetaReleaseTask(pMeta, pTask);
return code;
}
int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));

View File

@ -789,10 +789,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_SCAN_HISTORY:
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH:
return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP:
return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY:
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
default:

View File

@ -204,6 +204,10 @@ void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), vo
*/
int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple);
/**
* @brief set the merge limit reached callback. it calls mergeLimitReached param with tableUid and param
*/
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReached)(uint64_t tableUid, void* param), void* param);
#ifdef __cplusplus
}
#endif

View File

@ -1027,57 +1027,6 @@ int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
return 0;
}
int32_t qResetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
while (1) {
int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
pSup->calTriggerSaved = 0;
pSup->deleteMarkSaved = 0;
qInfo("reset stream param for interval: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
pSup->calTriggerSaved = 0;
pSup->deleteMarkSaved = 0;
qInfo("reset stream param for session: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
pSup->calTriggerSaved = 0;
pSup->deleteMarkSaved = 0;
qInfo("reset stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
} else if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT) {
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
STimeWindowAggSupp* pSup = &pInfo->twAggSup;
pSup->calTriggerSaved = 0;
pSup->deleteMarkSaved = 0;
qInfo("save stream param for state: %d, %" PRId64, pSup->calTrigger, pSup->deleteMark);
}
// iterate operator tree
if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
return 0;
} else {
pOperator = pOperator->pDownstream[0];
}
}
}
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
const char* id = GET_TASKID(pTaskInfo);

View File

@ -3366,26 +3366,16 @@ _error:
return NULL;
}
static int32_t tableMergeScanDoSkipTable(STableMergeScanInfo* pInfo, SSDataBlock* pBlock) {
int64_t nRows = 0;
void* pNum = tSimpleHashGet(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid));
if (pNum == NULL) {
nRows = pBlock->info.rows;
tSimpleHashPut(pInfo->mTableNumRows, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &nRows, sizeof(nRows));
} else {
*(int64_t*)pNum = *(int64_t*)pNum + pBlock->info.rows;
nRows = *(int64_t*)pNum;
}
if (nRows >= pInfo->mergeLimit) {
if (pInfo->mSkipTables == NULL) {
static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) {
STableMergeScanInfo* pInfo = pTableMergeScanInfo;
if (pInfo->mSkipTables == NULL) {
pInfo->mSkipTables = taosHashInit(pInfo->tableEndIndex - pInfo->tableStartIndex + 1,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
}
int bSkip = 1;
taosHashPut(pInfo->mSkipTables, &pBlock->info.id.uid, sizeof(pBlock->info.id.uid), &bSkip, sizeof(bSkip));
}
return TSDB_CODE_SUCCESS;
int bSkip = 1;
if (pInfo->mSkipTables != NULL) {
taosHashPut(pInfo->mSkipTables, &uid, sizeof(uid), &bSkip, sizeof(bSkip));
}
}
static void doGetBlockForTableMergeScan(SOperatorInfo* pOperator, bool* pFinished, bool* pSkipped) {
@ -3501,10 +3491,6 @@ static SSDataBlock* getBlockForTableMergeScan(void* param) {
}
pBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pBlock->info.id.uid);
if (pInfo->mergeLimit != -1) {
tableMergeScanDoSkipTable(pInfo, pBlock);
}
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
return pBlock;
@ -3571,6 +3557,7 @@ int32_t startDurationForGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetMergeLimit(pInfo->pSortHandle, pInfo->mergeLimit);
tsortSetMergeLimitReachedFp(pInfo->pSortHandle, tableMergeScanDoSkipTable, pInfo);
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
@ -3702,7 +3689,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
T_LONG_JMP(pOperator->pTaskInfo->env, terrno);
}
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
pInfo->limitInfo.numOfOutputRows);
@ -3798,8 +3785,6 @@ void destroyTableMergeScanOperatorInfo(void* param) {
taosArrayDestroy(pTableScanInfo->sortSourceParams);
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL;
tSimpleHashCleanup(pTableScanInfo->mTableNumRows);
pTableScanInfo->mTableNumRows = NULL;
taosHashCleanup(pTableScanInfo->mSkipTables);
pTableScanInfo->mSkipTables = NULL;
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
@ -3891,8 +3876,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);
pInfo->mTableNumRows = tSimpleHashInit(1024,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
pInfo->mergeLimit = -1;
bool hasLimit = pInfo->limitInfo.limit.limit != -1 || pInfo->limitInfo.limit.offset != -1;
if (hasLimit) {

View File

@ -75,6 +75,9 @@ struct SSortHandle {
bool (*abortCheckFn)(void* param);
void* abortCheckParam;
void (*mergeLimitReachedFn)(uint64_t tableUid, void* param);
void* mergeLimitReachedParam;
};
void tsortSetSingleTableMerge(SSortHandle* pHandle) {
@ -885,7 +888,7 @@ static int32_t appendDataBlockToPageBuf(SSortHandle* pHandle, SSDataBlock* blk,
int32_t size = blockDataGetSize(blk) + sizeof(int32_t) + taosArrayGetSize(blk->pDataBlock) * sizeof(int32_t);
ASSERT(size <= getBufPageSize(pHandle->pBuf));
blockDataToBuf(pPage, blk);
setBufPageDirty(pPage, true);
@ -1040,6 +1043,39 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SBlockO
return 0;
}
static SSDataBlock* getRowsBlockWithinMergeLimit(const SSortHandle* pHandle, SSHashObj* mTableNumRows, SSDataBlock* pOrigBlk, bool* pExtractedBlock) {
int64_t nRows = 0;
int64_t prevRows = 0;
void* pNum = tSimpleHashGet(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid));
if (pNum == NULL) {
prevRows = 0;
nRows = pOrigBlk->info.rows;
tSimpleHashPut(mTableNumRows, &pOrigBlk->info.id.uid, sizeof(pOrigBlk->info.id.uid), &nRows, sizeof(nRows));
} else {
prevRows = *(int64_t*)pNum;
*(int64_t*)pNum = *(int64_t*)pNum + pOrigBlk->info.rows;
nRows = *(int64_t*)pNum;
}
int64_t keepRows = pOrigBlk->info.rows;
if (nRows >= pHandle->mergeLimit) {
if (pHandle->mergeLimitReachedFn) {
pHandle->mergeLimitReachedFn(pOrigBlk->info.id.uid, pHandle->mergeLimitReachedParam);
}
keepRows = pHandle->mergeLimit - prevRows;
}
SSDataBlock* pBlock = NULL;
if (keepRows != pOrigBlk->info.rows) {
pBlock = blockDataExtractBlock(pOrigBlk, 0, keepRows);
*pExtractedBlock = true;
} else {
*pExtractedBlock = false;
pBlock = pOrigBlk;
}
return pBlock;
}
static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
SBlockOrderInfo* pOrder = taosArrayGet(pHandle->pSortInfo, 0);
size_t nSrc = taosArrayGetSize(pHandle->pOrderedSource);
@ -1062,10 +1098,18 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
pHandle->currMergeLimitTs = INT64_MIN;
}
SSHashObj* mTableNumRows = tSimpleHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
SArray* aBlkSort = taosArrayInit(8, POINTER_BYTES);
SSHashObj* mUidBlk = tSimpleHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT));
while (1) {
SSDataBlock* pBlk = pHandle->fetchfp(pSrc->param);
int64_t p = taosGetTimestampUs();
bool bExtractedBlock = false;
if (pBlk != NULL && pHandle->mergeLimit > 0) {
pBlk = getRowsBlockWithinMergeLimit(pHandle, mTableNumRows, pBlk, &bExtractedBlock);
}
if (pBlk != NULL) {
SColumnInfoData* tsCol = taosArrayGet(pBlk->pDataBlock, pOrder->slotId);
int64_t firstRowTs = *(int64_t*)tsCol->pData;
@ -1074,6 +1118,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
continue;
}
}
if (pBlk != NULL) {
szSort += blockDataGetSize(pBlk);
@ -1081,8 +1126,11 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if (ppBlk != NULL) {
SSDataBlock* tBlk = *(SSDataBlock**)(ppBlk);
blockDataMerge(tBlk, pBlk);
if (bExtractedBlock) {
blockDataDestroy(pBlk);
}
} else {
SSDataBlock* tBlk = createOneDataBlock(pBlk, true);
SSDataBlock* tBlk = (bExtractedBlock) ? pBlk : createOneDataBlock(pBlk, true);
tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
taosArrayPush(aBlkSort, &tBlk);
}
@ -1091,7 +1139,6 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
if ((pBlk != NULL && szSort > maxBufSize) || (pBlk == NULL && szSort > 0)) {
tSimpleHashClear(mUidBlk);
int64_t p = taosGetTimestampUs();
code = sortBlocksToExtSource(pHandle, aBlkSort, pOrder, aExtSrc);
if (code != TSDB_CODE_SUCCESS) {
tSimpleHashCleanup(mUidBlk);
@ -1131,7 +1178,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
}
taosArrayDestroy(aExtSrc);
tSimpleHashCleanup(mTableNumRows);
pHandle->type = SORT_SINGLESOURCE_SORT;
return TSDB_CODE_SUCCESS;
}
@ -1610,3 +1657,8 @@ int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* ke
}
return ret;
}
void tsortSetMergeLimitReachedFp(SSortHandle* pHandle, void (*mergeLimitReachedCb)(uint64_t tableUid, void* param), void* param) {
pHandle->mergeLimitReachedFn = mergeLimitReachedCb;
pHandle->mergeLimitReachedParam = param;
}

View File

@ -123,8 +123,6 @@ int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t
STaskId streamTaskGetTaskId(const SStreamTask* pTask);
void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo);
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
int32_t* pLen);
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
void streamClearChkptReadyMsg(SStreamTask* pTask);
@ -134,10 +132,7 @@ int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
const char* streamQueueItemGetTypeStr(int32_t type);
SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);

View File

@ -34,9 +34,6 @@ static int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* p
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq);
static int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
int32_t vgSz, int64_t groupId);
static int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet);
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
int32_t numOfBlocks, int64_t dstTaskId, int32_t type);
@ -676,41 +673,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
SStreamScanHistoryFinishReq req = {
.streamId = pTask->id.streamId,
.childId = pTask->info.selfChildId,
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->pMeta->vgId,
};
// serialize
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
pTask->notReadyTasks = 1;
doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId,
&pTask->outputInfo.fixedDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->notReadyTasks = numOfVgs;
SStreamTaskState* pState = streamTaskGetStatus(pTask);
stDebug("s-task:%s send scan-history data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s",
pTask->id.idStr, numOfVgs, pState->name);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.downstreamTaskId = pVgInfo->taskId;
doDispatchScanHistoryFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else {
stDebug("s-task:%s no downstream tasks, invoke scan-history finish rsp directly", pTask->id.idStr);
streamProcessScanHistoryFinishRsp(pTask);
}
return 0;
}
// this function is usually invoked by sink/agg task
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pReadyMsgList);
@ -782,48 +744,6 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
return 0;
}
int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
int32_t tlen;
tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
if (code < 0) {
return -1;
}
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
if (buf) {
rpcFreeCont(buf);
}
return code;
}
tEncoderClear(&encoder);
initRpcMsg(&msg, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead));
tmsgSendReq(pEpSet, &msg);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
stDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pState->name,
pReq->downstreamTaskId, vgId);
return 0;
}
int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
@ -989,109 +909,6 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) {
taosArrayClear(pTask->pReadyMsgList);
}
int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamNode) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamNode) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->upstreamNodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer,
int32_t* pLen) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder;
SStreamCompleteHistoryMsg msg = {
.streamId = pReq->streamId,
.upstreamTaskId = pReq->upstreamTaskId,
.upstreamNodeId = pReq->upstreamNodeId,
.downstreamId = pReq->downstreamTaskId,
.downstreamNode = pTask->pMeta->vgId,
};
tEncodeSize(tEncodeCompleteHistoryDataMsg, &msg, len, code);
if (code < 0) {
return code;
}
void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len);
if (pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId);
void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeCompleteHistoryDataMsg(&encoder, &msg);
tEncoderClear(&encoder);
*pBuffer = pBuf;
*pLen = len;
return 0;
}
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq) {
void* pBuf = NULL;
int32_t len = 0;
streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len);
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
SStreamContinueExecInfo info = {.taskId = pReq->upstreamTaskId, .epset = pInfo->epSet};
initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len);
info.msg.info = *pRpcInfo;
taosThreadMutexLock(&pTask->lock);
if (pTask->pRspMsgList == NULL) {
pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo));
}
taosArrayPush(pTask->pRspMsgList, &info);
taosThreadMutexUnlock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->pRspMsgList);
stDebug("s-task:%s add scan-history finish rsp msg for task:0x%x, total:%d", pTask->id.idStr, pReq->upstreamTaskId,
num);
return TSDB_CODE_SUCCESS;
}
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);
const char* id = pTask->id.idStr;
int32_t level = pTask->info.taskLevel;
int32_t num = taosArrayGetSize(pTask->pRspMsgList);
for (int32_t i = 0; i < num; ++i) {
SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i);
tmsgSendRsp(&pInfo->msg);
stDebug("s-task:%s level:%d notify upstream:0x%x continuing handle data in WAL", id, level, pInfo->taskId);
}
taosArrayClear(pTask->pRspMsgList);
stDebug("s-task:%s level:%d continue process msg sent to all %d upstreams", id, level, num);
return 0;
}
// this message has been sent successfully, let's try next one.
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);

View File

@ -592,108 +592,6 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) {
return -1;
}
if (qStreamRecoverFinish(exec) < 0) {
return -1;
}
return 0;
}
int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq,
SRpcHandleInfo* pRpcInfo) {
int32_t taskLevel = pTask->info.taskLevel;
ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);
const char* id = pTask->id.idStr;
SStreamTaskState* p = streamTaskGetStatus(pTask);
if (p->state != TASK_STATUS__SCAN_HISTORY) {
stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id,
p->name, pReq->upstreamTaskId);
void* pBuf = NULL;
int32_t len = 0;
streamTaskBuildScanhistoryRspMsg(pTask, pReq, &pBuf, &len);
SRpcMsg msg = {.info = *pRpcInfo};
initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len);
tmsgSendRsp(&msg);
stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id, taskLevel,
pReq->upstreamTaskId, pReq->upstreamNodeId);
return 0;
}
// sink tasks do not send end of scan history msg to its upstream, which is agg task.
streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
ASSERT(left >= 0);
if (left == 0) {
int32_t numOfTasks = taosArrayGetSize(pTask->upstreamInfo.pList);
if (taskLevel == TASK_LEVEL__AGG) {
stDebug(
"s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data processing "
"and send rsp to all upstream tasks",
id, numOfTasks);
streamAggUpstreamScanHistoryFinish(pTask);
} else {
stDebug("s-task:%s all %d upstream task(s) finish scan-history data, and rsp to all upstream tasks", id,
numOfTasks);
}
// all upstream tasks have completed the scan-history task in the stream time window, let's start to extract data
// from the WAL files, which contains the real time stream data.
streamNotifyUpstreamContinue(pTask);
// mnode will not send the pause/resume message to the sink task, so no need to enable the pause for sink tasks.
if (taskLevel == TASK_LEVEL__AGG) {
/*int32_t code = */ streamTaskScanHistoryDataComplete(pTask);
} else { // for sink task, set normal
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
}
} else {
stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", id,
pReq->upstreamTaskId, pReq->childId, left);
}
return 0;
}
int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
ETaskStatus status = streamTaskGetStatus(pTask)->state;
// task restart now, not handle the scan-history finish rsp
if (status == TASK_STATUS__UNINIT) {
return TSDB_CODE_INVALID_MSG;
}
ASSERT(status == TASK_STATUS__SCAN_HISTORY/* || status == TASK_STATUS__STREAM_SCAN_HISTORY*/);
SStreamMeta* pMeta = pTask->pMeta;
// execute in the scan history complete call back msg, ready to process data from inputQ
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
streamTaskSetSchedStatusInactive(pTask);
streamMetaWLock(pMeta);
streamMetaSaveTask(pMeta, pTask);
streamMetaCommit(pMeta);
streamMetaWUnLock(pMeta);
// for source tasks, let's continue execute.
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamSchedExec(pTask);
}
return TSDB_CODE_SUCCESS;
}
static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
SDataRange* pRange = &pHTask->dataRange;
@ -946,29 +844,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
}
}
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__DROPPING) {
return 0;
}
// restore param
int32_t code = 0;
if (pTask->info.fillHistory) {
code = streamRestoreParam(pTask);
if (code < 0) {
return -1;
}
}
// dispatch scan-history finish req to all related downstream task
code = streamDispatchScanHistoryFinishMsg(pTask);
if (code < 0) {
return -1;
}
return 0;
}
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask) {
void* exec = pTask->exec.pExecutor;
return qStreamInfoResetTimewindowFilter(exec);
@ -1072,28 +947,6 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint
return 0;
}
int32_t tEncodeStreamScanHistoryFinishReq(SEncoder* pEncoder, const SStreamScanHistoryFinishReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistoryFinishReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange;

View File

@ -994,6 +994,12 @@ void *taosCacheIterGetKey(const SCacheIter *pIter, size_t *len) {
}
void taosCacheDestroyIter(SCacheIter *pIter) {
for (int32_t i = 0; i < pIter->numOfObj; ++i) {
if (!pIter->pCurrent[i]) continue;
char *p = pIter->pCurrent[i]->data;
taosCacheRelease(pIter->pCacheObj, (void **)&p, false);
pIter->pCurrent[i] = NULL;
}
taosMemoryFreeClear(pIter->pCurrent);
taosMemoryFreeClear(pIter);
}

View File

@ -573,6 +573,9 @@ void taosPrintSlowLog(const char *format, ...) {
len += vsnprintf(buffer + len, LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2 - len, format, argpointer);
va_end(argpointer);
if (len < 0 || len > LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2) {
len = LOG_MAX_LINE_DUMP_BUFFER_SIZE - 2;
}
buffer[len++] = '\n';
buffer[len] = 0;

View File

@ -0,0 +1,62 @@
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"connection_pool_size": 8,
"num_of_records_per_req": 3000,
"prepared_rand": 3000,
"thread_count": 2,
"create_table_thread_count": 1,
"confirm_parameter_prompt": "no",
"continue_if_fail": "yes",
"databases": [
{
"dbinfo": {
"name": "db",
"drop": "yes",
"vgroups": 2,
"replica": 3,
"duration":"1d",
"wal_retention_period": 1,
"wal_retention_size": 1,
"keep": "3d,6d,30d"
},
"super_tables": [
{
"name": "stb",
"child_table_exists": "no",
"childtable_count": 10,
"insert_rows": 100000000,
"childtable_prefix": "d",
"insert_mode": "taosc",
"timestamp_step": 10000,
"start_timestamp":"now-12d",
"columns": [
{ "type": "bool", "name": "bc"},
{ "type": "float", "name": "fc" },
{ "type": "double", "name": "dc"},
{ "type": "tinyint", "name": "ti"},
{ "type": "smallint", "name": "si" },
{ "type": "int", "name": "ic" },
{ "type": "bigint", "name": "bi" },
{ "type": "utinyint", "name": "uti"},
{ "type": "usmallint", "name": "usi"},
{ "type": "uint", "name": "ui" },
{ "type": "ubigint", "name": "ubi"},
{ "type": "binary", "name": "bin", "len": 16},
{ "type": "nchar", "name": "nch", "len": 32}
],
"tags": [
{"type": "tinyint", "name": "groupid","max": 10,"min": 1},
{"name": "location","type": "binary", "len": 16, "values":
["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"]
}
]
}
]
}
]
}

View File

@ -0,0 +1,133 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import time
import random
import taos
import frame
import frame.etool
import json
import threading
from frame.log import *
from frame.cases import *
from frame.sql import *
from frame.caseBase import *
from frame import *
from frame.autogen import *
from frame.srvCtl import *
class TDTestCase(TBase):
def init(self, conn, logSql, replicaVar=1):
tdLog.debug(f"start to init {__file__}")
self.replicaVar = int(replicaVar)
tdSql.init(conn.cursor(), logSql) # output sql.txt file
self.configJsonFile('splitVgroupByLearner.json', 'db', 1, 1, 'splitVgroupByLearner.json', 100000)
def configJsonFile(self, fileName, dbName, vgroups, replica, newFileName='', insert_rows=100000,
timestamp_step=10000):
tdLog.debug(f"configJsonFile {fileName}")
filePath = etool.curFile(__file__, fileName)
with open(filePath, 'r') as f:
data = json.load(f)
if len(newFileName) == 0:
newFileName = fileName
data['databases'][0]['dbinfo']['name'] = dbName
data['databases'][0]['dbinfo']['vgroups'] = vgroups
data['databases'][0]['dbinfo']['replica'] = replica
data['databases'][0]['super_tables'][0]['insert_rows'] = insert_rows
data['databases'][0]['super_tables'][0]['timestamp_step'] = timestamp_step
json_data = json.dumps(data)
filePath = etool.curFile(__file__, newFileName)
with open(filePath, "w") as file:
file.write(json_data)
tdLog.debug(f"configJsonFile {json_data}")
def splitVgroupThread(self, configFile, event):
# self.insertData(configFile)
event.wait()
time.sleep(5)
tdLog.debug("splitVgroupThread start")
tdSql.execute('ALTER DATABASE db REPLICA 3')
time.sleep(5)
tdSql.execute('use db')
rowLen = tdSql.query('show vgroups')
if rowLen > 0:
vgroupId = tdSql.getData(0, 0)
tdLog.debug(f"splitVgroupThread vgroupId:{vgroupId}")
tdSql.execute(f"split vgroup {vgroupId}")
else:
tdLog.exit("get vgroupId fail!")
# self.configJsonFile(configFile, 'db1', 1, 1, configFile, 100000000)
# self.insertData(configFile)
def dnodeNodeStopThread(self, event):
event.wait()
tdLog.debug("dnodeNodeStopThread start")
time.sleep(10)
on = 2
for i in range(5):
if i % 2 == 0:
on = 2
else:
on = 3
sc.dnodeStop(on)
time.sleep(5)
sc.dnodeStart(on)
time.sleep(5)
def dbInsertThread(self, configFile, event):
tdLog.debug(f"dbInsertThread start {configFile}")
self.insertData(configFile)
event.set()
tdLog.debug(f"dbInsertThread first end {event}")
self.configJsonFile(configFile, 'db', 2, 3, configFile, 100000)
self.insertData(configFile)
def insertData(self, configFile):
tdLog.info(f"insert data.")
# taosBenchmark run
jfile = etool.curFile(__file__, configFile)
etool.benchMark(json=jfile)
# run
def run(self):
tdLog.debug(f"start to excute {__file__}")
event = threading.Event()
t1 = threading.Thread(target=self.splitVgroupThread, args=('splitVgroupByLearner.json', event))
t2 = threading.Thread(target=self.dbInsertThread, args=('splitVgroupByLearner.json', event))
t3 = threading.Thread(target=self.dnodeNodeStopThread, args=(event))
t1.start()
t2.start()
t3.start()
tdLog.debug("threading started!!!!!")
t1.join()
t2.join()
t3.join()
tdLog.success(f"{__file__} successfully executed")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -91,8 +91,7 @@ class TDTestCase(TBase):
# -C
etool.exeBinFile("taosd", "-C")
# -k
rets = etool.runBinFile("taosd", "-C")
self.checkListNotEmpty(rets)
etool.exeBinFile("taosd", "-k", False)
# -V
rets = etool.runBinFile("taosd", "-V")
self.checkListNotEmpty(rets)

View File

@ -375,6 +375,8 @@ class TDTestCase(TBase):
sql = f"select stateduration(9.9,'{ops[i]}',11.1,1s);"
#tdSql.checkFirstValue(sql, vals[i]) bug need fix
tdSql.execute(sql)
sql = "select statecount(9,'EQAAAA',10);"
tdSql.error(sql)
# histogram check crash
sqls = [
@ -396,6 +398,26 @@ class TDTestCase(TBase):
sql = "select first(100-90-1),last(2*5),first(11.1),last(22.2)"
tdSql.checkDataMem(sql, [[9, 10, 11.1, 22.2]])
# sample
sql = "select sample(6, 1);"
tdSql.checkFirstValue(sql, 6)
# spread
sql = "select spread(12);"
tdSql.checkFirstValue(sql, 0)
# percentile
sql = "select percentile(10.1,100);"
tdSql.checkFirstValue(sql, 10.1)
sql = "select percentile(10, 0);"
tdSql.checkFirstValue(sql, 10)
sql = "select percentile(100, 60, 70, 80);"
tdSql.execute(sql)
# apercentile
sql = "select apercentile(10.1,100);"
tdSql.checkFirstValue(sql, 10.1)
# run
def run(self):
tdLog.debug(f"start to excute {__file__}")

View File

@ -21,7 +21,7 @@ fi
,,y,army,./pytest.sh python3 ./test.py -f community/query/fill/fill_desc.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/incSnapshot.py -N 3 -L 3 -D 2
,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3
,,y,army,./pytest.sh python3 ./test.py -f community/cluster/splitVgroupByLearner.py -N 3
,,n,army,python3 ./test.py -f community/cmdline/fullopt.py

View File

@ -79,7 +79,7 @@ md5sum /home/TDinternal/debug/build/lib/libtaos.so
#define taospy 2.7.10
pip3 list|grep taospy
pip3 uninstall taospy -y
pip3 install --default-timeout=120 taospy==2.7.12
pip3 install --default-timeout=120 taospy==2.7.13
#define taos-ws-py 0.3.1
pip3 list|grep taos-ws-py

View File

@ -139,7 +139,7 @@ class TDCom:
self.stream_suffix = "_stream"
self.range_count = 5
self.default_interval = 5
self.stream_timeout = 12
self.stream_timeout = 60
self.create_stream_sleep = 0.5
self.record_history_ts = str()
self.precision = "ms"
@ -1688,8 +1688,8 @@ class TDCom:
res1 = self.round_handle(res1)
res2 = self.round_handle(res2)
if latency < self.stream_timeout:
latency += 0.2
time.sleep(0.2)
latency += 0.5
time.sleep(0.5)
else:
if latency == 0:
return False

View File

@ -219,7 +219,7 @@ function lcovFunc {
# generate result
echo "generate result"
lcov -l --branch-coverage --function-coverage coverage.info | tee -a $TDENGINE_COVERAGE_REPORT
lcov -l coverage.info --branch-coverage --function-coverage | tee -a $TDENGINE_COVERAGE_REPORT
sed -i 's/\/root\/TDengine\/sql.c/\/root\/TDengine\/source\/libs\/parser\/inc\/sql.c/g' coverage.info
sed -i 's/\/root\/TDengine\/sql.y/\/root\/TDengine\/source\/libs\/parser\/inc\/sql.y/g' coverage.info
@ -289,4 +289,4 @@ lcovFunc
stopTaosd
date >> $WORK_DIR/cron.log
echo "End of Coverage Test" | tee -a $WORK_DIR/cron.log
echo "End of Coverage Test" | tee -a $WORK_DIR/cron.log

View File

@ -129,6 +129,7 @@ endi
$offset = $tbNum * $rowNum
$offset = $offset - 1
print select * from $stb order by ts limit 2 offset $offset
sql select * from $stb order by ts limit 2 offset $offset
if $rows != 1 then
return -1