This commit is contained in:
chenhaoran 2024-03-04 18:20:18 +08:00
commit 522ff054ca
22 changed files with 933 additions and 353 deletions

View File

@ -786,8 +786,11 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask);
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt, bool metaLock);
int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event);
void streamTaskRestoreStatus(SStreamTask* pTask);
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param);
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param);
int32_t streamTaskRestoreStatus(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId);

View File

@ -354,6 +354,7 @@ SRequestObj* acquireRequest(int64_t rid);
int32_t releaseRequest(int64_t rid);
int32_t removeRequest(int64_t rid);
void doDestroyRequest(void* p);
int64_t removeFromMostPrevReq(SRequestObj* pRequest);
char* getDbOfConnection(STscObj* pObj);
void setConnectionDB(STscObj* pTscObj, const char* db);

View File

@ -385,6 +385,33 @@ int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, ri
int32_t removeRequest(int64_t rid) { return taosRemoveRef(clientReqRefPool, rid); }
/// return the most previous req ref id
int64_t removeFromMostPrevReq(SRequestObj* pRequest) {
int64_t mostPrevReqRefId = pRequest->self;
SRequestObj* pTmp = pRequest;
while (pTmp->relation.prevRefId) {
pTmp = acquireRequest(pTmp->relation.prevRefId);
if (pTmp) {
mostPrevReqRefId = pTmp->self;
releaseRequest(mostPrevReqRefId);
} else {
break;
}
}
removeRequest(mostPrevReqRefId);
return mostPrevReqRefId;
}
void destroyNextReq(int64_t nextRefId) {
if (nextRefId) {
SRequestObj* pObj = acquireRequest(nextRefId);
if (pObj) {
releaseRequest(nextRefId);
releaseRequest(nextRefId);
}
}
}
void destroySubRequests(SRequestObj *pRequest) {
int32_t reqIdx = -1;
SRequestObj *pReqList[16] = {NULL};
@ -435,7 +462,7 @@ void doDestroyRequest(void *p) {
uint64_t reqId = pRequest->requestId;
tscTrace("begin to destroy request %" PRIx64 " p:%p", reqId, pRequest);
destroySubRequests(pRequest);
int64_t nextReqRefId = pRequest->relation.nextRefId;
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
@ -471,6 +498,7 @@ void doDestroyRequest(void *p) {
taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
destroyNextReq(nextReqRefId);
}
void destroyRequest(SRequestObj *pRequest) {
@ -479,7 +507,7 @@ void destroyRequest(SRequestObj *pRequest) {
}
taos_stop_query(pRequest);
removeRequest(pRequest->self);
removeFromMostPrevReq(pRequest);
}
void taosStopQueryImpl(SRequestObj *pRequest) {

View File

@ -1254,54 +1254,34 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
}
void restartAsyncQuery(SRequestObj *pRequest, int32_t code) {
int32_t reqIdx = 0;
SRequestObj *pReqList[16] = {NULL};
SRequestObj *pUserReq = NULL;
pReqList[0] = pRequest;
uint64_t tmpRefId = 0;
SRequestObj *pTmp = pRequest;
while (pTmp->relation.prevRefId) {
tmpRefId = pTmp->relation.prevRefId;
pTmp = acquireRequest(tmpRefId);
if (pTmp) {
pReqList[++reqIdx] = pTmp;
releaseRequest(tmpRefId);
} else {
tscError("prev req ref 0x%" PRIx64 " is not there", tmpRefId);
tscInfo("restart request: %s p: %p", pRequest->sqlstr, pRequest);
SRequestObj* pUserReq = pRequest;
acquireRequest(pRequest->self);
while (pUserReq) {
if (pUserReq->self == pUserReq->relation.userRefId || pUserReq->relation.userRefId == 0) {
break;
}
}
tmpRefId = pRequest->relation.nextRefId;
while (tmpRefId) {
pTmp = acquireRequest(tmpRefId);
if (pTmp) {
tmpRefId = pTmp->relation.nextRefId;
removeRequest(pTmp->self);
releaseRequest(pTmp->self);
} else {
tscError("next req ref 0x%" PRIx64 " is not there", tmpRefId);
break;
int64_t nextRefId = pUserReq->relation.nextRefId;
releaseRequest(pUserReq->self);
if (nextRefId) {
pUserReq = acquireRequest(nextRefId);
}
}
}
for (int32_t i = reqIdx; i >= 0; i--) {
destroyCtxInRequest(pReqList[i]);
if (pReqList[i]->relation.userRefId == pReqList[i]->self || 0 == pReqList[i]->relation.userRefId) {
pUserReq = pReqList[i];
} else {
removeRequest(pReqList[i]->self);
}
}
bool hasSubRequest = pUserReq != pRequest || pRequest->relation.prevRefId != 0;
if (pUserReq) {
destroyCtxInRequest(pUserReq);
pUserReq->prevCode = code;
memset(&pUserReq->relation, 0, sizeof(pUserReq->relation));
} else {
tscError("user req is missing");
tscError("User req is missing");
removeFromMostPrevReq(pRequest);
return;
}
if (hasSubRequest)
removeFromMostPrevReq(pRequest);
else
releaseRequest(pUserReq->self);
doAsyncQuery(pUserReq, true);
}

View File

@ -1010,19 +1010,8 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {
}
taosSsleep(2); // sleep 2s for hb to send offset and rows to server
int32_t rsp;
int32_t retryCnt = 0;
tmq_list_t* lst = tmq_list_new();
while (1) {
rsp = tmq_subscribe(tmq, lst);
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
break;
} else {
retryCnt++;
taosMsleep(500);
}
}
int32_t rsp = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst);
return rsp;
}
@ -1272,10 +1261,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
}
int32_t retryCnt = 0;
while (syncAskEp(tmq) != 0) {
if (retryCnt++ > MAX_RETRY_COUNT) {
while ((code = syncAskEp(tmq)) != 0) {
if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto FAIL;
}
@ -2148,26 +2136,19 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
// if auto commit is set, commit before close consumer. Otherwise, do nothing.
if (tmq->autoCommit) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0) {
return rsp;
int32_t code = tmq_commit_sync(tmq, NULL);
if (code != 0) {
return code;
}
}
taosSsleep(2); // sleep 2s for hb to send offset and rows to server
int32_t retryCnt = 0;
tmq_list_t* lst = tmq_list_new();
while (1) {
int32_t rsp = tmq_subscribe(tmq, lst);
if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
break;
} else {
retryCnt++;
taosMsleep(500);
}
}
int32_t code = tmq_subscribe(tmq, lst);
tmq_list_destroy(lst);
if (code != 0) {
return code;
}
} else {
tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
}

