Merge pull request #24503 from taosdata/fix/3_liaohj
fix(tsdb): fix race condition.
This commit is contained in:
commit
4a2cd87de7
|
@ -889,6 +889,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
|
|||
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
|
||||
tmr_h streamTimerGetInstance();
|
||||
|
||||
// checkpoint
|
||||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
||||
|
|
|
@ -107,7 +107,6 @@ struct STQ {
|
|||
TTB* pExecStore;
|
||||
TTB* pCheckStore;
|
||||
SStreamMeta* pStreamMeta;
|
||||
void* tqTimer;
|
||||
};
|
||||
|
||||
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
|
||||
|
|
|
@ -26,18 +26,6 @@ static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_
|
|||
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
|
||||
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
|
||||
|
||||
static int32_t tqTimerInit(STQ* pTq) {
|
||||
pTq->tqTimer = taosTmrInit(100, 100, 1000, "TQ");
|
||||
if (pTq->tqTimer == NULL) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void tqTimerCleanUp(STQ* pTq) {
|
||||
taosTmrCleanUp(pTq->tqTimer);
|
||||
}
|
||||
|
||||
void tqDestroyTqHandle(void* data) {
|
||||
STqHandle* pData = (STqHandle*)data;
|
||||
qDestroyTask(pData->execHandle.task);
|
||||
|
@ -118,7 +106,6 @@ int32_t tqInitialize(STQ* pTq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
tqTimerInit(pTq);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -149,7 +136,6 @@ void tqClose(STQ* pTq) {
|
|||
taosMemoryFree(pTq->path);
|
||||
tqMetaClose(pTq);
|
||||
streamMetaClose(pTq->pStreamMeta);
|
||||
tqTimerCleanUp(pTq);
|
||||
|
||||
qDebug("end to close tq");
|
||||
taosMemoryFree(pTq);
|
||||
|
|
|
@ -95,10 +95,14 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
|
|||
|
||||
pParam->pTq = pTq;
|
||||
pParam->numOfTasks = numOfTasks;
|
||||
|
||||
tmr_h pTimer = streamTimerGetInstance();
|
||||
ASSERT(pTimer);
|
||||
|
||||
if (pMeta->scanInfo.scanTimer == NULL) {
|
||||
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTq->tqTimer);
|
||||
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTimer);
|
||||
} else {
|
||||
taosTmrReset(doStartScanWal, idleDuration, pParam, pTq->tqTimer, &pMeta->scanInfo.scanTimer);
|
||||
taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -4120,8 +4120,10 @@ void tsdbReaderClose2(STsdbReader* pReader) {
|
|||
taosMemoryFreeClear(pReader->status.uidList.tableUidList);
|
||||
|
||||
qTrace("tsdb/reader-close: %p, untake snapshot", pReader);
|
||||
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, true);
|
||||
pReader->pReadSnap = NULL;
|
||||
void* p = pReader->pReadSnap;
|
||||
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
|
||||
tsdbUntakeReadSnap2(pReader, p, true);
|
||||
}
|
||||
|
||||
tsem_destroy(&pReader->resumeAfterSuspend);
|
||||
tsdbReleaseReader(pReader);
|
||||
|
@ -4195,8 +4197,12 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
|
|||
doSuspendCurrentReader(pReader);
|
||||
}
|
||||
|
||||
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false);
|
||||
pReader->pReadSnap = NULL;
|
||||
// make sure only release once
|
||||
void* p = pReader->pReadSnap;
|
||||
if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) {
|
||||
tsdbUntakeReadSnap2(pReader, p, false);
|
||||
}
|
||||
|
||||
if (pReader->bFilesetDelimited) {
|
||||
pReader->status.memTableMinKey = INT64_MAX;
|
||||
pReader->status.memTableMaxKey = INT64_MIN;
|
||||
|
|
|
@ -35,6 +35,10 @@ void streamTimerCleanUp() {
|
|||
streamTimer = NULL;
|
||||
}
|
||||
|
||||
tmr_h streamTimerGetInstance() {
|
||||
return streamTimer;
|
||||
}
|
||||
|
||||
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
|
||||
char buf[128] = {0};
|
||||
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
|
||||
|
|
Loading…
Reference in New Issue