Merge pull request #25383 from taosdata/fix/dropStreamData
Fix/drop stream data
This commit is contained in:
commit
05d853bd9e
|
@ -552,7 +552,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
||||||
|
|
||||||
typedef struct SMStreamCheckpointReadyRspMsg {
|
typedef struct SMStreamCheckpointReadyRspMsg {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
}SMStreamCheckpointReadyRspMsg;
|
} SMStreamCheckpointReadyRspMsg;
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
@ -675,6 +675,17 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tqStreamRmTaskBackend(SStreamMeta* pMeta, STaskId* id) {
|
||||||
|
char taskKey[128] = {0};
|
||||||
|
sprintf(taskKey, "0x%" PRIx64 "-0x%x", id->streamId, (int32_t)id->taskId);
|
||||||
|
|
||||||
|
char* path = taosMemoryCalloc(1, strlen(pMeta->path) + 128);
|
||||||
|
sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, taskKey);
|
||||||
|
taosRemoveDir(path);
|
||||||
|
taosMemoryFree(path);
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
||||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
@ -720,6 +731,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
|
tqStreamRmTaskBackend(pMeta, &id);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -823,7 +836,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
} else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
|
} else if (type == STREAM_EXEC_T_STOP_ALL_TASKS) {
|
||||||
streamMetaStopAllTasks(pMeta);
|
streamMetaStopAllTasks(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
|
|
||||||
if (pTask != NULL) {
|
if (pTask != NULL) {
|
||||||
|
@ -849,8 +862,8 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
|
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
if (streamTaskReadyToRun(pTask, &p)) {
|
if (streamTaskReadyToRun(pTask, &p)) {
|
||||||
tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
|
tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
|
||||||
p, pTask->chkInfo.nextProcessVer);
|
pTask->id.idStr, p, pTask->chkInfo.nextProcessVer);
|
||||||
streamExecTask(pTask);
|
streamExecTask(pTask);
|
||||||
} else {
|
} else {
|
||||||
int8_t status = streamTaskSetSchedStatusInactive(pTask);
|
int8_t status = streamTaskSetSchedStatusInactive(pTask);
|
||||||
|
@ -938,7 +951,7 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){
|
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
|
||||||
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
|
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
|
@ -975,9 +988,10 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated, bool fromVnode) {
|
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated,
|
||||||
SStreamMeta *pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
|
bool fromVnode) {
|
||||||
int32_t vgId = pMeta->vgId;
|
SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
|
||||||
|
int32_t vgId = pMeta->vgId;
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -999,7 +1013,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
|
||||||
// discard all the data when the stream task is suspended.
|
// discard all the data when the stream task is suspended.
|
||||||
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
|
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
|
||||||
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
|
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
|
||||||
", schedStatus:%d",
|
", schedStatus:%d",
|
||||||
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
|
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
|
||||||
} else { // from the previous paused version and go on
|
} else { // from the previous paused version and go on
|
||||||
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
|
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
|
||||||
|
@ -1025,11 +1039,11 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode){
|
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
|
||||||
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
|
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
|
||||||
SStreamMeta *pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
|
SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
|
int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1043,9 +1057,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
|
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); }
|
||||||
return taosArrayGetSize(pMeta->pTaskList);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
|
@ -171,6 +171,8 @@ if $data13 != 789 then
|
||||||
goto loop2
|
goto loop2
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
sql drop stream s1
|
||||||
|
|
||||||
_OVER:
|
_OVER:
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
print =============== check
|
print =============== check
|
||||||
|
|
Loading…
Reference in New Issue