View File

@ -721,6 +721,8 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
// add into buffer firstly
// to make sure when the hb from vnode arrived, the newly created tasks have been in the task map already.
taosThreadMutexLock(&execInfo.lock);
mDebug("stream stream:%s tasks register into node list", createReq.name);
saveStreamTasksInfo(&streamObj, &execInfo);

View File

@ -917,6 +917,22 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
}
}
int32_t handleStep2Async(SStreamTask* pStreamTask, void* param) {
STQ* pTq = param;
SStreamMeta* pMeta = pStreamTask->pMeta;
STaskId hId = pStreamTask->hTaskInfo.id;
SStreamTask* pTask = streamMetaAcquireTask(pStreamTask->pMeta, hId.streamId, hId.taskId);
if (pTask == NULL) {
// todo handle error
}
doStartFillhistoryStep2(pTask, pStreamTask, pTq);
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
// this function should be executed by only one thread, so we set an sentinel to protect this function
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont;
@ -1007,37 +1023,27 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// the following procedure should be executed, no matter status is stop/pause or not
tqDebug("s-task:%s scan-history(step 1) ended, elapsed time:%.2fs", id, pTask->execInfo.step1El);
if (pTask->info.fillHistory) {
SStreamTask* pStreamTask = NULL;
ASSERT(pTask->info.fillHistory == 1);
// 1. get the related stream task
pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr);
// 1. get the related stream task
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask == NULL) {
tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s",
pTask->streamTaskId.taskId, pTask->id.idStr);
tqDebug("s-task:%s fill-history task set status to be dropping and drop it", id);
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, 0);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask);
return -1;
}
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
if (code == TSDB_CODE_SUCCESS) {
doStartFillhistoryStep2(pTask, pStreamTask, pTq);
} else {
tqError("s-task:%s failed to halt s-task:%s, not launch step2", id, pStreamTask->id.idStr);
}
streamMetaReleaseTask(pMeta, pStreamTask);
} else {
ASSERT(0);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask);
return -1;
}
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq);
streamMetaReleaseTask(pMeta, pStreamTask);
atomic_store_32(&pTask->status.inScanHistorySentinel, 0);
streamMetaReleaseTask(pMeta, pTask);
return code;

View File

@ -28,8 +28,8 @@ static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDurat
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
int32_t tqScanWal(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
int64_t st = taosGetTimestampMs();
tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);

View File

@ -865,7 +865,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
if (pHistoryTask == NULL) {
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
", it may have been dropped already",
", it may have been dropped already",
pMeta->vgId, pTask->hTaskInfo.id.taskId);
streamMetaReleaseTask(pMeta, pTask);

View File

@ -1199,6 +1199,18 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
return code;
}
static void freeSSortSource(SSortSource* source) {
if (NULL == source) return;
if (source->param && !source->onlyRef) {
taosMemoryFree(source->param);
}
if (!source->onlyRef && source->src.pBlock) {
blockDataDestroy(source->src.pBlock);
source->src.pBlock = NULL;
}
taosMemoryFree(source);
}
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
int32_t code = 0;
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
@ -1231,14 +1243,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != TSDB_CODE_SUCCESS) {
if (source->param && !source->onlyRef) {
taosMemoryFree(source->param);
}
if (!source->onlyRef && source->src.pBlock) {
blockDataDestroy(source->src.pBlock);
source->src.pBlock = NULL;
}
taosMemoryFree(source);
freeSSortSource(source);
return code;
}
@ -1248,15 +1253,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
int64_t p = taosGetTimestampUs();
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
if (code != 0) {
if (source->param && !source->onlyRef) {
taosMemoryFree(source->param);
}
if (!source->onlyRef && source->src.pBlock) {
blockDataDestroy(source->src.pBlock);
source->src.pBlock = NULL;
}
taosMemoryFree(source);
freeSSortSource(source);
return code;
}
@ -1265,16 +1262,13 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
code = doAddToBuf(pHandle->pDataBlock, pHandle);
if (code != TSDB_CODE_SUCCESS) {
freeSSortSource(source);
return code;
}
}
}
if (source->param && !source->onlyRef) {
taosMemoryFree(source->param);
}
taosMemoryFree(source);
freeSSortSource(source);
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
size_t size = blockDataGetSize(pHandle->pDataBlock);

View File

@ -26,21 +26,21 @@ extern "C" {
typedef int32_t (*__state_trans_fn)(SStreamTask*);
typedef int32_t (*__state_trans_succ_fn)(SStreamTask*);
typedef struct SAttachedEventInfo {
typedef struct SFutureHandleEventInfo {
ETaskStatus status; // required status that this event can be handled
EStreamTaskEvent event; // the delayed handled event
void* pParam;
void* pFn;
} SAttachedEventInfo;
__state_trans_user_fn callBackFn;
} SFutureHandleEventInfo;
typedef struct STaskStateTrans {
bool autoInvokeEndFn;
SStreamTaskState state;
EStreamTaskEvent event;
SStreamTaskState next;
__state_trans_fn pAction;
__state_trans_succ_fn pSuccAction;
SAttachedEventInfo attachEvent;
bool autoInvokeEndFn;
SStreamTaskState state;
EStreamTaskEvent event;
SStreamTaskState next;
__state_trans_fn pAction;
__state_trans_succ_fn pSuccAction;
SFutureHandleEventInfo attachEvent;
} STaskStateTrans;
struct SStreamTaskSM {

View File

@ -410,6 +410,12 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
static int32_t haltCallback(SStreamTask* pTask, void* param) {
streamTaskOpenAllUpstreamInput(pTask);
streamTaskSendCheckpointReq(pTask);
return TSDB_CODE_SUCCESS;
}
int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamMeta* pMeta = pTask->pMeta;
@ -419,11 +425,12 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask);
} else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
} else {
// no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask != NULL) {
// halt the related stream sink task
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, haltCallback, NULL);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", pTask->id.idStr,
pStreamTask->id.idStr, tstrerror(code));
@ -432,9 +439,6 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
} else {
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
}
streamTaskOpenAllUpstreamInput(pStreamTask);
streamTaskSendCheckpointReq(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
}
}

