Merge pull request #18192 from taosdata/feature/stream
fix(stream): timer refer stream task
This commit is contained in:
commit
b62a85dfdc
|
@ -51,6 +51,7 @@ void streamSchedByTimer(void* param, void* tmrId) {
|
|||
SStreamTask* pTask = (void*)param;
|
||||
|
||||
if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
|
||||
streamMetaReleaseTask(NULL, pTask);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -80,6 +81,8 @@ void streamSchedByTimer(void* param, void* tmrId) {
|
|||
|
||||
int32_t streamSetupTrigger(SStreamTask* pTask) {
|
||||
if (pTask->triggerParam != 0) {
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
|
||||
ASSERT(ref == 2);
|
||||
pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
|
||||
pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE;
|
||||
}
|
||||
|
|
|
@ -80,7 +80,12 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
|||
pIter = taosHashIterate(pMeta->pTasks, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (pTask->timer) {
|
||||
taosTmrStop(pTask->timer);
|
||||
pTask->timer = NULL;
|
||||
}
|
||||
tFreeSStreamTask(pTask);
|
||||
/*streamMetaReleaseTask(pMeta, pTask);*/
|
||||
}
|
||||
taosHashCleanup(pMeta->pTasks);
|
||||
taosMemoryFree(pMeta->path);
|
||||
|
@ -202,6 +207,10 @@ void streamMetaRemoveTask1(SStreamMeta* pMeta, int32_t taskId) {
|
|||
if (ppTask) {
|
||||
SStreamTask* pTask = *ppTask;
|
||||
taosHashRemove(pMeta->pTasks, &taskId, sizeof(int32_t));
|
||||
/*if (pTask->timer) {
|
||||
* taosTmrStop(pTask->timer);*/
|
||||
/*pTask->timer = NULL;*/
|
||||
/*}*/
|
||||
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
|
||||
|
||||
taosWLockLatch(&pMeta->lock);
|
||||
|
|
|
@ -102,6 +102,7 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
|
|||
|
||||
int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||
taosThreadMutexLock(&pWal->mutex);
|
||||
wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver);
|
||||
int64_t code;
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) {
|
||||
|
@ -123,8 +124,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
int fileSetSize = taosArrayGetSize(pWal->fileInfoSet);
|
||||
for (int i = pWal->writeCur + 1; i < fileSetSize; i++) {
|
||||
walBuildLogName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
|
||||
wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, fnameStr);
|
||||
taosRemoveFile(fnameStr);
|
||||
walBuildIdxName(pWal, ((SWalFileInfo *)taosArrayGet(pWal->fileInfoSet, i))->firstVer, fnameStr);
|
||||
wDebug("vgId:%d, wal remove file %s for rollback", pWal->cfg.vgId, fnameStr);
|
||||
taosRemoveFile(fnameStr);
|
||||
}
|
||||
// pop from fileInfoSet
|
||||
|
@ -157,6 +160,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
|
||||
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
||||
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||
wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr);
|
||||
if (pLogFile == NULL) {
|
||||
// TODO
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -324,9 +328,9 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
pInfo++;
|
||||
}
|
||||
if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) {
|
||||
wDebug("vgId:%d, begin remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer);
|
||||
wDebug("vgId:%d, wal end remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer);
|
||||
} else {
|
||||
wDebug("vgId:%d, no remove", pWal->cfg.vgId);
|
||||
wDebug("vgId:%d, wal no remove", pWal->cfg.vgId);
|
||||
}
|
||||
// iterate files, until the searched result
|
||||
for (SWalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
|
||||
|
@ -343,12 +347,12 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
for (int i = 0; i < deleteCnt; i++) {
|
||||
pInfo = taosArrayGet(pWal->fileInfoSet, i);
|
||||
walBuildLogName(pWal, pInfo->firstVer, fnameStr);
|
||||
wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
if (taosRemoveFile(fnameStr) < 0) {
|
||||
goto UPDATE_META;
|
||||
}
|
||||
walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
|
||||
wDebug("vgId:%d, remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
wDebug("vgId:%d, wal remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
if (taosRemoveFile(fnameStr) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue