diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 1be7552a89..84b2c297d0 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -241,8 +241,12 @@ static void doCreateStream(void *param, TAOS_RES *result, int32_t code) { SCqObj* pObj = (SCqObj*)param; SCqContext* pContext = pObj->pContext; SSqlObj* pSql = (SSqlObj*)result; - pContext->dbConn = pSql->pTscObj; + if (atomic_val_compare_exchange_ptr(&(pContext->dbConn), NULL, pSql->pTscObj) != NULL) { + taos_close(pSql->pTscObj); + } + pthread_mutex_lock(&pContext->mutex); cqCreateStream(pContext, pObj); + pthread_mutex_unlock(&pContext->mutex); } static void cqProcessCreateTimer(void *param, void *tmrId) { @@ -253,7 +257,9 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { cDebug("vgId:%d, try connect to TDengine", pContext->vgId); taos_connect_a(NULL, pContext->user, pContext->pass, pContext->db, 0, doCreateStream, param, NULL); } else { + pthread_mutex_lock(&pContext->mutex); cqCreateStream(pContext, pObj); + pthread_mutex_unlock(&pContext->mutex); } } @@ -267,12 +273,14 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { } pObj->tmrId = 0; - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL); - if (pObj->pStream) { - pContext->num++; - cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); - } else { - cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); + if (pObj->pStream == NULL) { + pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, 0, pObj, NULL); + if (pObj->pStream) { + pContext->num++; + cInfo("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr); + } else { + cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr); + } } } diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index e204989b02..d9d58fccf3 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -925,6 +925,8 @@ static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst); } + ASSERT((blkIdx == pIdx->numOfBlocks -1) || (!pCompBlock->last)); + tsdbDebug("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx); @@ -1050,6 +1052,8 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; + ASSERT((blkIdx == pIdx->numOfBlocks-1) || (!pCompBlock->last)); + tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx); @@ -1630,11 +1634,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, pCfg->update); if (pDataCols->numOfRows == 0) break; - if (tblkIdx == pIdx->numOfBlocks - 1) { - if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; - } else { - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; - } + if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; if (round == 0) { if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c index 1fa6a3d096..64fea2843b 100644 --- a/src/util/src/tfile.c +++ b/src/util/src/tfile.c @@ -93,17 +93,23 @@ int64_t tfRead(int64_t tfd, void *buf, int64_t count) { return ret; } -int64_t tfFsync(int64_t tfd) { +int32_t tfFsync(int64_t tfd) { void *p = taosAcquireRef(tsFileRsetId, tfd); if (p == NULL) return -1; int32_t fd = (int32_t)(uintptr_t)p; - return fsync(fd); + int32_t code = fsync(fd); + + taosReleaseRef(tsFileRsetId, tfd); + return code; } bool tfValid(int64_t tfd) { void *p = taosAcquireRef(tsFileRsetId, tfd); - return p != NULL; + if (p == NULL) return false; + + taosReleaseRef(tsFileRsetId, tfd); + return true; } int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence) { @@ -111,7 +117,10 @@ int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence) { if (p == NULL) return -1; int32_t fd = (int32_t)(uintptr_t)p; - return taosLSeek(fd, offset, whence); + int64_t ret = taosLSeek(fd, offset, whence); + + taosReleaseRef(tsFileRsetId, tfd); + return ret; } int32_t tfFtruncate(int64_t tfd, int64_t length) { @@ -119,5 +128,8 @@ int32_t tfFtruncate(int64_t tfd, int64_t length) { if (p == NULL) return -1; int32_t fd = (int32_t)(uintptr_t)p; - return taosFtruncate(fd, length); + int32_t code = taosFtruncate(fd, length); + + taosReleaseRef(tsFileRsetId, tfd); + return code; } diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index 0222a6d80a..baf396f030 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -560,6 +560,37 @@ void taosTmrCleanUp(void* handle) { tmrDebug("%s timer controller is cleaned up.", ctrl->label); ctrl->label[0] = 0; + // cancel all timers of this controller + for (size_t i = 0; i < timerMap.size; i++) { + timer_list_t* list = timerMap.slots + i; + lockTimerList(list); + + tmr_obj_t* t = list->timers; + tmr_obj_t* prev = NULL; + while (t != NULL) { + tmr_obj_t* next = t->mnext; + if (t->ctrl != ctrl) { + prev = t; + t = next; + continue; + } + + uint8_t state = atomic_val_compare_exchange_8(&t->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED); + if (state == TIMER_STATE_WAITING) { + removeFromWheel(t); + } + timerDecRef(t); + if (prev == NULL) { + list->timers = next; + } else { + prev->mnext = next; + } + t = next; + } + + unlockTimerList(list); + } + pthread_mutex_lock(&tmrCtrlMutex); ctrl->next = unusedTmrCtrl; numOfTmrCtrl--;