View File

@ -669,6 +669,13 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* i
}
}
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamTaskSendCheckpointSourceRsp(pTask);
}
return 0;
}
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
SStreamTask* pTask = NULL;
@ -687,7 +694,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
}
// handle the dropping event
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_DROPPING);
streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
} else {
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
streamMetaWUnLock(pMeta);

View File

@ -156,6 +156,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
pPos->beFlushed = false;
*pKey = *pDestWinKey;
goto _end;
}
@ -167,6 +168,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
pPos->beFlushed = false;
*pKey = *pDestWinKey;
goto _end;
}
@ -380,6 +382,14 @@ static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, co
(*pWins) = pWinStates;
}
if (size > 0 && index == -1) {
SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
SSessionKey* pWin = (SSessionKey*)pPos->pKey;
if (pWinKey->win.skey == pWin->win.skey) {
index = 0;
}
}
if (index >= 0) {
pCur = createSessionStateCursor(pFileState);
pCur->buffIndex = index;
@ -387,6 +397,7 @@ static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, co
*pIndex = index;
}
}
return pCur;
}
@ -666,6 +677,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
pPos->beFlushed = false;
*key = *pDestWinKey;
goto _end;
}
@ -679,6 +691,7 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
pPos->beFlushed = false;
*key = *pDestWinKey;
goto _end;
}
@ -771,6 +784,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
pPos->beFlushed = false;
*pWinKey = *pDestWinKey;
goto _end;
}
@ -799,6 +813,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
pPos->beFlushed = false;
*pWinKey = *pDestWinKey;
goto _end;
}

View File

@ -385,7 +385,7 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
streamTaskOnHandleEventSuccess(pTask->status.pSM, event);
streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
int64_t initTs = pTask->execInfo.init;
int64_t startTs = pTask->execInfo.start;

View File

@ -647,8 +647,6 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId,
pDispatcher->taskId, nodeId, buf);
}
} else {
// do nothing
}
}
@ -869,8 +867,8 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
pDst->chkpointTransId = pSrc->chkpointTransId;
}
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_PAUSE);
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t num = atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s pause stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
@ -882,24 +880,24 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
}
stDebug("vgId:%d s-task:%s set pause flag and pause task", pMeta->vgId, pTask->id.idStr);
return TSDB_CODE_SUCCESS;
}
void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask) {
streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_PAUSE, taskPauseCallback, NULL);
}
void streamTaskResume(SStreamTask* pTask) {
SStreamTaskState prevState = *streamTaskGetStatus(pTask);
SStreamMeta* pMeta = pTask->pMeta;
if (prevState.state == TASK_STATUS__PAUSE || prevState.state == TASK_STATUS__HALT) {
streamTaskRestoreStatus(pTask);
char* pNew = streamTaskGetStatus(pTask)->name;
if (prevState.state == TASK_STATUS__PAUSE) {
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
} else {
stInfo("s-task:%s status:%s resume from %s", pTask->id.idStr, pNew, prevState.name);
}
SStreamMeta* pMeta = pTask->pMeta;
int32_t code = streamTaskRestoreStatus(pTask);
if (code == TSDB_CODE_SUCCESS) {
char* pNew = streamTaskGetStatus(pTask)->name;
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("s-task:%s status:%s resume from %s, paused task(s):%d", pTask->id.idStr, pNew, prevState.name, num);
} else {
stDebug("s-task:%s status:%s not in pause/halt status, no need to resume", pTask->id.idStr, prevState.name);
stInfo("s-task:%s status:%s no need to resume, paused task(s):%d", pTask->id.idStr, prevState.name, pMeta->numOfPausedTasks);
}
}

View File

@ -59,20 +59,23 @@ static int32_t streamTaskInitStatus(SStreamTask* pTask);
static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask);
static int32_t initStateTransferTable();
static void doInitStateTransferTable(void);
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask);
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
__state_trans_fn fn, __state_trans_succ_fn succFn,
SAttachedEventInfo* pEventInfo, bool autoInvoke);
SFutureHandleEventInfo* pEventInfo);
static int32_t dummyFn(SStreamTask* UNUSED_PARAM(p)) { return TSDB_CODE_SUCCESS; }
static int32_t attachEvent(SStreamTask* pTask, SAttachedEventInfo* pEvtInfo) {
static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEvtInfo) {
char* p = streamTaskGetStatus(pTask)->name;
stDebug("s-task:%s status:%s attach event:%s required status:%s, since not allowed to handle it", pTask->id.idStr, p,
GET_EVT_NAME(pEvtInfo->event), StreamTaskStatusList[pEvtInfo->status].name);
taosArrayPush(pTask->status.pSM->pWaitingEventList, pEvtInfo);
SArray* pList = pTask->status.pSM->pWaitingEventList;
taosArrayPush(pList, pEvtInfo);
stDebug("s-task:%s add into waiting list, total waiting events:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pList));
return 0;
}
@ -85,18 +88,6 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) {
return 0;
}
static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) {
stDebug("s-task:%s start to do checkpoint", pTask->id.idStr);
return 0;
}
int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamTaskSendCheckpointSourceRsp(pTask);
}
return 0;
}
int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stError("s-task:%s no related fill-history task, since it may have been dropped already", pTask->id.idStr);
@ -170,9 +161,11 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
pEventName, el, pSM->prev.state.name, pSM->current.name);
SAttachedEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
ASSERT(taosArrayGetSize(pSM->pWaitingEventList) == 1);
// OK, let's handle the attached event, since the task has reached the required status now
SFutureHandleEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
// OK, let's handle the waiting event, since the task has reached the required status now
if (pSM->current.state == pEvtInfo->status) {
stDebug("s-task:%s handle the event:%s in waiting list, state:%s", pTask->id.idStr,
GET_EVT_NAME(pEvtInfo->event), pSM->current.name);
@ -189,7 +182,7 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
code = pNextTrans->pAction(pSM->pTask);
if (pNextTrans->autoInvokeEndFn) {
return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event);
return streamTaskOnHandleEventSuccess(pSM, pNextTrans->event, pEvtInfo->callBackFn, pEvtInfo->pParam);
} else {
return code;
}
@ -203,30 +196,61 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
return code;
}
void streamTaskRestoreStatus(SStreamTask* pTask) {
static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent event) {
SStreamTaskSM* pSM = pTask->status.pSM;
bool removed = false;
taosThreadMutexLock(&pTask->lock);
ASSERT(pSM->pActiveTrans == NULL);
ASSERT(pSM->current.state == TASK_STATUS__PAUSE || pSM->current.state == TASK_STATUS__HALT);
int32_t num = taosArrayGetSize(pSM->pWaitingEventList);
for (int32_t i = 0; i < num; ++i) {
SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i);
if (pInfo->event == event) {
taosArrayRemove(pSM->pWaitingEventList, i);
stDebug("s-task:%s pause event in waiting list not be handled yet, remove it from waiting list, remaining:%d",
pTask->id.idStr, pInfo->event);
removed = true;
break;
}
}
SStreamTaskState state = pSM->current;
pSM->current = pSM->prev.state;
pSM->prev.state = state;
pSM->prev.evt = 0;
pSM->startTs = taosGetTimestampMs();
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
doHandleWaitingEvent(pSM, "restore-pause/halt", pTask);
} else {
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
if (!removed) {
stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, StreamTaskEventList[event].name);
}
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskRestoreStatus(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
int32_t code = 0;
taosThreadMutexLock(&pTask->lock);
if (pSM->current.state == TASK_STATUS__PAUSE && pSM->pActiveTrans == NULL) {
SStreamTaskState state = pSM->current;
pSM->current = pSM->prev.state;
pSM->prev.state = state;
pSM->prev.evt = 0;
pSM->startTs = taosGetTimestampMs();
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
stDebug("s-task:%s restore status, %s -> %s, and then handle waiting event", pTask->id.idStr,
pSM->prev.state.name, pSM->current.name);
doHandleWaitingEvent(pSM, "restore-pause/halt", pTask);
} else {
stDebug("s-task:%s restore status, %s -> %s", pTask->id.idStr, pSM->prev.state.name, pSM->current.name);
}
} else {
removeEventInWaitingList(pTask, TASK_EVENT_PAUSE);
code = -1; // failed to restore the status
}
taosThreadMutexUnlock(&pTask->lock);
return code;
}
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
@ -242,7 +266,7 @@ SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask) {
}
pSM->pTask = pTask;
pSM->pWaitingEventList = taosArrayInit(4, sizeof(SAttachedEventInfo));
pSM->pWaitingEventList = taosArrayInit(4, sizeof(SFutureHandleEventInfo));
if (pSM->pWaitingEventList == NULL) {
taosMemoryFree(pSM);
@ -273,7 +297,7 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
const char* id = pTask->id.idStr;
if (pTrans->attachEvent.event != 0) {
attachEvent(pTask, &pTrans->attachEvent);
attachWaitedEvent(pTask, &pTrans->attachEvent);
taosThreadMutexUnlock(&pTask->lock);
while (1) {
@ -303,7 +327,32 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt
// todo handle error code;
if (pTrans->autoInvokeEndFn) {
streamTaskOnHandleEventSuccess(pSM, event);
streamTaskOnHandleEventSuccess(pSM, event, NULL, NULL);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t doHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskStateTrans* pTrans, __state_trans_user_fn callbackFn, void* param) {
SStreamTask* pTask = pSM->pTask;
if (pTrans->attachEvent.event != 0) {
SFutureHandleEventInfo info = pTrans->attachEvent;
info.pParam = param;
info.callBackFn = callbackFn;
attachWaitedEvent(pTask, &info);
taosThreadMutexUnlock(&pTask->lock);
} else { // override current active trans
pSM->pActiveTrans = pTrans;
pSM->startTs = taosGetTimestampMs();
taosThreadMutexUnlock(&pTask->lock);
int32_t code = pTrans->pAction(pTask);
// todo handle error code;
if (pTrans->autoInvokeEndFn) {
streamTaskOnHandleEventSuccess(pSM, event, callbackFn, param);
}
}
@ -349,6 +398,45 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) {
return code;
}
int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamTask* pTask = pSM->pTask;
STaskStateTrans* pTrans = NULL;
while (1) {
taosThreadMutexLock(&pTask->lock);
if (pSM->pActiveTrans != NULL && pSM->pActiveTrans->autoInvokeEndFn) {
EStreamTaskEvent evt = pSM->pActiveTrans->event;
taosThreadMutexUnlock(&pTask->lock);
stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed",
pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt));
taosMsleep(100);
} else {
// no active event trans exists, handle this event directly
pTrans = streamTaskFindTransform(pSM->current.state, event);
if (pTrans == NULL) {
stDebug("s-task:%s failed to handle event:%s, status:%s", pTask->id.idStr, GET_EVT_NAME(event), pSM->current.name);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
if (pSM->pActiveTrans != NULL) {
// currently in some state transfer procedure, not auto invoke transfer, quit from this procedure
stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now",
pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name,
pSM->pActiveTrans->next.name, GET_EVT_NAME(event));
}
code = doHandleEventAsync(pSM, event, pTrans, callbackFn, param);
break;
}
}
return code;
}
static void keepPrevInfo(SStreamTaskSM* pSM) {
STaskStateTrans* pTrans = pSM->pActiveTrans;
@ -356,8 +444,9 @@ static void keepPrevInfo(SStreamTaskSM* pSM) {
pSM->prev.evt = pTrans->event;
}
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event) {
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) {
SStreamTask* pTask = pSM->pTask;
const char* id = pTask->id.idStr;
// do update the task status
taosThreadMutexLock(&pTask->lock);
@ -369,16 +458,16 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
s == TASK_STATUS__UNINIT || s == TASK_STATUS__READY);
// the pSM->prev.evt may be 0, so print string is not appropriate.
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", pTask->id.idStr,
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", id, GET_EVT_NAME(event),
pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
if (pTrans->event != event) {
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", pTask->id.idStr,
GET_EVT_NAME(event), pSM->current.name, GET_EVT_NAME(pTrans->event));
stWarn("s-task:%s handle event:%s failed, current status:%s, active trans evt:%s", id, GET_EVT_NAME(event),
pSM->current.name, GET_EVT_NAME(pTrans->event));
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_INVALID_STATETRANS;
}
@ -388,16 +477,31 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
pSM->current = pTrans->next;
pSM->pActiveTrans = NULL;
// todo remove it
// on success callback, add into lock if necessary, or maybe we should add an option for this?
pTrans->pSuccAction(pTask);
taosThreadMutexUnlock(&pTask->lock);
// todo: add parameter to control lock
// after handling the callback function assigned by invoker, go on handling the waiting tasks
if (callbackFn != NULL) {
stDebug("s-task:%s start to handle user-specified callback fn for event:%s", id, GET_EVT_NAME(pTrans->event));
callbackFn(pSM->pTask, param);
stDebug("s-task:%s handle user-specified callback fn for event:%s completed", id, GET_EVT_NAME(pTrans->event));
}
taosThreadMutexLock(&pTask->lock);
// tasks in waiting list
if (taosArrayGetSize(pSM->pWaitingEventList) > 0) {
doHandleWaitingEvent(pSM, GET_EVT_NAME(pTrans->event), pTask);
} else {
taosThreadMutexUnlock(&pTask->lock);
int64_t el = (taosGetTimestampMs() - pSM->startTs);
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", id,
GET_EVT_NAME(pTrans->event), el, pSM->prev.state.name, pSM->current.name);
}
@ -453,7 +557,7 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
}
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,
__state_trans_succ_fn succFn, SAttachedEventInfo* pEventInfo, bool autoInvoke) {
__state_trans_succ_fn succFn, SFutureHandleEventInfo* pEventInfo) {
STaskStateTrans trans = {0};
trans.state = StreamTaskStatusList[current];
trans.next = StreamTaskStatusList[next];
@ -468,7 +572,7 @@ STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStr
trans.pAction = (fn != NULL) ? fn : dummyFn;
trans.pSuccAction = (succFn != NULL) ? succFn : dummyFn;
trans.autoInvokeEndFn = autoInvoke;
trans.autoInvokeEndFn = (fn == NULL);
return trans;
}
@ -482,93 +586,93 @@ void doInitStateTransferTable(void) {
streamTaskSMTrans = taosArrayInit(8, sizeof(STaskStateTrans));
// initialization event handle
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, false, false);
STaskStateTrans trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__READY, TASK_EVENT_INIT, streamTaskInitStatus, streamTaskOnNormalTaskReady, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, false, false);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__SCAN_HISTORY, TASK_EVENT_INIT_SCANHIST, streamTaskInitStatus, streamTaskOnScanhistoryTaskReady, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
// scan-history related event
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__READY, TASK_EVENT_SCANHIST_DONE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
// halt stream task, from other task status
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
SAttachedEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
SFutureHandleEventInfo info = {.status = TASK_STATUS__READY, .event = TASK_EVENT_HALT};
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info, true);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, &info);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL, true);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__HALT, TASK_EVENT_HALT, NULL, streamTaskKeepCurrentVerInWal, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
// checkpoint related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
// pause & resume related event handle
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
info = (SAttachedEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
info = (SFutureHandleEventInfo){.status = TASK_STATUS__READY, .event = TASK_EVENT_PAUSE};
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info, true);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, &info);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__PAUSE, TASK_EVENT_PAUSE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_PAUSE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_PAUSE, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
// resume is completed by restore status of state-machine
// stop related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
// dropping related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL, NULL, true);
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL);
taosArrayPush(streamTaskSMTrans, &trans);
}
//clang-format on

View File

@ -6,7 +6,7 @@ sql connect
print ======================== dnode1 start
$db = testdb
sql drop database if exists $db
sql create database $db cachemodel 'both' minrows 10 stt_trigger 1
sql create database $db cachemodel 'none' minrows 10 stt_trigger 1
sql use $db
sql create stable st2 (ts timestamp, f1 int, f2 double, f3 binary(10), f4 timestamp) tags (id int)
@ -69,13 +69,13 @@ sql drop table tbf;
sql alter table st2 add column c1 int;
sql alter table st2 drop column c1;
run tsim/parser/last_cache_query.sim
run tsim/parser/last_both_query.sim
sql flush database $db
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
run tsim/parser/last_cache_query.sim
run tsim/parser/last_both_query.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
@ -145,6 +145,6 @@ sql alter database $db cachemodel 'both'
sql alter database $db cachesize 2
sleep 11000
run tsim/parser/last_cache_query.sim
run tsim/parser/last_both_query.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,496 @@
sql connect
$db = testdb
sql use $db
print "test tb1"
sql select last(ts) from tb1
if $rows != 1 then
return -1
endi
if $data00 != @21-05-12 10:10:12.000@ then
print $data00
return -1
endi
sql select last(f1) from tb1
if $rows != 1 then
return -1
endi
if $data00 != 6 then
print $data00
return -1
endi
sql select last(*) from tb1
if $rows != 1 then
return -1
endi
if $data00 != @21-05-12 10:10:12.000@ then
print $data00
return -1
endi
if $data01 != 6 then
return -1
endi
if $data02 != 5.000000000 then
print $data02
return -1
endi
if $data03 != 3 then
print expect 3, actual: $data03
return -1
endi
if $data04 != @70-01-01 07:59:57.000@ then
return -1
endi
sql select last(tb1.*,ts,f4) from tb1
if $rows != 1 then
return -1
endi
if $data00 != @21-05-12 10:10:12.000@ then
print $data00
return -1
endi
if $data01 != 6 then
return -1
endi
if $data02 != 5.000000000 then
print $data02
return -1
endi
if $data03 != 3 then
return -1
endi
if $data04 != @70-01-01 07:59:57.000@ then
return -1
endi
if $data05 != @21-05-12 10:10:12.000@ then
print $data00
return -1
endi
if $data06 != @70-01-01 07:59:57.000@ then
return -1
endi
print "test tb2"
sql select last(ts) from tb2
if $rows != 1 then
return -1
endi
if $data00 != @21-05-11 10:11:15.000@ then
print $data00
return -1
endi
sql select last(f1) from tb2
if $rows != 1 then
return -1
endi
if $data00 != -6 then
print $data00
return -1
endi
sql select last(*) from tb2
if $rows != 1 then
return -1
endi
if $data00 != @21-05-11 10:11:15.000@ then
print $data00
return -1
endi
if $data01 != -6 then
return -1
endi
if $data02 != -7.000000000 then
print $data02
return -1
endi
if $data03 != -8 then
return -1
endi
if $data04 != @70-01-01 07:59:56.999@ then
if $data04 != @70-01-01 07:59:57.-01@ then
return -1
endi
endi
sql select last(tb2.*,ts,f4) from tb2
if $rows != 1 then
return -1
endi
if $data00 != @21-05-11 10:11:15.000@ then
print $data00
return -1
endi
if $data01 != -6 then
return -1
endi
if $data02 != -7.000000000 then
print $data02
return -1
endi
if $data03 != -8 then
return -1
endi
if $data04 != @70-01-01 07:59:56.999@ then
if $data04 != @70-01-01 07:59:57.-01@ then
return -1
endi
endi
if $data05 != @21-05-11 10:11:15.000@ then
print $data00
return -1
endi
if $data06 != @70-01-01 07:59:56.999@ then
if $data04 != @70-01-01 07:59:57.-01@ then
return -1
endi
endi
print "test tbd"
sql select last(*) from tbd
if $rows != 1 then
return -1
endi
if $data00 != @21-05-11 10:12:29.000@ then
print $data00
return -1
endi
if $data01 != NULL then
return -1
endi
if $data02 != NULL then
print $data02
return -1
endi
if $data03 != NULL then
return -1
endi
if $data04 != NULL then
return -1
endi
print "test tbe"
sql select last(*) from tbe
if $rows != 0 then
return -1
endi
print "test stable"
sql select last(ts) from st2
if $rows != 1 then
return -1
endi
if $data00 != @21-05-12 10:10:12.000@ then
print $data00
return -1
endi
sql select last(f1) from st2
if $rows != 1 then
return -1
endi
if $data00 != 6 then
print $data00
return -1
endi
sql select last(*) from st2
if $rows != 1 then
return -1
endi
if $data00 != @21-05-12 10:10:12.000@ then
print $data00
return -1
endi
if $data01 != 6 then
return -1
endi
if $data02 != 37.000000000 then
print expect 37.000000000 actual: $data02
return -1
endi
if $data03 != 27 then
return -1
endi
if $data04 != @70-01-01 07:59:57.000@ then
return -1
endi
sql select last(st2.*,ts,f4) from st2
if $rows != 1 then
return -1
endi
if $data00 != @21-05-12 10:10:12.000@ then
print $data00
return -1
endi
if $data01 != 6 then
return -1
endi
if $data02 != 37.000000000 then
print expect 37.000000000, acutal: $data02
return -1
endi
if $data03 != 27 then
return -1
endi
if $data04 != @70-01-01 07:59:57.000@ then
return -1
endi
if $data05 != @21-05-12 10:10:12.000@ then
print $data00
return -1
endi
if $data06 != @70-01-01 07:59:57.000@ then
return -1
endi
sql select last(*), id from st2 group by id order by id
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
print ===> $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19
print ===> $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29
print ===> $data30 $data31 $data32 $data33 $data34 $data35 $data36 $data37 $data38 $data39
print ===> $data40 $data41 $data42 $data43 $data44 $data45 $data46 $data47 $data48 $data49
if $rows != 5 then
return -1
endi
if $data00 != @21-05-12 10:10:12.000@ then
return -1
endi
if $data01 != 6 then
return -1
endi
if $data02 != 5.000000000 then
print $data02
return -1
endi
if $data03 != 21 then
return -1
endi
if $data04 != @70-01-01 07:59:57.000@ then
return -1
endi
if $data05 != 1 then
return -1
endi
if $data10 != @21-05-11 10:12:23.000@ then
return -1
endi
if $data11 != 22 then
return -1
endi
if $data12 != 23.000000000 then
print $data02
return -1
endi
if $data13 != -8 then
return -1
endi
if $data14 != @70-01-01 07:59:58.-04@ then
return -1
endi
if $data15 != 2 then
return -1
endi
if $data20 != @21-05-10 10:12:24.000@ then
return -1
endi
if $data21 != 24 then
return -1
endi
if $data22 != 11.000000000 then
print expect 11.000000000 actual: $data22
return -1
endi
if $data23 != 25 then
return -1
endi
if $data24 != @70-01-01 07:59:57.-04@ then =
return -1
endi
if $data25 != 3 then
return -1
endi
if $data30 != @21-05-11 10:12:25.000@ then
return -1
endi
if $data31 != 26 then
return -1
endi
if $data32 != 17.000000000 then
print $data02
return -1
endi
if $data33 != 27 then
return -1
endi
if $data34 != @70-01-01 07:59:56.-04@ then
return -1
endi
if $data35 != 4 then
return -1
endi
if $data40 != @21-05-11 10:12:29.000@ then
return -1
endi
if $data41 != 36 then
return -1
endi
if $data42 != 37.000000000 then
print $data02
return -1
endi
if $data43 != 35 then
return -1
endi
if $data44 != @70-01-01 07:59:56.-05@ then
return -1
endi
if $data45 != 5 then
return -1
endi
sql select last_row(*), id from st2 group by id order by id
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
print ===> $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19
print ===> $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29
print ===> $data30 $data31 $data32 $data33 $data34 $data35 $data36 $data37 $data38 $data39
print ===> $data40 $data41 $data42 $data43 $data44 $data45 $data46 $data47 $data48 $data49
if $rows != 5 then
return -1
endi
if $data00 != @21-05-12 10:10:12.000@ then
return -1
endi
if $data01 != 6 then
return -1
endi
if $data02 != NULL then
print $data02
return -1
endi
if $data03 != NULL then
return -1
endi
if $data04 != @70-01-01 07:59:57.000@ then
return -1
endi
if $data05 != 1 then
return -1
endi
if $data10 != @21-05-11 10:12:23.000@ then
return -1
endi
if $data11 != 22 then
return -1
endi
if $data12 != 23.000000000 then
print $data02
return -1
endi
if $data13 != NULL then
return -1
endi
if $data14 != @70-01-01 07:59:58.-04@ then
return -1
endi
if $data15 != 2 then
return -1
endi
if $data20 != @21-05-10 10:12:24.000@ then
return -1
endi
if $data21 != 24 then
return -1
endi
if $data22 != NULL then
print expect NULL actual: $data22
return -1
endi
if $data23 != 25 then
return -1
endi
if $data24 != @70-01-01 07:59:57.-04@ then =
return -1
endi
if $data25 != 3 then
return -1
endi
if $data30 != @21-05-11 10:12:25.000@ then
return -1
endi
if $data31 != 26 then
return -1
endi
if $data32 != NULL then
print $data02
return -1
endi
if $data33 != 27 then
return -1
endi
if $data34 != @70-01-01 07:59:56.-04@ then
return -1
endi
if $data35 != 4 then
return -1
endi
if $data40 != @21-05-11 10:12:29.000@ then
return -1
endi
#if $data41 != NULL then
# return -1
#endi
#if $data42 != NULL then
# print $data02
# return -1
#endi
if $data43 != NULL then
return -1
endi
#if $data44 != NULL then
# return -1
#endi
if $data45 != 5 then
return -1
endi
print "test tbn"
sql create table if not exists tbn (ts timestamp, f1 int, f2 double, f3 binary(10), f4 timestamp)
sql insert into tbn values ("2021-05-09 10:10:10", 1, 2.0, '3', -1000)
sql insert into tbn values ("2021-05-10 10:10:11", 4, 5.0, NULL, -2000)
sql insert into tbn values ("2021-05-12 10:10:12", 6,NULL, NULL, -3000)
sql insert into tbn values ("2021-05-13 10:10:12", NULL,NULL, NULL,NULL)
sql select last(*) from tbn;
if $rows != 1 then
return -1
endi
if $data00 != @21-05-13 10:10:12.000@ then
print $data00
return -1
endi
if $data01 != 6 then
return -1
endi
if $data02 != 5.000000000 then
print $data02
return -1
endi
if $data03 != 3 then
return -1
endi
if $data04 != @70-01-01 07:59:57.000@ then
return -1
endi
sql alter table tbn add column c1 int;
sql alter table tbn drop column c1;

View File

@ -357,112 +357,6 @@ if $data45 != 5 then
return -1
endi
sql select last_row(*), id from st2 group by id order by id
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
print ===> $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19
print ===> $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29
print ===> $data30 $data31 $data32 $data33 $data34 $data35 $data36 $data37 $data38 $data39
print ===> $data40 $data41 $data42 $data43 $data44 $data45 $data46 $data47 $data48 $data49
if $rows != 5 then
return -1
endi
if $data00 != @21-05-12 10:10:12.000@ then
return -1
endi
if $data01 != 6 then
return -1
endi
if $data02 != NULL then
print $data02
return -1
endi
if $data03 != NULL then
return -1
endi
if $data04 != @70-01-01 07:59:57.000@ then
return -1
endi
if $data05 != 1 then
return -1
endi
if $data10 != @21-05-11 10:12:23.000@ then
return -1
endi
if $data11 != 22 then
return -1
endi
if $data12 != 23.000000000 then
print $data02
return -1
endi
if $data13 != NULL then
return -1
endi
if $data14 != @70-01-01 07:59:58.-04@ then
return -1
endi
if $data15 != 2 then
return -1
endi
if $data20 != @21-05-10 10:12:24.000@ then
return -1
endi
if $data21 != 24 then
return -1
endi
if $data22 != NULL then
print expect NULL actual: $data22
return -1
endi
if $data23 != 25 then
return -1
endi
if $data24 != @70-01-01 07:59:57.-04@ then =
return -1
endi
if $data25 != 3 then
return -1
endi
if $data30 != @21-05-11 10:12:25.000@ then
return -1
endi
if $data31 != 26 then
return -1
endi
if $data32 != NULL then
print $data02
return -1
endi
if $data33 != 27 then
return -1
endi
if $data34 != @70-01-01 07:59:56.-04@ then
return -1
endi
if $data35 != 4 then
return -1
endi
if $data40 != @21-05-11 10:12:29.000@ then
return -1
endi
if $data41 != 36 then
return -1
endi
if $data42 != 37.000000000 then
print $data02
return -1
endi
if $data43 != NULL then
return -1
endi
if $data44 != @70-01-01 07:59:56.-05@ then
return -1
endi
if $data45 != 5 then
return -1
endi
print "test tbn"
sql create table if not exists tbn (ts timestamp, f1 int, f2 double, f3 binary(10), f4 timestamp)
sql insert into tbn values ("2021-05-09 10:10:10", 1, 2.0, '3', -1000)

View File

@ -22,7 +22,7 @@
"vgroups": 2,
"replica": 1,
"precision": "ms",
"stt_trigger": 8,
"stt_trigger": 1,
"minRows": 100,
"maxRows": 4096
},

View File

@ -1,11 +1,13 @@
from urllib.parse import uses_relative
import taos
import taosws
import sys
import os
import time
import platform
import inspect
from taos.tmq import Consumer
from taos.tmq import *
from pathlib import Path
from util.log import *
@ -17,7 +19,7 @@ from util.dnodes import TDDnode
from util.cluster import *
import subprocess
BASEVERSION = "3.0.2.3"
BASEVERSION = "3.2.0.0"
class TDTestCase:
def caseDescription(self):
f'''
@ -30,7 +32,7 @@ class TDTestCase:
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
self.deletedDataSql= '''drop database if exists deldata;create database deldata duration 300 stt_trigger 4; ;use deldata;
self.deletedDataSql= '''drop database if exists deldata;create database deldata duration 300 stt_trigger 1; ;use deldata;
create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int);
create table deldata.ct1 using deldata.stb1 tags ( 1 );
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
@ -104,8 +106,19 @@ class TDTestCase:
print(f"{packageName} has been exists")
os.system(f" cd {packagePath} && tar xvf {packageName} && cd {packageTPath} && ./install.sh -e no " )
tdDnodes.stop(1)
print(f"start taosd: rm -rf {dataPath}/* && nohup taosd -c {cPath} & ")
os.system(f"rm -rf {dataPath}/* && nohup taosd -c {cPath} & " )
print(f"start taosd: rm -rf {dataPath}/* && nohup /usr/bin/taosd -c {cPath} & ")
os.system(f"rm -rf {dataPath}/* && nohup /usr/bin/taosd -c {cPath} & " )
os.system(f"killall taosadapter" )
os.system(f"cp /etc/taos/taosadapter.toml {cPath}/taosadapter.toml " )
taosadapter_cfg = cPath + "/taosadapter.toml"
taosadapter_log_path = cPath + "/../log/"
print(f"taosadapter_cfg:{taosadapter_cfg},taosadapter_log_path:{taosadapter_log_path} ")
self.alter_string_in_file(taosadapter_cfg,"#path = \"/var/log/taos\"",f"path = \"{taosadapter_log_path}\"")
self.alter_string_in_file(taosadapter_cfg,"taosConfigDir = \"\"",f"taosConfigDir = \"{cPath}\"")
print("/usr/bin/taosadapter --version")
os.system(f" /usr/bin/taosadapter --version" )
print(f" LD_LIBRARY_PATH=/usr/lib -c {taosadapter_cfg} 2>&1 & ")
os.system(f" LD_LIBRARY_PATH=/usr/lib /usr/bin/taosadapter -c {taosadapter_cfg} 2>&1 & " )
sleep(5)
@ -116,7 +129,24 @@ class TDTestCase:
def is_list_same_as_ordered_list(self,unordered_list, ordered_list):
sorted_list = sorted(unordered_list)
return sorted_list == ordered_list
def alter_string_in_file(self,file,old_str,new_str):
"""
replace str in file
:param file
:param old_str
:param new_str
:return:
"""
file_data = ""
with open(file, "r", encoding="utf-8") as f:
for line in f:
if old_str in line:
line = line.replace(old_str,new_str)
file_data += line
with open(file,"w",encoding="utf-8") as f:
f.write(file_data)
def run(self):
scriptsPath = os.path.dirname(os.path.realpath(__file__))
distro_id = distro.id()
@ -131,7 +161,7 @@ class TDTestCase:
dbname = "test"
stb = f"{dbname}.meters"
self.installTaosd(bPath,cPath)
os.system("echo 'debugFlag 143' > /etc/taos/taos.cfg ")
# os.system(f"echo 'debugFlag 143' >> {cPath}/taos.cfg ")
tableNumbers=100
recordNumbers1=100
recordNumbers2=1000
@ -163,11 +193,46 @@ class TDTestCase:
# os.system(f"LD_LIBRARY_PATH=/usr/lib taos -s 'use test;create stream current_stream into current_stream_output_stb as select _wstart as `start`, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);' ")
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;create stream power_stream into power_stream_output_stb as select ts, concat_ws(\\".\\", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;" ')
# os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show streams;" ')
os.system(f"sed -i 's/\/etc\/taos/{cPath}/' 0-others/tmqBasic.json ")
self.alter_string_in_file("0-others/tmqBasic.json", "/etc/taos/", cPath)
# os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/tmqBasic.json -y ")
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "create topic if not exists tmq_test_topic as select current,voltage,phase from test.meters where voltage <= 106 and current <= 5;" ')
os.system('LD_LIBRARY_PATH=/usr/lib taos -s "use test;show topics;" ')
os.system(f" /usr/bin/taosadapter --version " )
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
}
consumer = taosws.Consumer(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
try:
consumer.subscribe(["tmq_test_topic"])
except TmqError:
tdLog.exit(f"subscribe error")
while True:
message = consumer.poll(timeout=1.0)
if message:
print("message")
id = message.vgroup()
topic = message.topic()
database = message.database()
for block in message:
nrows = block.nrows()
ncols = block.ncols()
for row in block:
print(row)
values = block.fetchall()
print(nrows, ncols)
consumer.commit(message)
else:
print("break")
break
consumer.close()
tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
@ -184,7 +249,8 @@ class TDTestCase:
os.system("pkill taosd") # make sure all the data are saved in disk.
self.checkProcessPid("taosd")
os.system("pkill taosadapter") # make sure all the data are saved in disk.
self.checkProcessPid("taosadapter")
tdLog.printNoPrefix("==========step2:update new version ")
self.buildTaosd(bPath)
@ -193,6 +259,7 @@ class TDTestCase:
tdsql=tdCom.newTdSql()
print(tdsql)
cmd = f" LD_LIBRARY_PATH=/usr/lib taos -h localhost ;"
print(os.system(cmd))
if os.system(cmd) == 0:
raise Exception("failed to execute system command. cmd: %s" % cmd)